diff --git a/requirements.txt b/requirements.txt index 11b890f..b49798e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -pytest==6.2.4 \ No newline at end of file +pytest==6.2.4 +neo4j==4.3.4 +rdflib==6.0.0 diff --git a/state/neo4jstate.py b/state/neo4jstate.py new file mode 100644 index 0000000..14704fa --- /dev/null +++ b/state/neo4jstate.py @@ -0,0 +1,301 @@ +from typing import Any, Optional, List, Tuple, Callable, Generator +from neo4j import GraphDatabase +from ast import literal_eval + +from .base import State, Edge, Node, Element, UUID + + + +class Neo4jState(State): + def __init__(self, uri="bolt://localhost:7687", user="neo4j", password="tests"): + self.driver = GraphDatabase.driver(uri, auth=(user, password)) + self.root = self.create_node() + + def close(self, *, clear=False): + if clear: + self._run_and_return(self._clear) + self.driver.close() + + def _run_and_return(self, query: Callable, **kwargs): + with self.driver.session() as session: + result = session.write_transaction(query, **kwargs) + return result + + @staticmethod + def _clear(tx): + tx.run("MATCH (n) " + "DETACH DELETE n") + + @staticmethod + def _existence_check(tx, eid, label="Element"): + result = tx.run(f"MATCH (elem:{label}) " + "WHERE elem.id = $eid " + "RETURN elem.id", + eid=eid) + try: + return result.single()[0] + except TypeError: + # No node found for nid + # ergo, no edge created + return None + + def create_node(self) -> Node: + def query(tx, nid): + result = tx.run("CREATE (n:Element:Node) " + "SET n.id = $nid " + "RETURN n.id", + nid=nid) + return result.single()[0] + + node = self._run_and_return(query, nid=str(self.new_id())) + return UUID(node) if node is not None else None + + def create_edge(self, source: Element, target: Element) -> Optional[Edge]: + def query(tx, eid, sid, tid): + result = tx.run("MATCH (source), (target) " + "WHERE source.id = $sid AND target.id = $tid " + "CREATE (source) -[:Source]-> (e:Element:Edge) -[:Target]-> (target) " + "SET e.id = $eid " + "RETURN e.id", + eid=eid, sid=sid, tid=tid) + try: + return result.single()[0] + except TypeError: + # No node found for sid and/or tid + # ergo, no edge created + return None + + edge = self._run_and_return(query, eid=str(self.new_id()), sid=str(source), tid=str(target)) + return UUID(edge) if edge is not None else None + + def create_nodevalue(self, value: Any) -> Optional[Node]: + def query(tx, nid, val): + result = tx.run("CREATE (n:Element:Node) " + "SET n.id = $nid, n.value = $val " + "RETURN n.id", + nid=nid, val=val) + return result.single()[0] + + if not self.is_valid_datavalue(value): + return None + + node = self._run_and_return(query, nid=str(self.new_id()), val=repr(value)) + return UUID(node) if node is not None else None + + def create_dict(self, source: Element, value: Any, target: Element) -> Optional[Tuple[Edge, Edge, Node]]: + if not self.is_valid_datavalue(value): + return None + + edge_node = self.create_edge(source, target) + val_node = self.create_nodevalue(value) + if edge_node is not None and val_node is not None: + self.create_edge(edge_node, val_node) + + def read_root(self) -> Node: + return self.root + + def read_value(self, node: Node) -> Optional[Any]: + def query(tx, nid): + result = tx.run("MATCH (n:Node) " + "WHERE n.id = $nid " + "RETURN n.value", + nid=nid) + try: + return result.single()[0] + except TypeError: + # No node found for nid + return None + + value = self._run_and_return(query, nid=str(node)) + return literal_eval(value) if value is not None else None + + def read_outgoing(self, elem: Element) -> Optional[List[Edge]]: + def query(tx, eid): + result = tx.run("MATCH (elem:Element) -[:Source]-> (e:Edge) " + "WHERE elem.id = $eid " + "RETURN e.id", + eid=eid) + return result.value() + + source_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None + if source_exists: + result = self._run_and_return(query, eid=str(elem)) + return [UUID(x) for x in result] if result is not None else None + + def read_incoming(self, elem: Element) -> Optional[List[Edge]]: + def query(tx, eid): + result = tx.run("MATCH (elem:Element) <-[:Target]- (e:Edge) " + "WHERE elem.id = $eid " + "RETURN e.id", + eid=eid) + return result.value() + + target_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None + if target_exists: + result = self._run_and_return(query, eid=str(elem)) + return [UUID(x) for x in result] if result is not None else None + + def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]: + def query(tx, eid): + result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt)" + "WHERE e.id = $eid " + "RETURN src.id, tgt.id", + eid=eid) + return result.single() + + edge_exists = self._run_and_return(self._existence_check, eid=str(edge), label="Edge") is not None + if edge_exists: + try: + src, tgt = self._run_and_return(query, eid=str(edge)) + return UUID(src), UUID(tgt) + except TypeError: + return None, None + else: + return None, None + + def read_dict(self, elem: Element, value: Any) -> Optional[Element]: + def query(tx, eid, label_value): + result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt), " + "(e) -[:Source]-> (:Edge) -[:Target]-> (label)" + "WHERE src.id = $eid " + "AND label.value = $val " + "RETURN tgt.id", + eid=eid, val=label_value) + try: + return result.single()[0] + except TypeError: + # No edge found with given label + return None + + elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None + if elem_exists: + if isinstance(value, UUID): + return None + result = self._run_and_return(query, eid=str(elem), label_value=repr(value)) + return UUID(result) if result is not None else None + + def read_dict_keys(self, elem: Element) -> Optional[List[Any]]: + def query(tx, eid): + result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (), " + "(e) -[:Source]-> (:Edge) -[:Target]-> (label)" + "WHERE src.id = $eid " + "RETURN label.id", + eid=eid) + try: + return result.value() + except TypeError: + # No edge found with given label + return None + + elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None + if elem_exists: + result = self._run_and_return(query, eid=str(elem)) + return [UUID(x) for x in result if x is not None] + + def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]: + def query(tx, eid, label_value): + result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (), " + "(e) -[:Source]-> (:Edge) -[:Target]-> (label)" + "WHERE src.id = $eid " + "AND label.value = $val " + "RETURN e.id", + eid=eid, val=label_value) + try: + return result.single()[0] + except TypeError: + # No edge found with given label + return None + + elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None + if elem_exists: + result = self._run_and_return(query, eid=str(elem), label_value=repr(value)) + return UUID(result) if result is not None else None + + def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]: + def query(tx, eid, label_id): + result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt), " + "(e) -[:Source]-> (:Edge) -[:Target]-> (label)" + "WHERE src.id = $eid " + "AND label.id = $lid " + "RETURN tgt.id", + eid=eid, lid=label_id) + try: + return result.single()[0] + except TypeError: + # No edge found with given label + return None + + elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None + if elem_exists: + result = self._run_and_return(query, eid=str(elem), label_id=str(value_node)) + return UUID(result) if result is not None else None + + def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]: + def query(tx, eid, label_id): + result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (), " + "(e) -[:Source]-> (:Edge) -[:Target]-> (label)" + "WHERE src.id = $eid " + "AND label.id = $lid " + "RETURN e.id", + eid=eid, lid=label_id) + try: + return result.single()[0] + except TypeError: + # No edge found with given label + return None + + elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None + if elem_exists: + result = self._run_and_return(query, eid=str(elem), label_id=str(value_node)) + return UUID(result) if result is not None else None + + def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]: + def query(tx, eid, label_value): + result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt), " + "(e) -[:Source]-> (:Edge) -[:Target]-> (label)" + "WHERE tgt.id = $eid " + "AND label.value = $val " + "RETURN src.id", + eid=eid, val=label_value) + try: + return result.value() + except TypeError: + # No edge found with given label + return None + + elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None + if elem_exists: + result = self._run_and_return(query, eid=str(elem), label_value=repr(value)) + return [UUID(x) for x in result if x is not None] + + def delete_node(self, node: Node) -> None: + def query(tx, nid): + result = tx.run("MATCH (n:Node) " + "WHERE n.id = $nid " + "OPTIONAL MATCH (n) -- (e:Edge) " + "DETACH DELETE n " + "RETURN e.id", + nid=nid) + return result.value() + + to_be_deleted = self._run_and_return(query, nid=str(node)) + to_be_deleted = [UUID(x) for x in to_be_deleted if x is not None] + for edge in to_be_deleted: + self.delete_edge(edge) + + def delete_edge(self, edge: Edge) -> None: + def query(tx, eid): + result = tx.run("MATCH (e1:Edge) " + "WHERE e1.id = $eid " + "OPTIONAL MATCH (e1) -- (e2:Edge) " + "WHERE (e1) -[:Source]-> (e2) " + "OR (e1) <-[:Target]- (e2) " + "DETACH DELETE e1 " + "RETURN e2.id", + eid=eid) + return result.value() + + to_be_deleted = self._run_and_return(query, eid=str(edge)) + to_be_deleted = [UUID(x) for x in to_be_deleted if x is not None] + for edge in to_be_deleted: + self.delete_edge(edge) diff --git a/state/rdfstate.py b/state/rdfstate.py new file mode 100644 index 0000000..a6209a5 --- /dev/null +++ b/state/rdfstate.py @@ -0,0 +1,276 @@ +from typing import Any, List, Tuple, Optional, Generator +from rdflib import Graph, Namespace, URIRef, Literal +from rdflib.plugins.sparql import prepareQuery +import json + +from .base import State + +# Define graph datasctructures used by implementation +# Use NewType to create distinct type or just create a type alias +Element = URIRef +Node = URIRef +Edge = URIRef + + +class RDFState(State): + def __init__(self, namespace_uri="http://modelverse.mv/#"): + self.graph = Graph() + self.namespace_uri = namespace_uri + self.mv = Namespace(namespace_uri) + self.graph.bind("MV", self.mv) + self.prepared_queries = { + "read_value": """ + SELECT ?value + WHERE { + ?var1 MV:hasValue ?value . + } + """, + "read_outgoing": """ + SELECT ?link + WHERE { + ?link MV:hasSource ?var1 . + } + """, + "read_incoming": """ + SELECT ?link + WHERE { + ?link MV:hasTarget ?var1 . + } + """, + "read_edge": """ + SELECT ?source ?target + WHERE { + ?var1 MV:hasSource ?source ; + MV:hasTarget ?target . + } + """, + "read_dict_keys": """ + SELECT ?key + WHERE { + ?main_edge MV:hasSource ?var1 . + ?attr_edge MV:hasSource ?main_edge ; + MV:hasTarget ?key . + } + """, + "read_dict_node": """ + SELECT ?value_node + WHERE { + ?main_edge MV:hasSource ?var1 ; + MV:hasTarget ?value_node . + ?attr_edge MV:hasSource ?main_edge ; + MV:hasTarget ?var2 . + } + """, + "read_dict_node_edge": """ + SELECT ?main_edge + WHERE { + ?main_edge MV:hasSource ?var1 . + ?attr_edge MV:hasSource ?main_edge ; + MV:hasTarget ?var2 . + } + """, + "delete_node": """ + SELECT ?edge + WHERE { + { ?edge MV:hasTarget ?var1 . } + UNION + { ?edge MV:hasSource ?var1 . } + } + """, + "delete_edge": """ + SELECT ?edge + WHERE { + { ?edge MV:hasTarget ?var1 . } + UNION + { ?edge MV:hasSource ?var1 . } + } + """, + } + self.garbage = set() + + for k, v in list(self.prepared_queries.items()): + self.prepared_queries[k] = prepareQuery(self.prepared_queries[k], initNs={"MV": self.mv}) + + self.root = self.create_node() + + def create_node(self) -> Node: + return URIRef(self.namespace_uri + str(self.new_id())) + + def create_edge(self, source: Element, target: Element) -> Optional[Edge]: + if not isinstance(source, URIRef): + return None + elif not isinstance(target, URIRef): + return None + edge = URIRef(self.namespace_uri + str(self.new_id())) + self.graph.add((edge, self.mv.hasSource, source)) + self.graph.add((edge, self.mv.hasTarget, target)) + return edge + + def create_nodevalue(self, value: Any) -> Optional[Node]: + if not self.is_valid_datavalue(value): + return None + node = URIRef(self.namespace_uri + str(self.new_id())) + if isinstance(value, tuple): + value = {"Type": value[0]} + self.graph.add((node, self.mv.hasValue, Literal(json.dumps(value)))) + return node + + def create_dict(self, source: Element, value: Any, target: Element) -> Optional[Tuple[Edge, Edge, Node]]: + if not isinstance(source, URIRef): + return + if not isinstance(target, URIRef): + return + if not self.is_valid_datavalue(value): + return + + n = self.create_nodevalue(value) + e = self.create_edge(source, target) + self.create_edge(e, n) + + def read_root(self) -> Node: + return self.root + + def read_value(self, node: Node) -> Optional[Any]: + if not isinstance(node, URIRef) or not (node, None, None) in self.graph: + return None + result = self.graph.query(self.prepared_queries["read_value"], initBindings={"var1": node}) + if len(result) == 0: + return None + result = json.loads(list(result)[0][0]) + return result if not isinstance(result, dict) else (result["Type"],) + + def read_outgoing(self, elem: Element) -> Optional[List[Edge]]: + if not isinstance(elem, URIRef) or elem in self.garbage: + return None + result = self.graph.query(self.prepared_queries["read_outgoing"], initBindings={"var1": elem}) + return [i[0] for i in result] + + def read_incoming(self, elem: Element) -> Optional[List[Edge]]: + if not isinstance(elem, URIRef) or elem in self.garbage: + return None + result = self.graph.query(self.prepared_queries["read_incoming"], initBindings={"var1": elem}) + return [i[0] for i in result] + + def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]: + if not isinstance(edge, URIRef) or not (edge, None, None) in self.graph: + return None, None + result = self.graph.query(self.prepared_queries["read_edge"], initBindings={"var1": edge}) + if len(result) == 0: + return None, None + else: + return list(result)[0][0], list(result)[0][1] + + def read_dict(self, elem: Element, value: Any) -> Optional[Element]: + if not isinstance(elem, URIRef): + return None + q = f""" + SELECT ?value_node + WHERE {{ + ?main_edge MV:hasSource <{elem}> ; + MV:hasTarget ?value_node . + ?attr_edge MV:hasSource ?main_edge ; + MV:hasTarget ?attr_node . + ?attr_node MV:hasValue '{json.dumps(value)}' . + }} + """ + result = self.graph.query(q) + if len(result) == 0: + return None + return list(result)[0][0] + + def read_dict_keys(self, elem: Element) -> Optional[List[Any]]: + if not isinstance(elem, URIRef): + return None + result = self.graph.query(self.prepared_queries["read_dict_keys"], initBindings={"var1": elem}) + return [i[0] for i in result] + + def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]: + if not isinstance(elem, URIRef): + return None + result = self.graph.query( + f""" + SELECT ?main_edge + WHERE {{ + ?main_edge MV:hasSource <{elem}> ; + MV:hasTarget ?value_node . + ?attr_edge MV:hasSource ?main_edge ; + MV:hasTarget ?attr_node . + ?attr_node MV:hasValue '{json.dumps(value)}' . + }} + """) + if len(result) == 0: + return None + return list(result)[0][0] + + def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]: + if not isinstance(elem, URIRef): + return None + if not isinstance(value_node, URIRef): + return None + result = self.graph.query( + self.prepared_queries["read_dict_node"], initBindings={"var1": elem, "var2": value_node} + ) + if len(result) == 0: + return None + return list(result)[0][0] + + def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]: + if not isinstance(elem, URIRef): + return None + if not isinstance(value_node, URIRef): + return None + result = self.graph.query( + self.prepared_queries["read_dict_node_edge"], initBindings={"var1": elem, "var2": value_node} + ) + if len(result) == 0: + return None + return list(result)[0][0] + + def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]: + if not isinstance(elem, URIRef): + return None + result = self.graph.query( + f""" + SELECT ?source_node + WHERE {{ + ?main_edge MV:hasTarget <{elem}> ; + MV:hasSource ?source_node . + ?attr_edge MV:hasSource ?main_edge ; + MV:hasTarget ?value_node . + ?value_node MV:hasValue '{json.dumps(value)}' . + }} + """) + + return [i[0] for i in result] + + def delete_node(self, node: Node) -> None: + if node == self.root: + return + if not isinstance(node, URIRef): + return + # Check whether node isn't an edge + if (node, self.mv.hasSource, None) in self.graph or (node, self.mv.hasTarget, None) in self.graph: + return + # Remove its value if it exists + self.graph.remove((node, None, None)) + # Get all edges connecting this + result = self.graph.query(self.prepared_queries["delete_node"], initBindings={"var1": node}) + # ... and remove them + for e in result: + self.delete_edge(e[0]) + self.garbage.add(node) + + def delete_edge(self, edge: Edge) -> None: + if not isinstance(edge, URIRef): + return + # Check whether edge is actually an edge + if not ((edge, self.mv.hasSource, None) in self.graph and (edge, self.mv.hasTarget, None) in self.graph): + return + # Remove its links + self.graph.remove((edge, None, None)) + # Get all edges connecting this + result = self.graph.query(self.prepared_queries["delete_edge"], initBindings={"var1": edge}) + # ... and remove them + for e in result: + self.delete_edge(e[0]) + self.garbage.add(edge) diff --git a/state/test/fixtures/state.py b/state/test/fixtures/state.py index ab7eb87..86177a2 100644 --- a/state/test/fixtures/state.py +++ b/state/test/fixtures/state.py @@ -1,13 +1,13 @@ import pytest from state.pystate import PyState -# from state.rdfstate import RDFState -# from state.neo4jstate import Neo4jState +from state.rdfstate import RDFState +from state.neo4jstate import Neo4jState @pytest.fixture(params=[ (PyState,), - # (RDFState, "http://example.org/#"), - # (Neo4jState,) + (RDFState, "http://example.org/#"), + (Neo4jState,) ]) def state(request): if len(request.param) > 1: @@ -15,5 +15,5 @@ def state(request): else: state = request.param[0]() yield state - # if isinstance(state, Neo4jState): - # state.close(clear=True) + if isinstance(state, Neo4jState): + state.close(clear=True)