diff --git a/service/__init__.py b/bootstrap/__init__.py similarity index 100% rename from service/__init__.py rename to bootstrap/__init__.py diff --git a/bootstrap/pn.py b/bootstrap/pn.py new file mode 100644 index 0000000..d80f618 --- /dev/null +++ b/bootstrap/pn.py @@ -0,0 +1,40 @@ +from services.scd import SCD +from uuid import UUID +from state.base import State + + +def bootstrap_pn(state: State, model_name: str) -> UUID: + # Retrieve scd model + scd_id = state.read_dict(state.read_root(), "SCD") + scd = UUID(state.read_value(scd_id)) + # Retrieve refs to primitive type models + # # integer + int_type_id = state.read_dict(state.read_root(), "Integer") + int_type = UUID(state.read_value(int_type_id)) + # # string + str_type_id = state.read_dict(state.read_root(), "String") + str_type = UUID(state.read_value(str_type_id)) + # Create LTM_PN + model_uuid = state.create_node() + mcl_root_id = state.create_nodevalue(str(model_uuid)) + state.create_dict(state.read_root(), model_name, mcl_root_id) + service = SCD(model_uuid, state) + # Create classes + service.create_class("P") + service.create_class("T") + # Create associations + service.create_association("P2T", "P", "T") + service.create_association("T2P", "T", "P") + # Create model refs + service.create_model_ref("Integer", int_type) + service.create_model_ref("String", str_type) + # Create class attributes + service.create_attribute_link("P", "Integer", "t", False) + service.create_attribute_link("P", "String", "n", False) + service.create_attribute_link("T", "String", "name", False) + # Create association attributes + service.create_attribute_link("P2T", "Integer", "weight", False) + service.create_attribute_link("T2P", "Integer", "weight", False) + # Create test constraint + service.add_constraint("P", "True") + return model_uuid diff --git a/bootstrap/primitive.py b/bootstrap/primitive.py new file mode 100644 index 0000000..177c085 --- /dev/null +++ b/bootstrap/primitive.py @@ -0,0 +1,63 @@ +from state.base import State, UUID +from services.bottom.V0 import Bottom +from services.primitives.integer_type import Integer + + +def bootstrap_type(type_name: str, python_type: str, scd_root: UUID, model_root: UUID, state: State): + bottom = Bottom(state) + # create class + class_node = bottom.create_node() # create class node + bottom.create_edge(model_root, class_node, type_name) # attach to model + scd_node, = bottom.read_outgoing_elements(scd_root, "Class") # retrieve type + bottom.create_edge(class_node, scd_node, "Morphism") # create morphism link + # set min_cardinality + min_c_model = bottom.create_node() + Integer(min_c_model, state).create(1) + min_c_node = bottom.create_node(str(min_c_model)) + bottom.create_edge(model_root, min_c_node, f"{type_name}.lower_cardinality") + min_c_link = bottom.create_edge(class_node, min_c_node) + bottom.create_edge(model_root, min_c_link, f"{type_name}.lower_cardinality_link") + scd_node, = bottom.read_outgoing_elements(scd_root, "Integer") + scd_link, = bottom.read_outgoing_elements(scd_root, "Class_lower_cardinality") + bottom.create_edge(min_c_node, scd_node, "Morphism") + bottom.create_edge(min_c_link, scd_link, "Morphism") + # set max_cardinality + max_c_model = bottom.create_node() + Integer(max_c_model, state).create(1) + max_c_node = bottom.create_node(str(max_c_model)) + bottom.create_edge(model_root, max_c_node, f"{type_name}.upper_cardinality") + max_c_link = bottom.create_edge(class_node, max_c_node) + bottom.create_edge(model_root, max_c_link, f"{type_name}.upper_cardinality_link") + scd_node, = bottom.read_outgoing_elements(scd_root, "Integer") + scd_link, = bottom.read_outgoing_elements(scd_root, "Class_upper_cardinality") + bottom.create_edge(max_c_node, scd_node, "Morphism") + bottom.create_edge(max_c_link, scd_link, "Morphism") + # set constraint + constraint_node = bottom.create_node(f"isinstance(read_value(element),{python_type})") + bottom.create_edge(model_root, constraint_node, f"{type_name}.constraint") + constraint_link = bottom.create_edge(class_node, constraint_node) + bottom.create_edge(model_root, constraint_link, f"{type_name}.constraint_link") + scd_node, = bottom.read_outgoing_elements(scd_root, "ActionCode") + scd_link, = bottom.read_outgoing_elements(scd_root, "Element_constraint") + bottom.create_edge(constraint_node, scd_node, "Morphism") + bottom.create_edge(constraint_link, scd_link, "Morphism") + + +def bootstrap_type_type(scd_root: UUID, model_root: UUID, state: State): + bootstrap_type("Type", "tuple", scd_root, model_root, state) + + +def bootstrap_boolean_type(scd_root: UUID, model_root: UUID, state: State): + bootstrap_type("Boolean", "bool", scd_root, model_root, state) + + +def bootstrap_integer_type(scd_root: UUID, model_root: UUID, state: State): + bootstrap_type("Integer", "int", scd_root, model_root, state) + + +def bootstrap_float_type(scd_root: UUID, model_root: UUID, state: State): + bootstrap_type("Float", "float", scd_root, model_root, state) + + +def bootstrap_string_type(scd_root: UUID, model_root: UUID, state: State): + bootstrap_type("String", "str", scd_root, model_root, state) diff --git a/bootstrap/scd.py b/bootstrap/scd.py new file mode 100644 index 0000000..30a42f3 --- /dev/null +++ b/bootstrap/scd.py @@ -0,0 +1,270 @@ +from state.base import State, UUID +from services.bottom.V0 import Bottom +from services.primitives.boolean_type import Boolean +from services.primitives.string_type import String +from bootstrap.primitive import ( + bootstrap_boolean_type, + bootstrap_float_type, + bootstrap_integer_type, + bootstrap_string_type, + bootstrap_type_type +) + + +def create_model_root(bottom: Bottom, model_name: str) -> UUID: + model_root = bottom.create_node() + mcl_root_id = bottom.create_node(value=str(model_root)) + bottom.create_edge(bottom.state.read_root(), mcl_root_id, label=model_name) + return model_root + + +def bootstrap_scd(state: State) -> UUID: + # init model roots and store their UUIDs attached to state root + bottom = Bottom(state) + mcl_root = create_model_root(bottom, "SCD") + + # Create model roots for primitive types + integer_type_root = create_model_root(bottom, "Integer") + boolean_type_root = create_model_root(bottom, "Boolean") + string_type_root = create_model_root(bottom, "String") + float_type_root = create_model_root(bottom, "Float") + type_type_root = create_model_root(bottom, "Type") + + # create MCL, without morphism links + + def add_node_element(element_name, node_value=None): + """ Helper function, adds node to model with given name and value """ + _node = bottom.create_node(value=node_value) + bottom.create_edge(mcl_root, _node, element_name) + return _node + + def add_edge_element(element_name, source, target): + """ Helper function, adds edge to model with given name """ + _edge = bottom.create_edge(source, target) + bottom.create_edge(mcl_root, _edge, element_name) + return _edge + + def add_attribute_attributes(attribute_element_name, attribute_element): + _name_model = bottom.create_node() + _name_node = add_node_element(f"{attribute_element_name}.name", str(_name_model)) + _name_edge = add_edge_element(f"{attribute_element_name}.name_link", attribute_element, _name_node) + _optional_model = bottom.create_node() + _optional_node = add_node_element(f"{attribute_element_name}.optional", str(_optional_model)) + _optional_edge = add_edge_element(f"{attribute_element_name}.optional_link", attribute_element, _optional_node) + return _name_model, _optional_model + + # # CLASSES, i.e. elements typed by Class + # # Element + element_node = add_node_element("Element") + # # Class + class_node = add_node_element("Class") + # # Attribute + attr_node = add_node_element("Attribute") + # # ModelRef + model_ref_node = add_node_element("ModelRef") + # # Global Constraint + glob_constr_node = add_node_element("GlobalConstraint") + # # ASSOCIATIONS, i.e. elements typed by Association + # # Association + assoc_edge = add_edge_element("Association", class_node, class_node) + # # Inheritance + inh_edge = add_edge_element("Inheritance", element_node, element_node) + # # Attribute Link + attr_link_edge = add_edge_element("AttributeLink", element_node, attr_node) + # # INHERITANCES, i.e. elements typed by Inheritance + # # Class inherits from Element + add_edge_element("class_inh_element", class_node, element_node) + # # GlobalConstraint inherits from Element + add_edge_element("gc_inh_element", glob_constr_node, element_node) + # # Attribute inherits from Element + add_edge_element("attr_inh_element", attr_node, element_node) + # # Association inherits from Element + add_edge_element("assoc_inh_element", assoc_edge, element_node) + # # AttributeLink inherits from Element + add_edge_element("attr_link_inh_element", attr_link_edge, element_node) + # # ModelRef inherits from Attribute + add_edge_element("model_ref_inh_attr", model_ref_node, attr_node) + # # ATTRIBUTES, i.e. elements typed by Attribute + # # Action Code # TODO: Update to ModelRef when action code is explicitly modelled + action_code_node = add_node_element("ActionCode") + # # MODELREFS, i.e. elements typed by ModelRef + # # Integer + integer_node = add_node_element("Integer", str(integer_type_root)) + # # String + string_node = add_node_element("String", str(string_type_root)) + # # Boolean + boolean_node = add_node_element("Boolean", str(boolean_type_root)) + # # ATTRIBUTE LINKS, i.e. elements typed by AttributeLink + # # name attribute of AttributeLink + attr_name_edge = add_edge_element("AttributeLink_name", attr_link_edge, string_node) + # # optional attribute of AttributeLink + attr_opt_edge = add_edge_element("AttributeLink_optional", attr_link_edge, boolean_node) + # # constraint attribute of Element + elem_constr_edge = add_edge_element("Element_constraint", element_node, action_code_node) + # # abstract attribute of Class + class_abs_edge = add_edge_element("Class_abstract", class_node, boolean_node) + # # multiplicity attributes of Class + class_l_c_edge = add_edge_element("Class_lower_cardinality", class_node, integer_node) + class_u_c_edge = add_edge_element("Class_upper_cardinality", class_node, integer_node) + # # multiplicity attributes of Association + assoc_s_l_c_edge = add_edge_element("Association_source_lower_cardinality", assoc_edge, integer_node) + assoc_s_u_c_edge = add_edge_element("Association_source_upper_cardinality", assoc_edge, integer_node) + assoc_t_l_c_edge = add_edge_element("Association_target_lower_cardinality", assoc_edge, integer_node) + assoc_t_u_c_edge = add_edge_element("Association_target_upper_cardinality", assoc_edge, integer_node) + # # bootstrap primitive types + # # order is important, integer must be first + bootstrap_integer_type(mcl_root, integer_type_root, state) + bootstrap_boolean_type(mcl_root, boolean_type_root, state) + bootstrap_float_type(mcl_root, float_type_root, state) + bootstrap_string_type(mcl_root, string_type_root, state) + bootstrap_type_type(mcl_root, type_type_root, state) + # # ATTRIBUTE ATTRIBUTES, assign 'name' and 'optional' attributes to all AttributeLinks + # # AttributeLink_name + m_name, m_opt = add_attribute_attributes("AttributeLink_name", attr_name_edge) + String(m_name, state).create("name") + Boolean(m_opt, state).create(False) + # # AttributeLink_opt + m_name, m_opt = add_attribute_attributes("AttributeLink_optional", attr_opt_edge) + String(m_name, state).create("optional") + Boolean(m_opt, state).create(False) + # # Element_constraint + m_name, m_opt = add_attribute_attributes("Element_constraint", elem_constr_edge) + String(m_name, state).create("constraint") + Boolean(m_opt, state).create(True) + # # Class_abstract + m_name, m_opt = add_attribute_attributes("Class_abstract", class_abs_edge) + String(m_name, state).create("abstract") + Boolean(m_opt, state).create(True) + # # Class_lower_cardinality + m_name, m_opt = add_attribute_attributes("Class_lower_cardinality", class_l_c_edge) + String(m_name, state).create("lower_cardinality") + Boolean(m_opt, state).create(True) + # # Class_upper_cardinality + m_name, m_opt = add_attribute_attributes("Class_upper_cardinality", class_u_c_edge) + String(m_name, state).create("upper_cardinality") + Boolean(m_opt, state).create(True) + # # Association_source_lower_cardinality + m_name, m_opt = add_attribute_attributes("Association_source_lower_cardinality", assoc_s_l_c_edge) + String(m_name, state).create("source_lower_cardinality") + Boolean(m_opt, state).create(True) + # # Association_source_upper_cardinality + m_name, m_opt = add_attribute_attributes("Association_source_upper_cardinality", assoc_s_u_c_edge) + String(m_name, state).create("source_upper_cardinality") + Boolean(m_opt, state).create(True) + # # Association_target_lower_cardinality + m_name, m_opt = add_attribute_attributes("Association_target_lower_cardinality", assoc_t_l_c_edge) + String(m_name, state).create("target_lower_cardinality") + Boolean(m_opt, state).create(True) + # # Association_target_upper_cardinality + m_name, m_opt = add_attribute_attributes("Association_target_upper_cardinality", assoc_t_u_c_edge) + String(m_name, state).create("target_upper_cardinality") + Boolean(m_opt, state).create(True) + # # Make Element abstract + abs_model = bottom.create_node() + abs_node = add_node_element(f"Element.abstract", str(abs_model)) + abs_edge = add_edge_element(f"Element.abstract_link", element_node, abs_node) + Boolean(abs_model, state).create(True) + + # create phi(SCD,SCD) to type MCL with itself + + def add_mcl_morphism(element_name, type_name): + # get elements from mcl by name + _element_edge, = bottom.read_outgoing_edges(mcl_root, element_name) + _element_node = bottom.read_edge_target(_element_edge) + _type_edge, = bottom.read_outgoing_edges(mcl_root, type_name) + _type_node = bottom.read_edge_target(_type_edge) + # create morphism link + bottom.create_edge(_element_node, _type_node, "Morphism") + + # Class + add_mcl_morphism("Element", "Class") + add_mcl_morphism("Class", "Class") + add_mcl_morphism("Attribute", "Class") + add_mcl_morphism("ModelRef", "Class") + add_mcl_morphism("GlobalConstraint", "Class") + # Association + add_mcl_morphism("Association", "Association") + add_mcl_morphism("Inheritance", "Association") + add_mcl_morphism("AttributeLink", "Association") + # Inheritance + add_mcl_morphism("class_inh_element", "Inheritance") + add_mcl_morphism("gc_inh_element", "Inheritance") + add_mcl_morphism("attr_inh_element", "Inheritance") + add_mcl_morphism("assoc_inh_element", "Inheritance") + add_mcl_morphism("attr_link_inh_element", "Inheritance") + add_mcl_morphism("model_ref_inh_attr", "Inheritance") + # Attribute + add_mcl_morphism("ActionCode", "Attribute") + # ModelRef + add_mcl_morphism("Integer", "ModelRef") + add_mcl_morphism("String", "ModelRef") + add_mcl_morphism("Boolean", "ModelRef") + # AttributeLink + add_mcl_morphism("AttributeLink_name", "AttributeLink") + add_mcl_morphism("AttributeLink_optional", "AttributeLink") + add_mcl_morphism("Element_constraint", "AttributeLink") + add_mcl_morphism("Class_abstract", "AttributeLink") + add_mcl_morphism("Class_lower_cardinality", "AttributeLink") + add_mcl_morphism("Class_upper_cardinality", "AttributeLink") + add_mcl_morphism("Association_source_lower_cardinality", "AttributeLink") + add_mcl_morphism("Association_source_upper_cardinality", "AttributeLink") + add_mcl_morphism("Association_target_lower_cardinality", "AttributeLink") + add_mcl_morphism("Association_target_upper_cardinality", "AttributeLink") + # AttributeLink_name + add_mcl_morphism("AttributeLink_name.name_link", "AttributeLink_name") + add_mcl_morphism("AttributeLink_optional.name_link", "AttributeLink_name") + add_mcl_morphism("Element_constraint.name_link", "AttributeLink_name") + add_mcl_morphism("Class_abstract.name_link", "AttributeLink_name") + add_mcl_morphism("Class_lower_cardinality.name_link", "AttributeLink_name") + add_mcl_morphism("Class_upper_cardinality.name_link", "AttributeLink_name") + add_mcl_morphism("Association_source_lower_cardinality.name_link", "AttributeLink_name") + add_mcl_morphism("Association_source_upper_cardinality.name_link", "AttributeLink_name") + add_mcl_morphism("Association_target_lower_cardinality.name_link", "AttributeLink_name") + add_mcl_morphism("Association_target_upper_cardinality.name_link", "AttributeLink_name") + # AttributeLink_optional + add_mcl_morphism("AttributeLink_name.optional_link", "AttributeLink_optional") + add_mcl_morphism("AttributeLink_optional.optional_link", "AttributeLink_optional") + add_mcl_morphism("Element_constraint.optional_link", "AttributeLink_optional") + add_mcl_morphism("Class_abstract.optional_link", "AttributeLink_optional") + add_mcl_morphism("Class_lower_cardinality.optional_link", "AttributeLink_optional") + add_mcl_morphism("Class_upper_cardinality.optional_link", "AttributeLink_optional") + add_mcl_morphism("Association_source_lower_cardinality.optional_link", "AttributeLink_optional") + add_mcl_morphism("Association_source_upper_cardinality.optional_link", "AttributeLink_optional") + add_mcl_morphism("Association_target_lower_cardinality.optional_link", "AttributeLink_optional") + add_mcl_morphism("Association_target_upper_cardinality.optional_link", "AttributeLink_optional") + # String + add_mcl_morphism("AttributeLink_name.name", "String") + add_mcl_morphism("AttributeLink_optional.name", "String") + add_mcl_morphism("Element_constraint.name", "String") + add_mcl_morphism("Class_abstract.name", "String") + add_mcl_morphism("Class_lower_cardinality.name", "String") + add_mcl_morphism("Class_upper_cardinality.name", "String") + add_mcl_morphism("Association_source_lower_cardinality.name", "String") + add_mcl_morphism("Association_source_upper_cardinality.name", "String") + add_mcl_morphism("Association_target_lower_cardinality.name", "String") + add_mcl_morphism("Association_target_upper_cardinality.name", "String") + # Boolean + add_mcl_morphism("AttributeLink_name.optional", "Boolean") + add_mcl_morphism("AttributeLink_optional.optional", "Boolean") + add_mcl_morphism("Element_constraint.optional", "Boolean") + add_mcl_morphism("Class_abstract.optional", "Boolean") + add_mcl_morphism("Class_lower_cardinality.optional", "Boolean") + add_mcl_morphism("Class_upper_cardinality.optional", "Boolean") + add_mcl_morphism("Association_source_lower_cardinality.optional", "Boolean") + add_mcl_morphism("Association_source_upper_cardinality.optional", "Boolean") + add_mcl_morphism("Association_target_lower_cardinality.optional", "Boolean") + add_mcl_morphism("Association_target_upper_cardinality.optional", "Boolean") + add_mcl_morphism("Element.abstract", "Boolean") + # Class_abstract + add_mcl_morphism("Element.abstract_link", "Class_abstract") + + return mcl_root + + +if __name__ == '__main__': + from state.devstate import DevState as State + s = State() + bootstrap_scd(s) + r = s.read_root() + for n in s.read_dict_keys(r): + print(s.read_value(n)) diff --git a/service/bottom/__init__.py b/framework/__init__.py similarity index 100% rename from service/bottom/__init__.py rename to framework/__init__.py diff --git a/framework/conformance.py b/framework/conformance.py new file mode 100644 index 0000000..9a97ca4 --- /dev/null +++ b/framework/conformance.py @@ -0,0 +1,503 @@ +from services.bottom.V0 import Bottom +from uuid import UUID +from state.base import State +from typing import Dict, Tuple, Set, Any, List +from pprint import pprint + + +class Conformance: + def __init__(self, state: State, model: UUID, type_model: UUID): + self.state = state + self.bottom = Bottom(state) + type_model_id = state.read_dict(state.read_root(), "SCD") + self.scd_model = UUID(state.read_value(type_model_id)) + self.model = model + self.type_model = type_model + self.type_mapping: Dict[str, str] = {} + self.model_names = { + # map model elements to their names to prevent iterating too much + self.bottom.read_outgoing_elements(self.model, e)[0]: e + for e in self.bottom.read_keys(self.model) + } + self.type_model_names = { + # map type model elements to their names to prevent iterating too much + self.bottom.read_outgoing_elements(self.type_model, e)[0]: e + for e in self.bottom.read_keys(self.type_model) + } + self.sub_types: Dict[str, Set[str]] = { + k: set() for k in self.bottom.read_keys(self.type_model) + } + self.primitive_values: Dict[UUID, Any] = {} + self.abstract_types: List[str] = [] + self.multiplicities: Dict[str, Tuple] = {} + self.source_multiplicities: Dict[str, Tuple] = {} + self.target_multiplicities: Dict[str, Tuple] = {} + self.structures = {} + self.matches = {} + self.candidates = {} + + def check_nominal(self, *, log=False): + try: + self.check_typing() + self.check_link_typing() + self.check_multiplicities() + self.check_constraints() + return True + except RuntimeError as e: + if log: + print(e) + return False + + def check_structural(self, *, build_morphisms=True, log=False): + try: + self.precompute_structures() + self.match_structures() + if build_morphisms: + self.build_morphisms() + self.check_nominal(log=log) + return True + except RuntimeError as e: + if log: + print(e) + return False + + def read_attribute(self, element: UUID, attr_name: str): + + if element in self.type_model_names: + # type model element + element_name = self.type_model_names[element] + model = self.type_model + else: + # model element + element_name = self.model_names[element] + model = self.model + try: + attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}") + return self.primitive_values.get(attr_elem, self.bottom.read_value(attr_elem)) + except ValueError: + return None + + def precompute_sub_types(self): + inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance") + inh_links = [] + for tm_element, tm_name in self.type_model_names.items(): + morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") + if inh_element in morphisms: + inh_links.append(tm_element) + + for link in inh_links: + tm_source = self.bottom.read_edge_source(link) + tm_target = self.bottom.read_edge_target(link) + parent_name = self.type_model_names[tm_target] + child_name = self.type_model_names[tm_source] + self.sub_types[parent_name].add(child_name) + + stop = False + while not stop: + stop = True + for child_name, child_children in self.sub_types.items(): + for parent_name, parent_children in self.sub_types.items(): + if child_name in parent_children: + original_size = len(parent_children) + parent_children.update(child_children) + if len(parent_children) != original_size: + stop = False + + def deref_primitive_values(self): + ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") + string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String") + boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean") + integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer") + t_deref = [] + t_refs = [] + for tm_element, tm_name in self.type_model_names.items(): + morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") + if ref_element in morphisms: + t_refs.append(self.type_model_names[tm_element]) + elif string_element in morphisms: + t_deref.append(tm_element) + elif boolean_element in morphisms: + t_deref.append(tm_element) + elif integer_element in morphisms: + t_deref.append(tm_element) + + for elem in t_deref: + primitive_model = UUID(self.bottom.read_value(elem)) + primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model) + primitive_value = self.bottom.read_value(primitive_value_node) + self.primitive_values[elem] = primitive_value + + for m_name, tm_name in self.type_mapping.items(): + if tm_name in t_refs: + # dereference + m_element, = self.bottom.read_outgoing_elements(self.model, m_name) + primitive_model = UUID(self.bottom.read_value(m_element)) + try: + primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model) + primitive_value = self.bottom.read_value(primitive_value_node) + self.primitive_values[m_element] = primitive_value + except ValueError: + pass # multiple elements in model indicate that we're not dealing with a primitive + + def precompute_multiplicities(self): + for tm_element, tm_name in self.type_model_names.items(): + # class abstract flags and multiplicities + abstract = self.read_attribute(tm_element, "abstract") + lc = self.read_attribute(tm_element, "lower_cardinality") + uc = self.read_attribute(tm_element, "upper_cardinality") + if abstract: + self.abstract_types.append(tm_name) + if lc or uc: + mult = ( + lc if lc is not None else float("-inf"), + uc if uc is not None else float("inf") + ) + self.multiplicities[tm_name] = mult + # multiplicities for associations + slc = self.read_attribute(tm_element, "source_lower_cardinality") + suc = self.read_attribute(tm_element, "source_upper_cardinality") + if slc or suc: + mult = ( + slc if slc is not None else float("-inf"), + suc if suc is not None else float("inf") + ) + self.source_multiplicities[tm_name] = mult + tlc = self.read_attribute(tm_element, "target_lower_cardinality") + tuc = self.read_attribute(tm_element, "target_upper_cardinality") + if tlc or tuc: + mult = ( + tlc if tlc is not None else float("-inf"), + tuc if tuc is not None else float("inf") + ) + self.target_multiplicities[tm_name] = mult + # optional for attribute links + opt = self.read_attribute(tm_element, "optional") + if opt is not None: + self.source_multiplicities[tm_name] = (0 if opt else 1, 1) + self.target_multiplicities[tm_name] = (0, 1) + + def get_type(self, element: UUID): + morphisms = self.bottom.read_outgoing_elements(element, "Morphism") + tm_element, = [m for m in morphisms if m in self.type_model_names.keys()] + return tm_element + + def check_typing(self): + """ + for each element of model check whether a morphism + link exists to some element of type_model + """ + ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") + model_names = self.bottom.read_keys(self.model) + for m_name in model_names: + m_element, = self.bottom.read_outgoing_elements(self.model, m_name) + try: + tm_element = self.get_type(m_element) + tm_name = self.type_model_names[tm_element] + self.type_mapping[m_name] = tm_name + if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"): + sub_m = UUID(self.bottom.read_value(m_element)) + sub_tm = UUID(self.bottom.read_value(tm_element)) + if not Conformance(self.state, sub_m, sub_tm).check_nominal(): + raise RuntimeError(f"Incorrectly model reference: {m_name}") + except ValueError: + # no or too many morphism links found + raise RuntimeError(f"Incorrectly typed element: {m_name}") + return True + + def check_link_typing(self): + self.precompute_sub_types() + for m_name, tm_name in self.type_mapping.items(): + m_element, = self.bottom.read_outgoing_elements(self.model, m_name) + m_source = self.bottom.read_edge_source(m_element) + m_target = self.bottom.read_edge_target(m_element) + if m_source is None or m_target is None: + # element is not a link + continue + tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) + tm_source = self.bottom.read_edge_source(tm_element) + tm_target = self.bottom.read_edge_target(tm_element) + # check if source is typed correctly + source_name = self.model_names[m_source] + source_type_actual = self.type_mapping[source_name] + source_type_expected = self.type_model_names[tm_source] + if source_type_actual != source_type_expected: + if source_type_actual not in self.sub_types[source_type_expected]: + raise RuntimeError(f"Invalid source type {source_type_actual} for element {m_name}") + # check if target is typed correctly + target_name = self.model_names[m_target] + target_type_actual = self.type_mapping[target_name] + target_type_expected = self.type_model_names[tm_target] + if target_type_actual != target_type_expected: + if target_type_actual not in self.sub_types[target_type_expected]: + raise RuntimeError(f"Invalid target type {target_type_actual} for element {m_name}") + return True + + def check_multiplicities(self): + self.deref_primitive_values() + self.precompute_multiplicities() + for tm_name in self.type_model_names.values(): + # abstract classes + if tm_name in self.abstract_types: + type_count = list(self.type_mapping.values()).count(tm_name) + if type_count > 0: + raise RuntimeError(f"Invalid instantiation of abstract class: {tm_name}") + # class multiplicities + if tm_name in self.multiplicities: + lc, uc = self.multiplicities[tm_name] + type_count = list(self.type_mapping.values()).count(tm_name) + for sub_type in self.sub_types[tm_name]: + type_count += list(self.type_mapping.values()).count(sub_type) + if type_count < lc or type_count > uc: + raise RuntimeError(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})") + # association source multiplicities + if tm_name in self.source_multiplicities: + tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) + tm_source_element = self.bottom.read_edge_source(tm_element) + tm_source_name = self.type_model_names[tm_source_element] + lc, uc = self.source_multiplicities[tm_name] + for i, t in self.type_mapping.items(): + if t == tm_source_name or t in self.sub_types[tm_source_name]: + count = 0 + i_element, = self.bottom.read_outgoing_elements(self.model, i) + outgoing = self.bottom.read_outgoing_edges(i_element) + for o in outgoing: + try: + if self.type_mapping[self.model_names[o]] == tm_name: + count += 1 + except KeyError: + pass # for elements not part of model, e.g. morphism links + if count < lc or count > uc: + raise RuntimeError(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.") + + # association target multiplicities + if tm_name in self.target_multiplicities: + tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) + tm_target_element = self.bottom.read_edge_target(tm_element) + tm_target_name = self.type_model_names[tm_target_element] + lc, uc = self.target_multiplicities[tm_name] + for i, t in self.type_mapping.items(): + if t == tm_target_name or t in self.sub_types[tm_target_name]: + count = 0 + i_element, = self.bottom.read_outgoing_elements(self.model, i) + outgoing = self.bottom.read_incoming_edges(i_element) + for o in outgoing: + try: + if self.type_mapping[self.model_names[o]] == tm_name: + count += 1 + except KeyError: + pass # for elements not part of model, e.g. morphism links + if count < lc or count > uc: + print(f"Target cardinality of type {tm_name} exceeds valid multiplicity range in {i}.") + return False + return True + + def evaluate_constraint(self, code, **kwargs): + funcs = { + 'read_value': self.state.read_value + } + return eval( + code, + {'__builtins__': {'isinstance': isinstance, 'print': print, + 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple} + }, # globals + {**kwargs, **funcs} # locals + ) + + def check_constraints(self): + # local constraints + for m_name, tm_name in self.type_mapping.items(): + if tm_name != "GlobalConstraint": + tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) + code = self.read_attribute(tm_element, "constraint") + if code is not None: + morphisms = self.bottom.read_incoming_elements(tm_element, "Morphism") + morphisms = [m for m in morphisms if m in self.model_names] + for m_element in morphisms: + if not self.evaluate_constraint(code, element=m_element): + raise RuntimeError(f"Local constraint of {tm_name} not satisfied in {m_name}.") + + # global constraints + for m_name, tm_name in self.type_mapping.items(): + if tm_name == "GlobalConstraint": + tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) + code = self.read_attribute(tm_element, "constraint") + if code is not None: + if not self.evaluate_constraint(code, model=self.model): + raise RuntimeError(f"Global constraint {tm_name} not satisfied.") + return True + + def precompute_structures(self): + self.precompute_sub_types() + scd_elements = self.bottom.read_outgoing_elements(self.scd_model) + # collect types + class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class") + association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association") + for tm_element, tm_name in self.type_model_names.items(): + # retrieve elements that tm_element is a morphism of + morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") + morphism, = [m for m in morphisms if m in scd_elements] + # check if tm_element is a morphism of AttributeLink + if class_element == morphism or association_element == morphism: + self.structures[tm_name] = set() + # collect type structures + # retrieve AttributeLink to check whether element is a morphism of AttributeLink + attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink") + for tm_element, tm_name in self.type_model_names.items(): + # retrieve elements that tm_element is a morphism of + morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") + morphism, = [m for m in morphisms if m in scd_elements] + # check if tm_element is a morphism of AttributeLink + if attr_link_element == morphism: + # retrieve attributes of attribute link, i.e. 'name' and 'optional' + attrs = self.bottom.read_outgoing_elements(tm_element) + name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs) + opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs) + # get attr name value + name_model = UUID(self.bottom.read_value(name_model_node)) + name_node, = self.bottom.read_outgoing_elements(name_model) + name = self.bottom.read_value(name_node) + # get attr opt value + opt_model = UUID(self.bottom.read_value(opt_model_node)) + opt_node, = self.bottom.read_outgoing_elements(opt_model) + opt = self.bottom.read_value(opt_node) + # get attr type name + source_type_node = self.bottom.read_edge_source(tm_element) + source_type_name = self.type_model_names[source_type_node] + target_type_node = self.bottom.read_edge_target(tm_element) + target_type_name = self.type_model_names[target_type_node] + # add attribute to the structure of its source type + # attribute is stored as a (name, optional, type) triple + self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name)) + # extend structures of sub types with attrs of super types + for super_type, sub_types in self.sub_types.items(): + for sub_type in sub_types: + self.structures.setdefault(sub_type, set()).update(self.structures[super_type]) + # filter out abstract types, as they cannot be instantiated + # retrieve Class_abstract to check whether element is a morphism of Class_abstract + class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract") + for tm_element, tm_name in self.type_model_names.items(): + # retrieve elements that tm_element is a morphism of + morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") + morphism, = [m for m in morphisms if m in scd_elements] + # check if tm_element is a morphism of Class_abstract + if class_abs_element == morphism: + # retrieve 'abstract' attribute value + target_node = self.bottom.read_edge_target(tm_element) + abst_model = UUID(self.bottom.read_value(target_node)) + abst_node, = self.bottom.read_outgoing_elements(abst_model) + is_abstract = self.bottom.read_value(abst_node) + # retrieve type name + source_node = self.bottom.read_edge_source(tm_element) + type_name = self.type_model_names[source_node] + if is_abstract: + self.structures.pop(type_name) + + def match_structures(self): + ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") + # matching + for m_element, m_name in self.model_names.items(): + is_edge = self.bottom.read_edge_source(m_element) is not None + for type_name, structure in self.structures.items(): + tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name) + type_is_edge = self.bottom.read_edge_source(tm_element) is not None + if is_edge == type_is_edge: + matched = 0 + for name, optional, attr_type in structure: + try: + attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}") + attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type) + # if attribute is a modelref, we need to check whether it + # linguistically conforms to the specified type + # if its an internally defined attribute, this will be checked by constraints + morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism") + attr_conforms = True + if ref_element in morphisms: + # check conformance of reference model + type_model_uuid = UUID(self.bottom.read_value(attr_tm)) + model_uuid = UUID(self.bottom.read_value(attr)) + attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\ + .check_nominal() + else: + # eval constraints + code = self.read_attribute(attr_tm, "constraint") + if code is not None: + attr_conforms = self.evaluate_constraint(code, element=attr) + if attr_conforms: + matched += 1 + except ValueError: + # attr not found or failed parsing UUID + if optional: + continue + else: + break + if matched == len(structure): + self.candidates.setdefault(m_name, set()).add(type_name) + # filter out candidates for links based on source and target types + for m_element, m_name in self.model_names.items(): + is_edge = self.bottom.read_edge_source(m_element) is not None + if is_edge and m_name in self.candidates: + m_source = self.bottom.read_edge_source(m_element) + m_target = self.bottom.read_edge_target(m_element) + source_candidates = self.candidates[self.model_names[m_source]] + target_candidates = self.candidates[self.model_names[m_target]] + remove = set() + for candidate_name in self.candidates[m_name]: + candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name) + candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)] + if candidate_source not in source_candidates: + remove.add(candidate_name) + candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)] + if candidate_target not in target_candidates: + remove.add(candidate_name) + self.candidates[m_name] = self.candidates[m_name].difference(remove) + + def build_morphisms(self): + if not all([len(c) == 1 for c in self.candidates.values()]): + raise RuntimeError("Cannot build incomplete or ambiguous morphism.") + mapping = {k: v.pop() for k, v in self.candidates.items()} + for m_name, tm_name in mapping.items(): + # morphism to class/assoc + m_element, = self.bottom.read_outgoing_elements(self.model, m_name) + tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) + self.bottom.create_edge(m_element, tm_element, "Morphism") + # morphism for attributes and attribute links + structure = self.structures[tm_name] + for attr_name, _, attr_type in structure: + try: + # attribute node + attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}") + attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type) + self.bottom.create_edge(attr_element, attr_type_element, "Morphism") + # attribute link + attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}_link") + attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}") + self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism") + except ValueError: + pass + + +if __name__ == '__main__': + from state.devstate import DevState as State + s = State() + from bootstrap.scd import bootstrap_scd + scd = bootstrap_scd(s) + from bootstrap.pn import bootstrap_pn + ltm_pn = bootstrap_pn(s, "PN") + ltm_pn_lola = bootstrap_pn(s, "PNlola") + from services.pn import PN + my_pn = s.create_node() + PNserv = PN(my_pn, s) + PNserv.create_place("p1", 5) + PNserv.create_place("p2", 0) + PNserv.create_transition("t1") + PNserv.create_p2t("p1", "t1", 1) + PNserv.create_t2p("t1", "p2", 1) + + cf = Conformance(s, my_pn, ltm_pn_lola) + # cf = Conformance(s, scd, ltm_pn, scd) + cf.precompute_structures() + cf.match_structures() + cf.build_morphisms() + print(cf.check_nominal()) + + diff --git a/framework/interactive_prompt.py b/framework/interactive_prompt.py new file mode 100644 index 0000000..ff38c8c --- /dev/null +++ b/framework/interactive_prompt.py @@ -0,0 +1,99 @@ +from framework.manager import Manager +from state.devstate import DevState +from PyInquirer import prompt, Separator +from pprint import pprint +import prompt_questions as questions +from inspect import signature +from uuid import UUID + + +def generate_context_question(ctx_type, services): + choices = [ + s.__name__.replace('_', ' ') for s in services + ] + choices = sorted(choices) + choices.append(Separator()) + choices.append("close context") + ctx_question = [ + { + 'type': 'list', + 'name': 'op', + 'message': f'Currently in context {ctx_type.__name__}, which operation would you like to perform?', + 'choices': choices, + 'filter': lambda x: x.replace(' ', '_') + } + ] + return ctx_question + + +def main(): + state = DevState() + man = Manager(state) + + while True: + if man.current_model is not None and man.current_context is None: + answer = prompt(questions.MODEL_SELECTED) + ctx = man + elif man.current_model is not None and man.current_context is not None: + qs = generate_context_question(type(man.current_context), man.get_services()) + answer = prompt(qs) + if answer['op'] == 'close_context': + man.close_context() + continue + else: + ctx = man.current_context + else: + answer = prompt(questions.MODEL_MGMT) + ctx = man + + if answer['op'] == 'exit': + break + else: + method = getattr(ctx, answer['op']) + args_questions = [] + types = {} + for p in signature(method).parameters.values(): + types[p.name] = p.annotation # can't use filter in question dict, doesn't work for some reason... + if p.annotation == UUID: + args_questions.append({ + 'type': 'list', + 'name': p.name, + 'message': f'{p.name.replace("_", " ")}?', + 'choices': list(man.get_models()), + 'filter': lambda x: state.read_value(state.read_dict(state.read_root(), x)) + }) + else: + args_questions.append({ + 'type': 'input', + 'name': p.name, + 'message': f'{p.name.replace("_", " ")}?', + 'filter': lambda x: False if x.lower() == 'false' else x + }) + args = prompt(args_questions) + args = {k: types[k](v) for k, v in args.items()} + try: + output = method(**args) + if output is not None: + try: + if isinstance(output, str): + raise TypeError + output = list(output) + if len(output) > 0: + for o in sorted(output): + print(f"\u2022 {o}") + except TypeError: + print(f"\u2022 {output}") + except RuntimeError as e: + print(e) + + +if __name__ == '__main__': + print("""Welcome to... + __ ____ _____ + | \/ \ \ / /__ \ + | \ / |\ \ / / ) | + | |\/| | \ \/ / / / + | | | | \ / / /_ + |_| |_| \/ |____| + """) + main() diff --git a/framework/manager.py b/framework/manager.py new file mode 100644 index 0000000..289ab5e --- /dev/null +++ b/framework/manager.py @@ -0,0 +1,140 @@ +from state.base import State +from bootstrap.scd import bootstrap_scd +from services import implemented as services +from framework.conformance import Conformance +from uuid import UUID + + +class Manager: + def __init__(self, state: State): + self.current_model = None + self.current_context = None + self.state = state + bootstrap_scd(state) + scd_node = self.state.read_dict(self.state.read_root(), "SCD") + for key_node in self.state.read_dict_keys(self.state.read_root()): + model_node = self.state.read_dict_node(self.state.read_root(), key_node) + self.state.create_edge(model_node, scd_node) + + def get_models(self): + for key_node in self.state.read_dict_keys(self.state.read_root()): + yield self.state.read_value(key_node) + + def instantiate_model(self, type_model_name: str, name: str): + root = self.state.read_root() + type_model_node = self.state.read_dict(root, type_model_name) + if type_model_node is None: + raise RuntimeError(f"No type model with name {type_model_name} found.") + else: + # check if model is a linguistic type model + scd_node = self.state.read_dict(self.state.read_root(), "SCD") + incoming = self.state.read_incoming(scd_node) + incoming = [self.state.read_edge(e)[0] for e in incoming] + if type_model_node not in incoming: + raise RuntimeError(f"Model with name {type_model_name} is not a type model.") + if name in map(self.state.read_value, self.state.read_dict_keys(root)): + raise RuntimeError(f"Model with name {name} already exists.") + new_model_root = self.state.create_node() + new_model_node = self.state.create_nodevalue(str(new_model_root)) + self.state.create_dict(root, name, new_model_node) + self.state.create_edge(new_model_node, type_model_node) + self.current_model = (name, new_model_root) + if type_model_name not in services: + raise RuntimeError(f"Services for type {type_model_name} not implemented.") + self.current_context = services[type_model_name](self.current_model[1], self.state) + + def select_model(self, name: str): + root = self.state.read_root() + model_node = self.state.read_dict(root, name) + if model_node is None: + raise RuntimeError(f"No model with name {name} found.") + model_root = UUID(self.state.read_value(model_node)) + self.current_model = (name, model_root) + + def close_model(self): + self.current_model = None + self.current_context = None + + def get_types(self): + root = self.state.read_root() + if self.current_model is None: + raise RuntimeError(f"No model currently selected.") + name, model = self.current_model + model_id = self.state.read_dict(root, name) + outgoing = self.state.read_outgoing(model_id) + outgoing = [e for e in outgoing if len(self.state.read_outgoing(e)) == 0] + elements = [self.state.read_edge(e)[1] for e in outgoing] + for e in elements: + incoming = self.state.read_incoming(e) + label_edge, = [e for e in incoming if len(self.state.read_outgoing(e)) == 1] + label_edge, = self.state.read_outgoing(label_edge) + _, label_node = self.state.read_edge(label_edge) + yield self.state.read_value(label_node) + + def select_context(self, name: str): + if name not in self.get_types(): + raise RuntimeError(f"No type {name} that currently selected model conforms to.") + if name not in services: + raise RuntimeError(f"Services for type {name} not implemented.") + self.current_context = services[name](self.current_model[1], self.state) + self.current_context.from_bottom() + + def close_context(self): + self.current_context.to_bottom() + self.current_context = None + + def get_services(self): + if self.current_model is None: + raise RuntimeError(f"No model currently selected.") + if self.current_context is None: + raise RuntimeError(f"No context currently selected.") + yield from [ + getattr(self.current_context, func) + for func in dir(self.current_context) + if callable(getattr(self.current_context, func)) + and not func.startswith("__") + and not func == "from_bottom" + and not func == "to_bottom" + ] + + def check_conformance(self, type_model_name: str, model_name: str): + root = self.state.read_root() + type_model_node = self.state.read_dict(root, type_model_name) + if type_model_node is None: + raise RuntimeError(f"No type model with name {type_model_name} found.") + model_node = self.state.read_dict(root, model_name) + if model_node is None: + raise RuntimeError(f"No model with name {model_node} found.") + types = self.state.read_outgoing(model_node) + types = [self.state.read_edge(e)[1] for e in types] + if type_model_node not in types: + conf = Conformance(self.state, + UUID(self.state.read_value(model_node)), + UUID(self.state.read_value(type_model_node))).check_structural(log=True) + if conf: + self.state.create_edge(model_node, type_model_node) + return conf + else: + return Conformance(self.state, + UUID(self.state.read_value(model_node)), + UUID(self.state.read_value(type_model_node))).check_nominal(log=True) + + def dump_state(self): + import pickle + with open("state.p", "wb") as file: + pickle.dump(self.state, file) + + def load_state(self): + import pickle + with open("state.p", "rb") as file: + self.state = pickle.load(file) + + +if __name__ == '__main__': + from state.devstate import DevState + s = DevState() + m = Manager(s) + m.select_model("SCD") + m.select_context("SCD") + for f in m.get_services(): + print(f) diff --git a/framework/prompt_questions.py b/framework/prompt_questions.py new file mode 100644 index 0000000..d71e1cc --- /dev/null +++ b/framework/prompt_questions.py @@ -0,0 +1,36 @@ +from PyInquirer import Separator + +MODEL_SELECTED = [ + { + 'type': 'list', + 'name': 'op', + 'message': 'Model selected... Which operation would you like to perform?', + 'choices': [ + 'get types', + 'select context', + Separator(), + 'close model' + ], + 'filter': lambda x: x.replace(' ', '_') + } +] + +MODEL_MGMT = [ + { + 'type': 'list', + 'name': 'op', + 'message': 'Which model management operation would you like to perform?', + 'choices': [ + 'get models', + 'select model', + 'instantiate model', + 'check conformance', + Separator(), + 'load state', + 'dump state', + Separator(), + 'exit' + ], + 'filter': lambda x: x.replace(' ', '_') + } +] diff --git a/service/base.py b/service/base.py deleted file mode 100644 index 651607e..0000000 --- a/service/base.py +++ /dev/null @@ -1,7 +0,0 @@ -from abc import ABC, abstractmethod -from uuid import UUID - - -class Service(ABC): - def __init__(self, model: UUID): - self.model = model diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..e36ebb5 --- /dev/null +++ b/services/__init__.py @@ -0,0 +1,9 @@ +from services.scd import SCD +from services.point.cartesian import PointCartesian +from services.point.polar import PointPolar + +implemented = { + "SCD": SCD, + "PointCartesian": PointCartesian, + "PointPolar": PointPolar, +} diff --git a/service/bottom/V0.py b/services/bottom/V0.py similarity index 75% rename from service/bottom/V0.py rename to services/bottom/V0.py index 36a51d8..cfa32eb 100644 --- a/service/bottom/V0.py +++ b/services/bottom/V0.py @@ -1,11 +1,10 @@ -from service.base import Service, UUID +from uuid import UUID from state.base import State from typing import Any, List -class Bottom(Service): - def __init__(self, model: UUID, state: State): - super().__init__(model) +class Bottom: + def __init__(self, state: State): self.state = state def create_node(self, value=None) -> UUID: @@ -15,10 +14,10 @@ class Bottom(Service): return self.state.create_nodevalue(value) def create_edge(self, source: UUID, target: UUID, label=None): - pass - - def read_model_root(self) -> UUID: - return self.model + if label is None: + return self.state.create_edge(source, target) + else: + return self.state.create_dict(source, label, target) def read_value(self, node: UUID) -> Any: return self.state.read_value(node) @@ -65,6 +64,20 @@ class Bottom(Service): edges = [e for e in edges if read_label(e) == label] return edges + def read_incoming_elements(self, target: UUID, label=None) -> List[UUID]: + edges = self.read_incoming_edges(target, label) + if edges is None or len(edges) == 0: + return [] + else: + return [self.read_edge_source(e) for e in edges] + + def read_outgoing_elements(self, source: UUID, label=None) -> List[UUID]: + edges = self.read_outgoing_edges(source, label) + if edges is None or len(edges) == 0: + return [] + else: + return [self.read_edge_target(e) for e in edges] + def read_keys(self, element: UUID) -> List[str]: key_nodes = self.state.read_dict_keys(element) unique_keys = {self.state.read_value(node) for node in key_nodes} diff --git a/services/bottom/__init__.py b/services/bottom/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/pn.py b/services/pn.py new file mode 100644 index 0000000..7086ff9 --- /dev/null +++ b/services/pn.py @@ -0,0 +1,128 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom +from services.primitives.integer_type import Integer +from services.primitives.string_type import String + +import re + + +class PN: + def __init__(self, model: UUID, state: State): + ltm_pn_id = state.read_dict(state.read_root(), "PN") + self.ltm_pn = UUID(state.read_value(ltm_pn_id)) + self.model = model + self.bottom = Bottom(state) + + def create_place(self, name: str, tokens: int): + # instantiate Place class + place_node = self.bottom.create_node() # create place node + self.bottom.create_edge(self.model, place_node, name) # attach to model + morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "P") # retrieve type + self.bottom.create_edge(place_node, morph_node, "Morphism") # create morphism link + # instantiate name attribute + name_model = self.bottom.create_node() + String(name_model, self.bottom.state).create(name) + name_node = self.bottom.create_node(str(name_model)) + self.bottom.create_edge(self.model, name_node, f"{name}.n") + name_link = self.bottom.create_edge(place_node, name_node) + self.bottom.create_edge(self.model, name_link, f"{name}.n_link") + ltm_pn_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "String") + ltm_pn_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "P_n") + self.bottom.create_edge(name_node, ltm_pn_node, "Morphism") + self.bottom.create_edge(name_link, ltm_pn_link, "Morphism") + # instantiate tokens attribute + tokens_model = self.bottom.create_node() + Integer(tokens_model, self.bottom.state).create(tokens) + tokens_node = self.bottom.create_node(str(tokens_model)) + self.bottom.create_edge(self.model, tokens_node, f"{name}.t") + tokens_link = self.bottom.create_edge(place_node, tokens_node) + self.bottom.create_edge(self.model, tokens_link, f"{name}.t_link") + ltm_pn_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "Integer") + ltm_pn_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "P_t") + self.bottom.create_edge(tokens_node, ltm_pn_node, "Morphism") + self.bottom.create_edge(tokens_link, ltm_pn_link, "Morphism") + + def create_transition(self, name: str): + # instantiate Transition class + transition_node = self.bottom.create_node() # create transition node + self.bottom.create_edge(self.model, transition_node, name) # attach to model + morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "T") # retrieve type + self.bottom.create_edge(transition_node, morph_node, "Morphism") # create morphism link + # instantiate name attribute + name_model = self.bottom.create_node() + String(name_model, self.bottom.state).create(name) + name_node = self.bottom.create_node(str(name_model)) + self.bottom.create_edge(self.model, name_node, f"{name}.name") + name_link = self.bottom.create_edge(transition_node, name_node) + self.bottom.create_edge(self.model, name_link, f"{name}.name_link") + ltm_pn_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "String") + ltm_pn_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "T_name") + self.bottom.create_edge(name_node, ltm_pn_node, "Morphism") + self.bottom.create_edge(name_link, ltm_pn_link, "Morphism") + + def create_p2t(self, place: str, transition: str, weight: int): + # create p2t link + morphism links + edge = self.bottom.create_edge( + *self.bottom.read_outgoing_elements(self.model, place), + *self.bottom.read_outgoing_elements(self.model, transition), + ) + self.bottom.create_edge(self.model, edge, f"{place}_to_{transition}") # attach to model + morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "P2T") # retrieve type + self.bottom.create_edge(edge, morph_node, "Morphism") # create morphism link + # weight attribute + weight_model = self.bottom.create_node() + Integer(weight_model, self.bottom.state).create(weight) + weight_node = self.bottom.create_node(str(weight_model)) + self.bottom.create_edge(self.model, weight_node, f"{place}_to_{transition}.weight") + weight_link = self.bottom.create_edge(edge, weight_node) + self.bottom.create_edge(self.model, weight_link, f"{place}_to_{transition}.weight_link") + scd_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "Integer") + scd_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "P2T_weight") + self.bottom.create_edge(weight_node, scd_node, "Morphism") + self.bottom.create_edge(weight_link, scd_link, "Morphism") + + def create_t2p(self, transition: str, place: str, weight: int): + # create t2p link + morphism links + edge = self.bottom.create_edge( + *self.bottom.read_outgoing_elements(self.model, transition), + *self.bottom.read_outgoing_elements(self.model, place), + ) + self.bottom.create_edge(self.model, edge, f"{transition}_to_{place}") # attach to model + morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "T2P") # retrieve type + self.bottom.create_edge(edge, morph_node, "Morphism") # create morphism link + # weight attribute + weight_model = self.bottom.create_node() + Integer(weight_model, self.bottom.state).create(weight) + weight_node = self.bottom.create_node(str(weight_model)) + self.bottom.create_edge(self.model, weight_node, f"{transition}_to_{place}.weight") + weight_link = self.bottom.create_edge(edge, weight_node) + self.bottom.create_edge(self.model, weight_link, f"{transition}_to_{place}.weight_link") + scd_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "Integer") + scd_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "T2P_weight") + self.bottom.create_edge(weight_node, scd_node, "Morphism") + self.bottom.create_edge(weight_link, scd_link, "Morphism") + + def list_elements(self): + pn_names = {} + for key in self.bottom.read_keys(self.ltm_pn): + element, = self.bottom.read_outgoing_elements(self.ltm_pn, key) + pn_names[element] = key + unsorted = [] + for key in self.bottom.read_keys(self.model): + element, = self.bottom.read_outgoing_elements(self.model, key) + element_types = self.bottom.read_outgoing_elements(element, "Morphism") + type_model_elements = self.bottom.read_outgoing_elements(self.ltm_pn) + element_type_node, = [e for e in element_types if e in type_model_elements] + unsorted.append((key, pn_names[element_type_node])) + for elem in sorted(unsorted, key=lambda e: e[0]): + print("{} : {}".format(*elem)) + + def delete_element(self, name: str): + keys = self.bottom.read_keys(self.model) + r = re.compile(r"{}\..*".format(name)) + to_delete = list(filter(r.match, keys)) + for key in to_delete: + # TODO: find way to solve memory leak, primitive models are not deleted this way + node, = self.bottom.read_outgoing_elements(self.model, label=key) + self.bottom.delete_element(node) diff --git a/services/point/__init__.py b/services/point/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/point/cartesian.py b/services/point/cartesian.py new file mode 100644 index 0000000..323de54 --- /dev/null +++ b/services/point/cartesian.py @@ -0,0 +1,83 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom +from services.primitives.float_type import Float + + +class PointCartesian: + def __init__(self, model: UUID, state: State): + type_model_id = state.read_dict(state.read_root(), "PointCartesian") + self.type_model = UUID(state.read_value(type_model_id)) + self.model = model + self.state = state + + self.point = None + + def create_point(self, x: float, y: float): + if self.point is None: + self.point = (x, y) + else: + raise RuntimeError("A PointCartesian model can contain at most 1 point.") + + def read_point(self): + if self.point is None: + raise RuntimeError("No point found in model.") + else: + return f"(X = {self.point[0]}, Y = {self.point[1]})" + + def delete_point(self): + self.point = None + + def apply_movement(self, delta_x: float, delta_y: float): + if self.point is not None: + self.point = (self.point[0] + delta_x, self.point[1] + delta_y) + else: + raise RuntimeError("No point found in model.") + + def to_bottom(self): + bottom = Bottom(self.state) + # clear residual model + for element in bottom.read_outgoing_elements(self.model): + bottom.delete_element(element) + # create primitive models + c1_model = bottom.create_node() + c2_model = bottom.create_node() + Float(c1_model, self.state).create(self.point[0]) + Float(c2_model, self.state).create(self.point[1]) + # instantiate Point class + point_node = bottom.create_node() # create point node + bottom.create_edge(self.model, point_node, "point") # attach to model + morph_node, = bottom.read_outgoing_elements(self.type_model, "PointCartesian") # retrieve type + bottom.create_edge(point_node, morph_node, "Morphism") # create morphism link + # instantiate c1 attribute + c1_node = bottom.create_node(str(c1_model)) + bottom.create_edge(self.model, c1_node, "point.c1") + c1_link = bottom.create_edge(point_node, c1_node) + bottom.create_edge(self.model, c1_link, "point.c1_link") + ltm_point_node, = bottom.read_outgoing_elements(self.type_model, "Float") + ltm_point_link, = bottom.read_outgoing_elements(self.type_model, "PointCartesian_c1") + bottom.create_edge(c1_node, ltm_point_node, "Morphism") + bottom.create_edge(c1_link, ltm_point_link, "Morphism") + # instantiate c2 attribute + c2_node = bottom.create_node(str(c2_model)) + bottom.create_edge(self.model, c2_node, "point.c2") + c2_link = bottom.create_edge(point_node, c2_node) + bottom.create_edge(self.model, c2_link, "point.c2_link") + ltm_point_node, = bottom.read_outgoing_elements(self.type_model, "Float") + ltm_point_link, = bottom.read_outgoing_elements(self.type_model, "PointCartesian_c2") + bottom.create_edge(c2_node, ltm_point_node, "Morphism") + bottom.create_edge(c2_link, ltm_point_link, "Morphism") + + def from_bottom(self): + bottom = Bottom(self.state) + keys = bottom.read_keys(self.model) + x_key, = filter(lambda k: k.endswith(".c1"), keys) + y_key, = filter(lambda k: k.endswith(".c2"), keys) + x_ref_node, = bottom.read_outgoing_elements(self.model, x_key) + y_ref_node, = bottom.read_outgoing_elements(self.model, y_key) + x_model = UUID(bottom.read_value(x_ref_node)) + y_model = UUID(bottom.read_value(y_ref_node)) + x_val_node, = bottom.read_outgoing_elements(x_model) + y_val_node, = bottom.read_outgoing_elements(y_model) + self.point = (bottom.read_value(x_val_node), bottom.read_value(y_val_node)) + diff --git a/services/point/polar.py b/services/point/polar.py new file mode 100644 index 0000000..33bfc08 --- /dev/null +++ b/services/point/polar.py @@ -0,0 +1,90 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom +from services.primitives.float_type import Float + +import math + + +class PointPolar: + def __init__(self, model: UUID, state: State): + type_model_id = state.read_dict(state.read_root(), "PointPolar") + self.type_model = UUID(state.read_value(type_model_id)) + self.model = model + self.state = state + + self.point = None + + def create_point(self, r: float, theta: float): + if self.point is None: + self.point = (r, theta) + else: + raise RuntimeError("A PointPolar model can contain at most 1 point.") + + def read_point(self): + if self.point is None: + raise RuntimeError("No point found in model.") + else: + return f"(r = {self.point[0]}, \u03B8 = {self.point[1]})" + + def delete_point(self): + self.point = None + + def apply_movement(self, delta_r: float, delta_theta: float): + if self.point is not None: + self.point = (self.point[0] + delta_r, self.point[1] + delta_theta) + else: + raise RuntimeError("No point found in model.") + + def to_bottom(self): + x = self.point[0]*math.cos(self.point[1]) # x = r * cos(theta) + y = self.point[0]*math.sin(self.point[1]) # y = r * sin(theta) + bottom = Bottom(self.state) + # clear residual model + for element in bottom.read_outgoing_elements(self.model): + bottom.delete_element(element) + # create primitive models + c1_model = bottom.create_node() + c2_model = bottom.create_node() + Float(c1_model, self.state).create(x) + Float(c2_model, self.state).create(y) + # instantiate Point class + point_node = bottom.create_node() # create point node + bottom.create_edge(self.model, point_node, "point") # attach to model + morph_node, = bottom.read_outgoing_elements(self.type_model, "PointPolar") # retrieve type + bottom.create_edge(point_node, morph_node, "Morphism") # create morphism link + # instantiate c1 attribute + c1_node = bottom.create_node(str(c1_model)) + bottom.create_edge(self.model, c1_node, "point.c1") + c1_link = bottom.create_edge(point_node, c1_node) + bottom.create_edge(self.model, c1_link, "point.c1_link") + ltm_point_node, = bottom.read_outgoing_elements(self.type_model, "Float") + ltm_point_link, = bottom.read_outgoing_elements(self.type_model, "PointPolar_c1") + bottom.create_edge(c1_node, ltm_point_node, "Morphism") + bottom.create_edge(c1_link, ltm_point_link, "Morphism") + # instantiate c2 attribute + c2_node = bottom.create_node(str(c2_model)) + bottom.create_edge(self.model, c2_node, "point.c2") + c2_link = bottom.create_edge(point_node, c2_node) + bottom.create_edge(self.model, c2_link, "point.c2_link") + ltm_point_node, = bottom.read_outgoing_elements(self.type_model, "Float") + ltm_point_link, = bottom.read_outgoing_elements(self.type_model, "PointPolar_c2") + bottom.create_edge(c2_node, ltm_point_node, "Morphism") + bottom.create_edge(c2_link, ltm_point_link, "Morphism") + + def from_bottom(self): + bottom = Bottom(self.state) + keys = bottom.read_keys(self.model) + x_key, = filter(lambda k: k.endswith(".c1"), keys) + y_key, = filter(lambda k: k.endswith(".c2"), keys) + x_ref_node, = bottom.read_outgoing_elements(self.model, x_key) + y_ref_node, = bottom.read_outgoing_elements(self.model, y_key) + x_model = UUID(bottom.read_value(x_ref_node)) + y_model = UUID(bottom.read_value(y_ref_node)) + x_val_node, = bottom.read_outgoing_elements(x_model) + y_val_node, = bottom.read_outgoing_elements(y_model) + x = bottom.read_value(x_val_node) + y = bottom.read_value(y_val_node) + r = math.sqrt(math.pow(x, 2) + math.pow(y, 2)) + theta = math.atan2(y, x) + self.point = (r, theta) diff --git a/services/primitives/__init__.py b/services/primitives/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/primitives/boolean_type.py b/services/primitives/boolean_type.py new file mode 100644 index 0000000..9f157f2 --- /dev/null +++ b/services/primitives/boolean_type.py @@ -0,0 +1,20 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom + + +class Boolean: + def __init__(self, model: UUID, state: State): + self.model = model + self.bottom = Bottom(state) + type_model_id_node, = self.bottom.read_outgoing_elements(state.read_root(), "Boolean") + self.type_model = UUID(self.bottom.read_value(type_model_id_node)) + + def create(self, value: bool): + if "boolean" in self.bottom.read_keys(self.model): + instance, = self.bottom.read_outgoing_elements(self.model, "boolean") + self.bottom.delete_element(instance) + _instance = self.bottom.create_node(value) + self.bottom.create_edge(self.model, _instance, "boolean") + _type, = self.bottom.read_outgoing_elements(self.type_model, "Boolean") + self.bottom.create_edge(_instance, _type, "Morphism") diff --git a/services/primitives/float_type.py b/services/primitives/float_type.py new file mode 100644 index 0000000..ff18bbc --- /dev/null +++ b/services/primitives/float_type.py @@ -0,0 +1,20 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom + + +class Float: + def __init__(self, model: UUID, state: State): + self.model = model + self.bottom = Bottom(state) + type_model_id_node, = self.bottom.read_outgoing_elements(state.read_root(), "Float") + self.type_model = UUID(self.bottom.read_value(type_model_id_node)) + + def create(self, value: float): + if "float" in self.bottom.read_keys(self.model): + instance, = self.bottom.read_outgoing_elements(self.model, "float") + self.bottom.delete_element(instance) + _instance = self.bottom.create_node(value) + self.bottom.create_edge(self.model, _instance, "float") + _type, = self.bottom.read_outgoing_elements(self.type_model, "Float") + self.bottom.create_edge(_instance, _type, "Morphism") diff --git a/services/primitives/integer_type.py b/services/primitives/integer_type.py new file mode 100644 index 0000000..44864bb --- /dev/null +++ b/services/primitives/integer_type.py @@ -0,0 +1,20 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom + + +class Integer: + def __init__(self, model: UUID, state: State): + self.model = model + self.bottom = Bottom(state) + type_model_id_node, = self.bottom.read_outgoing_elements(state.read_root(), "Integer") + self.type_model = UUID(self.bottom.read_value(type_model_id_node)) + + def create(self, value: int): + if "string" in self.bottom.read_keys(self.model): + instance, = self.bottom.read_outgoing_elements(self.model, "integer") + self.bottom.delete_element(instance) + _instance = self.bottom.create_node(value) + self.bottom.create_edge(self.model, _instance, "integer") + _type, = self.bottom.read_outgoing_elements(self.type_model, "Integer") + self.bottom.create_edge(_instance, _type, "Morphism") diff --git a/services/primitives/string_type.py b/services/primitives/string_type.py new file mode 100644 index 0000000..926c1cd --- /dev/null +++ b/services/primitives/string_type.py @@ -0,0 +1,20 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom + + +class String: + def __init__(self, model: UUID, state: State): + self.model = model + self.bottom = Bottom(state) + type_model_id_node, = self.bottom.read_outgoing_elements(state.read_root(), "String") + self.type_model = UUID(self.bottom.read_value(type_model_id_node)) + + def create(self, value: str): + if "string" in self.bottom.read_keys(self.model): + instance, = self.bottom.read_outgoing_elements(self.model, "string") + self.bottom.delete_element(instance) + _instance = self.bottom.create_node(value) + self.bottom.create_edge(self.model, _instance, "string") + _type, = self.bottom.read_outgoing_elements(self.type_model, "String") + self.bottom.create_edge(_instance, _type, "Morphism") diff --git a/services/primitives/type_type.py b/services/primitives/type_type.py new file mode 100644 index 0000000..98ec38b --- /dev/null +++ b/services/primitives/type_type.py @@ -0,0 +1,20 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom + + +class Integer: + def __init__(self, model: UUID, state: State): + self.model = model + self.bottom = Bottom(state) + type_model_id_node, = self.bottom.read_outgoing_elements(state.read_root(), "Type") + self.type_model = UUID(self.bottom.read_value(type_model_id_node)) + + def create(self, value: tuple): + if "string" in self.bottom.read_keys(self.model): + instance, = self.bottom.read_outgoing_elements(self.model, "type") + self.bottom.delete_element(instance) + _instance = self.bottom.create_node(value) + self.bottom.create_edge(self.model, _instance, "type") + _type, = self.bottom.read_outgoing_elements(self.type_model, "Type") + self.bottom.create_edge(_instance, _type, "Morphism") diff --git a/services/scd.py b/services/scd.py new file mode 100644 index 0000000..d78cb88 --- /dev/null +++ b/services/scd.py @@ -0,0 +1,224 @@ +from uuid import UUID +from state.base import State +from services.bottom.V0 import Bottom +from services.primitives.boolean_type import Boolean +from services.primitives.integer_type import Integer +from services.primitives.string_type import String + +import re + + +class SCD: + def __init__(self, model: UUID, state: State): + type_model_id = state.read_dict(state.read_root(), "SCD") + self.scd_model = UUID(state.read_value(type_model_id)) + self.model = model + self.bottom = Bottom(state) + + def create_class(self, name: str, abstract: bool = None, min_c: int = None, max_c: int = None): + + def set_cardinality(bound: str, value: int): + _c_model = self.bottom.create_node() + Integer(_c_model, self.bottom.state).create(value) + _c_node = self.bottom.create_node(str(_c_model)) + self.bottom.create_edge(self.model, _c_node, f"{name}.{bound}_cardinality") + _c_link = self.bottom.create_edge(class_node, _c_node) + self.bottom.create_edge(self.model, _c_link, f"{name}.{bound}_cardinality_link") + _scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Integer") + _scd_link, = self.bottom.read_outgoing_elements(self.scd_model, f"Class_{bound}_cardinality") + self.bottom.create_edge(_c_node, _scd_node, "Morphism") + self.bottom.create_edge(_c_link, _scd_link, "Morphism") + + # create class + attributes + morphism links + class_node = self.bottom.create_node() # create class node + self.bottom.create_edge(self.model, class_node, name) # attach to model + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Class") # retrieve type + self.bottom.create_edge(class_node, scd_node, "Morphism") # create morphism link + if abstract is not None: + abstract_model = self.bottom.create_node() + Boolean(abstract_model, self.bottom.state).create(abstract) + abstract_node = self.bottom.create_node(str(abstract_model)) + self.bottom.create_edge(self.model, abstract_node, f"{name}.abstract") + abstract_link = self.bottom.create_edge(class_node, abstract_node) + self.bottom.create_edge(self.model, abstract_link, f"{name}.abstract_link") + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean") + scd_link, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract") + self.bottom.create_edge(abstract_node, scd_node, "Morphism") + self.bottom.create_edge(abstract_link, scd_link, "Morphism") + if min_c is not None: + set_cardinality("lower", min_c) + if max_c is not None: + set_cardinality("upper", min_c) + + def create_association(self, name: str, source: str, target: str, + src_min_c: int = None, src_max_c: int = None, + tgt_min_c: int = None, tgt_max_c: int = None): + + def set_cardinality(bound: str, value: int): + _c_model = self.bottom.create_node() + Integer(_c_model, self.bottom.state).create(value) + _c_node = self.bottom.create_node(str(_c_model)) + self.bottom.create_edge(self.model, _c_node, f"{name}.{bound}_cardinality") + _c_link = self.bottom.create_edge(assoc_edge, _c_node) + self.bottom.create_edge(self.model, _c_link, f"{name}.{bound}_cardinality_link") + _scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Integer") + _scd_link, = self.bottom.read_outgoing_elements(self.scd_model, f"Class_{bound}_cardinality") + self.bottom.create_edge(_c_node, _scd_node, "Morphism") + self.bottom.create_edge(_c_link, _scd_link, "Morphism") + + # create class + attributes + morphism links + assoc_edge = self.bottom.create_edge( + *self.bottom.read_outgoing_elements(self.model, source), + *self.bottom.read_outgoing_elements(self.model, target), + ) # create assoc edge + self.bottom.create_edge(self.model, assoc_edge, name) # attach to model + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Association") # retrieve type + self.bottom.create_edge(assoc_edge, scd_node, "Morphism") # create morphism link + if src_min_c is not None: + set_cardinality("source_lower", src_min_c) + if src_max_c is not None: + set_cardinality("source_upper", src_max_c) + if tgt_min_c is not None: + set_cardinality("target_lower", tgt_min_c) + if tgt_max_c is not None: + set_cardinality("target_upper", tgt_max_c) + + def create_global_constraint(self, name: str): + # create element + morphism links + element_node = self.bottom.create_node() # create element node + self.bottom.create_edge(self.model, element_node, name) # attach to model + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "GlobalConstraint") # retrieve type + self.bottom.create_edge(element_node, scd_node, "Morphism") # create morphism link + + def create_attribute(self, name: str): + # create element + morphism links + element_node = self.bottom.create_node() # create element node + self.bottom.create_edge(self.model, element_node, name) # attach to model + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Attribute") # retrieve type + self.bottom.create_edge(element_node, scd_node, "Morphism") # create morphism link + + def create_attribute_link(self, source: str, target: str, name: str, optional: bool): + # create attribute link + morphism links + assoc_edge = self.bottom.create_edge( + *self.bottom.read_outgoing_elements(self.model, source), + *self.bottom.read_outgoing_elements(self.model, target), + ) # create inheritance edge + self.bottom.create_edge(self.model, assoc_edge, f"{source}_{name}") # attach to model + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink") # retrieve type + self.bottom.create_edge(assoc_edge, scd_node, "Morphism") # create morphism link + # name attribute + name_model = self.bottom.create_node() + String(name_model, self.bottom.state).create(name) + name_node = self.bottom.create_node(str(name_model)) + self.bottom.create_edge(self.model, name_node, f"{source}_{name}.name") + name_link = self.bottom.create_edge(assoc_edge, name_node) + self.bottom.create_edge(self.model, name_link, f"{source}_{name}.name_link") + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "String") + scd_link, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink_name") + self.bottom.create_edge(name_node, scd_node, "Morphism") + self.bottom.create_edge(name_link, scd_link, "Morphism") + # optional attribute + optional_model = self.bottom.create_node() + Boolean(optional_model, self.bottom.state).create(optional) + optional_node = self.bottom.create_node(str(optional_model)) + self.bottom.create_edge(self.model, optional_node, f"{source}_{name}.optional") + optional_link = self.bottom.create_edge(assoc_edge, optional_node) + self.bottom.create_edge(self.model, optional_link, f"{source}_{name}.optional_link") + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean") + scd_link, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink_optional") + self.bottom.create_edge(optional_node, scd_node, "Morphism") + self.bottom.create_edge(optional_link, scd_link, "Morphism") + + def create_model_ref(self, name: str, model: UUID): + # create element + morphism links + element_node = self.bottom.create_node(str(model)) # create element node + self.bottom.create_edge(self.model, element_node, name) # attach to model + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") # retrieve type + self.bottom.create_edge(element_node, scd_node, "Morphism") # create morphism link + + def create_inheritance(self, child: str, parent: str): + # create inheritance + morphism links + assoc_edge = self.bottom.create_edge( + *self.bottom.read_outgoing_elements(self.model, child), + *self.bottom.read_outgoing_elements(self.model, parent), + ) # create inheritance edge + self.bottom.create_edge(self.model, assoc_edge, f"{child}_inh_{parent}") # attach to model + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance") # retrieve type + self.bottom.create_edge(assoc_edge, scd_node, "Morphism") # create morphism link + + def add_constraint(self, element: str, code: str): + element_node, = self.bottom.read_outgoing_elements(self.model, element) # retrieve element + # code attribute + code_node = self.bottom.create_node(code) + self.bottom.create_edge(self.model, code_node, f"{element}.constraint") + code_link = self.bottom.create_edge(element_node, code_node) + self.bottom.create_edge(self.model, code_link, f"{element}.constraint_link") + scd_node, = self.bottom.read_outgoing_elements(self.scd_model, "ActionCode") + scd_link, = self.bottom.read_outgoing_elements(self.scd_model, "Element_constraint") + self.bottom.create_edge(code_node, scd_node, "Morphism") + self.bottom.create_edge(code_link, scd_link, "Morphism") + + def list_elements(self): + scd_names = {} + for key in self.bottom.read_keys(self.scd_model): + element, = self.bottom.read_outgoing_elements(self.scd_model, key) + scd_names[element] = key + unsorted = [] + for key in self.bottom.read_keys(self.model): + element, = self.bottom.read_outgoing_elements(self.model, key) + element_types = self.bottom.read_outgoing_elements(element, "Morphism") + type_model_elements = self.bottom.read_outgoing_elements(self.scd_model) + element_type_node, = [e for e in element_types if e in type_model_elements] + unsorted.append(f"{key} : {scd_names[element_type_node]}") + return sorted(unsorted) + + def delete_element(self, name: str): + keys = self.bottom.read_keys(self.model) + r = re.compile(r"{}\..*".format(name)) + to_delete = list(filter(r.match, keys)) + for key in to_delete: + # TODO: find way to solve memory leak, primitive models are not deleted this way + node, = self.bottom.read_outgoing_elements(self.model, label=key) + self.bottom.delete_element(node) + + def to_bottom(self): + pass + + def from_bottom(self): + pass + + +if __name__ == '__main__': + from state.devstate import DevState as State + s = State() + from bootstrap.scd import bootstrap_scd + scd = bootstrap_scd(s) + # Retrieve refs to primitive type models + # # integer + int_type_id = s.read_dict(s.read_root(), "Integer") + int_type = UUID(s.read_value(int_type_id)) + print(f"Integer Model UUID: {int_type}") # 6 + # # string + str_type_id = s.read_dict(s.read_root(), "String") + str_type = UUID(s.read_value(str_type_id)) + print(f"String Model UUID: {str_type}") # 16 + # Create LTM_PN + model_uuid = s.create_node() + print(f"LTM_PN Model UUID: {model_uuid}") # 845 + service = SCD(model_uuid, s) + # Create classes + service.create_class("P") + service.create_class("T") + # Create associations + service.create_association("P2T", "P", "T") + service.create_association("T2P", "T", "P") + # Create model refs + service.create_model_ref("Integer", int_type) + service.create_model_ref("String", int_type) + # Create class attributes + service.create_attribute_link("P", "Integer", "t", False) + service.create_attribute_link("P", "String", "n", False) + service.create_attribute_link("T", "String", "n", False) + # Create association attributes + service.create_attribute_link("P2T", "Integer", "w", False) + service.create_attribute_link("T2P", "Integer", "w", False) diff --git a/state.p b/state.p new file mode 100644 index 0000000..a7d4337 Binary files /dev/null and b/state.p differ diff --git a/state/devstate.py b/state/devstate.py index 6c7f853..909af84 100644 --- a/state/devstate.py +++ b/state/devstate.py @@ -8,15 +8,13 @@ class DevState(PyState): + node id's are generated sequentially to make writing tests easier """ - free_id = 0 - def __init__(self): + self.free_id = 0 super().__init__() - @staticmethod - def new_id() -> UUID: - DevState.free_id += 1 - return UUID(int=DevState.free_id - 1) + def new_id(self) -> UUID: + self.free_id += 1 + return UUID(int=self.free_id - 1) def dump(self, path: str, png_path: str = None): """Dumps the whole MV graph to a graphviz .dot-file