merging (meta-)models works (but it's dirty!)
This commit is contained in:
parent
c31c8bf3ea
commit
9883e09ac2
17 changed files with 474 additions and 46 deletions
|
|
@ -6,5 +6,4 @@ from concrete_syntax.common import indent
|
|||
def clone_od(state, m: UUID, mm: UUID):
|
||||
# cheap-ass implementation: render and parse
|
||||
cs = renderer.render_od(state, m, mm, hide_names=False)
|
||||
# print(indent(cs, 6))
|
||||
return parser.parse_od(state, cs, mm)
|
||||
78
transformation/merger.py
Normal file
78
transformation/merger.py
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
from api.od import ODAPI
|
||||
from uuid import UUID
|
||||
from concrete_syntax.textual_od import parser, renderer
|
||||
from services.scd import SCD
|
||||
from util.timer import Timer
|
||||
|
||||
PRIMITIVE_TYPES = set(["Integer", "String", "Boolean", "ActionCode"])
|
||||
|
||||
# Merges N models. The models must have the same meta-model.
|
||||
# Care should be taken to avoid naming collisions before calling this function.
|
||||
def merge_models(state, mm, models: list[UUID]):
|
||||
with Timer("merge_models"):
|
||||
primitive_types = {
|
||||
type_name : UUID(state.read_value(state.read_dict(state.read_root(), type_name)))
|
||||
for type_name in ["Integer", "String", "Boolean", "ActionCode"]
|
||||
}
|
||||
|
||||
merged = state.create_node()
|
||||
merged_odapi = ODAPI(state, m=merged, mm=mm)
|
||||
|
||||
scd_mmm = UUID(state.read_value(state.read_dict(state.read_root(), "SCD")))
|
||||
|
||||
mm_odapi = ODAPI(state, m=mm, mm=scd_mmm)
|
||||
types = mm_odapi.get_all_instances("Class", include_subtypes=True)
|
||||
all_objs = []
|
||||
for type_name, type_obj in types:
|
||||
for model in models:
|
||||
m_odapi = ODAPI(state, m=model, mm=mm)
|
||||
for obj_name, obj in m_odapi.get_all_instances(type_name, include_subtypes=False):
|
||||
all_objs.append((obj_name, obj, type_name, m_odapi))
|
||||
todo = all_objs
|
||||
|
||||
have = {}
|
||||
|
||||
mapping = {}
|
||||
while len(todo) > 0:
|
||||
next_round = []
|
||||
# if 'mm' is SCD, class_name will be 'Class', 'Association', ...
|
||||
for tup in todo:
|
||||
obj_name, obj, type_name, m_odapi = tup
|
||||
prefixed_obj_name = obj_name
|
||||
if obj_name in PRIMITIVE_TYPES:
|
||||
if prefixed_obj_name in have:
|
||||
# Don't rename primitive types. Instead, merge them.
|
||||
mapping[obj] = mapping[have[prefixed_obj_name]]
|
||||
continue
|
||||
while prefixed_obj_name in have:
|
||||
prefixed_obj_name = prefixed_obj_name + '_bis' # make name unique
|
||||
if prefixed_obj_name != obj_name:
|
||||
print(f"Warning: renaming {obj_name} to {prefixed_obj_name} to avoid naming collision.")
|
||||
if type_name == "ModelRef":
|
||||
model = state.read_value(obj)
|
||||
scd = SCD(merged, state)
|
||||
created_obj = scd.create_model_ref(prefixed_obj_name, model)
|
||||
merged_odapi._ODAPI__recompute_mappings() # dirty!!
|
||||
else:
|
||||
# create node or edge
|
||||
if state.is_edge(obj):
|
||||
source, target = state.read_edge(obj)
|
||||
if source not in mapping or target not in mapping:
|
||||
next_round.append(tup)
|
||||
continue # try again later...
|
||||
else:
|
||||
created_obj = merged_odapi.create_link(prefixed_obj_name, type_name, mapping[source], mapping[target])
|
||||
else:
|
||||
created_obj = merged_odapi.create_object(prefixed_obj_name, type_name)
|
||||
mapping[obj] = created_obj
|
||||
have[obj_name] = obj
|
||||
# copy slots
|
||||
for attr_name in m_odapi.get_slots(obj):
|
||||
value = m_odapi.get_slot_value(obj, attr_name)
|
||||
is_code = m_odapi.slot_has_code(obj, attr_name)
|
||||
merged_odapi.set_slot_value(created_obj, attr_name, value, is_code=is_code)
|
||||
if len(next_round) == len(todo):
|
||||
raise Exception("We got stuck!")
|
||||
todo = next_round
|
||||
|
||||
return merged
|
||||
|
|
@ -1,3 +1,5 @@
|
|||
from concrete_syntax.textual_od.renderer import render_od
|
||||
|
||||
import pprint
|
||||
from typing import Generator, Callable
|
||||
from uuid import UUID
|
||||
|
|
@ -93,8 +95,18 @@ class RuleMatcherRewriter:
|
|||
e.add_note(f"while matching LHS of '{rule_name}'")
|
||||
raise
|
||||
|
||||
def exec_rule(self, m: UUID, lhs: UUID, rhs: UUID, lhs_match: dict, rule_name: str):
|
||||
cloned_m = clone_od(self.state, m, self.mm)
|
||||
def exec_rule(self, m: UUID, lhs: UUID, rhs: UUID, lhs_match: dict, rule_name: str, in_place=False):
|
||||
if in_place:
|
||||
# dangerous
|
||||
cloned_m = m
|
||||
else:
|
||||
cloned_m = clone_od(self.state, m, self.mm)
|
||||
|
||||
# print('before clone:')
|
||||
# print(render_od(self.state, m, self.mm))
|
||||
# print('after clone:')
|
||||
# print(render_od(self.state, cloned_m, self.mm))
|
||||
|
||||
try:
|
||||
rhs_match = rewrite(self.state,
|
||||
lhs_m=lhs,
|
||||
|
|
|
|||
|
|
@ -4,5 +4,8 @@
|
|||
condition = ```
|
||||
top = create_object("Top", "Class")
|
||||
set_slot_value(top, "abstract", True)
|
||||
lnk = create_link("generic_link", "Association", top, top)
|
||||
# lnk also inherits top:
|
||||
create_link(None, "Inheritance", lnk, top)
|
||||
```;
|
||||
}
|
||||
|
|
@ -2,41 +2,51 @@ from uuid import UUID
|
|||
from transformation.rule import RuleMatcherRewriter
|
||||
from transformation.ramify import ramify
|
||||
from util.loader import load_rules
|
||||
from util.timer import Timer
|
||||
from concrete_syntax.textual_od.renderer import render_od
|
||||
|
||||
import os
|
||||
THIS_DIR = os.path.dirname(__file__)
|
||||
|
||||
# Given a class diagram, extend it (in-place) with a "Top"-type, i.e., an (abstract) supertype of all types. The set of instances of the "Top" is always the set of all objects in the diagram.
|
||||
def topify_cd(state, cd: UUID):
|
||||
# meta-meta-model
|
||||
scd_mmm = UUID(state.read_value(state.read_dict(state.read_root(), "SCD")))
|
||||
class Topifier:
|
||||
def __init__(self, state):
|
||||
self.state = state
|
||||
# meta-meta-model
|
||||
self.scd_mmm = UUID(state.read_value(state.read_dict(state.read_root(), "SCD")))
|
||||
self.scd_mmm_ramified = ramify(state, self.scd_mmm)
|
||||
self.matcher_rewriter = RuleMatcherRewriter(state, self.scd_mmm, self.scd_mmm_ramified)
|
||||
|
||||
scd_mmm_ramified = ramify(state, scd_mmm)
|
||||
# topification is implemented via model transformation
|
||||
self.rules = load_rules(state,
|
||||
lambda rule_name, kind: f"{THIS_DIR}/rules/r_{rule_name}_{kind}.od",
|
||||
self.scd_mmm_ramified, ["create_top", "create_inheritance"],
|
||||
check_conformance=False,
|
||||
)
|
||||
|
||||
matcher_rewriter = RuleMatcherRewriter(state, scd_mmm, scd_mmm_ramified)
|
||||
|
||||
# topification is implemented via model transformation
|
||||
rules = load_rules(state,
|
||||
lambda rule_name, kind: f"{THIS_DIR}/rules/r_{rule_name}_{kind}.od",
|
||||
scd_mmm_ramified, ["create_top", "create_inheritance"])
|
||||
# Given a class diagram, extend it with a "Top"-type, i.e., an (abstract) supertype of all types. The set of instances of the "Top" is always the set of all objects in the diagram.
|
||||
def topify_cd(self, cd: UUID):
|
||||
with Timer("topify_cd"):
|
||||
# 1. Execute rule 'create_top' once
|
||||
rule = self.rules["create_top"]
|
||||
match_set = list(self.matcher_rewriter.match_rule(cd, rule.lhs, rule.nacs, "create_top"))
|
||||
if len(match_set) != 1:
|
||||
raise Exception(f"Expected rule 'create_top' to match only once, instead got {len(match_set)} matches")
|
||||
lhs_match = match_set[0]
|
||||
cd, rhs_match = self.matcher_rewriter.exec_rule(cd, rule.lhs, rule.rhs, lhs_match, "create_top")
|
||||
|
||||
# 1. Execute rule 'create_top' once
|
||||
rule = rules["create_top"]
|
||||
match_set = list(matcher_rewriter.match_rule(cd, rule.lhs, rule.nacs, "create_top"))
|
||||
if len(match_set) != 1:
|
||||
raise Exception(f"Expected rule 'create_top' to match only once, instead got {len(match_set)} matches")
|
||||
lhs_match = match_set[0]
|
||||
cd, rhs_match = matcher_rewriter.exec_rule(cd, rule.lhs, rule.rhs, lhs_match, "create_top")
|
||||
# 2. Execute rule 'create_inheritance' as many times as possible
|
||||
rule = self.rules["create_inheritance"]
|
||||
|
||||
# 2. Execute rule 'create_inheritance' as many times as possible
|
||||
rule = rules["create_inheritance"]
|
||||
while True:
|
||||
iterator = matcher_rewriter.match_rule(cd, rule.lhs, rule.nacs, "create_inheritance")
|
||||
# find first match, and re-start matching
|
||||
try:
|
||||
lhs_match = iterator.__next__() # may throw StopIteration
|
||||
cd, rhs_match = matcher_rewriter.exec_rule(cd, rule.lhs, rule.rhs, lhs_match, "create_inheritance")
|
||||
except StopIteration:
|
||||
break # no more matches
|
||||
# for match in self.matcher_rewriter.match_rule(cd, rule.lhs, rule.nacs, "create_inheritance"):
|
||||
# self.matcher_rewriter.exec_rule(cd, rule.lhs, rule.rhs, match, "create_inheritance", in_place=True)
|
||||
# render_od(self.state, cd, self.scd_mmm)
|
||||
|
||||
return cd
|
||||
while True:
|
||||
iterator = self.matcher_rewriter.match_rule(cd, rule.lhs, rule.nacs, "create_inheritance")
|
||||
# find first match, and re-start matching
|
||||
try:
|
||||
lhs_match = iterator.__next__() # may throw StopIteration
|
||||
cd, rhs_match = self.matcher_rewriter.exec_rule(cd, rule.lhs, rule.rhs, lhs_match, "create_inheritance")
|
||||
except StopIteration:
|
||||
break # no more matches
|
||||
return cd
|
||||
Loading…
Add table
Add a link
Reference in a new issue