Reorganization

This commit is contained in:
Andrei Bondarenko 2021-07-24 20:59:38 +02:00
parent c64e348bf2
commit f44c888ace
26 changed files with 3 additions and 3 deletions

0
services/__init__.py Normal file
View file

View file

296
services/bottom/base.py Normal file
View file

@ -0,0 +1,296 @@
from abc import ABC, abstractmethod
from typing import Any, List, Tuple, Optional, Union
import uuid
primitive_types = (int, float, str, bool)
INTEGER = ("Integer",)
FLOAT = ("Float",)
STRING = ("String",)
BOOLEAN = ("Boolean",)
TYPE = ("Type",)
type_values = (INTEGER, FLOAT, STRING, BOOLEAN, TYPE)
Node = str
Edge = str
Element = Union[Node, Edge]
class State(ABC):
"""
Abstract base class for MvS CRUD interface defined in:
http://msdl.cs.mcgill.ca/people/yentl/files/thesis.pdf
This code is based on:
https://msdl.uantwerpen.be/git/yentl/modelverse/src/master/state/modelverse_state
"""
@staticmethod
def new_id() -> str:
"""
Generates a new UUID
"""
return str(uuid.uuid4())
@staticmethod
def is_valid_datavalue(value: Any) -> bool:
"""
Checks whether value type is supported.
Args:
value: value whose type needs to be checked
Returns:
True if value type is supported, False otherwise.
"""
if isinstance(value, tuple) and value in type_values:
return True
if not isinstance(value, primitive_types):
return False
elif isinstance(value, int) and not (-2**63 <= value <= 2**63 - 1):
return False
return True
def purge(self):
"""
Implements a garbage collection routine for implementations that don't have automatic garbage collection.
"""
pass
# =========================================================================
# CREATE
# =========================================================================
@abstractmethod
def create_node(self) -> Node:
"""
Creates node.
Returns:
The created node.
"""
pass
@abstractmethod
def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
"""
Creates edge. Source and target elements should already exist.
Args:
source: source element of edge
target: target element of edge
Returns:
The created edge, None if source or target element doesn't exist.
"""
pass
@abstractmethod
def create_nodevalue(self, value: Any) -> Optional[Node]:
"""
Creates node containing value.
Args:
value: value to assign to new node
Returns:
The created node, None if type of value is not supported.
"""
pass
@abstractmethod
def create_dict(self, source: Element, value: Any, target: Element) -> None:
"""
Creates named edge between two graph elements.
Args:
source: source element of edge
value: edge label
target: target element of edge
Returns:
Nothing.
"""
pass
# =========================================================================
# READ
# =========================================================================
@abstractmethod
def read_root(self) -> Node:
"""
Reads state's root node.
Returns:
The state's root node.
"""
pass
@abstractmethod
def read_value(self, node: Node) -> Optional[Any]:
"""
Reads value of given node.
Args:
node: node whose value to read
Returns:
I node exists, value stored in node, else None.
"""
pass
@abstractmethod
def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
"""
Retrieves edges whose source is given element.
Args:
elem: source element of edges to retrieve
Returns:
If elem exists, list of edges whose source is elem, else None.
"""
pass
@abstractmethod
def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
"""
Retrieves edges whose target is given element.
Args:
elem: target element of edges to retrieve
Returns:
If elem exists, list of edges whose target is elem, else None.
"""
pass
@abstractmethod
def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]:
"""
Reads source and target of given edge.
Args:
edge: edge whose source and target to read
Returns:
If edge exists, tuple containing source (first) and target (second) node, else (None, None)
"""
pass
@abstractmethod
def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
"""
Reads element connected to given element through edge with label = value.
Args:
elem: source element
value: edge label
Returns:
If elem doesn't exist or no edge is found with given label, None, else target element of edge with label = value originating from source.
"""
pass
@abstractmethod
def read_dict_keys(self, elem: Element) -> Optional[List[Any]]:
"""
Reads labels of outgoing edges starting in given node.
Args:
elem: source element
Returns:
If elem exists, list of (unique) edge labels, else None.
"""
pass
@abstractmethod
def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
"""
Reads edge between two elements connected through edge with label = value.
Args:
elem: source element
value: edge label
Returns:
If elem doesn't exist or no edge is found with given label, None, else edge with label = value originating from source.
"""
pass
@abstractmethod
def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
"""
Reads element connected to given element through edge with label node = value_node.
Args:
elem: source element
value_node: edge label node
Returns:
If elem exists, target element of edge with label stored in value_node originating from elem, else None.
"""
pass
@abstractmethod
def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
"""
Reads edge connecting two elements through edge with label node = value_node.
Args:
elem: source element
value_node: edge label node
Returns:
If elem exists, edge with label node = value_node, originating from source, else None.
"""
pass
@abstractmethod
def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
"""
Retrieves a list of all elements that have an outgoing edge, having label = value, towards the passed element.
Args:
elem: target element
value: edge label
Returns:
If elem exists, list of elements with an outgoing edge with label = value towards elem, else None.
"""
pass
# =========================================================================
# UPDATE
# =========================================================================
"""
Updates are done by performing subsequent CREATE and DELETE operations:
http://msdl.cs.mcgill.ca/people/yentl/files/thesis.pdf
"""
# =========================================================================
# DELETE
# =========================================================================
@abstractmethod
def delete_node(self, node: Node) -> None:
"""
Deletes given node from state graph.
Args:
node: node to be deleted
Returns:
None
"""
pass
@abstractmethod
def delete_edge(self, edge: Edge) -> None:
"""
Deletes given edge from state graph.
Args:
edge: edge to be deleted
Returns:
None
"""
pass

View file

@ -0,0 +1,52 @@
from services.bottom.pystate import PyState
class DevState(PyState):
"""
Version of PyState that allows dumping to .dot files
+ node id's are generated sequentially to make writing tests easier
"""
free_id = 0
def __init__(self):
super().__init__()
@staticmethod
def new_id() -> str:
DevState.free_id += 1
return str(DevState.free_id - 1)
def dump(self, path: str, png_path: str = None):
"""Dumps the whole MV graph to a graphviz .dot-file
Args:
path (str): path for .dot-file
png_path (str, optional): path for .png image generated from the .dot-file. Defaults to None.
"""
with open(path, "w") as f:
f.write("digraph main {\n")
for n in sorted(self.nodes):
if n in self.values:
x = self.values[n]
if isinstance(x, tuple):
x = f"{x[0]}"
else:
x = repr(x)
f.write("\"a_%s\" [label=\"%s\"];\n" % (
n, x.replace('"', '\\"')))
else:
f.write("\"a_%s\" [label=\"\"];\n" % n)
for i, e in sorted(list(self.edges.items())):
f.write("\"a_%s\" [label=\"e_%s\" shape=point];\n" % (i, i))
f.write("\"a_%s\" -> \"a_%s\" [arrowhead=none];\n" % (e[0], i))
f.write("\"a_%s\" -> \"a_%s\";\n" % (i, e[1]))
f.write("}")
if png_path is not None:
# generate png from dot-file
bashCommand = f"dot -Tpng {path} -o {png_path}"
import subprocess
process = subprocess.Popen(
bashCommand.split(), stdout=subprocess.PIPE)
output, error = process.communicate()

286
services/bottom/pystate.py Normal file
View file

@ -0,0 +1,286 @@
from typing import Any, List, Tuple, Optional
from services.bottom.base import State, Node, Edge, Element
class PyState(State):
"""
State interface implemented using Python data structures.
This code is based on:
https://msdl.uantwerpen.be/git/yentl/modelverse/src/master/state/modelverse_state/main.py
"""
def __init__(self):
self.edges = {}
self.outgoing = {}
self.incoming = {}
self.values = {}
self.nodes = set()
# Set used for garbage collection
self.GC = True
self.to_delete = set()
self.cache = {}
self.cache_node = {}
self.root = self.create_node()
def create_node(self) -> Node:
new_id = self.new_id()
self.nodes.add(new_id)
return new_id
def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
if source not in self.edges and source not in self.nodes:
return None
elif target not in self.edges and target not in self.nodes:
return None
else:
new_id = self.new_id()
self.outgoing.setdefault(source, set()).add(new_id)
self.incoming.setdefault(target, set()).add(new_id)
self.edges[new_id] = (source, target)
if source in self.edges:
# We are creating something dict_readable
# Fill in the cache already!
dict_source, dict_target = self.edges[source]
if target in self.values:
self.cache.setdefault(dict_source, {})[self.values[target]] = source
self.cache_node.setdefault(dict_source, {})[target] = source
return new_id
def create_nodevalue(self, value: Any) -> Optional[Node]:
if not self.is_valid_datavalue(value):
return None
new_id = self.new_id()
self.values[new_id] = value
self.nodes.add(new_id)
return new_id
def create_dict(self, source: Element, value: Any, target: Element) -> None:
if source not in self.nodes and source not in self.edges:
return None
elif target not in self.nodes and target not in self.edges:
return None
elif not self.is_valid_datavalue(value):
return None
else:
n = self.create_nodevalue(value)
e = self.create_edge(source, target)
assert n is not None and e is not None
e2 = self.create_edge(e, n)
self.cache.setdefault(source, {})[value] = e
self.cache_node.setdefault(source, {})[n] = e
def read_root(self) -> Node:
return self.root
def read_value(self, node: Node) -> Any:
if node in self.values:
return self.values[node]
else:
return None
def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
if elem in self.edges or elem in self.nodes:
if elem in self.outgoing:
return list(self.outgoing[elem])
else:
return []
else:
return None
def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
if elem in self.edges or elem in self.nodes:
if elem in self.incoming:
return list(self.incoming[elem])
else:
return []
else:
return None
def read_edge(self, edge: Edge) -> Tuple[Optional[Element], Optional[Element]]:
if edge in self.edges:
return self.edges[edge][0], self.edges[edge][1]
else:
return None, None
def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
e = self.read_dict_edge(elem, value)
if e is None:
return None
else:
return self.edges[e][1]
def read_dict_keys(self, elem: Element) -> Optional[List[Any]]:
if elem not in self.nodes and elem not in self.edges:
return None
result = []
# NOTE: cannot just use the cache here, as some keys in the cache might not actually exist;
# we would have to check all of them anyway
if elem in self.outgoing:
for e1 in self.outgoing[elem]:
if e1 in self.outgoing:
for e2 in self.outgoing[e1]:
result.append(self.edges[e2][1])
return result
def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
try:
first = self.cache[elem][value]
# Got hit, so validate
if (self.edges[first][0] == elem) and (value in [self.values[self.edges[i][1]]
for i in self.outgoing[first]
if self.edges[i][1] in self.values]):
return first
# Hit but invalid now
del self.cache[elem][value]
return None
except KeyError:
return None
def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
e = self.read_dict_node_edge(elem, value_node)
if e is None:
return None
else:
self.cache_node.setdefault(elem, {})[value_node] = e
return self.edges[e][1]
def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
try:
first = self.cache_node[elem][value_node]
# Got hit, so validate
if (self.edges[first][0] == elem) and \
(value_node in [self.edges[i][1] for i in self.outgoing[first]]):
return first
# Hit but invalid now
del self.cache_node[elem][value_node]
return None
except KeyError:
return None
def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
if elem not in self.nodes and elem not in self.edges:
return None
# Get all outgoing links
matches = []
if elem in self.incoming:
for e1 in self.incoming[elem]:
# For each link, we read the links that might link to a data value
if e1 in self.outgoing:
for e2 in self.outgoing[e1]:
# Now read out the target of the link
target = self.edges[e2][1]
# And access its value
if target in self.values and self.values[target] == value:
# Found a match
matches.append(e1)
return [self.edges[e][0] for e in matches]
def delete_node(self, node: Node) -> None:
if node == self.root:
return
elif node not in self.nodes:
return
self.nodes.remove(node)
if node in self.values:
del self.values[node]
s = set()
if node in self.outgoing:
for e in self.outgoing[node]:
s.add(e)
del self.outgoing[node]
if node in self.incoming:
for e in self.incoming[node]:
s.add(e)
del self.incoming[node]
for e in s:
self.delete_edge(e)
if node in self.outgoing:
del self.outgoing[node]
if node in self.incoming:
del self.incoming[node]
def delete_edge(self, edge: Edge) -> None:
if edge not in self.edges:
return
s, t = self.edges[edge]
if t in self.incoming:
self.incoming[t].remove(edge)
if s in self.outgoing:
self.outgoing[s].remove(edge)
del self.edges[edge]
s = set()
if edge in self.outgoing:
for e in self.outgoing[edge]:
s.add(e)
if edge in self.incoming:
for e in self.incoming[edge]:
s.add(e)
for e in s:
self.delete_edge(e)
if edge in self.outgoing:
del self.outgoing[edge]
if edge in self.incoming:
del self.incoming[edge]
if self.GC and (t in self.incoming and not self.incoming[t]) and (t not in self.edges):
# Remove this node as well
# Edges aren't deleted like this, as they might have a reachable target and source!
# If they haven't, they will be removed because the source was removed.
self.to_delete.add(t)
def purge(self):
while self.to_delete:
t = self.to_delete.pop()
if t in self.incoming and not self.incoming[t]:
self.delete_node(t)
values = set(self.edges)
values.update(self.nodes)
visit_list = [self.root]
while visit_list:
elem = visit_list.pop()
if elem in values:
# Remove it from the leftover values
values.remove(elem)
if elem in self.edges:
visit_list.extend(self.edges[elem])
if elem in self.outgoing:
visit_list.extend(self.outgoing[elem])
if elem in self.incoming:
visit_list.extend(self.incoming[elem])
dset = set()
for key in self.cache:
if key not in self.nodes and key not in self.edges:
dset.add(key)
for key in dset:
del self.cache[key]
dset = set()
for key in self.cache_node:
if key not in self.nodes and key not in self.edges:
dset.add(key)
for key in dset:
del self.cache_node[key]
# All remaining elements are to be purged
if len(values) > 0:
while values:
v = values.pop()
if v in self.nodes:
self.delete_node(v)

View file

View file

@ -0,0 +1,2 @@
import pytest
from .fixtures.state import state

View file

19
services/bottom/test/fixtures/state.py vendored Normal file
View file

@ -0,0 +1,19 @@
import pytest
from services.bottom.pystate import PyState
# from state.rdfstate import RDFState
# from state.neo4jstate import Neo4jState
@pytest.fixture(params=[
(PyState,),
# (RDFState, "http://example.org/#"),
# (Neo4jState,)
])
def state(request):
if len(request.param) > 1:
state = request.param[0](*request.param[1:])
else:
state = request.param[0]()
yield state
# if isinstance(state, Neo4jState):
# state.close(clear=True)

View file

@ -0,0 +1,41 @@
import pytest
@pytest.mark.usefixtures("state")
def test_create_dict_simple(state):
id1 = state.create_node()
id2 = state.create_node()
assert id1 is not None
assert id2 is not None
n = state.create_dict(id1, "abc", id2)
assert n is None
v = state.read_dict(id1, "abc")
assert v == id2
@pytest.mark.usefixtures("state")
def test_create_dict_no_source(state):
id1 = 100000
id2 = state.create_node()
assert id2 is not None
n = state.create_dict(id1, "abc", id2)
assert n is None
v = state.read_dict(id1, "abc")
assert v is None
@pytest.mark.usefixtures("state")
def test_create_dict_no_target(state):
id2 = 100000
id1 = state.create_node()
assert id1 is not None
n = state.create_dict(id1, "abc", id2)
assert n is None
v = state.read_dict(id1, "abc")
assert v is None

View file

@ -0,0 +1,144 @@
import pytest
@pytest.mark.usefixtures("state")
def test_create_edge_invalid_source(state):
a = -1
b = state.create_node()
assert b is not None
e = state.create_edge(a, b)
assert e is None
@pytest.mark.usefixtures("state")
def test_create_edge_invalid_target(state):
b = -1
a = state.create_node()
assert a is not None
e = state.create_edge(a, b)
assert e is None
@pytest.mark.usefixtures("state")
def test_create_edge_invalid_both(state):
a = -1
b = -1
e = state.create_edge(a, b)
assert e is None
@pytest.mark.usefixtures("state")
def test_create_edge_node_to_node(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge = state.create_edge(a, b)
assert edge is not None
@pytest.mark.usefixtures("state")
def test_create_edge_multiple(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge1 = state.create_edge(a, b)
assert edge1 is not None
edge2 = state.create_edge(a, b)
assert edge2 is not None
assert edge1 != edge2
@pytest.mark.usefixtures("state")
def test_create_edge_many(state):
v = set()
for i in range(1000):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge = state.create_edge(a, b)
assert edge is not None
v.add(edge)
assert len(v) == 1000
@pytest.mark.usefixtures("state")
def test_create_edge_edge_to_node(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge1 = state.create_edge(a, b)
assert edge1 is not None
edge2 = state.create_edge(edge1, b)
assert edge2 is not None
assert edge1 != edge2
@pytest.mark.usefixtures("state")
def test_create_edge_node_to_edge(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge1 = state.create_edge(a, b)
assert edge1 is not None
edge2 = state.create_edge(a, edge1)
assert edge2 is not None
assert edge1 != edge2
@pytest.mark.usefixtures("state")
def test_create_edge_edge_to_edge(state):
a = state.create_node()
assert a is not None
b = state.create_node()
assert b is not None
edge1 = state.create_edge(a, b)
assert edge1 is not None
edge2 = state.create_edge(a, b)
assert edge2 is not None
assert edge1 != edge2
edge3 = state.create_edge(edge1, edge2)
assert edge3 is not None
@pytest.mark.usefixtures("state")
def test_create_edge_loop_node(state):
a = state.create_node()
assert a is not None
edge = state.create_edge(a, a)
assert edge is not None
@pytest.mark.usefixtures("state")
def test_create_edge_loop_edge(state):
a = state.create_node()
assert a is not None
edge1 = state.create_edge(a, a)
assert edge1 is not None
edge2 = state.create_edge(edge1, edge1)
assert edge2 is not None

View file

@ -0,0 +1,22 @@
import pytest
@pytest.mark.usefixtures("state")
def test_create_node_different_id_simple(state):
id1 = state.create_node()
assert id1 is not None
id2 = state.create_node()
assert id2 is not None
assert id1 != id2
@pytest.mark.usefixtures("state")
def test_create_node_different_id_long(state):
results = set()
for i in range(1000):
v = state.create_node()
assert v is not None
results.add(v)
assert len(results) == 1000

View file

@ -0,0 +1,173 @@
import pytest
@pytest.mark.usefixtures("state")
def test_create_nodevalue_different_id_simple(state):
id1 = state.create_nodevalue(1)
id2 = state.create_nodevalue(1)
assert id1 is not None
assert id2 is not None
assert id1 != id2
@pytest.mark.usefixtures("state")
def test_create_nodevalue_read(state):
id1 = state.create_nodevalue(1)
assert id1 is not None
val = state.read_value(id1)
assert val == 1
@pytest.mark.usefixtures("state")
def test_create_nodevalue_integer_ib_zero(state):
# Nicely within range
v = set()
size = 0
for i in range(-10, 10):
id1 = state.create_nodevalue(i)
assert id1 is not None
size += 1
v.add(id1)
assert len(v) == size
@pytest.mark.usefixtures("state")
def test_create_nodevalue_boolean(state):
id1 = state.create_nodevalue(True)
id2 = state.create_nodevalue(False)
assert id1 is not None
assert id2 is not None
assert id1 != id2
@pytest.mark.usefixtures("state")
def test_create_nodevalue_boolean_same(state):
id1 = state.create_nodevalue(True)
id2 = state.create_nodevalue(True)
assert id1 is not None
assert id2 is not None
assert id1 != id2
@pytest.mark.usefixtures("state")
def test_create_nodevalue_float_keeps_type(state):
id1 = state.create_nodevalue(0.0)
assert id1 is not None
v = state.read_value(id1)
assert type(v) == float
assert v == 0.0
@pytest.mark.usefixtures("state")
def test_create_nodevalue_string_empty(state):
id1 = state.create_nodevalue("")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == ""
@pytest.mark.usefixtures("state")
def test_create_nodevalue_string_normal(state):
id1 = state.create_nodevalue("ABC")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "ABC"
@pytest.mark.usefixtures("state")
def test_create_nodevalue_string_not_parsed(state):
id1 = state.create_nodevalue("1")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "1"
id1 = state.create_nodevalue("1.0")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "1.0"
id1 = state.create_nodevalue("-1.0")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "-1.0"
id1 = state.create_nodevalue("True")
assert id1 is not None
v = state.read_value(id1)
assert type(v) == str
assert v == "True"
@pytest.mark.usefixtures("state")
def test_create_nodevalue_junk(state):
class Unknown(object):
pass
n = state.create_nodevalue(Unknown())
assert n is None
@pytest.mark.usefixtures("state")
def test_create_nodevalue_type_type(state):
id1 = state.create_nodevalue(("Type",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("Type",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_integer_type(state):
id1 = state.create_nodevalue(("Integer",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("Integer",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_float_type(state):
id1 = state.create_nodevalue(("Float",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("Float",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_boolean_type(state):
id1 = state.create_nodevalue(("Boolean",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("Boolean",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_string_type(state):
id1 = state.create_nodevalue(("String",))
assert id1 is not None
v = state.read_value(id1)
assert v == ("String",)
@pytest.mark.usefixtures("state")
def test_create_nodevalue_invalid_type(state):
id1 = state.create_nodevalue(("Class",))
assert id1 is None

View file

@ -0,0 +1,396 @@
import pytest
@pytest.mark.usefixtures("state")
def test_delete_edge_no_exists(state):
e = state.delete_edge(1)
assert e is None
@pytest.mark.usefixtures("state")
def test_delete_edge_node(state):
a = state.create_node()
assert a is not None
e = state.delete_edge(a)
assert e is None
@pytest.mark.usefixtures("state")
def test_delete_edge_nodevalue(state):
a = state.create_nodevalue(1)
assert a is not None
e = state.delete_edge(a)
assert e is None
@pytest.mark.usefixtures("state")
def test_delete_edge_normal(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_edge(c)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_delete_edge_remove_recursive(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
n = state.delete_edge(c)
assert n is None
l = state.read_value(a)
assert l == 1
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(c)
assert s is None
assert t is None
s, t = state.read_edge(d)
assert s is None
assert t is None
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_delete_edge_remove_edge_recursive_deep(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
f = state.create_node()
g = state.create_edge(f, e)
h = state.create_edge(b, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
assert g is not None
assert h is not None
n = state.delete_edge(d)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h])
s, t = state.read_edge(d)
assert s is None
assert t is None
s, t = state.read_edge(e)
assert s is None
assert t is None
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c
@pytest.mark.usefixtures("state")
def test_delete_edge_remove_edge_recursive_steps(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
f = state.create_node()
g = state.create_edge(f, e)
h = state.create_edge(b, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
assert g is not None
assert h is not None
n = state.delete_edge(g)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([d])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([d])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h, e])
s, t = state.read_edge(d)
assert s == a
assert t == b
l = state.read_outgoing(d)
assert l is not None
assert set(l) == set([e])
l = state.read_incoming(d)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(e)
assert s == d
assert t == c
l = state.read_outgoing(e)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(e)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(g)
assert l is None
l = state.read_incoming(g)
assert l is None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c
n = state.delete_edge(e)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([d])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([d])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h])
s, t = state.read_edge(d)
assert s == a
assert t == b
l = state.read_outgoing(d)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(d)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(e)
assert s is None
assert t is None
l = state.read_outgoing(e)
assert l is None
l = state.read_incoming(e)
assert l is None
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(g)
assert l is None
l = state.read_incoming(g)
assert l is None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c
n = state.delete_edge(d)
assert n is None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h])
s, t = state.read_edge(d)
assert s is None
assert t is None
l = state.read_outgoing(d)
assert l is None
l = state.read_incoming(d)
assert l is None
s, t = state.read_edge(e)
assert s is None
assert t is None
l = state.read_outgoing(e)
assert l is None
l = state.read_incoming(e)
assert l is None
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(g)
assert l == None
l = state.read_incoming(g)
assert l == None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c

View file

@ -0,0 +1,227 @@
import pytest
@pytest.mark.usefixtures("state")
def test_delete_node_no_exists(state):
n = state.delete_node(-1)
assert n is None
@pytest.mark.usefixtures("state")
def test_delete_node_no_value(state):
a = state.create_node()
assert a is not None
n = state.delete_node(a)
assert n is None
@pytest.mark.usefixtures("state")
def test_delete_node_value(state):
a = state.create_nodevalue(1)
assert a is not None
d = state.read_value(a)
assert d == 1
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
@pytest.mark.usefixtures("state")
def test_delete_node_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_node(c)
assert n is None
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_outgoing(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
s, t = state.read_edge(c)
assert s is None
assert t is None
d = state.read_outgoing(b)
assert d is not None
assert set(d) == set([])
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_incoming(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(b, a)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
s, t = state.read_edge(c)
assert s is None
assert t is None
d = state.read_outgoing(b)
assert d is not None
assert set(d) == set([])
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_both(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
e = state.create_node()
f = state.create_edge(e, a)
assert a is not None
assert b is not None
assert c is not None
assert e is not None
assert f is not None
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
s, t = state.read_edge(c)
assert s is None
assert t is None
d = state.read_incoming(b)
assert d is not None
assert set(d) == set([])
s, t = state.read_edge(f)
assert s is None
assert t is None
d = state.read_outgoing(e)
assert d is not None
assert set(d) == set([])
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_recursive(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
n = state.delete_node(a)
assert n is None
d = state.read_value(a)
assert d is None
s, t = state.read_edge(c)
assert s is None
assert t is None
s, t = state.read_edge(d)
assert s is None
assert t is None
d = state.read_outgoing(b)
assert d is not None
assert set(d) == set([])
@pytest.mark.usefixtures("state")
def test_delete_node_remove_edge_recursive_deep(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
f = state.create_node()
g = state.create_edge(f, e)
h = state.create_edge(b, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
assert g is not None
assert h is not None
n = state.delete_node(a)
assert n is None
l = state.read_outgoing(a)
assert l is None
l = state.read_incoming(a)
assert l is None
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([h])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([h])
s, t = state.read_edge(d)
assert s is None
assert t is None
s, t = state.read_edge(e)
assert s is None
assert t is None
s, t = state.read_edge(g)
assert s is None
assert t is None
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
s, t = state.read_edge(h)
assert s == b
assert t == c

View file

@ -0,0 +1,94 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_dict_no_exists(state):
assert state.read_dict(-1, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_not_found_node(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict(a, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_not_found_nodevalue(state):
a = state.create_nodevalue(1)
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict(a, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_not_found_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict(c, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_no_primitive(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict(a, a) is None
@pytest.mark.usefixtures("state")
def test_read_dict_node_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_dict(a, "f")
assert l == b
@pytest.mark.usefixtures("state")
def test_read_dict_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("k")
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_dict(a, "f")
assert l == b
l = state.read_dict(a, "k")
assert l == g
assert state.read_dict(a, "l") is None

View file

@ -0,0 +1,94 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_dict_edge_no_exists(state):
assert state.read_dict_edge(-1, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_not_found_node(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_edge(a, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_not_found_nodevalue(state):
a = state.create_nodevalue(1)
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_edge(a, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_not_found_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_edge(c, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_no_primitive(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_edge(a, a) is None
@pytest.mark.usefixtures("state")
def test_read_dict_edge_node_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_dict_edge(a, "f")
assert l == d
@pytest.mark.usefixtures("state")
def test_read_dict_edge_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("k")
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_dict_edge(a, "f")
assert l == d
l = state.read_dict_edge(a, "k")
assert l == i
assert state.read_dict_edge(a, "l") is None

View file

@ -0,0 +1,51 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_dict_keys_no_exists(state):
assert state.read_dict_keys(100000) is None
@pytest.mark.usefixtures("state")
def test_read_dict_keys_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_dict_keys(a)
assert l is not None
assert set(l) == set([c])
@pytest.mark.usefixtures("state")
def test_read_dict_keys_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("k")
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_dict_keys(a)
assert l is not None
assert set(l) == set([c, h])

View file

@ -0,0 +1,74 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_dict_node_no_exists(state):
assert state.read_dict_node(-1, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_node_not_found_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_node(c, "abc") is None
@pytest.mark.usefixtures("state")
def test_read_dict_node_no_primitive(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
assert state.read_dict_node(a, a) is None
@pytest.mark.usefixtures("state")
def test_read_dict_node_node_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_dict_node(a, c)
assert l == b
@pytest.mark.usefixtures("state")
def test_read_dict_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_node()
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_node()
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_dict_node(a, c)
assert l == b
l = state.read_dict_node(a, h)
assert l == g

View file

@ -0,0 +1,136 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_edge_node(state):
b = state.create_node()
assert b is not None
s, t = state.read_edge(b)
assert s is None
assert t is None
@pytest.mark.usefixtures("state")
def test_read_edge_no_exists(state):
s, t = state.read_edge(-1)
assert s is None
assert t is None
@pytest.mark.usefixtures("state")
def test_read_edge_nodevalue(state):
b = state.create_nodevalue(1)
assert b is not None
s, t = state.read_edge(b)
assert s is None
assert t is None
@pytest.mark.usefixtures("state")
def test_read_edge_normal(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
@pytest.mark.usefixtures("state")
def test_read_edge_edge_to_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(c, d)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
s, t = state.read_edge(d)
assert s == a
assert t == b
s, t = state.read_edge(e)
assert s == c
assert t == d
@pytest.mark.usefixtures("state")
def test_read_edge_edge_to_node(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
s, t = state.read_edge(d)
assert s == c
assert t == b
@pytest.mark.usefixtures("state")
def test_read_edge_node_to_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(b, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
s, t = state.read_edge(d)
assert s == b
assert t == c
@pytest.mark.usefixtures("state")
def test_read_edge_node_to_nodevalue(state):
a = state.create_node()
b = state.create_nodevalue(1)
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
s, t = state.read_edge(c)
assert s == a
assert t == b
@pytest.mark.usefixtures("state")
def test_read_edge_nodevalue_to_nodevalue(state):
a = state.create_nodevalue(1)
b = state.create_nodevalue(1)
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
s, t = state.read_edge(c)
assert s == a
assert t == b

View file

@ -0,0 +1,275 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_incoming_node_none(state):
b = state.create_node()
assert b is not None
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_node_one(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([c])
@pytest.mark.usefixtures("state")
def test_read_incoming_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([c, d, e])
@pytest.mark.usefixtures("state")
def test_read_incoming_node_multi_others_unaffected(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
f = state.create_node()
assert f is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_edge_none(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_edge_one(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, a)
e = state.create_edge(a, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([e])
l = state.read_incoming(d)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(e)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_edge_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, c)
e = state.create_edge(b, c)
f = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([c])
l = state.read_incoming(c)
assert l is not None
assert set(l) == set([d, e, f])
l = state.read_incoming(d)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(e)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_none(state):
b = state.create_nodevalue(1)
assert b is not None
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_one(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(b, a)
assert a is not None
assert b is not None
assert c is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([c])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_multi(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(b, a)
d = state.create_edge(b, a)
e = state.create_edge(b, a)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_multi_others_unaffected(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(b, a)
d = state.create_edge(b, a)
e = state.create_edge(b, a)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
f = state.create_nodevalue(1)
assert f is not None
l = state.read_incoming(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_incoming(b)
assert l is not None
assert set(l) == set([])
l = state.read_incoming(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_incoming_node_deleted(state):
b = state.create_node()
assert b is not None
n = state.delete_node(b)
assert n is None
l = state.read_incoming(b)
assert l is None
@pytest.mark.usefixtures("state")
def test_read_incoming_nodevalue_deleted(state):
b = state.create_nodevalue(1)
assert b is not None
n = state.delete_node(b)
assert n is None
l = state.read_incoming(b)
assert l is None
@pytest.mark.usefixtures("state")
def test_read_incoming_edge_deleted(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_edge(c)
assert n is None
l = state.read_incoming(c)
assert l is None

View file

@ -0,0 +1,265 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_none(state):
b = state.create_node()
assert b is not None
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_one(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_multi_others_unaffected(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
f = state.create_node()
assert f is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_edge_none(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_edge_one(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, a)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([d])
l = state.read_outgoing(d)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_edge_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(c, a)
e = state.create_edge(c, b)
f = state.create_edge(c, d)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
assert f is not None
l = state.read_outgoing(c)
assert l is not None
assert set(l) == set([d, e, f])
l = state.read_outgoing(d)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(e)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_none(state):
b = state.create_nodevalue(1)
assert b is not None
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_one(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_multi(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_multi_others_unaffected(state):
a = state.create_nodevalue(1)
b = state.create_node()
c = state.create_edge(a, b)
d = state.create_edge(a, b)
e = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
f = state.create_nodevalue(1)
assert f is not None
l = state.read_outgoing(a)
assert l is not None
assert set(l) == set([c, d, e])
l = state.read_outgoing(b)
assert l is not None
assert set(l) == set([])
l = state.read_outgoing(f)
assert l is not None
assert set(l) == set([])
@pytest.mark.usefixtures("state")
def test_read_outgoing_node_deleted(state):
b = state.create_node()
assert b is not None
n = state.delete_node(b)
assert n is None
l = state.read_outgoing(b)
assert l is None
@pytest.mark.usefixtures("state")
def test_read_outgoing_nodevalue_deleted(state):
b = state.create_nodevalue(1)
assert b is not None
n = state.delete_node(b)
assert n is None
l = state.read_outgoing(b)
assert l is None
@pytest.mark.usefixtures("state")
def test_read_outgoing_edge_deleted(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
n = state.delete_edge(c)
assert n is None
l = state.read_outgoing(c)
assert l is None

View file

@ -0,0 +1,165 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_no_exists(state):
l = state.read_reverse_dict(-1, "abc")
assert l is None
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_not_found_node(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
l = state.read_reverse_dict(a, "abc")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_not_found_nodevalue(state):
a = state.create_nodevalue(1)
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
l = state.read_reverse_dict(a, "abc")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_not_found_edge(state):
a = state.create_node()
b = state.create_node()
c = state.create_edge(a, b)
assert a is not None
assert b is not None
assert c is not None
# Passing data is not enforced, as the data will be interpreted if necessary
l = state.read_reverse_dict(c, "abc")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_no_primitive(state):
a = state.create_node()
assert a is not None
# Passing data is not enforced, as the data will be interpreted if necessary
l = state.read_reverse_dict(a, a)
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_node_simple(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_reverse_dict(b, "f")
assert set(l) == set([a])
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_no_match(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("g")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
l = state.read_reverse_dict(b, "f")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_node_multi(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("k")
i = state.create_edge(a, g)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_reverse_dict(b, "f")
assert set(l) == set([a])
l = state.read_reverse_dict(g, "k")
assert set(l) == set([a])
l = state.read_reverse_dict(a, "l")
assert l == []
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_node_multi_ambiguous(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(b, a)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
g = state.create_node()
h = state.create_nodevalue("f")
i = state.create_edge(g, a)
j = state.create_edge(i, h)
assert g is not None
assert h is not None
assert i is not None
assert j is not None
l = state.read_reverse_dict(a, "f")
assert set(l) == set([b, g])
@pytest.mark.usefixtures("state")
def test_read_reverse_dict_node_uncertain(state):
a = state.create_node()
b = state.create_node()
c = state.create_nodevalue("f")
d = state.create_edge(a, b)
e = state.create_edge(d, c)
assert a is not None
assert b is not None
assert c is not None
assert d is not None
assert e is not None
h = state.create_nodevalue("g")
i = state.create_edge(d, h)
assert h is not None
assert i is not None
l = state.read_reverse_dict(b, "f")
assert set(l) == set([a])

View file

@ -0,0 +1,88 @@
import pytest
@pytest.mark.usefixtures("state")
def test_read_value_different_id_simple(state):
id1 = state.create_nodevalue(1)
id2 = state.create_nodevalue(2)
assert id1 is not None
assert id2 is not None
v1 = state.read_value(id1)
v2 = state.read_value(id2)
assert v1 == 1
assert v2 == 2
@pytest.mark.usefixtures("state")
def test_read_value_integer_ib_negative(state):
# Just within range
for i in range(-2 ** 63, -2 ** 63 + 10):
id1 = state.create_nodevalue(i)
assert id1 is not None
v = state.read_value(id1)
assert v == i
@pytest.mark.usefixtures("state")
def test_read_value_integer_ib_zero(state):
# Nicely within range
for i in range(-10, 10):
id1 = state.create_nodevalue(i)
assert id1 is not None
v = state.read_value(id1)
assert v == i
@pytest.mark.usefixtures("state")
def test_read_value_integer_ib_positive(state):
# Just within range
for i in range(2 ** 63 - 10, 2 ** 63):
id1 = state.create_nodevalue(i)
assert id1 is not None
v = state.read_value(id1)
assert v == i
@pytest.mark.usefixtures("state")
def test_read_value_boolean(state):
id1 = state.create_nodevalue(True)
id2 = state.create_nodevalue(False)
assert id1 is not None
assert id2 is not None
v1 = state.read_value(id1)
v2 = state.read_value(id2)
assert v1 == True
assert v2 == False
@pytest.mark.usefixtures("state")
def test_read_nodevalue_boolean_same(state):
id1 = state.create_nodevalue(True)
id2 = state.create_nodevalue(True)
assert id1 is not None
assert id2 is not None
v1 = state.read_value(id1)
v2 = state.read_value(id2)
assert v1 == True
assert v2 == True
@pytest.mark.usefixtures("state")
def test_read_value_no_exist(state):
v1 = state.read_value(100000)
assert v1 is None
@pytest.mark.usefixtures("state")
def test_read_value_no_value(state):
id1 = state.create_node()
assert id1 is not None
v1 = state.read_value(id1)
assert v1 is None