Extract language-independent 'Simulator' class from woods example

This commit is contained in:
Joeri Exelmans 2024-10-29 11:20:32 +01:00
parent 590ce0b0b9
commit c738e8bcd1
3 changed files with 177 additions and 113 deletions

View file

@ -0,0 +1,121 @@
import abc
import random
import math
import functools
import sys
from framework.conformance import Conformance, render_conformance_check_result
from concrete_syntax.common import indent
from concrete_syntax.textual_od.renderer import render_od
class DecisionMaker:
@abc.abstractmethod
def __call__(self, actions):
pass
class Simulator:
def __init__(self,
action_generator,
decision_maker: DecisionMaker,
termination_condition,
check_conformance=True,
verbose=True,
renderer=lambda od: render_od(od.state, od.m, od.mm),
):
self.action_generator = action_generator
self.decision_maker = decision_maker
self.termination_condition = termination_condition
self.check_conformance = check_conformance
self.verbose = verbose
self.renderer = renderer
def __print(self, *args):
if self.verbose:
print(*args)
# Run simulation until termination condition satisfied
def run(self, od):
self.__print("Start simulation")
step_counter = 0
while True:
self.__print("--------------")
self.__print(indent(self.renderer(od), 4))
self.__print("--------------")
termination_reason = self.termination_condition(od)
if termination_reason != None:
self.__print(f"Termination condition satisfied.\nReason: {termination_reason}.")
break
actions = self.action_generator(od)
chosen_action = self.decision_maker(actions)
if chosen_action == None:
self.__print(f"No enabled actions.")
break
(od, msgs) = chosen_action()
self.__print(indent('\n'.join(f"{msg}" for msg in msgs), 2))
step_counter += 1
if self.check_conformance:
self.__print()
conf = Conformance(od.state, od.m, od.mm)
self.__print(render_conformance_check_result(conf.check_nominal()))
self.__print(f"Executed {step_counter} steps.")
return od
def filter_valid_actions(actions):
result = {}
def make_tuple(new_od, msgs):
return (new_od, msgs)
for name, callback in actions:
print(f"attempt '{name}' ...", end='\r')
(new_od, msgs) = callback()
conf = Conformance(new_od.state, new_od.m, new_od.mm)
errors = conf.check_nominal()
# erase current line:
print(" ", end='\r')
if len(errors) == 0:
# updated RT-M is conform, we have a valid action:
yield (name, functools.partial(make_tuple, new_od, msgs))
class RandomDecisionMaker(DecisionMaker):
def __init__(self, seed=0, verbose=True):
self.r = random.Random(seed)
def __call__(self, actions):
arr = [action for descr, action in actions]
i = math.floor(self.r.random()*len(arr))
return arr[i]
class InteractiveDecisionMaker(DecisionMaker):
def __init__(self, msg="Select action:"):
self.msg = msg
def __call__(self, actions):
arr = []
for i, (key, result) in enumerate(actions):
print(f" {i}. {key}")
arr.append(result)
if len(arr) == 0:
return
def __choose():
sys.stdout.write(f"{self.msg} ")
try:
raw = input()
choice = int(raw) # may raise ValueError
if choice >= 0 and choice < len(arr):
return arr[choice]
except ValueError:
pass
print("Invalid option")
return __choose()
return __choose()

View file

