tests(pyargus): add test cases for signals

This commit is contained in:
Anand Balakrishnan 2023-08-31 17:11:52 -07:00
parent ab0a2c6d85
commit 137c22cd70
11 changed files with 193 additions and 31 deletions

View file

@ -1,8 +1,6 @@
//! Expression tree for Argus specifications
use std::collections::HashSet;
use std::ops::Bound;
use std::time::Duration;
mod bool_expr;
pub mod iter;
@ -361,6 +359,9 @@ pub mod arbitrary {
#![allow(clippy::arc_with_non_send_sync)]
//! Helper functions to generate arbitrary expressions using [`mod@proptest`].
use core::ops::Bound;
use core::time::Duration;
use proptest::prelude::*;
use super::*;

View file

@ -332,6 +332,7 @@ fn compute_untimed_until(lhs: Signal<bool>, rhs: Signal<bool>) -> ArgusResult<Si
let v1 = lhs.interpolate_at::<Linear>(t).unwrap();
let v2 = rhs.interpolate_at::<Linear>(t).unwrap();
#[allow(clippy::nonminimal_bool)]
let z = (v1 && v2) || (v1 && next);
if z == next && i < (expected_len - 2) {
ret_samples.pop();

View file

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Optional, Type, Union
from typing import List, Optional, Tuple, Type, Union
from argus import _argus
from argus._argus import DType as DType
@ -12,8 +12,10 @@ try:
except AttributeError:
...
AllowedDtype = Union[bool, int, float]
def declare_var(name: str, dtype: Union[DType, Type[Union[bool, int, float]]]) -> Union[VarBool, VarInt, VarUInt, VarFloat]:
def declare_var(name: str, dtype: Union[DType, Type[AllowedDtype]]) -> Union[VarBool, VarInt, VarUInt, VarFloat]:
"""Declare a variable with the given name and type"""
if isinstance(dtype, type):
if dtype == bool:
@ -34,7 +36,7 @@ def declare_var(name: str, dtype: Union[DType, Type[Union[bool, int, float]]]) -
raise TypeError(f"unsupported variable type `{dtype}`")
def literal(value: Union[bool, int, float]) -> Union[ConstBool, ConstInt, ConstUInt, ConstFloat]:
def literal(value: AllowedDtype) -> Union[ConstBool, ConstInt, ConstUInt, ConstFloat]:
"""Create a literal expression for the given value"""
if isinstance(value, bool):
return ConstBool(value)
@ -46,14 +48,18 @@ def literal(value: Union[bool, int, float]) -> Union[ConstBool, ConstInt, ConstU
def signal(
dtype: Union[DType, Type[Union[bool, int, float]]],
dtype: Union[DType, Type[AllowedDtype]],
*,
value: Optional[Union[bool, int, float]] = None,
data: Optional[Union[AllowedDtype, List[Tuple[float, AllowedDtype]]]] = None,
) -> Signal:
"""Create a signal of the given type
If a `value` isn't given the signal created is assumed to be a sampled signal, i.e., new data points can be pushed to the
signal. Otherwise, the signal is constant with the given value.
Parameters
----------
data :
If a constant scalar is given, a constant signal is created. Otherwise, if a list of sample points are given, a sampled
signal is constructed. Otherwise, an empty signal is created.
"""
if isinstance(dtype, type):
if dtype == bool:
@ -63,27 +69,30 @@ def signal(
elif dtype == float:
dtype = DType.Float
factory: Type[Union[BoolSignal, UnsignedIntSignal, IntSignal, FloatSignal]]
expected_type: Type[AllowedDtype]
if dtype == DType.Bool:
if value is None:
return BoolSignal.from_samples([])
else:
assert isinstance(value, bool)
return BoolSignal.constant(value)
elif dtype == DType.Int:
if value is None:
return IntSignal.from_samples([])
else:
assert isinstance(value, int)
return IntSignal.constant(value)
factory = BoolSignal
expected_type = bool
elif dtype == DType.UnsignedInt:
if value is None:
return UnsignedIntSignal.from_samples([])
else:
assert isinstance(value, int)
return UnsignedIntSignal.constant(value)
factory = UnsignedIntSignal
expected_type = int
elif dtype == DType.Int:
factory = IntSignal
expected_type = int
elif dtype == DType.Float:
return FloatSignal.from_samples([])
raise TypeError(f"unsupported signal type `{dtype}`")
factory = FloatSignal
expected_type = float
else:
raise ValueError(f"unsupported dtype {dtype}")
if data is None:
return factory.from_samples([])
elif isinstance(data, (list, tuple)):
return factory.from_samples(data) # type: ignore
else:
assert isinstance(data, expected_type)
return factory.constant(data) # type: ignore
__all__ = [

View file

@ -0,0 +1,13 @@
from typing import List, Optional, Tuple, Type, Union
from argus._argus import DType as DType
from argus.exprs import ConstBool, ConstFloat, ConstInt, ConstUInt, VarBool, VarFloat, VarInt, VarUInt
from argus.signals import Signal
AllowedDtype = Union[bool, int, float]
def declare_var(name: str, dtype: Union[DType, Type[AllowedDtype]]) -> Union[VarBool, VarInt, VarUInt, VarFloat]: ...
def literal(value: AllowedDtype) -> Union[ConstBool, ConstInt, ConstUInt, ConstFloat]: ...
def signal(
dtype: Union[DType, Type[AllowedDtype]], *, data: Optional[Union[AllowedDtype, List[Tuple[float, AllowedDtype]]]] = ...
) -> Signal: ...

22
pyargus/argus/exprs.pyi Normal file
View file

@ -0,0 +1,22 @@
from argus._argus import Abs as Abs
from argus._argus import Add as Add
from argus._argus import Always as Always
from argus._argus import And as And
from argus._argus import BoolExpr as BoolExpr
from argus._argus import ConstBool as ConstBool
from argus._argus import ConstFloat as ConstFloat
from argus._argus import ConstInt as ConstInt
from argus._argus import ConstUInt as ConstUInt
from argus._argus import Div as Div
from argus._argus import Eventually as Eventually
from argus._argus import Mul as Mul
from argus._argus import Negate as Negate
from argus._argus import Next as Next
from argus._argus import Not as Not
from argus._argus import NumExpr as NumExpr
from argus._argus import Or as Or
from argus._argus import Until as Until
from argus._argus import VarBool as VarBool
from argus._argus import VarFloat as VarFloat
from argus._argus import VarInt as VarInt
from argus._argus import VarUInt as VarUInt

View file

@ -0,0 +1,3 @@
from argus._argus import Trace as Trace
from argus._argus import eval_bool_semantics as eval_bool_semantics
from argus._argus import eval_robust_semantics as eval_robust_semantics

View file

@ -0,0 +1,5 @@
from argus._argus import BoolSignal as BoolSignal
from argus._argus import FloatSignal as FloatSignal
from argus._argus import IntSignal as IntSignal
from argus._argus import Signal as Signal
from argus._argus import UnsignedIntSignal as UnsignedIntSignal

View file

@ -24,7 +24,7 @@ dev = [
"black",
]
test = ["pytest", "pytest-cov", "hypothesis"]
test = ["pytest", "pytest-cov", "hypothesis", "numpy"]
[build-system]
requires = ["maturin>=1.0,<2.0"]
@ -37,6 +37,7 @@ module-name = "argus._argus"
[tool.pytest.ini_options]
addopts = ["--import-mode=importlib"]
testpaths = ["tests"]
[tool.mypy]
# ignore_missing_imports = true

View file

@ -1,5 +1,6 @@
use std::time::Duration;
use argus_core::signals::interpolation::Linear;
use argus_core::signals::Signal;
use pyo3::prelude::*;
@ -83,6 +84,45 @@ macro_rules! impl_signals {
self.0.push(Duration::from_secs_f64(time), value)?;
Ok(())
}
/// Check if the signal is empty
fn is_empty(&self) -> bool {
self.0.is_empty()
}
/// The start time of the signal
#[getter]
fn start_time(&self) -> Option<f64> {
use core::ops::Bound::*;
match self.0.start_time()? {
Included(t) | Excluded(t) => Some(t.as_secs_f64()),
_ => None,
}
}
/// The end time of the signal
#[getter]
fn end_time(&self) -> Option<f64> {
use core::ops::Bound::*;
match self.0.end_time()? {
Included(t) | Excluded(t) => Some(t.as_secs_f64()),
_ => None,
}
}
/// Get the value of the signal at the given time point.
///
/// If there exists a sample, then the value is returned, otherwise the value is
/// interpolated. If the time point lies outside of the domain of the signal, then `None`
/// is returned.
fn at(self_: PyRef<'_, Self>, time: f64) -> Option<$ty> {
let super_ = self_.as_ref();
let time = core::time::Duration::from_secs_f64(time);
match super_.interpolation {
PyInterp::Linear => self_.0.interpolate_at::<Linear>(time),
}
}
}
}
};

View file

@ -1,3 +0,0 @@

View file

@ -0,0 +1,70 @@
from typing import List, Tuple, Type, Union
from hypothesis import given, note
from hypothesis import strategies as st
from hypothesis.strategies import composite
import argus
AllowedDtype = Union[bool, int, float]
@composite
def samples(draw, *, min_size: int, max_size: int, dtype: Type[AllowedDtype]):
"""
Generate arbitrary samples for a signal where the time stamps are strictly
monotonically increasing
"""
elements: st.SearchStrategy[AllowedDtype]
if dtype == bool:
elements = st.booleans()
elif dtype == int:
size = 2**64
elements = st.integers(min_value=(-size // 2), max_value=((size - 1) // 2))
elif dtype == float:
elements = st.floats(width=64)
else:
raise ValueError(f"invalid dtype {dtype}")
values = draw(st.lists(elements, min_size=min_size, max_size=max_size))
return draw(
st.lists(st.floats(min_value=0), unique=True, min_size=len(values), max_size=len(values))
.map(lambda t: sorted(t))
.map(lambda t: list(zip(t, values)))
)
@composite
def samples_and_indices(
draw: st.DrawFn, *, min_size: int, max_size: int
) -> Tuple[List[Tuple[float, AllowedDtype]], int, int, Type[AllowedDtype]]:
"""
Generate an arbitrary list of samples and two indices within the list
"""
dtype = draw(st.one_of(st.just(bool), st.just(int), st.just(float)))
xs = draw(samples(min_size=min_size, max_size=max_size, dtype=dtype))
if len(xs) > 0:
i0 = draw(st.integers(min_value=0, max_value=len(xs) - 1))
i1 = draw(st.integers(min_value=0, max_value=len(xs) - 1))
else:
i0 = draw(st.just(0))
i1 = draw(st.just(0))
return (xs, i0, i1, dtype)
@given(samples_and_indices(min_size=0, max_size=100))
def test_correctly_create_signals(data: Tuple[List[Tuple[float, AllowedDtype]], int, int, Type[AllowedDtype]]) -> None:
samples: List[Tuple[float, AllowedDtype]] = data[0]
a: int = data[1]
b: int = data[2]
dtype: Type[AllowedDtype] = data[3]
note(f"Samples: {samples}")
signal = argus.signal(dtype, data=samples)
if len(samples) > 0:
assert a < len(samples)
assert b < len(samples)
else:
assert signal.is_empty()
assert signal.at(0) is None