Merge pull request #2 from joeriexelmans/development

Adding bytes as a type + add FTG+PM++ formalism
This commit is contained in:
joeriexelmans 2025-06-24 13:59:45 +02:00 committed by GitHub
commit 35f74ab79d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 1347 additions and 56 deletions

46
TODO.txt Normal file
View file

@ -0,0 +1,46 @@
Things that need to be cleaned up:
- At several places in the code, it is assumed that from the root node, there is an edge labeled 'SCD' containing the self-conforming meta-meta-model. It would be better for parts of the code that need the meta-meta-model to receive this model as a (function) parameter.
- The whole 'ModelRef'-construct does not work as originally foreseen. It is currently only used for attributes of primitive types, where it unnecessarily complicates things. Better to get rid of it.
Known bugs:
- Cannot parse negative numbers
- When merging models, the model element names must not overlap. Maybe allow some kind of prefixing of the overlapping names? Difficulty porting existing models to the merged models if the type names have changed...
Merging (meta-)models is a nightmare:
- Prefixing the type names (to avoid naming collisions) is not an option:
(*) constraints (and transformation rules) already contain API calls that mention type names -> all of these would break
(*) don't want to prefix primitive types like "Integer", "String", ... because the existing code already assumes these exact names
- Not prefixing the type names leads to naming collisions, even if names are carefully chosen:
(*) anonymous names, e.g., Inheritance-links still result in naming collisions (requiring auto-renaming?)
Feature requests:
- Support custom functions in 'conditions'
- When matching edge, match 'any' src/tgt
- Support 'return'-statement in conditions? (just makes syntax nicer)
- RAMification / matching: add `match_subtypes` attribute to each RAMified class.
- Separate script for running LHS (+NAC) on any model, and visualizing the match.
- Syntax highlighting:
most students use:
- VS Code
- PyCharm
i use:
- Sublime Text
nobody uses:
- Eclipse

View file

@ -5,11 +5,13 @@ from services.primitives.boolean_type import Boolean
from services.primitives.integer_type import Integer from services.primitives.integer_type import Integer
from services.primitives.string_type import String from services.primitives.string_type import String
from services.primitives.actioncode_type import ActionCode from services.primitives.actioncode_type import ActionCode
from services.primitives.bytes_type import Bytes
from uuid import UUID from uuid import UUID
from typing import Optional from typing import Optional
from util.timer import Timer from util.timer import Timer
NEXT_ID = 0 NEXT_LINK_ID = 0
NEXT_OBJ_ID = 0
# Models map names to elements # Models map names to elements
# This builds the inverse mapping, so we can quickly lookup the name of an element # This builds the inverse mapping, so we can quickly lookup the name of an element
@ -41,6 +43,7 @@ class ODAPI:
self.create_integer_value = self.od.create_integer_value self.create_integer_value = self.od.create_integer_value
self.create_string_value = self.od.create_string_value self.create_string_value = self.od.create_string_value
self.create_actioncode_value = self.od.create_actioncode_value self.create_actioncode_value = self.od.create_actioncode_value
self.create_bytes_value = self.od.create_bytes_value
self.__recompute_mappings() self.__recompute_mappings()
@ -207,6 +210,8 @@ class ODAPI:
tgt = self.create_actioncode_value(name, value) tgt = self.create_actioncode_value(name, value)
else: else:
tgt = self.create_string_value(name, value) tgt = self.create_string_value(name, value)
elif isinstance(value, bytes):
tgt = self.create_bytes_value(name, value)
else: else:
raise Exception("Unimplemented type "+value) raise Exception("Unimplemented type "+value)
self.__recompute_mappings() self.__recompute_mappings()
@ -214,22 +219,35 @@ class ODAPI:
def overwrite_primitive_value(self, name: str, value: any, is_code=False): def overwrite_primitive_value(self, name: str, value: any, is_code=False):
referred_model = UUID(self.bottom.read_value(self.get(name))) referred_model = UUID(self.bottom.read_value(self.get(name)))
to_overwrite_type = self.get_type_name(self.get(name))
# watch out: in Python, 'bool' is subtype of 'int' # watch out: in Python, 'bool' is subtype of 'int'
# so we must check for 'bool' first # so we must check for 'bool' first
if isinstance(value, bool): if isinstance(value, bool):
if to_overwrite_type != "Boolean":
raise Exception(f"Cannot assign boolean value '{value}' to value of type {to_overwrite_type}.")
Boolean(referred_model, self.state).create(value) Boolean(referred_model, self.state).create(value)
elif isinstance(value, int): elif isinstance(value, int):
if to_overwrite_type != "Integer":
raise Exception(f"Cannot assign integer value '{value}' to value of type {to_overwrite_type}.")
Integer(referred_model, self.state).create(value) Integer(referred_model, self.state).create(value)
elif isinstance(value, str): elif isinstance(value, str):
if is_code: if is_code:
if to_overwrite_type != "ActionCode":
raise Exception(f"Cannot assign code to value of type {to_overwrite_type}.")
ActionCode(referred_model, self.state).create(value) ActionCode(referred_model, self.state).create(value)
else: else:
if to_overwrite_type != "String":
raise Exception(f"Cannot assign string value '{value}' to value of type {to_overwrite_type}.")
String(referred_model, self.state).create(value) String(referred_model, self.state).create(value)
elif isinstance(value, bytes):
if to_overwrite_type != "Bytes":
raise Exception(f"Cannot assign bytes value '{value}' to value of type {to_overwrite_type}.")
Bytes(referred_model, self.state).create(value)
else: else:
raise Exception("Unimplemented type "+value) raise Exception("Unimplemented type "+value)
def create_link(self, link_name: Optional[str], assoc_name: str, src: UUID, tgt: UUID): def create_link(self, link_name: Optional[str], assoc_name: str, src: UUID, tgt: UUID):
global NEXT_ID global NEXT_LINK_ID
types = self.bottom.read_outgoing_elements(self.mm, assoc_name) types = self.bottom.read_outgoing_elements(self.mm, assoc_name)
if len(types) == 0: if len(types) == 0:
raise Exception(f"No such association: '{assoc_name}'") raise Exception(f"No such association: '{assoc_name}'")
@ -237,13 +255,18 @@ class ODAPI:
raise Exception(f"More than one association exists with name '{assoc_name}' - this means the MM is invalid.") raise Exception(f"More than one association exists with name '{assoc_name}' - this means the MM is invalid.")
typ = types[0] typ = types[0]
if link_name == None: if link_name == None:
link_name = f"__{assoc_name}{NEXT_ID}" link_name = f"__{assoc_name}{NEXT_LINK_ID}"
NEXT_ID += 1 NEXT_LINK_ID += 1
link_id = self.od._create_link(link_name, typ, src, tgt) link_id = self.od._create_link(link_name, typ, src, tgt)
self.__recompute_mappings() self.__recompute_mappings()
return link_id return link_id
def create_object(self, object_name: Optional[str], class_name: str): def create_object(self, object_name: Optional[str], class_name: str):
global NEXT_OBJ_ID
if object_name == None:
object_name = f"__{class_name}{NEXT_OBJ_ID}"
NEXT_OBJ_ID += 1
obj = self.od.create_object(object_name, class_name) obj = self.od.create_object(object_name, class_name)
self.__recompute_mappings() self.__recompute_mappings()
return obj return obj

View file

@ -47,7 +47,7 @@ def bootstrap_constraint(class_node, type_name: str, python_type: str, scd_root:
bottom.create_edge(constraint_node, scd_node, "Morphism") bottom.create_edge(constraint_node, scd_node, "Morphism")
bottom.create_edge(constraint_link, scd_link, "Morphism") bottom.create_edge(constraint_link, scd_link, "Morphism")
def bootstrap_primitive_types(scd_root, state, integer_type, boolean_type, float_type, string_type, type_type, actioncode_type): def bootstrap_primitive_types(scd_root, state, integer_type, boolean_type, float_type, string_type, type_type, actioncode_type, bytes_type):
# Order is important: Integer must come first # Order is important: Integer must come first
class_integer = bootstrap_type("Integer", scd_root, integer_type, state) class_integer = bootstrap_type("Integer", scd_root, integer_type, state)
class_type = bootstrap_type("Type", scd_root, type_type, state) class_type = bootstrap_type("Type", scd_root, type_type, state)
@ -55,6 +55,7 @@ def bootstrap_primitive_types(scd_root, state, integer_type, boolean_type, float
class_float = bootstrap_type("Float", scd_root, float_type, state) class_float = bootstrap_type("Float", scd_root, float_type, state)
class_string = bootstrap_type("String", scd_root, string_type, state) class_string = bootstrap_type("String", scd_root, string_type, state)
class_actioncode = bootstrap_type("ActionCode", scd_root, actioncode_type, state) class_actioncode = bootstrap_type("ActionCode", scd_root, actioncode_type, state)
class_bytes = bootstrap_type("Bytes", scd_root, bytes_type, state)
# Can only create constraints after ActionCode type has been created: # Can only create constraints after ActionCode type has been created:
bootstrap_constraint(class_integer, "Integer", "int", scd_root, integer_type, actioncode_type, state) bootstrap_constraint(class_integer, "Integer", "int", scd_root, integer_type, actioncode_type, state)
@ -63,3 +64,4 @@ def bootstrap_primitive_types(scd_root, state, integer_type, boolean_type, float
bootstrap_constraint(class_float, "Float", "float", scd_root, float_type, actioncode_type, state) bootstrap_constraint(class_float, "Float", "float", scd_root, float_type, actioncode_type, state)
bootstrap_constraint(class_string, "String", "str", scd_root, string_type, actioncode_type, state) bootstrap_constraint(class_string, "String", "str", scd_root, string_type, actioncode_type, state)
bootstrap_constraint(class_actioncode, "ActionCode", "str", scd_root, actioncode_type, actioncode_type, state) bootstrap_constraint(class_actioncode, "ActionCode", "str", scd_root, actioncode_type, actioncode_type, state)
bootstrap_constraint(class_bytes, "Bytes", "bytes", scd_root, bytes_type, actioncode_type, state)

View file

@ -2,15 +2,7 @@ from state.base import State, UUID
from services.bottom.V0 import Bottom from services.bottom.V0 import Bottom
from services.primitives.boolean_type import Boolean from services.primitives.boolean_type import Boolean
from services.primitives.string_type import String from services.primitives.string_type import String
from bootstrap.primitive import ( from bootstrap.primitive import bootstrap_primitive_types
bootstrap_primitive_types
# bootstrap_boolean_type,
# bootstrap_float_type,
# bootstrap_integer_type,
# bootstrap_string_type,
# bootstrap_type_type,
# bootstrap_actioncode_type
)
def create_model_root(bottom: Bottom, model_name: str) -> UUID: def create_model_root(bottom: Bottom, model_name: str) -> UUID:
@ -32,6 +24,7 @@ def bootstrap_scd(state: State) -> UUID:
float_type_root = create_model_root(bottom, "Float") float_type_root = create_model_root(bottom, "Float")
type_type_root = create_model_root(bottom, "Type") type_type_root = create_model_root(bottom, "Type")
actioncode_type_root = create_model_root(bottom, "ActionCode") actioncode_type_root = create_model_root(bottom, "ActionCode")
bytes_type_root = create_model_root(bottom, "Bytes")
# create MCL, without morphism links # create MCL, without morphism links
@ -132,7 +125,8 @@ def bootstrap_scd(state: State) -> UUID:
float_type_root, float_type_root,
string_type_root, string_type_root,
type_type_root, type_type_root,
actioncode_type_root) actioncode_type_root,
bytes_type_root)
# bootstrap_integer_type(mcl_root, integer_type_root, integer_type_root, actioncode_type_root, state) # bootstrap_integer_type(mcl_root, integer_type_root, integer_type_root, actioncode_type_root, state)
# bootstrap_boolean_type(mcl_root, boolean_type_root, integer_type_root, actioncode_type_root, state) # bootstrap_boolean_type(mcl_root, boolean_type_root, integer_type_root, actioncode_type_root, state)
# bootstrap_float_type(mcl_root, float_type_root, integer_type_root, actioncode_type_root, state) # bootstrap_float_type(mcl_root, float_type_root, integer_type_root, actioncode_type_root, state)

View file

@ -16,6 +16,8 @@ def display_value(val: any, type_name: str, indentation=0, newline_character='\n
return '"'+val+'"'.replace('\n', newline_character) return '"'+val+'"'.replace('\n', newline_character)
elif type_name == "Integer" or type_name == "Boolean": elif type_name == "Integer" or type_name == "Boolean":
return str(val) return str(val)
elif type_name == "Bytes":
return val
else: else:
raise Exception("don't know how to display value" + type_name) raise Exception("don't know how to display value" + type_name)
@ -48,6 +50,9 @@ class TBase(Transformer):
def CODE(self, token): def CODE(self, token):
return _Code(str(token[1:-1])) # strip the `` return _Code(str(token[1:-1])) # strip the ``
def BYTES(self, token):
return (bytes(token[2:-1], "utf-8"), token.line) # Strip b"" or b''
def INDENTED_CODE(self, token): def INDENTED_CODE(self, token):
skip = 4 # strip the ``` and the following newline character skip = 4 # strip the ``` and the following newline character
space_count = 0 space_count = 0

View file

@ -1,18 +1,23 @@
{% macro render_name(name) %}{{ name if not hide_names or name.startswith("__") else "" }}{% endmacro %} {% macro render_name(name) %}{{ name if not hide_names or name.startswith("__") else "" }}{% endmacro %}
{% macro render_attributes(obj) %} { {% macro render_attributes(obj) %}
{% if len(odapi.get_slots(obj)) > 0 %} {
{% for attr_name in odapi.get_slots(obj) %} {% for attr_name in odapi.get_slots(obj) %}
{{ attr_name}} = {{ display_value( {{ attr_name}} = {{ display_value(
val=odapi.get_slot_value(obj, attr_name), val=odapi.get_slot_value(obj, attr_name),
type_name=odapi.get_type_name(odapi.get_slot(obj, attr_name)), type_name=odapi.get_type_name(odapi.get_slot(obj, attr_name)),
indentation=4) }}; indentation=4) }};
{% endfor %} {% endfor -%}
}{% endmacro %} }
{% endif -%}
{%- endmacro %}
{% for obj_name, obj in objects %} {%- for obj_name, obj in objects %}
{{ render_name(obj_name) }}:{{ odapi.get_type_name(obj) }}{{ render_attributes(obj) }} {{ render_name(obj_name) }}:{{ odapi.get_type_name(obj) }}
{% endfor %} {{- render_attributes(obj) }}
{% endfor -%}
{% for lnk_name, lnk in links %} {%- for lnk_name, lnk in links %}
{{ render_name(obj_name) }}:{{ odapi.get_type_name(lnk) }} ({{odapi.get_name(odapi.get_source(lnk))}} -> {{odapi.get_name(odapi.get_target(lnk))}}){{ render_attributes(lnk) }} {{ render_name(obj_name) }}:{{ odapi.get_type_name(lnk) }} ({{odapi.get_name(odapi.get_source(lnk))}} -> {{odapi.get_name(odapi.get_target(lnk))}})
{% endfor %} {{- render_attributes(lnk) }}
{% endfor -%}

View file

@ -21,6 +21,7 @@ literal: INT
| STR | STR
| BOOL | BOOL
| CODE | CODE
| BYTES
| INDENTED_CODE | INDENTED_CODE
INT: /[0-9]+/ INT: /[0-9]+/
@ -28,6 +29,8 @@ STR: /"[^"]*"/
| /'[^']*'/ | /'[^']*'/
BOOL: "True" | "False" BOOL: "True" | "False"
CODE: /`[^`]*`/ CODE: /`[^`]*`/
BYTES: /b"[^"]*"/
| /b'[^']*'/
INDENTED_CODE: /```[^`]*```/ INDENTED_CODE: /```[^`]*```/
type_name: IDENTIFIER type_name: IDENTIFIER
@ -67,7 +70,7 @@ def parse_od(state,
primitive_types = { primitive_types = {
type_name : UUID(state.read_value(state.read_dict(state.read_root(), type_name))) type_name : UUID(state.read_value(state.read_dict(state.read_root(), type_name)))
for type_name in ["Integer", "String", "Boolean", "ActionCode"] for type_name in ["Integer", "String", "Boolean", "ActionCode", "Bytes"]
} }
class T(Transformer): class T(Transformer):
@ -89,6 +92,10 @@ def parse_od(state,
def CODE(self, token): def CODE(self, token):
return (_Code(str(token[1:-1])), token.line) # strip the `` return (_Code(str(token[1:-1])), token.line) # strip the ``
def BYTES(self, token):
# Strip b"" or b'', and make \\ back to \ (happens when reading the file as a string)
return (token[2:-1].encode().decode('unicode_escape').encode('raw_unicode_escape'), token.line) # Strip b"" or b''
def INDENTED_CODE(self, token): def INDENTED_CODE(self, token):
skip = 4 # strip the ``` and the following newline character skip = 4 # strip the ``` and the following newline character
space_count = 0 space_count = 0

View file

@ -9,7 +9,7 @@ def render_od(state, m_id, mm_id, hide_names=True):
m_od = od.OD(mm_id, m_id, state) m_od = od.OD(mm_id, m_id, state)
serialized = set(["Integer", "String", "Boolean", "ActionCode"]) # assume these types always already exist serialized = set(["Integer", "String", "Boolean", "ActionCode", "Bytes"]) # assume these types always already exist
def display_name(name: str): def display_name(name: str):
# object names that start with "__" are hidden # object names that start with "__" are hidden

View file

@ -4,6 +4,7 @@ from framework.conformance import Conformance, render_conformance_check_result
from concrete_syntax.textual_od import parser, renderer from concrete_syntax.textual_od import parser, renderer
from concrete_syntax.common import indent from concrete_syntax.common import indent
from concrete_syntax.plantuml import renderer as plantuml from concrete_syntax.plantuml import renderer as plantuml
from concrete_syntax.plantuml.make_url import make_url
from util.prompt import yes_no, pause from util.prompt import yes_no, pause
state = DevState() state = DevState()
@ -153,6 +154,7 @@ woods_m_cs = """
bear2:Bear bear2:Bear
:afraidOf (george -> bear1) :afraidOf (george -> bear1)
:afraidOf (george -> bear2) :afraidOf (george -> bear2)
:afraidOf (billy -> george)
""" """
print() print()
@ -194,7 +196,7 @@ if yes_no("Print PlantUML?"):
uml += plantuml.render_trace_conformance(state, woods_m, woods_mm) uml += plantuml.render_trace_conformance(state, woods_m, woods_mm)
print("==================================") print("==================================")
print(uml) print(make_url(uml))
print("==================================") print("==================================")
print("Go to either:") print("Go to either:")
print(" ▸ https://www.plantuml.com/plantuml/uml") print(" ▸ https://www.plantuml.com/plantuml/uml")

View file

@ -0,0 +1,47 @@
import os
# Todo: remove src.backend.muMLE from the imports
from state.devstate import DevState
from bootstrap.scd import bootstrap_scd
from concrete_syntax.textual_od.parser import parse_od
from api.od import ODAPI
from concrete_syntax.textual_od.renderer import render_od as od_renderer
from concrete_syntax.plantuml import make_url as plant_url, renderer as plant_renderer
from concrete_syntax.graphviz import make_url as graphviz_url, renderer as graphviz_renderer
class FtgPmPt:
def __init__(self, name: str):
self.state = DevState()
self.scd_mmm = bootstrap_scd(self.state)
self.meta_model = self.load_metamodel()
self.model = None
self.odapi = None
self.name = name
@staticmethod
def read_file(file_name):
with open(os.path.join(os.path.dirname(__file__), file_name)) as file:
return file.read()
def load_metamodel(self):
mm_cs = self.read_file("pm/metamodels/mm_design.od")
mm_rt_cs = mm_cs + self.read_file("pm/metamodels/mm_runtime.od")
mm_total = mm_rt_cs + self.read_file("pt/metamodels/mm_design.od")
return parse_od(self.state, m_text=mm_total, mm=self.scd_mmm)
def load_model(self, m_text: str | None = None):
m_text = "" if not m_text else m_text
self.model = parse_od(self.state, m_text=m_text, mm=self.meta_model)
self.odapi = ODAPI(self.state, self.model, self.meta_model)
def render_od(self):
return od_renderer(self.state, self.model, self.meta_model, hide_names=False)
def render_plantuml_object_diagram(self):
print(plant_url.make_url(plant_renderer.render_package(
self.name, plant_renderer.render_object_diagram(self.state, self.model, self.meta_model)))
)
def render_graphviz_object_diagram(self):
print(graphviz_url.make_url(graphviz_renderer.render_object_diagram(self.state, self.model, self.meta_model)))

View file

@ -0,0 +1,68 @@
import copy
import pickle
from api.od import ODAPI
from examples.ftg_pm_pt.helpers.composite_activity import execute_composite_workflow
def serialize(obj):
return pickle.dumps(obj)
def deserialize(obj):
return pickle.loads(obj)
def create_activity_links(od: ODAPI, activity, prev_element, ctrl_port, end_trace=None,
relation_type="pt_IsFollowedBy"):
od.create_link(None, "pt_RelatesTo", activity, ctrl_port)
od.create_link(None, relation_type, prev_element, activity)
if end_trace:
od.create_link(None, "pt_IsFollowedBy", activity, end_trace)
def extract_input_data(od: ODAPI, activity):
input_data = {}
for has_data_in in od.get_outgoing(activity, "pm_HasDataIn"):
data_port = od.get_target(has_data_in)
artefact_state = od.get_source(od.get_incoming(od.get_source(od.get_incoming(data_port, "pm_DataFlowOut")[0]), "pm_Of")[0])
input_data[od.get_name(data_port)] = deserialize(od.get_slot_value(artefact_state, "data"))
return input_data
def execute_activity(od: ODAPI, globs, activity, input_data):
inp = copy.deepcopy(input_data) # Necessary, otherwise the function changes the values inside the dictionary -> need the original values for process trace
func = globs[od.get_slot_value(activity, "func")]
return func(inp) if func.__code__.co_argcount > 0 else func()
def handle_artefact(od: ODAPI, activity, artefact_type, relation_type, data_port=None, data=None,
direction="DataFlowIn"):
artefact = od.create_object(None, "pt_Artefact")
if 'pt_Consumes' == relation_type:
od.create_link(None, relation_type, artefact, activity)
else:
od.create_link(None, relation_type, activity, artefact)
if data_port:
flow_direction = od.get_incoming if relation_type == 'pt_Consumes' else od.get_outgoing
ass_side = od.get_source if relation_type == 'pt_Consumes' else od.get_target
pm_artefact = ass_side(flow_direction(data_port, f"pm_{direction}")[0])
prev_artefact = find_previous_artefact(od, od.get_incoming(pm_artefact, "pt_BelongsTo"))
if prev_artefact:
od.create_link(None, "pt_PrevVersion", artefact, prev_artefact)
od.create_link(None, "pt_BelongsTo", artefact, pm_artefact)
if data is not None:
artefact_state = od.get_source(od.get_incoming(pm_artefact, "pm_Of")[0])
od.set_slot_value(artefact_state, "data", serialize(data))
od.set_slot_value(artefact, "data", serialize(data))
def find_previous_artefact(od: ODAPI, linked_artefacts):
return next((od.get_source(link) for link in linked_artefacts if
not od.get_incoming(od.get_source(link), "pt_PrevVersion")), None)
def update_control_states(od: ODAPI, activity, ctrl_out):
for has_ctrl_in in od.get_outgoing(activity, "pm_HasCtrlIn"):
od.set_slot_value(od.get_source(od.get_incoming(od.get_target(has_ctrl_in), "pm_Of")[0]), "active", False)
od.set_slot_value(od.get_source(od.get_incoming(ctrl_out, "pm_Of")[0]), "active", True)

View file

@ -0,0 +1,272 @@
from uuid import UUID
from api.od import ODAPI
from examples.ftg_pm_pt.ftg_pm_pt import FtgPmPt
from examples.ftg_pm_pt.runner import FtgPmPtRunner
def find_previous_artefact(od: ODAPI, linked_artefacts):
return next((od.get_source(link) for link in linked_artefacts if
not od.get_incoming(od.get_source(link), "pt_PrevVersion")), None)
def create_activity_links(od: ODAPI, activity, prev_element, ctrl_port, end_trace=None,
relation_type="pt_IsFollowedBy"):
od.create_link(None, "pt_RelatesTo", activity, ctrl_port)
od.create_link(None, relation_type, prev_element, activity)
if end_trace:
od.create_link(None, "pt_IsFollowedBy", activity, end_trace)
def get_workflow_path(od: ODAPI, activity: UUID):
return od.get_slot_value(activity, "subworkflow_path")
def get_workflow(workflow_path: str):
with open(workflow_path, "r") as f:
return f.read()
############################
def get_runtime_state(od: ODAPI, design_obj: UUID):
states = od.get_incoming(design_obj, "pm_Of")
if len(states) == 0:
print(f"Design object '{od.get_name(design_obj)}' has no runtime state.")
return None
return od.get_source(states[0])
def get_source_incoming(od: ODAPI, obj: UUID, link_name: str):
links = od.get_incoming(obj, link_name)
if len(links) == 0:
print(f"Object '{od.get_name(obj)} has no incoming links of type '{link_name}'.")
return None
return od.get_source(links[0])
def get_target_outgoing(od: ODAPI, obj: UUID, link_name: str):
links = od.get_outgoing(obj, link_name)
if len(links) == 0:
print(f"Object '{od.get_name(obj)} has no outgoing links of type '{link_name}'.")
return None
return od.get_target(links[0])
def set_control_port_value(od: ODAPI, port: UUID, value: bool):
state = get_runtime_state(od, port)
od.set_slot_value(state, "active", value)
def set_artefact_data(od: ODAPI, artefact: UUID, value: bytes):
state = artefact
# Only the proces model of the artefact contains a runtime state
if od.get_type_name(state) == "pm_Artefact":
state = get_runtime_state(od, artefact)
od.set_slot_value(state, "data", value)
def get_artefact_data(od: ODAPI, artefact):
state = artefact
# Only the proces model of the artefact contains a runtime state
if od.get_type_name(state) == "pm_Artefact":
state = get_runtime_state(od, artefact)
return od.get_slot_value(state, "data")
############################
def set_workflow_control_source(workflow_model: FtgPmPt, ctrl_port_name: str, composite_linkage: dict):
od = workflow_model.odapi
source_port_name = composite_linkage[ctrl_port_name]
source_port = od.get(source_port_name)
set_control_port_value(od, source_port, True)
def set_workflow_artefacts(act_od: ODAPI, activity: UUID, workflow_model: FtgPmPt, composite_linkage: dict):
for data_port in [act_od.get_target(data_in) for data_in in act_od.get_outgoing(activity, "pm_HasDataIn")]:
# Get the data source port of the inner workflow
data_port_name = act_od.get_name(data_port)
source_port_name = composite_linkage[data_port_name]
source_port = workflow_model.odapi.get(source_port_name)
# Get the artefact that is linked to the data port of the activity
act_artefact = get_source_incoming(act_od, data_port, "pm_DataFlowOut")
# Get the data of the artefact
artefact_data = get_artefact_data(act_od, act_artefact)
# Get the artefact that is linked to the data port of the inner workflow
workflow_artefact = get_target_outgoing(workflow_model.odapi, source_port, "pm_DataFlowIn")
set_artefact_data(workflow_model.odapi, workflow_artefact, artefact_data)
def get_activity_port_from_inner_port(composite_linkage: dict, port_name: str):
for act_port_name, work_port_name in composite_linkage.items():
if work_port_name == port_name:
return act_port_name
def execute_composite_workflow(od: ODAPI, activity: UUID, ctrl_port: UUID, composite_linkage: dict,
packages: dict | None, matched=None):
activity_name = od.get_slot_value(activity, "name")
# First get the path of the object diagram file that contains the inner workflow of the activity
workflow_path = get_workflow_path(od, activity)
# Read the object diagram file
workflow = get_workflow(workflow_path)
# Create an FtgPmPt object
workflow_model = FtgPmPt(activity_name)
# Load the workflow to the object
workflow_model.load_model(workflow)
# Set the correct control source port of the workflow to active
set_workflow_control_source(workflow_model, od.get_name(ctrl_port), composite_linkage[activity_name])
# If a data port is linked, set the data of the artefact
set_workflow_artefacts(od, activity, workflow_model, composite_linkage[activity_name])
# Create an FtgPmPtRunner object with the FtgPmPt object
workflow_runner = FtgPmPtRunner(workflow_model)
# Set the packages if present
workflow_runner.set_packages(packages, is_path=False)
# Run the FtgPmPtRunner (is a subprocess necessary? This makes it more complicated because now we have direct access to the object)
workflow_runner.run()
# Contains all the ports of the inner workflow -> map back to the activity ports, and so we can set the correct
# Control ports to active and also set the data artefacts correctly
ports = extract_inner_workflow(workflow_model.odapi)
start_act = None
end_act = None
for port in [port for port in ports if port]:
port_name = workflow_model.odapi.get_name(port)
activity_port_name = get_activity_port_from_inner_port(composite_linkage[activity_name], port_name)
activity_port = od.get(activity_port_name)
match workflow_model.odapi.get_type_name(port):
case "pm_CtrlSource":
start_act = handle_control_source(od, activity_port, matched("prev_trace_element"))
case "pm_CtrlSink":
end_act = handle_control_sink(od, activity_port, start_act, matched("end_trace"))
case "pm_DataSource":
handle_data_source(od, activity_port, start_act)
case "pm_DataSink":
handle_data_sink(od, workflow_model.odapi, activity_port, port, end_act)
def handle_control_source(od: ODAPI, port, prev_trace_elem):
set_control_port_value(od, port, False)
start_activity = od.create_object(None, "pt_StartActivity")
create_activity_links(od, start_activity, prev_trace_elem, port)
return start_activity
def handle_control_sink(od: ODAPI, port, start_act, end_trace):
set_control_port_value(od, port, True)
end_activity = od.create_object(None, "pt_EndActivity")
create_activity_links(od, end_activity, start_act, port, end_trace)
return end_activity
def handle_data_source(od: ODAPI, port, start_activity):
pt_artefact = od.create_object(None, "pt_Artefact")
od.create_link(None, "pt_Consumes", pt_artefact, start_activity)
pm_artefact = get_source_incoming(od, port, "pm_DataFlowOut")
pm_artefact_data = get_artefact_data(od, pm_artefact)
set_artefact_data(od, pt_artefact, pm_artefact_data)
prev_pt_artefact = find_previous_artefact(od, od.get_incoming(pm_artefact, "pt_BelongsTo"))
if prev_pt_artefact:
od.create_link(None, "pt_PrevVersion", pt_artefact, prev_pt_artefact)
od.create_link(None, "pt_BelongsTo", pt_artefact, pm_artefact)
def handle_data_sink(act_od: ODAPI, work_od: ODAPI, act_port, work_port, end_activity):
pt_artefact = act_od.create_object(None, "pt_Artefact")
act_od.create_link(None, "pt_Produces", end_activity, pt_artefact)
work_artefact = get_source_incoming(work_od, work_port, "pm_DataFlowOut")
work_artefact_data = get_artefact_data(work_od, work_artefact)
act_artefact = get_target_outgoing(act_od, act_port, "pm_DataFlowIn")
set_artefact_data(act_od, act_artefact, work_artefact_data)
set_artefact_data(act_od, pt_artefact, work_artefact_data)
prev_pt_artefact = find_previous_artefact(act_od, act_od.get_incoming(act_artefact, "pt_BelongsTo"))
if prev_pt_artefact:
act_od.create_link(None, "pt_PrevVersion", pt_artefact, prev_pt_artefact)
act_od.create_link(None, "pt_BelongsTo", pt_artefact, act_artefact)
def extract_inner_workflow(workflow: ODAPI):
# Get the model, this should be only one
name, model = workflow.get_all_instances("pm_Model")[0]
# Get the start of the process trace
start_trace = get_source_incoming(workflow, model, "pt_Starts")
# Get the end of the process trace
end_trace = get_source_incoming(workflow, model, "pt_Ends")
# Get the first started activity
first_activity = get_target_outgoing(workflow, start_trace, "pt_IsFollowedBy")
# Get the last ended activity
end_activity = get_source_incoming(workflow, end_trace, "pt_IsFollowedBy")
# Get the control port that started the activity
act_ctrl_in = get_target_outgoing(workflow, first_activity, "pt_RelatesTo")
# Get the control port that is activated when the activity is executed
act_ctrl_out = get_target_outgoing(workflow, end_activity, "pt_RelatesTo")
# Get the control source of the workflow
ports = []
for port in workflow.get_incoming(act_ctrl_in, "pm_CtrlFlow"):
source = workflow.get_source(port)
if workflow.get_type_name(source) == "pm_CtrlSource":
# Only one port can activate an activity
ports.append(source)
break
# Get the control sink of the workflow
for port in workflow.get_outgoing(act_ctrl_out, "pm_CtrlFlow"):
sink = workflow.get_target(port)
if workflow.get_type_name(sink) == "pm_CtrlSink":
# Only one port can be set to active one an activity is ended
ports.append(sink)
break
# Get the data port that the activity consumes (if used)
consumed_links = workflow.get_incoming(first_activity, "pt_Consumes")
if len(consumed_links) > 0:
pt_artefact = None
for link in consumed_links:
pt_artefact = workflow.get_source(link)
# Check if it is the first artefact -> contains no previous version
if len(workflow.get_outgoing(pt_artefact, "pt_PrevVersion")) == 0:
break
pm_artefact = get_target_outgoing(workflow, pt_artefact, "pt_BelongsTo")
# Find the data source port
for link in workflow.get_incoming(pm_artefact, "pm_DataFlowIn"):
source = workflow.get_source(link)
if workflow.get_type_name(source) == "pm_DataSource":
# An activity can only use one artefact as input
ports.append(source)
break
# Get all data ports that are connected to an artefact that is produced by an activity in the workflow,
# where the artefact is also part of main workflow
for port_name, data_sink in workflow.get_all_instances("pm_DataSink"):
pm_art = get_source_incoming(workflow, data_sink, "pm_DataFlowOut")
# If the pm_artefact is linked to a proces trace artefact that is produced, we can add to port
links = workflow.get_incoming(pm_art, "pt_BelongsTo")
if not len(links):
continue
# A data sink port linkage will only be added to the proces trace when an activity is ended and so an artefact
# is produced, meaning that if a belongsTo link exists, a proces trace artefact is linked to this data port
ports.append(data_sink)
return ports

View file

@ -0,0 +1,2 @@
# Match the model
model:RAM_pm_Model

View file

@ -0,0 +1,7 @@
model:RAM_pm_Model
# Check if the model isn't already connected to a process trace
start_trace:RAM_pt_StartTrace
:RAM_pt_Starts (start_trace -> model)
end_trace:RAM_pt_EndTrace
:RAM_pt_Ends (end_trace -> model)

View file

@ -0,0 +1,12 @@
# Keep the left hand side
model:RAM_pm_Model
# Connect a process trace to it
start_trace:RAM_pt_StartTrace
starts:RAM_pt_Starts (start_trace -> model)
end_trace:RAM_pt_EndTrace
ends:RAM_pt_Ends (end_trace -> model)
# Connect the start with the end
:RAM_pt_IsFollowedBy (start_trace -> end_trace)

View file

@ -0,0 +1,49 @@
# When a control port is active and is connected to an activity, we want to execute the activity
# But, if the activity has input_and (input_or = False). It only can be activated if all its inputs are active
# Match the model
model:RAM_pm_Model
# Match the a python automated activity
py_activity:RAM_pm_PythonAutomatedActivity {
# Check if all connected ports are active in case of input_and
condition = ```
all_active = True
# Check for or / and
if not get_slot_value(this, "input_or"):
# Get all the ctrl in ports
for has_ctrl_in in get_outgoing(this, "pm_HasCtrlIn"):
c_in_state = get_source(get_incoming(get_target(has_ctrl_in), "pm_Of")[0])
# Check if the port is active or not
if not get_slot_value(c_in_state, "active"):
all_active = False
break
all_active
```;
} model_to_activity:RAM_pm_Owns (model -> py_activity)
# Match a control activity in port that is active
ctrl_in:RAM_pm_CtrlActivityIn
ctrl_in_state:RAM_pm_CtrlPortState {
RAM_active = `get_value(this)`;
}
state_to_port:RAM_pm_Of (ctrl_in_state -> ctrl_in)
# Match the activity link to the port
activity_to_port:RAM_pm_HasCtrlIn (py_activity -> ctrl_in)
# Match the end of the trace
end_trace:RAM_pt_EndTrace
ends:RAM_pt_Ends (end_trace -> model)
# Match the previous trace element before the end trace
prev_trace_element:RAM_pt_Event
followed_by:RAM_pt_IsFollowedBy (prev_trace_element -> end_trace)

View file

@ -0,0 +1,42 @@
model:RAM_pm_Model
py_activity:RAM_pm_PythonAutomatedActivity {
condition = ```
start_activity = create_object(None, "pt_StartActivity")
create_activity_links(odapi, start_activity, matched("prev_trace_element"), matched("ctrl_in"))
input_data = extract_input_data(odapi, this)
result = execute_activity(odapi, globals()["packages"], this, input_data)
if len(result) == 3:
status_code, output_data, input_used = result
else:
status_code, output_data, input_used = *result, None
if input_used:
handle_artefact(odapi, start_activity, "pt_Artefact", "pt_Consumes", get(input_used), input_data[input_used], direction="DataFlowOut")
end_activity = create_object(None, "pt_EndActivity")
ctrl_out = get(status_code)
create_activity_links(odapi, end_activity, start_activity, ctrl_out, end_trace=matched("end_trace"))
if output_data:
port, data = output_data
handle_artefact(odapi, end_activity, "pt_Artefact", "pt_Produces", get(port), data, direction="DataFlowIn")
update_control_states(odapi, this, ctrl_out)
```;
}
model_to_activity:RAM_pm_Owns
ctrl_in:RAM_pm_CtrlActivityIn
ctrl_in_state:RAM_pm_CtrlPortState {
RAM_active = `False`;
}
state_to_port:RAM_pm_Of (ctrl_in_state -> ctrl_in)
activity_to_port:RAM_pm_HasCtrlIn (py_activity -> ctrl_in)
end_trace:RAM_pt_EndTrace
ends:RAM_pt_Ends (end_trace -> model)
prev_trace_element:RAM_pt_Event

View file

@ -0,0 +1,36 @@
# When a control port is active and is connected to an activity, we want to execute the activity. If it is a composite one, we execute the inner workflow of it
# But, if the activity has input_and (input_or = False). It only can be activated if all its inputs are active
# Match the model
model:RAM_pm_Model
# Match the a python automated activity
activity:RAM_pm_Activity {
RAM_composite = `True`;
} model_to_activity:RAM_pm_Owns (model -> activity)
# Match a control activity in port that is active
ctrl_in:RAM_pm_CtrlActivityIn
ctrl_in_state:RAM_pm_CtrlPortState {
RAM_active = `get_value(this)`;
}
state_to_port:RAM_pm_Of (ctrl_in_state -> ctrl_in)
# Match the activity link to the port
activity_to_port:RAM_pm_HasCtrlIn (activity -> ctrl_in)
# Match the end of the trace
end_trace:RAM_pt_EndTrace
ends:RAM_pt_Ends (end_trace -> model)
# Match the previous trace element before the end trace
prev_trace_element:RAM_pt_Event
followed_by:RAM_pt_IsFollowedBy (prev_trace_element -> end_trace)

View file

@ -0,0 +1,29 @@
model:RAM_pm_Model
activity:RAM_pm_Activity {
RAM_composite = `True`;
condition = ```
# Execute inner workflow
execute_composite_workflow(odapi, this, matched("ctrl_in"), globals()["composite_linkage"], globals()["packages"], matched)
```;
}
model_to_activity:RAM_pm_Owns
ctrl_in:RAM_pm_CtrlActivityIn
ctrl_in_state:RAM_pm_CtrlPortState {
RAM_active = `False`;
}
state_to_port:RAM_pm_Of (ctrl_in_state -> ctrl_in)
activity_to_port:RAM_pm_HasCtrlIn (activity -> ctrl_in)
end_trace:RAM_pt_EndTrace
ends:RAM_pt_Ends (end_trace -> model)
prev_trace_element:RAM_pt_Event

View file

@ -0,0 +1,20 @@
# Match an active control output port
out_state:RAM_pm_CtrlPortState {
RAM_active = `get_value(this)`;
}
out:RAM_pm_CtrlOut
state_to_out:RAM_pm_Of (out_state -> out)
# Match an inactive control input port
in_state:RAM_pm_CtrlPortState {
RAM_active = `not get_value(this)`;
}
in:RAM_pm_CtrlIn
state_to_in:RAM_pm_Of (in_state -> in)
# Match the connection between those two ports
flow:RAM_pm_CtrlFlow (out -> in)

View file

@ -0,0 +1,42 @@
# Copy the left hand side
out_state:RAM_pm_CtrlPortState {
# Only set the output port to inactive if all connected input ports are set to active
RAM_active = ```
set_to_active = False
output_port = matched("out")
outgoing_flows = get_outgoing(output_port, "pm_CtrlFlow")
# for each flow: pm_CtrlFlow -> pm_CtrlIn <- pm_Of <- pm_CtrlPortState == state
all_input_port_states = [get_source(get_incoming(get_target(flow), "pm_Of")[0]) for flow in outgoing_flows]
input_port_state = matched("in_state")
for state in all_input_port_states:
is_active = get_slot_value(state, "active")
# If the state is not active and it is not the input port state we have matched and planned to set active
# Then we can't yet set this output port state to active
if not is_active and state != input_port_state:
set_to_active = True
break
# Set the attribute to the assigned value
set_to_active
```;
}
out:RAM_pm_CtrlOut
state_to_out:RAM_pm_Of (out_state -> out)
in_state:RAM_pm_CtrlPortState {
# Set the input port active
RAM_active = `True`;
}
in:RAM_pm_CtrlIn
state_to_in:RAM_pm_Of (in_state -> in)
flow:RAM_pm_CtrlFlow (out -> in)

View file

@ -0,0 +1,200 @@
##################################################
pm_Model:Class
##################################################
pm_Stateful:Class
##################################################
pm_ModelElement:Class {
abstract = True;
}
##################################################
pm_Activity:Class
:Inheritance (pm_Activity -> pm_ModelElement)
pm_Activity_name:AttributeLink (pm_Activity -> String) {
name = "name";
optional = False;
}
pm_Activity_composite:AttributeLink (pm_Activity -> Boolean) {
name = "composite";
optional = False;
}
pm_Activity_subworkflow_path:AttributeLink (pm_Activity -> String) {
name = "subworkflow_path";
optional = True;
}
pm_AutomatedActivity:Class {
abstract = True;
} :Inheritance (pm_AutomatedActivity -> pm_Activity)
pm_AutomatedActivity_input_or:AttributeLink (pm_AutomatedActivity -> Boolean) {
name = "input_or";
optional = False;
}
pm_PythonAutomatedActivity:Class
:Inheritance (pm_PythonAutomatedActivity -> pm_AutomatedActivity)
pm_PythonAutomatedActivity_func:AttributeLink (pm_PythonAutomatedActivity -> ActionCode) {
name = "func";
optional = False;
}
##################################################
pm_Artefact:Class
:Inheritance (pm_Artefact -> pm_ModelElement)
:Inheritance (pm_Artefact -> pm_Stateful)
##################################################
pm_CtrlPort:Class {
abstract = True;
} :Inheritance (pm_CtrlPort -> pm_Stateful)
pm_CtrlIn:Class {
abstract = True;
} :Inheritance (pm_CtrlIn -> pm_CtrlPort)
pm_CtrlSink:Class {
# 1) A control sink port must have at least one incoming control flow
# 2) A control sink port can't have any control flow output
constraint = ```
has_incoming = len(get_incoming(this, "pm_CtrlFlow")) > 0
no_outgoing = len(get_outgoing(this, "pm_CtrlFlow")) == 0
# Return constraint
has_incoming and no_outgoing
```;
} :Inheritance (pm_CtrlSink -> pm_CtrlIn)
pm_CtrlActivityIn:Class {
# 1) Must have at least one incoming control flow
constraint = ```
has_incoming = len(get_incoming(this, "pm_CtrlFlow")) > 0
# Return constraint
has_incoming
```;
} :Inheritance (pm_CtrlActivityIn -> pm_CtrlIn)
pm_CtrlOut:Class {
abstract = True;
} :Inheritance (pm_CtrlOut -> pm_CtrlPort)
pm_CtrlSource:Class {
# 1) A control source port can't have any control flow inputs
# 2) A control source port must have at least one outgoing control flow
constraint = ```
no_incoming = len(get_incoming(this, "pm_CtrlFlow")) == 0
has_outgoing = len(get_outgoing(this, "pm_CtrlFlow")) > 0
# Return constraint
no_incoming and has_outgoing
```;
} :Inheritance (pm_CtrlSource -> pm_CtrlOut)
pm_CtrlActivityOut:Class {
# 1) Must have at least one outgoing control flow
constraint = ```
has_outgoing = len(get_outgoing(this, "pm_CtrlFlow")) > 0
# Return constraint
has_outgoing
```;
} :Inheritance (pm_CtrlActivityOut -> pm_CtrlOut)
##################################################
pm_DataPort:Class {
abstract = True;
}
pm_DataIn:Class {
abstract = True;
} :Inheritance (pm_DataIn -> pm_DataPort)
pm_DataSink:Class
:Inheritance (pm_DataSink -> pm_DataIn)
pm_DataActivityIn:Class
:Inheritance (pm_DataActivityIn -> pm_DataIn)
pm_DataOut:Class {
abstract = True;
} :Inheritance (pm_DataOut -> pm_DataPort)
pm_DataSource:Class
:Inheritance (pm_DataSource -> pm_DataOut)
pm_DataActivityOut:Class
:Inheritance (pm_DataActivityOut -> pm_DataOut)
##################################################
##################################################
pm_Owns:Association (pm_Model -> pm_ModelElement) {
source_lower_cardinality = 1;
source_upper_cardinality = 1;
}
##################################################
pm_CtrlFlow:Association (pm_CtrlPort -> pm_CtrlPort)
##################################################
pm_HasCtrlIn:Association (pm_Activity -> pm_CtrlIn) {
source_upper_cardinality = 1;
target_lower_cardinality = 1;
}
pm_HasCtrlOut:Association (pm_Activity -> pm_CtrlOut) {
source_upper_cardinality = 1;
target_lower_cardinality = 1;
}
pm_HasDataIn:Association (pm_Activity -> pm_DataIn) {
source_upper_cardinality = 1;
}
pm_HasDataOut:Association (pm_Activity -> pm_DataOut) {
source_upper_cardinality = 1;
}
##################################################
pm_DataFlowIn:Association (pm_DataOut -> pm_Artefact) {
source_lower_cardinality = 1;
target_lower_cardinality = 1;
}
pm_DataFlowOut:Association (pm_Artefact -> pm_DataIn) {
source_lower_cardinality = 1;
target_lower_cardinality = 1;
}
##################################################
##################################################
has_source_and_sink:GlobalConstraint {
# There should be at least one source and sink control port
constraint = ```
contains_source = len(get_all_instances("pm_CtrlSource")) > 0
contains_sink = len(get_all_instances("pm_CtrlSink")) > 0
# return constraint
contains_source and contains_sink
```;
}
##################################################

View file

@ -0,0 +1,38 @@
##################################################
pm_State:Class {
abstract = True;
}
##################################################
pm_ArtefactState:Class
:Inheritance (pm_ArtefactState -> pm_State)
pm_ArtefactState_data:AttributeLink (pm_ArtefactState -> Bytes) {
name = "data";
optional = False;
}
##################################################
pm_CtrlPortState:Class
:Inheritance (pm_CtrlPortState -> pm_State)
pm_CtrlPortState_active:AttributeLink (pm_CtrlPortState -> Boolean) {
name = "active";
optional = False;
}
##################################################
##################################################
pm_Of:Association (pm_State -> pm_Stateful) {
# one-to-one
source_lower_cardinality = 1;
source_upper_cardinality = 1;
target_lower_cardinality = 1;
target_upper_cardinality = 1;
}
##################################################

View file

@ -0,0 +1,109 @@
##################################################
pt_Event:Class {
abstract = True;
}
##################################################
pt_Activity:Class {
abstract = True;
} :Inheritance (pt_Activity -> pt_Event)
pt_StartActivity:Class {
# A start activity can only be related to a control in port
constraint = ```
correct_related = True
port = get_target(get_outgoing(this, "pt_RelatesTo")[0])
correct_related = port in [uid for _, uid in get_all_instances("pm_CtrlIn")]
correct_related
```;
} :Inheritance (pt_StartActivity -> pt_Activity)
pt_EndActivity:Class {
# A end activity can only be related to a control out port
constraint = ```
correct_related = True
port = get_target(get_outgoing(this, "pt_RelatesTo")[0])
correct_related = port in [uid for _, uid in get_all_instances("pm_CtrlOut")]
correct_related
```;
} :Inheritance (pt_EndActivity -> pt_Activity)
##################################################
pt_StartTrace:Class
:Inheritance (pt_StartTrace -> pt_Event)
pt_EndTrace:Class
:Inheritance (pt_EndTrace -> pt_Event)
##################################################
pt_Artefact:Class
:Inheritance (pt_Artefact -> pt_Event)
pt_Artefact_data:AttributeLink (pt_Artefact -> Bytes) {
name = "data";
optional = False;
}
##################################################
##################################################
pt_IsFollowedBy:Association (pt_Event -> pt_Event) {
source_upper_cardinality = 1;
target_upper_cardinality = 1;
}
##################################################
pt_RelatesTo:Association (pt_Activity -> pm_CtrlPort) {
source_upper_cardinality = 1;
target_lower_cardinality = 1;
target_upper_cardinality = 1;
}
pt_Consumes:Association (pt_Artefact -> pt_StartActivity) {
source_upper_cardinality = 1;
target_lower_cardinality = 1;
target_upper_cardinality = 1;
}
pt_Produces:Association (pt_EndActivity -> pt_Artefact) {
source_lower_cardinality = 1;
source_upper_cardinality = 1;
target_upper_cardinality = 1;
}
##################################################
pt_Starts:Association (pt_StartTrace -> pm_Model) {
source_upper_cardinality = 1;
target_lower_cardinality = 1;
target_upper_cardinality = 1;
}
pt_Ends:Association (pt_EndTrace -> pm_Model) {
source_upper_cardinality = 1;
target_lower_cardinality = 1;
target_upper_cardinality = 1;
}
##################################################
pt_PrevVersion:Association (pt_Artefact -> pt_Artefact) {
source_upper_cardinality = 1;
target_upper_cardinality = 1;
}
pt_BelongsTo:Association (pt_Artefact -> pm_Artefact) {
target_lower_cardinality = 1;
target_upper_cardinality = 1;
}
##################################################

View file

@ -0,0 +1,162 @@
import re
from state.devstate import DevState
from bootstrap.scd import bootstrap_scd
from util import loader
from transformation.rule import RuleMatcherRewriter
from transformation.ramify import ramify
from concrete_syntax.graphviz import renderer as graphviz
from concrete_syntax.graphviz.make_url import make_url
from concrete_syntax.plantuml import renderer as plantuml
from concrete_syntax.plantuml.make_url import make_url as plant_make_url
from api.od import ODAPI
import os
from os import listdir
from os.path import isfile, join
import importlib.util
from util.module_to_dict import module_to_dict
from examples.ftg_pm_pt import help_functions
from examples.ftg_pm_pt.ftg_pm_pt import FtgPmPt
class FtgPmPtRunner:
def __init__(self, model: FtgPmPt, composite_linkage: dict | None = None):
self.model = model
self.ram_mm = ramify(self.model.state, self.model.meta_model)
self.rules = self.load_rules()
self.packages = None
self.composite_linkage = composite_linkage
def load_rules(self):
return loader.load_rules(
self.model.state,
lambda rule_name, kind: os.path.join(
os.path.dirname(__file__),
f"operational_semantics/r_{rule_name}_{kind}.od"
),
self.ram_mm,
["connect_process_trace", "trigger_ctrl_flow", "exec_activity", "exec_composite_activity"]
)
def set_packages(self, packages: str | dict, is_path: bool):
if not is_path:
self.packages = packages
return
self.packages = self.parse_packages(packages)
def parse_packages(self, packages_path: str) -> dict:
return self.collect_functions_from_packages(packages_path, packages_path)
def collect_functions_from_packages(self, base_path, current_path):
functions_dict = {}
for entry in listdir(current_path):
entry_path = join(current_path, entry)
if isfile(entry_path) and entry.endswith(".py"):
module_name = self.convert_path_to_module_name(base_path, entry_path)
module = self.load_module_from_file(entry_path)
for func_name, func in module_to_dict(module).items():
functions_dict[f"{module_name}.{func_name}"] = func
elif not isfile(entry_path):
nested_functions = self.collect_functions_from_packages(base_path, entry_path)
functions_dict.update(nested_functions)
return functions_dict
@staticmethod
def convert_path_to_module_name(base_path, file_path):
return file_path.replace(base_path, "").replace(".py", "").replace("/", "")
@staticmethod
def load_module_from_file(file_path):
spec = importlib.util.spec_from_file_location("", file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def create_matcher(self):
packages = module_to_dict(help_functions)
if self.packages:
packages.update({ "packages": self.packages })
if self.composite_linkage:
packages.update({ "composite_linkage": self.composite_linkage })
matcher_rewriter = RuleMatcherRewriter(
self.model.state, self.model.meta_model, self.ram_mm, eval_context=packages
)
return matcher_rewriter
def visualize_model(self):
print(make_url(graphviz.render_object_diagram(self.model.state, self.model.model, self.model.meta_model)))
print(plant_make_url(plantuml.render_object_diagram(self.model.state, self.model.model, self.model.meta_model)))
@staticmethod
def __extract_artefact_info(od, pt_art):
"""Extract artefact metadata and data."""
data = od.get_slot_value(pt_art, "data")
pm_art = od.get_name(od.get_target(od.get_outgoing(pt_art, "pt_BelongsTo")[0]))
has_prev_version = bool(od.get_outgoing(pt_art, "pt_PrevVersion"))
is_last_version = not od.get_incoming(pt_art, "pt_PrevVersion")
return {
"Artefact Name": pm_art,
"Data": data,
"Has previous version": has_prev_version,
"Is last version": is_last_version
}
def __extract_inputs(self, od, event_node):
"""Extract all consumed artefacts for an event."""
return [
self.__extract_artefact_info(od, od.get_source(consumes))
for consumes in od.get_incoming(event_node, "pt_Consumes")
]
def __extract_outputs(self, od, event_node):
"""Extract all produced artefacts for an event."""
return [
self.__extract_artefact_info(od, od.get_target(produces))
for produces in od.get_outgoing(event_node, "pt_Produces")
]
@staticmethod
def to_snake_case(experiment_type):
# Finds uppercase letters that are not at the start of the string.
# Example: AtomicExperiment -> atomic_experiment
return re.sub(r'(?<!^)(?=[A-Z])', '_', experiment_type).lower()
def run(self, debug_flag: bool = False):
matcher = self.create_matcher()
rule_performed = True
while rule_performed:
# Loop over all the rules first in order priority
for i, (rule_name, rule) in enumerate(self.rules.items()):
rule_performed = False
result = matcher.exec_on_first_match(
self.model.model, rule, rule_name, in_place=True
)
# If the rule cannot be executed go to the next rule
if not result:
continue
rule_performed = True
self.model.model, lhs_match, _ = result
if debug_flag:
print(f"Match: {lhs_match}")
self.visualize_model()
# If a rule is performed, break and start loping over the rules from the beginning
break

View file

@ -113,9 +113,7 @@ def main():
# object to match # object to match
man:{prefix}Man {{ man:{prefix}Man {{
# match only men heavy enough # match only men heavy enough
{prefix}weight = ``` {prefix}weight = `get_value(this) > 60`;
get_value(this) > 60
```;
}} }}
# object to delete # object to delete
@ -142,6 +140,7 @@ def main():
# object to create # object to create
bill:{prefix}Man {{ bill:{prefix}Man {{
# name = `"billie"+str(get_slot_value(matched("man"), "weight"))`;
{prefix}weight = `100`; {prefix}weight = `100`;
}} }}
@ -208,6 +207,7 @@ def main():
generator = match_od(state, dsl_m_id, dsl_mm_id, lhs_id, ramified_mm_id) generator = match_od(state, dsl_m_id, dsl_mm_id, lhs_id, ramified_mm_id)
for i, (match, color) in enumerate(zip(generator, ["red", "orange"])): for i, (match, color) in enumerate(zip(generator, ["red", "orange"])):
print("\nMATCH:\n", match)
uml += plantuml.render_trace_match(state, match, lhs_id, dsl_m_id, color) uml += plantuml.render_trace_match(state, match, lhs_id, dsl_m_id, color)
# rewrite happens in-place (which sucks), so we will only modify a clone: # rewrite happens in-place (which sucks), so we will only modify a clone:

View file

@ -2,7 +2,7 @@ from examples.schedule.RuleExecuter import RuleExecuter
from state.devstate import DevState from state.devstate import DevState
from api.od import ODAPI from api.od import ODAPI
from concrete_syntax.textual_od.renderer import render_od from concrete_syntax.textual_od.renderer import render_od
# from concrete_syntax.textual_od.renderer_jinja2 import render_od_jinja2 from concrete_syntax.textual_od.renderer_jinja2 import render_od_jinja2
from bootstrap.scd import bootstrap_scd from bootstrap.scd import bootstrap_scd
from util import loader from util import loader
from transformation.rule import RuleMatcherRewriter, ActionGenerator from transformation.rule import RuleMatcherRewriter, ActionGenerator
@ -33,10 +33,10 @@ if __name__ == "__main__":
mm_rt_cs = mm_cs + read_file('metamodels/mm_runtime.od') mm_rt_cs = mm_cs + read_file('metamodels/mm_runtime.od')
# m_cs = read_file('models/m_example_simple.od') # m_cs = read_file('models/m_example_simple.od')
# m_rt_initial_cs = m_cs + read_file('models/m_example_simple_rt_initial.od') # m_rt_initial_cs = m_cs + read_file('models/m_example_simple_rt_initial.od')
# m_cs = read_file('models/m_example_mutex.od') m_cs = read_file('models/m_example_mutex.od')
# m_rt_initial_cs = m_cs + read_file('models/m_example_mutex_rt_initial.od') m_rt_initial_cs = m_cs + read_file('models/m_example_mutex_rt_initial.od')
m_cs = read_file('models/m_example_inharc.od') # m_cs = read_file('models/m_example_inharc.od')
m_rt_initial_cs = m_cs + read_file('models/m_example_inharc_rt_initial.od') # m_rt_initial_cs = m_cs + read_file('models/m_example_inharc_rt_initial.od')
# Parse them # Parse them
mm = loader.parse_and_check(state, mm_cs, scd_mmm, "Petri-Net Design meta-model") mm = loader.parse_and_check(state, mm_cs, scd_mmm, "Petri-Net Design meta-model")
@ -59,7 +59,8 @@ if __name__ == "__main__":
def render_callback(od): def render_callback(od):
show_petri_net(od) show_petri_net(od)
return render_od(state, od.m, od.mm) # return render_od(state, od.m, od.mm)
return render_od_jinja2(state, od.m, od.mm)
action_generator.generate_dot() action_generator.generate_dot()

View file

@ -12,11 +12,22 @@
nameOffsetY="0" nameOffsetY="0"
positionX="{{ i * 100 + 100 }}" positionX="{{ i * 100 + 100 }}"
positionY="100" positionY="100"
/> />
{% endfor %} {% endfor %}
{% for i, (transition_name, transition) in enumerate(odapi.get_all_instances("PNTransition")) %} {% for i, (transition_name, transition) in enumerate(odapi.get_all_instances("PNTransition")) %}
<transition angle="0" displayName="true" id="{{ transition_name }}" infiniteServer="false" name="{{ transition_name }}" nameOffsetX="0" nameOffsetY="0" player="0" positionX="{{ i * 100 + 100 }}" positionY="300" priority="0" urgent="false"/> <transition angle="0"
displayName="true"
id="{{ transition_name }}"
infiniteServer="false"
name="{{ transition_name }}"
nameOffsetX="0"
nameOffsetY="0"
player="0"
positionX="{{ i * 100 + 100 }}"
positionY="300"
priority="0"
urgent="false"/>
{% endfor %} {% endfor %}
{% for arc_name, arc in odapi.get_all_instances("arc") %} {% for arc_name, arc in odapi.get_all_instances("arc") %}

View file

@ -270,7 +270,7 @@ port_rt_m_cs = port_m_cs + """
time = 0; time = 0;
} }
waitingState:PlaceState { numShips = 0; } :of (waitingState -> waiting) waitingState:PlaceState { numShips = 2; } :of (waitingState -> waiting)
inboundPassageState:PlaceState { numShips = 0; } :of (inboundPassageState -> inboundPassage) inboundPassageState:PlaceState { numShips = 0; } :of (inboundPassageState -> inboundPassage)
outboundPassageState:PlaceState { numShips = 0; } :of (outboundPassageState -> outboundPassage) outboundPassageState:PlaceState { numShips = 0; } :of (outboundPassageState -> outboundPassage)
@ -282,7 +282,7 @@ port_rt_m_cs = port_m_cs + """
berth1State:BerthState { status = "empty"; numShips = 0; } :of (berth1State -> berth1) berth1State:BerthState { status = "empty"; numShips = 0; } :of (berth1State -> berth1)
berth2State:BerthState { status = "empty"; numShips = 0; } :of (berth2State -> berth2) berth2State:BerthState { status = "empty"; numShips = 0; } :of (berth2State -> berth2)
servedState:PlaceState { numShips = 0; } :of (servedState -> served) servedState:PlaceState { numShips = 1; } :of (servedState -> served)
workersState:WorkerSetState :of (workersState -> workers) workersState:WorkerSetState :of (workersState -> workers)
@ -396,12 +396,12 @@ smaller_model2_rt_cs = smaller_model2_cs + """
} }
waitingState:PlaceState { numShips = 1; } :of (waitingState -> waiting) waitingState:PlaceState { numShips = 1; } :of (waitingState -> waiting)
berthState:BerthState { numShips = 0; status = "empty"; } :of (berthState -> berth) berthState:BerthState { numShips = 1; status = "served"; } :of (berthState -> berth)
servedState:PlaceState { numShips = 0; } :of (servedState -> served) servedState:PlaceState { numShips = 1; } :of (servedState -> served)
gen2waitState:ConnectionState { moved = False; } :of (gen2waitState -> gen2wait) gen2waitState:ConnectionState { moved = False; } :of (gen2waitState -> gen2wait)
wait2berthState:ConnectionState { moved = False; } :of (wait2berthState -> wait2berth) wait2berthState:ConnectionState { moved = False; } :of (wait2berthState -> wait2berth)
berth2servedState:ConnectionState { moved = False; } :of (berth2servedState -> berth2served) berth2servedState:ConnectionState { moved = True; } :of (berth2servedState -> berth2served)
workersState:WorkerSetState :of (workersState -> workers) workersState:WorkerSetState :of (workersState -> workers)
""" """

View file

@ -53,7 +53,7 @@ sim = Simulator(
termination_condition=termination_condition, termination_condition=termination_condition,
check_conformance=True, check_conformance=True,
verbose=True, verbose=True,
renderer=render_port_textual, # renderer=render_port_textual,
# renderer=render_port_graphviz, # renderer=render_port_graphviz,
) )

View file

@ -9,7 +9,7 @@
pn_place:RAM_PNPlace { pn_place:RAM_PNPlace {
# new feature: you can control the name of the object to be created: # new feature: you can control the name of the object to be created:
name = `f"pn_{get_name(matched("port_place"))}"`; name = `f"ships_{get_name(matched("port_place"))}"`;
} }
place2place:RAM_generic_link (pn_place -> port_place) place2place:RAM_generic_link (pn_place -> port_place)
@ -19,4 +19,4 @@
pn_place_state:RAM_PNPlaceState { pn_place_state:RAM_PNPlaceState {
RAM_numTokens = `get_slot_value(matched('port_place_state'), "numShips")`; RAM_numTokens = `get_slot_value(matched('port_place_state'), "numShips")`;
} }
:RAM_pn_of(pn_place_state -> pn_place) :RAM_pn_of(pn_place_state -> pn_place)

View file

@ -1,5 +1,7 @@
# Just look for a connection: # Just look for a connection and its state:
port_src:RAM_Source port_src:RAM_Source
port_snk:RAM_Sink port_snk:RAM_Sink
port_conn:RAM_connection (port_src -> port_snk) port_conn:RAM_connection (port_src -> port_snk)
port_conn_state:RAM_ConnectionState
port_of:RAM_of (port_conn_state -> port_conn)

