from services.bottom.V0 import Bottom from services.primitives.actioncode_type import ActionCode from uuid import UUID from state.base import State from typing import Dict, Tuple, Set, Any, List from pprint import pprint import traceback from concrete_syntax.common import indent from util.eval import exec_then_eval from api.cd import CDAPI from api.od import ODAPI, bind_api_readonly import functools def eval_context_decorator(func): """ Used to mark functions that can be called inside the evaluation context. Base functions are mapped into the function, as well as the evaluation context. This happens at runtime so typechecking will not be happy. Important: Using the same name in the evaluation context as the function name will lead to naming conflicts with the function as priority, resulting in missing argument errors. from typing import TYPE_CHECKING if TYPE_CHECKING: from api.od_stub import * ... Use this to make partially fix the typechecking. Optionally, define a stub for your own evaluation context and include it. """ def wrapper(*args, api_context, eval_context, **kwargs): for key, value in api_context.items(): func.__globals__[key] = value for key, value in eval_context.items(): func.__globals__[key] = value return func(*args, **kwargs) return wrapper def render_conformance_check_result(error_list): if len(error_list) == 0: return "CONFORM" else: joined = ''.join(('\n ▸ ' + err for err in error_list)) return f"NOT CONFORM, {len(error_list)} errors:{joined}" class Conformance: # Parameter 'constraint_check_subtypes': whether to check local type-level constraints also on subtypes. def __init__(self, state: State, model: UUID, type_model: UUID, constraint_check_subtypes=True, *, eval_context = None): self.state = state self.bottom = Bottom(state) self.model = model self.type_model = type_model self.constraint_check_subtypes = constraint_check_subtypes # MCL type_model_id = state.read_dict(state.read_root(), "SCD") self.scd_model = UUID(state.read_value(type_model_id)) # Helpers self.cdapi = CDAPI(state, type_model) self.odapi = ODAPI(state, model, type_model) self.type_odapi = ODAPI(state, type_model, self.scd_model) # Pre-computed: self.abstract_types: List[str] = [] self.multiplicities: Dict[str, Tuple] = {} self.source_multiplicities: Dict[str, Tuple] = {} self.target_multiplicities: Dict[str, Tuple] = {} # ? self.structures = {} self.candidates = {} # add user defined functions to constraints self.eval_context = eval_context if eval_context else {} def check_nominal(self, *, log=False): """ Perform a nominal conformance check Args: log: boolean indicating whether to log errors Returns: Boolean indicating whether the check has passed """ errors = [] errors += self.check_typing() errors += self.check_link_typing() errors += self.check_multiplicities() errors += self.check_constraints() return errors # def check_structural(self, *, build_morphisms=True, log=False): # """ # Perform a structural conformance check # Args: # build_morphisms: boolean indicating whether to create morpishm links # log: boolean indicating whether to log errors # Returns: # Boolean indicating whether the check has passed # """ # try: # self.precompute_structures() # self.match_structures() # if build_morphisms: # self.build_morphisms() # self.check_nominal(log=log) # return True # except RuntimeError as e: # if log: # print(e) # return False def precompute_multiplicities(self): """ Creates an internal representation of type multiplicities that is more easily queryable that the state graph """ for clss_name, clss in self.type_odapi.get_all_instances("Class"): abstract = self.type_odapi.get_slot_value_default(clss, "abstract", default=False) if abstract: self.abstract_types.append(clss_name) lc = self.type_odapi.get_slot_value_default(clss, "lower_cardinality", default=0) uc = self.type_odapi.get_slot_value_default(clss, "upper_cardinality", default=float('inf')) if lc or uc: self.multiplicities[clss_name] = (lc, uc) for assoc_name, assoc in self.type_odapi.get_all_instances("Association"): # multiplicities for associations slc = self.type_odapi.get_slot_value_default(assoc, "source_lower_cardinality", default=0) suc = self.type_odapi.get_slot_value_default(assoc, "source_upper_cardinality", default=float('inf')) if slc or suc: self.source_multiplicities[assoc_name] = (slc, suc) tlc = self.type_odapi.get_slot_value_default(assoc, "target_lower_cardinality", default=0) tuc = self.type_odapi.get_slot_value_default(assoc, "target_upper_cardinality", default=float('inf')) if tlc or tuc: self.target_multiplicities[assoc_name] = (tlc, tuc) for attr_name, attr in self.type_odapi.get_all_instances("AttributeLink"): # optional for attribute links opt = self.type_odapi.get_slot_value(attr, "optional") if opt != None: self.source_multiplicities[attr_name] = (0, float('inf')) self.target_multiplicities[attr_name] = (0 if opt else 1, 1) def check_typing(self): """ for each element of model check whether a morphism link exists to some element of type_model """ errors = [] # Recursively do a conformance check for each ModelRef for ref_name, ref in self.type_odapi.get_all_instances("ModelRef"): sub_mm = UUID(self.bottom.read_value(ref)) for ref_inst_name, ref_inst in self.odapi.get_all_instances(ref_name): sub_m = UUID(self.bottom.read_value(ref_inst)) nested_errors = Conformance(self.state, sub_m, sub_mm).check_nominal() errors += [f"In ModelRef ({ref_name}):" + err for err in nested_errors] return errors def check_link_typing(self): """ for each link, check whether its source and target are of a valid type """ errors = [] for tm_name, tm_element in self.type_odapi.get_all_instances("Association") + self.type_odapi.get_all_instances("AttributeLink"): for m_name, m_element in self.odapi.get_all_instances(tm_name): m_source = self.bottom.read_edge_source(m_element) m_target = self.bottom.read_edge_target(m_element) if m_source == None or m_target == None: # element is not a link continue # tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) tm_source = self.bottom.read_edge_source(tm_element) tm_target = self.bottom.read_edge_target(tm_element) # check if source is typed correctly # source_name = self.odapi.m_obj_to_name[m_source] source_type_actual = self.odapi.get_type_name(m_source) source_type_expected = self.odapi.mm_obj_to_name[tm_source] if not self.cdapi.is_subtype(super_type_name=source_type_expected, sub_type_name=source_type_actual): errors.append(f"Invalid source type '{source_type_actual}' for link '{m_name}:{tm_name}'") # check if target is typed correctly # target_name = self.odapi.m_obj_to_name[m_target] target_type_actual = self.odapi.get_type_name(m_target) target_type_expected = self.odapi.mm_obj_to_name[tm_target] if not self.cdapi.is_subtype(super_type_name=source_type_expected, sub_type_name=source_type_actual): errors.append(f"Invalid target type '{target_type_actual}' for link '{m_name}:{tm_name}'") return errors def check_multiplicities(self): """ Check whether multiplicities for all types are respected """ self.precompute_multiplicities() errors = [] for class_name, clss in self.type_odapi.get_all_instances("Class"): # for type_name in self.odapi.mm_obj_to_name.values(): # abstract classes if class_name in self.abstract_types: count = len(self.odapi.get_all_instances(class_name, include_subtypes=False)) if count > 0: errors.append(f"Invalid instantiation of abstract class: '{class_name}'") # class multiplicities if class_name in self.multiplicities: lc, uc = self.multiplicities[class_name] count = len(self.odapi.get_all_instances(class_name, include_subtypes=True)) if count < lc or count > uc: errors.append(f"Cardinality of type exceeds valid multiplicity range: '{class_name}' ({count})") for assoc_name, assoc in self.type_odapi.get_all_instances("Association") + self.type_odapi.get_all_instances("AttributeLink"): # association/attribute source multiplicities if assoc_name in self.source_multiplicities: # type is an association assoc, = self.bottom.read_outgoing_elements(self.type_model, assoc_name) tgt_type_obj = self.bottom.read_edge_target(assoc) tgt_type_name = self.odapi.mm_obj_to_name[tgt_type_obj] lc, uc = self.source_multiplicities[assoc_name] for obj_name, obj in self.odapi.get_all_instances(tgt_type_name, include_subtypes=True): # obj's type has this incoming association -> now we will count the number of links typed by it count = len(self.odapi.get_incoming(obj, assoc_name, include_subtypes=True)) if count < lc or count > uc: errors.append(f"Source cardinality of type '{assoc_name}' ({count}) out of bounds ({lc}..{uc}) in '{obj_name}'.") # association/attribute target multiplicities if assoc_name in self.target_multiplicities: # type is an association type_obj, = self.bottom.read_outgoing_elements(self.type_model, assoc_name) src_type_obj = self.bottom.read_edge_source(type_obj) src_type_name = self.odapi.mm_obj_to_name[src_type_obj] lc, uc = self.target_multiplicities[assoc_name] for obj_name, obj in self.odapi.get_all_instances(src_type_name, include_subtypes=True): # obj's type has this outgoing association -> now we will count the number of links typed by it count = len(self.odapi.get_outgoing(obj, assoc_name, include_subtypes=True)) if count < lc or count > uc: errors.append(f"Target cardinality of type '{assoc_name}' ({count}) out of bounds ({lc}..{uc}) in '{obj_name}'.") return errors def check_constraints(self): """ Check whether all constraints defined for a model are respected """ errors = [] def get_code(tm_name): constraints = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}.constraint") if len(constraints) == 1: constraint = constraints[0] code = ActionCode(UUID(self.bottom.read_value(constraint)), self.bottom.state).read() return code def check_result(result, description): if result == None: return # OK if isinstance(result, str): errors.append(f"{description} not satisfied. Reason: {result}") elif isinstance(result, bool): if not result: errors.append(f"{description} not satisfied.") elif isinstance(result, list): if len(result) > 0: reasons = indent('\n'.join(result), 4) errors.append(f"{description} not satisfied. Reasons:\n{reasons}") else: raise Exception(f"{description} evaluation result should be boolean or string! Instead got {result}") # local constraints _api_context = bind_api_readonly(self.odapi) _global_binds = {**_api_context} _eval_context = {**self.eval_context} for key, code in _eval_context.items(): _f = functools.partial(code, **{"api_context" :_api_context, "eval_context":_eval_context}) _global_binds[key] = _f _eval_context[key] = _f for type_name in self.bottom.read_keys(self.type_model): code = get_code(type_name) if code != None: instances = self.odapi.get_all_instances(type_name, include_subtypes=self.constraint_check_subtypes) for obj_name, obj_id in instances: description = f"Local constraint of \"{type_name}\" in \"{obj_name}\"" # print(description) try: result = exec_then_eval(code, _globals=_global_binds, _locals={'this': obj_id}) # may raise check_result(result, description) except: errors.append(f"Runtime error during evaluation of {description}:\n{indent(traceback.format_exc(), 6)}") # global constraints glob_constraints = [] # find global constraints... glob_constraint_type, = self.bottom.read_outgoing_elements(self.scd_model, "GlobalConstraint") for tm_name in self.bottom.read_keys(self.type_model): tm_node, = self.bottom.read_outgoing_elements(self.type_model, tm_name) # print(key, node) for type_of_node in self.bottom.read_outgoing_elements(tm_node, "Morphism"): if type_of_node == glob_constraint_type: # node is GlobalConstraint glob_constraints.append(tm_name) # evaluate them (each constraint once) for tm_name in glob_constraints: code = get_code(tm_name) if code != None: description = f"Global constraint \"{tm_name}\"" try: result = exec_then_eval(code, _globals=_global_binds) # may raise check_result(result, description) except: errors.append(f"Runtime error during evaluation of {description}:\n{indent(traceback.format_exc(), 6)}") return errors def precompute_structures(self): """ Make an internal representation of type structures such that comparing type structures is easier """ scd_elements = self.bottom.read_outgoing_elements(self.scd_model) # collect types class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class") association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association") for tm_element, tm_name in self.odapi.mm_obj_to_name.items(): # retrieve elements that tm_element is a morphism of morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") morphism, = [m for m in morphisms if m in scd_elements] # check if tm_element is a morphism of AttributeLink if class_element == morphism or association_element == morphism: self.structures[tm_name] = set() # collect type structures # retrieve AttributeLink to check whether element is a morphism of AttributeLink attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink") for tm_element, tm_name in self.odapi.mm_obj_to_name.items(): # retrieve elements that tm_element is a morphism of morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") morphism, = [m for m in morphisms if m in scd_elements] # check if tm_element is a morphism of AttributeLink if attr_link_element == morphism: # retrieve attributes of attribute link, i.e. 'name' and 'optional' attrs = self.bottom.read_outgoing_elements(tm_element) name_model_node, = filter(lambda x: self.odapi.m_obj_to_name.get(x, "").endswith(".name"), attrs) opt_model_node, = filter(lambda x: self.odapi.m_obj_to_name.get(x, "").endswith(".optional"), attrs) # get attr name value name_model = UUID(self.bottom.read_value(name_model_node)) name_node, = self.bottom.read_outgoing_elements(name_model) name = self.bottom.read_value(name_node) # get attr opt value opt_model = UUID(self.bottom.read_value(opt_model_node)) opt_node, = self.bottom.read_outgoing_elements(opt_model) opt = self.bottom.read_value(opt_node) # get attr type name source_type_node = self.bottom.read_edge_source(tm_element) source_type_name = self.odapi.mm_obj_to_name[source_type_node] target_type_node = self.bottom.read_edge_target(tm_element) target_type_name = self.odapi.mm_obj_to_name[target_type_node] # add attribute to the structure of its source type # attribute is stored as a (name, optional, type) triple self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name)) # extend structures of sub types with attrs of super types for super_type, sub_types in self.odapi.transitive_sub_types.items(): # JE: I made an untested change here! Can't test because structural conformance checking is broken. # for super_type, sub_types in self.sub_types.items(): for sub_type in sub_types: if sub_type != super_type: self.structures.setdefault(sub_type, set()).update(self.structures[super_type]) # filter out abstract types, as they cannot be instantiated # retrieve Class_abstract to check whether element is a morphism of Class_abstract class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract") for tm_element, tm_name in self.odapi.mm_obj_to_name.items(): # retrieve elements that tm_element is a morphism of morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") morphism, = [m for m in morphisms if m in scd_elements] # check if tm_element is a morphism of Class_abstract if class_abs_element == morphism: # retrieve 'abstract' attribute value target_node = self.bottom.read_edge_target(tm_element) abst_model = UUID(self.bottom.read_value(target_node)) abst_node, = self.bottom.read_outgoing_elements(abst_model) is_abstract = self.bottom.read_value(abst_node) # retrieve type name source_node = self.bottom.read_edge_source(tm_element) type_name = self.odapi.mm_obj_to_name[source_node] if is_abstract: self.structures.pop(type_name) def match_structures(self): """ Try to match the structure of each element in the instance model to some element in the type model """ ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") # matching for m_element, m_name in self.odapi.m_obj_to_name.items(): is_edge = self.bottom.read_edge_source(m_element) != None print('element:', m_element, 'name:', m_name, 'is_edge', is_edge) for type_name, structure in self.structures.items(): tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name) type_is_edge = self.bottom.read_edge_source(tm_element) != None if is_edge == type_is_edge: print(' type_name:', type_name, 'type_is_edge:', type_is_edge, "structure:", structure) mismatch = False matched = 0 for name, optional, attr_type in structure: print(' name:', name, "optional:", optional, "attr_type:", attr_type) try: attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}") attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type) # if attribute is a modelref, we need to check whether it # linguistically conforms to the specified type # if its an internally defined attribute, this will be checked by constraints morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism") attr_conforms = True if ref_element in morphisms: # check conformance of reference model type_model_uuid = UUID(self.bottom.read_value(attr_tm)) model_uuid = UUID(self.bottom.read_value(attr)) attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\ .check_nominal() else: # eval constraints code = self.read_attribute(attr_tm, "constraint") if code != None: attr_conforms = self.evaluate_constraint(code, this=attr) if attr_conforms: matched += 1 print(" attr_conforms -> matched:", matched) except ValueError as e: # attr not found or failed parsing UUID if optional: print(" skipping:", e) continue else: # did not match mandatory attribute print(" breaking:", e) mismatch = True break print(' matched:', matched, 'len(structure):', len(structure)) # if matched == len(structure): if not mismatch: print(' add to candidates:', m_name, type_name) self.candidates.setdefault(m_name, set()).add(type_name) # filter out candidates for links based on source and target types for m_element, m_name in self.odapi.m_obj_to_name.items(): is_edge = self.bottom.read_edge_source(m_element) != None if is_edge and m_name in self.candidates: m_source = self.bottom.read_edge_source(m_element) m_target = self.bottom.read_edge_target(m_element) print(self.candidates) source_candidates = self.candidates[self.odapi.m_obj_to_name[m_source]] target_candidates = self.candidates[self.odapi.m_obj_to_name[m_target]] remove = set() for candidate_name in self.candidates[m_name]: candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name) candidate_source = self.odapi.mm_obj_to_name[self.bottom.read_edge_source(candidate_element)] if candidate_source not in source_candidates: if len(source_candidates.intersection(set(self.odapi.transitive_sub_types[candidate_source]))) == 0: # if len(source_candidates.intersection(set(self.sub_types[candidate_source]))) == 0: remove.add(candidate_name) candidate_target = self.odapi.mm_obj_to_name[self.bottom.read_edge_target(candidate_element)] if candidate_target not in target_candidates: if len(target_candidates.intersection(set(self.odapi.transitive_sub_types[candidate_target]))) == 0: # if len(target_candidates.intersection(set(self.sub_types[candidate_target]))) == 0: remove.add(candidate_name) self.candidates[m_name] = self.candidates[m_name].difference(remove) def build_morphisms(self): """ Build the morphisms between an instance and a type model that structurally match """ if not all([len(c) == 1 for c in self.candidates.values()]): raise RuntimeError("Cannot build incomplete or ambiguous morphism.") mapping = {k: v.pop() for k, v in self.candidates.items()} for m_name, tm_name in mapping.items(): # morphism to class/assoc m_element, = self.bottom.read_outgoing_elements(self.model, m_name) tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) self.bottom.create_edge(m_element, tm_element, "Morphism") # morphism for attributes and attribute links structure = self.structures[tm_name] for attr_name, _, attr_type in structure: try: # attribute node attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}") attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type) self.bottom.create_edge(attr_element, attr_type_element, "Morphism") # attribute link attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}_{attr_name}") attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}") self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism") except ValueError: pass if __name__ == '__main__': from state.devstate import DevState as State s = State() from bootstrap.scd import bootstrap_scd scd = bootstrap_scd(s) from bootstrap.pn import bootstrap_pn ltm_pn = bootstrap_pn(s, "PN") ltm_pn_lola = bootstrap_pn(s, "PNlola") from services.pn import PN my_pn = s.create_node() PNserv = PN(my_pn, s) PNserv.create_place("p1", 5) PNserv.create_place("p2", 0) PNserv.create_transition("t1") PNserv.create_p2t("p1", "t1", 1) PNserv.create_t2p("t1", "p2", 1) cf = Conformance(s, my_pn, ltm_pn_lola) # cf = Conformance(s, scd, ltm_pn, scd) cf.precompute_structures() cf.match_structures() cf.build_morphisms() print(cf.check_nominal())