Add example of 'woods' operational semantics. Clearer error messages. Implement full OD-API. Small refactoring of Conformance class.
This commit is contained in:
parent
f2dc299eab
commit
cd26a401fe
7 changed files with 449 additions and 63 deletions
92
api/cd.py
Normal file
92
api/cd.py
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
from services.bottom.V0 import Bottom
|
||||
from uuid import UUID
|
||||
|
||||
class CDAPI:
|
||||
def __init__(self, state, m: UUID):
|
||||
self.state = state
|
||||
self.bottom = Bottom(state)
|
||||
self.m = m
|
||||
self.mm = UUID(state.read_value(state.read_dict(state.read_root(), "SCD")))
|
||||
|
||||
# pre-compute some things
|
||||
|
||||
# element -> name
|
||||
self.type_model_names = {
|
||||
self.bottom.read_outgoing_elements(self.m, e)[0]
|
||||
: e for e in self.bottom.read_keys(self.m)
|
||||
}
|
||||
|
||||
|
||||
inh_type, = self.bottom.read_outgoing_elements(self.mm, "Inheritance")
|
||||
inh_links = []
|
||||
for tm_element, tm_name in self.type_model_names.items():
|
||||
types = self.bottom.read_outgoing_elements(tm_element, "Morphism")
|
||||
if inh_type in types:
|
||||
inh_links.append(tm_element)
|
||||
|
||||
# for each inheritance link we add the parent and child to the sub types map
|
||||
# name -> name
|
||||
self.direct_sub_types = { type_name: set() for type_name in self.bottom.read_keys(self.m) } # empty initially
|
||||
self.direct_super_types = { type_name: set() for type_name in self.bottom.read_keys(self.m) } # empty initially
|
||||
for link in inh_links:
|
||||
tm_source = self.bottom.read_edge_source(link)
|
||||
tm_target = self.bottom.read_edge_target(link)
|
||||
parent_name = self.type_model_names[tm_target]
|
||||
child_name = self.type_model_names[tm_source]
|
||||
self.direct_sub_types[parent_name].add(child_name)
|
||||
self.direct_super_types[child_name].add(parent_name)
|
||||
|
||||
def get_transitive_sub_types(type_name: str):
|
||||
# includes the type itself - reason: if we want to get all the instances of some type and its subtypes, we don't have to consider the type itself as an extra case
|
||||
return [type_name, *(sub_type for child_name in self.direct_sub_types.get(type_name, set()) for sub_type in get_transitive_sub_types(child_name) )]
|
||||
|
||||
def get_transitive_super_types(type_name: str):
|
||||
# includes the type itself - reason: if we want to check if something is an instance of a type, we check if its type or one of its super types is equal to the type we're looking for, without having to consider the instance's type itself as an extra case
|
||||
return [type_name, *(super_type for parent_name in self.direct_super_types.get(type_name, set()) for super_type in get_transitive_super_types(parent_name))]
|
||||
|
||||
|
||||
self.transitive_sub_types = { type_name: set(get_transitive_sub_types(type_name)) for type_name in self.direct_sub_types }
|
||||
self.transitive_super_types = { type_name: set(get_transitive_super_types(type_name)) for type_name in self.direct_super_types }
|
||||
|
||||
def get_type(type_name: str):
|
||||
return self.bottom.read_outgoing_elements(self.m, type_name)[0]
|
||||
|
||||
def is_direct_subtype(super_type_name: str, sub_type_name: str):
|
||||
return sub_type_name in self.direct_sub_types[super_type]
|
||||
|
||||
def is_direct_supertype(sub_type_name: str, super_type_name: str):
|
||||
return super_type_name in self.direct_super_types[sub_type_name]
|
||||
|
||||
def is_subtype(super_type_name: str, sub_type_name: str):
|
||||
return sub_type_name in self.transitive_sub_types[super_type]
|
||||
|
||||
def is_supertype(sub_type_name: str, super_type_name: str):
|
||||
return super_type_name in self.transitive_super_types[sub_type_name]
|
||||
|
||||
# # The edge connecting an object to the value of a slot must be named `{object_name}_{attr_name}`
|
||||
# def get_attr_link_name(self, class_name, attr_name):
|
||||
# assoc_name = f"{class_name}_{attr_name}"
|
||||
# type_edges = self.bottom.read_outgoing_elements(self.m, assoc_name)
|
||||
# if len(type_edges) == 1:
|
||||
# return assoc_name
|
||||
# else:
|
||||
# # look for attribute in the super-types
|
||||
# conf = Conformance(self.bottom.state, self.model, self.type_model)
|
||||
# conf.precompute_sub_types() # only need to know about subtypes
|
||||
# super_types = [s for s in conf.sub_types if class_name in conf.sub_types[s]]
|
||||
# for s in super_types:
|
||||
# assoc_name = f"{s}_{attr_name}"
|
||||
# if len(self.bottom.read_outgoing_elements(self.type_model, assoc_name)) == 1:
|
||||
# return assoc_name
|
||||
|
||||
# Attributes are inherited, so when we instantiate an attribute of a class, the AttributeLink may contain the name of the superclass
|
||||
def find_attribute_type(self, class_name: str, attr_name: str):
|
||||
assoc_name = f"{class_name}_{attr_name}"
|
||||
type_edges = self.bottom.read_outgoing_elements(self.m, assoc_name)
|
||||
if len(type_edges) == 1:
|
||||
return type_edges[0]
|
||||
else:
|
||||
for supertype in self.direct_super_types[class_name]:
|
||||
result = self.find_attribute_type(supertype, attr_name)
|
||||
if result != None:
|
||||
return result
|
||||
148
api/od.py
Normal file
148
api/od.py
Normal file
|
|
@ -0,0 +1,148 @@
|
|||
from services import od
|
||||
from api import cd
|
||||
from services.bottom.V0 import Bottom
|
||||
from uuid import UUID
|
||||
from typing import Optional
|
||||
|
||||
NEXT_ID = 0
|
||||
|
||||
# Models map names to elements
|
||||
# This builds the inverse mapping, so we can quickly lookup the name of an element
|
||||
def build_name_mapping(state, m):
|
||||
mapping = {}
|
||||
bottom = Bottom(state)
|
||||
for name in bottom.read_keys(m):
|
||||
element, = bottom.read_outgoing_elements(m, name)
|
||||
mapping[element] = name
|
||||
return mapping
|
||||
|
||||
# Object Diagram API
|
||||
# Intended to replace the 'services.od.OD' class eventually
|
||||
class ODAPI:
|
||||
def __init__(self, state, m: UUID, mm: UUID):
|
||||
self.state = state
|
||||
self.bottom = Bottom(state)
|
||||
self.m = m
|
||||
self.mm = mm
|
||||
self.od = od.OD(mm, m, state)
|
||||
self.cd = cd.CDAPI(state, mm)
|
||||
|
||||
self.create_boolean_value = self.od.create_boolean_value
|
||||
self.create_integer_value = self.od.create_integer_value
|
||||
self.create_string_value = self.od.create_string_value
|
||||
self.create_actioncode_value = self.od.create_actioncode_value
|
||||
|
||||
self.__recompute_mappings()
|
||||
|
||||
# Called after every change - makes querying faster but modifying slower
|
||||
def __recompute_mappings(self):
|
||||
self.obj_to_name = {**build_name_mapping(self.state, self.m), **build_name_mapping(self.state, self.mm)}
|
||||
# self.obj_to_type = {}
|
||||
self.type_to_objs = { type_name : set() for type_name in self.bottom.read_keys(self.mm)}
|
||||
for m_name in self.bottom.read_keys(self.m):
|
||||
m_element, = self.bottom.read_outgoing_elements(self.m, m_name)
|
||||
tm_element = self.get_type(m_element)
|
||||
tm_name = self.obj_to_name[tm_element]
|
||||
# self.obj_to_type[m_name] = tm_name
|
||||
self.type_to_objs[tm_name].add(m_name)
|
||||
|
||||
def get_value(self, obj: UUID):
|
||||
return od.read_primitive_value(self.bottom, obj, self.mm)[0]
|
||||
|
||||
def get_target(self, link: UUID):
|
||||
return self.bottom.read_edge_target(link)
|
||||
|
||||
def get_source(self, link: UUID):
|
||||
return self.bottom.read_edge_source(link)
|
||||
|
||||
def get_slot(self, obj: UUID, attr_name: str):
|
||||
return self.od.get_slot(obj, attr_name)
|
||||
|
||||
def get_slot_link(self, obj: UUID, attr_name: str):
|
||||
return self.od.get_slot_link(obj, attr_name)
|
||||
|
||||
def get_outgoing(self, obj: UUID, assoc_name: str):
|
||||
return od.find_outgoing_typed_by(self.bottom, src=obj, type_node=self.bottom.read_outgoing_elements(self.mm, assoc_name)[0])
|
||||
|
||||
def get_incoming(self, obj: UUID, assoc_name: str):
|
||||
return od.find_incoming_typed_by(self.bottom, tgt=obj, type_node=self.bottom.read_outgoing_elements(self.mm, assoc_name)[0])
|
||||
|
||||
def get_all_instances(self, type_name: str, include_subtypes=True):
|
||||
obj_names = self.type_to_objs[type_name]
|
||||
return [(obj_name, self.bottom.read_outgoing_elements(self.m, obj_name)[0]) for obj_name in obj_names]
|
||||
|
||||
def get_type(self, obj: UUID):
|
||||
types = self.bottom.read_outgoing_elements(obj, "Morphism")
|
||||
if len(types) != 1:
|
||||
raise Exception(f"Expected obj to have 1 type, instead got {len(types)} types.")
|
||||
return types[0]
|
||||
|
||||
def get_name(self, obj: UUID):
|
||||
return (
|
||||
[name for name in self.bottom.read_keys(self.m) if self.bottom.read_outgoing_elements(self.m, name)[0] == obj] +
|
||||
[name for name in self.bottom.read_keys(self.mm) if self.bottom.read_outgoing_elements(self.mm, name)[0] == obj]
|
||||
)[0]
|
||||
return self.obj_to_name[obj]
|
||||
|
||||
def get(self, name: str):
|
||||
return self.bottom.read_outgoing_elements(self.m, name)[0]
|
||||
|
||||
def get_type_name(self, obj: UUID):
|
||||
return self.get_name(self.get_type(obj))
|
||||
|
||||
def is_instance(obj: UUID, type_name: str, include_subtypes=True):
|
||||
typ = self.cd.get_type(type_name)
|
||||
types = set(typ) if not include_subtypes else self.cd.transitive_subtypes[type_name]
|
||||
for type_of_obj in self.bottom.read_outgoing_elements(obj, "Morphism"):
|
||||
if type_of_obj in types:
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete(self, obj: UUID):
|
||||
self.bottom.delete_element(obj)
|
||||
self.__recompute_mappings()
|
||||
|
||||
def get_slot_value(self, obj: UUID, attr_name: str):
|
||||
return self.get_value(self.get_slot(obj, attr_name))
|
||||
|
||||
def set_slot_value(self, obj: UUID, attr_name: str, new_value: any):
|
||||
obj_name = self.get_name(obj)
|
||||
|
||||
link_name = f"{obj_name}_{attr_name}"
|
||||
target_name = f"{obj_name}.{attr_name}"
|
||||
|
||||
old_slot_link = self.get_slot_link(obj, attr_name)
|
||||
if old_slot_link != None:
|
||||
old_target = self.get_target(old_slot_link)
|
||||
# if old_target != None:
|
||||
self.bottom.delete_element(old_target) # this also deletes the slot-link
|
||||
|
||||
new_target = self.create_primitive_value(target_name, new_value)
|
||||
slot_type = self.cd.find_attribute_type(self.get_type_name(obj), attr_name)
|
||||
new_link = self.od._create_link(link_name, slot_type, obj, new_target)
|
||||
self.__recompute_mappings()
|
||||
|
||||
def create_primitive_value(self, name: str, value: any, is_code=False):
|
||||
if isinstance(value, bool):
|
||||
tgt = self.create_boolean_value(name, value)
|
||||
elif isinstance(value, int):
|
||||
tgt = self.create_integer_value(name, value)
|
||||
elif isinstance(value, str):
|
||||
if is_code:
|
||||
tgt = self.create_actioncode_value(name, value)
|
||||
else:
|
||||
tgt = self.create_string_value(name, value)
|
||||
else:
|
||||
raise Exception("Unimplemented type "+value)
|
||||
self.__recompute_mappings()
|
||||
return tgt
|
||||
|
||||
def create_link(self, link_name: Optional[str], assoc_name: str, src: UUID, tgt: UUID):
|
||||
global NEXT_ID
|
||||
typ, = self.bottom.read_outgoing_elements(self.mm, assoc_name)
|
||||
if link_name == None:
|
||||
link_name = f"__{assoc_name}{NEXT_ID}"
|
||||
NEXT_ID += 1
|
||||
link_id = self.od._create_link(link_name, typ, src, tgt)
|
||||
self.__recompute_mappings()
|
||||
return link_id
|
||||
|
|
@ -90,16 +90,16 @@ def parse_od(state, m_text, mm):
|
|||
# watch out: in Python, 'bool' is subtype of 'int'
|
||||
# so we must check for 'bool' first
|
||||
if isinstance(value, bool):
|
||||
tgt = od.create_boolean_value(value_name, value)
|
||||
od.create_boolean_value(value_name, value)
|
||||
elif isinstance(value, int):
|
||||
tgt = od.create_integer_value(value_name, value)
|
||||
od.create_integer_value(value_name, value)
|
||||
elif isinstance(value, str):
|
||||
tgt = od.create_string_value(value_name, value)
|
||||
od.create_string_value(value_name, value)
|
||||
elif isinstance(value, _Code):
|
||||
tgt = od.create_actioncode_value(value_name, value.code)
|
||||
od.create_actioncode_value(value_name, value.code)
|
||||
else:
|
||||
raise Exception("Unimplemented type "+value)
|
||||
od.create_slot(attr_name, obj_name, tgt)
|
||||
od.create_slot(attr_name, obj_name, value_name)
|
||||
|
||||
return obj_name
|
||||
|
||||
|
|
|
|||
|
|
@ -1,11 +1,14 @@
|
|||
import functools
|
||||
|
||||
from state.devstate import DevState
|
||||
from bootstrap.scd import bootstrap_scd
|
||||
from framework.conformance import Conformance, render_conformance_check_result
|
||||
from concrete_syntax.textual_od import parser, renderer
|
||||
from concrete_syntax.common import indent
|
||||
from concrete_syntax.plantuml import renderer as plantuml
|
||||
from util.prompt import yes_no, pause
|
||||
from util import prompt
|
||||
from transformation.cloner import clone_od
|
||||
from api.od import ODAPI
|
||||
|
||||
state = DevState()
|
||||
|
||||
|
|
@ -86,6 +89,40 @@ woods_rt_mm_cs = woods_mm_cs + """
|
|||
}
|
||||
:Inheritance (ManState -> AnimalState)
|
||||
|
||||
attacking:Association (AnimalState -> ManState) {
|
||||
# Animal can only attack one Man at a time
|
||||
target_upper_cardinality = 1;
|
||||
|
||||
# Man can only be attacked by one Animal at a time
|
||||
source_upper_cardinality = 1;
|
||||
|
||||
constraint = ```
|
||||
source = get_source(this)
|
||||
if get_type_name(source) == "BearState":
|
||||
# only BearState has 'hunger' attribute
|
||||
hunger = get_value(get_slot(source, "hunger"))
|
||||
else:
|
||||
hunger = 100 # Man can always attack
|
||||
animal_state = get_source(this)
|
||||
animal_dead = get_value(get_slot(animal_state, "dead"))
|
||||
man_state = get_target(this)
|
||||
man_dead = get_value(get_slot(man_state, "dead"))
|
||||
hunger > 50 and not animal_dead and not man_dead # whoever is dead cannot attack or get attacked
|
||||
```;
|
||||
}
|
||||
|
||||
attacking_starttime:AttributeLink (attacking -> Integer) {
|
||||
name = "starttime";
|
||||
optional = False;
|
||||
constraint = ```
|
||||
val = get_value(get_target(this))
|
||||
_, clock = get_all_instances("Clock")[0]
|
||||
current_time = get_slot_value(clock, "time")
|
||||
val >= 0 and val <= current_time
|
||||
```;
|
||||
}
|
||||
|
||||
# Just a clock singleton for keeping the time
|
||||
Clock:Class {
|
||||
lower_cardinality = 1;
|
||||
upper_cardinality = 1;
|
||||
|
|
@ -182,11 +219,106 @@ print("RT-M valid?")
|
|||
conf = Conformance(state, woods_rt_m, woods_rt_mm)
|
||||
print(render_conformance_check_result(conf.check_nominal()))
|
||||
|
||||
# print("--------------")
|
||||
# print(indent(
|
||||
# renderer.render_od(state,
|
||||
# m_id=woods_rt_m,
|
||||
# mm_id=woods_rt_mm),
|
||||
# 4))
|
||||
# print("--------------")
|
||||
def filter_actions(old_od):
|
||||
result = {}
|
||||
for name, callback in get_actions(old_od).items():
|
||||
# Clone OD before transforming
|
||||
cloned_rt_m = clone_od(state, old_od.m, old_od.mm)
|
||||
new_od = ODAPI(state, cloned_rt_m, old_od.mm)
|
||||
|
||||
print(f"checking '{name}' ...", end='\r')
|
||||
|
||||
msgs = callback(new_od)
|
||||
conf = Conformance(state, new_od.m, new_od.mm)
|
||||
errors = conf.check_nominal()
|
||||
# erase current line:
|
||||
print(" ", end='\r')
|
||||
if len(errors) == 0:
|
||||
# updated RT-M is conform, we have a valid action:
|
||||
yield (name, (new_od, msgs))
|
||||
|
||||
def state_of(od, animal):
|
||||
return od.get_source(od.get_incoming(animal, "of")[0])
|
||||
|
||||
def animal_of(od, state):
|
||||
return od.get_target(od.get_outgoing(state, "of")[0])
|
||||
|
||||
def advance_time(od):
|
||||
msgs = []
|
||||
_, clock = od.get_all_instances("Clock")[0]
|
||||
old_time = od.get_slot_value(clock, "time")
|
||||
new_time = old_time + 1
|
||||
od.set_slot_value(clock, "time", new_time)
|
||||
msgs.append(f"Time is now {new_time}")
|
||||
|
||||
for _, attacking_link in od.get_all_instances("attacking"):
|
||||
man_state = od.get_target(attacking_link)
|
||||
animal_state = od.get_source(attacking_link)
|
||||
if od.get_type_name(animal_state) == "BearState":
|
||||
od.set_slot_value(animal_state, "hunger", max(od.get_slot_value(animal_state, "hunger") - 50, 0))
|
||||
od.set_slot_value(man_state, "dead", True)
|
||||
od.delete(attacking_link)
|
||||
msgs.append(f"{od.get_name(animal_of(od, animal_state))} kills {od.get_name(animal_of(od, man_state))}.")
|
||||
|
||||
for _, bear_state in od.get_all_instances("BearState"):
|
||||
if od.get_slot_value(bear_state, "dead"):
|
||||
continue # bear already dead
|
||||
old_hunger = od.get_slot_value(bear_state, "hunger")
|
||||
new_hunger = min(old_hunger + 5, 100)
|
||||
od.set_slot_value(bear_state, "hunger", new_hunger)
|
||||
bear = od.get_target(od.get_outgoing(bear_state, "of")[0])
|
||||
bear_name = od.get_name(bear)
|
||||
if new_hunger == 100:
|
||||
od.set_slot_value(bear_state, "dead", True)
|
||||
msgs.append(f"Bear {bear_name} dies of hunger.")
|
||||
else:
|
||||
msgs.append(f"Bear {bear_name}'s hunger level is now {new_hunger}.")
|
||||
return msgs
|
||||
|
||||
# we must use the names of the objects as parameters, because when cloning, the IDs of objects change!
|
||||
def attack(od, animal_name: str, man_name: str):
|
||||
msgs = []
|
||||
animal = od.get(animal_name)
|
||||
man = od.get(man_name)
|
||||
animal_state = state_of(od, animal)
|
||||
man_state = state_of(od, man)
|
||||
attack_link = od.create_link(None, # auto-generate link name
|
||||
"attacking", animal_state, man_state)
|
||||
_, clock = od.get_all_instances("Clock")[0]
|
||||
current_time = od.get_slot_value(clock, "time")
|
||||
od.set_slot_value(attack_link, "starttime", current_time)
|
||||
msgs.append(f"{animal_name} is now attacking {man_name}")
|
||||
return msgs
|
||||
|
||||
def get_actions(od):
|
||||
# can always advance time:
|
||||
actions = { "advance time": advance_time }
|
||||
|
||||
# who can attack whom?
|
||||
for _, afraid_link in od.get_all_instances("afraidOf"):
|
||||
man = od.get_source(afraid_link)
|
||||
animal = od.get_target(afraid_link)
|
||||
animal_name = od.get_name(animal)
|
||||
man_name = od.get_name(man)
|
||||
man_state = state_of(od, man)
|
||||
animal_state = state_of(od, animal)
|
||||
actions[f"{animal_name} ({od.get_type_name(animal)}) attacks {man_name} ({od.get_type_name(man)})"] =functools.partial(attack, animal_name=animal_name, man_name=man_name)
|
||||
|
||||
return actions
|
||||
|
||||
od = ODAPI(state, woods_rt_m, woods_rt_mm)
|
||||
|
||||
while True:
|
||||
print("--------------")
|
||||
print(indent(
|
||||
renderer.render_od(state,
|
||||
m_id=od.m,
|
||||
mm_id=od.mm),
|
||||
4))
|
||||
print("--------------")
|
||||
|
||||
(od, msgs) = prompt.choose("Select action:", filter_actions(od))
|
||||
print(indent('\n'.join(msgs), 4))
|
||||
if od == None:
|
||||
print("No enabled actions. Quit.")
|
||||
break # no more enabled actions
|
||||
|
|
|
|||
|
|
@ -6,6 +6,9 @@ from state.base import State
|
|||
from typing import Dict, Tuple, Set, Any, List
|
||||
from pprint import pprint
|
||||
|
||||
from api.cd import CDAPI
|
||||
from api.od import ODAPI
|
||||
|
||||
import functools
|
||||
|
||||
|
||||
|
|
@ -21,10 +24,10 @@ def exec_then_eval(code, _globals, _locals):
|
|||
|
||||
def render_conformance_check_result(error_list):
|
||||
if len(error_list) == 0:
|
||||
return "OK"
|
||||
return "CONFORM"
|
||||
else:
|
||||
joined = '\n '.join(error_list)
|
||||
return f"There were {len(error_list)} errors: \n {joined}"
|
||||
return f"NOT CONFORM, {len(error_list)} errors: \n {joined}"
|
||||
|
||||
|
||||
class Conformance:
|
||||
|
|
@ -59,6 +62,10 @@ class Conformance:
|
|||
self.matches = {}
|
||||
self.candidates = {}
|
||||
|
||||
self.odapi = ODAPI(state, model, type_model)
|
||||
|
||||
# CDAPI(state, type_model)
|
||||
|
||||
def check_nominal(self, *, log=False):
|
||||
"""
|
||||
Perform a nominal conformance check
|
||||
|
|
@ -220,7 +227,8 @@ class Conformance:
|
|||
suc = self.read_attribute(tm_element, "source_upper_cardinality")
|
||||
if slc or suc:
|
||||
mult = (
|
||||
slc if slc != None else float("-inf"),
|
||||
# slc if slc != None else float("-inf"),
|
||||
slc if slc != None else 0,
|
||||
suc if suc != None else float("inf")
|
||||
)
|
||||
self.source_multiplicities[tm_name] = mult
|
||||
|
|
@ -228,7 +236,8 @@ class Conformance:
|
|||
tuc = self.read_attribute(tm_element, "target_upper_cardinality")
|
||||
if tlc or tuc:
|
||||
mult = (
|
||||
tlc if tlc != None else float("-inf"),
|
||||
# tlc if tlc != None else float("-inf"),
|
||||
tlc if tlc != None else 0,
|
||||
tuc if tuc != None else float("inf")
|
||||
)
|
||||
self.target_multiplicities[tm_name] = mult
|
||||
|
|
@ -381,15 +390,16 @@ class Conformance:
|
|||
|
||||
funcs = {
|
||||
'read_value': self.state.read_value,
|
||||
'get_value': lambda el: od.read_primitive_value(self.bottom, el, self.type_model)[0],
|
||||
'get_target': lambda el: self.bottom.read_edge_target(el),
|
||||
'get_source': lambda el: self.bottom.read_edge_source(el),
|
||||
'get_slot': od.OD(self.type_model, self.model, self.state).get_slot,
|
||||
'get_all_instances': self.get_all_instances,
|
||||
'get_name': self.get_name,
|
||||
'get_type_name': self.get_type_name,
|
||||
'get_outgoing': self.get_outgoing,
|
||||
'get_incoming': self.get_incoming,
|
||||
'get_value': self.odapi.get_value,
|
||||
'get_target': self.odapi.get_target,
|
||||
'get_source': self.odapi.get_source,
|
||||
'get_slot': self.odapi.get_slot,
|
||||
'get_slot_value': self.odapi.get_slot_value,
|
||||
'get_all_instances': self.odapi.get_all_instances,
|
||||
'get_name': self.odapi.get_name,
|
||||
'get_type_name': self.odapi.get_type_name,
|
||||
'get_outgoing': self.odapi.get_outgoing,
|
||||
'get_incoming': self.odapi.get_incoming,
|
||||
}
|
||||
# print("evaluating constraint ...", code)
|
||||
loc = {**kwargs, }
|
||||
|
|
@ -404,30 +414,6 @@ class Conformance:
|
|||
# print('result =', result)
|
||||
return result
|
||||
|
||||
def get_name(self, element: UUID):
|
||||
return [name for name in self.bottom.read_keys(self.model) if self.bottom.read_outgoing_elements(self.model, name)[0] == element][0]
|
||||
|
||||
def get_type_name(self, element: UUID):
|
||||
type_node = self.bottom.read_outgoing_elements(element, "Morphism")[0]
|
||||
for type_name in self.bottom.read_keys(self.type_model):
|
||||
if self.bottom.read_outgoing_elements(self.type_model, type_name)[0] == type_node:
|
||||
return type_name
|
||||
|
||||
def get_all_instances(self, type_name: str, include_subtypes=True):
|
||||
result = [e_name for e_name, t_name in self.type_mapping.items() if t_name == type_name]
|
||||
if include_subtypes:
|
||||
for subtype_name in self.sub_types[type_name]:
|
||||
# print(subtype_name, 'is subtype of ')
|
||||
result += [e_name for e_name, t_name in self.type_mapping.items() if t_name == subtype_name]
|
||||
result_with_ids = [ (e_name, self.bottom.read_outgoing_elements(self.model, e_name)[0]) for e_name in result]
|
||||
return result_with_ids
|
||||
|
||||
def get_outgoing(self, element: UUID, assoc_or_attr_name: str):
|
||||
return od.find_outgoing_typed_by(self.bottom, src=element, type_node=self.bottom.read_outgoing_elements(self.type_model, assoc_or_attr_name)[0])
|
||||
|
||||
def get_incoming(self, element: UUID, assoc_or_attr_name: str):
|
||||
return od.find_incoming_typed_by(self.bottom, tgt=element, type_node=self.bottom.read_outgoing_elements(self.type_model, assoc_or_attr_name)[0])
|
||||
|
||||
def check_constraints(self):
|
||||
"""
|
||||
Check whether all constraints defined for a model are respected
|
||||
|
|
@ -451,10 +437,11 @@ class Conformance:
|
|||
for type_name in self.bottom.read_keys(self.type_model):
|
||||
code = get_code(type_name)
|
||||
if code != None:
|
||||
instances = self.get_all_instances(type_name, include_subtypes=self.constraint_check_subtypes)
|
||||
instances = self.odapi.get_all_instances(type_name, include_subtypes=self.constraint_check_subtypes)
|
||||
for obj_name, obj_id in instances:
|
||||
result = self.evaluate_constraint(code, this=obj_id)
|
||||
description = f"Local constraint of \"{type_name}\" in \"{obj_name}\""
|
||||
# print(description)
|
||||
result = self.evaluate_constraint(code, this=obj_id)
|
||||
check_result(result, description)
|
||||
|
||||
# global constraints
|
||||
|
|
|
|||
|
|
@ -78,14 +78,20 @@ class OD:
|
|||
return slot_id
|
||||
|
||||
def get_slot(self, object_node: UUID, attr_name: str):
|
||||
edge = self.get_slot_link(object_node, attr_name)
|
||||
slot_ref = self.bottom.read_edge_target(edge)
|
||||
return slot_ref
|
||||
|
||||
def get_slot_link(self, object_node: UUID, attr_name: str):
|
||||
# I really don't like how complex and inefficient it is to read an attribute of an object...
|
||||
class_name = self._get_class_of_object(object_node)
|
||||
attr_link_name = self.get_attr_link_name(class_name, attr_name)
|
||||
if attr_link_name == None:
|
||||
raise Exception(f"Type '{class_name}' has no attribute '{attr_name}'")
|
||||
type_edge, = self.bottom.read_outgoing_elements(self.type_model, attr_link_name)
|
||||
for outgoing_edge in self.bottom.read_outgoing_edges(object_node):
|
||||
if type_edge in self.bottom.read_outgoing_elements(outgoing_edge, "Morphism"):
|
||||
slot_ref = self.bottom.read_edge_target(outgoing_edge)
|
||||
return slot_ref
|
||||
return outgoing_edge
|
||||
|
||||
def get_slots(self, object_node):
|
||||
attrlink_node = get_scd_mm_attributelink_node(self.bottom)
|
||||
|
|
@ -111,32 +117,28 @@ class OD:
|
|||
integer_t.create(value)
|
||||
# name = 'int'+str(value) # name of the ref to the created integer
|
||||
# By convention, the type model must have a ModelRef named "Integer"
|
||||
self.create_model_ref(name, "Integer", int_node)
|
||||
return name
|
||||
return self.create_model_ref(name, "Integer", int_node)
|
||||
|
||||
def create_boolean_value(self, name: str, value: bool):
|
||||
from services.primitives.boolean_type import Boolean
|
||||
bool_node = self.bottom.create_node()
|
||||
bool_service = Boolean(bool_node, self.bottom.state)
|
||||
bool_service.create(value)
|
||||
self.create_model_ref(name, "Boolean", bool_node)
|
||||
return name
|
||||
return self.create_model_ref(name, "Boolean", bool_node)
|
||||
|
||||
def create_string_value(self, name: str, value: str):
|
||||
from services.primitives.string_type import String
|
||||
string_node = self.bottom.create_node()
|
||||
string_t = String(string_node, self.bottom.state)
|
||||
string_t.create(value)
|
||||
self.create_model_ref(name, "String", string_node)
|
||||
return name
|
||||
return self.create_model_ref(name, "String", string_node)
|
||||
|
||||
def create_actioncode_value(self, name: str, value: str):
|
||||
from services.primitives.actioncode_type import ActionCode
|
||||
actioncode_node = self.bottom.create_node()
|
||||
actioncode_t = ActionCode(actioncode_node, self.bottom.state)
|
||||
actioncode_t.create(value)
|
||||
self.create_model_ref(name, "ActionCode", actioncode_node)
|
||||
return name
|
||||
return self.create_model_ref(name, "ActionCode", actioncode_node)
|
||||
|
||||
# Identical to the same SCD method:
|
||||
def create_model_ref(self, name: str, type_name: str, model: UUID):
|
||||
|
|
@ -184,7 +186,10 @@ class OD:
|
|||
link_id = self._create_link(link_name, type_edge, src_obj_node, tgt_obj_node)
|
||||
return link_id
|
||||
|
||||
# used for attribute-links and association-links
|
||||
def _create_link(self, link_name: str, type_edge: UUID, src_obj_node: UUID, tgt_obj_node: UUID):
|
||||
# print('create_link', link_name, type_edge, src_obj_node, tgt_obj_node)
|
||||
|
||||
# the link itself is unlabeled:
|
||||
link_edge = self.bottom.create_edge(src_obj_node, tgt_obj_node)
|
||||
# it is only in the context of the model, that the link has a name:
|
||||
|
|
|
|||
|
|
@ -14,4 +14,26 @@ def yes_no(msg: str):
|
|||
|
||||
def pause():
|
||||
print("press any key...")
|
||||
input()
|
||||
input()
|
||||
|
||||
def choose(msg:str, options):
|
||||
arr = []
|
||||
for i, (key, result) in enumerate(options):
|
||||
print(f" {i}. {key}")
|
||||
arr.append(result)
|
||||
if len(arr) == 0:
|
||||
return
|
||||
return __choose(msg, arr)
|
||||
|
||||
def __choose(msg: str, arr):
|
||||
sys.stdout.write(f"{msg} ")
|
||||
try:
|
||||
raw = input()
|
||||
choice = int(raw) # may raise ValueError
|
||||
if choice >= 0 and choice < len(arr):
|
||||
return arr[choice]
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
print("Invalid option")
|
||||
return __choose(msg, arr)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue