Points example working

This commit is contained in:
Andrei Bondarenko 2021-08-27 09:46:44 +02:00
parent dfcc24f487
commit 6df566373d
13 changed files with 518 additions and 50 deletions

View file

@ -6,10 +6,11 @@ from pprint import pprint
class Conformance:
def __init__(self, state: State, scd_model: UUID, model: UUID, type_model: UUID):
def __init__(self, state: State, model: UUID, type_model: UUID):
self.state = state
self.bottom = Bottom(state)
self.scd_model = scd_model
type_model_id = state.read_dict(state.read_root(), "SCD")
self.scd_model = UUID(state.read_value(type_model_id))
self.model = model
self.type_model = type_model
self.type_mapping: Dict[str, str] = {}
@ -35,18 +36,30 @@ class Conformance:
self.matches = {}
self.candidates = {}
def check_nominal(self):
steps = [
self.check_typing,
self.check_link_typing,
self.check_multiplicities,
self.check_constraints
]
for step in steps:
conforms = step()
if not conforms:
return False
return True
def check_nominal(self, *, log=False):
try:
self.check_typing()
self.check_link_typing()
self.check_multiplicities()
self.check_constraints()
return True
except RuntimeError as e:
if log:
print(e)
return False
def check_structural(self, *, build_morphisms=True, log=False):
try:
self.precompute_structures()
self.match_structures()
if build_morphisms:
self.build_morphisms()
self.check_nominal(log=log)
return True
except RuntimeError as e:
if log:
print(e)
return False
def read_attribute(self, element: UUID, attr_name: str):
@ -184,12 +197,11 @@ class Conformance:
if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
sub_m = UUID(self.bottom.read_value(m_element))
sub_tm = UUID(self.bottom.read_value(tm_element))
if not Conformance(self.state, self.scd_model, sub_m, sub_tm).check_nominal():
return False
if not Conformance(self.state, sub_m, sub_tm).check_nominal():
raise RuntimeError(f"Incorrectly model reference: {m_name}")
except ValueError:
# no or too many morphism links found
print(f"Incorrectly typed element: {m_name}")
return False
raise RuntimeError(f"Incorrectly typed element: {m_name}")
return True
def check_link_typing(self):
@ -210,16 +222,14 @@ class Conformance:
source_type_expected = self.type_model_names[tm_source]
if source_type_actual != source_type_expected:
if source_type_actual not in self.sub_types[source_type_expected]:
print(f"Invalid source type {source_type_actual} for element {m_name}")
return False
raise RuntimeError(f"Invalid source type {source_type_actual} for element {m_name}")
# check if target is typed correctly
target_name = self.model_names[m_target]
target_type_actual = self.type_mapping[target_name]
target_type_expected = self.type_model_names[tm_target]
if target_type_actual != target_type_expected:
if target_type_actual not in self.sub_types[target_type_expected]:
print(f"Invalid target type {target_type_actual} for element {m_name}")
return False
raise RuntimeError(f"Invalid target type {target_type_actual} for element {m_name}")
return True
def check_multiplicities(self):
@ -230,8 +240,7 @@ class Conformance:
if tm_name in self.abstract_types:
type_count = list(self.type_mapping.values()).count(tm_name)
if type_count > 0:
print(f"Invalid instantiation of abstract class: {tm_name}")
return False
raise RuntimeError(f"Invalid instantiation of abstract class: {tm_name}")
# class multiplicities
if tm_name in self.multiplicities:
lc, uc = self.multiplicities[tm_name]
@ -239,8 +248,7 @@ class Conformance:
for sub_type in self.sub_types[tm_name]:
type_count += list(self.type_mapping.values()).count(sub_type)
if type_count < lc or type_count > uc:
print(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
return False
raise RuntimeError(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
# association source multiplicities
if tm_name in self.source_multiplicities:
tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
@ -259,8 +267,7 @@ class Conformance:
except KeyError:
pass # for elements not part of model, e.g. morphism links
if count < lc or count > uc:
print(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
return False
raise RuntimeError(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
# association target multiplicities
if tm_name in self.target_multiplicities:
@ -307,7 +314,7 @@ class Conformance:
morphisms = [m for m in morphisms if m in self.model_names]
for m_element in morphisms:
if not self.evaluate_constraint(code, element=m_element):
return False
raise RuntimeError(f"Local constraint of {tm_name} not satisfied in {m_name}.")
# global constraints
for m_name, tm_name in self.type_mapping.items():
@ -316,7 +323,7 @@ class Conformance:
code = self.read_attribute(tm_element, "constraint")
if code is not None:
if not self.evaluate_constraint(code, model=self.model):
return False
raise RuntimeError(f"Global constraint {tm_name} not satisfied.")
return True
def precompute_structures(self):
@ -408,7 +415,7 @@ class Conformance:
# check conformance of reference model
type_model_uuid = UUID(self.bottom.read_value(attr_tm))
model_uuid = UUID(self.bottom.read_value(attr))
attr_conforms = Conformance(self.state, self.scd_model, model_uuid, type_model_uuid)\
attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\
.check_nominal()
else:
# eval constraints
@ -446,8 +453,7 @@ class Conformance:
def build_morphisms(self):
if not all([len(c) == 1 for c in self.candidates.values()]):
print("Ambiguous structural matches found, unable to build unique morphism.")
return False
raise RuntimeError("Cannot build incomplete or ambiguous morphism.")
mapping = {k: v.pop() for k, v in self.candidates.items()}
for m_name, tm_name in mapping.items():
# morphism to class/assoc
@ -480,14 +486,14 @@ if __name__ == '__main__':
ltm_pn_lola = bootstrap_pn(s, "PNlola")
from services.pn import PN
my_pn = s.create_node()
PNserv = PN(ltm_pn, my_pn, s)
PNserv = PN(my_pn, s)
PNserv.create_place("p1", 5)
PNserv.create_place("p2", 0)
PNserv.create_transition("t1")
PNserv.create_p2t("p1", "t1", 1)
PNserv.create_t2p("t1", "p2", 1)
cf = Conformance(s, scd, my_pn, ltm_pn_lola)
cf = Conformance(s, my_pn, ltm_pn_lola)
# cf = Conformance(s, scd, ltm_pn, scd)
cf.precompute_structures()
cf.match_structures()

View file

@ -0,0 +1,99 @@
from framework.manager import Manager
from state.devstate import DevState
from PyInquirer import prompt, Separator
from pprint import pprint
import prompt_questions as questions
from inspect import signature
from uuid import UUID
def generate_context_question(ctx_type, services):
choices = [
s.__name__.replace('_', ' ') for s in services
]
choices = sorted(choices)
choices.append(Separator())
choices.append("close context")
ctx_question = [
{
'type': 'list',
'name': 'op',
'message': f'Currently in context {ctx_type.__name__}, which operation would you like to perform?',
'choices': choices,
'filter': lambda x: x.replace(' ', '_')
}
]
return ctx_question
def main():
state = DevState()
man = Manager(state)
while True:
if man.current_model is not None and man.current_context is None:
answer = prompt(questions.MODEL_SELECTED)
ctx = man
elif man.current_model is not None and man.current_context is not None:
qs = generate_context_question(type(man.current_context), man.get_services())
answer = prompt(qs)
if answer['op'] == 'close_context':
man.close_context()
continue
else:
ctx = man.current_context
else:
answer = prompt(questions.MODEL_MGMT)
ctx = man
if answer['op'] == 'exit':
break
else:
method = getattr(ctx, answer['op'])
args_questions = []
types = {}
for p in signature(method).parameters.values():
types[p.name] = p.annotation # can't use filter in question dict, doesn't work for some reason...
if p.annotation == UUID:
args_questions.append({
'type': 'list',
'name': p.name,
'message': f'{p.name.replace("_", " ")}?',
'choices': list(man.get_models()),
'filter': lambda x: state.read_value(state.read_dict(state.read_root(), x))
})
else:
args_questions.append({
'type': 'input',
'name': p.name,
'message': f'{p.name.replace("_", " ")}?',
'filter': lambda x: False if x.lower() == 'false' else x
})
args = prompt(args_questions)
args = {k: types[k](v) for k, v in args.items()}
try:
output = method(**args)
if output is not None:
try:
if isinstance(output, str):
raise TypeError
output = list(output)
if len(output) > 0:
for o in sorted(output):
print(f"\u2022 {o}")
except TypeError:
print(f"\u2022 {output}")
except RuntimeError as e:
print(e)
if __name__ == '__main__':
print("""Welcome to...
__ ____ _____
| \/ \ \ / /__ \
| \ / |\ \ / / ) |
| |\/| | \ \/ / / /
| | | | \ / / /_
|_| |_| \/ |____|
""")
main()

140
framework/manager.py Normal file
View file

@ -0,0 +1,140 @@
from state.base import State
from bootstrap.scd import bootstrap_scd
from services import implemented as services
from framework.conformance import Conformance
from uuid import UUID
class Manager:
def __init__(self, state: State):
self.current_model = None
self.current_context = None
self.state = state
bootstrap_scd(state)
scd_node = self.state.read_dict(self.state.read_root(), "SCD")
for key_node in self.state.read_dict_keys(self.state.read_root()):
model_node = self.state.read_dict_node(self.state.read_root(), key_node)
self.state.create_edge(model_node, scd_node)
def get_models(self):
for key_node in self.state.read_dict_keys(self.state.read_root()):
yield self.state.read_value(key_node)
def instantiate_model(self, type_model_name: str, name: str):
root = self.state.read_root()
type_model_node = self.state.read_dict(root, type_model_name)
if type_model_node is None:
raise RuntimeError(f"No type model with name {type_model_name} found.")
else:
# check if model is a linguistic type model
scd_node = self.state.read_dict(self.state.read_root(), "SCD")
incoming = self.state.read_incoming(scd_node)
incoming = [self.state.read_edge(e)[0] for e in incoming]
if type_model_node not in incoming:
raise RuntimeError(f"Model with name {type_model_name} is not a type model.")
if name in map(self.state.read_value, self.state.read_dict_keys(root)):
raise RuntimeError(f"Model with name {name} already exists.")
new_model_root = self.state.create_node()
new_model_node = self.state.create_nodevalue(str(new_model_root))
self.state.create_dict(root, name, new_model_node)
self.state.create_edge(new_model_node, type_model_node)
self.current_model = (name, new_model_root)
if type_model_name not in services:
raise RuntimeError(f"Services for type {type_model_name} not implemented.")
self.current_context = services[type_model_name](self.current_model[1], self.state)
def select_model(self, name: str):
root = self.state.read_root()
model_node = self.state.read_dict(root, name)
if model_node is None:
raise RuntimeError(f"No model with name {name} found.")
model_root = UUID(self.state.read_value(model_node))
self.current_model = (name, model_root)
def close_model(self):
self.current_model = None
self.current_context = None
def get_types(self):
root = self.state.read_root()
if self.current_model is None:
raise RuntimeError(f"No model currently selected.")
name, model = self.current_model
model_id = self.state.read_dict(root, name)
outgoing = self.state.read_outgoing(model_id)
outgoing = [e for e in outgoing if len(self.state.read_outgoing(e)) == 0]
elements = [self.state.read_edge(e)[1] for e in outgoing]
for e in elements:
incoming = self.state.read_incoming(e)
label_edge, = [e for e in incoming if len(self.state.read_outgoing(e)) == 1]
label_edge, = self.state.read_outgoing(label_edge)
_, label_node = self.state.read_edge(label_edge)
yield self.state.read_value(label_node)
def select_context(self, name: str):
if name not in self.get_types():
raise RuntimeError(f"No type {name} that currently selected model conforms to.")
if name not in services:
raise RuntimeError(f"Services for type {name} not implemented.")
self.current_context = services[name](self.current_model[1], self.state)
self.current_context.from_bottom()
def close_context(self):
self.current_context.to_bottom()
self.current_context = None
def get_services(self):
if self.current_model is None:
raise RuntimeError(f"No model currently selected.")
if self.current_context is None:
raise RuntimeError(f"No context currently selected.")
yield from [
getattr(self.current_context, func)
for func in dir(self.current_context)
if callable(getattr(self.current_context, func))
and not func.startswith("__")
and not func == "from_bottom"
and not func == "to_bottom"
]
def check_conformance(self, type_model_name: str, model_name: str):
root = self.state.read_root()
type_model_node = self.state.read_dict(root, type_model_name)
if type_model_node is None:
raise RuntimeError(f"No type model with name {type_model_name} found.")
model_node = self.state.read_dict(root, model_name)
if model_node is None:
raise RuntimeError(f"No model with name {model_node} found.")
types = self.state.read_outgoing(model_node)
types = [self.state.read_edge(e)[1] for e in types]
if type_model_node not in types:
conf = Conformance(self.state,
UUID(self.state.read_value(model_node)),
UUID(self.state.read_value(type_model_node))).check_structural(log=True)
if conf:
self.state.create_edge(model_node, type_model_node)
return conf
else:
return Conformance(self.state,
UUID(self.state.read_value(model_node)),
UUID(self.state.read_value(type_model_node))).check_nominal(log=True)
def dump_state(self):
import pickle
with open("state.p", "wb") as file:
pickle.dump(self.state, file)
def load_state(self):
import pickle
with open("state.p", "rb") as file:
self.state = pickle.load(file)
if __name__ == '__main__':
from state.devstate import DevState
s = DevState()
m = Manager(s)
m.select_model("SCD")
m.select_context("SCD")
for f in m.get_services():
print(f)

View file

@ -0,0 +1,36 @@
from PyInquirer import Separator
MODEL_SELECTED = [
{
'type': 'list',
'name': 'op',
'message': 'Model selected... Which operation would you like to perform?',
'choices': [
'get types',
'select context',
Separator(),
'close model'
],
'filter': lambda x: x.replace(' ', '_')
}
]
MODEL_MGMT = [
{
'type': 'list',
'name': 'op',
'message': 'Which model management operation would you like to perform?',
'choices': [
'get models',
'select model',
'instantiate model',
'check conformance',
Separator(),
'load state',
'dump state',
Separator(),
'exit'
],
'filter': lambda x: x.replace(' ', '_')
}
]