More docs
This commit is contained in:
parent
7ffba9a295
commit
811e7b1eb1
3 changed files with 149 additions and 0 deletions
|
|
@ -37,6 +37,15 @@ class Conformance:
|
||||||
self.candidates = {}
|
self.candidates = {}
|
||||||
|
|
||||||
def check_nominal(self, *, log=False):
|
def check_nominal(self, *, log=False):
|
||||||
|
"""
|
||||||
|
Perform a nominal conformance check
|
||||||
|
|
||||||
|
Args:
|
||||||
|
log: boolean indicating whether to log errors
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Boolean indicating whether the check has passed
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self.check_typing()
|
self.check_typing()
|
||||||
self.check_link_typing()
|
self.check_link_typing()
|
||||||
|
|
@ -49,6 +58,16 @@ class Conformance:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def check_structural(self, *, build_morphisms=True, log=False):
|
def check_structural(self, *, build_morphisms=True, log=False):
|
||||||
|
"""
|
||||||
|
Perform a structural conformance check
|
||||||
|
|
||||||
|
Args:
|
||||||
|
build_morphisms: boolean indicating whether to create morpishm links
|
||||||
|
log: boolean indicating whether to log errors
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Boolean indicating whether the check has passed
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self.precompute_structures()
|
self.precompute_structures()
|
||||||
self.match_structures()
|
self.match_structures()
|
||||||
|
|
@ -62,7 +81,16 @@ class Conformance:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def read_attribute(self, element: UUID, attr_name: str):
|
def read_attribute(self, element: UUID, attr_name: str):
|
||||||
|
"""
|
||||||
|
Read an attribute value attached to an element
|
||||||
|
|
||||||
|
Args:
|
||||||
|
element: UUID of the element
|
||||||
|
attr_name: name of the attribute to read
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The value of hte attribute, if no attribute with given name is found, returns None
|
||||||
|
"""
|
||||||
if element in self.type_model_names:
|
if element in self.type_model_names:
|
||||||
# type model element
|
# type model element
|
||||||
element_name = self.type_model_names[element]
|
element_name = self.type_model_names[element]
|
||||||
|
|
@ -78,13 +106,20 @@ class Conformance:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def precompute_sub_types(self):
|
def precompute_sub_types(self):
|
||||||
|
"""
|
||||||
|
Creates an internal representation of sub-type hierarchies that is
|
||||||
|
more easily queryable that the state graph
|
||||||
|
"""
|
||||||
|
# collect inheritance link instances
|
||||||
inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
|
inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
|
||||||
inh_links = []
|
inh_links = []
|
||||||
for tm_element, tm_name in self.type_model_names.items():
|
for tm_element, tm_name in self.type_model_names.items():
|
||||||
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
|
morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
|
||||||
if inh_element in morphisms:
|
if inh_element in morphisms:
|
||||||
|
# we have an instance of an inheritance link
|
||||||
inh_links.append(tm_element)
|
inh_links.append(tm_element)
|
||||||
|
|
||||||
|
# for each inheritance link we add the parent and child to the sub types map
|
||||||
for link in inh_links:
|
for link in inh_links:
|
||||||
tm_source = self.bottom.read_edge_source(link)
|
tm_source = self.bottom.read_edge_source(link)
|
||||||
tm_target = self.bottom.read_edge_target(link)
|
tm_target = self.bottom.read_edge_target(link)
|
||||||
|
|
@ -92,6 +127,7 @@ class Conformance:
|
||||||
child_name = self.type_model_names[tm_source]
|
child_name = self.type_model_names[tm_source]
|
||||||
self.sub_types[parent_name].add(child_name)
|
self.sub_types[parent_name].add(child_name)
|
||||||
|
|
||||||
|
# iteratively expand the sub type hierarchies in the sub types map
|
||||||
stop = False
|
stop = False
|
||||||
while not stop:
|
while not stop:
|
||||||
stop = True
|
stop = True
|
||||||
|
|
@ -104,6 +140,9 @@ class Conformance:
|
||||||
stop = False
|
stop = False
|
||||||
|
|
||||||
def deref_primitive_values(self):
|
def deref_primitive_values(self):
|
||||||
|
"""
|
||||||
|
Prefetch the values stored in referenced primitive type models
|
||||||
|
"""
|
||||||
ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
|
ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
|
||||||
string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
|
string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
|
||||||
boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
|
boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
|
||||||
|
|
@ -140,6 +179,10 @@ class Conformance:
|
||||||
pass # multiple elements in model indicate that we're not dealing with a primitive
|
pass # multiple elements in model indicate that we're not dealing with a primitive
|
||||||
|
|
||||||
def precompute_multiplicities(self):
|
def precompute_multiplicities(self):
|
||||||
|
"""
|
||||||
|
Creates an internal representation of type multiplicities that is
|
||||||
|
more easily queryable that the state graph
|
||||||
|
"""
|
||||||
for tm_element, tm_name in self.type_model_names.items():
|
for tm_element, tm_name in self.type_model_names.items():
|
||||||
# class abstract flags and multiplicities
|
# class abstract flags and multiplicities
|
||||||
abstract = self.read_attribute(tm_element, "abstract")
|
abstract = self.read_attribute(tm_element, "abstract")
|
||||||
|
|
@ -177,6 +220,9 @@ class Conformance:
|
||||||
self.target_multiplicities[tm_name] = (0, 1)
|
self.target_multiplicities[tm_name] = (0, 1)
|
||||||
|
|
||||||
def get_type(self, element: UUID):
|
def get_type(self, element: UUID):
|
||||||
|
"""
|
||||||
|
Retrieve the type of an element (wrt. current type model)
|
||||||
|
"""
|
||||||
morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
|
morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
|
||||||
tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
|
tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
|
||||||
return tm_element
|
return tm_element
|
||||||
|
|
@ -205,6 +251,9 @@ class Conformance:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def check_link_typing(self):
|
def check_link_typing(self):
|
||||||
|
"""
|
||||||
|
for each link, check whether its source and target are of a valid type
|
||||||
|
"""
|
||||||
self.precompute_sub_types()
|
self.precompute_sub_types()
|
||||||
for m_name, tm_name in self.type_mapping.items():
|
for m_name, tm_name in self.type_mapping.items():
|
||||||
m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
|
m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
|
||||||
|
|
@ -233,6 +282,9 @@ class Conformance:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def check_multiplicities(self):
|
def check_multiplicities(self):
|
||||||
|
"""
|
||||||
|
Check whether multiplicities for all types are respected
|
||||||
|
"""
|
||||||
self.deref_primitive_values()
|
self.deref_primitive_values()
|
||||||
self.precompute_multiplicities()
|
self.precompute_multiplicities()
|
||||||
for tm_name in self.type_model_names.values():
|
for tm_name in self.type_model_names.values():
|
||||||
|
|
@ -292,6 +344,9 @@ class Conformance:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def evaluate_constraint(self, code, **kwargs):
|
def evaluate_constraint(self, code, **kwargs):
|
||||||
|
"""
|
||||||
|
Evaluate constraint code (Python code)
|
||||||
|
"""
|
||||||
funcs = {
|
funcs = {
|
||||||
'read_value': self.state.read_value
|
'read_value': self.state.read_value
|
||||||
}
|
}
|
||||||
|
|
@ -304,6 +359,9 @@ class Conformance:
|
||||||
)
|
)
|
||||||
|
|
||||||
def check_constraints(self):
|
def check_constraints(self):
|
||||||
|
"""
|
||||||
|
Check whether all constraints defined for a model are respected
|
||||||
|
"""
|
||||||
# local constraints
|
# local constraints
|
||||||
for m_name, tm_name in self.type_mapping.items():
|
for m_name, tm_name in self.type_mapping.items():
|
||||||
if tm_name != "GlobalConstraint":
|
if tm_name != "GlobalConstraint":
|
||||||
|
|
@ -327,6 +385,9 @@ class Conformance:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def precompute_structures(self):
|
def precompute_structures(self):
|
||||||
|
"""
|
||||||
|
Make an internal representation of type structures such that comparing type structures is easier
|
||||||
|
"""
|
||||||
self.precompute_sub_types()
|
self.precompute_sub_types()
|
||||||
scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
|
scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
|
||||||
# collect types
|
# collect types
|
||||||
|
|
@ -393,6 +454,9 @@ class Conformance:
|
||||||
self.structures.pop(type_name)
|
self.structures.pop(type_name)
|
||||||
|
|
||||||
def match_structures(self):
|
def match_structures(self):
|
||||||
|
"""
|
||||||
|
Try to match the structure of each element in the instance model to some element in the type model
|
||||||
|
"""
|
||||||
ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
|
ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
|
||||||
# matching
|
# matching
|
||||||
for m_element, m_name in self.model_names.items():
|
for m_element, m_name in self.model_names.items():
|
||||||
|
|
@ -454,6 +518,9 @@ class Conformance:
|
||||||
self.candidates[m_name] = self.candidates[m_name].difference(remove)
|
self.candidates[m_name] = self.candidates[m_name].difference(remove)
|
||||||
|
|
||||||
def build_morphisms(self):
|
def build_morphisms(self):
|
||||||
|
"""
|
||||||
|
Build the morphisms between an instance and a type model that structurally match
|
||||||
|
"""
|
||||||
if not all([len(c) == 1 for c in self.candidates.values()]):
|
if not all([len(c) == 1 for c in self.candidates.values()]):
|
||||||
raise RuntimeError("Cannot build incomplete or ambiguous morphism.")
|
raise RuntimeError("Cannot build incomplete or ambiguous morphism.")
|
||||||
mapping = {k: v.pop() for k, v in self.candidates.items()}
|
mapping = {k: v.pop() for k, v in self.candidates.items()}
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,9 @@ from ast import literal_eval
|
||||||
|
|
||||||
|
|
||||||
def generate_context_question(ctx_type, services):
|
def generate_context_question(ctx_type, services):
|
||||||
|
"""
|
||||||
|
Converts service names to human readable form
|
||||||
|
"""
|
||||||
choices = [
|
choices = [
|
||||||
s.__name__.replace('_', ' ') for s in services
|
s.__name__.replace('_', ' ') for s in services
|
||||||
]
|
]
|
||||||
|
|
@ -33,9 +36,11 @@ def main():
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if man.current_model is not None and man.current_context is None:
|
if man.current_model is not None and man.current_context is None:
|
||||||
|
# we have selected a model, so we display typing questions
|
||||||
answer = prompt(questions.MODEL_SELECTED)
|
answer = prompt(questions.MODEL_SELECTED)
|
||||||
ctx = man
|
ctx = man
|
||||||
elif man.current_model is not None and man.current_context is not None:
|
elif man.current_model is not None and man.current_context is not None:
|
||||||
|
# we have selected both a model and a context, so we display available services
|
||||||
qs = generate_context_question(type(man.current_context), man.get_services())
|
qs = generate_context_question(type(man.current_context), man.get_services())
|
||||||
answer = prompt(qs)
|
answer = prompt(qs)
|
||||||
if answer['op'] == 'close_context':
|
if answer['op'] == 'close_context':
|
||||||
|
|
|
||||||
|
|
@ -17,10 +17,26 @@ class Manager:
|
||||||
self.state.create_edge(model_node, scd_node)
|
self.state.create_edge(model_node, scd_node)
|
||||||
|
|
||||||
def get_models(self):
|
def get_models(self):
|
||||||
|
"""
|
||||||
|
Retrieves all existing models
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Names of exising models
|
||||||
|
"""
|
||||||
for key_node in self.state.read_dict_keys(self.state.read_root()):
|
for key_node in self.state.read_dict_keys(self.state.read_root()):
|
||||||
yield self.state.read_value(key_node)
|
yield self.state.read_value(key_node)
|
||||||
|
|
||||||
def instantiate_model(self, type_model_name: str, name: str):
|
def instantiate_model(self, type_model_name: str, name: str):
|
||||||
|
"""
|
||||||
|
Retrieves all existing models
|
||||||
|
|
||||||
|
Args:
|
||||||
|
type_model_name: name of the type model we want to instantiate
|
||||||
|
name: name of the instance model to be created
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Nothing
|
||||||
|
"""
|
||||||
root = self.state.read_root()
|
root = self.state.read_root()
|
||||||
type_model_node = self.state.read_dict(root, type_model_name)
|
type_model_node = self.state.read_dict(root, type_model_name)
|
||||||
if type_model_node is None:
|
if type_model_node is None:
|
||||||
|
|
@ -44,6 +60,15 @@ class Manager:
|
||||||
self.current_context = services[type_model_name](self.current_model[1], self.state)
|
self.current_context = services[type_model_name](self.current_model[1], self.state)
|
||||||
|
|
||||||
def select_model(self, name: str):
|
def select_model(self, name: str):
|
||||||
|
"""
|
||||||
|
Select a model to interact with
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: name of the model we want to interact with
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Nothing
|
||||||
|
"""
|
||||||
root = self.state.read_root()
|
root = self.state.read_root()
|
||||||
model_node = self.state.read_dict(root, name)
|
model_node = self.state.read_dict(root, name)
|
||||||
if model_node is None:
|
if model_node is None:
|
||||||
|
|
@ -52,10 +77,22 @@ class Manager:
|
||||||
self.current_model = (name, model_root)
|
self.current_model = (name, model_root)
|
||||||
|
|
||||||
def close_model(self):
|
def close_model(self):
|
||||||
|
"""
|
||||||
|
Clear the currently selected model
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Nothing
|
||||||
|
"""
|
||||||
self.current_model = None
|
self.current_model = None
|
||||||
self.current_context = None
|
self.current_context = None
|
||||||
|
|
||||||
def get_types(self):
|
def get_types(self):
|
||||||
|
"""
|
||||||
|
Retrieve the types of the currently selected model
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Names of the model's types
|
||||||
|
"""
|
||||||
root = self.state.read_root()
|
root = self.state.read_root()
|
||||||
if self.current_model is None:
|
if self.current_model is None:
|
||||||
raise RuntimeError(f"No model currently selected.")
|
raise RuntimeError(f"No model currently selected.")
|
||||||
|
|
@ -72,6 +109,15 @@ class Manager:
|
||||||
yield self.state.read_value(label_node)
|
yield self.state.read_value(label_node)
|
||||||
|
|
||||||
def select_context(self, name: str):
|
def select_context(self, name: str):
|
||||||
|
"""
|
||||||
|
Select a type to set as the current context
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: name of the type/context
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Nothing
|
||||||
|
"""
|
||||||
if name not in self.get_types():
|
if name not in self.get_types():
|
||||||
raise RuntimeError(f"No type {name} that currently selected model conforms to.")
|
raise RuntimeError(f"No type {name} that currently selected model conforms to.")
|
||||||
if name not in services:
|
if name not in services:
|
||||||
|
|
@ -80,10 +126,22 @@ class Manager:
|
||||||
self.current_context.from_bottom()
|
self.current_context.from_bottom()
|
||||||
|
|
||||||
def close_context(self):
|
def close_context(self):
|
||||||
|
"""
|
||||||
|
Exit the current (type) context
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Nothing
|
||||||
|
"""
|
||||||
self.current_context.to_bottom()
|
self.current_context.to_bottom()
|
||||||
self.current_context = None
|
self.current_context = None
|
||||||
|
|
||||||
def get_services(self):
|
def get_services(self):
|
||||||
|
"""
|
||||||
|
Retrieve the services available in the current context
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Functions exposed by the current context's implementation
|
||||||
|
"""
|
||||||
if self.current_model is None:
|
if self.current_model is None:
|
||||||
raise RuntimeError(f"No model currently selected.")
|
raise RuntimeError(f"No model currently selected.")
|
||||||
if self.current_context is None:
|
if self.current_context is None:
|
||||||
|
|
@ -98,6 +156,19 @@ class Manager:
|
||||||
]
|
]
|
||||||
|
|
||||||
def check_conformance(self, type_model_name: str, model_name: str):
|
def check_conformance(self, type_model_name: str, model_name: str):
|
||||||
|
"""
|
||||||
|
If there are existing morphisms between the model and type model
|
||||||
|
check nominal conformance
|
||||||
|
Else
|
||||||
|
find conformance using structural conformance check
|
||||||
|
|
||||||
|
Args:
|
||||||
|
type_model_name: name of the type model to check conformance against
|
||||||
|
model_name: name of the instance model
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Boolean indicating whether conformance was found
|
||||||
|
"""
|
||||||
root = self.state.read_root()
|
root = self.state.read_root()
|
||||||
type_model_node = self.state.read_dict(root, type_model_name)
|
type_model_node = self.state.read_dict(root, type_model_name)
|
||||||
if type_model_node is None:
|
if type_model_node is None:
|
||||||
|
|
@ -120,11 +191,17 @@ class Manager:
|
||||||
UUID(self.state.read_value(type_model_node))).check_nominal(log=True)
|
UUID(self.state.read_value(type_model_node))).check_nominal(log=True)
|
||||||
|
|
||||||
def dump_state(self):
|
def dump_state(self):
|
||||||
|
"""
|
||||||
|
Dumps the current state of the Modelverse to a pickle file
|
||||||
|
"""
|
||||||
import pickle
|
import pickle
|
||||||
with open("state.p", "wb") as file:
|
with open("state.p", "wb") as file:
|
||||||
pickle.dump(self.state, file)
|
pickle.dump(self.state, file)
|
||||||
|
|
||||||
def load_state(self):
|
def load_state(self):
|
||||||
|
"""
|
||||||
|
Loas a state of the Modelverse from a pickle file
|
||||||
|
"""
|
||||||
import pickle
|
import pickle
|
||||||
with open("state.p", "rb") as file:
|
with open("state.p", "rb") as file:
|
||||||
self.state = pickle.load(file)
|
self.state = pickle.load(file)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue