Conformance checker relies much more on ODAPI rather than duplicating its logic + Fix error in bootstrap of primitive types

This commit is contained in:
Joeri Exelmans 2024-11-05 16:54:00 +01:00
parent c92d5be284
commit a26ceef10f
5 changed files with 187 additions and 268 deletions

View file

@ -32,36 +32,33 @@ def render_conformance_check_result(error_list):
class Conformance:
# Parameter 'constraint_check_subtypes': whether to check local type-level constraints also on subtypes.
def __init__(self, state: State, model: UUID, type_model: UUID, constraint_check_subtypes=True):
self.state = state
self.bottom = Bottom(state)
type_model_id = state.read_dict(state.read_root(), "SCD")
self.scd_model = UUID(state.read_value(type_model_id))
self.model = model
self.type_model = type_model
self.constraint_check_subtypes = constraint_check_subtypes # for a class-level constraint, also check the constraint on the subtypes of that class? In other words, are constraints inherited.
self.type_mapping: Dict[str, str] = {}
self.model_names = {
# map model elements to their names to prevent iterating too much
self.bottom.read_outgoing_elements(self.model, e)[0]: e
for e in self.bottom.read_keys(self.model)
}
self.type_model_names = {
# map type model elements to their names to prevent iterating too much
self.bottom.read_outgoing_elements(self.type_model, e)[0]
: e for e in self.bottom.read_keys(self.type_model)
}
self.primitive_values: Dict[UUID, Any] = {}
self.constraint_check_subtypes = constraint_check_subtypes
# MCL
type_model_id = state.read_dict(state.read_root(), "SCD")
self.scd_model = UUID(state.read_value(type_model_id))
# Helpers
self.cdapi = CDAPI(state, type_model)
self.odapi = ODAPI(state, model, type_model)
self.type_odapi = ODAPI(state, type_model, self.scd_model)
# Pre-computed:
self.abstract_types: List[str] = []
self.multiplicities: Dict[str, Tuple] = {}
self.source_multiplicities: Dict[str, Tuple] = {}
self.target_multiplicities: Dict[str, Tuple] = {}
# ?
self.structures = {}
self.matches = {}
self.candidates = {}
self.cdapi = CDAPI(state, type_model)
self.odapi = ODAPI(state, model, type_model)
def check_nominal(self, *, log=False):
"""
@ -103,120 +100,38 @@ class Conformance:
# print(e)
# return False
def read_attribute(self, element: UUID, attr_name: str):
"""
Read an attribute value attached to an element
Args:
element: UUID of the element
attr_name: name of the attribute to read
Returns:
The value of hte attribute, if no attribute with given name is found, returns None
"""
if element in self.type_model_names:
# type model element
element_name = self.type_model_names[element]
model = self.type_model
else:
# model element
element_name = self.model_names[element]
model = self.model
try:
attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}")
return self.primitive_values.get(attr_elem, self.bottom.read_value(UUID(self.bottom.read_value(attr_elem))))
except ValueError:
return None
def deref_primitive_values(self):
"""
Prefetch the values stored in referenced primitive type models
"""
ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
t_deref = []
t_refs = []
for tm_element, tm_name in self.type_model_names.items():
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
if ref_element in morphisms:
t_refs.append(self.type_model_names[tm_element])
elif string_element in morphisms:
t_deref.append(tm_element)
elif boolean_element in morphisms:
t_deref.append(tm_element)
elif integer_element in morphisms:
t_deref.append(tm_element)
for elem in t_deref:
primitive_model = UUID(self.bottom.read_value(elem))
primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
primitive_value = self.bottom.read_value(primitive_value_node)
self.primitive_values[elem] = primitive_value
for m_name, tm_name in self.type_mapping.items():
if tm_name in t_refs:
# dereference
m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
primitive_model = UUID(self.bottom.read_value(m_element))
try:
primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
primitive_value = self.bottom.read_value(primitive_value_node)
self.primitive_values[m_element] = primitive_value
except ValueError:
pass # multiple elements in model indicate that we're not dealing with a primitive
def precompute_multiplicities(self):
"""
Creates an internal representation of type multiplicities that is
more easily queryable that the state graph
"""
for tm_element, tm_name in self.type_model_names.items():
# class abstract flags and multiplicities
abstract = self.read_attribute(tm_element, "abstract")
lc = self.read_attribute(tm_element, "lower_cardinality")
uc = self.read_attribute(tm_element, "upper_cardinality")
for clss_name, clss in self.type_odapi.get_all_instances("Class"):
abstract = self.type_odapi.get_slot_value_default(clss, "abstract", default=False)
if abstract:
self.abstract_types.append(tm_name)
if lc or uc:
mult = (
lc if lc != None else float("-inf"),
uc if uc != None else float("inf")
)
self.multiplicities[tm_name] = mult
# multiplicities for associations
slc = self.read_attribute(tm_element, "source_lower_cardinality")
suc = self.read_attribute(tm_element, "source_upper_cardinality")
if slc or suc:
mult = (
# slc if slc != None else float("-inf"),
slc if slc != None else 0,
suc if suc != None else float("inf")
)
self.source_multiplicities[tm_name] = mult
tlc = self.read_attribute(tm_element, "target_lower_cardinality")
tuc = self.read_attribute(tm_element, "target_upper_cardinality")
if tlc or tuc:
mult = (
# tlc if tlc != None else float("-inf"),
tlc if tlc != None else 0,
tuc if tuc != None else float("inf")
)
self.target_multiplicities[tm_name] = mult
# optional for attribute links
opt = self.read_attribute(tm_element, "optional")
if opt != None:
self.source_multiplicities[tm_name] = (0, float('inf'))
self.target_multiplicities[tm_name] = (0 if opt else 1, 1)
self.abstract_types.append(clss_name)
def get_type(self, element: UUID):
"""
Retrieve the type of an element (wrt. current type model)
"""
morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
return tm_element
lc = self.type_odapi.get_slot_value_default(clss, "lower_cardinality", default=0)
uc = self.type_odapi.get_slot_value_default(clss, "upper_cardinality", default=float('inf'))
if lc or uc:
self.multiplicities[clss_name] = (lc, uc)
for assoc_name, assoc in self.type_odapi.get_all_instances("Association"):
# multiplicities for associations
slc = self.type_odapi.get_slot_value_default(assoc, "source_lower_cardinality", default=0)
suc = self.type_odapi.get_slot_value_default(assoc, "source_upper_cardinality", default=float('inf'))
if slc or suc:
self.source_multiplicities[assoc_name] = (slc, suc)
tlc = self.type_odapi.get_slot_value_default(assoc, "target_lower_cardinality", default=0)
tuc = self.type_odapi.get_slot_value_default(assoc, "target_upper_cardinality", default=float('inf'))
if tlc or tuc:
self.target_multiplicities[assoc_name] = (tlc, tuc)
for attr_name, attr in self.type_odapi.get_all_instances("AttributeLink"):
# optional for attribute links
opt = self.type_odapi.get_slot_value(attr, "optional")
if opt != None:
self.source_multiplicities[attr_name] = (0, float('inf'))
self.target_multiplicities[attr_name] = (0 if opt else 1, 1)
def check_typing(self):
"""
@ -224,24 +139,15 @@ class Conformance:
link exists to some element of type_model
"""
errors = []
ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
model_names = self.bottom.read_keys(self.model)
for m_name in model_names:
m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
try:
tm_element = self.get_type(m_element)
tm_name = self.type_model_names[tm_element]
self.type_mapping[m_name] = tm_name
if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
sub_m = UUID(self.bottom.read_value(m_element))
sub_tm = UUID(self.bottom.read_value(tm_element))
nested_errors = Conformance(self.state, sub_m, sub_tm).check_nominal()
errors += [f"In ModelRef ({m_name}):" + err for err in nested_errors]
except ValueError as e:
import traceback
traceback.format_exc(e)
# no or too many morphism links found
errors.append(f"Incorrectly typed element: '{m_name}'")
# Recursively do a conformance check for each ModelRef
for ref_name, ref in self.type_odapi.get_all_instances("ModelRef"):
sub_mm = UUID(self.bottom.read_value(ref))
for ref_inst_name, ref_inst in self.odapi.get_all_instances(ref_name):
sub_m = UUID(self.bottom.read_value(ref_inst))
nested_errors = Conformance(self.state, sub_m, sub_mm).check_nominal()
errors += [f"In ModelRef ({m_name}):" + err for err in nested_errors]
return errors
def check_link_typing(self):
@ -249,97 +155,76 @@ class Conformance:
for each link, check whether its source and target are of a valid type
"""
errors = []
for m_name, tm_name in self.type_mapping.items():
m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
m_source = self.bottom.read_edge_source(m_element)
m_target = self.bottom.read_edge_target(m_element)
if m_source == None or m_target == None:
# element is not a link
continue
tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
tm_source = self.bottom.read_edge_source(tm_element)
tm_target = self.bottom.read_edge_target(tm_element)
# check if source is typed correctly
source_name = self.model_names[m_source]
source_type_actual = self.type_mapping[source_name]
source_type_expected = self.type_model_names[tm_source]
if not self.cdapi.is_subtype(super_type_name=source_type_expected, sub_type_name=source_type_actual):
errors.append(f"Invalid source type '{source_type_actual}' for element '{m_name}'")
# check if target is typed correctly
target_name = self.model_names[m_target]
target_type_actual = self.type_mapping[target_name]
target_type_expected = self.type_model_names[tm_target]
if not self.cdapi.is_subtype(super_type_name=source_type_expected, sub_type_name=source_type_actual):
errors.append(f"Invalid target type '{target_type_actual}' for element '{m_name}'")
for tm_name, tm_element in self.type_odapi.get_all_instances("Association") + self.type_odapi.get_all_instances("AttributeLink"):
for m_name, m_element in self.odapi.get_all_instances(tm_name):
m_source = self.bottom.read_edge_source(m_element)
m_target = self.bottom.read_edge_target(m_element)
if m_source == None or m_target == None:
# element is not a link
continue
# tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
tm_source = self.bottom.read_edge_source(tm_element)
tm_target = self.bottom.read_edge_target(tm_element)
# check if source is typed correctly
# source_name = self.odapi.m_obj_to_name[m_source]
source_type_actual = self.odapi.get_type_name(m_source)
source_type_expected = self.odapi.mm_obj_to_name[tm_source]
if not self.cdapi.is_subtype(super_type_name=source_type_expected, sub_type_name=source_type_actual):
errors.append(f"Invalid source type '{source_type_actual}' for link '{m_name}:{tm_name}'")
# check if target is typed correctly
# target_name = self.odapi.m_obj_to_name[m_target]
target_type_actual = self.odapi.get_type_name(m_target)
target_type_expected = self.odapi.mm_obj_to_name[tm_target]
if not self.cdapi.is_subtype(super_type_name=source_type_expected, sub_type_name=source_type_actual):
errors.append(f"Invalid target type '{target_type_actual}' for link '{m_name}:{tm_name}'")
return errors
def check_multiplicities(self):
"""
Check whether multiplicities for all types are respected
"""
self.deref_primitive_values()
self.precompute_multiplicities()
errors = []
for type_name in self.type_model_names.values():
for class_name, clss in self.type_odapi.get_all_instances("Class"):
# for type_name in self.odapi.mm_obj_to_name.values():
# abstract classes
if type_name in self.abstract_types:
count = list(self.type_mapping.values()).count(type_name)
if class_name in self.abstract_types:
count = len(self.odapi.get_all_instances(class_name, include_subtypes=False))
if count > 0:
errors.append(f"Invalid instantiation of abstract class: '{type_name}'")
errors.append(f"Invalid instantiation of abstract class: '{class_name}'")
# class multiplicities
if type_name in self.multiplicities:
lc, uc = self.multiplicities[type_name]
count = 0
for sub_type in self.cdapi.transitive_sub_types[type_name]:
count += list(self.type_mapping.values()).count(sub_type)
if class_name in self.multiplicities:
lc, uc = self.multiplicities[class_name]
count = len(self.odapi.get_all_instances(class_name, include_subtypes=True))
if count < lc or count > uc:
errors.append(f"Cardinality of type exceeds valid multiplicity range: '{type_name}' ({count})")
errors.append(f"Cardinality of type exceeds valid multiplicity range: '{class_name}' ({count})")
for assoc_name, assoc in self.type_odapi.get_all_instances("Association") + self.type_odapi.get_all_instances("AttributeLink"):
# association/attribute source multiplicities
if type_name in self.source_multiplicities:
if assoc_name in self.source_multiplicities:
# type is an association
type_obj, = self.bottom.read_outgoing_elements(self.type_model, type_name)
tgt_type_obj = self.bottom.read_edge_target(type_obj)
tgt_type_name = self.type_model_names[tgt_type_obj]
lc, uc = self.source_multiplicities[type_name]
for obj_name, obj_type_name in self.type_mapping.items():
if self.cdapi.is_subtype(super_type_name=tgt_type_name, sub_type_name=obj_type_name):
# obj's type has this incoming association -> now we will count the number of links typed by it
count = 0
obj, = self.bottom.read_outgoing_elements(self.model, obj_name)
incoming = self.bottom.read_incoming_edges(obj)
for i in incoming:
try:
type_of_incoming_link = self.type_mapping[self.model_names[i]]
if self.cdapi.is_subtype(super_type_name=type_name, sub_type_name=type_of_incoming_link):
count += 1
except KeyError:
pass # for elements not part of model, e.g. morphism links
if count < lc or count > uc:
errors.append(f"Source cardinality of type '{type_name}' ({count}) out of bounds ({lc}..{uc}) in '{obj_name}'.")
assoc, = self.bottom.read_outgoing_elements(self.type_model, assoc_name)
tgt_type_obj = self.bottom.read_edge_target(assoc)
tgt_type_name = self.odapi.mm_obj_to_name[tgt_type_obj]
lc, uc = self.source_multiplicities[assoc_name]
for obj_name, obj in self.odapi.get_all_instances(tgt_type_name, include_subtypes=True):
# obj's type has this incoming association -> now we will count the number of links typed by it
count = len(self.odapi.get_incoming(obj, assoc_name, include_subtypes=True))
if count < lc or count > uc:
errors.append(f"Source cardinality of type '{assoc_name}' ({count}) out of bounds ({lc}..{uc}) in '{obj_name}'.")
# association/attribute target multiplicities
if type_name in self.target_multiplicities:
if assoc_name in self.target_multiplicities:
# type is an association
type_obj, = self.bottom.read_outgoing_elements(self.type_model, type_name)
type_obj, = self.bottom.read_outgoing_elements(self.type_model, assoc_name)
src_type_obj = self.bottom.read_edge_source(type_obj)
src_type_name = self.type_model_names[src_type_obj]
lc, uc = self.target_multiplicities[type_name]
for obj_name, obj_type_name in self.type_mapping.items():
if self.cdapi.is_subtype(super_type_name=src_type_name, sub_type_name=obj_type_name):
# obj's type has this outgoing association -> now we will count the number of links typed by it
count = 0
obj, = self.bottom.read_outgoing_elements(self.model, obj_name)
outgoing = self.bottom.read_outgoing_edges(obj)
for o in outgoing:
try:
type_of_outgoing_link = self.type_mapping[self.model_names[o]]
if self.cdapi.is_subtype(super_type_name=type_name, sub_type_name=type_of_outgoing_link):
count += 1
except KeyError:
pass # for elements not part of model, e.g. morphism links
if count < lc or count > uc:
errors.append(f"Target cardinality of type '{type_name}' ({count}) out of bounds ({lc}..{uc}) in '{obj_name}'.")
src_type_name = self.odapi.mm_obj_to_name[src_type_obj]
lc, uc = self.target_multiplicities[assoc_name]
for obj_name, obj in self.odapi.get_all_instances(src_type_name, include_subtypes=True):
# obj's type has this outgoing association -> now we will count the number of links typed by it
count = len(self.odapi.get_outgoing(obj, assoc_name, include_subtypes=True))
if count < lc or count > uc:
errors.append(f"Target cardinality of type '{assoc_name}' ({count}) out of bounds ({lc}..{uc}) in '{obj_name}'.")
return errors
def evaluate_constraint(self, code, **kwargs):
@ -447,7 +332,7 @@ class Conformance:
# collect types
class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
for tm_element, tm_name in self.type_model_names.items():
for tm_element, tm_name in self.odapi.mm_obj_to_name.items():
# retrieve elements that tm_element is a morphism of
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
morphism, = [m for m in morphisms if m in scd_elements]
@ -457,7 +342,7 @@ class Conformance:
# collect type structures
# retrieve AttributeLink to check whether element is a morphism of AttributeLink
attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
for tm_element, tm_name in self.type_model_names.items():
for tm_element, tm_name in self.odapi.mm_obj_to_name.items():
# retrieve elements that tm_element is a morphism of
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
morphism, = [m for m in morphisms if m in scd_elements]
@ -465,8 +350,8 @@ class Conformance:
if attr_link_element == morphism:
# retrieve attributes of attribute link, i.e. 'name' and 'optional'
attrs = self.bottom.read_outgoing_elements(tm_element)
name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
name_model_node, = filter(lambda x: self.odapi.m_obj_to_name.get(x, "").endswith(".name"), attrs)
opt_model_node, = filter(lambda x: self.odapi.m_obj_to_name.get(x, "").endswith(".optional"), attrs)
# get attr name value
name_model = UUID(self.bottom.read_value(name_model_node))
name_node, = self.bottom.read_outgoing_elements(name_model)
@ -477,9 +362,9 @@ class Conformance:
opt = self.bottom.read_value(opt_node)
# get attr type name
source_type_node = self.bottom.read_edge_source(tm_element)
source_type_name = self.type_model_names[source_type_node]
source_type_name = self.odapi.mm_obj_to_name[source_type_node]
target_type_node = self.bottom.read_edge_target(tm_element)
target_type_name = self.type_model_names[target_type_node]
target_type_name = self.odapi.mm_obj_to_name[target_type_node]
# add attribute to the structure of its source type
# attribute is stored as a (name, optional, type) triple
self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
@ -493,7 +378,7 @@ class Conformance:
# filter out abstract types, as they cannot be instantiated
# retrieve Class_abstract to check whether element is a morphism of Class_abstract
class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
for tm_element, tm_name in self.type_model_names.items():
for tm_element, tm_name in self.odapi.mm_obj_to_name.items():
# retrieve elements that tm_element is a morphism of
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
morphism, = [m for m in morphisms if m in scd_elements]
@ -506,7 +391,7 @@ class Conformance:
is_abstract = self.bottom.read_value(abst_node)
# retrieve type name
source_node = self.bottom.read_edge_source(tm_element)
type_name = self.type_model_names[source_node]
type_name = self.odapi.mm_obj_to_name[source_node]
if is_abstract:
self.structures.pop(type_name)
@ -516,7 +401,7 @@ class Conformance:
"""
ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
# matching
for m_element, m_name in self.model_names.items():
for m_element, m_name in self.odapi.m_obj_to_name.items():
is_edge = self.bottom.read_edge_source(m_element) != None
print('element:', m_element, 'name:', m_name, 'is_edge', is_edge)
for type_name, structure in self.structures.items():
@ -567,23 +452,23 @@ class Conformance:
print(' add to candidates:', m_name, type_name)
self.candidates.setdefault(m_name, set()).add(type_name)
# filter out candidates for links based on source and target types
for m_element, m_name in self.model_names.items():
for m_element, m_name in self.odapi.m_obj_to_name.items():
is_edge = self.bottom.read_edge_source(m_element) != None
if is_edge and m_name in self.candidates:
m_source = self.bottom.read_edge_source(m_element)
m_target = self.bottom.read_edge_target(m_element)
print(self.candidates)
source_candidates = self.candidates[self.model_names[m_source]]
target_candidates = self.candidates[self.model_names[m_target]]
source_candidates = self.candidates[self.odapi.m_obj_to_name[m_source]]
target_candidates = self.candidates[self.odapi.m_obj_to_name[m_target]]
remove = set()
for candidate_name in self.candidates[m_name]:
candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name)
candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)]
candidate_source = self.odapi.mm_obj_to_name[self.bottom.read_edge_source(candidate_element)]
if candidate_source not in source_candidates:
if len(source_candidates.intersection(set(self.odapi.transitive_sub_types[candidate_source]))) == 0:
# if len(source_candidates.intersection(set(self.sub_types[candidate_source]))) == 0:
remove.add(candidate_name)
candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)]
candidate_target = self.odapi.mm_obj_to_name[self.bottom.read_edge_target(candidate_element)]
if candidate_target not in target_candidates:
if len(target_candidates.intersection(set(self.odapi.transitive_sub_types[candidate_target]))) == 0:
# if len(target_candidates.intersection(set(self.sub_types[candidate_target]))) == 0: