fix: semantics for bounded eventually
This commit is contained in:
parent
a295f21049
commit
46f0f60bc0
12 changed files with 412 additions and 534 deletions
|
|
@ -47,11 +47,11 @@ def gen_samples(
|
|||
n_lists = max(1, n_lists)
|
||||
timestamps = draw(
|
||||
st.lists(
|
||||
st.integers(min_value=0, max_value=2**32 - 1),
|
||||
st.integers(min_value=0, max_value=2**32 - 1).map(lambda i: i / 1000),
|
||||
unique=True,
|
||||
min_size=min_size,
|
||||
max_size=max_size,
|
||||
).map(lambda t: list(map(float, sorted(set(t)))))
|
||||
).map(lambda t: list(sorted(set(t))))
|
||||
)
|
||||
elements = gen_element_fn(dtype_)
|
||||
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ features = ["pyo3/extension-module"]
|
|||
module-name = "argus._argus"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "--import-mode=importlib --doctest-glob=../docs/*.rst --doctest-glob=../docs/**/*.rst"
|
||||
addopts = "--import-mode=importlib --doctest-glob=../docs/*.rst --doctest-glob=../docs/**/*.rst --hypothesis-explain"
|
||||
testpaths = ["tests"]
|
||||
|
||||
[tool.mypy]
|
||||
|
|
@ -63,6 +63,9 @@ show_error_codes = true
|
|||
[[tool.mypy.overrides]]
|
||||
module = "mtl"
|
||||
ignore_missing_imports = true
|
||||
[[tool.mypy.overrides]]
|
||||
module = "rtamt"
|
||||
ignore_missing_imports = true
|
||||
|
||||
|
||||
[tool.ruff]
|
||||
|
|
|
|||
|
|
@ -58,10 +58,8 @@ impl Trace for PyTrace {
|
|||
fn eval_bool_semantics(expr: &PyBoolExpr, trace: &PyTrace, interpolation_method: &str) -> PyResult<Py<BoolSignal>> {
|
||||
let interp = PyInterp::from_str(interpolation_method)?;
|
||||
let sig = match interp {
|
||||
PyInterp::Linear => BooleanSemantics::eval::<Linear, Linear>(&expr.0, trace).map_err(PyArgusError::from)?,
|
||||
PyInterp::Constant => {
|
||||
BooleanSemantics::eval::<Constant, Constant>(&expr.0, trace).map_err(PyArgusError::from)?
|
||||
}
|
||||
PyInterp::Linear => BooleanSemantics::eval::<Linear>(&expr.0, trace).map_err(PyArgusError::from)?,
|
||||
PyInterp::Constant => BooleanSemantics::eval::<Constant>(&expr.0, trace).map_err(PyArgusError::from)?,
|
||||
};
|
||||
Python::with_gil(|py| Py::new(py, (BoolSignal, PySignal::new(sig, interp))))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -237,7 +237,7 @@ macro_rules! impl_signals {
|
|||
// if it is an empty signal, make it sampled. Otherwise, throw an error.
|
||||
let signal: &mut Signal<$ty> = match signal {
|
||||
Signal::Empty => {
|
||||
super_.signal = Signal::<$ty>::new_with_capacity(1).into();
|
||||
super_.signal = Signal::<$ty>::with_capacity(1).into();
|
||||
(&mut super_.signal).try_into().unwrap()
|
||||
}
|
||||
_ => signal,
|
||||
|
|
|
|||
|
|
@ -1,13 +1,26 @@
|
|||
from typing import List, Tuple
|
||||
from typing import List, Literal, Tuple
|
||||
|
||||
import hypothesis.strategies as st
|
||||
import mtl
|
||||
import rtamt
|
||||
from hypothesis import given
|
||||
|
||||
import argus
|
||||
from argus.test_utils.signals_gen import gen_samples
|
||||
|
||||
|
||||
def _create_rtamt_spec(spec: str) -> rtamt.StlDenseTimeSpecification:
|
||||
rtamt_spec = rtamt.StlDenseTimeSpecification()
|
||||
rtamt_spec.name = "STL Spec"
|
||||
rtamt_spec.declare_var("x", "float")
|
||||
rtamt_spec.declare_var("y", "float")
|
||||
rtamt_spec.add_sub_spec("a = (x > 0)")
|
||||
rtamt_spec.add_sub_spec("b = (y > 0)")
|
||||
rtamt_spec.spec = spec
|
||||
rtamt_spec.parse()
|
||||
return rtamt_spec
|
||||
|
||||
|
||||
@given(
|
||||
sample_lists=gen_samples(min_size=3, max_size=50, dtype_=bool, n_lists=2),
|
||||
spec=st.one_of(
|
||||
|
|
@ -24,10 +37,12 @@ from argus.test_utils.signals_gen import gen_samples
|
|||
]
|
||||
]
|
||||
),
|
||||
interpolation_method=st.one_of(st.just("constant"), st.just("linear")),
|
||||
)
|
||||
def test_boolean_propositional_expr(
|
||||
sample_lists: List[List[Tuple[float, bool]]],
|
||||
spec: str,
|
||||
interpolation_method: Literal["linear", "constant"],
|
||||
) -> None:
|
||||
mtl_spec = mtl.parse(spec)
|
||||
argus_spec = argus.parse_expr(spec)
|
||||
|
|
@ -37,13 +52,13 @@ def test_boolean_propositional_expr(
|
|||
mtl_data = dict(a=a, b=b)
|
||||
argus_data = argus.Trace(
|
||||
dict(
|
||||
a=argus.BoolSignal.from_samples(a, interpolation_method="constant"),
|
||||
b=argus.BoolSignal.from_samples(b, interpolation_method="constant"),
|
||||
a=argus.BoolSignal.from_samples(a, interpolation_method=interpolation_method),
|
||||
b=argus.BoolSignal.from_samples(b, interpolation_method=interpolation_method),
|
||||
)
|
||||
)
|
||||
|
||||
mtl_rob = mtl_spec(mtl_data, quantitative=False)
|
||||
argus_rob = argus.eval_bool_semantics(argus_spec, argus_data)
|
||||
argus_rob = argus.eval_bool_semantics(argus_spec, argus_data, interpolation_method=interpolation_method)
|
||||
|
||||
assert mtl_rob == argus_rob.at(0), f"{argus_rob=}"
|
||||
|
||||
|
|
@ -54,40 +69,43 @@ def test_boolean_propositional_expr(
|
|||
[
|
||||
st.just(spec)
|
||||
for spec in [
|
||||
"F a",
|
||||
"G b",
|
||||
"F[0,2] a",
|
||||
"G[0,2] b",
|
||||
"F[0,10] a",
|
||||
"G[0,10] b",
|
||||
"(G(a & b))",
|
||||
"(F(a | b))",
|
||||
# FIXME: `mtl` doesn't contract the signal domain for F[0,2] so it fails if a is True and b is False in the
|
||||
# last sample point.
|
||||
# "G(a -> F[0,2] b)",
|
||||
"G(a -> F b)",
|
||||
"G F a -> F G b",
|
||||
"(G F a -> F G b)",
|
||||
"(a U b)",
|
||||
"(a U[0,2] b)",
|
||||
]
|
||||
]
|
||||
),
|
||||
interpolation_method=st.one_of(st.just("constant"), st.just("linear")),
|
||||
)
|
||||
def test_boolean_temporal_expr(
|
||||
sample_lists: List[List[Tuple[float, bool]]],
|
||||
spec: str,
|
||||
interpolation_method: Literal["linear", "constant"],
|
||||
) -> None:
|
||||
mtl_spec = mtl.parse(spec)
|
||||
argus_spec = argus.parse_expr(spec)
|
||||
assert isinstance(argus_spec, argus.BoolExpr)
|
||||
|
||||
a = sample_lists[0]
|
||||
b = sample_lists[1]
|
||||
a, b = sample_lists
|
||||
mtl_data = dict(a=a, b=b)
|
||||
argus_data = argus.Trace(
|
||||
dict(
|
||||
a=argus.BoolSignal.from_samples(a, interpolation_method="constant"),
|
||||
b=argus.BoolSignal.from_samples(b, interpolation_method="constant"),
|
||||
a=argus.BoolSignal.from_samples(a, interpolation_method=interpolation_method),
|
||||
b=argus.BoolSignal.from_samples(b, interpolation_method=interpolation_method),
|
||||
)
|
||||
)
|
||||
|
||||
mtl_rob = mtl_spec(mtl_data, quantitative=False)
|
||||
argus_rob = argus.eval_bool_semantics(argus_spec, argus_data)
|
||||
argus_rob = argus.eval_bool_semantics(argus_spec, argus_data, interpolation_method=interpolation_method)
|
||||
|
||||
assert mtl_rob == argus_rob.at(0), f"{argus_rob=}"
|
||||
|
|
|
|||
|
|
@ -39,9 +39,9 @@ def test_correctly_create_signals(data: st.DataObject) -> None:
|
|||
actual_end_time = signal.end_time
|
||||
|
||||
assert actual_start_time is not None
|
||||
assert actual_start_time == expected_start_time
|
||||
assert actual_start_time == pytest.approx(expected_start_time)
|
||||
assert actual_end_time is not None
|
||||
assert actual_end_time == expected_end_time
|
||||
assert actual_end_time == pytest.approx(expected_end_time)
|
||||
|
||||
a = data.draw(draw_index(xs))
|
||||
assert a < len(xs)
|
||||
|
|
@ -49,7 +49,10 @@ def test_correctly_create_signals(data: st.DataObject) -> None:
|
|||
actual_val = signal.at(at) # type: ignore
|
||||
|
||||
assert actual_val is not None
|
||||
assert actual_val == expected_val
|
||||
if isinstance(actual_val, float):
|
||||
assert actual_val == pytest.approx(expected_val)
|
||||
else:
|
||||
assert actual_val == expected_val
|
||||
|
||||
# generate one more sample
|
||||
new_time = actual_end_time + 1
|
||||
|
|
@ -58,7 +61,10 @@ def test_correctly_create_signals(data: st.DataObject) -> None:
|
|||
|
||||
get_val = signal.at(new_time)
|
||||
assert get_val is not None
|
||||
assert get_val == new_value
|
||||
if isinstance(get_val, float):
|
||||
assert get_val == pytest.approx(new_value)
|
||||
else:
|
||||
assert get_val == new_value
|
||||
|
||||
else:
|
||||
assert signal.is_empty()
|
||||
|
|
@ -81,7 +87,7 @@ def test_signal_create_should_fail(data: st.DataObject) -> None:
|
|||
# Swap two indices in the samples
|
||||
xs[b], xs[a] = xs[a], xs[b] # type: ignore
|
||||
|
||||
with pytest.raises(RuntimeError, match=r"trying to create a non-monotonically signal.+"):
|
||||
with pytest.raises(RuntimeError, match=r"trying to create a signal with non-monotonic time points.+"):
|
||||
_ = sampled_signal(xs, dtype_) # type: ignore
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue