cleanup the od api interface

This commit is contained in:
robbe 2025-06-27 12:15:19 +02:00
parent 04a17f6ac8
commit 9eea3618d0
6 changed files with 69 additions and 47 deletions

View file

@ -6,8 +6,7 @@ from services.primitives.integer_type import Integer
from services.primitives.string_type import String from services.primitives.string_type import String
from services.primitives.actioncode_type import ActionCode from services.primitives.actioncode_type import ActionCode
from uuid import UUID from uuid import UUID
from typing import Optional from typing import Optional, Any
from util.timer import Timer
NEXT_ID = 0 NEXT_ID = 0
@ -42,10 +41,10 @@ class ODAPI:
self.create_string_value = self.od.create_string_value self.create_string_value = self.od.create_string_value
self.create_actioncode_value = self.od.create_actioncode_value self.create_actioncode_value = self.od.create_actioncode_value
self.__recompute_mappings() self.recompute_mappings()
# Called after every change - makes querying faster but modifying slower # Called after every change - makes querying faster but modifying slower
def __recompute_mappings(self): def recompute_mappings(self):
self.m_obj_to_name = build_name_mapping(self.state, self.m) self.m_obj_to_name = build_name_mapping(self.state, self.m)
self.mm_obj_to_name = build_name_mapping(self.state, self.mm) self.mm_obj_to_name = build_name_mapping(self.state, self.mm)
self.type_to_objs = { type_name : set() for type_name in self.bottom.read_keys(self.mm)} self.type_to_objs = { type_name : set() for type_name in self.bottom.read_keys(self.mm)}
@ -60,25 +59,33 @@ class ODAPI:
def get_value(self, obj: UUID): def get_value(self, obj: UUID):
return od.read_primitive_value(self.bottom, obj, self.mm)[0] return od.read_primitive_value(self.bottom, obj, self.mm)[0]
def get_target(self, link: UUID): def get_target(self, link: UUID) -> UUID:
return self.bottom.read_edge_target(link) return self.bottom.read_edge_target(link)
def get_source(self, link: UUID): def get_source(self, link: UUID) -> UUID:
return self.bottom.read_edge_source(link) return self.bottom.read_edge_source(link)
def get_slot(self, obj: UUID, attr_name: str): def get_slot(self, obj: UUID, attr_name: str) -> UUID:
slot = self.od.get_slot(obj, attr_name) slot = self.od.get_slot(obj, attr_name)
if slot == None: if slot == None:
raise NoSuchSlotException(f"Object '{self.m_obj_to_name[obj]}' has no slot '{attr_name}'") raise NoSuchSlotException(f"Object '{self.m_obj_to_name[obj]}' has no slot '{attr_name}'")
return slot return slot
def get_slot_link(self, obj: UUID, attr_name: str): def get_slot_link(self, obj: UUID, attr_name: str) -> UUID:
return self.od.get_slot_link(obj, attr_name) return self.od.get_slot_link(obj, attr_name)
# Parameter 'include_subtypes': whether to include subtypes of the given association # Parameter 'include_subtypes': whether to include subtypes of the given association
def get_outgoing(self, obj: UUID, assoc_name: str, include_subtypes=True): def get_outgoing(self, obj: UUID, assoc_name: str, include_subtypes=True) -> list[UUID]:
outgoing = self.bottom.read_outgoing_edges(obj) outgoing = self.bottom.read_outgoing_edges(obj)
result = [] return self.filter_edges_by_type(outgoing, assoc_name, include_subtypes)
# Parameter 'include_subtypes': whether to include subtypes of the given association
def get_incoming(self, obj: UUID, assoc_name: str, include_subtypes=True):
incoming = self.bottom.read_incoming_edges(obj)
return self.filter_edges_by_type(incoming, assoc_name, include_subtypes)
def filter_edges_by_type(self, outgoing: list[UUID], assoc_name: str, include_subtypes=True) -> list[UUID]:
result: list[UUID] = []
for o in outgoing: for o in outgoing:
try: try:
type_of_outgoing_link = self.get_type_name(o) type_of_outgoing_link = self.get_type_name(o)
@ -89,23 +96,8 @@ class ODAPI:
result.append(o) result.append(o)
return result return result
# Parameter 'include_subtypes': whether to include subtypes of the given association
def get_incoming(self, obj: UUID, assoc_name: str, include_subtypes=True):
incoming = self.bottom.read_incoming_edges(obj)
result = []
for i in incoming:
try:
type_of_incoming_link = self.get_type_name(i)
except:
continue # OK, not all edges are typed
if (include_subtypes and self.cdapi.is_subtype(super_type_name=assoc_name, sub_type_name=type_of_incoming_link)
or not include_subtypes and type_of_incoming_link == assoc_name):
result.append(i)
return result
# Returns list of tuples (name, obj) # Returns list of tuples (name, obj)
def get_all_instances(self, type_name: str, include_subtypes=True): def get_all_instances(self, type_name: str, include_subtypes=True) -> list[UUID]:
if include_subtypes: if include_subtypes:
all_types = self.cdapi.transitive_sub_types[type_name] all_types = self.cdapi.transitive_sub_types[type_name]
else: else:
@ -127,7 +119,7 @@ class ODAPI:
else: else:
raise Exception(f"Couldn't find name of {obj} - are you sure it exists in the (meta-)model?") raise Exception(f"Couldn't find name of {obj} - are you sure it exists in the (meta-)model?")
def get(self, name: str): def get(self, name: str) -> UUID:
results = self.bottom.read_outgoing_elements(self.m, name) results = self.bottom.read_outgoing_elements(self.m, name)
if len(results) == 1: if len(results) == 1:
return results[0] return results[0]
@ -136,10 +128,10 @@ class ODAPI:
else: else:
raise Exception(f"No such element in model: '{name}'") raise Exception(f"No such element in model: '{name}'")
def get_type_name(self, obj: UUID): def get_type_name(self, obj: UUID) -> str:
return self.get_name(self.get_type(obj)) return self.get_name(self.get_type(obj))
def is_instance(self, obj: UUID, type_name: str, include_subtypes=True): def is_instance(self, obj: UUID, type_name: str, include_subtypes=True) -> bool:
typ = self.cdapi.get_type(type_name) typ = self.cdapi.get_type(type_name)
types = set(typ) if not include_subtypes else self.cdapi.transitive_sub_types[type_name] types = set(typ) if not include_subtypes else self.cdapi.transitive_sub_types[type_name]
for type_of_obj in self.bottom.read_outgoing_elements(obj, "Morphism"): for type_of_obj in self.bottom.read_outgoing_elements(obj, "Morphism"):
@ -147,18 +139,21 @@ class ODAPI:
return True return True
return False return False
def delete(self, obj: UUID): def delete(self, obj: UUID) -> None:
self.bottom.delete_element(obj) self.bottom.delete_element(obj)
self.__recompute_mappings() self.recompute_mappings()
# Does the the object have the given attribute? # Does the the object have the given attribute?
def has_slot(self, obj: UUID, attr_name: str): def has_slot(self, obj: UUID, attr_name: str) -> bool:
return self.od.get_slot_link(obj, attr_name) != None class_name = self.get_name(self.get_type(obj))
if self.od.get_attr_link_name(class_name, attr_name) is None:
return False
return self.od.get_slot_link(obj, attr_name) is not None
def get_slots(self, obj: UUID) -> list[str]: def get_slots(self, obj: UUID) -> list[str]:
return [attr_name for attr_name, _ in self.od.get_slots(obj)] return [attr_name for attr_name, _ in self.od.get_slots(obj)]
def get_slot_value(self, obj: UUID, attr_name: str): def get_slot_value(self, obj: UUID, attr_name: str) -> Any:
slot = self.get_slot(obj, attr_name) slot = self.get_slot(obj, attr_name)
return self.get_value(slot) return self.get_value(slot)
@ -171,14 +166,14 @@ class ODAPI:
# Returns the given default value if the slot does not exist on the object. # Returns the given default value if the slot does not exist on the object.
# The attribute must exist in the object's class, or an exception will be thrown. # The attribute must exist in the object's class, or an exception will be thrown.
# The slot may not exist however, if the attribute is defined as 'optional' in the class. # The slot may not exist however, if the attribute is defined as 'optional' in the class.
def get_slot_value_default(self, obj: UUID, attr_name: str, default: any): def get_slot_value_default(self, obj: UUID, attr_name: str, default: any) -> any:
try: try:
return self.get_slot_value(obj, attr_name) return self.get_slot_value(obj, attr_name)
except NoSuchSlotException: except NoSuchSlotException:
return default return default
# create or update slot value # create or update slot value
def set_slot_value(self, obj: UUID, attr_name: str, new_value: any, is_code=False): def set_slot_value(self, obj: UUID, attr_name: str, new_value: any, is_code=False) -> None:
obj_name = self.get_name(obj) obj_name = self.get_name(obj)
link_name = f"{obj_name}_{attr_name}" link_name = f"{obj_name}_{attr_name}"
@ -193,7 +188,7 @@ class ODAPI:
new_target = self.create_primitive_value(target_name, new_value, is_code) new_target = self.create_primitive_value(target_name, new_value, is_code)
slot_type = self.cdapi.find_attribute_type(self.get_type_name(obj), attr_name) slot_type = self.cdapi.find_attribute_type(self.get_type_name(obj), attr_name)
new_link = self.od._create_link(link_name, slot_type, obj, new_target) new_link = self.od._create_link(link_name, slot_type, obj, new_target)
self.__recompute_mappings() self.recompute_mappings()
def create_primitive_value(self, name: str, value: any, is_code=False): def create_primitive_value(self, name: str, value: any, is_code=False):
# watch out: in Python, 'bool' is subtype of 'int' # watch out: in Python, 'bool' is subtype of 'int'
@ -209,7 +204,7 @@ class ODAPI:
tgt = self.create_string_value(name, value) tgt = self.create_string_value(name, value)
else: else:
raise Exception("Unimplemented type "+value) raise Exception("Unimplemented type "+value)
self.__recompute_mappings() self.recompute_mappings()
return tgt return tgt
def overwrite_primitive_value(self, name: str, value: any, is_code=False): def overwrite_primitive_value(self, name: str, value: any, is_code=False):
@ -228,7 +223,7 @@ class ODAPI:
else: else:
raise Exception("Unimplemented type "+value) raise Exception("Unimplemented type "+value)
def create_link(self, link_name: Optional[str], assoc_name: str, src: UUID, tgt: UUID): def create_link(self, link_name: Optional[str], assoc_name: str, src: UUID, tgt: UUID) -> UUID:
global NEXT_ID global NEXT_ID
types = self.bottom.read_outgoing_elements(self.mm, assoc_name) types = self.bottom.read_outgoing_elements(self.mm, assoc_name)
if len(types) == 0: if len(types) == 0:
@ -240,12 +235,12 @@ class ODAPI:
link_name = f"__{assoc_name}{NEXT_ID}" link_name = f"__{assoc_name}{NEXT_ID}"
NEXT_ID += 1 NEXT_ID += 1
link_id = self.od._create_link(link_name, typ, src, tgt) link_id = self.od._create_link(link_name, typ, src, tgt)
self.__recompute_mappings() self.recompute_mappings()
return link_id return link_id
def create_object(self, object_name: Optional[str], class_name: str): def create_object(self, object_name: Optional[str], class_name: str) -> UUID:
obj = self.od.create_object(object_name, class_name) obj = self.od.create_object(object_name, class_name)
self.__recompute_mappings() self.recompute_mappings()
return obj return obj
# internal use # internal use
@ -284,6 +279,6 @@ def bind_api(odapi):
'create_object': odapi.create_object, 'create_object': odapi.create_object,
'create_link': odapi.create_link, 'create_link': odapi.create_link,
'delete': odapi.delete, 'delete': odapi.delete,
'set_slot_value': odapi.set_slot_value, 'set_slot_value': odapi.set_slot_value
} }
return funcs return funcs

9
api/od_stub.pyi Normal file
View file

@ -0,0 +1,9 @@
from typing import Optional
from uuid import UUID
from od_stub_readonly import *
def create_object(object_name: Optional[str], class_name: str) -> UUID: ...
def create_link(link_name: Optional[str], assoc_name: str, src: UUID, tgt: UUID) -> UUID: ...
def delete(obj: UUID) -> None: ...
def set_slot_value(obj: UUID, attr_name: str, new_value: any, is_code=False) -> None: ...

18
api/od_stub_readonly.pyi Normal file
View file

@ -0,0 +1,18 @@
from typing import Any
from uuid import UUID
def get(name: str) -> UUID: ...
def get_value(obj: UUID) -> Any: ...
def get_target(link: UUID) -> UUID: ...
def get_source(link: UUID) -> UUID: ...
def get_slot(obj: UUID, attr_name: str) -> UUID: ...
def get_slots(obj: UUID) -> list[str]: ...
def get_slot_value(obj: UUID, attr_name: str) -> Any: ...
def get_slot_value_default(obj: UUID, attr_name: str, default: any) -> Any: ...
def get_all_instances(type_name: str, include_subtypes=True) -> list[UUID]: ...
def get_name(obj: UUID) -> str: ...
def get_type_name(obj: UUID) -> str: ...
def get_outgoing(obj: UUID, assoc_name: str, include_subtypes=True) -> list[UUID]: ...
def get_incoming(obj: UUID, assoc_name: str, include_subtypes: object = True) -> list[UUID]: ...
def has_slot(obj: UUID, attr_name: str) -> bool: ...
def is_instance(obj: UUID, type_name: str, include_subtypes=True) -> bool: ...

