diff --git a/examples/semantics/operational/simulator.py b/examples/semantics/operational/simulator.py new file mode 100644 index 0000000..1b8fbea --- /dev/null +++ b/examples/semantics/operational/simulator.py @@ -0,0 +1,121 @@ +import abc +import random +import math +import functools +import sys + +from framework.conformance import Conformance, render_conformance_check_result + +from concrete_syntax.common import indent +from concrete_syntax.textual_od.renderer import render_od + +class DecisionMaker: + @abc.abstractmethod + def __call__(self, actions): + pass + +class Simulator: + def __init__(self, + action_generator, + decision_maker: DecisionMaker, + termination_condition, + check_conformance=True, + verbose=True, + renderer=lambda od: render_od(od.state, od.m, od.mm), + ): + self.action_generator = action_generator + self.decision_maker = decision_maker + self.termination_condition = termination_condition + self.check_conformance = check_conformance + self.verbose = verbose + self.renderer = renderer + + def __print(self, *args): + if self.verbose: + print(*args) + + # Run simulation until termination condition satisfied + def run(self, od): + self.__print("Start simulation") + step_counter = 0 + while True: + self.__print("--------------") + self.__print(indent(self.renderer(od), 4)) + self.__print("--------------") + + termination_reason = self.termination_condition(od) + if termination_reason != None: + self.__print(f"Termination condition satisfied.\nReason: {termination_reason}.") + break + + actions = self.action_generator(od) + + chosen_action = self.decision_maker(actions) + + if chosen_action == None: + self.__print(f"No enabled actions.") + break + + (od, msgs) = chosen_action() + self.__print(indent('\n'.join(f"▸ {msg}" for msg in msgs), 2)) + + step_counter += 1 + + if self.check_conformance: + self.__print() + conf = Conformance(od.state, od.m, od.mm) + self.__print(render_conformance_check_result(conf.check_nominal())) + self.__print(f"Executed {step_counter} steps.") + return od + + +def filter_valid_actions(actions): + result = {} + def make_tuple(new_od, msgs): + return (new_od, msgs) + for name, callback in actions: + print(f"attempt '{name}' ...", end='\r') + (new_od, msgs) = callback() + conf = Conformance(new_od.state, new_od.m, new_od.mm) + errors = conf.check_nominal() + # erase current line: + print(" ", end='\r') + if len(errors) == 0: + # updated RT-M is conform, we have a valid action: + yield (name, functools.partial(make_tuple, new_od, msgs)) + + +class RandomDecisionMaker(DecisionMaker): + def __init__(self, seed=0, verbose=True): + self.r = random.Random(seed) + + def __call__(self, actions): + arr = [action for descr, action in actions] + i = math.floor(self.r.random()*len(arr)) + return arr[i] + +class InteractiveDecisionMaker(DecisionMaker): + def __init__(self, msg="Select action:"): + self.msg = msg + + def __call__(self, actions): + arr = [] + for i, (key, result) in enumerate(actions): + print(f" {i}. {key}") + arr.append(result) + if len(arr) == 0: + return + + def __choose(): + sys.stdout.write(f"{self.msg} ") + try: + raw = input() + choice = int(raw) # may raise ValueError + if choice >= 0 and choice < len(arr): + return arr[choice] + except ValueError: + pass + print("Invalid option") + return __choose() + + return __choose() diff --git a/examples/semantics/operational/woods_pysem.py b/examples/semantics/operational/woods_pysem.py index f19bc55..2a2f0f9 100644 --- a/examples/semantics/operational/woods_pysem.py +++ b/examples/semantics/operational/woods_pysem.py @@ -12,11 +12,12 @@ from util import prompt from transformation.cloner import clone_od from api.od import ODAPI +from examples.semantics.operational.simulator import Simulator, RandomDecisionMaker, InteractiveDecisionMaker, filter_valid_actions + state = DevState() -print("Loading meta-meta-model...") +# Load meta-meta-model scd_mmm = bootstrap_scd(state) -print("Done") # Design meta-model woods_mm_cs = """ @@ -51,9 +52,8 @@ woods_mm = parser.parse_od( m_text=woods_mm_cs, mm=scd_mmm) -print("MM valid?") conf = Conformance(state, woods_mm, scd_mmm) -print(render_conformance_check_result(conf.check_nominal())) +print("MM ...", render_conformance_check_result(conf.check_nominal())) # Runtime meta-model woods_rt_mm_cs = woods_mm_cs + """ @@ -142,9 +142,8 @@ woods_rt_mm = parser.parse_od( m_text=woods_rt_mm_cs, mm=scd_mmm) -print("RT-MM valid?") conf = Conformance(state, woods_rt_mm, scd_mmm) -print(render_conformance_check_result(conf.check_nominal())) +print("RT-MM ...", render_conformance_check_result(conf.check_nominal())) # print("--------------") # print(indent( @@ -180,9 +179,8 @@ woods_m = parser.parse_od( m_text=woods_m_cs, mm=woods_mm) -print("M valid?") conf = Conformance(state, woods_m, woods_mm) -print(render_conformance_check_result(conf.check_nominal())) +print("M ...", render_conformance_check_result(conf.check_nominal())) # Our runtime model - the part that changes with every execution step woods_rt_initial_m_cs = woods_m_cs + """ @@ -218,9 +216,8 @@ woods_rt_m = parser.parse_od( m_text=woods_rt_initial_m_cs, mm=woods_rt_mm) -print("RT-M valid?") conf = Conformance(state, woods_rt_m, woods_rt_mm) -print(render_conformance_check_result(conf.check_nominal())) +print("RT-M ...", render_conformance_check_result(conf.check_nominal())) # Helpers @@ -251,7 +248,7 @@ def advance_time(od): if od.get_slot_value(bear_state, "dead"): continue # bear already dead old_hunger = od.get_slot_value(bear_state, "hunger") - new_hunger = min(old_hunger + 5, 100) + new_hunger = min(old_hunger + 10, 100) od.set_slot_value(bear_state, "hunger", new_hunger) bear = od.get_target(od.get_outgoing(bear_state, "of")[0]) bear_name = od.get_name(bear) @@ -277,48 +274,35 @@ def attack(od, animal_name: str, man_name: str): msgs.append(f"{animal_name} is now attacking {man_name}") return msgs -def get_actions(od): - # can always advance time: - actions = { "advance time": advance_time } - # who can attack whom? - for _, afraid_link in od.get_all_instances("afraidOf"): - man = od.get_source(afraid_link) - animal = od.get_target(afraid_link) - animal_name = od.get_name(animal) - man_name = od.get_name(man) - man_state = state_of(od, man) - animal_state = state_of(od, animal) - descr = f"{animal_name} ({od.get_type_name(animal)}) attacks {man_name} ({od.get_type_name(man)})" - actions[descr] = functools.partial(attack, animal_name=animal_name, man_name=man_name) - - return { action_descr: functools.partial(exec_pure, action, od) for action_descr, action in actions.items() } +def get_all_actions(od): + def _get_actions(od): + # can always advance time: + yield ("advance time", advance_time) -# Copy model before modifying it -def exec_pure(action, od): - cloned_rt_m = clone_od(state, od.m, od.mm) - new_od = ODAPI(state, cloned_rt_m, od.mm) - msgs = action(new_od) - return (new_od, msgs) - -def filter_actions(actions): - result = {} - def make_tuple(new_od, msgs): + # who can attack whom? + for _, afraid_link in od.get_all_instances("afraidOf"): + man = od.get_source(afraid_link) + animal = od.get_target(afraid_link) + animal_name = od.get_name(animal) + man_name = od.get_name(man) + man_state = state_of(od, man) + animal_state = state_of(od, animal) + descr = f"{animal_name} ({od.get_type_name(animal)}) attacks {man_name} ({od.get_type_name(man)})" + yield (descr, functools.partial(attack, animal_name=animal_name, man_name=man_name)) + + # Copy model before modifying it + def exec_pure(action, od): + cloned_rt_m = clone_od(state, od.m, od.mm) + new_od = ODAPI(state, cloned_rt_m, od.mm) + msgs = action(new_od) return (new_od, msgs) - for name, callback in actions.items(): - print(f"attempt '{name}' ...", end='\r') - (new_od, msgs) = callback() - conf = Conformance(state, new_od.m, new_od.mm) - errors = conf.check_nominal() - # erase current line: - print(" ", end='\r') - if len(errors) == 0: - # updated RT-M is conform, we have a valid action: - yield (name, functools.partial(make_tuple, new_od, msgs)) -def unfilter_actions(actions, od): - for name, callback in actions.items(): - yield (name, callback) + for descr, action in _get_actions(od): + yield (descr, functools.partial(exec_pure, action, od)) + +def get_valid_actions(od): + return filter_valid_actions(get_all_actions(od)) def render_woods(od): txt = "" @@ -352,53 +336,33 @@ def render_woods(od): txt += f" 👨 {od.get_name(man)} ({render_dead(man_state)}) {render_attacking(man_state)}{being_attacked}\n" return txt -od = ODAPI(state, woods_rt_m, woods_rt_mm) - -RANDOM_SEED = 0 - -r = random.Random(RANDOM_SEED) - -def random_choice(options): - arr = [action for descr, action in options] - i = math.floor(r.random()*len(arr)) - return arr[i] def termination_condition(od): _, time = get_time(od) - return time >= 10 # stop after 10 steps + if time >= 10: + return "Took too long" -print(f"Using random seed: {RANDOM_SEED} (only applicable to random simulation)") + # End simulation when 2 animals are dead + who_is_dead = [] + for _, animal_state in od.get_all_instances("AnimalState"): + if od.get_slot_value(animal_state, "dead"): + animal_name = od.get_name(animal_of(od, animal_state)) + who_is_dead.append(animal_name) + if len(who_is_dead) >= 2: + return f"{' and '.join(who_is_dead)} are dead" -while True: - print("--------------") - print(indent(render_woods(od), 4)) - print("--------------") +sim = Simulator( + action_generator=get_valid_actions, + # action_generator=get_actions, + decision_maker=RandomDecisionMaker(seed=0), + # decision_maker=InteractiveDecisionMaker(), + termination_condition=termination_condition, + check_conformance=True, + verbose=True, + renderer=render_woods, +) - if termination_condition(od): - print("Termination condition satisfied. Quit.") - break +od = ODAPI(state, woods_rt_m, woods_rt_mm) - # print(indent( - # renderer.render_od(state, - # m_id=od.m, - # mm_id=od.mm), - # 4)) - - # 1. Only 'valid' actions or all actions? - # actions = unfilter_actions(get_actions(od), od) - actions = filter_actions(get_actions(od)) - - # 2. Manual or random selection? - # action = prompt.choose("Select action:", actions) - action = random_choice(actions) - - if action == None: - print("No enabled actions. Quit.") - break - - (od, msgs) = action() - print(indent('\n'.join(f"▸ {msg}" for msg in msgs), 2)) - - print() - conf = Conformance(state, od.m, od.mm) - print(render_conformance_check_result(conf.check_nominal())) +print() +sim.run(od) diff --git a/util/prompt.py b/util/prompt.py index 918c4b6..bbc9b5c 100644 --- a/util/prompt.py +++ b/util/prompt.py @@ -16,24 +16,3 @@ def pause(): print("press any key...") input() -def choose(msg:str, options): - arr = [] - for i, (key, result) in enumerate(options): - print(f" {i}. {key}") - arr.append(result) - if len(arr) == 0: - return - return __choose(msg, arr) - -def __choose(msg: str, arr): - sys.stdout.write(f"{msg} ") - try: - raw = input() - choice = int(raw) # may raise ValueError - if choice >= 0 and choice < len(arr): - return arr[choice] - except ValueError: - pass - - print("Invalid option") - return __choose(msg, arr)