add examples

This commit is contained in:
Joeri Exelmans 2024-11-13 10:07:16 +01:00
parent 8504ba52f6
commit 42757ddc4f
35 changed files with 1104 additions and 609 deletions

View file

@ -0,0 +1,62 @@
from state.devstate import DevState
from bootstrap.scd import bootstrap_scd
from api.od import ODAPI
from concrete_syntax.common import indent
from concrete_syntax.textual_od import renderer as od_renderer
from concrete_syntax.plantuml import renderer as plantuml
from concrete_syntax.plantuml.make_url import make_url as make_plantuml_url
from concrete_syntax.graphviz.make_url import make_url as make_graphviz_url
from concrete_syntax.graphviz import renderer as graphviz
from transformation.matcher.mvs_adapter import match_od
from transformation.rewriter import rewrite
from transformation.cloner import clone_od
from transformation.ramify import ramify
from transformation.rule import RuleMatcherRewriter, ActionGenerator
from examples.semantics.operational import simulator
from util import loader
import models
state = DevState()
scd_mmm = bootstrap_scd(state)
print("Parsing models...")
mm, mm_rt, m, m_rt_initial = models.load_fibonacci(state, scd_mmm)
mm_rt_ramified = ramify(state, mm_rt)
# print("RT-MM")
# print(make_plantuml_url(plantuml.render_class_diagram(state, mm_rt)))
# print("RAMIFIED RT-MM")
# print(make_plantuml_url(plantuml.render_class_diagram(state, mm_rt_ramified)))
high_priority_rules, low_priority_rules = models.load_rules(state, mm_rt_ramified)
matcher_rewriter = RuleMatcherRewriter(state, mm_rt, mm_rt_ramified)
high_priority_actions = ActionGenerator(matcher_rewriter, high_priority_rules)
low_priority_actions = ActionGenerator(matcher_rewriter, low_priority_rules)
# yields the currently enabled actions
def generate_actions(od):
at_least_one_match = yield from high_priority_actions(od)
if not at_least_one_match:
# Only if no other action is possible, can time advance:
yield from low_priority_actions(od)
sim = simulator.Simulator(
action_generator=generate_actions,
# decision_maker=simulator.InteractiveDecisionMaker(auto_proceed=False),
decision_maker=simulator.RandomDecisionMaker(seed=0),
termination_condition=lambda od: "Time is up" if od.get_slot_value(od.get_all_instances("Clock")[0][1], "time") >= 10 else None,
check_conformance=True,
verbose=True,
renderer=lambda od: od_renderer.render_od(state, od.m, od.mm, hide_names=False),
)
sim.run(ODAPI(state, m_rt_initial, mm_rt))

View file

@ -1,71 +1,41 @@
# This module loads all the models (including the transformation rules) and performs a conformance-check on them.
from util import loader
import os
from framework.conformance import Conformance, render_conformance_check_result
from concrete_syntax.textual_od import parser
from transformation.ramify import ramify
THIS_DIR = os.path.dirname(__file__)
# get file contents as string
def read_file(filename):
dir = os.path.dirname(__file__)
with open(dir+'/'+filename) as file:
with open(THIS_DIR+'/'+filename) as file:
return file.read()
def parse_and_check(state, m_cs, mm, descr: str):
try:
m = parser.parse_od(
state,
m_text=m_cs,
mm=mm,
)
except Exception as e:
e.add_note("While parsing model " + descr)
raise
try:
conf = Conformance(state, m, mm)
errors = conf.check_nominal()
if len(errors) > 0:
print(render_conformance_check_result(errors))
except Exception as e:
e.add_note("In model " + descr)
raise
return m
def get_metamodels(state, scd_mmm):
def load_metamodels(state, scd_mmm):
mm_cs = read_file('models/mm_design.od')
mm_rt_cs = mm_cs + read_file('models/mm_runtime.od')
mm = parse_and_check(state, mm_cs, scd_mmm, "Design meta-model")
mm_rt = parse_and_check(state, mm_rt_cs, scd_mmm, "Runtime meta-model")
mm = loader.parse_and_check(state, mm_cs, scd_mmm, "Design meta-model")
mm_rt = loader.parse_and_check(state, mm_rt_cs, scd_mmm, "Runtime meta-model")
return (mm, mm_rt)
def get_fibonacci(state, scd_mmm):
mm, mm_rt = get_metamodels(state, scd_mmm)
def load_fibonacci(state, scd_mmm):
mm, mm_rt = load_metamodels(state, scd_mmm)
m_cs = read_file('models/m_fibonacci.od')
m_rt_initial_cs = m_cs + read_file('models/m_fibonacci_initial.od')
m = parse_and_check(state, m_cs, mm, "Fibonacci model")
m_rt_initial = parse_and_check(state, m_rt_initial_cs, mm_rt, "Fibonacci initial state")
m = loader.parse_and_check(state, m_cs, mm, "Fibonacci model")
m_rt_initial = loader.parse_and_check(state, m_rt_initial_cs, mm_rt, "Fibonacci initial state")
return (mm, mm_rt, m, m_rt_initial)
RULE_NAMES = ["delay_out", "function_out", "delay_in", "advance_time"]
KINDS = ["nac", "lhs", "rhs"]
def get_rules(state, rt_mm):
rt_mm_ramified = ramify(state, rt_mm)
RULES0 = ["delay_in", "delay_out", "function_out"] # high priority
RULES1 = ["advance_time"] # low priority
rules = {} # e.g., { "delay": {"nac": <UUID>, "lhs": <UUID>, ...}, ...}
def load_rules(state, mm_rt_ramified):
get_filename = lambda rule_name, kind: f"{THIS_DIR}/models/r_{rule_name}_{kind}.od"
for rule_name in RULE_NAMES:
rule = {}
for kind in KINDS:
filename = f"models/r_{rule_name}_{kind}.od";
cs = read_file(filename)
rule_m = parse_and_check(state, cs, rt_mm_ramified, descr=f"'{filename}'")
rule[kind] = rule_m
rules[rule_name] = rule
rules0 = loader.load_rules(state, get_filename, mm_rt_ramified, RULES0)
rules1 = loader.load_rules(state, get_filename, mm_rt_ramified, RULES1)
return (rt_mm_ramified, rules)
return rules0, rules1

View file

@ -0,0 +1,11 @@
This directory contains the following files:
mm_design.od: Meta-model of design model
mm_runtime.od: Meta-model of runtime model
m_fibonacci.od: Design model for computing Fibonacci numbers
m_fibonacci_initial.od: Initial runtime model (=initial state), for our Fibonacci model
r_<rule_name>_lhs.od: Left-hand side of model transformation rule
r_<rule_name>_nac.od: Negative application condition of model transformation rule
r_<rule_name>_rhs.od: Right-hand side of model transformation rule

View file

