Implemented State

This commit is contained in:
Andrei Bondarenko 2021-07-13 03:39:48 +02:00
parent e2c27b427b
commit 046266bfa4
28 changed files with 3517 additions and 26 deletions

52
.gitignore vendored
View file

@ -10,6 +10,7 @@ __pycache__/
# Distribution / packaging # Distribution / packaging
.Python .Python
env/ env/
venv/
build/ build/
develop-eggs/ develop-eggs/
dist/ dist/
@ -59,34 +60,35 @@ docs/_build/
target/ target/
# ---> macOS # ---> macOS
.DS_Store .DS_Store
.AppleDouble .AppleDouble
.LSOverride .LSOverride
# Icon must end with two \r # Icon must end with two \r
Icon Icon
# Thumbnails # Thumbnails
._* ._*
# Files that might appear in the root of a volume # Files that might appear in the root of a volume
.DocumentRevisions-V100 .DocumentRevisions-V100
.fseventsd .fseventsd
.Spotlight-V100 .Spotlight-V100
.TemporaryItems .TemporaryItems
.Trashes .Trashes
.VolumeIcon.icns .VolumeIcon.icns
# Directories potentially created on remote AFP share # Directories potentially created on remote AFP share
.AppleDB .AppleDB
.AppleDesktop .AppleDesktop
Network Trash Folder Network Trash Folder
Temporary Items Temporary Items
.apdisk .apdisk
# ---> VisualStudioCode # ---> VisualStudioCode
.settings .settings
.vscode/settings.json
# ---> Linux # ---> Linux

View file

@ -1,3 +1,11 @@
# MV2 # MV2
This repository contains the code for my take on (a part of) the [Modelverse](https://msdl.uantwerpen.be/git/yentl/modelverse) for my Master's thesis. This repository contains the code for my take on (a part of) the [Modelverse](https://msdl.uantwerpen.be/git/yentl/modelverse) for my Master's thesis.
## Development packages
Some packages were used during development, but are not needed for succesful runtime (e.g. linter, autoformatter). These can be found under `requirements_dev.txt`.
## Mandatory packages
Python packages required to succesfully run/test the code in this repository can be found under `requirements.txt`.

3
requirements.txt Normal file
View file

@ -0,0 +1,3 @@
pytest==6.2.4
neo4j==4.3.4
rdflib==6.0.0

0
state/__init__.py Normal file
View file

296
state/base.py Normal file
View file

@ -0,0 +1,296 @@
from abc import ABC, abstractmethod
from typing import Any, List, Tuple, Optional, Union
from uuid import UUID, uuid4
primitive_types = (int, float, str, bool)
INTEGER = ("Integer",)
FLOAT = ("Float",)
STRING = ("String",)
BOOLEAN = ("Boolean",)
TYPE = ("Type",)
type_values = (INTEGER, FLOAT, STRING, BOOLEAN, TYPE)
Node = UUID
Edge = UUID
Element = Union[Node, Edge]
class State(ABC):
"""
Abstract base class for MvS CRUD interface defined in:
http://msdl.cs.mcgill.ca/people/yentl/files/thesis.pdf
This code is based on:
https://msdl.uantwerpen.be/git/yentl/modelverse/src/master/state/modelverse_state
"""
@staticmethod
def new_id() -> UUID:
"""
Generates a new UUID
"""
return uuid4()
@staticmethod
def is_valid_datavalue(value: Any) -> bool:
"""
Checks whether value type is supported.
Args:
value: value whose type needs to be checked
Returns:
True if value type is supported, False otherwise.
"""
if isinstance(value, tuple) and value in type_values:
return True
if not isinstance(value, primitive_types):
return False
elif isinstance(value, int) and not (-2**63 <= value <= 2**63 - 1):
return False
return True
def purge(self):
"""
Implements a garbage collection routine for implementations that don't have automatic garbage collection.
"""
pass
# =========================================================================
# CREATE
# =========================================================================
@abstractmethod
def create_node(self) -> Node:
"""
Creates node.
Returns:
The created node.
"""
pass
@abstractmethod
def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
"""
Creates edge. Source and target elements should already exist.
Args:
source: source element of edge
target: target element of edge
Returns:
The created edge, None if source or target element doesn't exist.
"""
pass
@abstractmethod
def create_nodevalue(self, value: Any) -> Optional[Node]:
"""
Creates node containing value.
Args:
value: value to assign to new node
Returns:
The created node, None if type of value is not supported.
"""
pass
@abstractmethod
def create_dict(self, source: Element, value: Any, target: Element) -> None:
"""
Creates named edge between two graph elements.
Args:
source: source element of edge
value: edge label
target: target element of edge
Returns:
Nothing.
"""
pass
# =========================================================================
# READ
# =========================================================================
@abstractmethod
def read_root(self) -> Node:
"""
Reads state's root node.
Returns:
The state's root node.
"""
pass
@abstractmethod
def read_value(self, node: Node) -> Optional[Any]:
"""
Reads value of given node.
Args:
node: node whose value to read
Returns:
I node exists, value stored in node, else None.
"""
pass
@abstractmethod
def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
"""
Retrieves edges whose source is given element.
Args:
elem: source element of edges to retrieve
Returns:
If elem exists, list of edges whose source is elem, else None.
"""
pass
@abstractmethod
def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
"""
Retrieves edges whose target is given element.
Args:
elem: target element of edges to retrieve
Returns:
If elem exists, list of edges whose target is elem, else None.
"""
pass
@abstractmethod
def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]:
"""
Reads source and target of given edge.
Args:
edge: edge whose source and target to read
Returns:
If edge exists, tuple containing source (first) and target (second) node, else (None, None)
"""
pass
@abstractmethod
def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
"""
Reads element connected to given element through edge with label = value.
Args:
elem: source element
value: edge label
Returns:
If elem doesn't exist or no edge is found with given label, None, else target element of edge with label = value originating from source.
"""
pass
@abstractmethod
def read_dict_keys(self, elem: Element) -> Optional[List[Element]]:
"""
Reads labels of outgoing edges starting in given node.
Args:
elem: source element
Returns:
If elem exists, list of (unique) edge labels, else None.
"""
pass
@abstractmethod
def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
"""
Reads edge between two elements connected through edge with label = value.
Args:
elem: source element
value: edge label
Returns:
If elem doesn't exist or no edge is found with given label, None, else edge with label = value originating from source.
"""
pass
@abstractmethod
def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
"""
Reads element connected to given element through edge with label node = value_node.
Args:
elem: source element
value_node: edge label node
Returns:
If elem exists, target element of edge with label stored in value_node originating from elem, else None.
"""
pass
@abstractmethod
def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
"""
Reads edge connecting two elements through edge with label node = value_node.
Args:
elem: source element
value_node: edge label node
Returns:
If elem exists, edge with label node = value_node, originating from source, else None.
"""
pass
@abstractmethod
def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
"""
Retrieves a list of all elements that have an outgoing edge, having label = value, towards the passed element.
Args:
elem: target element
value: edge label
Returns:
If elem exists, list of elements with an outgoing edge with label = value towards elem, else None.
"""
pass
# =========================================================================
# UPDATE
# =========================================================================
"""
Updates are done by performing subsequent CREATE and DELETE operations:
http://msdl.cs.mcgill.ca/people/yentl/files/thesis.pdf
"""
# =========================================================================
# DELETE
# =========================================================================
@abstractmethod
def delete_node(self, node: Node) -> None:
"""
Deletes given node from state graph.
Args:
node: node to be deleted
Returns:
None
"""
pass
@abstractmethod
def delete_edge(self, edge: Edge) -> None:
"""
Deletes given edge from state graph.
Args:
edge: edge to be deleted
Returns:
None
"""
pass