View file

@ -3,12 +3,26 @@
port_src:RAM_Source port_src:RAM_Source
port_snk:RAM_Sink port_snk:RAM_Sink
port_conn:RAM_connection (port_src -> port_snk) port_conn:RAM_connection (port_src -> port_snk)
port_conn_state:RAM_ConnectionState
port_of:RAM_of (port_conn_state -> port_conn)
# Create a Petri Net transition, and link it to our port-connection: # Create a Petri Net transition, and link it to our port-connection:
pn_transition:RAM_PNTransition { move_transition:RAM_PNTransition {
name = `f"pn_{get_name(matched("port_conn"))}"`; name = `f"move_{get_name(matched("port_conn"))}"`;
} }
trans2conn:RAM_generic_link (pn_transition -> port_conn)
moved_place:RAM_PNPlace {
name = `f" moved_{get_name(matched("port_conn"))}"`;
}
moved_place_state:RAM_PNPlaceState {
RAM_numTokens = `1 if get_slot_value(matched('port_conn_state'), "moved") else 0`;
}
:RAM_pn_of (moved_place_state -> moved_place)
# when firing a 'move', put a token in the 'moved'-place
:RAM_arc (move_transition -> moved_place)
trans2conn:RAM_generic_link (move_transition -> port_conn)
moved2conn:RAM_generic_link (moved_place -> port_conn)
# Note that we are not yet creating any incoming/outgoing petri net arcs! This will be done in another rule. # Note that we are not yet creating any incoming/outgoing petri net arcs! This will be done in another rule.

