Compare commits

..

10 commits

21 changed files with 611 additions and 134 deletions

View file

@ -104,6 +104,7 @@ class ODAPI:
result.append(i) result.append(i)
return result return result
# Returns list of tuples (name, obj)
def get_all_instances(self, type_name: str, include_subtypes=True): def get_all_instances(self, type_name: str, include_subtypes=True):
if include_subtypes: if include_subtypes:
all_types = self.cdapi.transitive_sub_types[type_name] all_types = self.cdapi.transitive_sub_types[type_name]

View file

@ -1,26 +1,55 @@
import functools
from uuid import UUID from uuid import UUID
from api.od import ODAPI
from services import scd, od from services import scd, od
from services.bottom.V0 import Bottom from services.bottom.V0 import Bottom
from concrete_syntax.common import display_value, display_name, indent from concrete_syntax.common import display_value, display_name, indent
# Turn ModelVerse/muMLE ID into GraphViz ID
def make_graphviz_id(uuid, prefix="") -> str:
result = 'n'+(prefix+str(uuid).replace('-',''))[24:] # we assume that the first 24 characters are always zero...
return result
def render_object_diagram(state, m, mm, render_attributes=True, prefix_ids=""): def render_object_diagram(state, m, mm,
render_attributes=True, # doesn't do anything (yet)
prefix_ids="",
reify=False, # If true, will create a node in the middle of every link. This allows links to be the src/tgt of other links (which muMLE supports), but will result in a larger diagram.
only_render=None, # List of type names or None. If specified, only render instances of these types. E.g., ["Place", "connection"]
type_to_style={}, # Dictionary. Mapping from type-name to graphviz style. E.g., { "generic_link": ",color=purple" }
type_to_label={}, # Dictionary. Mapping from type-name to callback for custom label creation.
):
bottom = Bottom(state) bottom = Bottom(state)
mm_scd = scd.SCD(mm, state) mm_scd = scd.SCD(mm, state)
m_od = od.OD(mm, m, state) m_od = od.OD(mm, m, state)
odapi = ODAPI(state, m, mm)
def make_id(uuid) -> str: make_id = functools.partial(make_graphviz_id, prefix=prefix_ids)
return 'n'+(prefix_ids+str(uuid).replace('-',''))[24:]
output = "" output = ""
# Render objects # Render objects
for class_name, class_node in mm_scd.get_classes().items(): for class_name, class_node in mm_scd.get_classes().items():
if only_render != None and class_name not in only_render:
continue
make_label = type_to_label.get(class_name,
# default, if not found:
lambda obj_name, obj, odapi: f"{display_name(obj_name)} : {class_name}")
output += f"\nsubgraph {class_name} {{"
if render_attributes: if render_attributes:
attributes = od.get_attributes(bottom, class_node) attributes = od.get_attributes(bottom, class_node)
custom_style = type_to_style.get(class_name, "")
if custom_style == "":
output += f"\nnode [shape=rect]"
else:
output += f"\nnode [shape=rect,{custom_style}]"
for obj_name, obj_node in m_od.get_objects(class_node).items(): for obj_name, obj_node in m_od.get_objects(class_node).items():
output += f"\n{make_id(obj_node)} [label=\"{display_name(obj_name)} : {class_name}\", shape=rect] ;" output += f"\n{make_id(obj_node)} [label=\"{make_label(obj_name, obj_node, odapi)}\"] ;"
#" {{" #" {{"
# if render_attributes: # if render_attributes:
@ -31,17 +60,46 @@ def render_object_diagram(state, m, mm, render_attributes=True, prefix_ids=""):
# output += f"\n{attr_name} => {display_value(val, type_name)}" # output += f"\n{attr_name} => {display_value(val, type_name)}"
# output += '\n}' # output += '\n}'
output += '\n}'
output += '\n' output += '\n'
# Render links # Render links
for assoc_name, assoc_edge in mm_scd.get_associations().items(): for assoc_name, assoc_edge in mm_scd.get_associations().items():
if only_render != None and assoc_name not in only_render:
continue
make_label = type_to_label.get(assoc_name,
# default, if not found:
lambda lnk_name, lnk, odapi: f"{display_name(lnk_name)} : {assoc_name}")
output += f"\nsubgraph {assoc_name} {{"
custom_style = type_to_style.get(assoc_name, "")
if custom_style != "":
output += f"\nedge [{custom_style}]"
if reify:
if custom_style != "":
# created nodes will be points of matching style:
output += f"\nnode [{custom_style},shape=point]"
else:
output += "\nnode [shape=point]"
for link_name, link_edge in m_od.get_objects(assoc_edge).items(): for link_name, link_edge in m_od.get_objects(assoc_edge).items():
src_obj = bottom.read_edge_source(link_edge) src_obj = bottom.read_edge_source(link_edge)
tgt_obj = bottom.read_edge_target(link_edge) tgt_obj = bottom.read_edge_target(link_edge)
src_name = m_od.get_object_name(src_obj) src_name = m_od.get_object_name(src_obj)
tgt_name = m_od.get_object_name(tgt_obj) tgt_name = m_od.get_object_name(tgt_obj)
output += f"\n{make_id(src_obj)} -> {make_id(tgt_obj)} [label=\"{display_name(link_name)}:{assoc_name}\"] ;" if reify:
# intermediary node:
output += f"\n{make_id(src_obj)} -> {make_id(link_edge)} [arrowhead=none]"
output += f"\n{make_id(link_edge)} -> {make_id(tgt_obj)}"
output += f"\n{make_id(link_edge)} [xlabel=\"{make_label(link_name, link_edge, odapi)}\"]"
else:
output += f"\n{make_id(src_obj)} -> {make_id(tgt_obj)} [label=\"{make_label(link_name, link_edge, odapi)}\", {custom_style}] ;"
output += '\n}'
return output return output
@ -50,10 +108,8 @@ def render_trace_match(state, name_mapping: dict, pattern_m: UUID, host_m: UUID,
class_type = od.get_scd_mm_class_node(bottom) class_type = od.get_scd_mm_class_node(bottom)
attr_link_type = od.get_scd_mm_attributelink_node(bottom) attr_link_type = od.get_scd_mm_attributelink_node(bottom)
def make_pattern_id(uuid) -> str: make_pattern_id = functools.partial(make_graphviz_id, prefix=prefix_pattern_ids)
return 'n'+(prefix_pattern_ids+str(uuid).replace('-',''))[24:] make_host_id = functools.partial(make_graphviz_id, prefix=prefix_host_ids)
def make_host_id(uuid) -> str:
return 'n'+(prefix_host_ids+str(uuid).replace('-',''))[24:]
output = "" output = ""

View file

@ -0,0 +1,18 @@
{% macro render_name(name) %}{{ name if not hide_names or name.startswith("__") else "" }}{% endmacro %}
{% macro render_attributes(obj) %} {
{% for attr_name in odapi.get_slots(obj) %}
{{ attr_name}} = {{ display_value(
val=odapi.get_slot_value(obj, attr_name),
type_name=odapi.get_type_name(odapi.get_slot(obj, attr_name)),
indentation=4) }};
{% endfor %}
}{% endmacro %}
{% for obj_name, obj in objects %}
{{ render_name(obj_name) }}:{{ odapi.get_type_name(obj) }}{{ render_attributes(obj) }}
{% endfor %}
{% for lnk_name, lnk in links %}
{{ render_name(obj_name) }}:{{ odapi.get_type_name(lnk) }} ({{odapi.get_name(odapi.get_source(lnk))}} -> {{odapi.get_name(odapi.get_target(lnk))}}){{ render_attributes(lnk) }}
{% endfor %}

View file

@ -1,10 +1,10 @@
# Parser for Object Diagrams textual concrete syntax # Parser for Object Diagrams textual concrete syntax
from lark import Lark, logger from lark import Lark, logger, Transformer
from lark.indenter import Indenter from lark.indenter import Indenter
from api.od import ODAPI from api.od import ODAPI
from services.scd import SCD from services.scd import SCD
from concrete_syntax.common import _Code, TBase from concrete_syntax.common import _Code
from uuid import UUID from uuid import UUID
grammar = r""" grammar = r"""
@ -41,11 +41,25 @@ rev_link_spec: "(" IDENTIFIER "<-" IDENTIFIER ")"
slot: IDENTIFIER "=" literal ";" slot: IDENTIFIER "=" literal ";"
""" """
parser = Lark(grammar, parser='lalr') parser = Lark(grammar, parser='lalr', propagate_positions=True)
class DefaultNameGenerator:
def __init__(self):
self.counter = 0
def __call__(self, type_name):
name = f"__{type_name}_{self.counter}"
self.counter += 1
return name
# given a concrete syntax text string, and a meta-model, parses the CS # given a concrete syntax text string, and a meta-model, parses the CS
# Parameter 'type_transform' is useful for adding prefixes to the type names, when parsing a model and pretending it is an instance of a prefixed meta-model. # Parameter 'type_transform' is useful for adding prefixes to the type names, when parsing a model and pretending it is an instance of a prefixed meta-model.
def parse_od(state, m_text, mm, type_transform=lambda type_name: type_name): def parse_od(state,
m_text, # text to parse
mm, # meta-model of model that will be parsed. The meta-model must already have been parsed.
type_transform=lambda type_name: type_name,
name_generator=DefaultNameGenerator(), # exception to raise if anonymous (nameless) object/link occurs in the model. Main reason for this is to forbid them in LHS of transformation rules.
):
tree = parser.parse(m_text) tree = parser.parse(m_text)
m = state.create_node() m = state.create_node()
@ -56,62 +70,95 @@ def parse_od(state, m_text, mm, type_transform=lambda type_name: type_name):
for type_name in ["Integer", "String", "Boolean", "ActionCode"] for type_name in ["Integer", "String", "Boolean", "ActionCode"]
} }
class T(TBase): class T(Transformer):
def __init__(self, visit_tokens): def __init__(self, visit_tokens):
super().__init__(visit_tokens) super().__init__(visit_tokens)
self.obj_counter = 0 # used for generating unique names for anonymous objects
def IDENTIFIER(self, token):
return (str(token), token.line)
def INT(self, token):
return (int(token), token.line)
def BOOL(self, token):
return (token == "True", token.line)
def STR(self, token):
return (str(token[1:-1]), token.line) # strip the "" or ''
def CODE(self, token):
return (_Code(str(token[1:-1])), token.line) # strip the ``
def INDENTED_CODE(self, token):
skip = 4 # strip the ``` and the following newline character
space_count = 0
while token[skip+space_count] == " ":
space_count += 1
lines = token.split('\n')[1:-1]
for line in lines:
if len(line) >= space_count and line[0:space_count] != ' '*space_count:
raise Exception("wrong indentation of INDENTED_CODE")
unindented_lines = [l[space_count:] for l in lines]
return (_Code('\n'.join(unindented_lines)), token.line)
def literal(self, el):
return el[0]
def link_spec(self, el): def link_spec(self, el):
[src, tgt] = el [(src, src_line), (tgt, _)] = el
return (src, tgt) return (src, tgt, src_line)
def rev_link_spec(self, el): def rev_link_spec(self, el):
[tgt, src] = el # <-- reversed :) [(tgt, tgt_line), (src, _)] = el # <-- reversed :)
return (src, tgt) return (src, tgt, tgt_line)
def type_name(self, el): def type_name(self, el):
type_name = el[0] type_name, line = el[0]
if type_name in primitive_types: if type_name in primitive_types:
return type_name return (type_name, line)
else: else:
return type_transform(el[0]) return (type_transform(type_name), line)
def slot(self, el): def slot(self, el):
[attr_name, value] = el [(attr_name, line), (value, _)] = el
return (attr_name, value) return (attr_name, value, line)
def object(self, el): def object(self, el):
[obj_name, type_name, link] = el[0:3] [obj, (type_name, line), link] = el[0:3]
slots = el[3:] slots = el[3:]
if state.read_dict(m, obj_name) != None: try:
msg = f"Element '{obj_name}:{type_name}': name '{obj_name}' already in use." if obj != None:
# raise Exception(msg + " Names must be unique") (obj_name, _) = obj
print(msg + " Ignoring.")
return
if obj_name == None:
# object/link names are optional
# generate a unique name if no name given
obj_name = f"__{type_name}_{self.obj_counter}"
self.obj_counter += 1
if link == None:
obj_node = od.create_object(obj_name, type_name)
else:
src, tgt = link
if tgt in primitive_types:
if state.read_dict(m, tgt) == None:
scd = SCD(m, state)
scd.create_model_ref(tgt, primitive_types[tgt])
src_obj = od.get(src)
tgt_obj = od.get(tgt)
obj_node = od.create_link(obj_name, type_name, src_obj, tgt_obj)
# Create slots
for attr_name, value in slots:
if isinstance(value, _Code):
od.set_slot_value(obj_node, attr_name, value.code, is_code=True)
else: else:
od.set_slot_value(obj_node, attr_name, value) # anonymous object - auto-generate a name
obj_name = name_generator(type_name)
if state.read_dict(m, obj_name) != None:
msg = f"Element '{obj_name}:{type_name}': name '{obj_name}' already in use."
raise Exception(msg + " Names must be unique")
# print(msg + " Ignoring.")
return
if link == None:
obj_node = od.create_object(obj_name, type_name)
else:
(src, tgt, _) = link
if tgt in primitive_types:
if state.read_dict(m, tgt) == None:
scd = SCD(m, state)
scd.create_model_ref(tgt, primitive_types[tgt])
src_obj = od.get(src)
tgt_obj = od.get(tgt)
obj_node = od.create_link(obj_name, type_name, src_obj, tgt_obj)
# Create slots
for attr_name, value, line in slots:
if isinstance(value, _Code):
od.set_slot_value(obj_node, attr_name, value.code, is_code=True)
else:
od.set_slot_value(obj_node, attr_name, value)
return obj_name return obj_name
except Exception as e:
# raising a *new* exception (instead of adding a note to the existing exception) because Lark will also raise a new exception, and ignore our note:
raise Exception(f"at line {line}:\n " + m_text.split('\n')[line-1] + "\n"+ str(e)) from e
t = T(visit_tokens=True).transform(tree) t = T(visit_tokens=True).transform(tree)

View file

@ -0,0 +1,58 @@
import jinja2
import os
from uuid import UUID
THIS_DIR = os.path.dirname(__file__)
from api.od import ODAPI
from concrete_syntax import common
from services.bottom.V0 import Bottom
from util.module_to_dict import module_to_dict
def render_od_jinja2(state, m, mm):
bottom = Bottom(state)
type_model_id = state.read_dict(state.read_root(), "SCD")
scd_model = UUID(state.read_value(type_model_id))
type_odapi = ODAPI(state, mm, scd_model)
objects = []
links = []
to_add = bottom.read_keys(m)
already_added = set()
while len(to_add) > 0:
next_round = []
for obj_name in to_add:
obj = state.read_dict(m, obj_name)
src, tgt = state.read_edge(obj)
if src == None:
# not a link
objects.append((obj_name, obj))
already_added.add(obj)
else:
# A link can only be written out after its source and target have been written out
if src in already_added and tgt in already_added:
links.append((obj_name, obj))
else:
# try again later
next_round.append(obj_name)
if len(next_round) == len(to_add):
raise Exception("We got stuck!", next_round)
to_add = next_round
loader = jinja2.FileSystemLoader(searchpath=THIS_DIR)
environment = jinja2.Environment(
loader=loader,
# whitespace control:
trim_blocks=True,
lstrip_blocks=True,
)
template = environment.get_template("objectdiagrams.jinja2")
return template.render({
'objects': objects,
'links': links,
'odapi': ODAPI(state, m, mm),
**globals()['__builtins__'],
**module_to_dict(common),
})

View file

@ -0,0 +1,6 @@
from uuid import UUID
def get_num_tokens(odapi, place: UUID):
pn_of = odapi.get_incoming(place, "pn_of")[0]
place_state = odapi.get_source(pn_of)
return odapi.get_slot_value(place_state, "numTokens")

View file

@ -1,5 +1,6 @@
from api.od import ODAPI from api.od import ODAPI
from concrete_syntax.graphviz.make_url import show_graphviz from concrete_syntax.graphviz.make_url import show_graphviz
from concrete_syntax.graphviz.renderer import make_graphviz_id
try: try:
import graphviz import graphviz
@ -14,37 +15,45 @@ def render_tokens(num_tokens: int):
return '●●\\n●●' return '●●\\n●●'
return str(num_tokens) return str(num_tokens)
def render_petri_net(od: ODAPI): def render_petri_net_to_dot(od: ODAPI) -> str:
dot = "" dot = ""
dot += "rankdir=LR;" dot += "rankdir=LR;"
dot += "center=true;" dot += "center=true;"
dot += "margin=1;" dot += "margin=1;"
dot += "nodesep=1;" dot += "nodesep=1;"
dot += "edge [arrowhead=vee];"
dot += "node[fontname=Arial,fontsize=10];\n"
dot += "subgraph places {" dot += "subgraph places {"
dot += " node [shape=circle,fixedsize=true,label=\"\", height=.35,width=.35];" dot += " node [fontname=Arial,fontsize=10,shape=circle,fixedsize=true,label=\"\", height=.35,width=.35];"
for _, place in od.get_all_instances("PNPlace"): for place_name, place in od.get_all_instances("PNPlace"):
place_name = od.get_name(place) # place_name = od.get_name(place)
try: try:
place_state = od.get_source(od.get_incoming(place, "pn_of")[0]) place_state = od.get_source(od.get_incoming(place, "pn_of")[0])
num_tokens = od.get_slot_value(place_state, "numTokens") num_tokens = od.get_slot_value(place_state, "numTokens")
except IndexError: except IndexError:
num_tokens = 0 num_tokens = 0
dot += f" {place_name} [label=\"{place_name}\\n\\n{render_tokens(num_tokens)}\\n\\n­\"];\n" dot += f" {make_graphviz_id(place)} [label=\"{place_name}\\n\\n{render_tokens(num_tokens)}\\n\\n­\"];\n"
dot += "}\n" dot += "}\n"
dot += "subgraph transitions {" dot += "subgraph transitions {\n"
dot += " node [shape=rect,fixedsize=true,height=.3,width=.12,style=filled,fillcolor=black,color=white];\n" dot += " edge [arrowhead=normal];\n"
for transition_name, _ in od.get_all_instances("PNTransition"): dot += " node [fontname=Arial,fontsize=10,shape=rect,fixedsize=true,height=.3,width=.12,style=filled,fillcolor=black,color=white];\n"
dot += f" {transition_name} [label=\"{transition_name}\\n\\n\\n­\"];\n" for transition_name, transition in od.get_all_instances("PNTransition"):
dot += f" {make_graphviz_id(transition)} [label=\"{transition_name}\\n\\n\\n­\"];\n"
dot += "}\n" dot += "}\n"
for _, arc in od.get_all_instances("arc"): for _, arc in od.get_all_instances("arc"):
src_name = od.get_name(od.get_source(arc)) src = od.get_source(arc)
tgt_name = od.get_name(od.get_target(arc)) tgt = od.get_target(arc)
dot += f"{src_name} -> {tgt_name};" # src_name = od.get_name(od.get_source(arc))
# tgt_name = od.get_name(od.get_target(arc))
dot += f"{make_graphviz_id(src)} -> {make_graphviz_id(tgt)};"
for _, inhib_arc in od.get_all_instances("inh_arc"): for _, inhib_arc in od.get_all_instances("inh_arc"):
src_name = od.get_name(od.get_source(inhib_arc)) src = od.get_source(inhib_arc)
tgt_name = od.get_name(od.get_target(inhib_arc)) tgt = od.get_target(inhib_arc)
dot += f"{src_name} -> {tgt_name} [arrowhead=odot];\n" dot += f"{make_graphviz_id(src)} -> {make_graphviz_id(tgt)} [arrowhead=odot];\n"
show_graphviz(dot, engine="neato") return dot
return ""
# deprecated
def render_petri_net(od: ODAPI, engine="neato"):
show_graphviz(render_petri_net_to_dot(od), engine=engine)
# use this instead:
def show_petri_net(od: ODAPI, engine="neato"):
show_graphviz(render_petri_net_to_dot(od), engine=engine)

View file

@ -1,12 +1,13 @@
from state.devstate import DevState from state.devstate import DevState
from api.od import ODAPI from api.od import ODAPI
from concrete_syntax.textual_od.renderer import render_od from concrete_syntax.textual_od.renderer import render_od
# from concrete_syntax.textual_od.renderer_jinja2 import render_od_jinja2
from bootstrap.scd import bootstrap_scd from bootstrap.scd import bootstrap_scd
from util import loader from util import loader
from transformation.rule import RuleMatcherRewriter, ActionGenerator from transformation.rule import RuleMatcherRewriter, ActionGenerator
from transformation.ramify import ramify from transformation.ramify import ramify
from examples.semantics.operational import simulator from examples.semantics.operational import simulator
from examples.petrinet.renderer import render_petri_net from examples.petrinet.renderer import show_petri_net
if __name__ == "__main__": if __name__ == "__main__":
@ -48,11 +49,15 @@ if __name__ == "__main__":
matcher_rewriter = RuleMatcherRewriter(state, mm_rt, mm_rt_ramified) matcher_rewriter = RuleMatcherRewriter(state, mm_rt, mm_rt_ramified)
action_generator = ActionGenerator(matcher_rewriter, rules) action_generator = ActionGenerator(matcher_rewriter, rules)
def render_callback(od):
show_petri_net(od)
return render_od(state, od.m, od.mm)
sim = simulator.Simulator( sim = simulator.Simulator(
action_generator=action_generator, action_generator=action_generator,
decision_maker=simulator.InteractiveDecisionMaker(auto_proceed=False), decision_maker=simulator.InteractiveDecisionMaker(auto_proceed=False),
# decision_maker=simulator.RandomDecisionMaker(seed=0), # decision_maker=simulator.RandomDecisionMaker(seed=0),
renderer=lambda od: render_petri_net(od) + render_od(state, od.m, od.mm), renderer=render_callback,
# renderer=lambda od: render_od(state, od.m, od.mm), # renderer=lambda od: render_od(state, od.m, od.mm),
) )

View file

@ -0,0 +1,35 @@
from state.devstate import DevState
from bootstrap.scd import bootstrap_scd
from util import loader
from examples.petrinet.translational_semantics.tapaal.exporter import export_tapaal
if __name__ == "__main__":
import os
THIS_DIR = os.path.dirname(__file__)
# get file contents as string
def read_file(filename):
with open(THIS_DIR+'/'+filename) as file:
return file.read()
state = DevState()
scd_mmm = bootstrap_scd(state)
# Read models from their files
mm_cs = read_file('metamodels/mm_design.od')
mm_rt_cs = mm_cs + read_file('metamodels/mm_runtime.od')
# m_cs = read_file('models/m_example_simple.od')
# m_rt_initial_cs = m_cs + read_file('models/m_example_simple_rt_initial.od')
m_cs = read_file('models/m_example_mutex.od')
m_rt_initial_cs = m_cs + read_file('models/m_example_mutex_rt_initial.od')
# m_cs = read_file('models/m_example_inharc.od')
# m_rt_initial_cs = m_cs + read_file('models/m_example_inharc_rt_initial.od')
# Parse them
mm = loader.parse_and_check(state, mm_cs, scd_mmm, "Petri-Net Design meta-model")
mm_rt = loader.parse_and_check(state, mm_rt_cs, scd_mmm, "Petri-Net Runtime meta-model")
m = loader.parse_and_check(state, m_cs, mm, "Example model")
m_rt_initial = loader.parse_and_check(state, m_rt_initial_cs, mm_rt, "Example model initial state")
with open('exported.tapn', 'w') as f:
f.write(export_tapaal(state, m=m_rt_initial, mm=mm_rt))

View file

@ -0,0 +1,17 @@
import jinja2
import os
THIS_DIR = os.path.dirname(__file__)
from api.od import ODAPI
from examples.petrinet import helpers
from util.module_to_dict import module_to_dict
def export_tapaal(state, m, mm):
loader = jinja2.FileSystemLoader(searchpath=THIS_DIR)
environment = jinja2.Environment(loader=loader)
template = environment.get_template("tapaal.jinja2")
return template.render({
'odapi': ODAPI(state, m, mm),
**globals()['__builtins__'],
**module_to_dict(helpers),
})

View file

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<pnml xmlns="http://www.informatik.hu-berlin.de/top/pnml/ptNetb">
<net active="true" id="PN" type="P/T net">
{% for i, (place_name, place) in enumerate(odapi.get_all_instances("PNPlace")) %}
<place id="{{ place_name }}"
name="{{ place_name }}"
initialMarking="{{ get_num_tokens(odapi, place) }}"
invariant="&lt; inf"
displayName="true"
nameOffsetX="0"
nameOffsetY="0"
positionX="{{ i * 100 + 100 }}"
positionY="100"
/>
{% endfor %}
{% for i, (transition_name, transition) in enumerate(odapi.get_all_instances("PNTransition")) %}
<transition angle="0" displayName="true" id="{{ transition_name }}" infiniteServer="false" name="{{ transition_name }}" nameOffsetX="0" nameOffsetY="0" player="0" positionX="{{ i * 100 + 100 }}" positionY="300" priority="0" urgent="false"/>
{% endfor %}
{% for arc_name, arc in odapi.get_all_instances("arc") %}
<arc id="{{ arc_name }}"
inscription="{{ '[0,inf)' if odapi.get_type_name(odapi.get_source(arc)) == 'PNPlace' else '1' }}"
nameOffsetX="0"
nameOffsetY="0"
weight="1"
type="{{ 'timed' if odapi.get_type_name(odapi.get_source(arc)) == 'PNPlace' else 'normal' }}"
source="{{ odapi.get_name(odapi.get_source(arc)) }}"
target="{{ odapi.get_name(odapi.get_target(arc)) }}">
<arcpath arcPointType="false" id="0" xCoord="0" yCoord="0"/>
<arcpath arcPointType="false" id="1" xCoord="0" yCoord="0"/>
</arc>
{% endfor %}
{% for inh_arc_name, inh_arc in odapi.get_all_instances("inh_arc") %}
<arc id="{{ inh_arc_name }}"
inscription="[0,inf)"
nameOffsetX="0"
nameOffsetY="0"
type="tapnInhibitor"
weight="1"
source="{{ odapi.get_name(odapi.get_source(inh_arc)) }}"
target="{{ odapi.get_name(odapi.get_target(inh_arc)) }}">
<arcpath arcPointType="false" id="0" xCoord="0" yCoord="0"/>
<arcpath arcPointType="false" id="1" xCoord="0" yCoord="0"/>
</arc>
{% endfor %}
</net>
<feature isGame="false" isTimed="false"/>
</pnml>

View file