@ -1,125 +0,0 @@
import functools
import pprint
from state.devstate import DevState
from bootstrap.scd import bootstrap_scd
from api.od import ODAPI
from concrete_syntax.common import indent
from concrete_syntax.textual_od import renderer as od_renderer
from concrete_syntax.plantuml import renderer as plantuml
from concrete_syntax.plantuml.make_url import make_url as make_plantuml_url
from concrete_syntax.graphviz.make_url import make_url as make_graphviz_url
from concrete_syntax.graphviz import renderer as graphviz
from transformation.matcher.mvs_adapter import match_od
from transformation.rewriter import rewrite
from transformation.cloner import clone_od
from examples.semantics.operational import simulator
import models
def match_rule(rule_name, od: ODAPI, lhs, nac):
lhs_matcher = match_od(state,
host_m=od.m,
host_mm=od.mm,
pattern_m=lhs,
pattern_mm=mm_rt_ram)
try:
for i, lhs_match in enumerate(lhs_matcher):
nac_matcher = match_od(state,
host_m=od.m,
host_mm=od.mm,
pattern_m=nac,
pattern_mm=mm_rt_ram,
pivot=lhs_match)
try:
for j, nac_match in enumerate(nac_matcher):
break # there may be more NAC-matches, but we already now enough -> proceed to next lhs_match
else:
yield lhs_match # got match
except Exception as e:
# Make exceptions raised in eval'ed code easier to trace:
e.add_note(f"while matching NAC of '{rule_name}'")
raise
except Exception as e:
# Make exceptions raised in eval'ed code easier to trace:
e.add_note(f"while matching LHS of '{rule_name}'")
raise
def exec_action(rule_name, od: ODAPI, lhs, rhs, lhs_match):
# copy these, will be overwritten in-place
cloned_m = clone_od(state, od.m, od.mm)
rhs_match = dict(lhs_match)
try:
rewrite(state,
lhs_m=lhs,
rhs_m=rhs,
pattern_mm=mm_rt_ram,
lhs_name_mapping=rhs_match,
host_m=cloned_m,
host_mm=od.mm)
except Exception as e:
# Make exceptions raised in eval'ed code easier to trace:
e.add_note(f"while executing RHS of '{rule_name}'")
raise
print("Updated match:\n" + indent(pp.pformat(rhs_match), 6))
return (ODAPI(state, cloned_m, od.mm), [f"executed rule '{rule_name}'"])
pp = pprint.PrettyPrinter(depth=4)
def attempt_rules(od: ODAPI, rule_dict):
at_least_one_match = False
for rule_name, rule in rule_dict.items():
for lhs_match in match_rule(rule_name, od, rule["lhs"], rule["nac"]):
# We got a match!
yield (rule_name + '\n' + indent(pp.pformat(lhs_match), 6),
functools.partial(exec_action,
rule_name, od, rule["lhs"], rule["rhs"], lhs_match))
at_least_one_match = True
return at_least_one_match
def get_actions(od: ODAPI):
# transformation schedule
rule_advance_time = rules["advance_time"]
rules_not_advancing_time = { rule_name: rule for rule_name, rule in rules.items() if rule_name != "advance_time" }
at_least_one_match = yield from attempt_rules(od, rules_not_advancing_time)
if not at_least_one_match:
yield from attempt_rules(od, {"advance_time": rule_advance_time})
state = DevState()
scd_mmm = bootstrap_scd(state)
print("Parsing models...")
mm, mm_rt, m, m_rt_initial = models.get_fibonacci(state, scd_mmm)
mm_rt_ram, rules = models.get_rules(state, mm_rt)
# print("RT-MM")
# print(make_plantuml_url(plantuml.render_class_diagram(state, mm_rt)))
# print("RAMIFIED RT-MM")
# print(make_plantuml_url(plantuml.render_class_diagram(state, mm_rt_ram)))
sim = simulator.Simulator(
action_generator=get_actions,
# decision_maker=simulator.InteractiveDecisionMaker(auto_proceed=False),
decision_maker=simulator.RandomDecisionMaker(seed=0),
termination_condition=lambda od: "Time is up" if od.get_slot_value(od.get_all_instances("Clock")[0][1], "time") >= 10 else None,
check_conformance=True,
verbose=True,
renderer=lambda od: od_renderer.render_od(state, od.m, od.mm, hide_names=False),
)
sim.run(ODAPI(state, m_rt_initial, mm_rt))

View file

@ -98,7 +98,14 @@ port_rt_mm_cs = port_mm_cs + """
BerthState:Class {
# status == empty <=> numShips == 0
constraint = `(get_slot_value(this, "numShips") == 0) == (get_slot_value(this, "status") == "empty")`;
constraint = ```
errors = []
numShips = get_slot_value(this, "numShips")
status = get_slot_value(this, "status")
if (numShips == 0) != (status == "empty"):
errors.append(f"Inconsistent: numShips = {numShips}, but status = {status}")
errors
```;
}
:Inheritance (BerthState -> PlaceState)

View file

@ -0,0 +1,62 @@
import urllib.parse
from state.devstate import DevState
from bootstrap.scd import bootstrap_scd
from framework.conformance import Conformance, render_conformance_check_result
from concrete_syntax.textual_od import parser
from concrete_syntax.plantuml.renderer import render_object_diagram, render_class_diagram
from api.od import ODAPI
from transformation.ramify import ramify
from examples.semantics.operational.simulator import Simulator, RandomDecisionMaker, InteractiveDecisionMaker
from examples.semantics.operational.port import models
from examples.semantics.operational.port.helpers import design_to_state, state_to_design, get_time
from examples.semantics.operational.port.renderer import render_port_textual, render_port_graphviz
from examples.semantics.operational.port import rulebased_sem
state = DevState()
scd_mmm = bootstrap_scd(state) # Load meta-meta-model
### Load (meta-)models ###
def parse_and_check(m_cs: str, mm, descr: str):
m = parser.parse_od(
state,
m_text=m_cs,
mm=mm)
conf = Conformance(state, m, mm)
print(descr, "...", render_conformance_check_result(conf.check_nominal()))
return m
port_mm = parse_and_check(models.port_mm_cs, scd_mmm, "MM")
port_m = parse_and_check(models.port_m_cs, port_mm, "M")
port_rt_mm = parse_and_check(models.port_rt_mm_cs, scd_mmm, "RT-MM")
port_rt_m = parse_and_check(models.port_rt_m_cs, port_rt_mm, "RT-M")
print()
# print(render_class_diagram(state, port_rt_mm))
### Simulate ###
port_rt_mm_ramified = ramify(state, port_rt_mm)
rulebased_action_generator = rulebased_sem.get_action_generator(state, port_rt_mm, port_rt_mm_ramified)
termination_condition = rulebased_sem.TerminationCondition(state, port_rt_mm_ramified)
sim = Simulator(
action_generator=rulebased_action_generator,
# decision_maker=RandomDecisionMaker(seed=2),
decision_maker=InteractiveDecisionMaker(),
termination_condition=termination_condition,
check_conformance=True,
verbose=True,
renderer=render_port_textual,
# renderer=render_port_graphviz,
)
od = ODAPI(state, port_rt_m, port_rt_mm)
sim.run(od)

View file

@ -0,0 +1,67 @@
### Operational Semantics - defined by rule-based model transformation ###
from concrete_syntax.textual_od.parser import parse_od
from transformation.rule import Rule, RuleMatcherRewriter, PriorityActionGenerator
from transformation.matcher.mvs_adapter import match_od
from util import loader
import os
THIS_DIR = os.path.dirname(__file__)
# kind: lhs, rhs, nac
get_filename = lambda rule_name, kind: f"{THIS_DIR}/rules/r_{rule_name}_{kind}.od"
def get_action_generator(state, rt_mm, rt_mm_ramified):
matcher_rewriter = RuleMatcherRewriter(state, rt_mm, rt_mm_ramified)
#############################################################################
# TO IMPLEMENT: Full semantics as a set of rule-based model transformations #
rules0_dict = loader.load_rules(state, get_filename, rt_mm_ramified,
["ship_sinks"] # <- list of rule_name of equal priority
)
rules1_dict = loader.load_rules(state, get_filename, rt_mm_ramified,
["ship_appears_in_berth"]
)
# rules2_dict = ...
generator = PriorityActionGenerator(matcher_rewriter, [
rules0_dict, # highest priority
rules1_dict, # lower priority
# rules2_dict, # lowest priority
])
# TO IMPLEMENT: Full semantics as a set of rule-based model transformations #
#############################################################################
return generator
# The termination condition can also be specified as a pattern:
class TerminationCondition:
def __init__(self, state, rt_mm_ramified):
self.state = state
self.rt_mm_ramified = rt_mm_ramified
# TO IMPLEMENT: terminate simulation when the place 'served' contains 2 ships.
########################################
# You should only edit the pattern below
pattern_cs = """
# Placeholder to make the termination condition never hold:
:GlobalCondition {
condition = `False`;
}
"""
# You should only edit the pattern above
########################################
self.pattern = parse_od(state, pattern_cs, rt_mm_ramified)
def __call__(self, od):
for match in match_od(self.state, od.m, od.mm, self.pattern, self.rt_mm_ramified):
# stop after the first match (no need to look for more matches):
return "There are 2 ships served." # Termination condition statisfied

View file

@ -0,0 +1,13 @@
The names of the files in this directory are important.
A rule must always be named:
r_<rule_name>_<lhs|rhs|nac>.od
It is allowed to have more than one NAC. In this case, the NACs must be named:
r_<rule_name>_nac.od
r_<rule_name>_nac2.od
r_<rule_name>_nac3.od
...
For the assignment, you can delete the existing rules (they are nonsense) and start fresh.

View file

@ -0,0 +1,4 @@
berthState:RAM_BerthState {
RAM_numShips = `get_value(this) == 0`;
RAM_status = `get_value(this) == "empty"`;
}

View file

@ -0,0 +1,4 @@
berthState:RAM_BerthState {
RAM_numShips = `1`;
RAM_status = `"served"`;
}

View file

@ -0,0 +1,5 @@
# Find any place that has at least one ship:
placeState:RAM_PlaceState {
RAM_numShips = `get_value(this) > 0`;
}

View file

@ -0,0 +1,4 @@
placeState:RAM_PlaceState {
# Decrement number of ships:
RAM_numShips = `get_value(this) - 1`;
}

View file

@ -10,12 +10,10 @@ from concrete_syntax.textual_od.renderer import render_od
from transformation.cloner import clone_od
from api.od import ODAPI
class DecisionMaker:
@abc.abstractmethod
def __call__(self, actions):
pass
from util.simulator import MinimalSimulator, DecisionMaker, RandomDecisionMaker, InteractiveDecisionMaker
class Simulator:
class Simulator(MinimalSimulator):
def __init__(self,
action_generator,
decision_maker: DecisionMaker,
@ -24,51 +22,26 @@ class Simulator:
verbose=True,
renderer=lambda od: render_od(od.state, od.m, od.mm),
):
self.action_generator = action_generator
self.decision_maker = decision_maker
self.termination_condition = termination_condition
super().__init__(
action_generator=action_generator,
decision_maker=decision_maker,
termination_condition=lambda od: self.check_render_termination_condition(od),
verbose=verbose,
)
self.check_conformance = check_conformance
self.verbose = verbose
self.actual_termination_condition = termination_condition
self.renderer = renderer
def __print(self, *args):
if self.verbose:
print(*args)
# Run simulation until termination condition satisfied
def run(self, od: ODAPI):
self.__print("Start simulation")
self.__print(f"Decision maker: {self.decision_maker}")
step_counter = 0
while True:
self.__print("--------------")
self.__print(indent(self.renderer(od), 4))
self.__print("--------------")
termination_reason = self.termination_condition(od)
if termination_reason != None:
self.__print(f"Termination condition satisfied.\nReason: {termination_reason}.")
break
actions = self.action_generator(od)
chosen_action = self.decision_maker(actions)
if chosen_action == None:
self.__print(f"No enabled actions.")
break
(od, msgs) = chosen_action()
self.__print(indent('\n'.join(f"{msg}" for msg in msgs), 2))
step_counter += 1
def check_render_termination_condition(self, od):
# A termination condition checker that also renders the model, and performs conformance check
self._print("--------------")
self._print(indent(self.renderer(od), 2))
self._print("--------------")
if self.check_conformance:
self.__print()
conf = Conformance(od.state, od.m, od.mm)
self.__print(render_conformance_check_result(conf.check_nominal()))
self.__print(f"Executed {step_counter} steps.")
return od
self._print(render_conformance_check_result(conf.check_nominal()))
self._print()
return self.actual_termination_condition(od)
def make_actions_pure(actions, od):
# Copy model before modifying it
@ -81,65 +54,17 @@ def make_actions_pure(actions, od):
for descr, action in actions:
yield (descr, functools.partial(exec_pure, action, od))
def filter_valid_actions(pure_actions):
result = {}
def make_tuple(new_od, msgs):
return (new_od, msgs)
for name, callback in pure_actions:
print(f"attempt '{name}' ...", end='\r')
# print(f"attempt '{name}' ...", end='\r')
(new_od, msgs) = callback()
conf = Conformance(new_od.state, new_od.m, new_od.mm)
errors = conf.check_nominal()
# erase current line:
print(" ", end='\r')
# print(" ", end='\r')
if len(errors) == 0:
# updated RT-M is conform, we have a valid action:
yield (name, functools.partial(make_tuple, new_od, msgs))
class RandomDecisionMaker(DecisionMaker):
def __init__(self, seed=0, verbose=True):
self.seed = seed
self.r = random.Random(seed)
def __str__(self):
return f"RandomDecisionMaker(seed={self.seed})"
def __call__(self, actions):
arr = [action for descr, action in actions]
i = math.floor(self.r.random()*len(arr))
return arr[i]
class InteractiveDecisionMaker(DecisionMaker):
# auto_proceed: whether to prompt if there is only one enabled action
def __init__(self, msg="Select action:", auto_proceed=False):
self.msg = msg
self.auto_proceed = auto_proceed
def __str__(self):
return f"InteractiveDecisionMaker()"
def __call__(self, actions):
arr = []
for i, (key, result) in enumerate(actions):
print(f" {i}. {key}")
arr.append(result)
if len(arr) == 0:
return
if len(arr) == 1 and self.auto_proceed:
return arr[0]
def __choose():
sys.stdout.write(f"{self.msg} ")
try:
raw = input()
choice = int(raw) # may raise ValueError
if choice >= 0 and choice < len(arr):
return arr[choice]
except ValueError:
pass
print("Invalid option")
return __choose()
return __choose()

View file

@ -1,337 +0,0 @@
import functools
from state.devstate import DevState
from bootstrap.scd import bootstrap_scd
from framework.conformance import Conformance, render_conformance_check_result
from concrete_syntax.textual_od import parser, renderer
from concrete_syntax.plantuml import renderer as plantuml
from api.od import ODAPI
from examples.semantics.operational.simulator import Simulator, RandomDecisionMaker, InteractiveDecisionMaker, make_actions_pure, filter_valid_actions
state = DevState()
scd_mmm = bootstrap_scd(state) # Load meta-meta-model
### Load (meta-)models ###
# Design meta-model
woods_mm_cs = """
Animal:Class {
abstract = True;
}
Bear:Class
:Inheritance (Bear -> Animal)
Man:Class {
lower_cardinality = 1;
upper_cardinality = 2;
constraint = `get_value(get_slot(this, "weight")) > 20`;
}
:Inheritance (Man -> Animal)
Man_weight:AttributeLink (Man -> Integer) {
name = "weight";
optional = False;
}
afraidOf:Association (Man -> Animal) {
source_upper_cardinality = 6;
target_lower_cardinality = 1;
}
"""
# Runtime meta-model
woods_rt_mm_cs = woods_mm_cs + """
AnimalState:Class {
abstract = True;
}
AnimalState_dead:AttributeLink (AnimalState -> Boolean) {
name = "dead";
optional = False;
}
of:Association (AnimalState -> Animal) {
source_lower_cardinality = 1;
source_upper_cardinality = 1;
target_lower_cardinality = 1;
target_upper_cardinality = 1;
}
BearState:Class {
constraint = `get_type_name(get_target(get_outgoing(this, "of")[0])) == "Bear"`;
}
:Inheritance (BearState -> AnimalState)
BearState_hunger:AttributeLink (BearState -> Integer) {
name = "hunger";
optional = False;
constraint = ```
val = get_value(get_target(this))
val >= 0 and val <= 100
```;
}
ManState:Class {
constraint = `get_type_name(get_target(get_outgoing(this, "of")[0])) == "Man"`;
}
:Inheritance (ManState -> AnimalState)
attacking:Association (AnimalState -> ManState) {
# Animal can only attack one Man at a time
target_upper_cardinality = 1;
# Man can only be attacked by one Animal at a time
source_upper_cardinality = 1;
constraint = ```
attacker = get_source(this)
if get_type_name(attacker) == "BearState":
# only BearState has 'hunger' attribute
hunger = get_value(get_slot(attacker, "hunger"))
else:
hunger = 100 # Man can always attack
attacker_dead = get_value(get_slot(attacker, "dead"))
attacked_state = get_target(this)
attacked_dead = get_value(get_slot(attacked_state, "dead"))
(
hunger >= 50
and not attacker_dead # cannot attack while dead
and not attacked_dead # cannot attack whoever is dead
)
```;
}
attacking_starttime:AttributeLink (attacking -> Integer) {
name = "starttime";
optional = False;
constraint = ```
val = get_value(get_target(this))
_, clock = get_all_instances("Clock")[0]
current_time = get_slot_value(clock, "time")
val >= 0 and val <= current_time
```;
}
# Just a clock singleton for keeping the time
Clock:Class {
lower_cardinality = 1;
upper_cardinality = 1;
}
Clock_time:AttributeLink (Clock -> Integer) {
name = "time";
optional = False;
constraint = `get_value(get_target(this)) >= 0`;
}
"""
# Our design model - the part that doesn't change
woods_m_cs = """
george:Man {
weight = 80;
}
bill:Man {
weight = 70;
}
teddy:Bear
mrBrown:Bear
# george is afraid of both bears
:afraidOf (george -> teddy)
:afraidOf (george -> mrBrown)
# the men are afraid of each other
:afraidOf (bill -> george)
:afraidOf (george -> bill)
"""
# Our runtime model - the part that changes with every execution step
woods_rt_initial_m_cs = woods_m_cs + """
georgeState:ManState {
dead = False;
}
:of (georgeState -> george)
billState:ManState {
dead = False;
}
:of (billState -> bill)
teddyState:BearState {
dead = False;
hunger = 40;
}
:of (teddyState -> teddy)
mrBrownState:BearState {
dead = False;
hunger = 80;
}
:of (mrBrownState -> mrBrown)
clock:Clock {
time = 0;
}
"""
def parse_and_check(m_cs: str, mm, descr: str):
m = parser.parse_od(
state,
m_text=m_cs,
mm=mm)
conf = Conformance(state, m, mm)
print(descr, "...", render_conformance_check_result(conf.check_nominal()))
return m
woods_mm = parse_and_check(woods_mm_cs, scd_mmm, "MM")
woods_rt_mm = parse_and_check(woods_rt_mm_cs, scd_mmm, "RT-MM")
woods_m = parse_and_check(woods_m_cs, woods_mm, "M")
woods_rt_m = parse_and_check(woods_rt_initial_m_cs, woods_rt_mm, "RT-M")
print()
### Semantics ###
# Helpers
def state_of(od, animal):
return od.get_source(od.get_incoming(animal, "of")[0])
def animal_of(od, state):
return od.get_target(od.get_outgoing(state, "of")[0])
def get_time(od):
_, clock = od.get_all_instances("Clock")[0]
return clock, od.get_slot_value(clock, "time")
# Action: Time advances, whoever is being attacked dies, bears become hungrier
def action_advance_time(od):
msgs = []
clock, old_time = get_time(od)
new_time = old_time + 1
od.set_slot_value(clock, "time", new_time)
for _, attacking_link in od.get_all_instances("attacking"):
man_state = od.get_target(attacking_link)
animal_state = od.get_source(attacking_link)
if od.get_type_name(animal_state) == "BearState":
od.set_slot_value(animal_state, "hunger", max(od.get_slot_value(animal_state, "hunger") - 50, 0))
od.set_slot_value(man_state, "dead", True)
od.delete(attacking_link)
msgs.append(f"{od.get_name(animal_of(od, animal_state))} kills {od.get_name(animal_of(od, man_state))}.")
for _, bear_state in od.get_all_instances("BearState"):
if od.get_slot_value(bear_state, "dead"):
continue # bear already dead
old_hunger = od.get_slot_value(bear_state, "hunger")
new_hunger = min(old_hunger + 10, 100)
od.set_slot_value(bear_state, "hunger", new_hunger)
bear = od.get_target(od.get_outgoing(bear_state, "of")[0])
bear_name = od.get_name(bear)
if new_hunger == 100:
od.set_slot_value(bear_state, "dead", True)
msgs.append(f"Bear {bear_name} dies of hunger.")
else:
msgs.append(f"Bear {bear_name}'s hunger level is now {new_hunger}.")
return msgs
# Action: Animal attacks Man
# Note: We must use the names of the objects as parameters, because when cloning, the IDs of objects change!
def action_attack(od, animal_name: str, man_name: str):
msgs = []
animal = od.get(animal_name)
man = od.get(man_name)
animal_state = state_of(od, animal)
man_state = state_of(od, man)
attack_link = od.create_link(None, # auto-generate link name
"attacking", animal_state, man_state)
_, clock = od.get_all_instances("Clock")[0]
current_time = od.get_slot_value(clock, "time")
od.set_slot_value(attack_link, "starttime", current_time)
msgs.append(f"{animal_name} is now attacking {man_name}")
return msgs
# Get all actions that can be performed (including those that bring us to a non-conforming state)
def get_all_actions(od):
def _generate_actions(od):
# can always advance time:
yield ("advance time", action_advance_time)
# if A is afraid of B, then B can attack A:
for _, afraid_link in od.get_all_instances("afraidOf"):
man = od.get_source(afraid_link)
animal = od.get_target(afraid_link)
animal_name = od.get_name(animal)
man_name = od.get_name(man)
man_state = state_of(od, man)
animal_state = state_of(od, animal)
descr = f"{animal_name} ({od.get_type_name(animal)}) attacks {man_name} ({od.get_type_name(man)})"
yield (descr, functools.partial(action_attack, animal_name=animal_name, man_name=man_name))
return make_actions_pure(_generate_actions(od), od)
# Only get those actions that bring us to a conforming state
def get_valid_actions(od):
return filter_valid_actions(get_all_actions(od))
# Render our run-time state to a string
def render_woods(od):
txt = ""
_, time = get_time(od)
txt += f"T = {time}.\n"
txt += "Bears:\n"
def render_attacking(animal_state):
attacking = od.get_outgoing(animal_state, "attacking")
if len(attacking) == 1:
whom_state = od.get_target(attacking[0])
whom_name = od.get_name(animal_of(od, whom_state))
return f" attacking {whom_name}"
else:
return ""
def render_dead(animal_state):
return 'dead' if od.get_slot_value(animal_state, 'dead') else 'alive'
for _, bear_state in od.get_all_instances("BearState"):
bear = animal_of(od, bear_state)
hunger = od.get_slot_value(bear_state, "hunger")
txt += f" 🐻 {od.get_name(bear)} (hunger: {hunger}, {render_dead(bear_state)}) {render_attacking(bear_state)}\n"
txt += "Men:\n"
for _, man_state in od.get_all_instances("ManState"):
man = animal_of(od, man_state)
attacked_by = od.get_incoming(man_state, "attacking")
if len(attacked_by) == 1:
whom_state = od.get_source(attacked_by[0])
whom_name = od.get_name(animal_of(od, whom_state))
being_attacked = f" being attacked by {whom_name}"
else:
being_attacked = ""
txt += f" 👨 {od.get_name(man)} ({render_dead(man_state)}) {render_attacking(man_state)}{being_attacked}\n"
return txt
# When should simulation stop?
def termination_condition(od):
_, time = get_time(od)
if time >= 10:
return "Took too long"
# End simulation when 2 animals are dead
who_is_dead = []
for _, animal_state in od.get_all_instances("AnimalState"):
if od.get_slot_value(animal_state, "dead"):
animal_name = od.get_name(animal_of(od, animal_state))
who_is_dead.append(animal_name)
if len(who_is_dead) >= 2:
return f"{' and '.join(who_is_dead)} are dead"
sim = Simulator(
action_generator=get_valid_actions,
# action_generator=get_all_actions,
decision_maker=RandomDecisionMaker(seed=0),
# decision_maker=InteractiveDecisionMaker(),
termination_condition=termination_condition,
check_conformance=False,
verbose=True,
renderer=render_woods,
)
od = ODAPI(state, woods_rt_m, woods_rt_mm)
sim.run(od)

58
examples/woods/common.py Normal file
View file

@ -0,0 +1,58 @@
# Helpers
def state_of(od, animal):
return od.get_source(od.get_incoming(animal, "of")[0])
def animal_of(od, state):
return od.get_target(od.get_outgoing(state, "of")[0])
def get_time(od):
_, clock = od.get_all_instances("Clock")[0]
return clock, od.get_slot_value(clock, "time")
# Render our run-time state to a string
def render_woods(od):
txt = ""
_, time = get_time(od)
txt += f"T = {time}.\n"
txt += "Bears:\n"
def render_attacking(animal_state):
attacking = od.get_outgoing(animal_state, "attacking")
if len(attacking) == 1:
whom_state = od.get_target(attacking[0])
whom_name = od.get_name(animal_of(od, whom_state))
return f" attacking {whom_name}"
else:
return ""
def render_dead(animal_state):
return 'dead' if od.get_slot_value(animal_state, 'dead') else 'alive'
for _, bear_state in od.get_all_instances("BearState"):
bear = animal_of(od, bear_state)
hunger = od.get_slot_value(bear_state, "hunger")
txt += f" 🐻 {od.get_name(bear)} (hunger: {hunger}, {render_dead(bear_state)}) {render_attacking(bear_state)}\n"
txt += "Men:\n"
for _, man_state in od.get_all_instances("ManState"):
man = animal_of(od, man_state)
attacked_by = od.get_incoming(man_state, "attacking")
if len(attacked_by) == 1:
whom_state = od.get_source(attacked_by[0])
whom_name = od.get_name(animal_of(od, whom_state))
being_attacked = f" being attacked by {whom_name}"
else:
being_attacked = ""
txt += f" 👨 {od.get_name(man)} ({render_dead(man_state)}) {render_attacking(man_state)}{being_attacked}\n"
return txt
# When should simulation stop?
def termination_condition(od):
_, time = get_time(od)
if time >= 10:
return "Took too long"
# End simulation when 2 animals are dead
who_is_dead = []
for _, animal_state in od.get_all_instances("AnimalState"):
if od.get_slot_value(animal_state, "dead"):
animal_name = od.get_name(animal_of(od, animal_state))
who_is_dead.append(animal_name)
if len(who_is_dead) >= 2:
return f"{' and '.join(who_is_dead)} are dead"

158
examples/woods/models.py Normal file
View file

@ -0,0 +1,158 @@
# Design meta-model
woods_mm_cs = """
Animal:Class {
abstract = True;
}
Bear:Class
:Inheritance (Bear -> Animal)
Man:Class {
lower_cardinality = 1;
upper_cardinality = 2;
constraint = `get_value(get_slot(this, "weight")) > 20`;
}
:Inheritance (Man -> Animal)
Man_weight:AttributeLink (Man -> Integer) {
name = "weight";
optional = False;
}
afraidOf:Association (Man -> Animal) {
source_upper_cardinality = 6;
target_lower_cardinality = 1;
}
"""
# Runtime meta-model
woods_rt_mm_cs = woods_mm_cs + """
AnimalState:Class {
abstract = True;
}
AnimalState_dead:AttributeLink (AnimalState -> Boolean) {
name = "dead";
optional = False;
}
of:Association (AnimalState -> Animal) {
source_lower_cardinality = 1;
source_upper_cardinality = 1;
target_lower_cardinality = 1;
target_upper_cardinality = 1;
}
BearState:Class {
constraint = `get_type_name(get_target(get_outgoing(this, "of")[0])) == "Bear"`;
}
:Inheritance (BearState -> AnimalState)
BearState_hunger:AttributeLink (BearState -> Integer) {
name = "hunger";
optional = False;
constraint = ```
val = get_value(get_target(this))
val >= 0 and val <= 100
```;
}
ManState:Class {
constraint = `get_type_name(get_target(get_outgoing(this, "of")[0])) == "Man"`;
}
:Inheritance (ManState -> AnimalState)
attacking:Association (AnimalState -> ManState) {
# Animal can only attack one Man at a time
target_upper_cardinality = 1;
# Man can only be attacked by one Animal at a time
source_upper_cardinality = 1;
constraint = ```
attacker = get_source(this)
if get_type_name(attacker) == "BearState":
# only BearState has 'hunger' attribute
hunger = get_value(get_slot(attacker, "hunger"))
else:
hunger = 100 # Man can always attack
attacker_dead = get_value(get_slot(attacker, "dead"))
attacked_state = get_target(this)
attacked_dead = get_value(get_slot(attacked_state, "dead"))
(
hunger >= 50
and not attacker_dead # cannot attack while dead
and not attacked_dead # cannot attack whoever is dead
)
```;
}
attacking_starttime:AttributeLink (attacking -> Integer) {
name = "starttime";
optional = False;
constraint = ```
val = get_value(get_target(this))
_, clock = get_all_instances("Clock")[0]
current_time = get_slot_value(clock, "time")
val >= 0 and val <= current_time
```;
}
# Just a clock singleton for keeping the time
Clock:Class {
lower_cardinality = 1;
upper_cardinality = 1;
}
Clock_time:AttributeLink (Clock -> Integer) {
name = "time";
optional = False;
constraint = `get_value(get_target(this)) >= 0`;
}
"""
# Our design model - the part that doesn't change
woods_m_cs = """
george:Man {
weight = 80;
}
bill:Man {
weight = 70;
}
teddy:Bear
mrBrown:Bear
# george is afraid of both bears
:afraidOf (george -> teddy)
:afraidOf (george -> mrBrown)
# the men are afraid of each other
:afraidOf (bill -> george)
:afraidOf (george -> bill)
"""
# Our runtime model - the part that changes with every execution step
woods_rt_initial_m_cs = woods_m_cs + """
georgeState:ManState {
dead = False;
}
:of (georgeState -> george)
billState:ManState {
dead = False;
}
:of (billState -> bill)
teddyState:BearState {
dead = False;
hunger = 40;
}
:of (teddyState -> teddy)
mrBrownState:BearState {
dead = False;
hunger = 80;
}
:of (mrBrownState -> mrBrown)
clock:Clock {
time = 0;
}
"""

View file

@ -0,0 +1,75 @@
### Operational Semantics - coded in Python ###
import functools
from examples.semantics.operational.simulator import make_actions_pure, filter_valid_actions
from examples.woods.common import *
# Action: Time advances, whoever is being attacked dies, bears become hungrier
def action_advance_time(od):
msgs = []
clock, old_time = get_time(od)
new_time = old_time + 1
od.set_slot_value(clock, "time", new_time)
for _, attacking_link in od.get_all_instances("attacking"):
man_state = od.get_target(attacking_link)
animal_state = od.get_source(attacking_link)
if od.get_type_name(animal_state) == "BearState":
od.set_slot_value(animal_state, "hunger", max(od.get_slot_value(animal_state, "hunger") - 50, 0))
od.set_slot_value(man_state, "dead", True)
od.delete(attacking_link)
msgs.append(f"{od.get_name(animal_of(od, animal_state))} kills {od.get_name(animal_of(od, man_state))}.")
for _, bear_state in od.get_all_instances("BearState"):
if od.get_slot_value(bear_state, "dead"):
continue # bear already dead
old_hunger = od.get_slot_value(bear_state, "hunger")
new_hunger = min(old_hunger + 10, 100)
od.set_slot_value(bear_state, "hunger", new_hunger)
bear = od.get_target(od.get_outgoing(bear_state, "of")[0])
bear_name = od.get_name(bear)
if new_hunger == 100:
od.set_slot_value(bear_state, "dead", True)
msgs.append(f"Bear {bear_name} dies of hunger.")
else:
msgs.append(f"Bear {bear_name}'s hunger level is now {new_hunger}.")
return msgs
# Action: Animal attacks Man
# Note: We must use the names of the objects as parameters, because when cloning, the IDs of objects change!
def action_attack(od, animal_name: str, man_name: str):
msgs = []
animal = od.get(animal_name)
man = od.get(man_name)
animal_state = state_of(od, animal)
man_state = state_of(od, man)
attack_link = od.create_link(None, # auto-generate link name
"attacking", animal_state, man_state)
_, clock = od.get_all_instances("Clock")[0]
current_time = od.get_slot_value(clock, "time")
od.set_slot_value(attack_link, "starttime", current_time)
msgs.append(f"{animal_name} is now attacking {man_name}")
return msgs
# Get all actions that can be performed (including those that bring us to a non-conforming state)
def get_all_actions(od):
def _generate_actions(od):
# can always advance time:
yield ("advance time", action_advance_time)
# if A is afraid of B, then B can attack A:
for _, afraid_link in od.get_all_instances("afraidOf"):
man = od.get_source(afraid_link)
animal = od.get_target(afraid_link)
animal_name = od.get_name(animal)
man_name = od.get_name(man)
man_state = state_of(od, man)
animal_state = state_of(od, animal)
descr = f"{animal_name} ({od.get_type_name(animal)}) attacks {man_name} ({od.get_type_name(man)})"
yield (descr, functools.partial(action_attack, animal_name=animal_name, man_name=man_name))
return make_actions_pure(_generate_actions(od), od)
# Only get those actions that bring us to a conforming state
def get_valid_actions(od):
return filter_valid_actions(get_all_actions(od))

View file

@ -0,0 +1,25 @@
### Operational Semantics - defined by rule-based model transformation ###
from transformation.rule import Rule, RuleMatcherRewriter, PriorityActionGenerator
from transformation.ramify import ramify
from util import loader
import os
THIS_DIR = os.path.dirname(__file__)
get_filename = lambda rule_name, kind: f"{THIS_DIR}/rules/r_{rule_name}_{kind}.od"
def get_action_generator(state, rt_mm):
rt_mm_ramified = ramify(state, rt_mm)
matcher_rewriter = RuleMatcherRewriter(state, rt_mm, rt_mm_ramified)
rules0_dict = loader.load_rules(state, get_filename, rt_mm_ramified, ["hungry_bear_dies"])
rules1_dict = loader.load_rules(state, get_filename, rt_mm_ramified, ["advance_time", "attack"])
generator = PriorityActionGenerator(matcher_rewriter, [
rules0_dict, # highest priority
rules1_dict, # lowest priority
])
return generator

View file

@ -0,0 +1,4 @@
clock:RAM_Clock {
RAM_time = `True`;
}

View file

@ -0,0 +1,27 @@
clock:RAM_Clock {
RAM_time = `get_value(this) + 1`;
}
# Advance time has a bunch of side-effects that we cannot easily model using NAC/LHS/RHS-kind of rules,
# so we just do it in code:
:GlobalCondition {
condition = ```
for _, attacking_link in get_all_instances("attacking"):
man_state = get_target(attacking_link)
animal_state = get_source(attacking_link)
if get_type_name(animal_state) == "BearState":
# Bear hunger decreases
set_slot_value(animal_state, "hunger", max(get_slot_value(animal_state, "hunger") - 50, 0))
set_slot_value(man_state, "dead", True)
delete(attacking_link)
# Bear hunger increases
for _, bear_state in get_all_instances("BearState"):
if get_slot_value(bear_state, "dead"):
continue # bear already dead
old_hunger = get_slot_value(bear_state, "hunger")
new_hunger = min(old_hunger + 10, 100)
set_slot_value(bear_state, "hunger", new_hunger)
```;
}

View file

@ -0,0 +1,18 @@
# Some man is afraid of some animal:
man:RAM_Man
animal:RAM_Animal
manAfraidOfAnimal:RAM_afraidOf (man -> animal)
# Both man and animal have an associated state:
manState:RAM_ManState
man2State:RAM_of (manState -> man)
animalState:RAM_AnimalState
animal2State:RAM_of (animalState -> animal)

View file

@ -0,0 +1,7 @@
# Cannot attack if already attacking
manState:RAM_ManState
animalState:RAM_AnimalState
:RAM_attacking(animalState -> manState)

View file

@ -0,0 +1,7 @@
# Bear won't attack unless hungry
animalState:RAM_AnimalState {
condition = ```
get_type_name(this) == "BearState" and get_slot_value(this, "hunger") < 50
```;
}

View file

@ -0,0 +1,5 @@
# If dead, cannot be attacked
manState:RAM_ManState {
RAM_dead = `get_value(this)`;
}

View file

@ -0,0 +1,5 @@
# If dead, cannot attack
animalState:RAM_AnimalState {
RAM_dead = `get_value(this)`;
}

View file

@ -0,0 +1,7 @@
# Not already attacking someone else:
animalState:RAM_AnimalState
other:RAM_ManState
:RAM_attacking(animalState -> other)

View file

@ -0,0 +1,7 @@
# Not already being attacked by someone else:
manState:RAM_ManState
other:RAM_AnimalState
:RAM_attacking(other -> manState)

View file

@ -0,0 +1,28 @@
# Our entire LHS (don't delete anything)
# Some man is afraid of some animal:
man:RAM_Man
animal:RAM_Animal
manAfraidOfAnimal:RAM_afraidOf (man -> animal)
# Both man and animal have an associated state:
manState:RAM_ManState
man2State:RAM_of (manState -> man)
animalState:RAM_AnimalState
animal2State:RAM_of (animalState -> animal)
# Animal attacks man:
:RAM_attacking(animalState -> manState) {
RAM_starttime = `get_slot_value(get_all_instances("Clock")[0][1], "time")`;
}

View file

@ -0,0 +1,8 @@
bearState:RAM_BearState {
RAM_hunger = ```
get_value(this) == 100
```;
RAM_dead = ```
not get_value(this)
```;
}

View file

@ -0,0 +1,4 @@
bearState:RAM_BearState {
RAM_hunger = `get_value(this)`; # unchanged
RAM_dead = `True`;
}

View file

@ -0,0 +1,42 @@
from state.devstate import DevState
from bootstrap.scd import bootstrap_scd
from framework.conformance import Conformance, render_conformance_check_result
from concrete_syntax.textual_od import parser, renderer
from concrete_syntax.plantuml import renderer as plantuml
from api.od import ODAPI
from examples.semantics.operational.simulator import Simulator, RandomDecisionMaker, InteractiveDecisionMaker
from examples.woods import models, opsem_python, opsem_rulebased
from examples.woods.common import termination_condition, render_woods
from util import loader
state = DevState()
scd_mmm = bootstrap_scd(state) # Load meta-meta-model
### Load (meta-)models ###
woods_mm = loader.parse_and_check(state, models.woods_mm_cs, scd_mmm, "MM")
woods_rt_mm = loader.parse_and_check(state, models.woods_rt_mm_cs, scd_mmm, "RT-MM")
woods_m = loader.parse_and_check(state, models.woods_m_cs, woods_mm, "M")
woods_rt_m = loader.parse_and_check(state, models.woods_rt_initial_m_cs, woods_rt_mm, "RT-M")
print()
rulebased_action_generator = opsem_rulebased.get_action_generator(state, woods_rt_mm)
sim = Simulator(
# action_generator=opsem_python.get_valid_actions,
# action_generator=opsem_python.get_all_actions,
action_generator=rulebased_action_generator,
# decision_maker=RandomDecisionMaker(seed=3),
decision_maker=InteractiveDecisionMaker(),
termination_condition=termination_condition,
check_conformance=True,
verbose=True,
renderer=render_woods,
)
od = ODAPI(state, woods_rt_m, woods_rt_mm)
sim.run(od)

View file

@ -62,8 +62,8 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
to_delete = lhs_keys - common
to_create = rhs_keys - common
# print("to_delete:", to_delete)
# print("to_create:", to_create)
# print("to delete:", to_delete)
# print("to create:", to_create)
# to be grown
rhs_match = { name : lhs_match[name] for name in common }

147
transformation/rule.py Normal file
View file

@ -0,0 +1,147 @@
import pprint
from typing import Generator, Callable
from uuid import UUID
import functools
from api.od import ODAPI
from concrete_syntax.common import indent
from transformation.matcher.mvs_adapter import match_od
from transformation.rewriter import rewrite
from transformation.cloner import clone_od
class Rule:
def __init__(self, nacs: list[UUID], lhs: UUID, rhs: UUID):
self.nacs = nacs
self.lhs = lhs
self.rhs = rhs
PP = pprint.PrettyPrinter(depth=4)
# Helper for executing NAC/LHS/RHS-type rules
class RuleMatcherRewriter:
def __init__(self, state, mm: UUID, mm_ramified: UUID):
self.state = state
self.mm = mm
self.mm_ramified = mm_ramified
# Generates matches.
# Every match is a dictionary with entries LHS_element_name -> model_element_name
def match_rule(self, m: UUID, lhs: UUID, nacs: list[UUID], rule_name: str) -> Generator[dict, None, None]:
lhs_matcher = match_od(self.state,
host_m=m,
host_mm=self.mm,
pattern_m=lhs,
pattern_mm=self.mm_ramified)
try:
# First we iterate over LHS-matches:
for i, lhs_match in enumerate(lhs_matcher):
nac_matched = False
for nac in nacs:
# For every LHS-match, we see if there is a NAC-match:
nac_matcher = match_od(self.state,
host_m=m,
host_mm=self.mm,
pattern_m=nac,
pattern_mm=self.mm_ramified,
pivot=lhs_match) # try to "grow" LHS-match with NAC-match
try:
for j, nac_match in enumerate(nac_matcher):
# The NAC has at least one match
# (there could be more, but we know enough, so let's not waste CPU/MEM resources and proceed to next LHS match)
nac_matched = True
break
except Exception as e:
# The exception may originate from eval'ed condition-code in LHS or NAC
# Decorate exception with some context, to help with debugging
e.add_note(f"while matching NAC of '{rule_name}'")
raise
if nac_matched:
break
if not nac_matched:
# There were no NAC matches -> yield LHS-match!
yield lhs_match
except Exception as e:
# The exception may originate from eval'ed condition-code in LHS or NAC
# Decorate exception with some context, to help with debugging
e.add_note(f"while matching LHS of '{rule_name}'")
raise
def exec_rule(self, m: UUID, lhs: UUID, rhs: UUID, lhs_match: dict, rule_name: str):
cloned_m = clone_od(self.state, m, self.mm)
try:
rhs_match = rewrite(self.state,
lhs_m=lhs,
rhs_m=rhs,
pattern_mm=self.mm_ramified,
lhs_match=lhs_match,
host_m=cloned_m,
host_mm=self.mm)
except Exception as e:
# Make exceptions raised in eval'ed code easier to trace:
e.add_note(f"while executing RHS of '{rule_name}'")
raise
return (cloned_m, rhs_match)
# Generator that yields actions in the format expected by 'Simulator' class
class ActionGenerator:
def __init__(self, matcher_rewriter: RuleMatcherRewriter, rule_dict: dict[str, Rule]):
self.matcher_rewriter = matcher_rewriter
self.rule_dict = rule_dict
def __call__(self, od: ODAPI):
at_least_one_match = False
for rule_name, rule in self.rule_dict.items():
for lhs_match in self.matcher_rewriter.match_rule(od.m, rule.lhs, rule.nacs, rule_name):
# We got a match!
def do_action(od, rule, lhs_match, rule_name):
new_m, rhs_match = self.matcher_rewriter.exec_rule(od.m, rule.lhs, rule.rhs, lhs_match, rule_name)
msgs = [f"executed rule '{rule_name}'\n" + indent(PP.pformat(rhs_match), 6)]
return (ODAPI(od.state, new_m, od.mm), msgs)
yield (
rule_name + '\n' + indent(PP.pformat(lhs_match), 6), # description of action
functools.partial(do_action, od, rule, lhs_match, rule_name) # the action itself (as a callback)
)
at_least_one_match = True
return at_least_one_match
# Given a list of actions (in high -> low priority), will always yield the highest priority enabled actions.
class PriorityActionGenerator:
def __init__(self, matcher_rewriter: RuleMatcherRewriter, rule_dicts: list[dict[str, Rule]]):
self.generators = [ActionGenerator(matcher_rewriter, rule_dict) for rule_dict in rule_dicts]
def __call__(self, od: ODAPI):
for generator in self.generators:
at_least_one_match = yield from generator(od)
if at_least_one_match:
return True
return False
# class ForAllGenerator:
# def __init__(self, matcher_rewriter: RuleMatcherRewriter, rule_dict: dict[str, Rule]):
# self.matcher_rewriter = matcher_rewriter
# self.rule_dict = rule_dict
# def __call__(self, od: ODAPI):
# matches = []
# for rule_name, rule in self.rule_dict.items():
# for lhs_match in self.matcher_rewriter.match_rule(od.m, rule.lhs, rule.nacs, rule_name):
# matches.append((rule_name, rule, lhs_match))
# def do_action(matches):
# pass
# if len(matches) > 0:
# yield (
# [rule_name for rule_name, _, _ in matches]
# )
# return True
# return False

79
util/loader.py Normal file
View file

@ -0,0 +1,79 @@
import os.path
from framework.conformance import Conformance, render_conformance_check_result
from concrete_syntax.textual_od import parser
from transformation.rule import Rule
# parse model and check conformance
def parse_and_check(state, m_cs, mm, descr: str):
try:
m = parser.parse_od(
state,
m_text=m_cs,
mm=mm,
)
except Exception as e:
e.add_note("While parsing model " + descr)
raise
try:
conf = Conformance(state, m, mm)
errors = conf.check_nominal()
if len(errors) > 0:
print(render_conformance_check_result(errors))
except Exception as e:
e.add_note("In model " + descr)
raise
return m
# get file contents as string
def read_file(filename):
with open(filename) as file:
return file.read()
KINDS = ["nac", "lhs", "rhs"]
# load model transformation rules
def load_rules(state, get_filename, rt_mm_ramified, rule_names):
rules = {}
files_read = []
for rule_name in rule_names:
rule = {}
def parse(kind):
filename = get_filename(rule_name, kind)
descr = "'"+filename+"'"
if kind == "nac":
suffix = ""
nacs = []
try:
while True:
base, ext = os.path.splitext(filename)
processed_filename = base+suffix+ext
nac = parse_and_check(state, read_file(processed_filename), rt_mm_ramified, descr)
nacs.append(nac)
suffix = "2" if suffix == "" else str(int(suffix)+1)
files_read.append(processed_filename)
except FileNotFoundError:
if suffix == "":
print(f"Warning: rule {rule_name} has no NAC ({filename} not found)")
return nacs
elif kind == "lhs" or kind == "rhs":
try:
m = parse_and_check(state, read_file(filename), rt_mm_ramified, descr)
files_read.append(filename)
return m
except FileNotFoundError as e:
print(f"Warning: using empty {kind} ({filename} not found)")
# Use empty model as fill-in:
return parse_and_check(
state,
"",
rt_mm_ramified,
descr="'"+filename+"'")
rules[rule_name] = Rule(*(parse(kind) for kind in KINDS))
print("Rules loaded:\n" + '\n'.join(files_read))
return rules

112
util/simulator.py Normal file
View file

@ -0,0 +1,112 @@
import abc
import random
import math
import functools
import sys
from typing import Callable, Generator
from framework.conformance import Conformance, render_conformance_check_result
from concrete_syntax.common import indent
from concrete_syntax.textual_od.renderer import render_od
from transformation.cloner import clone_od
from api.od import ODAPI
class DecisionMaker:
@abc.abstractmethod
def __call__(self, actions):
pass
class RandomDecisionMaker(DecisionMaker):
def __init__(self, seed=0, verbose=True):
self.seed = seed
self.r = random.Random(seed)
def __str__(self):
return f"RandomDecisionMaker(seed={self.seed})"
def __call__(self, actions):
arr = [action for descr, action in actions]
i = math.floor(self.r.random()*len(arr))
return arr[i]
class InteractiveDecisionMaker(DecisionMaker):
# auto_proceed: whether to prompt if there is only one enabled action
def __init__(self, msg="Select action:", auto_proceed=False):
self.msg = msg
self.auto_proceed = auto_proceed
def __str__(self):
return f"InteractiveDecisionMaker()"
def __call__(self, actions):
arr = []
for i, (key, result) in enumerate(actions):
print(f" {i}. {key}")
arr.append(result)
if len(arr) == 0:
return
if len(arr) == 1 and self.auto_proceed:
return arr[0]
def __choose():
sys.stdout.write(f"{self.msg} ")
try:
raw = input()
choice = int(raw) # may raise ValueError
if choice >= 0 and choice < len(arr):
return arr[choice]
except ValueError:
pass
print("Invalid option")
return __choose()
return __choose()
class MinimalSimulator:
def __init__(self,
action_generator: Callable[[any], Generator[any, None, None]],
decision_maker: DecisionMaker = RandomDecisionMaker(seed=0),
# Returns 'None' to keep running, or a string to end simulation
# Can also have side effects, such as rendering the model, and performing a conformance check.
# BTW, Simulation will always end when there are no more enabled actions.
termination_condition=lambda model: None,
verbose=True,
):
self.action_generator = action_generator
self.decision_maker = decision_maker
self.termination_condition = termination_condition
self.verbose = verbose
def _print(self, *args):
if self.verbose:
print(*args)
# Run simulation until termination condition satisfied
def run(self, model):
self._print("Start simulation")
self._print(f"Decision maker: {self.decision_maker}")
step_counter = 0
while True:
termination_reason = self.termination_condition(model)
if termination_reason != None:
self._print(f"Termination condition satisfied.\nReason: {termination_reason}.")
break
chosen_action = self.decision_maker(self.action_generator(model))
if chosen_action == None:
self._print(f"No enabled actions.")
break
(model, msgs) = chosen_action()
self._print(indent('\n'.join(f"{msg}" for msg in msgs), 4))
step_counter += 1
self._print(f"Executed {step_counter} steps.")
return model