53
state/devstate.py Normal file
View file

@ -0,0 +1,53 @@
from state.pystate import PyState
from uuid import UUID
class DevState(PyState):
"""
Version of PyState that allows dumping to .dot files
+ node id's are generated sequentially to make writing tests easier
"""
free_id = 0
def __init__(self):
super().__init__()
@staticmethod
def new_id() -> UUID:
DevState.free_id += 1
return UUID(int=DevState.free_id - 1)
def dump(self, path: str, png_path: str = None):
"""Dumps the whole MV graph to a graphviz .dot-file
Args:
path (str): path for .dot-file
png_path (str, optional): path for .png image generated from the .dot-file. Defaults to None.
"""
with open(path, "w") as f:
f.write("digraph main {\n")
for n in sorted(self.nodes):
if n in self.values:
x = self.values[n]
if isinstance(x, tuple):
x = f"{x[0]}"
else:
x = repr(x)
f.write("\"a_%s\" [label=\"%s\"];\n" % (
n.int, x.replace('"', '\\"')))
else:
f.write("\"a_%s\" [label=\"\"];\n" % n)
for i, e in sorted(list(self.edges.items())):
f.write("\"a_%s\" [label=\"e_%s\" shape=point];\n" % (i.int, i.int))
f.write("\"a_%s\" -> \"a_%s\" [arrowhead=none];\n" % (e[0].int, i.int))
f.write("\"a_%s\" -> \"a_%s\";\n" % (i.int, e[1].int))
f.write("}")
if png_path is not None:
# generate png from dot-file
bashCommand = f"dot -Tpng {path} -o {png_path}"
import subprocess
process = subprocess.Popen(
bashCommand.split(), stdout=subprocess.PIPE)
output, error = process.communicate()

301
state/neo4jstate.py Normal file
View file

@ -0,0 +1,301 @@
from typing import Any, Optional, List, Tuple, Callable, Generator
from neo4j import GraphDatabase
from ast import literal_eval
from .base import State, Edge, Node, Element, UUID
class Neo4jState(State):
def __init__(self, uri="bolt://localhost:7687", user="neo4j", password="tests"):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.root = self.create_node()
def close(self, *, clear=False):
if clear:
self._run_and_return(self._clear)
self.driver.close()
def _run_and_return(self, query: Callable, **kwargs):
with self.driver.session() as session:
result = session.write_transaction(query, **kwargs)
return result
@staticmethod
def _clear(tx):
tx.run("MATCH (n) "
"DETACH DELETE n")
@staticmethod
def _existence_check(tx, eid, label="Element"):
result = tx.run(f"MATCH (elem:{label}) "
"WHERE elem.id = $eid "
"RETURN elem.id",
eid=eid)
try:
return result.single()[0]
except TypeError:
# No node found for nid
# ergo, no edge created
return None
def create_node(self) -> Node:
def query(tx, nid):
result = tx.run("CREATE (n:Element:Node) "
"SET n.id = $nid "
"RETURN n.id",
nid=nid)
return result.single()[0]
node = self._run_and_return(query, nid=str(self.new_id()))
return UUID(node) if node is not None else None
def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
def query(tx, eid, sid, tid):
result = tx.run("MATCH (source), (target) "
"WHERE source.id = $sid AND target.id = $tid "
"CREATE (source) -[:Source]-> (e:Element:Edge) -[:Target]-> (target) "
"SET e.id = $eid "
"RETURN e.id",
eid=eid, sid=sid, tid=tid)
try:
return result.single()[0]
except TypeError:
# No node found for sid and/or tid
# ergo, no edge created
return None
edge = self._run_and_return(query, eid=str(self.new_id()), sid=str(source), tid=str(target))
return UUID(edge) if edge is not None else None
def create_nodevalue(self, value: Any) -> Optional[Node]:
def query(tx, nid, val):
result = tx.run("CREATE (n:Element:Node) "
"SET n.id = $nid, n.value = $val "
"RETURN n.id",
nid=nid, val=val)
return result.single()[0]
if not self.is_valid_datavalue(value):
return None
node = self._run_and_return(query, nid=str(self.new_id()), val=repr(value))
return UUID(node) if node is not None else None
def create_dict(self, source: Element, value: Any, target: Element) -> Optional[Tuple[Edge, Edge, Node]]:
if not self.is_valid_datavalue(value):
return None
edge_node = self.create_edge(source, target)
val_node = self.create_nodevalue(value)
if edge_node is not None and val_node is not None:
self.create_edge(edge_node, val_node)
def read_root(self) -> Node:
return self.root
def read_value(self, node: Node) -> Optional[Any]:
def query(tx, nid):
result = tx.run("MATCH (n:Node) "
"WHERE n.id = $nid "
"RETURN n.value",
nid=nid)
try:
return result.single()[0]
except TypeError:
# No node found for nid
return None
value = self._run_and_return(query, nid=str(node))
return literal_eval(value) if value is not None else None
def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
def query(tx, eid):
result = tx.run("MATCH (elem:Element) -[:Source]-> (e:Edge) "
"WHERE elem.id = $eid "
"RETURN e.id",
eid=eid)
return result.value()
source_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
if source_exists:
result = self._run_and_return(query, eid=str(elem))
return [UUID(x) for x in result] if result is not None else None
def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
def query(tx, eid):
result = tx.run("MATCH (elem:Element) <-[:Target]- (e:Edge) "
"WHERE elem.id = $eid "
"RETURN e.id",
eid=eid)
return result.value()
target_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
if target_exists:
result = self._run_and_return(query, eid=str(elem))
return [UUID(x) for x in result] if result is not None else None
def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]:
def query(tx, eid):
result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt)"
"WHERE e.id = $eid "
"RETURN src.id, tgt.id",
eid=eid)
return result.single()
edge_exists = self._run_and_return(self._existence_check, eid=str(edge), label="Edge") is not None
if edge_exists:
try:
src, tgt = self._run_and_return(query, eid=str(edge))
return UUID(src), UUID(tgt)
except TypeError:
return None, None
else:
return None, None
def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
def query(tx, eid, label_value):
result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt), "
"(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
"WHERE src.id = $eid "
"AND label.value = $val "
"RETURN tgt.id",
eid=eid, val=label_value)
try:
return result.single()[0]
except TypeError:
# No edge found with given label
return None
elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
if elem_exists:
if isinstance(value, UUID):
return None
result = self._run_and_return(query, eid=str(elem), label_value=repr(value))
return UUID(result) if result is not None else None
def read_dict_keys(self, elem: Element) -> Optional[List[Any]]:
def query(tx, eid):
result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (), "
"(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
"WHERE src.id = $eid "
"RETURN label.id",
eid=eid)
try:
return result.value()
except TypeError:
# No edge found with given label
return None
elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
if elem_exists:
result = self._run_and_return(query, eid=str(elem))
return [UUID(x) for x in result if x is not None]
def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
def query(tx, eid, label_value):
result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (), "
"(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
"WHERE src.id = $eid "
"AND label.value = $val "
"RETURN e.id",
eid=eid, val=label_value)
try:
return result.single()[0]
except TypeError:
# No edge found with given label
return None
elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
if elem_exists:
result = self._run_and_return(query, eid=str(elem), label_value=repr(value))
return UUID(result) if result is not None else None
def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
def query(tx, eid, label_id):
result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt), "
"(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
"WHERE src.id = $eid "
"AND label.id = $lid "
"RETURN tgt.id",
eid=eid, lid=label_id)
try:
return result.single()[0]
except TypeError:
# No edge found with given label
return None
elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
if elem_exists:
result = self._run_and_return(query, eid=str(elem), label_id=str(value_node))
return UUID(result) if result is not None else None
def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
def query(tx, eid, label_id):
result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (), "
"(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
"WHERE src.id = $eid "
"AND label.id = $lid "
"RETURN e.id",
eid=eid, lid=label_id)
try:
return result.single()[0]
except TypeError:
# No edge found with given label
return None
elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
if elem_exists:
result = self._run_and_return(query, eid=str(elem), label_id=str(value_node))
return UUID(result) if result is not None else None
def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
def query(tx, eid, label_value):
result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt), "
"(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
"WHERE tgt.id = $eid "
"AND label.value = $val "
"RETURN src.id",
eid=eid, val=label_value)
try:
return result.value()
except TypeError:
# No edge found with given label
return None
elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
if elem_exists:
result = self._run_and_return(query, eid=str(elem), label_value=repr(value))
return [UUID(x) for x in result if x is not None]
def delete_node(self, node: Node) -> None:
def query(tx, nid):
result = tx.run("MATCH (n:Node) "
"WHERE n.id = $nid "
"OPTIONAL MATCH (n) -- (e:Edge) "
"DETACH DELETE n "
"RETURN e.id",
nid=nid)
return result.value()
to_be_deleted = self._run_and_return(query, nid=str(node))
to_be_deleted = [UUID(x) for x in to_be_deleted if x is not None]
for edge in to_be_deleted:
self.delete_edge(edge)
def delete_edge(self, edge: Edge) -> None:
def query(tx, eid):
result = tx.run("MATCH (e1:Edge) "
"WHERE e1.id = $eid "
"OPTIONAL MATCH (e1) -- (e2:Edge) "
"WHERE (e1) -[:Source]-> (e2) "
"OR (e1) <-[:Target]- (e2) "
"DETACH DELETE e1 "
"RETURN e2.id",
eid=eid)
return result.value()
to_be_deleted = self._run_and_return(query, eid=str(edge))
to_be_deleted = [UUID(x) for x in to_be_deleted if x is not None]
for edge in to_be_deleted:
self.delete_edge(edge)

286
state/pystate.py Normal file
View file

@ -0,0 +1,286 @@
from typing import Any, List, Tuple, Optional
from state.base import State, Node, Edge, Element
class PyState(State):
"""
State interface implemented using Python data structures.
This code is based on:
https://msdl.uantwerpen.be/git/yentl/modelverse/src/master/state/modelverse_state/main.py
"""
def __init__(self):
self.edges = {}
self.outgoing = {}
self.incoming = {}
self.values = {}
self.nodes = set()
# Set used for garbage collection
self.GC = True
self.to_delete = set()
self.cache = {}
self.cache_node = {}
self.root = self.create_node()
def create_node(self) -> Node:
new_id = self.new_id()
self.nodes.add(new_id)
return new_id
def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
if source not in self.edges and source not in self.nodes:
return None
elif target not in self.edges and target not in self.nodes:
return None
else:
new_id = self.new_id()
self.outgoing.setdefault(source, set()).add(new_id)
self.incoming.setdefault(target, set()).add(new_id)
self.edges[new_id] = (source, target)
if source in self.edges:
# We are creating something dict_readable
# Fill in the cache already!
dict_source, dict_target = self.edges[source]
if target in self.values:
self.cache.setdefault(dict_source, {})[self.values[target]] = source
self.cache_node.setdefault(dict_source, {})[target] = source
return new_id
def create_nodevalue(self, value: Any) -> Optional[Node]:
if not self.is_valid_datavalue(value):
return None
new_id = self.new_id()
self.values[new_id] = value
self.nodes.add(new_id)
return new_id
def create_dict(self, source: Element, value: Any, target: Element) -> None:
if source not in self.nodes and source not in self.edges:
return None
elif target not in self.nodes and target not in self.edges:
return None
elif not self.is_valid_datavalue(value):
return None
else:
n = self.create_nodevalue(value)
e = self.create_edge(source, target)
assert n is not None and e is not None
e2 = self.create_edge(e, n)
self.cache.setdefault(source, {})[value] = e
self.cache_node.setdefault(source, {})[n] = e
def read_root(self) -> Node:
return self.root
def read_value(self, node: Node) -> Any:
if node in self.values:
return self.values[node]
else:
return None
def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
if elem in self.edges or elem in self.nodes:
if elem in self.outgoing:
return list(self.outgoing[elem])
else:
return []
else:
return None
def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
if elem in self.edges or elem in self.nodes:
if elem in self.incoming:
return list(self.incoming[elem])
else:
return []
else:
return None
def read_edge(self, edge: Edge) -> Tuple[Optional[Element], Optional[Element]]:
if edge in self.edges:
return self.edges[edge][0], self.edges[edge][1]
else:
return None, None
def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
e = self.read_dict_edge(elem, value)
if e is None:
return None
else:
return self.edges[e][1]
def read_dict_keys(self, elem: Element) -> Optional[List[Element]]:
if elem not in self.nodes and elem not in self.edges:
return None
result = []
# NOTE: cannot just use the cache here, as some keys in the cache might not actually exist;
# we would have to check all of them anyway
if elem in self.outgoing:
for e1 in self.outgoing[elem]:
if e1 in self.outgoing:
for e2 in self.outgoing[e1]:
result.append(self.edges[e2][1])
return result
def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
try:
first = self.cache[elem][value]
# Got hit, so validate
if (self.edges[first][0] == elem) and (value in [self.values[self.edges[i][1]]
for i in self.outgoing[first]
if self.edges[i][1] in self.values]):
return first
# Hit but invalid now
del self.cache[elem][value]
return None
except KeyError:
return None
def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
e = self.read_dict_node_edge(elem, value_node)
if e is None:
return None
else:
self.cache_node.setdefault(elem, {})[value_node] = e
return self.edges[e][1]
def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
try:
first = self.cache_node[elem][value_node]
# Got hit, so validate
if (self.edges[first][0] == elem) and \
(value_node in [self.edges[i][1] for i in self.outgoing[first]]):
return first
# Hit but invalid now
del self.cache_node[elem][value_node]
return None
except KeyError:
return None
def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
if elem not in self.nodes and elem not in self.edges:
return None
# Get all outgoing links
matches = []
if elem in self.incoming:
for e1 in self.incoming[elem]:
# For each link, we read the links that might link to a data value
if e1 in self.outgoing:
for e2 in self.outgoing[e1]:
# Now read out the target of the link
target = self.edges[e2][1]
# And access its value
if target in self.values and self.values[target] == value:
# Found a match
matches.append(e1)
return [self.edges[e][0] for e in matches]
def delete_node(self, node: Node) -> None:
if node == self.root:
return
elif node not in self.nodes:
return
self.nodes.remove(node)
if node in self.values:
del self.values[node]
s = set()
if node in self.outgoing:
for e in self.outgoing[node]:
s.add(e)
del self.outgoing[node]
if node in self.incoming:
for e in self.incoming[node]:
s.add(e)
del self.incoming[node]
for e in s:
self.delete_edge(e)
if node in self.outgoing:
del self.outgoing[node]
if node in self.incoming:
del self.incoming[node]
def delete_edge(self, edge: Edge) -> None:
if edge not in self.edges:
return
s, t = self.edges[edge]
if t in self.incoming:
self.incoming[t].remove(edge)
if s in self.outgoing:
self.outgoing[s].remove(edge)
del self.edges[edge]
s = set()
if edge in self.outgoing:
for e in self.outgoing[edge]:
s.add(e)
if edge in self.incoming:
for e in self.incoming[edge]:
s.add(e)
for e in s:
self.delete_edge(e)
if edge in self.outgoing:
del self.outgoing[edge]
if edge in self.incoming:
del self.incoming[edge]
if self.GC and (t in self.incoming and not self.incoming[t]) and (t not in self.edges):
# Remove this node as well
# Edges aren't deleted like this, as they might have a reachable target and source!
# If they haven't, they will be removed because the source was removed.
self.to_delete.add(t)
def purge(self):
while self.to_delete:
t = self.to_delete.pop()
if t in self.incoming and not self.incoming[t]:
self.delete_node(t)
values = set(self.edges)
values.update(self.nodes)
visit_list = [self.root]
while visit_list:
elem = visit_list.pop()
if elem in values:
# Remove it from the leftover values
values.remove(elem)
if elem in self.edges:
visit_list.extend(self.edges[elem])
if elem in self.outgoing:
visit_list.extend(self.outgoing[elem])
if elem in self.incoming:
visit_list.extend(self.incoming[elem])
dset = set()
for key in self.cache:
if key not in self.nodes and key not in self.edges:
dset.add(key)
for key in dset:
del self.cache[key]
dset = set()
for key in self.cache_node:
if key not in self.nodes and key not in self.edges:
dset.add(key)
for key in dset:
del self.cache_node[key]
# All remaining elements are to be purged
if len(values) > 0:
while values:
v = values.pop()
if v in self.nodes:
self.delete_node(v)