View file

@ -52,7 +52,7 @@ def merge_models(state, mm, models: list[UUID]):
model = state.read_value(obj) model = state.read_value(obj)
scd = SCD(merged, state) scd = SCD(merged, state)
created_obj = scd.create_model_ref(prefixed_obj_name, model) created_obj = scd.create_model_ref(prefixed_obj_name, model)
merged_odapi._ODAPI__recompute_mappings() # dirty!! merged_odapi.recompute_mappings() # dirty!!
else: else:
# create node or edge # create node or edge
if state.is_edge(obj): if state.is_edge(obj):

View file

@ -149,13 +149,13 @@ def rewrite(state,
if od.is_typed_by(bottom, rhs_type, class_type): if od.is_typed_by(bottom, rhs_type, class_type):
obj_name = first_available_name(suggested_name) obj_name = first_available_name(suggested_name)
host_od._create_object(obj_name, host_type) host_od._create_object(obj_name, host_type)
host_odapi._ODAPI__recompute_mappings() host_odapi.recompute_mappings()
rhs_match[rhs_name] = obj_name rhs_match[rhs_name] = obj_name
elif od.is_typed_by(bottom, rhs_type, assoc_type): elif od.is_typed_by(bottom, rhs_type, assoc_type):
_, _, host_src, host_tgt = get_src_tgt() _, _, host_src, host_tgt = get_src_tgt()
link_name = first_available_name(suggested_name) link_name = first_available_name(suggested_name)
host_od._create_link(link_name, host_type, host_src, host_tgt) host_od._create_link(link_name, host_type, host_src, host_tgt)
host_odapi._ODAPI__recompute_mappings() host_odapi.recompute_mappings()
rhs_match[rhs_name] = link_name rhs_match[rhs_name] = link_name
elif od.is_typed_by(bottom, rhs_type, attr_link_type): elif od.is_typed_by(bottom, rhs_type, attr_link_type):
host_src_name, _, host_src, host_tgt = get_src_tgt() host_src_name, _, host_src, host_tgt = get_src_tgt()
@ -163,7 +163,7 @@ def rewrite(state,
host_attr_name = host_mm_odapi.get_slot_value(host_attr_link, "name") host_attr_name = host_mm_odapi.get_slot_value(host_attr_link, "name")
link_name = f"{host_src_name}_{host_attr_name}" # must follow naming convention here link_name = f"{host_src_name}_{host_attr_name}" # must follow naming convention here
host_od._create_link(link_name, host_type, host_src, host_tgt) host_od._create_link(link_name, host_type, host_src, host_tgt)
host_odapi._ODAPI__recompute_mappings() host_odapi.recompute_mappings()
rhs_match[rhs_name] = link_name rhs_match[rhs_name] = link_name
elif rhs_type == rhs_mm_odapi.get("ActionCode"): elif rhs_type == rhs_mm_odapi.get("ActionCode"):
# If we encounter ActionCode in our RHS, we assume that the code computes the value of an attribute... # If we encounter ActionCode in our RHS, we assume that the code computes the value of an attribute...