297 lines
12 KiB
Python
297 lines
12 KiB
Python
from services import od
|
|
from api import cd
|
|
from services.bottom.V0 import Bottom
|
|
from services.primitives.boolean_type import Boolean
|
|
from services.primitives.integer_type import Integer
|
|
from services.primitives.string_type import String
|
|
from services.primitives.actioncode_type import ActionCode
|
|
from uuid import UUID
|
|
from typing import Optional
|
|
from util.timer import Timer
|
|
|
|
NEXT_ID = 0
|
|
|
|
# Models map names to elements
|
|
# This builds the inverse mapping, so we can quickly lookup the name of an element
|
|
def build_name_mapping(state, m):
|
|
mapping = {}
|
|
bottom = Bottom(state)
|
|
for name in bottom.read_keys(m):
|
|
elements = bottom.read_outgoing_elements(m, name)
|
|
if len(elements) > 1:
|
|
print(f"Warning: more than one element with name '{name}'")
|
|
mapping[elements[0]] = name
|
|
return mapping
|
|
|
|
class NoSuchSlotException(Exception):
|
|
pass
|
|
|
|
# Object Diagram API
|
|
# Intended to replace the 'services.od.OD' class eventually
|
|
class ODAPI:
|
|
def __init__(self, state, m: UUID, mm: UUID):
|
|
self.state = state
|
|
self.bottom = Bottom(state)
|
|
self.m = m
|
|
self.mm = mm
|
|
self.od = od.OD(mm, m, state)
|
|
self.cdapi = cd.CDAPI(state, mm)
|
|
|
|
self.create_boolean_value = self.od.create_boolean_value
|
|
self.create_integer_value = self.od.create_integer_value
|
|
self.create_string_value = self.od.create_string_value
|
|
self.create_actioncode_value = self.od.create_actioncode_value
|
|
|
|
self.__recompute_mappings()
|
|
|
|
# Called after every change - makes querying faster but modifying slower
|
|
def __recompute_mappings(self):
|
|
self.m_obj_to_name = build_name_mapping(self.state, self.m)
|
|
self.mm_obj_to_name = build_name_mapping(self.state, self.mm)
|
|
self.type_to_objs = { type_name : set() for type_name in self.bottom.read_keys(self.mm)}
|
|
for m_name in self.bottom.read_keys(self.m):
|
|
m_element, = self.bottom.read_outgoing_elements(self.m, m_name)
|
|
tm_element = self.get_type(m_element)
|
|
if tm_element in self.mm_obj_to_name:
|
|
tm_name = self.mm_obj_to_name[tm_element]
|
|
# self.obj_to_type[m_name] = tm_name
|
|
self.type_to_objs[tm_name].add(m_name)
|
|
|
|
def get_value(self, obj: UUID):
|
|
return od.read_primitive_value(self.bottom, obj, self.mm)[0]
|
|
|
|
def get_target(self, link: UUID):
|
|
return self.bottom.read_edge_target(link)
|
|
|
|
def get_source(self, link: UUID):
|
|
return self.bottom.read_edge_source(link)
|
|
|
|
def get_slot(self, obj: UUID, attr_name: str):
|
|
slot = self.od.get_slot(obj, attr_name)
|
|
if slot == None:
|
|
raise NoSuchSlotException(f"Object '{self.m_obj_to_name[obj]}' has no slot '{attr_name}'")
|
|
return slot
|
|
|
|
def get_slot_link(self, obj: UUID, attr_name: str):
|
|
return self.od.get_slot_link(obj, attr_name)
|
|
|
|
# Parameter 'include_subtypes': whether to include subtypes of the given association
|
|
def get_outgoing(self, obj: UUID, assoc_name: str, include_subtypes=True):
|
|
outgoing = self.bottom.read_outgoing_edges(obj)
|
|
result = []
|
|
for o in outgoing:
|
|
try:
|
|
type_of_outgoing_link = self.get_type_name(o)
|
|
except:
|
|
continue # OK, not all edges are typed
|
|
if (include_subtypes and self.cdapi.is_subtype(super_type_name=assoc_name, sub_type_name=type_of_outgoing_link)
|
|
or not include_subtypes and type_of_outgoing_link == assoc_name):
|
|
result.append(o)
|
|
return result
|
|
|
|
|
|
# Parameter 'include_subtypes': whether to include subtypes of the given association
|
|
def get_incoming(self, obj: UUID, assoc_name: str, include_subtypes=True):
|
|
incoming = self.bottom.read_incoming_edges(obj)
|
|
result = []
|
|
for i in incoming:
|
|
try:
|
|
type_of_incoming_link = self.get_type_name(i)
|
|
except:
|
|
continue # OK, not all edges are typed
|
|
if (include_subtypes and self.cdapi.is_subtype(super_type_name=assoc_name, sub_type_name=type_of_incoming_link)
|
|
or not include_subtypes and type_of_incoming_link == assoc_name):
|
|
result.append(i)
|
|
return result
|
|
|
|
# Returns list of tuples (name, obj)
|
|
def get_all_instances(self, type_name: str, include_subtypes=True):
|
|
if include_subtypes:
|
|
all_types = self.cdapi.transitive_sub_types[type_name]
|
|
else:
|
|
all_types = set([type_name])
|
|
obj_names = [obj_name for type_name in all_types for obj_name in self.type_to_objs[type_name]]
|
|
return [(obj_name, self.bottom.read_outgoing_elements(self.m, obj_name)[0]) for obj_name in obj_names]
|
|
|
|
def get_type(self, obj: UUID):
|
|
types = self.bottom.read_outgoing_elements(obj, "Morphism")
|
|
if len(types) != 1:
|
|
raise Exception(f"Expected obj to have 1 type, instead got {len(types)} types.")
|
|
return types[0]
|
|
|
|
def get_name(self, obj: UUID):
|
|
if obj in self.m_obj_to_name:
|
|
return self.m_obj_to_name[obj]
|
|
elif obj in self.mm_obj_to_name:
|
|
return self.mm_obj_to_name[obj]
|
|
else:
|
|
raise Exception(f"Couldn't find name of {obj} - are you sure it exists in the (meta-)model?")
|
|
|
|
def get(self, name: str):
|
|
results = self.bottom.read_outgoing_elements(self.m, name)
|
|
if len(results) == 1:
|
|
return results[0]
|
|
elif len(results) >= 2:
|
|
raise Exception("this should never happen")
|
|
else:
|
|
raise Exception(f"No such element in model: '{name}'")
|
|
|
|
def get_type_name(self, obj: UUID):
|
|
return self.get_name(self.get_type(obj))
|
|
|
|
def is_instance(self, obj: UUID, type_name: str, include_subtypes=True):
|
|
typ = self.cdapi.get_type(type_name)
|
|
types = set(typ) if not include_subtypes else self.cdapi.transitive_sub_types[type_name]
|
|
for type_of_obj in self.bottom.read_outgoing_elements(obj, "Morphism"):
|
|
if type_of_obj in types:
|
|
return True
|
|
return False
|
|
|
|
def delete(self, obj: UUID):
|
|
self.bottom.delete_element(obj)
|
|
self.__recompute_mappings()
|
|
|
|
# Does the class of the object have the given attribute?
|
|
def has_slot(self, obj: UUID, attr_name: str):
|
|
class_name = self.get_name(self.get_type(obj))
|
|
return self.od.get_attr_link_name(class_name, attr_name) != None
|
|
|
|
def get_slots(self, obj: UUID) -> list[str]:
|
|
return [attr_name for attr_name, _ in self.od.get_slots(obj)]
|
|
|
|
def get_slot_value(self, obj: UUID, attr_name: str):
|
|
slot = self.get_slot(obj, attr_name)
|
|
return self.get_value(slot)
|
|
|
|
# does the given slot contain code?
|
|
# this complements `get_slot_value` which will return code as a string
|
|
def slot_has_code(self, obj: UUID, attr_name: str):
|
|
slot = self.get_slot(obj, attr_name)
|
|
return self.get_type_name(slot) == "ActionCode"
|
|
|
|
# Returns the given default value if the slot does not exist on the object.
|
|
# The attribute must exist in the object's class, or an exception will be thrown.
|
|
# The slot may not exist however, if the attribute is defined as 'optional' in the class.
|
|
def get_slot_value_default(self, obj: UUID, attr_name: str, default: any):
|
|
try:
|
|
return self.get_slot_value(obj, attr_name)
|
|
except NoSuchSlotException:
|
|
return default
|
|
|
|
# create or update slot value
|
|
def set_slot_value(self, obj: UUID, attr_name: str, new_value: any, is_code=False):
|
|
obj_name = self.get_name(obj)
|
|
|
|
link_name = f"{obj_name}_{attr_name}"
|
|
target_name = f"{obj_name}.{attr_name}"
|
|
|
|
old_slot_link = self.get_slot_link(obj, attr_name)
|
|
if old_slot_link != None:
|
|
old_target = self.get_target(old_slot_link)
|
|
# if old_target != None:
|
|
self.bottom.delete_element(old_target) # this also deletes the slot-link
|
|
|
|
new_target = self.create_primitive_value(target_name, new_value, is_code)
|
|
slot_type = self.cdapi.find_attribute_type(self.get_type_name(obj), attr_name)
|
|
new_link = self.od._create_link(link_name, slot_type, obj, new_target)
|
|
self.__recompute_mappings()
|
|
|
|
def create_primitive_value(self, name: str, value: any, is_code=False):
|
|
# watch out: in Python, 'bool' is subtype of 'int'
|
|
# so we must check for 'bool' first
|
|
if isinstance(value, bool):
|
|
tgt = self.create_boolean_value(name, value)
|
|
elif isinstance(value, int):
|
|
tgt = self.create_integer_value(name, value)
|
|
elif isinstance(value, str):
|
|
if is_code:
|
|
tgt = self.create_actioncode_value(name, value)
|
|
else:
|
|
tgt = self.create_string_value(name, value)
|
|
else:
|
|
raise Exception("Unimplemented type "+value)
|
|
self.__recompute_mappings()
|
|
return tgt
|
|
|
|
def overwrite_primitive_value(self, name: str, value: any, is_code=False):
|
|
referred_model = UUID(self.bottom.read_value(self.get(name)))
|
|
to_overwrite_type = self.get_type_name(self.get(name))
|
|
# watch out: in Python, 'bool' is subtype of 'int'
|
|
# so we must check for 'bool' first
|
|
if isinstance(value, bool):
|
|
if to_overwrite_type != "Boolean":
|
|
raise Exception(f"Cannot assign boolean value '{value}' to value of type {to_overwrite_type}.")
|
|
Boolean(referred_model, self.state).create(value)
|
|
elif isinstance(value, int):
|
|
if to_overwrite_type != "Integer":
|
|
raise Exception(f"Cannot assign integer value '{value}' to value of type {to_overwrite_type}.")
|
|
Integer(referred_model, self.state).create(value)
|
|
elif isinstance(value, str):
|
|
if is_code:
|
|
if to_overwrite_type != "ActionCode":
|
|
raise Exception(f"Cannot assign code to value of type {to_overwrite_type}.")
|
|
ActionCode(referred_model, self.state).create(value)
|
|
else:
|
|
if to_overwrite_type != "String":
|
|
raise Exception(f"Cannot assign string value '{value}' to value of type {to_overwrite_type}.")
|
|
String(referred_model, self.state).create(value)
|
|
else:
|
|
raise Exception("Unimplemented type "+value)
|
|
|
|
def create_link(self, link_name: Optional[str], assoc_name: str, src: UUID, tgt: UUID):
|
|
global NEXT_ID
|
|
types = self.bottom.read_outgoing_elements(self.mm, assoc_name)
|
|
if len(types) == 0:
|
|
raise Exception(f"No such association: '{assoc_name}'")
|
|
elif len(types) >= 2:
|
|
raise Exception(f"More than one association exists with name '{assoc_name}' - this means the MM is invalid.")
|
|
typ = types[0]
|
|
if link_name == None:
|
|
link_name = f"__{assoc_name}{NEXT_ID}"
|
|
NEXT_ID += 1
|
|
link_id = self.od._create_link(link_name, typ, src, tgt)
|
|
self.__recompute_mappings()
|
|
return link_id
|
|
|
|
def create_object(self, object_name: Optional[str], class_name: str):
|
|
obj = self.od.create_object(object_name, class_name)
|
|
self.__recompute_mappings()
|
|
return obj
|
|
|
|
# internal use
|
|
# Get API methods as bound functions, to pass as globals to 'eval'
|
|
# Readonly version is used for:
|
|
# - Conformance checking
|
|
# - Pattern matching (LHS/NAC of rule)
|
|
def bind_api_readonly(odapi):
|
|
funcs = {
|
|
'read_value': odapi.state.read_value,
|
|
'get': odapi.get,
|
|
'get_value': odapi.get_value,
|
|
'get_target': odapi.get_target,
|
|
'get_source': odapi.get_source,
|
|
'get_slot': odapi.get_slot,
|
|
'get_slot_value': odapi.get_slot_value,
|
|
'get_slot_value_default': odapi.get_slot_value_default,
|
|
'get_all_instances': odapi.get_all_instances,
|
|
'get_name': odapi.get_name,
|
|
'get_type_name': odapi.get_type_name,
|
|
'get_outgoing': odapi.get_outgoing,
|
|
'get_incoming': odapi.get_incoming,
|
|
'has_slot': odapi.has_slot,
|
|
}
|
|
return funcs
|
|
|
|
# internal use
|
|
# Get API methods as bound functions, to pass as globals to 'eval'
|
|
# Read/write version is used for:
|
|
# - Graph rewriting (RHS of rule)
|
|
def bind_api(odapi):
|
|
funcs = {
|
|
**bind_api_readonly(odapi),
|
|
'create_object': odapi.create_object,
|
|
'create_link': odapi.create_link,
|
|
'delete': odapi.delete,
|
|
'set_slot_value': odapi.set_slot_value,
|
|
}
|
|
return funcs
|