Compare commits
10 commits
e4c12b7349
...
200f2a3ede
| Author | SHA1 | Date | |
|---|---|---|---|
| 200f2a3ede | |||
| 26d8655147 | |||
| 6aa5302e36 | |||
| 7914f01006 | |||
| 6314506ac0 | |||
| c7288635f8 | |||
| 9e74075066 | |||
| fd8bc3bc40 | |||
| bd1588d809 | |||
| 3ddfc96532 |
21 changed files with 611 additions and 134 deletions
|
|
@ -104,6 +104,7 @@ class ODAPI:
|
|||
result.append(i)
|
||||
return result
|
||||
|
||||
# Returns list of tuples (name, obj)
|
||||
def get_all_instances(self, type_name: str, include_subtypes=True):
|
||||
if include_subtypes:
|
||||
all_types = self.cdapi.transitive_sub_types[type_name]
|
||||
|
|
|
|||
|
|
@ -1,26 +1,55 @@
|
|||
import functools
|
||||
from uuid import UUID
|
||||
from api.od import ODAPI
|
||||
from services import scd, od
|
||||
from services.bottom.V0 import Bottom
|
||||
from concrete_syntax.common import display_value, display_name, indent
|
||||
|
||||
# Turn ModelVerse/muMLE ID into GraphViz ID
|
||||
def make_graphviz_id(uuid, prefix="") -> str:
|
||||
result = 'n'+(prefix+str(uuid).replace('-',''))[24:] # we assume that the first 24 characters are always zero...
|
||||
return result
|
||||
|
||||
def render_object_diagram(state, m, mm, render_attributes=True, prefix_ids=""):
|
||||
def render_object_diagram(state, m, mm,
|
||||
render_attributes=True, # doesn't do anything (yet)
|
||||
prefix_ids="",
|
||||
reify=False, # If true, will create a node in the middle of every link. This allows links to be the src/tgt of other links (which muMLE supports), but will result in a larger diagram.
|
||||
only_render=None, # List of type names or None. If specified, only render instances of these types. E.g., ["Place", "connection"]
|
||||
type_to_style={}, # Dictionary. Mapping from type-name to graphviz style. E.g., { "generic_link": ",color=purple" }
|
||||
type_to_label={}, # Dictionary. Mapping from type-name to callback for custom label creation.
|
||||
):
|
||||
bottom = Bottom(state)
|
||||
mm_scd = scd.SCD(mm, state)
|
||||
m_od = od.OD(mm, m, state)
|
||||
odapi = ODAPI(state, m, mm)
|
||||
|
||||
def make_id(uuid) -> str:
|
||||
return 'n'+(prefix_ids+str(uuid).replace('-',''))[24:]
|
||||
make_id = functools.partial(make_graphviz_id, prefix=prefix_ids)
|
||||
|
||||
output = ""
|
||||
|
||||
# Render objects
|
||||
for class_name, class_node in mm_scd.get_classes().items():
|
||||
if only_render != None and class_name not in only_render:
|
||||
continue
|
||||
|
||||
make_label = type_to_label.get(class_name,
|
||||
# default, if not found:
|
||||
lambda obj_name, obj, odapi: f"{display_name(obj_name)} : {class_name}")
|
||||
|
||||
output += f"\nsubgraph {class_name} {{"
|
||||
|
||||
if render_attributes:
|
||||
attributes = od.get_attributes(bottom, class_node)
|
||||
|
||||
|
||||
custom_style = type_to_style.get(class_name, "")
|
||||
if custom_style == "":
|
||||
output += f"\nnode [shape=rect]"
|
||||
else:
|
||||
output += f"\nnode [shape=rect,{custom_style}]"
|
||||
|
||||
for obj_name, obj_node in m_od.get_objects(class_node).items():
|
||||
output += f"\n{make_id(obj_node)} [label=\"{display_name(obj_name)} : {class_name}\", shape=rect] ;"
|
||||
output += f"\n{make_id(obj_node)} [label=\"{make_label(obj_name, obj_node, odapi)}\"] ;"
|
||||
#" {{"
|
||||
|
||||
# if render_attributes:
|
||||
|
|
@ -31,17 +60,46 @@ def render_object_diagram(state, m, mm, render_attributes=True, prefix_ids=""):
|
|||
# output += f"\n{attr_name} => {display_value(val, type_name)}"
|
||||
# output += '\n}'
|
||||
|
||||
output += '\n}'
|
||||
|
||||
output += '\n'
|
||||
|
||||
# Render links
|
||||
for assoc_name, assoc_edge in mm_scd.get_associations().items():
|
||||
if only_render != None and assoc_name not in only_render:
|
||||
continue
|
||||
|
||||
make_label = type_to_label.get(assoc_name,
|
||||
# default, if not found:
|
||||
lambda lnk_name, lnk, odapi: f"{display_name(lnk_name)} : {assoc_name}")
|
||||
|
||||
output += f"\nsubgraph {assoc_name} {{"
|
||||
|
||||
custom_style = type_to_style.get(assoc_name, "")
|
||||
if custom_style != "":
|
||||
output += f"\nedge [{custom_style}]"
|
||||
if reify:
|
||||
if custom_style != "":
|
||||
# created nodes will be points of matching style:
|
||||
output += f"\nnode [{custom_style},shape=point]"
|
||||
else:
|
||||
output += "\nnode [shape=point]"
|
||||
|
||||
for link_name, link_edge in m_od.get_objects(assoc_edge).items():
|
||||
src_obj = bottom.read_edge_source(link_edge)
|
||||
tgt_obj = bottom.read_edge_target(link_edge)
|
||||
src_name = m_od.get_object_name(src_obj)
|
||||
tgt_name = m_od.get_object_name(tgt_obj)
|
||||
|
||||
output += f"\n{make_id(src_obj)} -> {make_id(tgt_obj)} [label=\"{display_name(link_name)}:{assoc_name}\"] ;"
|
||||
if reify:
|
||||
# intermediary node:
|
||||
output += f"\n{make_id(src_obj)} -> {make_id(link_edge)} [arrowhead=none]"
|
||||
output += f"\n{make_id(link_edge)} -> {make_id(tgt_obj)}"
|
||||
output += f"\n{make_id(link_edge)} [xlabel=\"{make_label(link_name, link_edge, odapi)}\"]"
|
||||
else:
|
||||
output += f"\n{make_id(src_obj)} -> {make_id(tgt_obj)} [label=\"{make_label(link_name, link_edge, odapi)}\", {custom_style}] ;"
|
||||
|
||||
output += '\n}'
|
||||
|
||||
return output
|
||||
|
||||
|
|
@ -50,10 +108,8 @@ def render_trace_match(state, name_mapping: dict, pattern_m: UUID, host_m: UUID,
|
|||
class_type = od.get_scd_mm_class_node(bottom)
|
||||
attr_link_type = od.get_scd_mm_attributelink_node(bottom)
|
||||
|
||||
def make_pattern_id(uuid) -> str:
|
||||
return 'n'+(prefix_pattern_ids+str(uuid).replace('-',''))[24:]
|
||||
def make_host_id(uuid) -> str:
|
||||
return 'n'+(prefix_host_ids+str(uuid).replace('-',''))[24:]
|
||||
make_pattern_id = functools.partial(make_graphviz_id, prefix=prefix_pattern_ids)
|
||||
make_host_id = functools.partial(make_graphviz_id, prefix=prefix_host_ids)
|
||||
|
||||
output = ""
|
||||
|
||||
|
|
|
|||
18
concrete_syntax/textual_od/objectdiagrams.jinja2
Normal file
18
concrete_syntax/textual_od/objectdiagrams.jinja2
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
{% macro render_name(name) %}{{ name if not hide_names or name.startswith("__") else "" }}{% endmacro %}
|
||||
|
||||
{% macro render_attributes(obj) %} {
|
||||
{% for attr_name in odapi.get_slots(obj) %}
|
||||
{{ attr_name}} = {{ display_value(
|
||||
val=odapi.get_slot_value(obj, attr_name),
|
||||
type_name=odapi.get_type_name(odapi.get_slot(obj, attr_name)),
|
||||
indentation=4) }};
|
||||
{% endfor %}
|
||||
}{% endmacro %}
|
||||
|
||||
{% for obj_name, obj in objects %}
|
||||
{{ render_name(obj_name) }}:{{ odapi.get_type_name(obj) }}{{ render_attributes(obj) }}
|
||||
{% endfor %}
|
||||
|
||||
{% for lnk_name, lnk in links %}
|
||||
{{ render_name(obj_name) }}:{{ odapi.get_type_name(lnk) }} ({{odapi.get_name(odapi.get_source(lnk))}} -> {{odapi.get_name(odapi.get_target(lnk))}}){{ render_attributes(lnk) }}
|
||||
{% endfor %}
|
||||
|
|
@ -1,10 +1,10 @@
|
|||
# Parser for Object Diagrams textual concrete syntax
|
||||
|
||||
from lark import Lark, logger
|
||||
from lark import Lark, logger, Transformer
|
||||
from lark.indenter import Indenter
|
||||
from api.od import ODAPI
|
||||
from services.scd import SCD
|
||||
from concrete_syntax.common import _Code, TBase
|
||||
from concrete_syntax.common import _Code
|
||||
from uuid import UUID
|
||||
|
||||
grammar = r"""
|
||||
|
|
@ -41,11 +41,25 @@ rev_link_spec: "(" IDENTIFIER "<-" IDENTIFIER ")"
|
|||
slot: IDENTIFIER "=" literal ";"
|
||||
"""
|
||||
|
||||
parser = Lark(grammar, parser='lalr')
|
||||
parser = Lark(grammar, parser='lalr', propagate_positions=True)
|
||||
|
||||
class DefaultNameGenerator:
|
||||
def __init__(self):
|
||||
self.counter = 0
|
||||
|
||||
def __call__(self, type_name):
|
||||
name = f"__{type_name}_{self.counter}"
|
||||
self.counter += 1
|
||||
return name
|
||||
|
||||
# given a concrete syntax text string, and a meta-model, parses the CS
|
||||
# Parameter 'type_transform' is useful for adding prefixes to the type names, when parsing a model and pretending it is an instance of a prefixed meta-model.
|
||||
def parse_od(state, m_text, mm, type_transform=lambda type_name: type_name):
|
||||
def parse_od(state,
|
||||
m_text, # text to parse
|
||||
mm, # meta-model of model that will be parsed. The meta-model must already have been parsed.
|
||||
type_transform=lambda type_name: type_name,
|
||||
name_generator=DefaultNameGenerator(), # exception to raise if anonymous (nameless) object/link occurs in the model. Main reason for this is to forbid them in LHS of transformation rules.
|
||||
):
|
||||
tree = parser.parse(m_text)
|
||||
|
||||
m = state.create_node()
|
||||
|
|
@ -56,62 +70,95 @@ def parse_od(state, m_text, mm, type_transform=lambda type_name: type_name):
|
|||
for type_name in ["Integer", "String", "Boolean", "ActionCode"]
|
||||
}
|
||||
|
||||
class T(TBase):
|
||||
class T(Transformer):
|
||||
def __init__(self, visit_tokens):
|
||||
super().__init__(visit_tokens)
|
||||
self.obj_counter = 0 # used for generating unique names for anonymous objects
|
||||
|
||||
def IDENTIFIER(self, token):
|
||||
return (str(token), token.line)
|
||||
|
||||
def INT(self, token):
|
||||
return (int(token), token.line)
|
||||
|
||||
def BOOL(self, token):
|
||||
return (token == "True", token.line)
|
||||
|
||||
def STR(self, token):
|
||||
return (str(token[1:-1]), token.line) # strip the "" or ''
|
||||
|
||||
def CODE(self, token):
|
||||
return (_Code(str(token[1:-1])), token.line) # strip the ``
|
||||
|
||||
def INDENTED_CODE(self, token):
|
||||
skip = 4 # strip the ``` and the following newline character
|
||||
space_count = 0
|
||||
while token[skip+space_count] == " ":
|
||||
space_count += 1
|
||||
lines = token.split('\n')[1:-1]
|
||||
for line in lines:
|
||||
if len(line) >= space_count and line[0:space_count] != ' '*space_count:
|
||||
raise Exception("wrong indentation of INDENTED_CODE")
|
||||
unindented_lines = [l[space_count:] for l in lines]
|
||||
return (_Code('\n'.join(unindented_lines)), token.line)
|
||||
|
||||
def literal(self, el):
|
||||
return el[0]
|
||||
|
||||
def link_spec(self, el):
|
||||
[src, tgt] = el
|
||||
return (src, tgt)
|
||||
[(src, src_line), (tgt, _)] = el
|
||||
return (src, tgt, src_line)
|
||||
|
||||
def rev_link_spec(self, el):
|
||||
[tgt, src] = el # <-- reversed :)
|
||||
return (src, tgt)
|
||||
[(tgt, tgt_line), (src, _)] = el # <-- reversed :)
|
||||
return (src, tgt, tgt_line)
|
||||
|
||||
def type_name(self, el):
|
||||
type_name = el[0]
|
||||
type_name, line = el[0]
|
||||
if type_name in primitive_types:
|
||||
return type_name
|
||||
return (type_name, line)
|
||||
else:
|
||||
return type_transform(el[0])
|
||||
return (type_transform(type_name), line)
|
||||
|
||||
def slot(self, el):
|
||||
[attr_name, value] = el
|
||||
return (attr_name, value)
|
||||
[(attr_name, line), (value, _)] = el
|
||||
return (attr_name, value, line)
|
||||
|
||||
def object(self, el):
|
||||
[obj_name, type_name, link] = el[0:3]
|
||||
[obj, (type_name, line), link] = el[0:3]
|
||||
slots = el[3:]
|
||||
if state.read_dict(m, obj_name) != None:
|
||||
msg = f"Element '{obj_name}:{type_name}': name '{obj_name}' already in use."
|
||||
# raise Exception(msg + " Names must be unique")
|
||||
print(msg + " Ignoring.")
|
||||
return
|
||||
if obj_name == None:
|
||||
# object/link names are optional
|
||||
# generate a unique name if no name given
|
||||
obj_name = f"__{type_name}_{self.obj_counter}"
|
||||
self.obj_counter += 1
|
||||
if link == None:
|
||||
obj_node = od.create_object(obj_name, type_name)
|
||||
else:
|
||||
src, tgt = link
|
||||
if tgt in primitive_types:
|
||||
if state.read_dict(m, tgt) == None:
|
||||
scd = SCD(m, state)
|
||||
scd.create_model_ref(tgt, primitive_types[tgt])
|
||||
src_obj = od.get(src)
|
||||
tgt_obj = od.get(tgt)
|
||||
obj_node = od.create_link(obj_name, type_name, src_obj, tgt_obj)
|
||||
# Create slots
|
||||
for attr_name, value in slots:
|
||||
if isinstance(value, _Code):
|
||||
od.set_slot_value(obj_node, attr_name, value.code, is_code=True)
|
||||
try:
|
||||
if obj != None:
|
||||
(obj_name, _) = obj
|
||||
else:
|
||||
od.set_slot_value(obj_node, attr_name, value)
|
||||
# anonymous object - auto-generate a name
|
||||
obj_name = name_generator(type_name)
|
||||
if state.read_dict(m, obj_name) != None:
|
||||
msg = f"Element '{obj_name}:{type_name}': name '{obj_name}' already in use."
|
||||
raise Exception(msg + " Names must be unique")
|
||||
# print(msg + " Ignoring.")
|
||||
return
|
||||
if link == None:
|
||||
obj_node = od.create_object(obj_name, type_name)
|
||||
else:
|
||||
(src, tgt, _) = link
|
||||
if tgt in primitive_types:
|
||||
if state.read_dict(m, tgt) == None:
|
||||
scd = SCD(m, state)
|
||||
scd.create_model_ref(tgt, primitive_types[tgt])
|
||||
src_obj = od.get(src)
|
||||
tgt_obj = od.get(tgt)
|
||||
obj_node = od.create_link(obj_name, type_name, src_obj, tgt_obj)
|
||||
# Create slots
|
||||
for attr_name, value, line in slots:
|
||||
if isinstance(value, _Code):
|
||||
od.set_slot_value(obj_node, attr_name, value.code, is_code=True)
|
||||
else:
|
||||
od.set_slot_value(obj_node, attr_name, value)
|
||||
|
||||
return obj_name
|
||||
return obj_name
|
||||
except Exception as e:
|
||||
# raising a *new* exception (instead of adding a note to the existing exception) because Lark will also raise a new exception, and ignore our note:
|
||||
raise Exception(f"at line {line}:\n " + m_text.split('\n')[line-1] + "\n"+ str(e)) from e
|
||||
|
||||
t = T(visit_tokens=True).transform(tree)
|
||||
|
||||
|
|
|
|||
58
concrete_syntax/textual_od/renderer_jinja2.py
Normal file
58
concrete_syntax/textual_od/renderer_jinja2.py
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
import jinja2
|
||||
import os
|
||||
from uuid import UUID
|
||||
|
||||
THIS_DIR = os.path.dirname(__file__)
|
||||
|
||||
from api.od import ODAPI
|
||||
from concrete_syntax import common
|
||||
from services.bottom.V0 import Bottom
|
||||
from util.module_to_dict import module_to_dict
|
||||
|
||||
def render_od_jinja2(state, m, mm):
|
||||
bottom = Bottom(state)
|
||||
type_model_id = state.read_dict(state.read_root(), "SCD")
|
||||
scd_model = UUID(state.read_value(type_model_id))
|
||||
type_odapi = ODAPI(state, mm, scd_model)
|
||||
|
||||
objects = []
|
||||
links = []
|
||||
|
||||
to_add = bottom.read_keys(m)
|
||||
already_added = set()
|
||||
|
||||
while len(to_add) > 0:
|
||||
next_round = []
|
||||
for obj_name in to_add:
|
||||
obj = state.read_dict(m, obj_name)
|
||||
src, tgt = state.read_edge(obj)
|
||||
if src == None:
|
||||
# not a link
|
||||
objects.append((obj_name, obj))
|
||||
already_added.add(obj)
|
||||
else:
|
||||
# A link can only be written out after its source and target have been written out
|
||||
if src in already_added and tgt in already_added:
|
||||
links.append((obj_name, obj))
|
||||
else:
|
||||
# try again later
|
||||
next_round.append(obj_name)
|
||||
if len(next_round) == len(to_add):
|
||||
raise Exception("We got stuck!", next_round)
|
||||
to_add = next_round
|
||||
|
||||
loader = jinja2.FileSystemLoader(searchpath=THIS_DIR)
|
||||
environment = jinja2.Environment(
|
||||
loader=loader,
|
||||
# whitespace control:
|
||||
trim_blocks=True,
|
||||
lstrip_blocks=True,
|
||||
)
|
||||
template = environment.get_template("objectdiagrams.jinja2")
|
||||
return template.render({
|
||||
'objects': objects,
|
||||
'links': links,
|
||||
'odapi': ODAPI(state, m, mm),
|
||||
**globals()['__builtins__'],
|
||||
**module_to_dict(common),
|
||||
})
|
||||
6
examples/petrinet/helpers.py
Normal file
6
examples/petrinet/helpers.py
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
from uuid import UUID
|
||||
|
||||
def get_num_tokens(odapi, place: UUID):
|
||||
pn_of = odapi.get_incoming(place, "pn_of")[0]
|
||||
place_state = odapi.get_source(pn_of)
|
||||
return odapi.get_slot_value(place_state, "numTokens")
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
from api.od import ODAPI
|
||||
from concrete_syntax.graphviz.make_url import show_graphviz
|
||||
from concrete_syntax.graphviz.renderer import make_graphviz_id
|
||||
|
||||
try:
|
||||
import graphviz
|
||||
|
|
@ -14,37 +15,45 @@ def render_tokens(num_tokens: int):
|
|||
return '●●\\n●●'
|
||||
return str(num_tokens)
|
||||
|
||||
def render_petri_net(od: ODAPI):
|
||||
def render_petri_net_to_dot(od: ODAPI) -> str:
|
||||
dot = ""
|
||||
dot += "rankdir=LR;"
|
||||
dot += "center=true;"
|
||||
dot += "margin=1;"
|
||||
dot += "nodesep=1;"
|
||||
dot += "edge [arrowhead=vee];"
|
||||
dot += "node[fontname=Arial,fontsize=10];\n"
|
||||
dot += "subgraph places {"
|
||||
dot += " node [shape=circle,fixedsize=true,label=\"\", height=.35,width=.35];"
|
||||
for _, place in od.get_all_instances("PNPlace"):
|
||||
place_name = od.get_name(place)
|
||||
dot += " node [fontname=Arial,fontsize=10,shape=circle,fixedsize=true,label=\"\", height=.35,width=.35];"
|
||||
for place_name, place in od.get_all_instances("PNPlace"):
|
||||
# place_name = od.get_name(place)
|
||||
try:
|
||||
place_state = od.get_source(od.get_incoming(place, "pn_of")[0])
|
||||
num_tokens = od.get_slot_value(place_state, "numTokens")
|
||||
except IndexError:
|
||||
num_tokens = 0
|
||||
dot += f" {place_name} [label=\"{place_name}\\n\\n{render_tokens(num_tokens)}\\n\\n\"];\n"
|
||||
dot += f" {make_graphviz_id(place)} [label=\"{place_name}\\n\\n{render_tokens(num_tokens)}\\n\\n\"];\n"
|
||||
dot += "}\n"
|
||||
dot += "subgraph transitions {"
|
||||
dot += " node [shape=rect,fixedsize=true,height=.3,width=.12,style=filled,fillcolor=black,color=white];\n"
|
||||
for transition_name, _ in od.get_all_instances("PNTransition"):
|
||||
dot += f" {transition_name} [label=\"{transition_name}\\n\\n\\n\"];\n"
|
||||
dot += "subgraph transitions {\n"
|
||||
dot += " edge [arrowhead=normal];\n"
|
||||
dot += " node [fontname=Arial,fontsize=10,shape=rect,fixedsize=true,height=.3,width=.12,style=filled,fillcolor=black,color=white];\n"
|
||||
for transition_name, transition in od.get_all_instances("PNTransition"):
|
||||
dot += f" {make_graphviz_id(transition)} [label=\"{transition_name}\\n\\n\\n\"];\n"
|
||||
dot += "}\n"
|
||||
for _, arc in od.get_all_instances("arc"):
|
||||
src_name = od.get_name(od.get_source(arc))
|
||||
tgt_name = od.get_name(od.get_target(arc))
|
||||
dot += f"{src_name} -> {tgt_name};"
|
||||
src = od.get_source(arc)
|
||||
tgt = od.get_target(arc)
|
||||
# src_name = od.get_name(od.get_source(arc))
|
||||
# tgt_name = od.get_name(od.get_target(arc))
|
||||
dot += f"{make_graphviz_id(src)} -> {make_graphviz_id(tgt)};"
|
||||
for _, inhib_arc in od.get_all_instances("inh_arc"):
|
||||
src_name = od.get_name(od.get_source(inhib_arc))
|
||||
tgt_name = od.get_name(od.get_target(inhib_arc))
|
||||
dot += f"{src_name} -> {tgt_name} [arrowhead=odot];\n"
|
||||
show_graphviz(dot, engine="neato")
|
||||
return ""
|
||||
src = od.get_source(inhib_arc)
|
||||
tgt = od.get_target(inhib_arc)
|
||||
dot += f"{make_graphviz_id(src)} -> {make_graphviz_id(tgt)} [arrowhead=odot];\n"
|
||||
return dot
|
||||
|
||||
# deprecated
|
||||
def render_petri_net(od: ODAPI, engine="neato"):
|
||||
show_graphviz(render_petri_net_to_dot(od), engine=engine)
|
||||
|
||||
# use this instead:
|
||||
def show_petri_net(od: ODAPI, engine="neato"):
|
||||
show_graphviz(render_petri_net_to_dot(od), engine=engine)
|
||||
|
|
@ -1,12 +1,13 @@
|
|||
from state.devstate import DevState
|
||||
from api.od import ODAPI
|
||||
from concrete_syntax.textual_od.renderer import render_od
|
||||
# from concrete_syntax.textual_od.renderer_jinja2 import render_od_jinja2
|
||||
from bootstrap.scd import bootstrap_scd
|
||||
from util import loader
|
||||
from transformation.rule import RuleMatcherRewriter, ActionGenerator
|
||||
from transformation.ramify import ramify
|
||||
from examples.semantics.operational import simulator
|
||||
from examples.petrinet.renderer import render_petri_net
|
||||
from examples.petrinet.renderer import show_petri_net
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
@ -48,11 +49,15 @@ if __name__ == "__main__":
|
|||
matcher_rewriter = RuleMatcherRewriter(state, mm_rt, mm_rt_ramified)
|
||||
action_generator = ActionGenerator(matcher_rewriter, rules)
|
||||
|
||||
def render_callback(od):
|
||||
show_petri_net(od)
|
||||
return render_od(state, od.m, od.mm)
|
||||
|
||||
sim = simulator.Simulator(
|
||||
action_generator=action_generator,
|
||||
decision_maker=simulator.InteractiveDecisionMaker(auto_proceed=False),
|
||||
# decision_maker=simulator.RandomDecisionMaker(seed=0),
|
||||
renderer=lambda od: render_petri_net(od) + render_od(state, od.m, od.mm),
|
||||
renderer=render_callback,
|
||||
# renderer=lambda od: render_od(state, od.m, od.mm),
|
||||
)
|
||||
|
||||
|
|
|
|||
35
examples/petrinet/runner_export_tapaal.py
Normal file
35
examples/petrinet/runner_export_tapaal.py
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
from state.devstate import DevState
|
||||
from bootstrap.scd import bootstrap_scd
|
||||
from util import loader
|
||||
|
||||
from examples.petrinet.translational_semantics.tapaal.exporter import export_tapaal
|
||||
|
||||
if __name__ == "__main__":
|
||||
import os
|
||||
THIS_DIR = os.path.dirname(__file__)
|
||||
|
||||
# get file contents as string
|
||||
def read_file(filename):
|
||||
with open(THIS_DIR+'/'+filename) as file:
|
||||
return file.read()
|
||||
|
||||
state = DevState()
|
||||
scd_mmm = bootstrap_scd(state)
|
||||
# Read models from their files
|
||||
mm_cs = read_file('metamodels/mm_design.od')
|
||||
mm_rt_cs = mm_cs + read_file('metamodels/mm_runtime.od')
|
||||
# m_cs = read_file('models/m_example_simple.od')
|
||||
# m_rt_initial_cs = m_cs + read_file('models/m_example_simple_rt_initial.od')
|
||||
m_cs = read_file('models/m_example_mutex.od')
|
||||
m_rt_initial_cs = m_cs + read_file('models/m_example_mutex_rt_initial.od')
|
||||
# m_cs = read_file('models/m_example_inharc.od')
|
||||
# m_rt_initial_cs = m_cs + read_file('models/m_example_inharc_rt_initial.od')
|
||||
|
||||
# Parse them
|
||||
mm = loader.parse_and_check(state, mm_cs, scd_mmm, "Petri-Net Design meta-model")
|
||||
mm_rt = loader.parse_and_check(state, mm_rt_cs, scd_mmm, "Petri-Net Runtime meta-model")
|
||||
m = loader.parse_and_check(state, m_cs, mm, "Example model")
|
||||
m_rt_initial = loader.parse_and_check(state, m_rt_initial_cs, mm_rt, "Example model initial state")
|
||||
|
||||
with open('exported.tapn', 'w') as f:
|
||||
f.write(export_tapaal(state, m=m_rt_initial, mm=mm_rt))
|
||||
17
examples/petrinet/translational_semantics/tapaal/exporter.py
Normal file
17
examples/petrinet/translational_semantics/tapaal/exporter.py
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
import jinja2
|
||||
import os
|
||||
THIS_DIR = os.path.dirname(__file__)
|
||||
|
||||
from api.od import ODAPI
|
||||
from examples.petrinet import helpers
|
||||
from util.module_to_dict import module_to_dict
|
||||
|
||||
def export_tapaal(state, m, mm):
|
||||
loader = jinja2.FileSystemLoader(searchpath=THIS_DIR)
|
||||
environment = jinja2.Environment(loader=loader)
|
||||
template = environment.get_template("tapaal.jinja2")
|
||||
return template.render({
|
||||
'odapi': ODAPI(state, m, mm),
|
||||
**globals()['__builtins__'],
|
||||
**module_to_dict(helpers),
|
||||
})
|
||||
|
|
@ -0,0 +1,52 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<pnml xmlns="http://www.informatik.hu-berlin.de/top/pnml/ptNetb">
|
||||
<net active="true" id="PN" type="P/T net">
|
||||
|
||||
{% for i, (place_name, place) in enumerate(odapi.get_all_instances("PNPlace")) %}
|
||||
<place id="{{ place_name }}"
|
||||
name="{{ place_name }}"
|
||||
initialMarking="{{ get_num_tokens(odapi, place) }}"
|
||||
invariant="< inf"
|
||||
displayName="true"
|
||||
nameOffsetX="0"
|
||||
nameOffsetY="0"
|
||||
positionX="{{ i * 100 + 100 }}"
|
||||
positionY="100"
|
||||
/>
|
||||
{% endfor %}
|
||||
|
||||
{% for i, (transition_name, transition) in enumerate(odapi.get_all_instances("PNTransition")) %}
|
||||
<transition angle="0" displayName="true" id="{{ transition_name }}" infiniteServer="false" name="{{ transition_name }}" nameOffsetX="0" nameOffsetY="0" player="0" positionX="{{ i * 100 + 100 }}" positionY="300" priority="0" urgent="false"/>
|
||||
{% endfor %}
|
||||
|
||||
{% for arc_name, arc in odapi.get_all_instances("arc") %}
|
||||
<arc id="{{ arc_name }}"
|
||||
inscription="{{ '[0,inf)' if odapi.get_type_name(odapi.get_source(arc)) == 'PNPlace' else '1' }}"
|
||||
nameOffsetX="0"
|
||||
nameOffsetY="0"
|
||||
weight="1"
|
||||
type="{{ 'timed' if odapi.get_type_name(odapi.get_source(arc)) == 'PNPlace' else 'normal' }}"
|
||||
source="{{ odapi.get_name(odapi.get_source(arc)) }}"
|
||||
target="{{ odapi.get_name(odapi.get_target(arc)) }}">
|
||||
<arcpath arcPointType="false" id="0" xCoord="0" yCoord="0"/>
|
||||
<arcpath arcPointType="false" id="1" xCoord="0" yCoord="0"/>
|
||||
</arc>
|
||||
{% endfor %}
|
||||
|
||||
{% for inh_arc_name, inh_arc in odapi.get_all_instances("inh_arc") %}
|
||||
<arc id="{{ inh_arc_name }}"
|
||||
inscription="[0,inf)"
|
||||
nameOffsetX="0"
|
||||
nameOffsetY="0"
|
||||
type="tapnInhibitor"
|
||||
weight="1"
|
||||
source="{{ odapi.get_name(odapi.get_source(inh_arc)) }}"
|
||||
target="{{ odapi.get_name(odapi.get_target(inh_arc)) }}">
|
||||
<arcpath arcPointType="false" id="0" xCoord="0" yCoord="0"/>
|
||||
<arcpath arcPointType="false" id="1" xCoord="0" yCoord="0"/>
|
||||
</arc>
|
||||
{% endfor %}
|
||||
|
||||
</net>
|
||||
<feature isGame="false" isTimed="false"/>
|
||||
</pnml>
|
||||
|
|
@ -2,12 +2,14 @@ from concrete_syntax.common import indent
|
|||
from concrete_syntax.graphviz.make_url import make_url
|
||||
from examples.semantics.operational.port.helpers import design_to_state, state_to_design, get_time, get_num_ships
|
||||
|
||||
def render_port_graphviz(od):
|
||||
def render_port_to_dot(od,
|
||||
make_id=lambda name,obj: name # by default, we just use the object name for the graphviz node name
|
||||
):
|
||||
txt = ""
|
||||
|
||||
def render_place(place):
|
||||
name = od.get_name(place)
|
||||
return f'"{name}" [ label = "{name}\\n ships = {get_num_ships(od, place)}", style = filled, fillcolor = lightblue ]\n'
|
||||
return f'"{make_id(name,place)}" [ label = "{name}\\n ships = {get_num_ships(od, place)}", style = filled, fillcolor = lightblue ]\n'
|
||||
|
||||
for _, cap in od.get_all_instances("CapacityConstraint", include_subtypes=False):
|
||||
name = od.get_name(cap)
|
||||
|
|
@ -26,10 +28,10 @@ def render_port_graphviz(od):
|
|||
for _, berth_state in od.get_all_instances("BerthState", include_subtypes=False):
|
||||
berth = state_to_design(od, berth_state)
|
||||
name = od.get_name(berth)
|
||||
txt += f'"{name}" [ label = "{name}\\n numShips = {get_num_ships(od, berth)}\\n status = {od.get_slot_value(berth_state, "status")}", fillcolor = yellow, style = filled]\n'
|
||||
txt += f'"{make_id(name,berth)}" [ label = "{name}\\n numShips = {get_num_ships(od, berth)}\\n status = {od.get_slot_value(berth_state, "status")}", fillcolor = yellow, style = filled]\n'
|
||||
|
||||
for _, gen in od.get_all_instances("Generator", include_subtypes=False):
|
||||
txt += f'"{od.get_name(gen)}" [ label = "+", shape = diamond, fillcolor = green, fontsize = 30, style = filled ]\n'
|
||||
txt += f'"{make_id(od.get_name(gen),gen)}" [ label = "+", shape = diamond, fillcolor = green, fontsize = 30, style = filled ]\n'
|
||||
|
||||
for _, conn in od.get_all_instances("connection"):
|
||||
src = od.get_source(conn)
|
||||
|
|
@ -37,23 +39,26 @@ def render_port_graphviz(od):
|
|||
moved = od.get_slot_value(design_to_state(od, conn), "moved")
|
||||
src_name = od.get_name(src)
|
||||
tgt_name = od.get_name(tgt)
|
||||
txt += f"{src_name} -> {tgt_name} [color=deepskyblue3, penwidth={1 if moved else 2}];\n"
|
||||
txt += f"{make_id(src_name,src)} -> {make_id(tgt_name,tgt)} [color=deepskyblue3, penwidth={1 if moved else 2}];\n"
|
||||
|
||||
for _, workers in od.get_all_instances("WorkerSet"):
|
||||
already_have = []
|
||||
name = od.get_name(workers)
|
||||
num_workers = od.get_slot_value(workers, "numWorkers")
|
||||
txt += f'{name} [label="{num_workers} worker(s)", shape=parallelogram, fillcolor=chocolate, style=filled];\n'
|
||||
txt += f'{make_id(name,workers)} [label="{num_workers} worker(s)", shape=parallelogram, fillcolor=chocolate, style=filled];\n'
|
||||
for lnk in od.get_outgoing(design_to_state(od, workers), "isOperating"):
|
||||
berth = od.get_target(lnk)
|
||||
already_have.append(berth)
|
||||
txt += f"{name} -> {od.get_name(berth)} [arrowhead=none, color=chocolate];\n"
|
||||
txt += f"{make_id(name,workers)} -> {make_id(od.get_name(berth),berth)} [arrowhead=none, color=chocolate];\n"
|
||||
for lnk in od.get_outgoing(workers, "canOperate"):
|
||||
berth = od.get_target(lnk)
|
||||
if berth not in already_have:
|
||||
txt += f"{name} -> {od.get_name(berth)} [style=dotted, arrowhead=none, color=chocolate];\n"
|
||||
txt += f"{make_id(name,workers)} -> {make_id(od.get_name(berth),berth)} [style=dotted, arrowhead=none, color=chocolate];\n"
|
||||
|
||||
return make_url(txt)
|
||||
return txt
|
||||
|
||||
def render_port_graphviz(od):
|
||||
return make_url(render_port_to_dot(od))
|
||||
|
||||
def render_port_textual(od):
|
||||
txt = ""
|
||||
|
|
|
|||
90
examples/semantics/translational/renderer.py
Normal file
90
examples/semantics/translational/renderer.py
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
from api.od import ODAPI
|
||||
from concrete_syntax.graphviz.renderer import render_object_diagram, make_graphviz_id
|
||||
from concrete_syntax.graphviz.make_url import show_graphviz
|
||||
from examples.petrinet.renderer import render_petri_net_to_dot
|
||||
from examples.semantics.operational.port.renderer import render_port_to_dot
|
||||
from examples.semantics.operational.port import helpers
|
||||
|
||||
# COLORS
|
||||
PLACE_BG = "#DAE8FC" # fill color
|
||||
PLACE_FG = "#6C8EBF" # font, line, arrow
|
||||
BERTH_BG = "#FFF2CC"
|
||||
BERTH_FG = "#D6B656"
|
||||
CAPACITY_BG = "#F5F5F5"
|
||||
CAPACITY_FG = "#666666"
|
||||
WORKER_BG = "#D5E8D4"
|
||||
WORKER_FG = "#82B366"
|
||||
GENERATOR_BG = "#FFE6CC"
|
||||
GENERATOR_FG = "#D79B00"
|
||||
CLOCK_BG = "black"
|
||||
CLOCK_FG = "white"
|
||||
|
||||
def graphviz_style_fg_bg(fg, bg):
|
||||
return f"style=filled,fillcolor=\"{bg}\",color=\"{fg}\",fontcolor=\"{fg}\""
|
||||
|
||||
def render_port(state, m, mm):
|
||||
dot = render_object_diagram(state, m, mm,
|
||||
reify=True,
|
||||
only_render=[
|
||||
# Only render these types
|
||||
"Place", "Berth", "CapacityConstraint", "WorkerSet", "Generator", "Clock",
|
||||
"connection", "capacityOf", "canOperate", "generic_link",
|
||||
# Petri Net types not included (they are already rendered by other function)
|
||||
# Port-State-types not included to avoid cluttering the diagram, but if you need them, feel free to add them.
|
||||
],
|
||||
# We can style nodes/edges according to their type:
|
||||
type_to_style={
|
||||
"Place": graphviz_style_fg_bg(PLACE_FG, PLACE_BG),
|
||||
"Berth": graphviz_style_fg_bg(BERTH_FG, BERTH_BG),
|
||||
"CapacityConstraint": graphviz_style_fg_bg(CAPACITY_FG, CAPACITY_BG),
|
||||
"WorkerSet": "shape=oval,"+graphviz_style_fg_bg(WORKER_FG, WORKER_BG),
|
||||
"Generator": "shape=parallelogram,"+graphviz_style_fg_bg(GENERATOR_FG, GENERATOR_BG),
|
||||
"Clock": graphviz_style_fg_bg(CLOCK_FG, CLOCK_BG),
|
||||
|
||||
# same blue as Place, thick line:
|
||||
"connection": f"color=\"{PLACE_FG}\",fontcolor=\"{PLACE_FG}\",penwidth=2.0",
|
||||
|
||||
# same grey as CapacityConstraint
|
||||
"capacityOf": f"color=\"{CAPACITY_FG}\",fontcolor=\"{CAPACITY_FG}\"",
|
||||
|
||||
# same green as WorkerSet
|
||||
"canOperate": f"color=\"{WORKER_FG}\",fontcolor=\"{WORKER_FG}\"",
|
||||
|
||||
# purple line
|
||||
"generic_link": "color=purple,fontcolor=purple,arrowhead=onormal",
|
||||
},
|
||||
# We have control over the node/edge labels that are rendered:
|
||||
type_to_label={
|
||||
"CapacityConstraint": lambda capconstr_name, capconstr, odapi: f"{capconstr_name}\\nshipCapacity={odapi.get_slot_value(capconstr, "shipCapacity")}",
|
||||
|
||||
"Place": lambda place_name, place, odapi: f"{place_name}\\nnumShips={helpers.get_num_ships(odapi, place)}",
|
||||
|
||||
"Berth": lambda berth_name, berth, odapi: f"{berth_name}\\nnumShips={helpers.get_num_ships(odapi, berth)}\\nstatus={odapi.get_slot_value(helpers.design_to_state(odapi, berth), "status")}",
|
||||
|
||||
"Clock": lambda _, clock, odapi: f"Clock\\ntime={odapi.get_slot_value(clock, "time")}",
|
||||
|
||||
"connection": lambda conn_name, conn, odapi: f"{conn_name}\\nmoved={odapi.get_slot_value(helpers.design_to_state(odapi, conn), "moved")}",
|
||||
|
||||
# hide generic link labels
|
||||
"generic_link": lambda lnk_name, lnk, odapi: "",
|
||||
|
||||
"WorkerSet": lambda ws_name, ws, odapi: f"{ws_name}\\nnumWorkers={odapi.get_slot_value(ws, "numWorkers")}",
|
||||
|
||||
# hide the type (it's already clear enough)
|
||||
"Generator": lambda gen_name, gen, odapi: gen_name,
|
||||
},
|
||||
)
|
||||
return dot
|
||||
|
||||
def render_port_and_petri_net(state, m, mm):
|
||||
od = ODAPI(state, m, mm)
|
||||
dot = ""
|
||||
dot += "// petri net:\n"
|
||||
dot += render_petri_net_to_dot(od)
|
||||
dot += "\n// the rest:\n"
|
||||
dot += render_port(state, m, mm)
|
||||
return dot
|
||||
|
||||
|
||||
def show_port_and_petri_net(state, m, mm, engine="dot"):
|
||||
show_graphviz(render_port_and_petri_net(state, m, mm), engine=engine)
|
||||
|
|
@ -17,7 +17,7 @@ from examples.semantics.operational.simulator import Simulator, RandomDecisionMa
|
|||
from examples.semantics.operational.port import models
|
||||
from examples.semantics.operational.port.helpers import design_to_state, state_to_design, get_time
|
||||
from examples.semantics.operational.port.renderer import render_port_textual, render_port_graphviz
|
||||
from examples.petrinet.renderer import render_petri_net
|
||||
from examples.petrinet.renderer import show_petri_net
|
||||
from examples.semantics.operational import simulator
|
||||
|
||||
import os
|
||||
|
|
@ -68,12 +68,15 @@ if __name__ == "__main__":
|
|||
matcher_rewriter = RuleMatcherRewriter(state, merged_mm, ramified_merged_mm)
|
||||
action_generator = ActionGenerator(matcher_rewriter, rules)
|
||||
|
||||
def render(od):
|
||||
show_petri_net(od) # graphviz in web browser
|
||||
return renderer.render_od(state, od.m, od.mm) # text in terminal
|
||||
|
||||
sim = simulator.Simulator(
|
||||
action_generator=action_generator,
|
||||
decision_maker=simulator.InteractiveDecisionMaker(auto_proceed=False),
|
||||
# decision_maker=simulator.RandomDecisionMaker(seed=0),
|
||||
renderer=lambda od: render_petri_net(od) + renderer.render_od(state, od.m, od.mm),
|
||||
# renderer=lambda od: render_od(state, od.m, od.mm),
|
||||
renderer=render,
|
||||
)
|
||||
|
||||
sim.run(ODAPI(state, model, merged_mm))
|
||||
|
|
|
|||
|
|
@ -5,18 +5,15 @@ from concrete_syntax.plantuml.renderer import render_object_diagram, render_clas
|
|||
from concrete_syntax.plantuml.make_url import make_url
|
||||
from api.od import ODAPI
|
||||
|
||||
from transformation.ramify import ramify
|
||||
from transformation.topify.topify import Topifier
|
||||
from transformation.merger import merge_models
|
||||
from transformation.ramify import ramify
|
||||
from transformation.rule import RuleMatcherRewriter
|
||||
|
||||
from util import loader
|
||||
from util.module_to_dict import module_to_dict
|
||||
|
||||
from examples.semantics.operational.simulator import Simulator, RandomDecisionMaker, InteractiveDecisionMaker
|
||||
from examples.semantics.operational.port import models
|
||||
from examples.semantics.operational.port.helpers import design_to_state, state_to_design, get_time
|
||||
from examples.semantics.operational.port import models, helpers
|
||||
from examples.semantics.operational.port.renderer import render_port_textual, render_port_graphviz
|
||||
from examples.semantics.translational.renderer import show_port_and_petri_net
|
||||
from examples.petrinet.renderer import render_petri_net
|
||||
|
||||
import os
|
||||
|
|
@ -76,7 +73,14 @@ if __name__ == "__main__":
|
|||
print('ready!')
|
||||
|
||||
port_m_rt = port_m_rt_initial
|
||||
matcher_rewriter = RuleMatcherRewriter(state, merged_mm, ramified_merged_mm)
|
||||
eval_context = {
|
||||
# make all the functions defined in 'helpers' module available to 'condition'-code in LHS/NAC/RHS:
|
||||
**module_to_dict(helpers),
|
||||
# another example: in all 'condition'-code, there will be a global variable 'meaning_of_life', equal to 42:
|
||||
'meaning_of_life': 42, # just to demonstrate - feel free to remove this
|
||||
}
|
||||
print('The following additional globals are available:', ', '.join(list(eval_context.keys())))
|
||||
matcher_rewriter = RuleMatcherRewriter(state, merged_mm, ramified_merged_mm, eval_context=eval_context)
|
||||
|
||||
###################################
|
||||
# Because the matching of many different rules can be slow,
|
||||
|
|
@ -104,7 +108,7 @@ if __name__ == "__main__":
|
|||
try:
|
||||
with open(filename, "r") as file:
|
||||
port_m_rt = parser.parse_od(state, file.read(), merged_mm)
|
||||
print('loaded', filename)
|
||||
print(f'skip rule (found {filename})')
|
||||
except FileNotFoundError:
|
||||
# Fire every rule until it cannot match any longer:
|
||||
while True:
|
||||
|
|
@ -123,6 +127,12 @@ if __name__ == "__main__":
|
|||
print('wrote', filename)
|
||||
render_petri_net(ODAPI(state, port_m_rt, merged_mm))
|
||||
|
||||
# Uncomment to show also the port model:
|
||||
# show_port_and_petri_net(state, port_m_rt, merged_mm)
|
||||
|
||||
# Uncomment to pause after each rendering:
|
||||
# input()
|
||||
|
||||
###################################
|
||||
# Once you have generated a Petri Net, you can execute the petri net:
|
||||
#
|
||||
|
|
|
|||
|
|
@ -1 +1,2 @@
|
|||
lark==1.1.9
|
||||
jinja2==3.1.4
|
||||
|
|
@ -168,7 +168,14 @@ def _cannot_call_matched(_):
|
|||
|
||||
# This function returns a Generator of matches.
|
||||
# The idea is that the user can iterate over the match set, lazily generating it: if only interested in the first match, the entire match set doesn't have to be generated.
|
||||
def match_od(state, host_m, host_mm, pattern_m, pattern_mm, pivot={}):
|
||||
def match_od(state,
|
||||
host_m, # the host graph, in which to search for matches
|
||||
host_mm, # meta-model of the host graph
|
||||
pattern_m, # the pattern to look for
|
||||
pattern_mm, # the meta-model of the pattern (typically the RAMified version of host_mm)
|
||||
pivot={}, # optional: a partial match (restricts possible matches, and speeds up the match process)
|
||||
eval_context={}, # optional: additional variables, functions, ... to be available while evaluating condition-code in the pattern. Will be available as global variables in the condition-code.
|
||||
):
|
||||
bottom = Bottom(state)
|
||||
|
||||
# compute subtype relations and such:
|
||||
|
|
@ -177,6 +184,21 @@ def match_od(state, host_m, host_mm, pattern_m, pattern_mm, pivot={}):
|
|||
pattern_odapi = ODAPI(state, pattern_m, pattern_mm)
|
||||
pattern_mm_odapi = ODAPI(state, pattern_mm, cdapi.mm)
|
||||
|
||||
# 'globals'-dict used when eval'ing conditions
|
||||
bound_api = bind_api_readonly(odapi)
|
||||
builtin = {
|
||||
**bound_api,
|
||||
'matched': _cannot_call_matched,
|
||||
'odapi': odapi,
|
||||
}
|
||||
for key in eval_context:
|
||||
if key in builtin:
|
||||
print(f"WARNING: custom global '{key}' overrides pre-defined API function. Consider renaming it.")
|
||||
eval_globals = {
|
||||
**builtin,
|
||||
**eval_context,
|
||||
}
|
||||
|
||||
# Function object for pattern matching. Decides whether to match host and guest vertices, where guest is a RAMified instance (e.g., the attributes are all strings with Python expressions), and the host is an instance (=object diagram) of the original model (=class diagram)
|
||||
class RAMCompare:
|
||||
def __init__(self, bottom, host_od):
|
||||
|
|
@ -234,10 +256,7 @@ def match_od(state, host_m, host_mm, pattern_m, pattern_mm, pivot={}):
|
|||
# - incompatible slots may be matched (it is only when their AttributeLinks are matched, that we know the types will be compatible)
|
||||
with Timer(f'EVAL condition {g_vtx.name}'):
|
||||
ok = exec_then_eval(python_code,
|
||||
_globals={
|
||||
**bind_api_readonly(odapi),
|
||||
'matched': _cannot_call_matched,
|
||||
},
|
||||
_globals=eval_globals,
|
||||
_locals={'this': h_vtx.node_id})
|
||||
self.conditions_to_check.pop(g_vtx.name, None)
|
||||
return ok
|
||||
|
|
@ -324,13 +343,14 @@ def match_od(state, host_m, host_mm, pattern_m, pattern_mm, pivot={}):
|
|||
|
||||
|
||||
def check_conditions(name_mapping):
|
||||
eval_globals = {
|
||||
**bound_api,
|
||||
# this time, the real 'matched'-function can be used:
|
||||
'matched': lambda name: bottom.read_outgoing_elements(host_m, name_mapping[name])[0],
|
||||
**eval_context,
|
||||
}
|
||||
def check(python_code: str, loc):
|
||||
return exec_then_eval(python_code,
|
||||
_globals={
|
||||
**bind_api_readonly(odapi),
|
||||
'matched': lambda name: bottom.read_outgoing_elements(host_m, name_mapping[name])[0],
|
||||
},
|
||||
_locals=loc)
|
||||
return exec_then_eval(python_code, _globals=eval_globals, _locals=loc)
|
||||
|
||||
# Attribute conditions
|
||||
for pattern_name, host_name in name_mapping.items():
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
# - Change attribute values
|
||||
# - ? that's it?
|
||||
|
||||
import re
|
||||
from uuid import UUID
|
||||
from api.od import ODAPI, bind_api
|
||||
from services.bottom.V0 import Bottom
|
||||
|
|
@ -13,12 +14,22 @@ from services.primitives.actioncode_type import ActionCode
|
|||
from services.primitives.integer_type import Integer
|
||||
from util.eval import exec_then_eval, simply_exec
|
||||
|
||||
identifier_regex_pattern = '[_A-Za-z][._A-Za-z0-9]*'
|
||||
identifier_regex = re.compile(identifier_regex_pattern)
|
||||
|
||||
class TryAgainNextRound(Exception):
|
||||
pass
|
||||
|
||||
# Rewrite is performed in-place (modifying `host_m`)
|
||||
def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict, host_m: UUID, host_mm: UUID):
|
||||
def rewrite(state,
|
||||
lhs_m: UUID, # LHS-pattern
|
||||
rhs_m: UUID, # RHS-pattern
|
||||
pattern_mm: UUID, # meta-model of both patterns (typically the RAMified host_mm)
|
||||
lhs_match: dict, # a match, morphism, from lhs_m to host_m (mapping pattern name -> host name), typically found by the 'match_od'-function.
|
||||
host_m: UUID, # host model
|
||||
host_mm: UUID, # host meta-model
|
||||
eval_context={}, # optional: additional variables/functions to be available while executing condition-code. These will be seen as global variables.
|
||||
):
|
||||
bottom = Bottom(state)
|
||||
|
||||
# Need to come up with a new, unique name when creating new element in host-model:
|
||||
|
|
@ -74,6 +85,29 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
|
|||
# to be grown
|
||||
rhs_match = { name : lhs_match[name] for name in common }
|
||||
|
||||
|
||||
bound_api = bind_api(host_odapi)
|
||||
original_delete = bound_api["delete"]
|
||||
def wrapped_delete(obj):
|
||||
not_allowed_to_delete = { host_odapi.get(host_name): pattern_name for pattern_name, host_name in rhs_match.items() }
|
||||
if obj in not_allowed_to_delete:
|
||||
pattern_name = not_allowed_to_delete[obj]
|
||||
raise Exception(f"\n\nYou're trying to delete the element that was matched with the RHS-element '{pattern_name}'. This is not allowed! You're allowed to delete anything BUT NOT elements matched with your RHS-pattern. Instead, simply remove the element '{pattern_name}' from your RHS, if you want to delete it.")
|
||||
return original_delete(obj)
|
||||
bound_api["delete"] = wrapped_delete
|
||||
builtin = {
|
||||
**bound_api,
|
||||
'matched': matched_callback,
|
||||
'odapi': host_odapi,
|
||||
}
|
||||
for key in eval_context:
|
||||
if key in builtin:
|
||||
print(f"WARNING: custom global '{key}' overrides pre-defined API function. Consider renaming it.")
|
||||
eval_globals = {
|
||||
**builtin,
|
||||
**eval_context,
|
||||
}
|
||||
|
||||
# 1. Perform creations - in the right order!
|
||||
remaining_to_create = list(to_create)
|
||||
while len(remaining_to_create) > 0:
|
||||
|
|
@ -86,11 +120,9 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
|
|||
name_expr = rhs_odapi.get_slot_value(rhs_obj, "name")
|
||||
except:
|
||||
name_expr = f'"{rhs_name}"' # <- if the 'name' slot doesnt exist, use the pattern element name
|
||||
suggested_name = exec_then_eval(name_expr,
|
||||
_globals={
|
||||
**bind_api(host_odapi),
|
||||
'matched': matched_callback,
|
||||
})
|
||||
suggested_name = exec_then_eval(name_expr, _globals=eval_globals)
|
||||
if not identifier_regex.match(suggested_name):
|
||||
raise Exception(f"In the RHS pattern element '{rhs_name}', the following name-expression:\n {name_expr}\nproduced the name:\n '{suggested_name}'\nwhich contains illegal characters.\nNames should match the following regex: {identifier_regex_pattern}")
|
||||
rhs_type = rhs_odapi.get_type(rhs_obj)
|
||||
host_type = ramify.get_original_type(bottom, rhs_type)
|
||||
# for debugging:
|
||||
|
|
@ -157,10 +189,7 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
|
|||
host_attr_name = host_mm_odapi.get_slot_value(host_attr_link, "name")
|
||||
val_name = f"{host_src_name}.{host_attr_name}"
|
||||
python_expr = ActionCode(UUID(bottom.read_value(rhs_obj)), bottom.state).read()
|
||||
result = exec_then_eval(python_expr, _globals={
|
||||
**bind_api(host_odapi),
|
||||
'matched': matched_callback,
|
||||
})
|
||||
result = exec_then_eval(python_expr, _globals=eval_globals)
|
||||
host_odapi.create_primitive_value(val_name, result, is_code=False)
|
||||
rhs_match[rhs_name] = val_name
|
||||
else:
|
||||
|
|
@ -192,10 +221,7 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
|
|||
rhs_obj = rhs_odapi.get(common_name)
|
||||
python_expr = ActionCode(UUID(bottom.read_value(rhs_obj)), bottom.state).read()
|
||||
result = exec_then_eval(python_expr,
|
||||
_globals={
|
||||
**bind_api(host_odapi),
|
||||
'matched': matched_callback,
|
||||
},
|
||||
_globals=eval_globals,
|
||||
_locals={'this': host_obj}) # 'this' can be used to read the previous value of the slot
|
||||
host_odapi.overwrite_primitive_value(host_obj_name, result, is_code=False)
|
||||
else:
|
||||
|
|
@ -235,18 +261,12 @@ def rewrite(state, lhs_m: UUID, rhs_m: UUID, pattern_mm: UUID, lhs_match: dict,
|
|||
# rhs_obj is an object or link (because association is subtype of class)
|
||||
python_code = rhs_odapi.get_slot_value_default(rhs_obj, "condition", default="")
|
||||
simply_exec(python_code,
|
||||
_globals={
|
||||
**bind_api(host_odapi),
|
||||
'matched': matched_callback,
|
||||
},
|
||||
_globals=eval_globals,
|
||||
_locals={'this': host_obj})
|
||||
|
||||
# 5. Execute global actions
|
||||
for cond_name, cond in rhs_odapi.get_all_instances("GlobalCondition"):
|
||||
python_code = rhs_odapi.get_slot_value(cond, "condition")
|
||||
simply_exec(python_code, _globals={
|
||||
**bind_api(host_odapi),
|
||||
'matched': matched_callback,
|
||||
})
|
||||
simply_exec(python_code, _globals=eval_globals)
|
||||
|
||||
return rhs_match
|
||||
|
|
@ -26,10 +26,11 @@ class _NAC_MATCHED(Exception):
|
|||
|
||||
# Helper for executing NAC/LHS/RHS-type rules
|
||||
class RuleMatcherRewriter:
|
||||
def __init__(self, state, mm: UUID, mm_ramified: UUID):
|
||||
def __init__(self, state, mm: UUID, mm_ramified: UUID, eval_context={}):
|
||||
self.state = state
|
||||
self.mm = mm
|
||||
self.mm_ramified = mm_ramified
|
||||
self.eval_context = eval_context
|
||||
|
||||
# Generates matches.
|
||||
# Every match is a dictionary with entries LHS_element_name -> model_element_name
|
||||
|
|
@ -38,7 +39,9 @@ class RuleMatcherRewriter:
|
|||
host_m=m,
|
||||
host_mm=self.mm,
|
||||
pattern_m=lhs,
|
||||
pattern_mm=self.mm_ramified)
|
||||
pattern_mm=self.mm_ramified,
|
||||
eval_context=self.eval_context,
|
||||
)
|
||||
|
||||
try:
|
||||
# First we iterate over LHS-matches:
|
||||
|
|
@ -64,7 +67,9 @@ class RuleMatcherRewriter:
|
|||
host_mm=self.mm,
|
||||
pattern_m=nac,
|
||||
pattern_mm=self.mm_ramified,
|
||||
pivot=lhs_match) # try to "grow" LHS-match with NAC-match
|
||||
pivot=lhs_match, # try to "grow" LHS-match with NAC-match
|
||||
eval_context=self.eval_context,
|
||||
)
|
||||
|
||||
try:
|
||||
# for nac_match in nac_matcher:
|
||||
|
|
@ -117,7 +122,9 @@ class RuleMatcherRewriter:
|
|||
pattern_mm=self.mm_ramified,
|
||||
lhs_match=lhs_match,
|
||||
host_m=cloned_m,
|
||||
host_mm=self.mm)
|
||||
host_mm=self.mm,
|
||||
eval_context=self.eval_context,
|
||||
)
|
||||
except Exception as e:
|
||||
# Make exceptions raised in eval'ed code easier to trace:
|
||||
e.add_note(f"while executing RHS of '{rule_name}'")
|
||||
|
|
|
|||
|
|
@ -5,13 +5,14 @@ from concrete_syntax.common import indent
|
|||
from transformation.rule import Rule
|
||||
|
||||
# parse model and check conformance
|
||||
def parse_and_check(state, m_cs, mm, descr: str, check_conformance=True, type_transform=lambda type_name: type_name):
|
||||
def parse_and_check(state, m_cs, mm, descr: str, check_conformance=True, type_transform=lambda type_name: type_name, name_generator=parser.DefaultNameGenerator()):
|
||||
try:
|
||||
m = parser.parse_od(
|
||||
state,
|
||||
m_text=m_cs,
|
||||
mm=mm,
|
||||
type_transform=type_transform,
|
||||
name_generator=name_generator,
|
||||
)
|
||||
except Exception as e:
|
||||
e.add_note("While parsing model " + descr)
|
||||
|
|
@ -35,6 +36,11 @@ def read_file(filename):
|
|||
|
||||
KINDS = ["nac", "lhs", "rhs"]
|
||||
|
||||
# Phony name generator that raises an error if you try to use it :)
|
||||
class LHSNameGenerator:
|
||||
def __call__(self, type_name):
|
||||
raise Exception(f"Error: Object or link of type '{type_name}' does not have a name.\nAnonymous objects/links are not allowed in the LHS of a rule, because they can have unintended consequences. Please give all of the elements in the LHS explicit names.")
|
||||
|
||||
# load model transformation rules
|
||||
def load_rules(state, get_filename, rt_mm_ramified, rule_names, check_conformance=True):
|
||||
rules = {}
|
||||
|
|
@ -62,9 +68,12 @@ def load_rules(state, get_filename, rt_mm_ramified, rule_names, check_conformanc
|
|||
if suffix == "":
|
||||
print(f"Warning: rule {rule_name} has no NAC ({filename} not found)")
|
||||
return nacs
|
||||
elif kind == "lhs" or kind == "rhs":
|
||||
else:
|
||||
try:
|
||||
m = parse_and_check(state, read_file(filename), rt_mm_ramified, descr, check_conformance)
|
||||
if kind == "lhs":
|
||||
m = parse_and_check(state, read_file(filename), rt_mm_ramified, descr, check_conformance, name_generator=LHSNameGenerator())
|
||||
elif kind == "rhs":
|
||||
m = parse_and_check(state, read_file(filename), rt_mm_ramified, descr, check_conformance)
|
||||
files_read.append(filename)
|
||||
return m
|
||||
except FileNotFoundError as e:
|
||||
|
|
|
|||
8
util/module_to_dict.py
Normal file
8
util/module_to_dict.py
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
# Based on: https://stackoverflow.com/a/46263657
|
||||
def module_to_dict(module):
|
||||
context = {}
|
||||
for name in dir(module):
|
||||
# this will filter out 'private' functions, as well as __builtins__, __name__, __package__, etc.:
|
||||
if not name.startswith('_'):
|
||||
context[name] = getattr(module, name)
|
||||
return context
|
||||
Loading…
Add table
Add a link
Reference in a new issue