Merge with Robbe's scheduling language

This commit is contained in:
Joeri Exelmans 2025-07-23 15:30:32 +02:00
commit 4ba0ed09b2
203 changed files with 8582 additions and 3886 deletions

View file

@ -15,6 +15,30 @@ from api.od import ODAPI, bind_api_readonly
import functools
def eval_context_decorator(func):
"""
Used to mark functions that can be called inside the evaluation context.
Base functions are mapped into the function, as well as the evaluation context.
This happens at runtime so typechecking will not be happy.
Important: Using the same name in the evaluation context as the function name
will lead to naming conflicts with the function as priority, resulting in missing argument errors.
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from api.od_stub import *
...
Use this to make partially fix the typechecking.
Optionally, define a stub for your own evaluation context and include it.
"""
def wrapper(*args, api_context, eval_context, **kwargs):
for key, value in api_context.items():
func.__globals__[key] = value
for key, value in eval_context.items():
func.__globals__[key] = value
return func(*args, **kwargs)
return wrapper
def render_conformance_check_result(error_list):
if len(error_list) == 0:
return "CONFORM"
@ -25,7 +49,7 @@ def render_conformance_check_result(error_list):
class Conformance:
# Parameter 'constraint_check_subtypes': whether to check local type-level constraints also on subtypes.
def __init__(self, state: State, model: UUID, type_model: UUID, constraint_check_subtypes=True):
def __init__(self, state: State, model: UUID, type_model: UUID, constraint_check_subtypes=True, *, eval_context = None):
self.state = state
self.bottom = Bottom(state)
self.model = model
@ -51,6 +75,9 @@ class Conformance:
self.structures = {}
self.candidates = {}
# add user defined functions to constraints
self.eval_context = eval_context if eval_context else {}
def check_nominal(self, *, log=False):
"""
@ -248,6 +275,13 @@ class Conformance:
raise Exception(f"{description} evaluation result should be boolean or string! Instead got {result}")
# local constraints
_api_context = bind_api_readonly(self.odapi)
_global_binds = {**_api_context}
_eval_context = {**self.eval_context}
for key, code in _eval_context.items():
_f = functools.partial(code, **{"api_context" :_api_context, "eval_context":_eval_context})
_global_binds[key] = _f
_eval_context[key] = _f
for type_name in self.bottom.read_keys(self.type_model):
code = get_code(type_name)
if code != None:
@ -256,7 +290,7 @@ class Conformance:
description = f"Local constraint of \"{type_name}\" in \"{obj_name}\""
# print(description)
try:
result = exec_then_eval(code, _globals=bind_api_readonly(self.odapi), _locals={'this': obj_id}) # may raise
result = exec_then_eval(code, _globals=_global_binds, _locals={'this': obj_id}) # may raise
check_result(result, description)
except:
errors.append(f"Runtime error during evaluation of {description}:\n{indent(traceback.format_exc(), 6)}")
@ -278,7 +312,7 @@ class Conformance:
if code != None:
description = f"Global constraint \"{tm_name}\""
try:
result = exec_then_eval(code, _globals=bind_api_readonly(self.odapi)) # may raise
result = exec_then_eval(code, _globals=_global_binds) # may raise
check_result(result, description)
except:
errors.append(f"Runtime error during evaluation of {description}:\n{indent(traceback.format_exc(), 6)}")

View file

@ -1,98 +0,0 @@
from framework.manager import Manager
from state.devstate import DevState
from InquirerPy import prompt, separator
from pprint import pprint
import prompt_questions as questions
from inspect import signature
from uuid import UUID
from ast import literal_eval
def generate_context_question(ctx_type, services):
"""
Converts service names to human readable form
"""
choices = [
s.__name__.replace('_', ' ') for s in services
]
choices = sorted(choices)
choices.append(separator.Separator())
choices.append("close context")
ctx_question = [
{
'type': 'list',
'name': 'op',
'message': f'Currently in context {ctx_type.__name__}, which operation would you like to perform?',
'choices': choices,
'filter': lambda x: x.replace(' ', '_')
}
]
return ctx_question
def main():
state = DevState()
man = Manager(state)
while True:
if man.current_model != None and man.current_context == None:
# we have selected a model, so we display typing questions
answer = prompt(questions.MODEL_SELECTED)
ctx = man
elif man.current_model != None and man.current_context != None:
# we have selected both a model and a context, so we display available services
qs = generate_context_question(type(man.current_context), man.get_services())
answer = prompt(qs)
if answer['op'] == 'close_context':
man.close_context()
continue
else:
ctx = man.current_context
else:
answer = prompt(questions.MODEL_MGMT)
ctx = man
if answer['op'] == 'exit':
break
else:
method = getattr(ctx, answer['op'])
args_questions = []
types = {}
for p in signature(method).parameters.values():
types[p.name] = p.annotation if p.annotation else literal_eval # can't use filter in question dict, doesn't work for some reason...
if p.annotation == UUID:
args_questions.append({
'type': 'list',
'name': p.name,
'message': f'{p.name.replace("_", " ")}?',
'choices': list(man.get_models()),
'filter': lambda x: state.read_value(state.read_dict(state.read_root(), x))
})
else:
args_questions.append({
'type': 'input',
'name': p.name,
'message': f'{p.name.replace("_", " ")}?',
'filter': lambda x: '' if x.lower() == 'false' else x
})
args = prompt(args_questions)
args = {k: types[k](v) if len(v) > 0 else None for k, v in args.items()}
try:
output = method(**args)
if output != None:
try:
if isinstance(output, str):
raise TypeError
output = list(output)
if len(output) > 0:
for o in sorted(output):
print(f"\u2022 {o}")
except TypeError:
print(f"\u2022 {output}")
except RuntimeError as e:
print(e)
if __name__ == '__main__':
print("""Welcome to...\r\n __ ____ _____ \r\n | \\/ \\ \\ / /__ \\ \r\n | \\ / |\\ \\ / / ) |\r\n | |\\/| | \\ \\/ / / / \r\n | | | | \\ / / /_ \r\n |_| |_| \\/ |____| """)
main()

View file

@ -1,225 +0,0 @@
from state.base import State
from bootstrap.scd import bootstrap_scd
from bootstrap.pn import bootstrap_pn
from services import implemented as services
from framework.conformance import Conformance
from uuid import UUID
class Manager:
def __init__(self, state: State):
self.current_model = None
self.current_context = None
self.state = state
bootstrap_scd(state)
# bootstrap_pn(state, "PN")
scd_node = self.state.read_dict(self.state.read_root(), "SCD")
for key_node in self.state.read_dict_keys(self.state.read_root()):
model_node = self.state.read_dict_node(self.state.read_root(), key_node)
self.state.create_edge(model_node, scd_node)
def get_models(self):
"""
Retrieves all existing models
Returns:
Names of exising models
"""
for key_node in self.state.read_dict_keys(self.state.read_root()):
yield self.state.read_value(key_node)
def instantiate_model(self, type_model_name: str, name: str):
"""
Retrieves all existing models
Args:
type_model_name: name of the type model we want to instantiate
name: name of the instance model to be created
Returns:
Nothing
"""
root = self.state.read_root()
type_model_node = self.state.read_dict(root, type_model_name)
if type_model_node == None:
raise RuntimeError(f"No type model with name {type_model_name} found.")
else:
# check if model is a linguistic type model
scd_node = self.state.read_dict(self.state.read_root(), "SCD")
incoming = self.state.read_incoming(scd_node)
incoming = [self.state.read_edge(e)[0] for e in incoming]
if type_model_node not in incoming:
raise RuntimeError(f"Model with name {type_model_name} is not a type model.")
if name in map(self.state.read_value, self.state.read_dict_keys(root)):
raise RuntimeError(f"Model with name {name} already exists.")
new_model_root = self.state.create_node()
new_model_node = self.state.create_nodevalue(str(new_model_root))
self.state.create_dict(root, name, new_model_node)
self.state.create_edge(new_model_node, type_model_node)
self.current_model = (name, new_model_root)
if type_model_name not in services:
raise RuntimeError(f"Services for type {type_model_name} not implemented.")
self.current_context = services[type_model_name](self.current_model[1], self.state)
def select_model(self, name: str):
"""
Select a model to interact with
Args:
name: name of the model we want to interact with
Returns:
Nothing
"""
root = self.state.read_root()
model_node = self.state.read_dict(root, name)
if model_node == None:
raise RuntimeError(f"No model with name {name} found.")
model_root = UUID(self.state.read_value(model_node))
self.current_model = (name, model_root)
def close_model(self):
"""
Clear the currently selected model
Returns:
Nothing
"""
self.current_model = None
self.current_context = None
def get_types(self):
"""
Retrieve the types of the currently selected model
Returns:
Names of the model's types
"""
root = self.state.read_root()
if self.current_model == None:
raise RuntimeError(f"No model currently selected.")
name, model = self.current_model
model_id = self.state.read_dict(root, name)
outgoing = self.state.read_outgoing(model_id)
outgoing = [e for e in outgoing if len(self.state.read_outgoing(e)) == 0]
elements = [self.state.read_edge(e)[1] for e in outgoing]
for e in elements:
incoming = self.state.read_incoming(e)
label_edge, = [e for e in incoming if len(self.state.read_outgoing(e)) == 1]
label_edge, = self.state.read_outgoing(label_edge)
_, label_node = self.state.read_edge(label_edge)
yield self.state.read_value(label_node)
def select_context(self, name: str):
"""
Select a type to set as the current context
Args:
name: name of the type/context
Returns:
Nothing
"""
if name not in self.get_types():
raise RuntimeError(f"No type {name} that currently selected model conforms to.")
if name not in services:
raise RuntimeError(f"Services for type {name} not implemented.")
self.current_context = services[name](self.current_model[1], self.state)
self.current_context.from_bottom()
def close_context(self):
"""
Exit the current (type) context
Returns:
Nothing
"""
self.current_context.to_bottom()
self.current_context = None
def get_services(self):
"""
Retrieve the services available in the current context
Returns:
Functions exposed by the current context's implementation
"""
if self.current_model == None:
raise RuntimeError(f"No model currently selected.")
if self.current_context == None:
raise RuntimeError(f"No context currently selected.")
yield from [
getattr(self.current_context, func)
for func in dir(self.current_context)
if callable(getattr(self.current_context, func))
and not func.startswith("__")
and not func == "from_bottom"
and not func == "to_bottom"
]
def check_conformance(self, type_model_name: str, model_name: str):
"""
If there are existing morphisms between the model and type model
check nominal conformance
Else
find conformance using structural conformance check
Args:
type_model_name: name of the type model to check conformance against
model_name: name of the instance model
Returns:
Boolean indicating whether conformance was found
"""
root = self.state.read_root()
type_model_node = self.state.read_dict(root, type_model_name)
if type_model_node == None:
raise RuntimeError(f"No type model with name {type_model_name} found.")
model_node = self.state.read_dict(root, model_name)
if model_node == None:
raise RuntimeError(f"No model with name {model_node} found.")
types = self.state.read_outgoing(model_node)
types = [self.state.read_edge(e)[1] for e in types]
# if type_model_node not in types:
if True:
print("checking structural conformance")
conf = Conformance(self.state,
UUID(self.state.read_value(model_node)),
UUID(self.state.read_value(type_model_node))).check_structural(log=True)
if conf:
self.state.create_edge(model_node, type_model_node)
return conf
else:
print("checking nominal conformance")
return Conformance(self.state,
UUID(self.state.read_value(model_node)),
UUID(self.state.read_value(type_model_node))).check_nominal(log=True)
def dump_state(self):
"""
Dumps the current state of the Modelverse to a pickle file
"""
import pickle
with open("state.p", "wb") as file:
pickle.dump(self.state, file)
def load_state(self):
"""
Loas a state of the Modelverse from a pickle file
"""
import pickle
with open("state.p", "rb") as file:
self.state = pickle.load(file)
def to_graphviz(self):
self.state.dump("state.dot")
if __name__ == '__main__':
from state.devstate import DevState
s = DevState()
m = Manager(s)
m.select_model("SCD")
m.select_context("SCD")
for f in m.get_services():
print(f)

View file

@ -1,37 +0,0 @@
from InquirerPy.separator import Separator
MODEL_SELECTED = [
{
'type': 'list',
'name': 'op',
'message': 'Model selected... Which operation would you like to perform?',
'choices': [
'get types',
'select context',
Separator(),
'close model'
],
'filter': lambda x: x.replace(' ', '_')
}
]
MODEL_MGMT = [
{
'type': 'list',
'name': 'op',
'message': 'Which model management operation would you like to perform?',
'choices': [
'get models',
'select model',
'instantiate model',
'check conformance',
Separator(),
'load state',
'dump state',
'to graphviz',
Separator(),
'exit'
],
'filter': lambda x: x.replace(' ', '_')
}
]