feat!(pyargus): simplify the API surface
- Get rid of helper functions. It is not that much more verbose to create signals with `argus.FloatSignal(...)` than `argus.signal(..., dtype=argus.dtype.float64`). - Make the package hierarchy flat: everything is under `argus`. If this is an issue, it can be changed in the future. - Add type hints for interval types.
This commit is contained in:
parent
3714cd5936
commit
d39e3d3e12
14 changed files with 237 additions and 247 deletions
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "pyargus"
|
||||
version = "0.1.0"
|
||||
version = "0.1.1"
|
||||
|
||||
authors.workspace = true
|
||||
license.workspace = true
|
||||
|
|
|
|||
|
|
@ -1,95 +1,54 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import List, Optional, Tuple, Type, Union
|
||||
from typing import Union
|
||||
|
||||
from typing_extensions import TypeAlias
|
||||
|
||||
from argus._argus import Abs as Abs
|
||||
from argus._argus import Add as Add
|
||||
from argus._argus import Always as Always
|
||||
from argus._argus import And as And
|
||||
from argus._argus import BoolExpr as BoolExpr
|
||||
from argus._argus import BoolSignal as BoolSignal
|
||||
from argus._argus import Cmp as Cmp
|
||||
from argus._argus import ConstBool as ConstBool
|
||||
from argus._argus import ConstFloat as ConstFloat
|
||||
from argus._argus import ConstInt as ConstInt
|
||||
from argus._argus import ConstUInt as ConstUInt
|
||||
from argus._argus import Div as Div
|
||||
from argus._argus import Eventually as Eventually
|
||||
from argus._argus import Expr as Expr
|
||||
from argus._argus import FloatSignal as FloatSignal
|
||||
from argus._argus import IntSignal as IntSignal
|
||||
from argus._argus import Mul as Mul
|
||||
from argus._argus import Negate as Negate
|
||||
from argus._argus import Next as Next
|
||||
from argus._argus import Not as Not
|
||||
from argus._argus import NumExpr as NumExpr
|
||||
from argus._argus import Or as Or
|
||||
from argus._argus import Signal as Signal
|
||||
from argus._argus import Trace as Trace
|
||||
from argus._argus import UnsignedIntSignal as UnsignedIntSignal
|
||||
from argus._argus import Until as Until
|
||||
from argus._argus import VarBool as VarBool
|
||||
from argus._argus import VarFloat as VarFloat
|
||||
from argus._argus import VarInt as VarInt
|
||||
from argus._argus import VarUInt as VarUInt
|
||||
from argus._argus import dtype as dtype
|
||||
from argus._argus import eval_bool_semantics as eval_bool_semantics
|
||||
from argus._argus import eval_robust_semantics as eval_robust_semantics
|
||||
from argus._argus import parse_expr as parse_expr
|
||||
|
||||
from . import _argus
|
||||
from ._argus import dtype, parse_expr
|
||||
from .exprs import ConstBool, ConstFloat, ConstInt, ConstUInt, VarBool, VarFloat, VarInt, VarUInt
|
||||
from .signals import BoolSignal, FloatSignal, IntSignal, Signal, UnsignedIntSignal
|
||||
|
||||
AllowedDtype: TypeAlias = Union[bool, int, float]
|
||||
|
||||
try:
|
||||
__doc__ = _argus.__doc__
|
||||
except AttributeError:
|
||||
...
|
||||
|
||||
AllowedDtype = Union[bool, int, float]
|
||||
|
||||
|
||||
def declare_var(name: str, dtype_: Union[dtype, Type[AllowedDtype]]) -> Union[VarBool, VarInt, VarUInt, VarFloat]:
|
||||
"""Declare a variable with the given name and type"""
|
||||
dtype_ = dtype.convert(dtype_)
|
||||
|
||||
if dtype_ == dtype.bool_:
|
||||
return VarBool(name)
|
||||
elif dtype_ == dtype.int64:
|
||||
return VarInt(name)
|
||||
elif dtype_ == dtype.uint64:
|
||||
return VarUInt(name)
|
||||
elif dtype_ == dtype.float64:
|
||||
return VarFloat(name)
|
||||
raise TypeError(f"unsupported variable type `{dtype_}`")
|
||||
|
||||
|
||||
def literal(value: AllowedDtype) -> Union[ConstBool, ConstInt, ConstUInt, ConstFloat]:
|
||||
"""Create a literal expression for the given value"""
|
||||
if isinstance(value, bool):
|
||||
return ConstBool(value)
|
||||
if isinstance(value, int):
|
||||
return ConstInt(value)
|
||||
if isinstance(value, float):
|
||||
return ConstFloat(value)
|
||||
raise TypeError(f"unsupported literal type `{type(value)}`")
|
||||
|
||||
|
||||
def signal(
|
||||
dtype_: Union[dtype, Type[AllowedDtype]],
|
||||
*,
|
||||
data: Optional[Union[AllowedDtype, List[Tuple[float, AllowedDtype]]]] = None,
|
||||
) -> Union[BoolSignal, UnsignedIntSignal, IntSignal, FloatSignal]:
|
||||
"""Create a signal of the given type
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
||||
dtype_:
|
||||
Type of the signal
|
||||
|
||||
data :
|
||||
If a constant scalar is given, a constant signal is created. Otherwise, if a list of sample points are given, a sampled
|
||||
signal is constructed. Otherwise, an empty signal is created.
|
||||
"""
|
||||
factory: Type[Union[BoolSignal, UnsignedIntSignal, IntSignal, FloatSignal]]
|
||||
expected_type: Type[AllowedDtype]
|
||||
|
||||
dtype_ = dtype.convert(dtype_)
|
||||
if dtype_ == dtype.bool_:
|
||||
factory = BoolSignal
|
||||
expected_type = bool
|
||||
elif dtype_ == dtype.uint64:
|
||||
factory = UnsignedIntSignal
|
||||
expected_type = int
|
||||
elif dtype_ == dtype.int64:
|
||||
factory = IntSignal
|
||||
expected_type = int
|
||||
elif dtype_ == dtype.float64:
|
||||
factory = FloatSignal
|
||||
expected_type = float
|
||||
else:
|
||||
raise ValueError(f"unsupported dtype_ {dtype}")
|
||||
|
||||
if data is None:
|
||||
return factory.from_samples([])
|
||||
elif isinstance(data, (list, tuple)):
|
||||
return factory.from_samples(data) # type: ignore[arg-type]
|
||||
assert isinstance(data, expected_type)
|
||||
return factory.constant(data) # type: ignore[arg-type]
|
||||
|
||||
|
||||
__all__ = [
|
||||
"dtype",
|
||||
"parse_expr",
|
||||
"declare_var",
|
||||
"literal",
|
||||
"signal",
|
||||
"Signal",
|
||||
]
|
||||
try:
|
||||
__version__ = _argus.__version__
|
||||
except AttributeError:
|
||||
...
|
||||
|
|
|
|||
38
pyargus/argus/__init__.pyi
Normal file
38
pyargus/argus/__init__.pyi
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
from typing_extensions import TypeAlias
|
||||
|
||||
from argus._argus import Abs as Abs
|
||||
from argus._argus import Add as Add
|
||||
from argus._argus import Always as Always
|
||||
from argus._argus import And as And
|
||||
from argus._argus import BoolExpr as BoolExpr
|
||||
from argus._argus import BoolSignal as BoolSignal
|
||||
from argus._argus import Cmp as Cmp
|
||||
from argus._argus import ConstBool as ConstBool
|
||||
from argus._argus import ConstFloat as ConstFloat
|
||||
from argus._argus import ConstInt as ConstInt
|
||||
from argus._argus import ConstUInt as ConstUInt
|
||||
from argus._argus import Div as Div
|
||||
from argus._argus import Eventually as Eventually
|
||||
from argus._argus import Expr as Expr
|
||||
from argus._argus import FloatSignal as FloatSignal
|
||||
from argus._argus import IntSignal as IntSignal
|
||||
from argus._argus import Mul as Mul
|
||||
from argus._argus import Negate as Negate
|
||||
from argus._argus import Next as Next
|
||||
from argus._argus import Not as Not
|
||||
from argus._argus import NumExpr as NumExpr
|
||||
from argus._argus import Or as Or
|
||||
from argus._argus import Signal as Signal
|
||||
from argus._argus import Trace as Trace
|
||||
from argus._argus import UnsignedIntSignal as UnsignedIntSignal
|
||||
from argus._argus import Until as Until
|
||||
from argus._argus import VarBool as VarBool
|
||||
from argus._argus import VarFloat as VarFloat
|
||||
from argus._argus import VarInt as VarInt
|
||||
from argus._argus import VarUInt as VarUInt
|
||||
from argus._argus import dtype as dtype
|
||||
from argus._argus import eval_bool_semantics as eval_bool_semantics
|
||||
from argus._argus import eval_robust_semantics as eval_robust_semantics
|
||||
from argus._argus import parse_expr as parse_expr
|
||||
|
||||
AllowedDtype: TypeAlias = bool | int | float
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
from typing import ClassVar, Literal, TypeAlias, final
|
||||
from typing import ClassVar, Literal, TypeAlias, TypeVar, final
|
||||
|
||||
from typing_extensions import Self
|
||||
from typing_extensions import Generic, Self
|
||||
|
||||
def parse_expr(expr_str: str) -> Expr: ...
|
||||
|
||||
|
|
@ -114,17 +114,21 @@ class Or(BoolExpr):
|
|||
class Next(BoolExpr):
|
||||
def __init__(self, arg: BoolExpr) -> None: ...
|
||||
|
||||
@final
|
||||
class Oracle(BoolExpr):
|
||||
def __init__(self, arg: BoolExpr, steps: int) -> None: ...
|
||||
|
||||
@final
|
||||
class Always(BoolExpr):
|
||||
def __init__(self, arg: BoolExpr) -> None: ...
|
||||
def __init__(self, arg: BoolExpr, *, interval: tuple[float | None, float | None]) -> None: ...
|
||||
|
||||
@final
|
||||
class Eventually(BoolExpr):
|
||||
def __init__(self, arg: BoolExpr) -> None: ...
|
||||
def __init__(self, arg: BoolExpr, *, interval: tuple[float | None, float | None]) -> None: ...
|
||||
|
||||
@final
|
||||
class Until(BoolExpr):
|
||||
def __init__(self, lhs: BoolExpr, rhs: BoolExpr) -> None: ...
|
||||
def __init__(self, lhs: BoolExpr, rhs: BoolExpr, *, interval: tuple[float | None, float | None]) -> None: ...
|
||||
|
||||
@final
|
||||
class dtype: # noqa: N801
|
||||
|
|
@ -138,7 +142,18 @@ class dtype: # noqa: N801
|
|||
def __eq__(self, other: object) -> bool: ...
|
||||
def __int__(self) -> int: ...
|
||||
|
||||
class Signal:
|
||||
_T = TypeVar("_T", bool, int, float)
|
||||
|
||||
class Signal(Generic[_T]):
|
||||
def __init__(self, *, interpolation_method: _InterpolationMethod = "linear") -> None: ...
|
||||
@classmethod
|
||||
def constant(cls, value: _T, *, interpolation_method: _InterpolationMethod = "linear") -> Self: ...
|
||||
@classmethod
|
||||
def from_samples(
|
||||
cls, samples: list[tuple[float, _T]], *, interpolation_method: _InterpolationMethod = "linear"
|
||||
) -> Self: ...
|
||||
def push(self, time: float, value: _T) -> None: ...
|
||||
def at(self, time: float) -> _T | None: ...
|
||||
def is_empty(self) -> bool: ...
|
||||
@property
|
||||
def start_time(self) -> float | None: ...
|
||||
|
|
@ -148,7 +163,8 @@ class Signal:
|
|||
def kind(self) -> dtype: ...
|
||||
|
||||
@final
|
||||
class BoolSignal(Signal):
|
||||
class BoolSignal(Signal[bool]):
|
||||
def __init__(self, *, interpolation_method: _InterpolationMethod = "linear") -> None: ...
|
||||
@classmethod
|
||||
def constant(cls, value: bool, *, interpolation_method: _InterpolationMethod = "linear") -> Self: ...
|
||||
@classmethod
|
||||
|
|
@ -161,7 +177,8 @@ class BoolSignal(Signal):
|
|||
_InterpolationMethod: TypeAlias = Literal["linear", "constant"]
|
||||
|
||||
@final
|
||||
class IntSignal(Signal):
|
||||
class IntSignal(Signal[int]):
|
||||
def __init__(self, *, interpolation_method: _InterpolationMethod = "linear") -> None: ...
|
||||
@classmethod
|
||||
def constant(cls, value: int, *, interpolation_method: _InterpolationMethod = "linear") -> Self: ...
|
||||
@classmethod
|
||||
|
|
@ -172,7 +189,8 @@ class IntSignal(Signal):
|
|||
def at(self, time: float) -> int | None: ...
|
||||
|
||||
@final
|
||||
class UnsignedIntSignal(Signal):
|
||||
class UnsignedIntSignal(Signal[int]):
|
||||
def __init__(self, *, interpolation_method: _InterpolationMethod = "linear") -> None: ...
|
||||
@classmethod
|
||||
def constant(cls, value: int, *, interpolation_method: _InterpolationMethod = "linear") -> Self: ...
|
||||
@classmethod
|
||||
|
|
@ -183,7 +201,8 @@ class UnsignedIntSignal(Signal):
|
|||
def at(self, time: float) -> int | None: ...
|
||||
|
||||
@final
|
||||
class FloatSignal(Signal):
|
||||
class FloatSignal(Signal[float]):
|
||||
def __init__(self, *, interpolation_method: _InterpolationMethod = "linear") -> None: ...
|
||||
@classmethod
|
||||
def constant(cls, value: float, *, interpolation_method: _InterpolationMethod = "linear") -> Self: ...
|
||||
@classmethod
|
||||
|
|
@ -194,7 +213,8 @@ class FloatSignal(Signal):
|
|||
def at(self, time: float) -> float | None: ...
|
||||
|
||||
@final
|
||||
class Trace: ...
|
||||
class Trace:
|
||||
def __init__(self, signals: dict[str, Signal]) -> None: ...
|
||||
|
||||
def eval_bool_semantics(
|
||||
expr: BoolExpr, trace: Trace, *, interpolation_method: _InterpolationMethod = "linear"
|
||||
|
|
|
|||
|
|
@ -1,53 +0,0 @@
|
|||
from argus._argus import (
|
||||
Abs,
|
||||
Add,
|
||||
Always,
|
||||
And,
|
||||
BoolExpr,
|
||||
ConstBool,
|
||||
ConstFloat,
|
||||
ConstInt,
|
||||
ConstUInt,
|
||||
Div,
|
||||
Eventually,
|
||||
Expr,
|
||||
Mul,
|
||||
Negate,
|
||||
Next,
|
||||
Not,
|
||||
NumExpr,
|
||||
Or,
|
||||
Until,
|
||||
VarBool,
|
||||
VarFloat,
|
||||
VarInt,
|
||||
VarUInt,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"Abs",
|
||||
"Add",
|
||||
"Always",
|
||||
"And",
|
||||
"BoolExpr",
|
||||
"ConstBool",
|
||||
"ConstFloat",
|
||||
"ConstInt",
|
||||
"ConstUInt",
|
||||
"Div",
|
||||
"Eventually",
|
||||
"Mul",
|
||||
"Negate",
|
||||
"Next",
|
||||
"Not",
|
||||
"NumExpr",
|
||||
"Or",
|
||||
"Until",
|
||||
"VarBool",
|
||||
"VarFloat",
|
||||
"VarInt",
|
||||
"VarUInt",
|
||||
"Expr",
|
||||
"BoolExpr",
|
||||
"NumExpr",
|
||||
]
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
from argus._argus import Trace, eval_bool_semantics, eval_robust_semantics
|
||||
|
||||
__all__ = [
|
||||
"Trace",
|
||||
"eval_bool_semantics",
|
||||
"eval_robust_semantics",
|
||||
]
|
||||
|
|
@ -1,48 +0,0 @@
|
|||
from typing import List, Optional, Protocol, Tuple, TypeVar, runtime_checkable
|
||||
|
||||
from typing_extensions import Self
|
||||
|
||||
from argus._argus import BoolSignal, FloatSignal, IntSignal, UnsignedIntSignal, dtype
|
||||
|
||||
T = TypeVar("T", bool, int, float)
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class Signal(Protocol[T]):
|
||||
def is_empty(self) -> bool:
|
||||
...
|
||||
|
||||
@property
|
||||
def start_time(self) -> Optional[float]:
|
||||
...
|
||||
|
||||
@property
|
||||
def end_time(self) -> Optional[float]:
|
||||
...
|
||||
|
||||
@property
|
||||
def kind(self) -> dtype:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def constant(cls, value: T) -> Self:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def from_samples(cls, samples: List[Tuple[float, T]]) -> Self:
|
||||
...
|
||||
|
||||
def push(self, time: float, value: T) -> None:
|
||||
...
|
||||
|
||||
def at(self, time: float) -> Optional[T]:
|
||||
...
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Signal",
|
||||
"BoolSignal",
|
||||
"IntSignal",
|
||||
"UnsignedIntSignal",
|
||||
"FloatSignal",
|
||||
]
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
[project]
|
||||
name = "pyargus"
|
||||
version = "0.1.0"
|
||||
version = "0.1.1"
|
||||
requires-python = ">=3.8"
|
||||
classifiers = [
|
||||
"Programming Language :: Rust",
|
||||
|
|
@ -38,7 +38,7 @@ features = ["pyo3/extension-module"]
|
|||
module-name = "argus._argus"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = ["--import-mode=importlib"]
|
||||
addopts = "--import-mode=importlib --doctest-glob=../docs/*.rst --doctest-glob=../docs/**/*.rst"
|
||||
testpaths = ["tests"]
|
||||
|
||||
[tool.mypy]
|
||||
|
|
@ -59,6 +59,8 @@ ignore = ["ANN101", "ANN102"]
|
|||
max-line-length = 127
|
||||
max-complexity = 10
|
||||
per-file-ignores = [
|
||||
"__init__.py: F401",
|
||||
"__init__.pyi: F401",
|
||||
"*.py: B905, B907, B950, E203, E501, W503, W291, W293",
|
||||
"*.pyi: B, E301, E302, E305, E501, E701, E704, W503",
|
||||
]
|
||||
|
|
|
|||
|
|
@ -115,11 +115,11 @@ impl ConstInt {
|
|||
}
|
||||
}
|
||||
|
||||
/// Create a constant _unsigned_ integer expression
|
||||
/// Create a constant *unsigned* integer expression
|
||||
///
|
||||
/// # Warning
|
||||
///
|
||||
/// Negating an unsigned integer during evaluation _may_ lead to the evaluation method
|
||||
/// Negating an unsigned integer during evaluation *may* lead to the evaluation method
|
||||
/// panicking.
|
||||
#[pyclass(extends=PyNumExpr, module = "argus")]
|
||||
pub struct ConstUInt;
|
||||
|
|
@ -162,7 +162,7 @@ impl VarInt {
|
|||
}
|
||||
}
|
||||
|
||||
/// Create an _unsigned_ integer variable
|
||||
/// Create an *unsigned* integer variable
|
||||
#[pyclass(extends=PyNumExpr, module = "argus")]
|
||||
pub struct VarUInt;
|
||||
|
||||
|
|
|
|||
|
|
@ -109,6 +109,9 @@ fn parse_expr(expr_str: &str) -> PyResult<PyObject> {
|
|||
fn pyargus(py: Python, m: &PyModule) -> PyResult<()> {
|
||||
pyo3_log::init();
|
||||
|
||||
let version = env!("CARGO_PKG_VERSION");
|
||||
|
||||
m.add("__version__", version)?;
|
||||
m.add_class::<DType>()?;
|
||||
m.add_function(wrap_pyfunction!(parse_expr, m)?)?;
|
||||
|
||||
|
|
|
|||
|
|
@ -3,14 +3,19 @@ use std::str::FromStr;
|
|||
|
||||
use argus::signals::interpolation::{Constant, Linear};
|
||||
use argus::{AnySignal, BooleanSemantics, QuantitativeSemantics, Signal, Trace};
|
||||
use pyo3::exceptions::PyTypeError;
|
||||
use pyo3::prelude::*;
|
||||
use pyo3::types::{PyDict, PyString};
|
||||
|
||||
use crate::expr::PyBoolExpr;
|
||||
use crate::signals::{BoolSignal, FloatSignal, PyInterp, PySignal, SignalKind};
|
||||
use crate::PyArgusError;
|
||||
|
||||
/// A signal trace to pass evaluate.
|
||||
///
|
||||
/// To evaluate the robustness of a set of signals, we need to construct a `Trace`
|
||||
/// containing the signals.
|
||||
///
|
||||
/// :param signals: A dictionary mapping signal names to `argus.Signal` types.
|
||||
/// :type signals: dict[str, argus.Signal]
|
||||
#[pyclass(name = "Trace", module = "argus")]
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct PyTrace {
|
||||
|
|
@ -20,21 +25,8 @@ pub struct PyTrace {
|
|||
#[pymethods]
|
||||
impl PyTrace {
|
||||
#[new]
|
||||
fn new(dict: &PyDict) -> PyResult<Self> {
|
||||
let mut signals = HashMap::with_capacity(dict.len());
|
||||
for (key, val) in dict {
|
||||
let key: &PyString = key
|
||||
.downcast()
|
||||
.map_err(|e| PyTypeError::new_err(format!("expected dictionary with string keys for trace ({})", e)))?;
|
||||
let val: &PyCell<PySignal> = val.downcast().map_err(|e| {
|
||||
PyTypeError::new_err(format!(
|
||||
"expected `argus.Signal` value for key `{}` in trace ({})",
|
||||
key, e
|
||||
))
|
||||
})?;
|
||||
let signal = val.borrow().signal.clone();
|
||||
signals.insert(key.to_string(), signal);
|
||||
}
|
||||
fn new(signals: HashMap<String, PySignal>) -> PyResult<Self> {
|
||||
let signals = signals.into_iter().map(|(k, v)| (k, v.signal)).collect();
|
||||
|
||||
Ok(Self { signals })
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,15 +2,14 @@ use std::str::FromStr;
|
|||
|
||||
use argus::signals::interpolation::{Constant, Linear};
|
||||
use argus::signals::Signal;
|
||||
use pyo3::exceptions::PyValueError;
|
||||
use pyo3::exceptions::{PyNotImplementedError, PyValueError};
|
||||
use pyo3::prelude::*;
|
||||
use pyo3::types::PyType;
|
||||
|
||||
use crate::{DType, PyArgusError};
|
||||
|
||||
#[pyclass(name = "InterpolationMethod", module = "argus")]
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub enum PyInterp {
|
||||
pub(crate) enum PyInterp {
|
||||
#[default]
|
||||
Linear,
|
||||
Constant,
|
||||
|
|
@ -60,7 +59,7 @@ pub struct PySignal {
|
|||
}
|
||||
|
||||
impl PySignal {
|
||||
pub fn new<T>(signal: T, interpolation: PyInterp) -> Self
|
||||
pub(crate) fn new<T>(signal: T, interpolation: PyInterp) -> Self
|
||||
where
|
||||
T: Into<SignalKind>,
|
||||
{
|
||||
|
|
@ -128,6 +127,60 @@ impl PySignal {
|
|||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new empty signal
|
||||
#[new]
|
||||
#[pyo3(signature = (*, interpolation_method = "linear"))]
|
||||
fn init(interpolation_method: &str) -> PyResult<Self> {
|
||||
_ = interpolation_method;
|
||||
Err(PyNotImplementedError::new_err(
|
||||
"cannot directly construct an abstract Signal",
|
||||
))
|
||||
}
|
||||
|
||||
/// Create a new signal with constant value
|
||||
#[classmethod]
|
||||
#[pyo3(signature = (value, *, interpolation_method = "linear"))]
|
||||
fn constant(_: &PyType, _py: Python<'_>, value: &PyAny, interpolation_method: &str) -> PyResult<Py<Self>> {
|
||||
_ = value;
|
||||
_ = interpolation_method;
|
||||
Err(PyNotImplementedError::new_err(
|
||||
"cannot directly construct an abstract Signal",
|
||||
))
|
||||
}
|
||||
|
||||
/// Create a new signal from some finite number of samples
|
||||
#[classmethod]
|
||||
#[pyo3(signature = (samples, *, interpolation_method = "linear"))]
|
||||
fn from_samples(_: &PyType, samples: Vec<(f64, &PyAny)>, interpolation_method: &str) -> PyResult<Py<Self>> {
|
||||
_ = samples;
|
||||
_ = interpolation_method;
|
||||
Err(PyNotImplementedError::new_err(
|
||||
"cannot directly construct an abstract Signal",
|
||||
))
|
||||
}
|
||||
|
||||
/// Push a new sample into the given signal.
|
||||
#[pyo3(signature = (time, value))]
|
||||
fn push(_: PyRefMut<'_, Self>, time: f64, value: &PyAny) -> PyResult<()> {
|
||||
_ = time;
|
||||
_ = value;
|
||||
Err(PyNotImplementedError::new_err(
|
||||
"cannot push samples to an abstract Signal",
|
||||
))
|
||||
}
|
||||
|
||||
/// Get the value of the signal at the given time point.
|
||||
///
|
||||
/// If there exists a sample, then the value is returned, otherwise the value is
|
||||
/// interpolated. If the time point lies outside of the domain of the signal, then
|
||||
/// `None` is returned.
|
||||
fn at(_self_: PyRef<'_, Self>, time: f64) -> PyResult<Option<&PyAny>> {
|
||||
_ = time;
|
||||
Err(PyNotImplementedError::new_err(
|
||||
"cannot query for samples in an abstract Signal",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_signals {
|
||||
|
|
@ -142,7 +195,7 @@ macro_rules! impl_signals {
|
|||
/// Create a new empty signal
|
||||
#[new]
|
||||
#[pyo3(signature = (*, interpolation_method = "linear"))]
|
||||
fn new(interpolation_method: &str) -> PyResult<(Self, PySignal)> {
|
||||
fn init(interpolation_method: &str) -> PyResult<(Self, PySignal)> {
|
||||
let interp = PyInterp::from_str(interpolation_method)?;
|
||||
Ok((Self, PySignal::new(Signal::<$ty>::new(), interp)))
|
||||
}
|
||||
|
|
@ -181,6 +234,14 @@ macro_rules! impl_signals {
|
|||
fn push(mut self_: PyRefMut<'_, Self>, time: f64, value: $ty) -> Result<(), PyArgusError> {
|
||||
let super_: &mut PySignal = self_.as_mut();
|
||||
let signal: &mut Signal<$ty> = (&mut super_.signal).try_into().unwrap();
|
||||
// if it is an empty signal, make it sampled. Otherwise, throw an error.
|
||||
let signal: &mut Signal<$ty> = match signal {
|
||||
Signal::Empty => {
|
||||
super_.signal = Signal::<$ty>::new_with_capacity(1).into();
|
||||
(&mut super_.signal).try_into().unwrap()
|
||||
}
|
||||
_ => signal,
|
||||
};
|
||||
signal.push(core::time::Duration::from_secs_f64(time), value)?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
import typing
|
||||
from typing import List, Tuple, Type, Union
|
||||
|
||||
import pytest
|
||||
from hypothesis import assume, given, note
|
||||
from hypothesis import assume, given
|
||||
from hypothesis import strategies as st
|
||||
from hypothesis.strategies import SearchStrategy, composite
|
||||
|
||||
|
|
@ -53,7 +54,7 @@ def gen_samples(
|
|||
return xs
|
||||
|
||||
|
||||
def empty_signal(*, dtype_: Union[Type[AllowedDtype], dtype]) -> SearchStrategy[argus.Signal]:
|
||||
def empty_signal(dtype_: Union[Type[AllowedDtype], dtype]) -> SearchStrategy[argus.Signal]:
|
||||
new_dtype: dtype = dtype.convert(dtype_)
|
||||
sig: argus.Signal
|
||||
if new_dtype == dtype.bool_:
|
||||
|
|
@ -74,7 +75,30 @@ def empty_signal(*, dtype_: Union[Type[AllowedDtype], dtype]) -> SearchStrategy[
|
|||
|
||||
|
||||
def constant_signal(dtype_: Union[Type[AllowedDtype], dtype]) -> SearchStrategy[argus.Signal]:
|
||||
return gen_element_fn(dtype_).map(lambda val: argus.signal(dtype_, data=val))
|
||||
element = gen_element_fn(dtype_)
|
||||
dtype_ = dtype.convert(dtype_)
|
||||
if dtype_ == dtype.bool_:
|
||||
return element.map(lambda val: argus.BoolSignal.constant(typing.cast(bool, val)))
|
||||
if dtype_ == dtype.uint64:
|
||||
return element.map(lambda val: argus.UnsignedIntSignal.constant(typing.cast(int, val)))
|
||||
if dtype_ == dtype.int64:
|
||||
return element.map(lambda val: argus.IntSignal.constant(typing.cast(int, val)))
|
||||
if dtype_ == dtype.float64:
|
||||
return element.map(lambda val: argus.FloatSignal.constant(typing.cast(float, val)))
|
||||
raise ValueError("unsupported data type for signal")
|
||||
|
||||
|
||||
def sampled_signal(xs: List[Tuple[float, AllowedDtype]], dtype_: Union[Type[AllowedDtype], dtype]) -> argus.Signal:
|
||||
dtype_ = dtype.convert(dtype_)
|
||||
if dtype_ == dtype.bool_:
|
||||
return argus.BoolSignal.from_samples(typing.cast(List[Tuple[float, bool]], xs))
|
||||
if dtype_ == dtype.uint64:
|
||||
return argus.UnsignedIntSignal.from_samples(typing.cast(List[Tuple[float, int]], xs))
|
||||
if dtype_ == dtype.int64:
|
||||
return argus.IntSignal.from_samples(typing.cast(List[Tuple[float, int]], xs))
|
||||
if dtype_ == dtype.float64:
|
||||
return argus.FloatSignal.from_samples(typing.cast(List[Tuple[float, float]], xs))
|
||||
raise ValueError("unsupported data type for signal")
|
||||
|
||||
|
||||
@composite
|
||||
|
|
@ -106,9 +130,7 @@ def test_correct_constant_signals(data: st.DataObject) -> None:
|
|||
def test_correctly_create_signals(data: st.DataObject) -> None:
|
||||
dtype_ = data.draw(gen_dtype())
|
||||
xs = data.draw(gen_samples(min_size=0, max_size=100, dtype_=dtype_))
|
||||
|
||||
note(f"Samples: {gen_samples}")
|
||||
signal = argus.signal(dtype_, data=xs)
|
||||
signal = sampled_signal(xs, dtype_)
|
||||
assert isinstance(signal, argus.Signal)
|
||||
if len(xs) > 0:
|
||||
expected_start_time = xs[0][0]
|
||||
|
|
@ -161,7 +183,7 @@ def test_signal_create_should_fail(data: st.DataObject) -> None:
|
|||
xs[b], xs[a] = xs[a], xs[b]
|
||||
|
||||
with pytest.raises(RuntimeError, match=r"trying to create a non-monotonically signal.+"):
|
||||
_ = argus.signal(dtype_, data=xs)
|
||||
_ = sampled_signal(xs, dtype_)
|
||||
|
||||
|
||||
@given(st.data())
|
||||
|
|
@ -171,8 +193,9 @@ def test_push_to_empty_signal(data: st.DataObject) -> None:
|
|||
assert isinstance(signal, argus.Signal)
|
||||
assert signal.is_empty()
|
||||
element = data.draw(gen_element_fn(dtype_))
|
||||
with pytest.raises(RuntimeError, match="cannot push value to non-sampled signal"):
|
||||
signal.push(0.0, element) # type: ignore[attr-defined]
|
||||
|
||||
signal.push(0.0, element)
|
||||
assert signal.at(0.0) == element
|
||||
|
||||
|
||||
@given(st.data())
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue