Rewriter: deleting elements works

This commit is contained in:
Joeri Exelmans 2024-09-11 11:04:44 +02:00
parent 99bcf9a542
commit f12fd8bd6c
5 changed files with 157 additions and 120 deletions

View file

@ -7,6 +7,7 @@ from services.scd import SCD
from framework.conformance import Conformance from framework.conformance import Conformance
from services.od import OD from services.od import OD
from transformation.ramify import ramify from transformation.ramify import ramify
from transformation import rewriter
from services.bottom.V0 import Bottom from services.bottom.V0 import Bottom
from services.primitives.integer_type import Integer from services.primitives.integer_type import Integer
from pattern_matching import mvs_adapter from pattern_matching import mvs_adapter
@ -23,119 +24,108 @@ def create_integer_node(state, i: int):
def main(): def main():
state = DevState() state = DevState()
root = state.read_root() # id: 0 root = state.read_root() # id: 0
scd_node = bootstrap_scd(state)
scd_node2 = state.read_dict(root, "SCD")
# print(root, scd_node, scd_node2)
def print_tree(root, max_depth, depth=0): scd_mm_id = bootstrap_scd(state)
print(" "*depth, "root=", root, "value=", state.read_value(root)) int_mm_id = UUID(state.read_value(state.read_dict(state.read_root(), "Integer")))
src,tgt = state.read_edge(root) string_mm_id = UUID(state.read_value(state.read_dict(state.read_root(), "String")))
if src != None:
print(" "*depth, "src...")
print_tree(src, max_depth, depth+1)
if tgt != None:
print(" "*depth, "tgt...")
print_tree(tgt, max_depth, depth+1)
for edge in state.read_outgoing(root):
for edge_label in state.read_outgoing(edge):
[_,tgt] = state.read_edge(edge_label)
label = state.read_value(tgt)
print(" "*depth, " key:", label)
[_, tgt] = state.read_edge(edge)
value = state.read_value(tgt)
if value != None:
print(" "*depth, " ->", tgt, " (value:", value, ")")
else:
print(" "*depth, " ->", tgt)
if depth < max_depth:
if isinstance(value, str) and len(value) == 36:
i = None
try:
i = UUID(value)
except ValueError as e:
# print("invalid UUID:", value)
pass
if i != None:
print_tree(i, max_depth, depth+1)
print_tree(tgt, max_depth, depth+1)
print("explore...") # def print_tree(root, max_depth, depth=0):
# print_tree(root, 2) # print(" "*depth, "root=", root, "value=", state.read_value(root))
# src,tgt = state.read_edge(root)
# if src != None:
# print(" "*depth, "src...")
# print_tree(src, max_depth, depth+1)
# if tgt != None:
# print(" "*depth, "tgt...")
# print_tree(tgt, max_depth, depth+1)
# for edge in state.read_outgoing(root):
# for edge_label in state.read_outgoing(edge):
# [_,tgt] = state.read_edge(edge_label)
# label = state.read_value(tgt)
# print(" "*depth, " key:", label)
# [_, tgt] = state.read_edge(edge)
# value = state.read_value(tgt)
# if value != None:
# print(" "*depth, " ->", tgt, " (value:", value, ")")
# else:
# print(" "*depth, " ->", tgt)
# if depth < max_depth:
# if isinstance(value, str) and len(value) == 36:
# i = None
# try:
# i = UUID(value)
# except ValueError as e:
# # print("invalid UUID:", value)
# pass
# if i != None:
# print_tree(i, max_depth, depth+1)
# print_tree(tgt, max_depth, depth+1)
int_type_id = state.read_dict(state.read_root(), "Integer") # Meta-model for our DSL
int_type = UUID(state.read_value(int_type_id)) dsl_mm_id = state.create_node()
dsl_mm_scd = SCD(dsl_mm_id, state)
string_type_id = state.read_dict(state.read_root(), "String") dsl_mm_scd.create_class("Animal", abstract=True)
string_type = UUID(state.read_value(string_type_id)) dsl_mm_scd.create_class("Man", min_c=1, max_c=2)
dsl_mm_scd.create_inheritance("Man", "Animal")
# scd2 = SCD(scd_node, state) dsl_mm_scd.create_model_ref("Integer", int_mm_id)
# for el in scd2.list_elements(): dsl_mm_scd.create_attribute_link("Man", "Integer", "weight", optional=False)
# print(el) dsl_mm_scd.create_class("Bear")
dsl_mm_scd.create_inheritance("Bear", "Animal")
dsl_mm_scd.create_association("afraidOf", "Man", "Animal",
model_id = state.create_node() # Every Man afraid of at least one Animal:
scd = SCD(model_id, state) src_min_c=0,
scd.create_class("Abstract", abstract=True) src_max_c=None,
scd.create_class("A", min_c=1, max_c=2)
scd.create_inheritance("A", "Abstract")
scd.create_model_ref("Integer", int_type)
scd.create_attribute_link("A", "Integer", "size", False)
scd.create_class("B")
scd.create_association("A2B", "A", "B",
src_min_c=1,
src_max_c=1,
tgt_min_c=1, tgt_min_c=1,
tgt_max_c=2, tgt_max_c=None,
) )
# print_tree(model_id, 3) conf = Conformance(state, dsl_mm_id, scd_mm_id)
print("conforms?", conf.check_nominal(log=True))
# Model in our DSL
dsl_m_id = state.create_node()
dsl_m_od = OD(dsl_mm_id, dsl_m_id, state)
conf = Conformance(state, model_id, scd_node) dsl_m_od.create_object("george", "Man")
print("Check nominal conformance...") dsl_m_od.create_object("bear1", "Bear")
print(conf.check_nominal(log=True)) dsl_m_od.create_object("bear2", "Bear")
# print("Check structural conformance...") dsl_m_od.create_link("georgeAfraidOfBear1", "afraidOf", "george", "bear1")
# print(conf.check_structural(log=True)) dsl_m_od.create_link("georgeAfraidOfBear2", "afraidOf", "george", "bear2")
# print("Check nominal conformance (again)...")
# print(conf.check_nominal(log=True))
inst_id = state.create_node() dsl_m_od.create_slot("weight", "george",
od = OD(model_id, inst_id, state) dsl_m_od.create_integer_value("george.weight", 80))
od.create_object("a", "A") conf2 = Conformance(state, dsl_m_id, dsl_mm_id)
od.create_object("a2", "A") print("Model conforms?", conf2.check_nominal(log=True))
od.create_object("b", "B")
od.create_link("A2B", "a", "b")
od.create_link("A2B", "a2", "b")
od.create_slot("size", "a", od.create_integer_value("a.size", 42)) # RAMify MM
od.create_slot("size", "a2", od.create_integer_value("a2.size", 50)) ramified_mm_id = ramify(state, dsl_mm_id)
print("checking conformance....") # LHS of our rule
conf2 = Conformance(state, inst_id, model_id) lhs_id = state.create_node()
print("conforms?", conf2.check_nominal(log=True)) lhs_od = OD(ramified_mm_id, lhs_id, state)
ramified_MM_id = ramify(state, model_id) lhs_od.create_object("man", "RAM_Man")
lhs_od.create_slot("RAM_weight", "man", lhs_od.create_string_value("man.RAM_weight", 'v < 99'))
lhs_od.create_object("scaryAnimal", "RAM_Animal")
lhs_od.create_link("manAfraidOfAnimal", "RAM_afraidOf", "man", "scaryAnimal")
pattern_id = state.create_node() conf3 = Conformance(state, lhs_id, ramified_mm_id)
pattern = OD(ramified_MM_id, pattern_id, state) print("LHS conforms?", conf3.check_nominal(log=True))
pattern.create_object("pattern_a", "LHS_A") # RHS of our rule
# pattern.create_slot("constraint", "a1", pattern.create_string_value("a1.constraint", 'read_int(get_slot("LHS_size")) > 50')) rhs_id = state.create_node()
pattern.create_slot("LHS_size", "pattern_a", pattern.create_string_value("pattern_a.LHS_size", 'v < 99')) rhs_od = OD(ramified_mm_id, rhs_id, state)
# pattern.create_object("a2", "A")
# pattern.create_slot("size", "a2", pattern.create_string_value("a2.size", '99'))
pattern.create_object("pattern_b", "LHS_B") rhs_od.create_object("man", "RAM_Man")
rhs_od.create_slot("RAM_weight", "man", rhs_od.create_string_value("man.RAM_weight", 'v + 5'))
pattern.create_link("LHS_A2B", "pattern_a", "pattern_b") conf4 = Conformance(state, rhs_id, ramified_mm_id)
print("RHS conforms?", conf4.check_nominal(log=True))
# Convert to format understood by matching algorithm
conf3 = Conformance(state, pattern_id, ramified_MM_id) host = mvs_adapter.model_to_graph(state, dsl_m_id, dsl_mm_id)
print("conforms?", conf3.check_nominal(log=True)) guest = mvs_adapter.model_to_graph(state, lhs_id, ramified_mm_id)
host = mvs_adapter.model_to_graph(state, inst_id, model_id)
guest = mvs_adapter.model_to_graph(state, pattern_id, ramified_MM_id)
print("HOST:") print("HOST:")
print(host.vtxs) print(host.vtxs)
@ -146,7 +136,7 @@ def main():
print(guest.edges) print(guest.edges)
print("matching...") print("matching...")
matcher = MatcherVF2(host, guest, mvs_adapter.RAMCompare(Bottom(state), od)) matcher = MatcherVF2(host, guest, mvs_adapter.RAMCompare(Bottom(state), dsl_m_od))
prev = None prev = None
for m in matcher.match(): for m in matcher.match():
print("\nMATCH:\n", m) print("\nMATCH:\n", m)
@ -155,8 +145,12 @@ def main():
if isinstance(guest_vtx, mvs_adapter.NamedNode) and isinstance(host_vtx, mvs_adapter.NamedNode): if isinstance(guest_vtx, mvs_adapter.NamedNode) and isinstance(host_vtx, mvs_adapter.NamedNode):
name_to_matched[guest_vtx.name] = host_vtx.name name_to_matched[guest_vtx.name] = host_vtx.name
print(name_to_matched) print(name_to_matched)
input() rewriter.rewrite(state, lhs_id, rhs_id, name_to_matched, dsl_m_id)
break
print("DONE") print("DONE")
conf5 = Conformance(state, dsl_m_id, dsl_mm_id)
print("Updated model conforms?", conf5.check_nominal(log=True))
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View file

@ -4,6 +4,7 @@ from services.bottom.V0 import Bottom
from services.scd import SCD from services.scd import SCD
from services.od import OD from services.od import OD
from pattern_matching.matcher import Graph, Edge, Vertex from pattern_matching.matcher import Graph, Edge, Vertex
from transformation import ramify
import itertools import itertools
import re import re
import functools import functools
@ -216,16 +217,13 @@ class RAMCompare:
def match_types(self, g_vtx_type, h_vtx_type): def match_types(self, g_vtx_type, h_vtx_type):
# types only match with their supertypes # types only match with their supertypes
# we assume that 'RAMifies'-traceability links have been created between guest and host types # we assume that 'RAMifies'-traceability links have been created between guest and host types
g_vtx_original_types = self.bottom.read_outgoing_elements(g_vtx_type, "RAMifies") try:
for typ in g_vtx_original_types: g_vtx_original_type = ramify.get_original_type(self.bottom, g_vtx_type)
# print(g_vtx, "is ramified") except:
result = self.is_subtype_of(h_vtx_type, g_vtx_original_types[0])
if result:
return True
else:
# print(g_vtx, "is not ramified")
return False return False
return self.is_subtype_of(h_vtx_type, g_vtx_original_type)
# Memoizing the result of comparison gives a huge performance boost! # Memoizing the result of comparison gives a huge performance boost!
# Especially `is_subtype_of` is very slow, and will be performed many times over on the same pair of nodes during the matching process. # Especially `is_subtype_of` is very slow, and will be performed many times over on the same pair of nodes during the matching process.