276
state/rdfstate.py Normal file
View file

@ -0,0 +1,276 @@
from typing import Any, List, Tuple, Optional, Generator
from rdflib import Graph, Namespace, URIRef, Literal
from rdflib.plugins.sparql import prepareQuery
import json
from .base import State
# Define graph datasctructures used by implementation
# Use NewType to create distinct type or just create a type alias
Element = URIRef
Node = URIRef
Edge = URIRef
class RDFState(State):
def __init__(self, namespace_uri="http://modelverse.mv/#"):
self.graph = Graph()
self.namespace_uri = namespace_uri
self.mv = Namespace(namespace_uri)
self.graph.bind("MV", self.mv)
self.prepared_queries = {
"read_value": """
SELECT ?value
WHERE {
?var1 MV:hasValue ?value .
}
""",
"read_outgoing": """
SELECT ?link
WHERE {
?link MV:hasSource ?var1 .
}
""",
"read_incoming": """
SELECT ?link
WHERE {
?link MV:hasTarget ?var1 .
}
""",
"read_edge": """
SELECT ?source ?target
WHERE {
?var1 MV:hasSource ?source ;
MV:hasTarget ?target .
}
""",
"read_dict_keys": """
SELECT ?key
WHERE {
?main_edge MV:hasSource ?var1 .
?attr_edge MV:hasSource ?main_edge ;
MV:hasTarget ?key .
}
""",
"read_dict_node": """
SELECT ?value_node
WHERE {
?main_edge MV:hasSource ?var1 ;
MV:hasTarget ?value_node .
?attr_edge MV:hasSource ?main_edge ;
MV:hasTarget ?var2 .
}
""",
"read_dict_node_edge": """
SELECT ?main_edge
WHERE {
?main_edge MV:hasSource ?var1 .
?attr_edge MV:hasSource ?main_edge ;
MV:hasTarget ?var2 .
}
""",
"delete_node": """
SELECT ?edge
WHERE {
{ ?edge MV:hasTarget ?var1 . }
UNION
{ ?edge MV:hasSource ?var1 . }
}
""",
"delete_edge": """
SELECT ?edge
WHERE {
{ ?edge MV:hasTarget ?var1 . }
UNION
{ ?edge MV:hasSource ?var1 . }
}
""",
}
self.garbage = set()
for k, v in list(self.prepared_queries.items()):
self.prepared_queries[k] = prepareQuery(self.prepared_queries[k], initNs={"MV": self.mv})
self.root = self.create_node()
def create_node(self) -> Node:
return URIRef(self.namespace_uri + str(self.new_id()))
def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
if not isinstance(source, URIRef):
return None
elif not isinstance(target, URIRef):
return None
edge = URIRef(self.namespace_uri + str(self.new_id()))
self.graph.add((edge, self.mv.hasSource, source))
self.graph.add((edge, self.mv.hasTarget, target))
return edge
def create_nodevalue(self, value: Any) -> Optional[Node]:
if not self.is_valid_datavalue(value):
return None
node = URIRef(self.namespace_uri + str(self.new_id()))
if isinstance(value, tuple):
value = {"Type": value[0]}
self.graph.add((node, self.mv.hasValue, Literal(json.dumps(value))))
return node
def create_dict(self, source: Element, value: Any, target: Element) -> Optional[Tuple[Edge, Edge, Node]]:
if not isinstance(source, URIRef):
return
if not isinstance(target, URIRef):
return
if not self.is_valid_datavalue(value):
return
n = self.create_nodevalue(value)
e = self.create_edge(source, target)
self.create_edge(e, n)
def read_root(self) -> Node:
return self.root
def read_value(self, node: Node) -> Optional[Any]:
if not isinstance(node, URIRef) or not (node, None, None) in self.graph:
return None
result = self.graph.query(self.prepared_queries["read_value"], initBindings={"var1": node})
if len(result) == 0:
return None
result = json.loads(list(result)[0][0])
return result if not isinstance(result, dict) else (result["Type"],)
def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
if not isinstance(elem, URIRef) or elem in self.garbage:
return None
result = self.graph.query(self.prepared_queries["read_outgoing"], initBindings={"var1": elem})
return [i[0] for i in result]
def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
if not isinstance(elem, URIRef) or elem in self.garbage:
return None
result = self.graph.query(self.prepared_queries["read_incoming"], initBindings={"var1": elem})
return [i[0] for i in result]
def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]:
if not isinstance(edge, URIRef) or not (edge, None, None) in self.graph:
return None, None
result = self.graph.query(self.prepared_queries["read_edge"], initBindings={"var1": edge})
if len(result) == 0:
return None, None
else:
return list(result)[0][0], list(result)[0][1]
def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
if not isinstance(elem, URIRef):
return None
q = f"""
SELECT ?value_node
WHERE {{
?main_edge MV:hasSource <{elem}> ;
MV:hasTarget ?value_node .
?attr_edge MV:hasSource ?main_edge ;
MV:hasTarget ?attr_node .
?attr_node MV:hasValue '{json.dumps(value)}' .
}}
"""
result = self.graph.query(q)
if len(result) == 0:
return None
return list(result)[0][0]
def read_dict_keys(self, elem: Element) -> Optional[List[Any]]:
if not isinstance(elem, URIRef):
return None
result = self.graph.query(self.prepared_queries["read_dict_keys"], initBindings={"var1": elem})
return [i[0] for i in result]
def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
if not isinstance(elem, URIRef):
return None
result = self.graph.query(
f"""
SELECT ?main_edge
WHERE {{
?main_edge MV:hasSource <{elem}> ;
MV:hasTarget ?value_node .
?attr_edge MV:hasSource ?main_edge ;
MV:hasTarget ?attr_node .
?attr_node MV:hasValue '{json.dumps(value)}' .
}}
""")
if len(result) == 0:
return None
return list(result)[0][0]
def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
if not isinstance(elem, URIRef):
return None
if not isinstance(value_node, URIRef):
return None
result = self.graph.query(
self.prepared_queries["read_dict_node"], initBindings={"var1": elem, "var2": value_node}
)
if len(result) == 0:
return None
return list(result)[0][0]
def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
if not isinstance(elem, URIRef):
return None
if not isinstance(value_node, URIRef):
return None
result = self.graph.query(
self.prepared_queries["read_dict_node_edge"], initBindings={"var1": elem, "var2": value_node}
)
if len(result) == 0:
return None
return list(result)[0][0]
def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
if not isinstance(elem, URIRef):
return None
result = self.graph.query(
f"""
SELECT ?source_node
WHERE {{
?main_edge MV:hasTarget <{elem}> ;
MV:hasSource ?source_node .
?attr_edge MV:hasSource ?main_edge ;
MV:hasTarget ?value_node .
?value_node MV:hasValue '{json.dumps(value)}' .
}}
""")
return [i[0] for i in result]
def delete_node(self, node: Node) -> None:
if node == self.root:
return
if not isinstance(node, URIRef):
return
# Check whether node isn't an edge
if (node, self.mv.hasSource, None) in self.graph or (node, self.mv.hasTarget, None) in self.graph:
return
# Remove its value if it exists
self.graph.remove((node, None, None))
# Get all edges connecting this
result = self.graph.query(self.prepared_queries["delete_node"], initBindings={"var1": node})
# ... and remove them
for e in result:
self.delete_edge(e[0])
self.garbage.add(node)
def delete_edge(self, edge: Edge) -> None:
if not isinstance(edge, URIRef):
return
# Check whether edge is actually an edge
if not ((edge, self.mv.hasSource, None) in self.graph and (edge, self.mv.hasTarget, None) in self.graph):
return
# Remove its links
self.graph.remove((edge, None, None))
# Get all edges connecting this
result = self.graph.query(self.prepared_queries["delete_edge"], initBindings={"var1": edge})
# ... and remove them
for e in result:
self.delete_edge(e[0])
self.garbage.add(edge)