@ -2,12 +2,14 @@ from concrete_syntax.common import indent
from concrete_syntax.graphviz.make_url import make_url from concrete_syntax.graphviz.make_url import make_url
from examples.semantics.operational.port.helpers import design_to_state, state_to_design, get_time, get_num_ships from examples.semantics.operational.port.helpers import design_to_state, state_to_design, get_time, get_num_ships
def render_port_graphviz(od): def render_port_to_dot(od,
make_id=lambda name,obj: name # by default, we just use the object name for the graphviz node name
):
txt = "" txt = ""
def render_place(place): def render_place(place):
name = od.get_name(place) name = od.get_name(place)
return f'"{name}" [ label = "{name}\\n ships = {get_num_ships(od, place)}", style = filled, fillcolor = lightblue ]\n' return f'"{make_id(name,place)}" [ label = "{name}\\n ships = {get_num_ships(od, place)}", style = filled, fillcolor = lightblue ]\n'
for _, cap in od.get_all_instances("CapacityConstraint", include_subtypes=False): for _, cap in od.get_all_instances("CapacityConstraint", include_subtypes=False):
name = od.get_name(cap) name = od.get_name(cap)
@ -26,10 +28,10 @@ def render_port_graphviz(od):
for _, berth_state in od.get_all_instances("BerthState", include_subtypes=False): for _, berth_state in od.get_all_instances("BerthState", include_subtypes=False):
berth = state_to_design(od, berth_state) berth = state_to_design(od, berth_state)
name = od.get_name(berth) name = od.get_name(berth)
txt += f'"{name}" [ label = "{name}\\n numShips = {get_num_ships(od, berth)}\\n status = {od.get_slot_value(berth_state, "status")}", fillcolor = yellow, style = filled]\n' txt += f'"{make_id(name,berth)}" [ label = "{name}\\n numShips = {get_num_ships(od, berth)}\\n status = {od.get_slot_value(berth_state, "status")}", fillcolor = yellow, style = filled]\n'
for _, gen in od.get_all_instances("Generator", include_subtypes=False): for _, gen in od.get_all_instances("Generator", include_subtypes=False):
txt += f'"{od.get_name(gen)}" [ label = "+", shape = diamond, fillcolor = green, fontsize = 30, style = filled ]\n' txt += f'"{make_id(od.get_name(gen),gen)}" [ label = "+", shape = diamond, fillcolor = green, fontsize = 30, style = filled ]\n'
for _, conn in od.get_all_instances("connection"): for _, conn in od.get_all_instances("connection"):
src = od.get_source(conn) src = od.get_source(conn)
@ -37,23 +39,26 @@ def render_port_graphviz(od):
moved = od.get_slot_value(design_to_state(od, conn), "moved") moved = od.get_slot_value(design_to_state(od, conn), "moved")
src_name = od.get_name(src) src_name = od.get_name(src)
tgt_name = od.get_name(tgt) tgt_name = od.get_name(tgt)
txt += f"{src_name} -> {tgt_name} [color=deepskyblue3, penwidth={1 if moved else 2}];\n" txt += f"{make_id(src_name,src)} -> {make_id(tgt_name,tgt)} [color=deepskyblue3, penwidth={1 if moved else 2}];\n"
for _, workers in od.get_all_instances("WorkerSet"): for _, workers in od.get_all_instances("WorkerSet"):
already_have = [] already_have = []
name = od.get_name(workers) name = od.get_name(workers)
num_workers = od.get_slot_value(workers, "numWorkers") num_workers = od.get_slot_value(workers, "numWorkers")
txt += f'{name} [label="{num_workers} worker(s)", shape=parallelogram, fillcolor=chocolate, style=filled];\n' txt += f'{make_id(name,workers)} [label="{num_workers} worker(s)", shape=parallelogram, fillcolor=chocolate, style=filled];\n'
for lnk in od.get_outgoing(design_to_state(od, workers), "isOperating"): for lnk in od.get_outgoing(design_to_state(od, workers), "isOperating"):
berth = od.get_target(lnk) berth = od.get_target(lnk)
already_have.append(berth) already_have.append(berth)
txt += f"{name} -> {od.get_name(berth)} [arrowhead=none, color=chocolate];\n" txt += f"{make_id(name,workers)} -> {make_id(od.get_name(berth),berth)} [arrowhead=none, color=chocolate];\n"
for lnk in od.get_outgoing(workers, "canOperate"): for lnk in od.get_outgoing(workers, "canOperate"):
berth = od.get_target(lnk) berth = od.get_target(lnk)
if berth not in already_have: if berth not in already_have:
txt += f"{name} -> {od.get_name(berth)} [style=dotted, arrowhead=none, color=chocolate];\n" txt += f"{make_id(name,workers)} -> {make_id(od.get_name(berth),berth)} [style=dotted, arrowhead=none, color=chocolate];\n"
return make_url(txt) return txt
def render_port_graphviz(od):
return make_url(render_port_to_dot(od))
def render_port_textual(od): def render_port_textual(od):
txt = "" txt = ""

View file

@ -0,0 +1,90 @@
from api.od import ODAPI
from concrete_syntax.graphviz.renderer import render_object_diagram, make_graphviz_id
from concrete_syntax.graphviz.make_url import show_graphviz
from examples.petrinet.renderer import render_petri_net_to_dot
from examples.semantics.operational.port.renderer import render_port_to_dot
from examples.semantics.operational.port import helpers
# COLORS
PLACE_BG = "#DAE8FC" # fill color
PLACE_FG = "#6C8EBF" # font, line, arrow
BERTH_BG = "#FFF2CC"
BERTH_FG = "#D6B656"
CAPACITY_BG = "#F5F5F5"
CAPACITY_FG = "#666666"
WORKER_BG = "#D5E8D4"
WORKER_FG = "#82B366"
GENERATOR_BG = "#FFE6CC"
GENERATOR_FG = "#D79B00"
CLOCK_BG = "black"
CLOCK_FG = "white"
def graphviz_style_fg_bg(fg, bg):
return f"style=filled,fillcolor=\"{bg}\",color=\"{fg}\",fontcolor=\"{fg}\""
def render_port(state, m, mm):
dot = render_object_diagram(state, m, mm,
reify=True,
only_render=[
# Only render these types
"Place", "Berth", "CapacityConstraint", "WorkerSet", "Generator", "Clock",
"connection", "capacityOf", "canOperate", "generic_link",
# Petri Net types not included (they are already rendered by other function)
# Port-State-types not included to avoid cluttering the diagram, but if you need them, feel free to add them.
],
# We can style nodes/edges according to their type:
type_to_style={
"Place": graphviz_style_fg_bg(PLACE_FG, PLACE_BG),
"Berth": graphviz_style_fg_bg(BERTH_FG, BERTH_BG),
"CapacityConstraint": graphviz_style_fg_bg(CAPACITY_FG, CAPACITY_BG),
"WorkerSet": "shape=oval,"+graphviz_style_fg_bg(WORKER_FG, WORKER_BG),
"Generator": "shape=parallelogram,"+graphviz_style_fg_bg(GENERATOR_FG, GENERATOR_BG),
"Clock": graphviz_style_fg_bg(CLOCK_FG, CLOCK_BG),
# same blue as Place, thick line:
"connection": f"color=\"{PLACE_FG}\",fontcolor=\"{PLACE_FG}\",penwidth=2.0",
# same grey as CapacityConstraint
"capacityOf": f"color=\"{CAPACITY_FG}\",fontcolor=\"{CAPACITY_FG}\"",
# same green as WorkerSet
"canOperate": f"color=\"{WORKER_FG}\",fontcolor=\"{WORKER_FG}\"",
# purple line
"generic_link": "color=purple,fontcolor=purple,arrowhead=onormal",
},
# We have control over the node/edge labels that are rendered:
type_to_label={
"CapacityConstraint": lambda capconstr_name, capconstr, odapi: f"{capconstr_name}\\nshipCapacity={odapi.get_slot_value(capconstr, "shipCapacity")}",
"Place": lambda place_name, place, odapi: f"{place_name}\\nnumShips={helpers.get_num_ships(odapi, place)}",
"Berth": lambda berth_name, berth, odapi: f"{berth_name}\\nnumShips={helpers.get_num_ships(odapi, berth)}\\nstatus={odapi.get_slot_value(helpers.design_to_state(odapi, berth), "status")}",
"Clock": lambda _, clock, odapi: f"Clock\\ntime={odapi.get_slot_value(clock, "time")}",
"connection": lambda conn_name, conn, odapi: f"{conn_name}\\nmoved={odapi.get_slot_value(helpers.design_to_state(odapi, conn), "moved")}",
# hide generic link labels
"generic_link": lambda lnk_name, lnk, odapi: "",
"WorkerSet": lambda ws_name, ws, odapi: f"{ws_name}\\nnumWorkers={odapi.get_slot_value(ws, "numWorkers")}",
# hide the type (it's already clear enough)
"Generator": lambda gen_name, gen, odapi: gen_name,
},
)
return dot
def render_port_and_petri_net(state, m, mm):
od = ODAPI(state, m, mm)
dot = ""
dot += "// petri net:\n"
dot += render_petri_net_to_dot(od)
dot += "\n// the rest:\n"
dot += render_port(state, m, mm)
return dot
def show_port_and_petri_net(state, m, mm, engine="dot"):
show_graphviz(render_port_and_petri_net(state, m, mm), engine=engine)