@ -12,11 +12,12 @@ from util import prompt
from transformation.cloner import clone_od
from api.od import ODAPI
from examples.semantics.operational.simulator import Simulator, RandomDecisionMaker, InteractiveDecisionMaker, filter_valid_actions
state = DevState()
print("Loading meta-meta-model...")
# Load meta-meta-model
scd_mmm = bootstrap_scd(state)
print("Done")
# Design meta-model
woods_mm_cs = """
@ -51,9 +52,8 @@ woods_mm = parser.parse_od(
m_text=woods_mm_cs,
mm=scd_mmm)
print("MM valid?")
conf = Conformance(state, woods_mm, scd_mmm)
print(render_conformance_check_result(conf.check_nominal()))
print("MM ...", render_conformance_check_result(conf.check_nominal()))
# Runtime meta-model
woods_rt_mm_cs = woods_mm_cs + """
@ -142,9 +142,8 @@ woods_rt_mm = parser.parse_od(
m_text=woods_rt_mm_cs,
mm=scd_mmm)
print("RT-MM valid?")
conf = Conformance(state, woods_rt_mm, scd_mmm)
print(render_conformance_check_result(conf.check_nominal()))
print("RT-MM ...", render_conformance_check_result(conf.check_nominal()))
# print("--------------")
# print(indent(
@ -180,9 +179,8 @@ woods_m = parser.parse_od(
m_text=woods_m_cs,
mm=woods_mm)
print("M valid?")
conf = Conformance(state, woods_m, woods_mm)
print(render_conformance_check_result(conf.check_nominal()))
print("M ...", render_conformance_check_result(conf.check_nominal()))
# Our runtime model - the part that changes with every execution step
woods_rt_initial_m_cs = woods_m_cs + """
@ -218,9 +216,8 @@ woods_rt_m = parser.parse_od(
m_text=woods_rt_initial_m_cs,
mm=woods_rt_mm)
print("RT-M valid?")
conf = Conformance(state, woods_rt_m, woods_rt_mm)
print(render_conformance_check_result(conf.check_nominal()))
print("RT-M ...", render_conformance_check_result(conf.check_nominal()))
# Helpers
@ -251,7 +248,7 @@ def advance_time(od):
if od.get_slot_value(bear_state, "dead"):
continue # bear already dead
old_hunger = od.get_slot_value(bear_state, "hunger")
new_hunger = min(old_hunger + 5, 100)
new_hunger = min(old_hunger + 10, 100)
od.set_slot_value(bear_state, "hunger", new_hunger)
bear = od.get_target(od.get_outgoing(bear_state, "of")[0])
bear_name = od.get_name(bear)
@ -277,9 +274,11 @@ def attack(od, animal_name: str, man_name: str):
msgs.append(f"{animal_name} is now attacking {man_name}")
return msgs
def get_actions(od):
def get_all_actions(od):
def _get_actions(od):
# can always advance time:
actions = { "advance time": advance_time }
yield ("advance time", advance_time)
# who can attack whom?
for _, afraid_link in od.get_all_instances("afraidOf"):
@ -290,9 +289,7 @@ def get_actions(od):
man_state = state_of(od, man)
animal_state = state_of(od, animal)
descr = f"{animal_name} ({od.get_type_name(animal)}) attacks {man_name} ({od.get_type_name(man)})"
actions[descr] = functools.partial(attack, animal_name=animal_name, man_name=man_name)
return { action_descr: functools.partial(exec_pure, action, od) for action_descr, action in actions.items() }
yield (descr, functools.partial(attack, animal_name=animal_name, man_name=man_name))
# Copy model before modifying it
def exec_pure(action, od):
@ -301,24 +298,11 @@ def exec_pure(action, od):
msgs = action(new_od)
return (new_od, msgs)
def filter_actions(actions):
result = {}
def make_tuple(new_od, msgs):
return (new_od, msgs)
for name, callback in actions.items():
print(f"attempt '{name}' ...", end='\r')
(new_od, msgs) = callback()
conf = Conformance(state, new_od.m, new_od.mm)
errors = conf.check_nominal()
# erase current line:
print(" ", end='\r')
if len(errors) == 0:
# updated RT-M is conform, we have a valid action:
yield (name, functools.partial(make_tuple, new_od, msgs))
for descr, action in _get_actions(od):
yield (descr, functools.partial(exec_pure, action, od))
def unfilter_actions(actions, od):
for name, callback in actions.items():
yield (name, callback)
def get_valid_actions(od):
return filter_valid_actions(get_all_actions(od))
def render_woods(od):
txt = ""
@ -352,53 +336,33 @@ def render_woods(od):
txt += f" 👨 {od.get_name(man)} ({render_dead(man_state)}) {render_attacking(man_state)}{being_attacked}\n"
return txt
od = ODAPI(state, woods_rt_m, woods_rt_mm)
RANDOM_SEED = 0
r = random.Random(RANDOM_SEED)
def random_choice(options):
arr = [action for descr, action in options]
i = math.floor(r.random()*len(arr))
return arr[i]
def termination_condition(od):
_, time = get_time(od)
return time >= 10 # stop after 10 steps
if time >= 10:
return "Took too long"
print(f"Using random seed: {RANDOM_SEED} (only applicable to random simulation)")
# End simulation when 2 animals are dead
who_is_dead = []
for _, animal_state in od.get_all_instances("AnimalState"):
if od.get_slot_value(animal_state, "dead"):
animal_name = od.get_name(animal_of(od, animal_state))
who_is_dead.append(animal_name)
if len(who_is_dead) >= 2:
return f"{' and '.join(who_is_dead)} are dead"
while True:
print("--------------")
print(indent(render_woods(od), 4))
print("--------------")
sim = Simulator(
action_generator=get_valid_actions,
# action_generator=get_actions,
decision_maker=RandomDecisionMaker(seed=0),
# decision_maker=InteractiveDecisionMaker(),
termination_condition=termination_condition,
check_conformance=True,
verbose=True,
renderer=render_woods,
)
if termination_condition(od):
print("Termination condition satisfied. Quit.")
break
# print(indent(
# renderer.render_od(state,
# m_id=od.m,
# mm_id=od.mm),
# 4))
# 1. Only 'valid' actions or all actions?
# actions = unfilter_actions(get_actions(od), od)
actions = filter_actions(get_actions(od))
# 2. Manual or random selection?
# action = prompt.choose("Select action:", actions)
action = random_choice(actions)
if action == None:
print("No enabled actions. Quit.")
break
(od, msgs) = action()
print(indent('\n'.join(f"{msg}" for msg in msgs), 2))
od = ODAPI(state, woods_rt_m, woods_rt_mm)
print()
conf = Conformance(state, od.m, od.mm)
print(render_conformance_check_result(conf.check_nominal()))
sim.run(od)

View file

@ -16,24 +16,3 @@ def pause():
print("press any key...")
input()
def choose(msg:str, options):
arr = []
for i, (key, result) in enumerate(options):
print(f" {i}. {key}")
arr.append(result)
if len(arr) == 0:
return
return __choose(msg, arr)
def __choose(msg: str, arr):
sys.stdout.write(f"{msg} ")
try:
raw = input()
choice = int(raw) # may raise ValueError
if choice >= 0 and choice < len(arr):
return arr[choice]
except ValueError:
pass
print("Invalid option")
return __choose(msg, arr)