test(pyargus): add general signal tests

This commit is contained in:
Anand Balakrishnan 2023-09-05 10:27:14 -07:00
parent 5489ddbd09
commit ac5867e7b0
No known key found for this signature in database
2 changed files with 75 additions and 36 deletions

View file

@ -136,10 +136,10 @@ macro_rules! impl_signals {
/// Create a new signal from some finite number of samples
#[classmethod]
fn from_samples(_: &PyType, samples: Vec<(f64, $ty)>) -> PyResult<Py<Self>> {
let ret: Signal<$ty> = samples
let ret: Signal::<$ty> = Signal::<$ty>::try_from_iter(samples
.into_iter()
.map(|(t, v)| (Duration::from_secs_f64(t), v))
.collect();
.map(|(t, v)| (Duration::try_from_secs_f64(t).unwrap_or_else(|err| panic!("Value = {}, {}", t, err)), v))
).map_err(PyArgusError::from)?;
Python::with_gil(|py| {
Py::new(
py,

View file

@ -1,8 +1,9 @@
from typing import List, Tuple, Type, Union
from typing import List, Type, Union
from hypothesis import given, note
import pytest
from hypothesis import Verbosity, given, note, settings
from hypothesis import strategies as st
from hypothesis.strategies import composite
from hypothesis.strategies import SearchStrategy, composite
import argus
@ -10,7 +11,7 @@ AllowedDtype = Union[bool, int, float]
@composite
def samples(draw, *, min_size: int, max_size: int, dtype: Type[AllowedDtype]):
def gen_samples(draw: st.DrawFn, *, min_size: int, max_size: int, dtype: Type[AllowedDtype]):
"""
Generate arbitrary samples for a signal where the time stamps are strictly
monotonically increasing
@ -27,44 +28,82 @@ def samples(draw, *, min_size: int, max_size: int, dtype: Type[AllowedDtype]):
raise ValueError(f"invalid dtype {dtype}")
values = draw(st.lists(elements, min_size=min_size, max_size=max_size))
return draw(
st.lists(st.floats(min_value=0), unique=True, min_size=len(values), max_size=len(values))
.map(lambda t: sorted(t))
xs = draw(
st.lists(st.integers(min_value=0, max_value=2**32 - 1), unique=True, min_size=len(values), max_size=len(values))
.map(lambda t: map(float, sorted(set(t))))
.map(lambda t: list(zip(t, values)))
)
return xs
@composite
def samples_and_indices(
draw: st.DrawFn, *, min_size: int, max_size: int
) -> Tuple[List[Tuple[float, AllowedDtype]], int, int, Type[AllowedDtype]]:
"""
Generate an arbitrary list of samples and two indices within the list
"""
dtype = draw(st.one_of(st.just(bool), st.just(int), st.just(float)))
xs = draw(samples(min_size=min_size, max_size=max_size, dtype=dtype))
if len(xs) > 0:
i0 = draw(st.integers(min_value=0, max_value=len(xs) - 1))
i1 = draw(st.integers(min_value=0, max_value=len(xs) - 1))
def draw_index(draw: st.DrawFn, vec: List) -> int:
if len(vec) > 0:
return draw(st.integers(min_value=0, max_value=len(vec) - 1))
else:
i0 = draw(st.just(0))
i1 = draw(st.just(0))
return (xs, i0, i1, dtype)
return draw(st.just(0))
@given(samples_and_indices(min_size=0, max_size=100))
def test_correctly_create_signals(data: Tuple[List[Tuple[float, AllowedDtype]], int, int, Type[AllowedDtype]]) -> None:
samples: List[Tuple[float, AllowedDtype]] = data[0]
a: int = data[1]
b: int = data[2]
dtype: Type[AllowedDtype] = data[3]
def gen_dtype() -> SearchStrategy[Type[AllowedDtype]]:
return st.one_of(st.just(bool), st.just(int), st.just(float))
@settings(verbosity=Verbosity.verbose)
@given(st.data())
def test_correctly_create_signals(data: st.DataObject) -> None:
dtype = data.draw(gen_dtype())
xs = data.draw(gen_samples(min_size=0, max_size=100, dtype=dtype))
note(f"Samples: {gen_samples}")
signal = argus.signal(dtype, data=xs)
if len(xs) > 0:
expected_start_time = xs[0][0]
expected_end_time = xs[-1][0]
actual_start_time = signal.start_time
actual_end_time = signal.end_time
assert actual_start_time is not None
assert actual_start_time == expected_start_time
assert actual_end_time is not None
assert actual_end_time == expected_end_time
note(f"Samples: {samples}")
signal = argus.signal(dtype, data=samples)
if len(samples) > 0:
assert a < len(samples)
assert b < len(samples)
else:
assert signal.is_empty()
assert signal.at(0) is None
@settings(verbosity=Verbosity.verbose)
@given(st.data())
def test_signal_at(data: st.DataObject) -> None:
dtype = data.draw(gen_dtype())
xs = data.draw(gen_samples(min_size=10, max_size=100, dtype=dtype))
a = data.draw(draw_index(xs))
assert len(xs) > 2
assert a < len(xs)
signal = argus.signal(dtype, data=xs)
at, expected_val = xs[a]
actual_val = signal.at(at)
assert actual_val is not None
assert actual_val == expected_val
@given(st.data())
def test_signal_create_should_fail(data: st.DataObject) -> None:
dtype = data.draw(gen_dtype())
xs = data.draw(gen_samples(min_size=10, max_size=100, dtype=dtype))
a = data.draw(draw_index(xs))
b = data.draw(draw_index(xs))
assert len(xs) > 2
assert a < len(xs)
assert b < len(xs)
# Swap two indices in the samples
xs[b], xs[a] = xs[a], xs[b]
with pytest.raises(RuntimeError):
_ = argus.signal(dtype, data=xs)