View file

@ -3,6 +3,7 @@ from state.base import State
from services.bottom.V0 import Bottom from services.bottom.V0 import Bottom
from services.primitives.integer_type import Integer from services.primitives.integer_type import Integer
from services.primitives.string_type import String from services.primitives.string_type import String
from typing import Optional
def get_attr_link_name(class_name: str, attr_name: str): def get_attr_link_name(class_name: str, attr_name: str):
return f"{class_name}_{attr_name}" return f"{class_name}_{attr_name}"
@ -37,8 +38,6 @@ class OD:
# 'abstract' is optional attribute, default is False # 'abstract' is optional attribute, default is False
is_abstract = False is_abstract = False
print("class", name, "is abstract?", is_abstract)
if is_abstract: if is_abstract:
raise Exception("Cannot instantiate abstract class!") raise Exception("Cannot instantiate abstract class!")
@ -63,7 +62,7 @@ class OD:
class_name = self.get_class_of_object(object_name) class_name = self.get_class_of_object(object_name)
attr_link_name = get_attr_link_name(class_name, attr_name) attr_link_name = get_attr_link_name(class_name, attr_name)
# An attribute-link is indistinguishable from an ordinary link: # An attribute-link is indistinguishable from an ordinary link:
return self.create_link(attr_link_name, object_name, target_name) return self.create_link(None, attr_link_name, object_name, target_name)
def get_slot(self, object_node: UUID, attr_name: str): def get_slot(self, object_node: UUID, attr_name: str):
# I really don't like how complex and inefficient it is to read an attribute of an object... # I really don't like how complex and inefficient it is to read an attribute of an object...
@ -106,13 +105,14 @@ class OD:
self.bottom.create_edge(element_node, type_node, "Morphism") # create morphism link self.bottom.create_edge(element_node, type_node, "Morphism") # create morphism link
def create_link(self, assoc_name: str, src_obj_name: str, tgt_obj_name: str): def create_link(self, link_name: Optional[str], assoc_name: str, src_obj_name: str, tgt_obj_name: str):
src_obj_node, = self.bottom.read_outgoing_elements(self.model, src_obj_name) src_obj_node, = self.bottom.read_outgoing_elements(self.model, src_obj_name)
tgt_obj_node, = self.bottom.read_outgoing_elements(self.model, tgt_obj_name) tgt_obj_node, = self.bottom.read_outgoing_elements(self.model, tgt_obj_name)
link_edge = self.bottom.create_edge(src_obj_node, tgt_obj_node) link_edge = self.bottom.create_edge(src_obj_node, tgt_obj_node)
# generate a unique name for the link # generate a unique name for the link
if link_name == None:
i = 0; i = 0;
while True: while True:
link_name = f"{assoc_name}{i}" link_name = f"{assoc_name}{i}"

View file

@ -4,8 +4,9 @@ from services.bottom.V0 import Bottom
from services.scd import SCD from services.scd import SCD
from framework.conformance import Conformance from framework.conformance import Conformance
RAMIFIES_LABEL = "RAMifies"
def ramify(state: State, model: UUID, prefix = "LHS_") -> UUID: def ramify(state: State, model: UUID, prefix = "RAM_") -> UUID:
# def print_tree(root, max_depth, depth=0): # def print_tree(root, max_depth, depth=0):
# print(" "*depth, "root=", root, "value=", state.read_value(root)) # print(" "*depth, "root=", root, "value=", state.read_value(root))
@ -119,7 +120,7 @@ def ramify(state: State, model: UUID, prefix = "LHS_") -> UUID:
print('creating class', class_name, "with card 0 ..", upper_card) print('creating class', class_name, "with card 0 ..", upper_card)
ramified_class = ramified_scd.create_class(prefix+class_name, abstract=None, max_c=upper_card) ramified_class = ramified_scd.create_class(prefix+class_name, abstract=None, max_c=upper_card)
# traceability link # traceability link
bottom.create_edge(ramified_class, class_node, "RAMifies") bottom.create_edge(ramified_class, class_node, RAMIFIES_LABEL)
# We don't add a 'label' attribute (as described in literature on RAMification) # We don't add a 'label' attribute (as described in literature on RAMification)
# Instead, the names of the objects (which only exist in the scope of the object diagram 'model', and are not visible to the matcher) are used as labels # Instead, the names of the objects (which only exist in the scope of the object diagram 'model', and are not visible to the matcher) are used as labels
@ -133,7 +134,7 @@ def ramify(state: State, model: UUID, prefix = "LHS_") -> UUID:
# The string will be a Python expression # The string will be a Python expression
ramified_attr_link = ramified_scd._create_attribute_link(prefix+class_name, string_modelref, prefix+attr_name, optional=True) ramified_attr_link = ramified_scd._create_attribute_link(prefix+class_name, string_modelref, prefix+attr_name, optional=True)
# traceability link # traceability link
bottom.create_edge(ramified_attr_link, attr_edge, "RAMifies") bottom.create_edge(ramified_attr_link, attr_edge, RAMIFIES_LABEL)
associations = scd.get_associations() associations = scd.get_associations()
for assoc_name, assoc_node in associations.items(): for assoc_name, assoc_node in associations.items():
@ -152,7 +153,7 @@ def ramify(state: State, model: UUID, prefix = "LHS_") -> UUID:
src_max_c=src_upper_card, src_max_c=src_upper_card,
tgt_max_c=tgt_upper_card) tgt_max_c=tgt_upper_card)
# traceability link # traceability link
bottom.create_edge(ramified_assoc, assoc_node, "RAMifies") bottom.create_edge(ramified_assoc, assoc_node, RAMIFIES_LABEL)
for inh_name, inh_node in scd.get_inheritances().items(): for inh_name, inh_node in scd.get_inheritances().items():
# Re-create inheritance links like in our original model: # Re-create inheritance links like in our original model:
@ -169,3 +170,11 @@ def ramify(state: State, model: UUID, prefix = "LHS_") -> UUID:
print("RAMification successful.") print("RAMification successful.")
return ramified return ramified
# Every RAMified type has a link to its original type
def get_original_type(bottom, typ: UUID):
original_types = bottom.read_outgoing_elements(typ, RAMIFIES_LABEL)
if len(original_types) != 1:
raise Exception("Expected 1 original type, got " + str(len(original_types)))
else:
return original_types[0]