View file

@ -62,9 +62,9 @@ if __name__ == "__main__":
print('loading model...') print('loading model...')
port_m_rt_initial = loader.parse_and_check(state, port_m_rt_initial = loader.parse_and_check(state,
m_cs=models.port_rt_m_cs, # <-- your final solution should work with the full model # m_cs=models.port_rt_m_cs, # <-- your final solution should work with the full model
# m_cs=models.smaller_model_rt_cs, # <-- simpler model to try first # m_cs=models.smaller_model_rt_cs, # <-- simpler model to try first
# m_cs=models.smaller_model2_rt_cs, # <-- simpler model to try first m_cs=models.smaller_model2_rt_cs, # <-- simpler model to try first
mm=merged_mm, mm=merged_mm,
descr="initial model", descr="initial model",
check_conformance=False, # no need to check conformance every time check_conformance=False, # no need to check conformance every time

View file

@ -5,6 +5,7 @@ from services.primitives.integer_type import Integer
from services.primitives.string_type import String from services.primitives.string_type import String
from services.primitives.boolean_type import Boolean from services.primitives.boolean_type import Boolean
from services.primitives.actioncode_type import ActionCode from services.primitives.actioncode_type import ActionCode
from services.primitives.bytes_type import Bytes
from api.cd import CDAPI from api.cd import CDAPI
from typing import Optional from typing import Optional
@ -147,6 +148,13 @@ class OD:
actioncode_t.create(value) actioncode_t.create(value)
return self.create_model_ref(name, "ActionCode", actioncode_node) return self.create_model_ref(name, "ActionCode", actioncode_node)
def create_bytes_value(self, name: str, value: bytes):
from services.primitives.bytes_type import Bytes
bytes_node = self.bottom.create_node()
bytes_t = Bytes(bytes_node, self.bottom.state)
bytes_t.create(value)
return self.create_model_ref(name, "Bytes", bytes_node)
# Identical to the same SCD method: # Identical to the same SCD method:
def create_model_ref(self, name: str, type_name: str, model: UUID): def create_model_ref(self, name: str, type_name: str, model: UUID):
# create element + morphism links # create element + morphism links
@ -389,6 +397,8 @@ def read_primitive_value(bottom, modelref: UUID, mm: UUID):
return Boolean(referred_model, bottom.state).read(), typ_name return Boolean(referred_model, bottom.state).read(), typ_name
elif typ_name == "ActionCode": elif typ_name == "ActionCode":
return ActionCode(referred_model, bottom.state).read(), typ_name return ActionCode(referred_model, bottom.state).read(), typ_name
elif typ_name == "Bytes":
return Bytes(referred_model, bottom.state).read(), typ_name
else: else:
raise Exception("Unimplemented type:", typ_name) raise Exception("Unimplemented type:", typ_name)

View file

@ -0,0 +1,24 @@
from uuid import UUID
from state.base import State
from services.bottom.V0 import Bottom
class Bytes:
def __init__(self, model: UUID, state: State):
self.model = model
self.bottom = Bottom(state)
type_model_id_node, = self.bottom.read_outgoing_elements(state.read_root(), "Bytes")
self.type_model = UUID(self.bottom.read_value(type_model_id_node))
def create(self, value: bool):
if "bytes" in self.bottom.read_keys(self.model):
instance, = self.bottom.read_outgoing_elements(self.model, "bytes")
self.bottom.delete_element(instance)
_instance = self.bottom.create_node(value)
self.bottom.create_edge(self.model, _instance, "bytes")
_type, = self.bottom.read_outgoing_elements(self.type_model, "Bytes")
self.bottom.create_edge(_instance, _type, "Morphism")
def read(self):
instance, = self.bottom.read_outgoing_elements(self.model, "bytes")
return self.bottom.read_value(instance)

View file

@ -2,13 +2,14 @@ from abc import ABC, abstractmethod
from typing import Any, List, Tuple, Optional, Union from typing import Any, List, Tuple, Optional, Union
from uuid import UUID, uuid4 from uuid import UUID, uuid4
primitive_types = (int, float, str, bool) primitive_types = (int, float, str, bool, bytes)
INTEGER = ("Integer",) INTEGER = ("Integer",)
FLOAT = ("Float",) FLOAT = ("Float",)
STRING = ("String",) STRING = ("String",)
BOOLEAN = ("Boolean",) BOOLEAN = ("Boolean",)
TYPE = ("Type",) TYPE = ("Type",)
type_values = (INTEGER, FLOAT, STRING, BOOLEAN, TYPE) BYTES = ("Bytes",)
type_values = (INTEGER, FLOAT, STRING, BOOLEAN, TYPE, BYTES)
Node = UUID Node = UUID

View file

@ -171,3 +171,12 @@ def test_create_nodevalue_string_type(state):
def test_create_nodevalue_invalid_type(state): def test_create_nodevalue_invalid_type(state):
id1 = state.create_nodevalue(("Class",)) id1 = state.create_nodevalue(("Class",))
assert id1 == None assert id1 == None
@pytest.mark.usefixtures("state")
def test_create_nodevalue_bytes_type(state):
id1 = state.create_nodevalue(("Bytes",))
assert id1 != None
v = state.read_value(id1)
assert v == ("Bytes",)

View file

@ -4,7 +4,7 @@ from concrete_syntax.textual_od import parser, renderer
from services.scd import SCD from services.scd import SCD
from util.timer import Timer from util.timer import Timer
PRIMITIVE_TYPES = set(["Integer", "String", "Boolean", "ActionCode"]) PRIMITIVE_TYPES = set(["Integer", "String", "Boolean", "ActionCode", "Bytes"])
# Merges N models. The models must have the same meta-model. # Merges N models. The models must have the same meta-model.
# Care should be taken to avoid naming collisions before calling this function. # Care should be taken to avoid naming collisions before calling this function.
@ -12,7 +12,7 @@ def merge_models(state, mm, models: list[UUID]):
with Timer("merge_models"): with Timer("merge_models"):
primitive_types = { primitive_types = {
type_name : UUID(state.read_value(state.read_dict(state.read_root(), type_name))) type_name : UUID(state.read_value(state.read_dict(state.read_root(), type_name)))
for type_name in ["Integer", "String", "Boolean", "ActionCode"] for type_name in ["Integer", "String", "Boolean", "ActionCode", "Bytes"]
} }
merged = state.create_node() merged = state.create_node()