Abstract away Object Diagram pattern matching in a function that does all the magic
This commit is contained in:
parent
13f19cdec6
commit
f45872d3f7
3 changed files with 89 additions and 101 deletions
|
|
@ -137,18 +137,6 @@ def main():
|
||||||
conf4 = Conformance(state, rhs_id, ramified_mm_id)
|
conf4 = Conformance(state, rhs_id, ramified_mm_id)
|
||||||
print("RHS conforms?", conf4.check_nominal(log=True))
|
print("RHS conforms?", conf4.check_nominal(log=True))
|
||||||
|
|
||||||
# Convert to format understood by matching algorithm
|
|
||||||
host = mvs_adapter.model_to_graph(state, dsl_m_id, dsl_mm_id)
|
|
||||||
guest = mvs_adapter.model_to_graph(state, lhs_id, ramified_mm_id)
|
|
||||||
|
|
||||||
print("HOST:")
|
|
||||||
print(host.vtxs)
|
|
||||||
print(host.edges)
|
|
||||||
|
|
||||||
print("GUEST:")
|
|
||||||
print(guest.vtxs)
|
|
||||||
print(guest.edges)
|
|
||||||
|
|
||||||
def render_ramification():
|
def render_ramification():
|
||||||
uml = (""
|
uml = (""
|
||||||
# Render original and RAMified meta-models
|
# Render original and RAMified meta-models
|
||||||
|
|
@ -177,16 +165,9 @@ def main():
|
||||||
uml += plantuml.render_trace_conformance(state, dsl_m_id, dsl_mm_id)
|
uml += plantuml.render_trace_conformance(state, dsl_m_id, dsl_mm_id)
|
||||||
|
|
||||||
print("matching...")
|
print("matching...")
|
||||||
matcher = MatcherVF2(host, guest, mvs_adapter.RAMCompare(Bottom(state), dsl_m_od))
|
generator = mvs_adapter.match_od(state, dsl_m_id, dsl_mm_id, lhs_id, ramified_mm_id)
|
||||||
for m, color in zip(matcher.match(), ["red", "orange"]):
|
for name_mapping, color in zip(generator, ["red", "orange"]):
|
||||||
print("\nMATCH:\n", m)
|
print("\nMATCH:\n", name_mapping)
|
||||||
name_mapping = {}
|
|
||||||
# id_mapping = {}
|
|
||||||
for guest_vtx, host_vtx in m.mapping_vtxs.items():
|
|
||||||
if isinstance(guest_vtx, mvs_adapter.NamedNode) and isinstance(host_vtx, mvs_adapter.NamedNode):
|
|
||||||
# id_mapping[guest_vtx.node_id] = host_vtx.node_id
|
|
||||||
name_mapping[guest_vtx.name] = host_vtx.name
|
|
||||||
print(name_mapping)
|
|
||||||
|
|
||||||
# Render every match
|
# Render every match
|
||||||
uml += plantuml.render_trace_match(state, name_mapping, lhs_id, dsl_m_id, color)
|
uml += plantuml.render_trace_match(state, name_mapping, lhs_id, dsl_m_id, color)
|
||||||
|
|
@ -197,17 +178,8 @@ def main():
|
||||||
def render_rewrite():
|
def render_rewrite():
|
||||||
uml = render_ramification()
|
uml = render_ramification()
|
||||||
|
|
||||||
matcher = MatcherVF2(host, guest, mvs_adapter.RAMCompare(Bottom(state), dsl_m_od))
|
generator = mvs_adapter.match_od(state, dsl_m_id, dsl_mm_id, lhs_id, ramified_mm_id)
|
||||||
for m in matcher.match():
|
for name_mapping in generator:
|
||||||
name_mapping = {}
|
|
||||||
# id_mapping = {}
|
|
||||||
for guest_vtx, host_vtx in m.mapping_vtxs.items():
|
|
||||||
if isinstance(guest_vtx, mvs_adapter.NamedNode) and isinstance(host_vtx, mvs_adapter.NamedNode):
|
|
||||||
# id_mapping[guest_vtx.node_id] = host_vtx.node_id
|
|
||||||
name_mapping[guest_vtx.name] = host_vtx.name
|
|
||||||
print(name_mapping)
|
|
||||||
|
|
||||||
|
|
||||||
rewriter.rewrite(state, lhs_id, rhs_id, ramified_mm_id, name_mapping, dsl_m_id, dsl_mm_id)
|
rewriter.rewrite(state, lhs_id, rhs_id, ramified_mm_id, name_mapping, dsl_m_id, dsl_mm_id)
|
||||||
|
|
||||||
# Render match
|
# Render match
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ from uuid import UUID
|
||||||
from services.bottom.V0 import Bottom
|
from services.bottom.V0 import Bottom
|
||||||
from services.scd import SCD
|
from services.scd import SCD
|
||||||
from services.od import OD
|
from services.od import OD
|
||||||
from pattern_matching.matcher import Graph, Edge, Vertex
|
from pattern_matching.matcher import Graph, Edge, Vertex, MatcherVF2
|
||||||
from transformation import ramify
|
from transformation import ramify
|
||||||
import itertools
|
import itertools
|
||||||
import re
|
import re
|
||||||
|
|
@ -188,87 +188,103 @@ def model_to_graph(state: State, model: UUID, metamodel: UUID, prefix=""):
|
||||||
|
|
||||||
return graph
|
return graph
|
||||||
|
|
||||||
# Function object for pattern matching. Decides whether to match host and guest vertices, where guest is a RAMified instance (e.g., the attributes are all strings with Python expressions), and the host is an instance (=object diagram) of the original model (=class diagram)
|
|
||||||
class RAMCompare:
|
|
||||||
def __init__(self, bottom, host_od):
|
|
||||||
self.bottom = bottom
|
|
||||||
self.host_od = host_od
|
|
||||||
|
|
||||||
type_model_id = bottom.state.read_dict(bottom.state.read_root(), "SCD")
|
def match_od(state, host_m, host_mm, pattern_m, pattern_mm):
|
||||||
self.scd_model = UUID(bottom.state.read_value(type_model_id))
|
# Function object for pattern matching. Decides whether to match host and guest vertices, where guest is a RAMified instance (e.g., the attributes are all strings with Python expressions), and the host is an instance (=object diagram) of the original model (=class diagram)
|
||||||
|
class RAMCompare:
|
||||||
|
def __init__(self, bottom, host_od):
|
||||||
|
self.bottom = bottom
|
||||||
|
self.host_od = host_od
|
||||||
|
|
||||||
def is_subtype_of(self, supposed_subtype: UUID, supposed_supertype: UUID):
|
type_model_id = bottom.state.read_dict(bottom.state.read_root(), "SCD")
|
||||||
if supposed_subtype == supposed_supertype:
|
self.scd_model = UUID(bottom.state.read_value(type_model_id))
|
||||||
# reflexive:
|
|
||||||
return True
|
|
||||||
|
|
||||||
inheritance_node, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
|
def is_subtype_of(self, supposed_subtype: UUID, supposed_supertype: UUID):
|
||||||
|
if supposed_subtype == supposed_supertype:
|
||||||
|
# reflexive:
|
||||||
|
return True
|
||||||
|
|
||||||
for outgoing in self.bottom.read_outgoing_edges(supposed_subtype):
|
inheritance_node, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
|
||||||
if inheritance_node in self.bottom.read_outgoing_elements(outgoing, "Morphism"):
|
|
||||||
# 'outgoing' is an inheritance link
|
|
||||||
supertype = self.bottom.read_edge_target(outgoing)
|
|
||||||
if supertype != supposed_subtype:
|
|
||||||
if self.is_subtype_of(supertype, supposed_supertype):
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
for outgoing in self.bottom.read_outgoing_edges(supposed_subtype):
|
||||||
|
if inheritance_node in self.bottom.read_outgoing_elements(outgoing, "Morphism"):
|
||||||
|
# 'outgoing' is an inheritance link
|
||||||
|
supertype = self.bottom.read_edge_target(outgoing)
|
||||||
|
if supertype != supposed_subtype:
|
||||||
|
if self.is_subtype_of(supertype, supposed_supertype):
|
||||||
|
return True
|
||||||
|
|
||||||
def match_types(self, g_vtx_type, h_vtx_type):
|
|
||||||
# types only match with their supertypes
|
|
||||||
# we assume that 'RAMifies'-traceability links have been created between guest and host types
|
|
||||||
try:
|
|
||||||
g_vtx_original_type = ramify.get_original_type(self.bottom, g_vtx_type)
|
|
||||||
except:
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
return self.is_subtype_of(h_vtx_type, g_vtx_original_type)
|
def match_types(self, g_vtx_type, h_vtx_type):
|
||||||
|
# types only match with their supertypes
|
||||||
|
# we assume that 'RAMifies'-traceability links have been created between guest and host types
|
||||||
# Memoizing the result of comparison gives a huge performance boost!
|
try:
|
||||||
# Especially `is_subtype_of` is very slow, and will be performed many times over on the same pair of nodes during the matching process.
|
g_vtx_original_type = ramify.get_original_type(self.bottom, g_vtx_type)
|
||||||
# Assuming the model is not altered *during* matching, this is safe.
|
except:
|
||||||
@functools.cache
|
|
||||||
def __call__(self, g_vtx, h_vtx):
|
|
||||||
# First check if the types match (if we have type-information)
|
|
||||||
if hasattr(g_vtx, 'typ'):
|
|
||||||
if not hasattr(h_vtx, 'typ'):
|
|
||||||
# if guest has a type, host must have a type
|
|
||||||
return False
|
return False
|
||||||
return self.match_types(g_vtx.typ, h_vtx.typ)
|
|
||||||
|
|
||||||
# Then, match by value
|
return self.is_subtype_of(h_vtx_type, g_vtx_original_type)
|
||||||
|
|
||||||
if g_vtx.value == None:
|
|
||||||
return h_vtx.value == None
|
|
||||||
|
|
||||||
# mvs-edges (which are converted to vertices) only match with mvs-edges
|
# Memoizing the result of comparison gives a huge performance boost!
|
||||||
if g_vtx.value == IS_EDGE:
|
# Especially `is_subtype_of` is very slow, and will be performed many times over on the same pair of nodes during the matching process.
|
||||||
return h_vtx.value == IS_EDGE
|
# Assuming the model is not altered *during* matching, this is safe.
|
||||||
|
@functools.cache
|
||||||
|
def __call__(self, g_vtx, h_vtx):
|
||||||
|
# First check if the types match (if we have type-information)
|
||||||
|
if hasattr(g_vtx, 'typ'):
|
||||||
|
if not hasattr(h_vtx, 'typ'):
|
||||||
|
# if guest has a type, host must have a type
|
||||||
|
return False
|
||||||
|
return self.match_types(g_vtx.typ, h_vtx.typ)
|
||||||
|
|
||||||
if h_vtx.value == IS_EDGE:
|
# Then, match by value
|
||||||
return False
|
|
||||||
|
|
||||||
if g_vtx.value == IS_MODELREF:
|
if g_vtx.value == None:
|
||||||
return h_vtx.value == IS_MODELREF
|
return h_vtx.value == None
|
||||||
|
|
||||||
if h_vtx.value == IS_MODELREF:
|
# mvs-edges (which are converted to vertices) only match with mvs-edges
|
||||||
return False
|
if g_vtx.value == IS_EDGE:
|
||||||
|
return h_vtx.value == IS_EDGE
|
||||||
|
|
||||||
# print(g_vtx.value, h_vtx.value)
|
if h_vtx.value == IS_EDGE:
|
||||||
def get_slot(h_vtx, slot_name: str):
|
return False
|
||||||
slot_node = self.host_od.get_slot(h_vtx.node_id, slot_name)
|
|
||||||
return slot_node
|
|
||||||
|
|
||||||
def read_int(slot: UUID):
|
if g_vtx.value == IS_MODELREF:
|
||||||
i = Integer(slot, self.bottom.state)
|
return h_vtx.value == IS_MODELREF
|
||||||
return i.read()
|
|
||||||
|
|
||||||
try:
|
if h_vtx.value == IS_MODELREF:
|
||||||
return eval(g_vtx.value, {}, {
|
return False
|
||||||
'v': h_vtx.value,
|
|
||||||
'get_slot': functools.partial(get_slot, h_vtx),
|
# print(g_vtx.value, h_vtx.value)
|
||||||
'read_int': read_int,
|
def get_slot(h_vtx, slot_name: str):
|
||||||
})
|
slot_node = self.host_od.get_slot(h_vtx.node_id, slot_name)
|
||||||
except Exception as e:
|
return slot_node
|
||||||
return False
|
|
||||||
|
def read_int(slot: UUID):
|
||||||
|
i = Integer(slot, self.bottom.state)
|
||||||
|
return i.read()
|
||||||
|
|
||||||
|
try:
|
||||||
|
return eval(g_vtx.value, {}, {
|
||||||
|
'v': h_vtx.value,
|
||||||
|
'get_slot': functools.partial(get_slot, h_vtx),
|
||||||
|
'read_int': read_int,
|
||||||
|
})
|
||||||
|
except Exception as e:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Convert to format understood by matching algorithm
|
||||||
|
host = model_to_graph(state, host_m, host_mm)
|
||||||
|
guest = model_to_graph(state, pattern_m, pattern_mm)
|
||||||
|
|
||||||
|
matcher = MatcherVF2(host, guest, RAMCompare(Bottom(state), OD(host_mm, host_m, state)))
|
||||||
|
for m in matcher.match():
|
||||||
|
# print("\nMATCH:\n", m)
|
||||||
|
# Convert mapping
|
||||||
|
name_mapping = {}
|
||||||
|
for guest_vtx, host_vtx in m.mapping_vtxs.items():
|
||||||
|
if isinstance(guest_vtx, NamedNode) and isinstance(host_vtx, NamedNode):
|
||||||
|
name_mapping[guest_vtx.name] = host_vtx.name
|
||||||
|
yield name_mapping
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ def process_rule(state, lhs: UUID, rhs: UUID):
|
||||||
|
|
||||||
# Rewrite is performed in-place
|
# Rewrite is performed in-place
|
||||||
# Also updates the mapping in-place, to become RHS -> host
|
# Also updates the mapping in-place, to become RHS -> host
|
||||||
def rewrite(state, lhs: UUID, rhs: UUID, rhs_mm: UUID, match_mapping: dict, m_to_transform: UUID, mm: UUID) -> dict:
|
def rewrite(state, lhs: UUID, rhs: UUID, rhs_mm: UUID, match_mapping: dict, m_to_transform: UUID, mm: UUID):
|
||||||
bottom = Bottom(state)
|
bottom = Bottom(state)
|
||||||
|
|
||||||
scd_metamodel_id = state.read_dict(state.read_root(), "SCD")
|
scd_metamodel_id = state.read_dict(state.read_root(), "SCD")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue