Scheduler first commit

This commit is contained in:
robbe 2025-04-24 12:23:07 +02:00
parent 70c53a9aef
commit 2c64ebda67
24 changed files with 880 additions and 0 deletions

View file

@ -0,0 +1,12 @@
from .data_node import DataNode
from .data_modify import DataModify
from .end import End
from .exec_node import ExecNode
from .loop import Loop
from .match import Match
from .null_node import NullNode
from .print import Print
from .rewrite import Rewrite
from .start import Start
__all__ = ["DataNode", "End", "ExecNode", "Loop", "Match", "NullNode", "Rewrite", "Print", "DataModify", "Start"]

View file

@ -0,0 +1,63 @@
import functools
from typing import Any, Generator, Callable
class Data:
def __init__(self, super) -> None:
self.data: list[dict[Any, Any]] = list()
self.success: bool = False
self.super = super
@staticmethod
def store_output(func: Callable) -> Callable:
def wrapper(self, *args, **kwargs) -> Any:
output = func(self, *args, **kwargs)
self.success = output
return output
return wrapper
@store_output
def store_data(self, data_gen: Generator, n: int) -> bool:
self.data.clear()
if n == 0:
return True
i: int = 0
while (match := next(data_gen, None)) is not None:
self.data.append(match)
i+=1
if i >= n:
break
else:
if n == float("inf"):
return bool(len(self.data))
self.data.clear()
return False
return True
def get_super(self) -> int:
return self.super
def replace(self, data: "Data") -> None:
self.data.clear()
self.data.extend(data.data)
def append(self, data: Any) -> None:
self.data.append(data)
def clear(self) -> None:
self.data.clear()
def pop(self, index = -1) -> Any:
return self.data.pop(index)
def empty(self) -> bool:
return len(self.data) == 0
def __getitem__(self, index):
return self.data[index]
def __iter__(self):
return self.data.__iter__()
def __len__(self):
return self.data.__len__()

View file

@ -0,0 +1,26 @@
import functools
from typing import TYPE_CHECKING, Callable, List
from api.od import ODAPI
from examples.schedule.RuleExecuter import RuleExecuter
from .exec_node import ExecNode
from .data_node import DataNode
class DataModify(DataNode):
def __init__(self, modify_dict: dict[str,str]) -> None:
DataNode.__init__(self)
self.modify_dict: dict[str,str] = modify_dict
def input_event(self, success: bool) -> None:
if success or self.data_out.success:
self.data_out.data.clear()
for data in self.data_in.data:
self.data_out.append({self.modify_dict[key]: value for key, value in data.items() if key in self.modify_dict.keys()})
DataNode.input_event(self, success)
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
if self.id in visited:
return
nodes.append(f"{self.id}[label=modify]")
super().generate_dot(nodes, edges, visited)

View file

@ -0,0 +1,47 @@
from typing import Any, Generator, List
from examples.schedule.schedule_lib.id_generator import IdGenerator
from .data import Data
class DataNode:
def __init__(self) -> None:
if not hasattr(self, 'id'):
self.id = IdGenerator().generate_id()
self.data_out : Data = Data(self)
self.data_in: Data | None = None
self.eventsub: list[DataNode] = list()
def connect_data(self, data_node: "DataNode", eventsub=True) -> None:
data_node.data_in = self.data_out
if eventsub:
self.eventsub.append(data_node)
def store_data(self, data_gen: Generator, n: int) -> None:
success: bool = self.data_out.store_data(data_gen, n)
for sub in self.eventsub:
sub.input_event(success)
def get_input_data(self) -> list[dict[Any, Any]]:
if not self.data_in.success:
raise Exception("Invalid input data: matching has failed")
data = self.data_in.data
if len(data) == 0:
raise Exception("Invalid input data: no data present")
return data
def input_event(self, success: bool) -> None:
self.data_out.success = success
for sub in self.eventsub:
sub.input_event(success)
def get_id(self) -> int:
return self.id
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
visited.add(self.id)
if self.data_in is not None:
edges.append(f"{self.data_in.get_super().get_id()} -> {self.get_id()} [color = green]")
self.data_in.get_super().generate_dot(nodes, edges, visited)
for sub in self.eventsub:
sub.generate_dot(nodes, edges, visited)

View file

