Park structural conformance for now, finish constraints for nominal

This commit is contained in:
Andrei Bondarenko 2021-08-24 17:32:21 +02:00
parent 3c1d1fa002
commit 6ce8a4ef5c
4 changed files with 285 additions and 35 deletions

40
bootstrap/pn.py Normal file
View file

@ -0,0 +1,40 @@
from services.scd import SCD
from uuid import UUID
from state.base import State
def bootstrap_pn(state: State, model_name: str) -> UUID:
# Retrieve scd model
scd_id = state.read_dict(state.read_root(), "SCD")
scd = UUID(state.read_value(scd_id))
# Retrieve refs to primitive type models
# # integer
int_type_id = state.read_dict(state.read_root(), "Integer")
int_type = UUID(state.read_value(int_type_id))
# # string
str_type_id = state.read_dict(state.read_root(), "String")
str_type = UUID(state.read_value(str_type_id))
# Create LTM_PN
model_uuid = state.create_node()
mcl_root_id = state.create_nodevalue(str(model_uuid))
state.create_dict(state.read_root(), model_name, mcl_root_id)
service = SCD(scd, model_uuid, state)
# Create classes
service.create_class("P")
service.create_class("T")
# Create associations
service.create_association("P2T", "P", "T")
service.create_association("T2P", "T", "P")
# Create model refs
service.create_model_ref("Integer", int_type)
service.create_model_ref("String", str_type)
# Create class attributes
service.create_attribute_link("P", "Integer", "t", False)
service.create_attribute_link("P", "String", "n", False)
service.create_attribute_link("T", "String", "n", False)
# Create association attributes
service.create_attribute_link("P2T", "Integer", "w", False)
service.create_attribute_link("T2P", "Integer", "w", False)
# Create test constraint
service.add_constraint("P", "print(element)\nreturn True")
return model_uuid

View file

@ -74,6 +74,8 @@ def bootstrap_scd(state: State) -> UUID:
# # INHERITANCES, i.e. elements typed by Inheritance
# # Class inherits from Element
add_edge_element("class_inh_element", class_node, element_node)
# # GlobalConstraint inherits from Element
add_edge_element("gc_inh_element", glob_constr_node, element_node)
# # Attribute inherits from Element
add_edge_element("attr_inh_element", attr_node, element_node)
# # Association inherits from Element
@ -186,6 +188,7 @@ def bootstrap_scd(state: State) -> UUID:
add_mcl_morphism("AttributeLink", "Association")
# Inheritance
add_mcl_morphism("class_inh_element", "Inheritance")
add_mcl_morphism("gc_inh_element", "Inheritance")
add_mcl_morphism("attr_inh_element", "Inheritance")
add_mcl_morphism("assoc_inh_element", "Inheritance")
add_mcl_morphism("attr_link_inh_element", "Inheritance")

View file

@ -31,6 +31,9 @@ class Conformance:
self.multiplicities: Dict[str, Tuple] = {}
self.source_multiplicities: Dict[str, Tuple] = {}
self.target_multiplicities: Dict[str, Tuple] = {}
self.structures = {}
self.matches = {}
self.candidates = {}
def check_nominal(self):
steps = [
@ -311,38 +314,103 @@ class Conformance:
print(code)
return True
def precompute_structures(self):
self.precompute_sub_types()
scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
# collect types
class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
for tm_element, tm_name in self.type_model_names.items():
# retrieve elements that tm_element is a morphism of
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
morphism, = [m for m in morphisms if m in scd_elements]
# check if tm_element is a morphism of AttributeLink
if class_element == morphism or association_element == morphism:
self.structures[tm_name] = set()
# collect type structures
# retrieve AttributeLink to check whether element is a morphism of AttributeLink
attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
for tm_element, tm_name in self.type_model_names.items():
# retrieve elements that tm_element is a morphism of
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
morphism, = [m for m in morphisms if m in scd_elements]
# check if tm_element is a morphism of AttributeLink
if attr_link_element == morphism:
# retrieve attributes of attribute link, i.e. 'name' and 'optional'
attrs = self.bottom.read_outgoing_elements(tm_element)
name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
# get attr name value
name_model = UUID(self.bottom.read_value(name_model_node))
name_node, = self.bottom.read_outgoing_elements(name_model)
name = self.bottom.read_value(name_node)
# get attr opt value
opt_model = UUID(self.bottom.read_value(opt_model_node))
opt_node, = self.bottom.read_outgoing_elements(opt_model)
opt = self.bottom.read_value(opt_node)
# get attr type name
source_type_node = self.bottom.read_edge_source(tm_element)
source_type_name = self.type_model_names[source_type_node]
target_type_node = self.bottom.read_edge_target(tm_element)
target_type_name = self.type_model_names[target_type_node]
# add attribute to the structure of its source type
# attribute is stored as a (name, optional, type) triple
self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
# extend structures of sub types with attrs of super types
for super_type, sub_types in self.sub_types.items():
for sub_type in sub_types:
self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
# filter out abstract types, as they cannot be instantiated
# retrieve Class_abstract to check whether element is a morphism of Class_abstract
class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
for tm_element, tm_name in self.type_model_names.items():
# retrieve elements that tm_element is a morphism of
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
morphism, = [m for m in morphisms if m in scd_elements]
# check if tm_element is a morphism of Class_abstract
if class_abs_element == morphism:
# retrieve 'abstract' attribute value
target_node = self.bottom.read_edge_target(tm_element)
abst_model = UUID(self.bottom.read_value(target_node))
abst_node, = self.bottom.read_outgoing_elements(abst_model)
is_abstract = self.bottom.read_value(abst_node)
# retrieve type name
source_node = self.bottom.read_edge_source(tm_element)
type_name = self.type_model_names[source_node]
if is_abstract:
self.structures.pop(type_name)
def __create_pn(state: State):
from services.scd import SCD
# Retrieve refs to primitive type models
# # integer
int_type_id = state.read_dict(state.read_root(), "Integer")
int_type = UUID(state.read_value(int_type_id))
# # string
str_type_id = state.read_dict(state.read_root(), "String")
str_type = UUID(state.read_value(str_type_id))
# Create LTM_PN
model_uuid = state.create_node()
service = SCD(scd, model_uuid, state)
# Create classes
service.create_class("P")
service.create_class("T")
# Create associations
service.create_association("P2T", "P", "T")
service.create_association("T2P", "T", "P")
# Create model refs
service.create_model_ref("Integer", int_type)
service.create_model_ref("String", int_type)
# Create class attributes
service.create_attribute_link("P", "Integer", "t", False)
service.create_attribute_link("P", "String", "n", False)
service.create_attribute_link("T", "String", "n", False)
# Create association attributes
service.create_attribute_link("P2T", "Integer", "w", False)
service.create_attribute_link("T2P", "Integer", "w", False)
# Create test constraint
service.add_constraint("P", "print(element)\nreturn True")
return model_uuid
def match_structures(self):
ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
# match nodes
for m_element, m_name in self.model_names.items():
self.candidates[m_name] = set()
is_edge = self.bottom.read_edge_source(m_element) is not None
for type_name, structure in self.structures.items():
tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
type_is_edge = self.bottom.read_edge_source(tm_element) is not None
if is_edge == type_is_edge:
matched = []
for name, optional, attr_type in structure:
try:
attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
# if attribute is a modelref, we need to check whether it
# linguistically conforms to the specified type
# if its an internlly defined attribute, this will be checked by constraints later
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
if ref_element in morphisms:
pass
except ValueError:
if optional:
continue
else:
break
if len(matched) == len(structure):
self.candidates[m_name].add(type_name)
def build_morphisms(self):
pass
if __name__ == '__main__':
@ -350,9 +418,21 @@ if __name__ == '__main__':
s = State()
from bootstrap.scd import bootstrap_scd
scd = bootstrap_scd(s)
pn = __create_pn(s)
# cf = Conformance(s, scd, scd, scd)
# cf.check_nominal()
cf = Conformance(s, scd, pn, scd)
cf.check_nominal()
from bootstrap.pn import bootstrap_pn
ltm_pn = bootstrap_pn(s, "PN")
from services.pn import PN
my_pn = s.create_node()
PNserv = PN(ltm_pn, my_pn, s)
PNserv.create_place("p1", 5)
PNserv.create_place("p2", 0)
PNserv.create_transition("t1")
PNserv.create_p2t("p1", "t1", 1)
PNserv.create_p2t("t1", "p2", 1)
cf = Conformance(s, scd, my_pn, ltm_pn)
# cf = Conformance(s, scd, ltm_pn, scd)
# cf.check_nominal()
cf.precompute_structures()
cf.match_structures()

127
services/pn.py Normal file
View file

@ -0,0 +1,127 @@
from uuid import UUID
from state.base import State
from services.bottom.V0 import Bottom
from services.primitives.integer_type import Integer
from services.primitives.string_type import String
import re
class PN:
def __init__(self, ltm_pn: UUID, model: UUID, state: State):
self.ltm_pn = ltm_pn
self.model = model
self.bottom = Bottom(state)
def create_place(self, name: str, tokens: int):
# instantiate Place class
place_node = self.bottom.create_node() # create place node
self.bottom.create_edge(self.model, place_node, name) # attach to model
morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "P") # retrieve type
self.bottom.create_edge(place_node, morph_node, "Morphism") # create morphism link
# instantiate name attribute
name_model = self.bottom.create_node()
String(name_model, self.bottom.state).create(name)
name_node = self.bottom.create_node(str(name_model))
self.bottom.create_edge(self.model, name_node, f"{name}.n")
name_link = self.bottom.create_edge(place_node, name_node)
self.bottom.create_edge(self.model, name_link, f"{name}.n_link")
ltm_pn_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "String")
ltm_pn_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "P_n")
self.bottom.create_edge(name_node, ltm_pn_node, "Morphism")
self.bottom.create_edge(name_link, ltm_pn_link, "Morphism")
# instantiate tokens attribute
tokens_model = self.bottom.create_node()
Integer(tokens_model, self.bottom.state).create(tokens)
tokens_node = self.bottom.create_node(str(tokens_model))
self.bottom.create_edge(self.model, tokens_node, f"{name}.t")
tokens_link = self.bottom.create_edge(place_node, tokens_node)
self.bottom.create_edge(self.model, tokens_link, f"{name}.t_link")
ltm_pn_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "Integer")
ltm_pn_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "P_t")
self.bottom.create_edge(tokens_node, ltm_pn_node, "Morphism")
self.bottom.create_edge(tokens_link, ltm_pn_link, "Morphism")
def create_transition(self, name: str):
# instantiate Transition class
transition_node = self.bottom.create_node() # create transition node
self.bottom.create_edge(self.model, transition_node, name) # attach to model
morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "T") # retrieve type
self.bottom.create_edge(transition_node, morph_node, "Morphism") # create morphism link
# instantiate name attribute
name_model = self.bottom.create_node()
String(name_model, self.bottom.state).create(name)
name_node = self.bottom.create_node(str(name_model))
self.bottom.create_edge(self.model, name_node, f"{name}.n")
name_link = self.bottom.create_edge(transition_node, name_node)
self.bottom.create_edge(self.model, name_link, f"{name}.n_link")
ltm_pn_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "String")
ltm_pn_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "T_n")
self.bottom.create_edge(name_node, ltm_pn_node, "Morphism")
self.bottom.create_edge(name_link, ltm_pn_link, "Morphism")
def create_p2t(self, place: str, transition: str, weight: int):
# create p2t link + morphism links
edge = self.bottom.create_edge(
*self.bottom.read_outgoing_elements(self.model, place),
*self.bottom.read_outgoing_elements(self.model, transition),
)
self.bottom.create_edge(self.model, edge, f"{place}_to_{transition}") # attach to model
morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "P2T") # retrieve type
self.bottom.create_edge(edge, morph_node, "Morphism") # create morphism link
# weight attribute
weight_model = self.bottom.create_node()
Integer(weight_model, self.bottom.state).create(weight)
weight_node = self.bottom.create_node(str(weight_model))
self.bottom.create_edge(self.model, weight_node, f"{place}_to_{transition}.w")
weight_link = self.bottom.create_edge(edge, weight_node)
self.bottom.create_edge(self.model, weight_link, f"{place}_to_{transition}.w_link")
scd_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "Integer")
scd_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "P2T_w")
self.bottom.create_edge(weight_node, scd_node, "Morphism")
self.bottom.create_edge(weight_link, scd_link, "Morphism")
def create_t2p(self, transition: str, place: str, weight: int):
# create t2p link + morphism links
edge = self.bottom.create_edge(
*self.bottom.read_outgoing_elements(self.model, transition),
*self.bottom.read_outgoing_elements(self.model, place),
)
self.bottom.create_edge(self.model, edge, f"{transition}_to_{place}") # attach to model
morph_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "T2P") # retrieve type
self.bottom.create_edge(edge, morph_node, "Morphism") # create morphism link
# weight attribute
weight_model = self.bottom.create_node()
Integer(weight_model, self.bottom.state).create(weight)
weight_node = self.bottom.create_node(str(weight_model))
self.bottom.create_edge(self.model, weight_node, f"{transition}_to_{place}.w")
weight_link = self.bottom.create_edge(edge, weight_node)
self.bottom.create_edge(self.model, weight_link, f"{transition}_to_{place}.w_link")
scd_node, = self.bottom.read_outgoing_elements(self.ltm_pn, "Integer")
scd_link, = self.bottom.read_outgoing_elements(self.ltm_pn, "T2P_w")
self.bottom.create_edge(weight_node, scd_node, "Morphism")
self.bottom.create_edge(weight_link, scd_link, "Morphism")
def list_elements(self):
pn_names = {}
for key in self.bottom.read_keys(self.ltm_pn):
element, = self.bottom.read_outgoing_elements(self.ltm_pn, key)
pn_names[element] = key
unsorted = []
for key in self.bottom.read_keys(self.model):
element, = self.bottom.read_outgoing_elements(self.model, key)
element_types = self.bottom.read_outgoing_elements(element, "Morphism")
type_model_elements = self.bottom.read_outgoing_elements(self.ltm_pn)
element_type_node, = [e for e in element_types if e in type_model_elements]
unsorted.append((key, pn_names[element_type_node]))
for elem in sorted(unsorted, key=lambda e: e[0]):
print("{} : {}".format(*elem))
def delete_element(self, name: str):
keys = self.bottom.read_keys(self.model)
r = re.compile(r"{}\..*".format(name))
to_delete = list(filter(r.match, keys))
for key in to_delete:
# TODO: find way to solve memory leak, primitive models are not deleted this way
node, = self.bottom.read_outgoing_elements(self.model, label=key)
self.bottom.delete_element(node)