View file

@ -17,7 +17,7 @@ from examples.semantics.operational.simulator import Simulator, RandomDecisionMa
from examples.semantics.operational.port import models from examples.semantics.operational.port import models
from examples.semantics.operational.port.helpers import design_to_state, state_to_design, get_time from examples.semantics.operational.port.helpers import design_to_state, state_to_design, get_time
from examples.semantics.operational.port.renderer import render_port_textual, render_port_graphviz from examples.semantics.operational.port.renderer import render_port_textual, render_port_graphviz
from examples.petrinet.renderer import render_petri_net from examples.petrinet.renderer import show_petri_net
from examples.semantics.operational import simulator from examples.semantics.operational import simulator
import os import os
@ -68,12 +68,15 @@ if __name__ == "__main__":
matcher_rewriter = RuleMatcherRewriter(state, merged_mm, ramified_merged_mm) matcher_rewriter = RuleMatcherRewriter(state, merged_mm, ramified_merged_mm)
action_generator = ActionGenerator(matcher_rewriter, rules) action_generator = ActionGenerator(matcher_rewriter, rules)
def render(od):
show_petri_net(od) # graphviz in web browser
return renderer.render_od(state, od.m, od.mm) # text in terminal
sim = simulator.Simulator( sim = simulator.Simulator(
action_generator=action_generator, action_generator=action_generator,
decision_maker=simulator.InteractiveDecisionMaker(auto_proceed=False), decision_maker=simulator.InteractiveDecisionMaker(auto_proceed=False),
# decision_maker=simulator.RandomDecisionMaker(seed=0), # decision_maker=simulator.RandomDecisionMaker(seed=0),
renderer=lambda od: render_petri_net(od) + renderer.render_od(state, od.m, od.mm), renderer=render,
# renderer=lambda od: render_od(state, od.m, od.mm),
) )
sim.run(ODAPI(state, model, merged_mm)) sim.run(ODAPI(state, model, merged_mm))

View file

@ -5,18 +5,15 @@ from concrete_syntax.plantuml.renderer import render_object_diagram, render_clas
from concrete_syntax.plantuml.make_url import make_url from concrete_syntax.plantuml.make_url import make_url
from api.od import ODAPI from api.od import ODAPI
from transformation.ramify import ramify
from transformation.topify.topify import Topifier
from transformation.merger import merge_models
from transformation.ramify import ramify from transformation.ramify import ramify
from transformation.rule import RuleMatcherRewriter from transformation.rule import RuleMatcherRewriter
from util import loader from util import loader
from util.module_to_dict import module_to_dict
from examples.semantics.operational.simulator import Simulator, RandomDecisionMaker, InteractiveDecisionMaker from examples.semantics.operational.port import models, helpers
from examples.semantics.operational.port import models
from examples.semantics.operational.port.helpers import design_to_state, state_to_design, get_time
from examples.semantics.operational.port.renderer import render_port_textual, render_port_graphviz from examples.semantics.operational.port.renderer import render_port_textual, render_port_graphviz
from examples.semantics.translational.renderer import show_port_and_petri_net
from examples.petrinet.renderer import render_petri_net from examples.petrinet.renderer import render_petri_net
import os import os
@ -76,7 +73,14 @@ if __name__ == "__main__":
print('ready!') print('ready!')
port_m_rt = port_m_rt_initial port_m_rt = port_m_rt_initial
matcher_rewriter = RuleMatcherRewriter(state, merged_mm, ramified_merged_mm) eval_context = {
# make all the functions defined in 'helpers' module available to 'condition'-code in LHS/NAC/RHS:
**module_to_dict(helpers),
# another example: in all 'condition'-code, there will be a global variable 'meaning_of_life', equal to 42:
'meaning_of_life': 42, # just to demonstrate - feel free to remove this
}
print('The following additional globals are available:', ', '.join(list(eval_context.keys())))
matcher_rewriter = RuleMatcherRewriter(state, merged_mm, ramified_merged_mm, eval_context=eval_context)
################################### ###################################
# Because the matching of many different rules can be slow, # Because the matching of many different rules can be slow,
@ -104,7 +108,7 @@ if __name__ == "__main__":
try: try:
with open(filename, "r") as file: with open(filename, "r") as file:
port_m_rt = parser.parse_od(state, file.read(), merged_mm) port_m_rt = parser.parse_od(state, file.read(), merged_mm)
print('loaded', filename) print(f'skip rule (found {filename})')
except FileNotFoundError: except FileNotFoundError:
# Fire every rule until it cannot match any longer: # Fire every rule until it cannot match any longer:
while True: while True:
@ -123,6 +127,12 @@ if __name__ == "__main__":
print('wrote', filename) print('wrote', filename)
render_petri_net(ODAPI(state, port_m_rt, merged_mm)) render_petri_net(ODAPI(state, port_m_rt, merged_mm))
# Uncomment to show also the port model:
# show_port_and_petri_net(state, port_m_rt, merged_mm)
# Uncomment to pause after each rendering:
# input()
################################### ###################################
# Once you have generated a Petri Net, you can execute the petri net: # Once you have generated a Petri Net, you can execute the petri net:
# #

View file

@ -1 +1,2 @@
lark==1.1.9 lark==1.1.9
jinja2==3.1.4

View file

@ -168,7 +168,14 @@ def _cannot_call_matched(_):
# This function returns a Generator of matches. # This function returns a Generator of matches.
# The idea is that the user can iterate over the match set, lazily generating it: if only interested in the first match, the entire match set doesn't have to be generated. # The idea is that the user can iterate over the match set, lazily generating it: if only interested in the first match, the entire match set doesn't have to be generated.
def match_od(state, host_m, host_mm, pattern_m, pattern_mm, pivot={}): def match_od(state,
host_m, # the host graph, in which to search for matches
host_mm, # meta-model of the host graph
pattern_m, # the pattern to look for
pattern_mm, # the meta-model of the pattern (typically the RAMified version of host_mm)
pivot={}, # optional: a partial match (restricts possible matches, and speeds up the match process)
eval_context={}, # optional: additional variables, functions, ... to be available while evaluating condition-code in the pattern. Will be available as global variables in the condition-code.
):
bottom = Bottom(state) bottom = Bottom(state)
# compute subtype relations and such: # compute subtype relations and such:
@ -177,6 +184,21 @@ def match_od(state, host_m, host_mm, pattern_m, pattern_mm, pivot={}):
pattern_odapi = ODAPI(state, pattern_m, pattern_mm) pattern_odapi = ODAPI(state, pattern_m, pattern_mm)
pattern_mm_odapi = ODAPI(state, pattern_mm, cdapi.mm) pattern_mm_odapi = ODAPI(state, pattern_mm, cdapi.mm)
# 'globals'-dict used when eval'ing conditions
bound_api = bind_api_readonly(odapi)
builtin = {
**bound_api,
'matched': _cannot_call_matched,
'odapi': odapi,
}
for key in eval_context:
if key in builtin:
print(f"WARNING: custom global '{key}' overrides pre-defined API function. Consider renaming it.")
eval_globals = {
**builtin,
**eval_context,
}
# Function object for pattern matching. Decides whether to match host and guest vertices, where guest is a RAMified instance (e.g., the attributes are all strings with Python expressions), and the host is an instance (=object diagram) of the original model (=class diagram) # Function object for pattern matching. Decides whether to match host and guest vertices, where guest is a RAMified instance (e.g., the attributes are all strings with Python expressions), and the host is an instance (=object diagram) of the original model (=class diagram)
class RAMCompare: class RAMCompare:
def __init__(self, bottom, host_od): def __init__(self, bottom, host_od):
@ -234,10 +256,7 @@ def match_od(state, host_m, host_mm, pattern_m, pattern_mm, pivot={}):
# - incompatible slots may be matched (it is only when their AttributeLinks are matched, that we know the types will be compatible) # - incompatible slots may be matched (it is only when their AttributeLinks are matched, that we know the types will be compatible)
with Timer(f'EVAL condition {g_vtx.name}'): with Timer(f'EVAL condition {g_vtx.name}'):
ok = exec_then_eval(python_code, ok = exec_then_eval(python_code,
_globals={ _globals=eval_globals,
**bind_api_readonly(odapi),
'matched': _cannot_call_matched,
},
_locals={'this': h_vtx.node_id}) _locals={'this': h_vtx.node_id})
self.conditions_to_check.pop(g_vtx.name, None) self.conditions_to_check.pop(g_vtx.name, None)
return ok return ok
@ -324,13 +343,14 @@ def match_od(state, host_m, host_mm, pattern_m, pattern_mm, pivot={}):
def check_conditions(name_mapping): def check_conditions(name_mapping):
eval_globals = {
**bound_api,
# this time, the real 'matched'-function can be used:
'matched': lambda name: bottom.read_outgoing_elements(host_m, name_mapping[name])[0],
**eval_context,
}
def check(python_code: str, loc): def check(python_code: str, loc):
return exec_then_eval(python_code, return exec_then_eval(python_code, _globals=eval_globals, _locals=loc)
_globals={
**bind_api_readonly(odapi),
'matched': lambda name: bottom.read_outgoing_elements(host_m, name_mapping[name])[0],
},
_locals=loc)
# Attribute conditions # Attribute conditions
for pattern_name, host_name in name_mapping.items(): for pattern_name, host_name in name_mapping.items():

View file

@ -3,6 +3,7 @@
# - Change attribute values # - Change attribute values
# - ? that's it? # - ? that's it?
import re
from uuid import UUID from uuid import UUID
from api.od import ODAPI, bind_api from api.od import ODAPI, bind_api
from services.bottom.V0 import Bottom from services.bottom.V0 import Bottom
@ -13,12 +14,22 @@ from services.primitives.actioncode_type import ActionCode
from services.primitives.integer_type import Integer from services.primitives.integer_type import Integer
from util.eval import exec_then_eval, simply_exec from util.eval import exec_then_eval, simply_exec
identifier_regex_pattern = '[_A-Za-z][._A-Za-z0-9]*'
identifier_regex = re.compile(identifier_regex_pattern)
class TryAgainNextRound(Exception): class TryAgainNextRound(Exception):
pass pass
# Rewrite is performed in-place (modifying `host_m`) # Rewrite is performed in-place (modifying `host_m`)
def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict, host_m: UUID, host_mm: UUID): def rewrite(state,
lhs_m: UUID, # LHS-pattern
rhs_m: UUID, # RHS-pattern
pattern_mm: UUID, # meta-model of both patterns (typically the RAMified host_mm)
lhs_match: dict, # a match, morphism, from lhs_m to host_m (mapping pattern name -> host name), typically found by the 'match_od'-function.
host_m: UUID, # host model
host_mm: UUID, # host meta-model
eval_context={}, # optional: additional variables/functions to be available while executing condition-code. These will be seen as global variables.
):
bottom = Bottom(state) bottom = Bottom(state)
# Need to come up with a new, unique name when creating new element in host-model: # Need to come up with a new, unique name when creating new element in host-model:
@ -74,6 +85,29 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
# to be grown # to be grown
rhs_match = { name : lhs_match[name] for name in common } rhs_match = { name : lhs_match[name] for name in common }
bound_api = bind_api(host_odapi)
original_delete = bound_api["delete"]
def wrapped_delete(obj):
not_allowed_to_delete = { host_odapi.get(host_name): pattern_name for pattern_name, host_name in rhs_match.items() }
if obj in not_allowed_to_delete:
pattern_name = not_allowed_to_delete[obj]
raise Exception(f"\n\nYou're trying to delete the element that was matched with the RHS-element '{pattern_name}'. This is not allowed! You're allowed to delete anything BUT NOT elements matched with your RHS-pattern. Instead, simply remove the element '{pattern_name}' from your RHS, if you want to delete it.")
return original_delete(obj)
bound_api["delete"] = wrapped_delete
builtin = {
**bound_api,
'matched': matched_callback,
'odapi': host_odapi,
}
for key in eval_context:
if key in builtin:
print(f"WARNING: custom global '{key}' overrides pre-defined API function. Consider renaming it.")
eval_globals = {
**builtin,
**eval_context,
}
# 1. Perform creations - in the right order! # 1. Perform creations - in the right order!
remaining_to_create = list(to_create) remaining_to_create = list(to_create)
while len(remaining_to_create) > 0: while len(remaining_to_create) > 0:
@ -86,11 +120,9 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
name_expr = rhs_odapi.get_slot_value(rhs_obj, "name") name_expr = rhs_odapi.get_slot_value(rhs_obj, "name")
except: except:
name_expr = f'"{rhs_name}"' # <- if the 'name' slot doesnt exist, use the pattern element name name_expr = f'"{rhs_name}"' # <- if the 'name' slot doesnt exist, use the pattern element name
suggested_name = exec_then_eval(name_expr, suggested_name = exec_then_eval(name_expr, _globals=eval_globals)
_globals={ if not identifier_regex.match(suggested_name):
**bind_api(host_odapi), raise Exception(f"In the RHS pattern element '{rhs_name}', the following name-expression:\n {name_expr}\nproduced the name:\n '{suggested_name}'\nwhich contains illegal characters.\nNames should match the following regex: {identifier_regex_pattern}")
'matched': matched_callback,
})
rhs_type = rhs_odapi.get_type(rhs_obj) rhs_type = rhs_odapi.get_type(rhs_obj)
host_type = ramify.get_original_type(bottom, rhs_type) host_type = ramify.get_original_type(bottom, rhs_type)
# for debugging: # for debugging:
@ -157,10 +189,7 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
host_attr_name = host_mm_odapi.get_slot_value(host_attr_link, "name") host_attr_name = host_mm_odapi.get_slot_value(host_attr_link, "name")
val_name = f"{host_src_name}.{host_attr_name}" val_name = f"{host_src_name}.{host_attr_name}"
python_expr = ActionCode(UUID(bottom.read_value(rhs_obj)), bottom.state).read() python_expr = ActionCode(UUID(bottom.read_value(rhs_obj)), bottom.state).read()
result = exec_then_eval(python_expr, _globals={ result = exec_then_eval(python_expr, _globals=eval_globals)
**bind_api(host_odapi),
'matched': matched_callback,
})
host_odapi.create_primitive_value(val_name, result, is_code=False) host_odapi.create_primitive_value(val_name, result, is_code=False)
rhs_match[rhs_name] = val_name rhs_match[rhs_name] = val_name
else: else:
@ -192,10 +221,7 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
rhs_obj = rhs_odapi.get(common_name) rhs_obj = rhs_odapi.get(common_name)
python_expr = ActionCode(UUID(bottom.read_value(rhs_obj)), bottom.state).read() python_expr = ActionCode(UUID(bottom.read_value(rhs_obj)), bottom.state).read()
result = exec_then_eval(python_expr, result = exec_then_eval(python_expr,
_globals={ _globals=eval_globals,
**bind_api(host_odapi),
'matched': matched_callback,
},
_locals={'this': host_obj}) # 'this' can be used to read the previous value of the slot _locals={'this': host_obj}) # 'this' can be used to read the previous value of the slot
host_odapi.overwrite_primitive_value(host_obj_name, result, is_code=False) host_odapi.overwrite_primitive_value(host_obj_name, result, is_code=False)
else: else:
@ -235,18 +261,12 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
# rhs_obj is an object or link (because association is subtype of class) # rhs_obj is an object or link (because association is subtype of class)
python_code = rhs_odapi.get_slot_value_default(rhs_obj, "condition", default="") python_code = rhs_odapi.get_slot_value_default(rhs_obj, "condition", default="")
simply_exec(python_code, simply_exec(python_code,
_globals={ _globals=eval_globals,
**bind_api(host_odapi),
'matched': matched_callback,
},
_locals={'this': host_obj}) _locals={'this': host_obj})
# 5. Execute global actions # 5. Execute global actions
for cond_name, cond in rhs_odapi.get_all_instances("GlobalCondition"): for cond_name, cond in rhs_odapi.get_all_instances("GlobalCondition"):
python_code = rhs_odapi.get_slot_value(cond, "condition") python_code = rhs_odapi.get_slot_value(cond, "condition")
simply_exec(python_code, _globals={ simply_exec(python_code, _globals=eval_globals)
**bind_api(host_odapi),
'matched': matched_callback,
})
return rhs_match return rhs_match

View file

@ -26,10 +26,11 @@ class _NAC_MATCHED(Exception):
# Helper for executing NAC/LHS/RHS-type rules # Helper for executing NAC/LHS/RHS-type rules
class RuleMatcherRewriter: class RuleMatcherRewriter:
def __init__(self, state, mm: UUID, mm_ramified: UUID): def __init__(self, state, mm: UUID, mm_ramified: UUID, eval_context={}):
self.state = state self.state = state
self.mm = mm self.mm = mm
self.mm_ramified = mm_ramified self.mm_ramified = mm_ramified
self.eval_context = eval_context
# Generates matches. # Generates matches.
# Every match is a dictionary with entries LHS_element_name -> model_element_name # Every match is a dictionary with entries LHS_element_name -> model_element_name
@ -38,7 +39,9 @@ class RuleMatcherRewriter:
host_m=m, host_m=m,
host_mm=self.mm, host_mm=self.mm,
pattern_m=lhs, pattern_m=lhs,
pattern_mm=self.mm_ramified) pattern_mm=self.mm_ramified,
eval_context=self.eval_context,
)
try: try:
# First we iterate over LHS-matches: # First we iterate over LHS-matches:
@ -64,7 +67,9 @@ class RuleMatcherRewriter:
host_mm=self.mm, host_mm=self.mm,
pattern_m=nac, pattern_m=nac,
pattern_mm=self.mm_ramified, pattern_mm=self.mm_ramified,
pivot=lhs_match) # try to "grow" LHS-match with NAC-match pivot=lhs_match, # try to "grow" LHS-match with NAC-match
eval_context=self.eval_context,
)
try: try:
# for nac_match in nac_matcher: # for nac_match in nac_matcher:
@ -117,7 +122,9 @@ class RuleMatcherRewriter:
pattern_mm=self.mm_ramified, pattern_mm=self.mm_ramified,
lhs_match=lhs_match, lhs_match=lhs_match,
host_m=cloned_m, host_m=cloned_m,
host_mm=self.mm) host_mm=self.mm,
eval_context=self.eval_context,
)
except Exception as e: except Exception as e:
# Make exceptions raised in eval'ed code easier to trace: # Make exceptions raised in eval'ed code easier to trace:
e.add_note(f"while executing RHS of '{rule_name}'") e.add_note(f"while executing RHS of '{rule_name}'")