@ -0,0 +1,21 @@
import functools
from typing import TYPE_CHECKING, List, Callable, Generator
from api.od import ODAPI
from .exec_node import ExecNode
class End(ExecNode):
def __init__(self) -> None:
super().__init__(out_connections=1)
def execute(self, od: ODAPI) -> Generator | None:
return self.terminate(od)
@staticmethod
def terminate(od: ODAPI) -> Generator:
yield f"end:", functools.partial(lambda od:(od, ""), od)
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
if self.id in visited:
return
nodes.append(f"{self.id}[label=end]")

View file

@ -0,0 +1,34 @@
from typing import TYPE_CHECKING, List, Callable, Generator
from api.od import ODAPI
from .id_generator import IdGenerator
class ExecNode:
def __init__(self, out_connections: int = 1) -> None:
from .null_node import NullNode
self.next_state: list[ExecNode] = []
if out_connections > 0:
self.next_state = [NullNode()]*out_connections
self.id: int = IdGenerator().generate_id()
def nextState(self) -> "ExecNode":
return self.next_state[0]
def connect(self, next_state: "ExecNode", from_gate: int = 0, to_gate: int = 0) -> None:
if from_gate >= len(self.next_state):
raise IndexError
self.next_state[from_gate] = next_state
def execute(self, od: ODAPI) -> Generator | None:
return None
def get_id(self) -> int:
return self.id
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
visited.add(self.id)
for edge in self.next_state:
edges.append(f"{self.id} -> {edge.get_id()}")
for next in self.next_state:
next.generate_dot(nodes, edges, visited)

View file

@ -0,0 +1,10 @@
from typing import Callable
def generate_dot_wrap(func) -> Callable:
def wrapper(self, *args, **kwargs) -> str:
nodes = []
edges = []
self.reset_visited()
func(self, nodes, edges, *args, **kwargs)
return f"digraph G {{\n\t{"\n\t".join(nodes)}\n\t{"\n\t".join(edges)}\n}}"
return wrapper

View file

@ -0,0 +1,8 @@
from .singleton import Singleton
class IdGenerator(metaclass=Singleton):
def __init__(self):
self.id = -1
def generate_id(self) -> int:
self.id += 1
return self.id

View file

@ -0,0 +1,57 @@
import functools
from random import choice
from typing import TYPE_CHECKING, Callable, List, Generator
from api.od import ODAPI
from examples.schedule.RuleExecuter import RuleExecuter
from .exec_node import ExecNode
from .data_node import DataNode
from .data_node import Data
class Loop(ExecNode, DataNode):
def __init__(self, choice) -> None:
ExecNode.__init__(self, out_connections=2)
DataNode.__init__(self)
self.choice: bool = choice
self.cur_data: Data = Data(-1)
def nextState(self) -> ExecNode:
return self.next_state[not self.data_out.success]
def execute(self, od: ODAPI) -> Generator | None:
if self.cur_data.empty():
self.data_out.clear()
self.data_out.success = False
DataNode.input_event(self, False)
return None
if self.choice:
def select_data() -> Generator:
for i in range(len(self.cur_data)):
yield f"choice: {self.cur_data[i]}", functools.partial(self.select_next,od, i)
return select_data()
else:
self.select_next(od, -1)
return None
def input_event(self, success: bool) -> None:
if (b := self.data_out.success) or success:
self.cur_data.replace(self.data_in)
self.data_out.clear()
self.data_out.success = False
if b:
DataNode.input_event(self, False)
def select_next(self,od: ODAPI, index: int) -> tuple[ODAPI, list[str]]:
self.data_out.clear()
self.data_out.append(self.cur_data.pop(index))
DataNode.input_event(self, True)
return (od, ["data selected"])
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
if self.id in visited:
return
nodes.append(f"{self.id}[label=Loop]")
ExecNode.generate_dot(self, nodes, edges, visited)
DataNode.generate_dot(self, nodes, edges, visited)

View file

@ -0,0 +1,42 @@
import functools
from typing import TYPE_CHECKING, Callable, List, Generator
from api.od import ODAPI
from examples.schedule.RuleExecuter import RuleExecuter
from .exec_node import ExecNode
from .data_node import DataNode
class Match(ExecNode, DataNode):
def __init__(self, label: str, n: int | float) -> None:
ExecNode.__init__(self, out_connections=2)
DataNode.__init__(self)
self.label: str = label
self.n:int = n
self.rule = None
self.rule_executer : RuleExecuter
def nextState(self) -> ExecNode:
return self.next_state[not self.data_out.success]
def execute(self, od: ODAPI) -> Generator | None:
self.match(od)
return None
def init_rule(self, rule, rule_executer):
self.rule = rule
self.rule_executer = rule_executer
def match(self, od: ODAPI) -> None:
pivot = {}
if self.data_in is not None:
pivot = self.get_input_data()[0]
print(f"matching: {self.label}\n\tpivot: {pivot}")
self.store_data(self.rule_executer.match_rule(od.m, self.rule, pivot=pivot), self.n)
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
if self.id in visited:
return
nodes.append(f"{self.id}[label=M_{self.label.split("/")[-1]}_{self.n}]")
ExecNode.generate_dot(self, nodes, edges, visited)
DataNode.generate_dot(self, nodes, edges, visited)

View file

@ -0,0 +1,25 @@
import functools
from symtable import Function
from typing import List, Callable, Generator
from api.od import ODAPI
from .singleton import Singleton
from .exec_node import ExecNode
class NullNode(ExecNode, metaclass=Singleton):
def __init__(self):
ExecNode.__init__(self, out_connections=0)
def execute(self, od: ODAPI) -> Generator | None:
raise Exception('Null node should already have terminated the schedule')
@staticmethod
def terminate(od: ODAPI):
return None
yield # verrrry important line, dont remove this unreachable code
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
if self.id in visited:
return
nodes.append(f"{self.id}[label=Null]")

View file

@ -0,0 +1,28 @@
import functools
from typing import TYPE_CHECKING, Callable, List, Generator
from api.od import ODAPI
from examples.schedule.RuleExecuter import RuleExecuter
from .exec_node import ExecNode
from .data_node import DataNode
class Print(ExecNode, DataNode):
def __init__(self, label: str = "") -> None:
ExecNode.__init__(self, out_connections=1)
DataNode.__init__(self)
self.label = label
def execute(self, od: ODAPI) -> Generator | None:
self.input_event(True)
return None
def input_event(self, success: bool) -> None:
print(f"{self.label}{self.data_in.data}")
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
if self.id in visited:
return
nodes.append(f"{self.id}[label=Print_{self.label.replace(":", "")}]")
ExecNode.generate_dot(self, nodes, edges, visited)
DataNode.generate_dot(self, nodes, edges, visited)

View file

@ -0,0 +1,38 @@
import functools
from typing import List, Callable, Generator
from api.od import ODAPI
from .exec_node import ExecNode
from .data_node import DataNode
from ..RuleExecuter import RuleExecuter
class Rewrite(ExecNode, DataNode):
def __init__(self, label: str) -> None:
ExecNode.__init__(self, out_connections=1)
DataNode.__init__(self)
self.label = label
self.rule = None
self.rule_executer : RuleExecuter
def init_rule(self, rule, rule_executer):
self.rule = rule
self.rule_executer= rule_executer
def execute(self, od: ODAPI) -> Generator | None:
yield "ghello", functools.partial(self.rewrite, od)
def rewrite(self, od):
print("rewrite" + self.label)
pivot = {}
if self.data_in is not None:
pivot = self.get_input_data()[0]
self.store_data(self.rule_executer.rewrite_rule(od.m, self.rule, pivot=pivot), 1)
return ODAPI(od.state, od.m, od.mm),[f"rewrite {self.label}\n\tpivot: {pivot}\n\t{"success" if self.data_out.success else "failure"}\n"]
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
if self.id in visited:
return
nodes.append(f"{self.id}[label=R_{self.label.split("/")[-1]}]")
ExecNode.generate_dot(self, nodes, edges, visited)
DataNode.generate_dot(self, nodes, edges, visited)

View file

@ -0,0 +1,8 @@
from abc import ABCMeta
class Singleton(ABCMeta):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]

View file

@ -0,0 +1,16 @@
from typing import TYPE_CHECKING, Callable, List, Any
from .funcs import generate_dot_wrap
from .exec_node import ExecNode
class Start(ExecNode):
def __init__(self) -> None:
ExecNode.__init__(self, out_connections=1)
def generate_dot(self, nodes: List[str], edges: List[str], visited: set[int]) -> None:
if self.id in visited:
return
nodes.append(f"{self.id}[label=start]")
super().generate_dot(nodes, edges, visited)