View file

@ -0,0 +1,36 @@
# Things you can do:
# - Create/delete objects, associations, attributes
# - Change attribute values
# - ? that's it?
from uuid import UUID
from services.bottom.V0 import Bottom
def process_rule(state, lhs: UUID, rhs: UUID):
bottom = Bottom(state)
# : bottom.read_outgoing_elements(rhs, name)[0]
to_delete = { name for name in bottom.read_keys(lhs) if name not in bottom.read_keys(rhs) }
to_create = { name for name in bottom.read_keys(rhs) if name not in bottom.read_keys(lhs) }
print("to_delete:", to_delete)
print("to_create:", to_create)
return to_delete, to_create
def rewrite(state, lhs: UUID, rhs: UUID, match_mapping: dict, model_to_transform: UUID) -> UUID:
bottom = Bottom(state)
to_delete, to_create = process_rule(state, lhs, rhs)
for pattern_name_to_delete in to_delete:
# For every name in `to_delete`, look up the name of the matched element in the host graph
model_element_name_to_delete = match_mapping[pattern_name_to_delete]
print('deleting', model_element_name_to_delete)
# Look up the matched element in the host graph
element_to_delete, = bottom.read_outgoing_elements(model_to_transform, model_element_name_to_delete)
# Delete
bottom.delete_element(element_to_delete)
for pattern_name_to_create in to_create:
pass