View file

@ -5,13 +5,14 @@ from concrete_syntax.common import indent
from transformation.rule import Rule from transformation.rule import Rule
# parse model and check conformance # parse model and check conformance
def parse_and_check(state, m_cs, mm, descr: str, check_conformance=True, type_transform=lambda type_name: type_name): def parse_and_check(state, m_cs, mm, descr: str, check_conformance=True, type_transform=lambda type_name: type_name, name_generator=parser.DefaultNameGenerator()):
try: try:
m = parser.parse_od( m = parser.parse_od(
state, state,
m_text=m_cs, m_text=m_cs,
mm=mm, mm=mm,
type_transform=type_transform, type_transform=type_transform,
name_generator=name_generator,
) )
except Exception as e: except Exception as e:
e.add_note("While parsing model " + descr) e.add_note("While parsing model " + descr)
@ -35,6 +36,11 @@ def read_file(filename):
KINDS = ["nac", "lhs", "rhs"] KINDS = ["nac", "lhs", "rhs"]
# Phony name generator that raises an error if you try to use it :)
class LHSNameGenerator:
def __call__(self, type_name):
raise Exception(f"Error: Object or link of type '{type_name}' does not have a name.\nAnonymous objects/links are not allowed in the LHS of a rule, because they can have unintended consequences. Please give all of the elements in the LHS explicit names.")
# load model transformation rules # load model transformation rules
def load_rules(state, get_filename, rt_mm_ramified, rule_names, check_conformance=True): def load_rules(state, get_filename, rt_mm_ramified, rule_names, check_conformance=True):
rules = {} rules = {}
@ -62,9 +68,12 @@ def load_rules(state, get_filename, rt_mm_ramified, rule_names, check_conformanc
if suffix == "": if suffix == "":
print(f"Warning: rule {rule_name} has no NAC ({filename} not found)") print(f"Warning: rule {rule_name} has no NAC ({filename} not found)")
return nacs return nacs
elif kind == "lhs" or kind == "rhs": else:
try: try:
m = parse_and_check(state, read_file(filename), rt_mm_ramified, descr, check_conformance) if kind == "lhs":
m = parse_and_check(state, read_file(filename), rt_mm_ramified, descr, check_conformance, name_generator=LHSNameGenerator())
elif kind == "rhs":
m = parse_and_check(state, read_file(filename), rt_mm_ramified, descr, check_conformance)
files_read.append(filename) files_read.append(filename)
return m return m
except FileNotFoundError as e: except FileNotFoundError as e:

8
util/module_to_dict.py Normal file
View file

@ -0,0 +1,8 @@
# Based on: https://stackoverflow.com/a/46263657
def module_to_dict(module):
context = {}
for name in dir(module):
# this will filter out 'private' functions, as well as __builtins__, __name__, __package__, etc.:
if not name.startswith('_'):
context[name] = getattr(module, name)
return context