0
state/test/__init__.py Normal file
View file

2
state/test/conftest.py Normal file
View file

@ -0,0 +1,2 @@
import pytest
from .fixtures.state import state

0
state/test/fixtures/__init__.py vendored Normal file
View file

19
state/test/fixtures/state.py vendored Normal file
View file

@ -0,0 +1,19 @@
import pytest
from state.pystate import PyState
from state.rdfstate import RDFState
from state.neo4jstate import Neo4jState
@pytest.fixture(params=[
(PyState,),
(RDFState, "http://example.org/#"),
(Neo4jState,)
])
def state(request):
if len(request.param) > 1:
state = request.param[0](*request.param[1:])
else:
state = request.param[0]()
yield state
if isinstance(state, Neo4jState):
state.close(clear=True)

View file

@ -0,0 +1,41 @@
import pytest
@pytest.mark.usefixtures("state")
def test_create_dict_simple(state):
id1 = state.create_node()
id2 = state.create_node()
assert id1 is not None
assert id2 is not None
n = state.create_dict(id1, "abc", id2)
assert n is None
v = state.read_dict(id1, "abc")
assert v == id2
@pytest.mark.usefixtures("state")
def test_create_dict_no_source(state):
id1 = 100000
id2 = state.create_node()
assert id2 is not None
n = state.create_dict(id1, "abc", id2)
assert n is None
v = state.read_dict(id1, "abc")
assert v is None
@pytest.mark.usefixtures("state")
def test_create_dict_no_target(state):
id2 = 100000
id1 = state.create_node()
assert id1 is not None
n = state.create_dict(id1, "abc", id2)
assert n is None
v = state.read_dict(id1, "abc")
assert v is None

View file

@ -0,0 +1,144 @@
import pytest
@pytest.mark.usefixtures("state")
def test_create_edge_invalid_source(state):
a = -1
b = state.create_node()
assert b is not None
e = state.create_edge(a, b)
assert e is None
@pytest.mark.usefixtures("state")
def test_create_edge_invalid_target(state):
b = -1
a = state.create_node()
assert a is not None
e = state.create_edge(a, b)
assert e is None
@pytest.mark.usefixtures("state")
def test_create_edge_invalid_both(state):
a = -1
b = -1
e = state.create_edge(a, b)
assert e is None
@pytest.mark.usefixtures("state")
def test_create_edge_node_to_node(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge = state.create_edge(a, b)
assert edge is not None
@pytest.mark.usefixtures("state")
def test_create_edge_multiple(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge1 = state.create_edge(a, b)
assert edge1 is not None
edge2 = state.create_edge(a, b)
assert edge2 is not None
assert edge1 != edge2
@pytest.mark.usefixtures("state")
def test_create_edge_many(state):
v = set()
for i in range(1000):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge = state.create_edge(a, b)
assert edge is not None
v.add(edge)
assert len(v) == 1000
@pytest.mark.usefixtures("state")
def test_create_edge_edge_to_node(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge1 = state.create_edge(a, b)
assert edge1 is not None
edge2 = state.create_edge(edge1, b)
assert edge2 is not None
assert edge1 != edge2
@pytest.mark.usefixtures("state")
def test_create_edge_node_to_edge(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge1 = state.create_edge(a, b)
assert edge1 is not None
edge2 = state.create_edge(a, edge1)
assert edge2 is not None
assert edge1 != edge2
@pytest.mark.usefixtures("state")
def test_create_edge_edge_to_edge(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge1 = state.create_edge(a, b)
assert edge1 is not None
edge2 = state.create_edge(a, b)
assert edge2 is not None
assert edge1 != edge2
edge3 = state.create_edge(edge1, edge2)
assert edge3 is not None
@pytest.mark.usefixtures("state")
def test_create_edge_loop_node(state):
a = state.create_node()
assert a is not None
edge = state.create_edge(a, a)
assert edge is not None
@pytest.mark.usefixtures("state")
def test_create_edge_loop_edge(state):
a = state.create_node()
assert a is not None
edge1 = state.create_edge(a, a)
assert edge1 is not None
edge2 = state.create_edge(edge1, edge1)
assert edge2 is not None

View file

@ -0,0 +1,22 @@
import pytest
@pytest.mark.usefixtures("state")
def test_create_node_different_id_simple(state):
id1 = state.create_node()
assert id1 is not None
id2 = state.create_node()
assert id2 is not None
assert id1 != id2
@pytest.mark.usefixtures("state")
def test_create_node_different_id_long(state):
results = set()
for i in range(1000):
v = state.create_node()
assert v is not None
results.add(v)
assert len(results) == 1000

View file

@ -0,0 +1,173 @@
import pytest
@pytest.mark.usefixtures("state")
def test_create_nodevalue_different_id_simple(state):
id1 = state.create_nodevalue(1)
id2 = state.create_nodevalue(1)
assert id1 is not None
assert id2 is not None
assert id1 != id2
@pytest.mark.usefixtures("state")
def test_create_nodevalue_read(state):
id1 = state.create_nodevalue(1)
assert id1 is not None
val = state.read_value(id1)
assert val == 1
@pytest.mark.usefixtures("state")
def test_create_nodevalue_integer_ib_zero(state):
# Nicely within range
v = set()
size = 0
for i in range(-10, 10):
id1 = state.create_nodevalue(i)
assert id1 is not None
size += 1
v.add(id1)
assert len(v) == size
@pytest.mark.usefixtures("state")
def test_create_nodevalue_boolean(state):
id1 = state.create_nodevalue(True)
id2 = state.create_nodevalue(False)
assert id1 is not None
assert id2 is not None
assert id1 != id2
@pytest.mark.usefixtures("state")
def test_create_nodevalue_boolean_same(state):
id1 = state.create_nodevalue(True)
id2 = state.create_nodevalue(True)
assert id1 is not None
assert id2 is not None
assert id1 != id2
@pytest.mark.usefixtures("state")
def test_create_nodevalue_float_keeps_type(state):
id1 = state.create_nodevalue(0.0)
assert id1 is not None
v = state.read_value(id1)
assert type(v) == float
assert v == 0.0
@pytest.mark.usefixtures("state")
def test_create_nodevalue_string_empty(state):
id1 = state.create_nodevalue("")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == ""
@pytest.mark.usefixtures("state")
def test_create_nodevalue_string_normal(state):
id1 = state.create_nodevalue("ABC")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "ABC"
@pytest.mark.usefixtures("state")
def test_create_nodevalue_string_not_parsed(state):
id1 = state.create_nodevalue("1")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "1"
id1 = state.create_nodevalue("1.0")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "1.0"
id1 = state.create_nodevalue("-1.0")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "-1.0"
id1 = state.create_nodevalue("True")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "True"
@pytest.mark.usefixtures("state")
def test_create_nodevalue_junk(state):
class Unknown(object):
pass
n = state.create_nodevalue(Unknown())
assert n is None
@pytest.mark.usefixtures("state")
def test_create_nodevalue_type_type(state):
id1 = state.create_nodevalue(("Type",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("Type",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_integer_type(state):
id1 = state.create_nodevalue(("Integer",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("Integer",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_float_type(state):
id1 = state.create_nodevalue(("Float",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("Float",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_boolean_type(state):
id1 = state.create_nodevalue(("Boolean",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("Boolean",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_string_type(state):
id1 = state.create_nodevalue(("String",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("String",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_invalid_type(state):
id1 = state.create_nodevalue(("Class",))
assert id1 is None

View file

@ -0,0 +1,396 @@
import pytest
@pytest.mark.usefixtures("state")
def test_delete_edge_no_exists(state):
e = state.delete_edge(1)
assert e is None
@pytest.mark.usefixtures("state")
def test_delete_edge_node(state):
a = state.create_node()
assert a is not None
e = state.delete_edge(a)
assert e is None
@pytest.mark.usefixtures("state")
def test_delete_edge_nodevalue(state):
a = state.create_nodevalue(1)
assert a is not None
e = state.delete_edge(a)
assert e is None
@pytest.mark.usefixtures("state")
def test_delete_edge_normal(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_edge(c)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_delete_edge_remove_recursive(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
n = state.delete_edge(c)
assert n is None
l = state.read_value(a)
assert l == 1
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(c)
assert s is None
assert t is None
s, t = state.read_edge(d)
assert s is None
assert t is None
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_delete_edge_remove_edge_recursive_deep(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
f = state.create_node()
g = state.create_edge(f, e)
h = state.create_edge(b, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
assert g is not None
assert h is not None
n = state.delete_edge(d)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h])
s, t = state.read_edge(d)
assert s is None
assert t is None
s, t = state.read_edge(e)
assert s is None
assert t is None
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c
@pytest.mark.usefixtures("state")
def test_delete_edge_remove_edge_recursive_steps(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
f = state.create_node()
g = state.create_edge(f, e)
h = state.create_edge(b, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
assert g is not None
assert h is not None
n = state.delete_edge(g)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([d])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([d])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h, e])
s, t = state.read_edge(d)
assert s == a
assert t == b
l = state.read_outgoing(d)
assert l is not None
assert set(l) == set([e])
l = state.read_incoming(d)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(e)
assert s == d
assert t == c
l = state.read_outgoing(e)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(e)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(g)
assert l is None
l = state.read_incoming(g)
assert l is None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c
n = state.delete_edge(e)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([d])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([d])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h])
s, t = state.read_edge(d)
assert s == a
assert t == b
l = state.read_outgoing(d)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(d)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(e)
assert s is None
assert t is None
l = state.read_outgoing(e)
assert l is None
l = state.read_incoming(e)
assert l is None
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(g)
assert l is None
l = state.read_incoming(g)
assert l is None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c
n = state.delete_edge(d)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h])
s, t = state.read_edge(d)
assert s is None
assert t is None
l = state.read_outgoing(d)
assert l is None
l = state.read_incoming(d)
assert l is None
s, t = state.read_edge(e)
assert s is None
assert t is None
l = state.read_outgoing(e)
assert l is None
l = state.read_incoming(e)
assert l is None
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(g)
assert l == None
l = state.read_incoming(g)
assert l == None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c

View file

@ -0,0 +1,227 @@
import pytest
@pytest.mark.usefixtures("state")
def test_delete_node_no_exists(state):
n = state.delete_node(-1)
assert n is None
@pytest.mark.usefixtures("state")
def test_delete_node_no_value(state):
a = state.create_node()
assert a is not None
n = state.delete_node(a)
assert n is None
@pytest.mark.usefixtures("state")
def test_delete_node_value(state):
a = state.create_nodevalue(1)
assert a is not None
d = state.read_value(a)
assert d == 1
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
@pytest.mark.usefixtures("state")
def test_delete_node_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_node(c)
assert n is None
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_outgoing(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
s, t = state.read_edge(c)
assert s is None
assert t is None
d = state.read_outgoing(b)
assert d is not None
assert set(d) == set([])
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_incoming(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(b, a)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
s, t = state.read_edge(c)
assert s is None
assert t is None
d = state.read_outgoing(b)
assert d is not None
assert set(d) == set([])
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_both(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
e = state.create_node()
f = state.create_edge(e, a)
assert a is not None
assert b is not None
assert c is not None
assert e is not None
assert f is not None
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
s, t = state.read_edge(c)
assert s is None
assert t is None
d = state.read_incoming(b)
assert d is not None
assert set(d) == set([])
s, t = state.read_edge(f)
assert s is None
assert t is None
d = state.read_outgoing(e)
assert d is not None
assert set(d) == set([])
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_recursive(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
s, t = state.read_edge(c)
assert s is None
assert t is None
s, t = state.read_edge(d)
assert s is None
assert t is None
d = state.read_outgoing(b)
assert d is not None
assert set(d) == set([])
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_recursive_deep(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
f = state.create_node()
g = state.create_edge(f, e)
h = state.create_edge(b, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
assert g is not None
assert h is not None
n = state.delete_node(a)
assert n is None
l = state.read_outgoing(a)
assert l is None
l = state.read_incoming(a)
assert l is None
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h])
s, t = state.read_edge(d)
assert s is None
assert t is None
s, t = state.read_edge(e)
assert s is None
assert t is None
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c

View file

@ -0,0 +1,94 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_dict_no_exists(state):
assert state.read_dict(-1, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_not_found_node(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict(a, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_not_found_nodevalue(state):
a = state.create_nodevalue(1)
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict(a, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_not_found_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict(c, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_no_primitive(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict(a, a) is None
@pytest.mark.usefixtures("state")
def test_read_dict_node_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_dict(a, "f")
assert l == b
@pytest.mark.usefixtures("state")
def test_read_dict_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("k")
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_dict(a, "f")
assert l == b
l = state.read_dict(a, "k")
assert l == g
assert state.read_dict(a, "l") is None

View file

@ -0,0 +1,94 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_dict_edge_no_exists(state):
assert state.read_dict_edge(-1, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_not_found_node(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_edge(a, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_not_found_nodevalue(state):
a = state.create_nodevalue(1)
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_edge(a, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_not_found_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_edge(c, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_no_primitive(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_edge(a, a) is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_node_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_dict_edge(a, "f")
assert l == d
@pytest.mark.usefixtures("state")
def test_read_dict_edge_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("k")
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_dict_edge(a, "f")
assert l == d
l = state.read_dict_edge(a, "k")
assert l == i
assert state.read_dict_edge(a, "l") is None

View file

@ -0,0 +1,51 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_dict_keys_no_exists(state):
assert state.read_dict_keys(100000) is None
@pytest.mark.usefixtures("state")
def test_read_dict_keys_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_dict_keys(a)
assert l is not None
assert set(l) == set([c])
@pytest.mark.usefixtures("state")
def test_read_dict_keys_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("k")
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_dict_keys(a)
assert l is not None
assert set(l) == set([c, h])

View file

@ -0,0 +1,74 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_dict_node_no_exists(state):
assert state.read_dict_node(-1, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_node_not_found_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_node(c, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_node_no_primitive(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_node(a, a) is None
@pytest.mark.usefixtures("state")
def test_read_dict_node_node_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_dict_node(a, c)
assert l == b
@pytest.mark.usefixtures("state")
def test_read_dict_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_node()
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_dict_node(a, c)
assert l == b
l = state.read_dict_node(a, h)
assert l == g

View file

@ -0,0 +1,136 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_edge_node(state):
b = state.create_node()
assert b is not None
s, t = state.read_edge(b)
assert s is None
assert t is None
@pytest.mark.usefixtures("state")
def test_read_edge_no_exists(state):
s, t = state.read_edge(-1)
assert s is None
assert t is None
@pytest.mark.usefixtures("state")
def test_read_edge_nodevalue(state):
b = state.create_nodevalue(1)
assert b is not None
s, t = state.read_edge(b)
assert s is None
assert t is None
@pytest.mark.usefixtures("state")
def test_read_edge_normal(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
@pytest.mark.usefixtures("state")
def test_read_edge_edge_to_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(c, d)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
s, t = state.read_edge(d)
assert s == a
assert t == b
s, t = state.read_edge(e)
assert s == c
assert t == d
@pytest.mark.usefixtures("state")
def test_read_edge_edge_to_node(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
s, t = state.read_edge(d)
assert s == c
assert t == b
@pytest.mark.usefixtures("state")
def test_read_edge_node_to_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(b, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
s, t = state.read_edge(d)
assert s == b
assert t == c
@pytest.mark.usefixtures("state")
def test_read_edge_node_to_nodevalue(state):
a = state.create_node()
b = state.create_nodevalue(1)
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
@pytest.mark.usefixtures("state")
def test_read_edge_nodevalue_to_nodevalue(state):
a = state.create_nodevalue(1)
b = state.create_nodevalue(1)
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
s, t = state.read_edge(c)
assert s == a
assert t == b

View file

@ -0,0 +1,275 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_incoming_node_none(state):
b = state.create_node()
assert b is not None
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_node_one(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([c])
@pytest.mark.usefixtures("state")
def test_read_incoming_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([c, d, e])
@pytest.mark.usefixtures("state")
def test_read_incoming_node_multi_others_unaffected(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
f = state.create_node()
assert f is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_edge_none(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_edge_one(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, a)
e = state.create_edge(a, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([e])
l = state.read_incoming(d)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(e)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_edge_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, c)
e = state.create_edge(b, c)
f = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([c])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([d, e, f])
l = state.read_incoming(d)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(e)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_none(state):
b = state.create_nodevalue(1)
assert b is not None
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_one(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(b, a)
assert a is not None
assert b is not None
assert c is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([c])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_multi(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(b, a)
d = state.create_edge(b, a)
e = state.create_edge(b, a)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_multi_others_unaffected(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(b, a)
d = state.create_edge(b, a)
e = state.create_edge(b, a)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
f = state.create_nodevalue(1)
assert f is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_node_deleted(state):
b = state.create_node()
assert b is not None
n = state.delete_node(b)
assert n is None
l = state.read_incoming(b)
assert l is None
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_deleted(state):
b = state.create_nodevalue(1)
assert b is not None
n = state.delete_node(b)
assert n is None
l = state.read_incoming(b)
assert l is None
@pytest.mark.usefixtures("state")
def test_read_incoming_edge_deleted(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_edge(c)
assert n is None
l = state.read_incoming(c)
assert l is None

View file

@ -0,0 +1,265 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_none(state):
b = state.create_node()
assert b is not None
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_one(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_multi_others_unaffected(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
f = state.create_node()
assert f is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_edge_none(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_edge_one(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, a)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([d])
l = state.read_outgoing(d)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_edge_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, a)
e = state.create_edge(c, b)
f = state.create_edge(c, d)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([d, e, f])
l = state.read_outgoing(d)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(e)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_none(state):
b = state.create_nodevalue(1)
assert b is not None
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_one(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_multi(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_multi_others_unaffected(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
f = state.create_nodevalue(1)
assert f is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_deleted(state):
b = state.create_node()
assert b is not None
n = state.delete_node(b)
assert n is None
l = state.read_outgoing(b)
assert l is None
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_deleted(state):
b = state.create_nodevalue(1)
assert b is not None
n = state.delete_node(b)
assert n is None
l = state.read_outgoing(b)
assert l is None
@pytest.mark.usefixtures("state")
def test_read_outgoing_edge_deleted(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_edge(c)
assert n is None
l = state.read_outgoing(c)
assert l is None

View file

@ -0,0 +1,165 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_no_exists(state):
l = state.read_reverse_dict(-1, "abc")
assert l is None
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_not_found_node(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
l = state.read_reverse_dict(a, "abc")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_not_found_nodevalue(state):
a = state.create_nodevalue(1)
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
l = state.read_reverse_dict(a, "abc")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_not_found_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
# Passing data is not enforced, as the data will be interpreted if necessary
l = state.read_reverse_dict(c, "abc")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_no_primitive(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
l = state.read_reverse_dict(a, a)
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_node_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_reverse_dict(b, "f")
assert set(l) == set([a])
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_no_match(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("g")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_reverse_dict(b, "f")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("k")
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_reverse_dict(b, "f")
assert set(l) == set([a])
l = state.read_reverse_dict(g, "k")
assert set(l) == set([a])
l = state.read_reverse_dict(a, "l")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_node_multi_ambiguous(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(b, a)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("f")
i = state.create_edge(g, a)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_reverse_dict(a, "f")
assert set(l) == set([b, g])
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_node_uncertain(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
h = state.create_nodevalue("g")
i = state.create_edge(d, h)
assert h is not None
assert i is not None
l = state.read_reverse_dict(b, "f")
assert set(l) == set([a])

View file

@ -0,0 +1,88 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_value_different_id_simple(state):
id1 = state.create_nodevalue(1)
id2 = state.create_nodevalue(2)
assert id1 is not None
assert id2 is not None
v1 = state.read_value(id1)
v2 = state.read_value(id2)
assert v1 == 1
assert v2 == 2
@pytest.mark.usefixtures("state")
def test_read_value_integer_ib_negative(state):
# Just within range
for i in range(-2 ** 63, -2 ** 63 + 10):
id1 = state.create_nodevalue(i)
assert id1 is not None
v = state.read_value(id1)
assert v == i
@pytest.mark.usefixtures("state")
def test_read_value_integer_ib_zero(state):
# Nicely within range
for i in range(-10, 10):
id1 = state.create_nodevalue(i)
assert id1 is not None
v = state.read_value(id1)
assert v == i
@pytest.mark.usefixtures("state")
def test_read_value_integer_ib_positive(state):
# Just within range
for i in range(2 ** 63 - 10, 2 ** 63):
id1 = state.create_nodevalue(i)
assert id1 is not None
v = state.read_value(id1)
assert v == i
@pytest.mark.usefixtures("state")
def test_read_value_boolean(state):
id1 = state.create_nodevalue(True)
id2 = state.create_nodevalue(False)
assert id1 is not None
assert id2 is not None
v1 = state.read_value(id1)
v2 = state.read_value(id2)
assert v1 == True
assert v2 == False
@pytest.mark.usefixtures("state")
def test_read_nodevalue_boolean_same(state):
id1 = state.create_nodevalue(True)
id2 = state.create_nodevalue(True)
assert id1 is not None
assert id2 is not None
v1 = state.read_value(id1)
v2 = state.read_value(id2)
assert v1 == True
assert v2 == True
@pytest.mark.usefixtures("state")
def test_read_value_no_exist(state):
v1 = state.read_value(100000)
assert v1 is None
@pytest.mark.usefixtures("state")
def test_read_value_no_value(state):
id1 = state.create_node()
assert id1 is not None
v1 = state.read_value(id1)
assert v1 is None