From 6ce8a4ef5c2e716ec12b5ffe64637f2de687ef34 Mon Sep 17 00:00:00 2001 From: Andrei Bondarenko Date: Tue, 24 Aug 2021 17:32:21 +0200 Subject: [PATCH] Park structural conformance for now, finish constraints for nominal --- bootstrap/pn.py | 40 +++++++++++ bootstrap/scd.py | 3 + framework/conformance.py | 150 ++++++++++++++++++++++++++++++--------- services/pn.py | 127 +++++++++++++++++++++++++++++++++ 4 files changed, 285 insertions(+), 35 deletions(-) create mode 100644 bootstrap/pn.py create mode 100644 services/pn.py diff --git a/bootstrap/pn.py b/bootstrap/pn.py new file mode 100644 index 0000000..511e816 --- /dev/null +++ b/bootstrap/pn.py @@ -0,0 +1,40 @@ +from services.scd import SCD +from uuid import UUID +from state.base import State + + +def bootstrap_pn(state: State, model_name: str) -> UUID: + # Retrieve scd model + scd_id = state.read_dict(state.read_root(), "SCD") + scd = UUID(state.read_value(scd_id)) + # Retrieve refs to primitive type models + # # integer + int_type_id = state.read_dict(state.read_root(), "Integer") + int_type = UUID(state.read_value(int_type_id)) + # # string + str_type_id = state.read_dict(state.read_root(), "String") + str_type = UUID(state.read_value(str_type_id)) + # Create LTM_PN + model_uuid = state.create_node() + mcl_root_id = state.create_nodevalue(str(model_uuid)) + state.create_dict(state.read_root(), model_name, mcl_root_id) + service = SCD(scd, model_uuid, state) + # Create classes + service.create_class("P") + service.create_class("T") + # Create associations + service.create_association("P2T", "P", "T") + service.create_association("T2P", "T", "P") + # Create model refs + service.create_model_ref("Integer", int_type) + service.create_model_ref("String", str_type) + # Create class attributes + service.create_attribute_link("P", "Integer", "t", False) + service.create_attribute_link("P", "String", "n", False) + service.create_attribute_link("T", "String", "n", False) + # Create association attributes + service.create_attribute_link("P2T", "Integer", "w", False) + service.create_attribute_link("T2P", "Integer", "w", False) + # Create test constraint + service.add_constraint("P", "print(element)\nreturn True") + return model_uuid diff --git a/bootstrap/scd.py b/bootstrap/scd.py index 34fe1d3..30a42f3 100644 --- a/bootstrap/scd.py +++ b/bootstrap/scd.py @@ -74,6 +74,8 @@ def bootstrap_scd(state: State) -> UUID: # # INHERITANCES, i.e. elements typed by Inheritance # # Class inherits from Element add_edge_element("class_inh_element", class_node, element_node) + # # GlobalConstraint inherits from Element + add_edge_element("gc_inh_element", glob_constr_node, element_node) # # Attribute inherits from Element add_edge_element("attr_inh_element", attr_node, element_node) # # Association inherits from Element @@ -186,6 +188,7 @@ def bootstrap_scd(state: State) -> UUID: add_mcl_morphism("AttributeLink", "Association") # Inheritance add_mcl_morphism("class_inh_element", "Inheritance") + add_mcl_morphism("gc_inh_element", "Inheritance") add_mcl_morphism("attr_inh_element", "Inheritance") add_mcl_morphism("assoc_inh_element", "Inheritance") add_mcl_morphism("attr_link_inh_element", "Inheritance") diff --git a/framework/conformance.py b/framework/conformance.py index 042aedb..3bcb999 100644 --- a/framework/conformance.py +++ b/framework/conformance.py @@ -31,6 +31,9 @@ class Conformance: self.multiplicities: Dict[str, Tuple] = {} self.source_multiplicities: Dict[str, Tuple] = {} self.target_multiplicities: Dict[str, Tuple] = {} + self.structures = {} + self.matches = {} + self.candidates = {} def check_nominal(self): steps = [ @@ -311,38 +314,103 @@ class Conformance: print(code) return True + def precompute_structures(self): + self.precompute_sub_types() + scd_elements = self.bottom.read_outgoing_elements(self.scd_model) + # collect types + class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class") + association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association") + for tm_element, tm_name in self.type_model_names.items(): + # retrieve elements that tm_element is a morphism of + morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") + morphism, = [m for m in morphisms if m in scd_elements] + # check if tm_element is a morphism of AttributeLink + if class_element == morphism or association_element == morphism: + self.structures[tm_name] = set() + # collect type structures + # retrieve AttributeLink to check whether element is a morphism of AttributeLink + attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink") + for tm_element, tm_name in self.type_model_names.items(): + # retrieve elements that tm_element is a morphism of + morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") + morphism, = [m for m in morphisms if m in scd_elements] + # check if tm_element is a morphism of AttributeLink + if attr_link_element == morphism: + # retrieve attributes of attribute link, i.e. 'name' and 'optional' + attrs = self.bottom.read_outgoing_elements(tm_element) + name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs) + opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs) + # get attr name value + name_model = UUID(self.bottom.read_value(name_model_node)) + name_node, = self.bottom.read_outgoing_elements(name_model) + name = self.bottom.read_value(name_node) + # get attr opt value + opt_model = UUID(self.bottom.read_value(opt_model_node)) + opt_node, = self.bottom.read_outgoing_elements(opt_model) + opt = self.bottom.read_value(opt_node) + # get attr type name + source_type_node = self.bottom.read_edge_source(tm_element) + source_type_name = self.type_model_names[source_type_node] + target_type_node = self.bottom.read_edge_target(tm_element) + target_type_name = self.type_model_names[target_type_node] + # add attribute to the structure of its source type + # attribute is stored as a (name, optional, type) triple + self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name)) + # extend structures of sub types with attrs of super types + for super_type, sub_types in self.sub_types.items(): + for sub_type in sub_types: + self.structures.setdefault(sub_type, set()).update(self.structures[super_type]) + # filter out abstract types, as they cannot be instantiated + # retrieve Class_abstract to check whether element is a morphism of Class_abstract + class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract") + for tm_element, tm_name in self.type_model_names.items(): + # retrieve elements that tm_element is a morphism of + morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") + morphism, = [m for m in morphisms if m in scd_elements] + # check if tm_element is a morphism of Class_abstract + if class_abs_element == morphism: + # retrieve 'abstract' attribute value + target_node = self.bottom.read_edge_target(tm_element) + abst_model = UUID(self.bottom.read_value(target_node)) + abst_node, = self.bottom.read_outgoing_elements(abst_model) + is_abstract = self.bottom.read_value(abst_node) + # retrieve type name + source_node = self.bottom.read_edge_source(tm_element) + type_name = self.type_model_names[source_node] + if is_abstract: + self.structures.pop(type_name) -def __create_pn(state: State): - from services.scd import SCD - # Retrieve refs to primitive type models - # # integer - int_type_id = state.read_dict(state.read_root(), "Integer") - int_type = UUID(state.read_value(int_type_id)) - # # string - str_type_id = state.read_dict(state.read_root(), "String") - str_type = UUID(state.read_value(str_type_id)) - # Create LTM_PN - model_uuid = state.create_node() - service = SCD(scd, model_uuid, state) - # Create classes - service.create_class("P") - service.create_class("T") - # Create associations - service.create_association("P2T", "P", "T") - service.create_association("T2P", "T", "P") - # Create model refs - service.create_model_ref("Integer", int_type) - service.create_model_ref("String", int_type) - # Create class attributes - service.create_attribute_link("P", "Integer", "t", False) - service.create_attribute_link("P", "String", "n", False) - service.create_attribute_link("T", "String", "n", False) - # Create association attributes - service.create_attribute_link("P2T", "Integer", "w", False) - service.create_attribute_link("T2P", "Integer", "w", False) - # Create test constraint - service.add_constraint("P", "print(element)\nreturn True") - return model_uuid + def match_structures(self): + ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") + # match nodes + for m_element, m_name in self.model_names.items(): + self.candidates[m_name] = set() + is_edge = self.bottom.read_edge_source(m_element) is not None + for type_name, structure in self.structures.items(): + tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name) + type_is_edge = self.bottom.read_edge_source(tm_element) is not None + if is_edge == type_is_edge: + matched = [] + for name, optional, attr_type in structure: + try: + attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}") + # if attribute is a modelref, we need to check whether it + # linguistically conforms to the specified type + # if its an internlly defined attribute, this will be checked by constraints later + morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") + if ref_element in morphisms: + pass + + except ValueError: + if optional: + continue + else: + break + if len(matched) == len(structure): + self.candidates[m_name].add(type_name) + + def build_morphisms(self): + pass if __name__ == '__main__': @@ -350,9 +418,21 @@ if __name__ == '__main__': s = State() from bootstrap.scd import bootstrap_scd scd = bootstrap_scd(s) - pn = __create_pn(s) - # cf = Conformance(s, scd, scd, scd) + from bootstrap.pn import bootstrap_pn + ltm_pn = bootstrap_pn(s, "PN") + from services.pn import PN + my_pn = s.create_node() + PNserv = PN(ltm_pn, my_pn, s) + PNserv.create_place("p1", 5) + PNserv.create_place("p2", 0) + PNserv.create_transition("t1") + PNserv.create_p2t("p1", "t1", 1) + PNserv.create_p2t("t1", "p2", 1) + + cf = Conformance(s, scd, my_pn, ltm_pn) + # cf = Conformance(s, scd, ltm_pn, scd) # cf.check_nominal() - cf = Conformance(s, scd, pn, scd) - cf.check_nominal() + cf.precompute_structures() + cf.match_structures() + diff --git a/services/pn.py b/services/pn.py new file mode 100644 index 0000000..285368b --- /dev/null +++ b/services/pn.py @@ -0,0 +1,127 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom +from services.primitives.integer_type import Integer +from services.primitives.string_type import String + +import re + + +class PN: + def __init__(self, ltm_pn: UUID, model: UUID, state: State): + self.ltm_pn = ltm_pn + self.model = model + self.bottom = Bottom(state) + + def create_place(self, name: str, tokens: int): + # instantiate Place class + place_node = self.bottom.create_node() # create place node + self.bottom.create_edge(self.model, place_node, name) # attach to model + morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "P") # retrieve type + self.bottom.create_edge(place_node, morph_node, "Morphism") # create morphism link + # instantiate name attribute + name_model = self.bottom.create_node() + String(name_model, self.bottom.state).create(name) + name_node = self.bottom.create_node(str(name_model)) + self.bottom.create_edge(self.model, name_node, f"{name}.n") + name_link = self.bottom.create_edge(place_node, name_node) + self.bottom.create_edge(self.model, name_link, f"{name}.n_link") + ltm_pn_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "String") + ltm_pn_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "P_n") + self.bottom.create_edge(name_node, ltm_pn_node, "Morphism") + self.bottom.create_edge(name_link, ltm_pn_link, "Morphism") + # instantiate tokens attribute + tokens_model = self.bottom.create_node() + Integer(tokens_model, self.bottom.state).create(tokens) + tokens_node = self.bottom.create_node(str(tokens_model)) + self.bottom.create_edge(self.model, tokens_node, f"{name}.t") + tokens_link = self.bottom.create_edge(place_node, tokens_node) + self.bottom.create_edge(self.model, tokens_link, f"{name}.t_link") + ltm_pn_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "Integer") + ltm_pn_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "P_t") + self.bottom.create_edge(tokens_node, ltm_pn_node, "Morphism") + self.bottom.create_edge(tokens_link, ltm_pn_link, "Morphism") + + def create_transition(self, name: str): + # instantiate Transition class + transition_node = self.bottom.create_node() # create transition node + self.bottom.create_edge(self.model, transition_node, name) # attach to model + morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "T") # retrieve type + self.bottom.create_edge(transition_node, morph_node, "Morphism") # create morphism link + # instantiate name attribute + name_model = self.bottom.create_node() + String(name_model, self.bottom.state).create(name) + name_node = self.bottom.create_node(str(name_model)) + self.bottom.create_edge(self.model, name_node, f"{name}.n") + name_link = self.bottom.create_edge(transition_node, name_node) + self.bottom.create_edge(self.model, name_link, f"{name}.n_link") + ltm_pn_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "String") + ltm_pn_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "T_n") + self.bottom.create_edge(name_node, ltm_pn_node, "Morphism") + self.bottom.create_edge(name_link, ltm_pn_link, "Morphism") + + def create_p2t(self, place: str, transition: str, weight: int): + # create p2t link + morphism links + edge = self.bottom.create_edge( + *self.bottom.read_outgoing_elements(self.model, place), + *self.bottom.read_outgoing_elements(self.model, transition), + ) + self.bottom.create_edge(self.model, edge, f"{place}_to_{transition}") # attach to model + morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "P2T") # retrieve type + self.bottom.create_edge(edge, morph_node, "Morphism") # create morphism link + # weight attribute + weight_model = self.bottom.create_node() + Integer(weight_model, self.bottom.state).create(weight) + weight_node = self.bottom.create_node(str(weight_model)) + self.bottom.create_edge(self.model, weight_node, f"{place}_to_{transition}.w") + weight_link = self.bottom.create_edge(edge, weight_node) + self.bottom.create_edge(self.model, weight_link, f"{place}_to_{transition}.w_link") + scd_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "Integer") + scd_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "P2T_w") + self.bottom.create_edge(weight_node, scd_node, "Morphism") + self.bottom.create_edge(weight_link, scd_link, "Morphism") + + def create_t2p(self, transition: str, place: str, weight: int): + # create t2p link + morphism links + edge = self.bottom.create_edge( + *self.bottom.read_outgoing_elements(self.model, transition), + *self.bottom.read_outgoing_elements(self.model, place), + ) + self.bottom.create_edge(self.model, edge, f"{transition}_to_{place}") # attach to model + morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "T2P") # retrieve type + self.bottom.create_edge(edge, morph_node, "Morphism") # create morphism link + # weight attribute + weight_model = self.bottom.create_node() + Integer(weight_model, self.bottom.state).create(weight) + weight_node = self.bottom.create_node(str(weight_model)) + self.bottom.create_edge(self.model, weight_node, f"{transition}_to_{place}.w") + weight_link = self.bottom.create_edge(edge, weight_node) + self.bottom.create_edge(self.model, weight_link, f"{transition}_to_{place}.w_link") + scd_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "Integer") + scd_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "T2P_w") + self.bottom.create_edge(weight_node, scd_node, "Morphism") + self.bottom.create_edge(weight_link, scd_link, "Morphism") + + def list_elements(self): + pn_names = {} + for key in self.bottom.read_keys(self.ltm_pn): + element, = self.bottom.read_outgoing_elements(self.ltm_pn, key) + pn_names[element] = key + unsorted = [] + for key in self.bottom.read_keys(self.model): + element, = self.bottom.read_outgoing_elements(self.model, key) + element_types = self.bottom.read_outgoing_elements(element, "Morphism") + type_model_elements = self.bottom.read_outgoing_elements(self.ltm_pn) + element_type_node, = [e for e in element_types if e in type_model_elements] + unsorted.append((key, pn_names[element_type_node])) + for elem in sorted(unsorted, key=lambda e: e[0]): + print("{} : {}".format(*elem)) + + def delete_element(self, name: str): + keys = self.bottom.read_keys(self.model) + r = re.compile(r"{}\..*".format(name)) + to_delete = list(filter(r.match, keys)) + for key in to_delete: + # TODO: find way to solve memory leak, primitive models are not deleted this way + node, = self.bottom.read_outgoing_elements(self.model, label=key) + self.bottom.delete_element(node)