Compare commits

...

10 commits

Author SHA1 Message Date
654a0521f7 remove suffix 'r' from string literals (fixes build)
Some checks are pending
CI / testing (macos-latest) (push) Waiting to run
CI / testing (ubuntu-latest) (push) Waiting to run
CI / testing (windows-latest) (push) Waiting to run
CI / linting (macos-latest) (push) Waiting to run
CI / linting (ubuntu-latest) (push) Waiting to run
CI / linting (windows-latest) (push) Waiting to run
CI / Documentation (push) Waiting to run
CI / coverage (ubuntu-latest) (push) Waiting to run
2025-11-01 13:56:36 +01:00
Anand Balakrishnan
be7fcb6c8a
Release 0.1.4
argus@0.1.4
pyargus@0.1.4

Generated by cargo-workspaces
2023-10-31 13:17:12 -07:00
Anand Balakrishnan
46f0f60bc0
fix: semantics for bounded eventually 2023-10-31 12:08:36 -07:00
Anand Balakrishnan
a295f21049
ci: separate rust and python test sessions 2023-10-18 16:38:06 -07:00
Anand Balakrishnan
eab6e219ef
ci: add cache for coverage 2023-10-17 12:46:45 -07:00
Anand Balakrishnan
7a68fd29d7
fix(pyargus): import TypeAlias from extensions for older python 2023-10-17 12:39:25 -07:00
Anand Balakrishnan
e9b17f1213
ci: install mamba on CI 2023-10-17 12:15:18 -07:00
Anand Balakrishnan
f5f0b3eb28
ci: fix nox lint tag vs session 2023-10-17 12:07:56 -07:00
Anand Balakrishnan
47b91e4ee4
ci: fix cache string 2023-10-17 12:06:34 -07:00
Anand Balakrishnan
dc834bd8be
test(pyargus): add more test cases
Ignore a test case where signals are contracted.
2023-10-17 12:03:52 -07:00
19 changed files with 488 additions and 588 deletions

View file

@ -30,6 +30,8 @@ jobs:
with:
environment-name: ci
environment-file: environment.yaml
create-args: >-
mamba
init-shell: bash
cache-environment: true
post-cleanup: 'all'
@ -49,7 +51,7 @@ jobs:
target/
.nox/
.hypothesis/
key: ${{ runner.os }}-argus-${{ hashFiles('**/Cargo.toml', "noxfile.py") }}
key: ${{ runner.os }}-argus-${{ hashFiles('**/Cargo.toml', 'noxfile.py') }}
restore-keys: ${{ runner.os }}-argus-
- name: Run tests
run: nox -s tests
@ -66,6 +68,8 @@ jobs:
with:
environment-name: ci
environment-file: environment.yaml
create-args: >-
mamba
init-shell: bash
cache-environment: true
post-cleanup: 'all'
@ -88,10 +92,10 @@ jobs:
target/
.nox/
.hypothesis/
key: ${{ runner.os }}-argus-${{ hashFiles('**/Cargo.toml', "noxfile.py") }}
key: ${{ runner.os }}-argus-${{ hashFiles('**/Cargo.toml', 'noxfile.py') }}
restore-keys: ${{ runner.os }}-argus-
- name: Run lints
run: nox -s lint
run: nox -t lint
docs:
name: Documentation
@ -104,6 +108,8 @@ jobs:
with:
environment-name: ci
environment-file: environment.yaml
create-args: >-
mamba
init-shell: bash
cache-environment: true
post-cleanup: 'all'
@ -130,11 +136,29 @@ jobs:
with:
environment-name: ci
environment-file: environment.yaml
create-args: >-
mamba
init-shell: bash
cache-environment: true
post-cleanup: 'all'
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@nightly
- name: Generate lockfile
run: cargo generate-lockfile
- name: Set up project cache
uses: actions/cache@v3
continue-on-error: false
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
.nox/
.hypothesis/
key: ${{ runner.os }}-argus-${{ hashFiles('**/Cargo.toml', 'noxfile.py') }}
restore-keys: ${{ runner.os }}-argus-
- name: Generate Coverage Reports
run: nox -s coverage
- name: Upload coverage reports to Codecov

View file

@ -1,6 +1,6 @@
[package]
name = "argus"
version = "0.1.3"
version = "0.1.4"
authors.workspace = true
license.workspace = true

View file

@ -16,13 +16,13 @@ pub enum Ordering {
#[display(fmt = "!=")]
NotEq,
/// Less than check
#[display(fmt = "{}", r#"if *strict { "<".to_string() } else { "<=".to_string() } "#r)]
#[display(fmt = "{}", r#"if *strict { "<".to_string() } else { "<=".to_string() } "#)]
Less {
/// Denotes `lhs < rhs` if `strict`, and `lhs <= rhs` otherwise.
strict: bool,
},
/// Greater than check
#[display(fmt = "{}", r#"if *strict { ">".to_string() } else { ">=".to_string() } "#r)]
#[display(fmt = "{}", r#"if *strict { ">".to_string() } else { ">=".to_string() } "#)]
Greater {
/// Denotes `lhs > rhs` if `strict`, and `lhs >= rhs` otherwise.
strict: bool,
@ -235,7 +235,7 @@ impl_bool_expr!(Not, arg);
/// Logical conjunction of a list of expressions
#[derive(Clone, Debug, PartialEq, argus_derive::BoolExpr, derive_more::Display)]
#[display(fmt = "({})", r#"args.iter().map(ToString::to_string).join(") && (")"#r)]
#[display(fmt = "({})", r#"args.iter().map(ToString::to_string).join(") && (")"#)]
pub struct And {
/// Expressions to be "and"-ed
pub args: Vec<BoolExpr>,
@ -245,7 +245,7 @@ impl_bool_expr!(And, [args]);
/// Logical disjunction of a list of expressions
#[derive(Clone, Debug, PartialEq, argus_derive::BoolExpr, derive_more::Display)]
#[display(fmt = "({})", r#"args.iter().map(ToString::to_string).join(") || (")"#r)]
#[display(fmt = "({})", r#"args.iter().map(ToString::to_string).join(") || (")"#)]
pub struct Or {
/// Expressions to be "or"-ed
pub args: Vec<BoolExpr>,

View file

@ -90,7 +90,7 @@ impl_num_expr!(Neg, arg);
/// Arithmetic addition of a list of numeric expressions
#[derive(Clone, Debug, PartialEq, argus_derive::NumExpr, derive_more::Display)]
#[display(fmt = "({})", r#"args.iter().map(ToString::to_string).join(" + ")"#r)]
#[display(fmt = "({})", r#"args.iter().map(ToString::to_string).join(" + ")"#)]
pub struct Add {
/// List of expressions being added
pub args: Vec<NumExpr>,
@ -110,7 +110,7 @@ impl_num_expr!(Sub, lhs, rhs);
/// Arithmetic multiplication of a list of numeric expressions
#[derive(Clone, Debug, PartialEq, argus_derive::NumExpr, derive_more::Display)]
#[display(fmt = "({})", r#"args.iter().map(ToString::to_string).join(" * ")"#r)]
#[display(fmt = "({})", r#"args.iter().map(ToString::to_string).join(" * ")"#)]
pub struct Mul {
/// List of expressions being multiplied
pub args: Vec<NumExpr>,

View file

@ -73,7 +73,7 @@ impl<T> Signal<T> {
}
/// Create a new empty signal with the specified capacity
pub fn new_with_capacity(size: usize) -> Self {
pub fn with_capacity(size: usize) -> Self {
Self::Sampled {
values: Vec::with_capacity(size),
time_points: Vec::with_capacity(size),
@ -168,7 +168,7 @@ impl<T> Signal<T> {
I: IntoIterator<Item = (Duration, T)>,
{
let iter = iter.into_iter();
let mut signal = Signal::new_with_capacity(iter.size_hint().0);
let mut signal = Signal::with_capacity(iter.size_hint().0);
for (time, value) in iter.into_iter() {
signal.push(time, value)?;
}
@ -308,9 +308,12 @@ impl<T> Signal<T> {
.interpolate_at::<Interp>(*t)
.map(|value| Sample { time: *t, value }),
};
let intersect = Interp::find_intersection(&a, &b)
.unwrap_or_else(|| panic!("unable to find intersection for crossing signals"));
return_points.push(intersect.time);
if let Some(intersect) = Interp::find_intersection(&a, &b) {
// There is an intersection
return_points.push(intersect.time);
} else {
// ignore, as the interpolation may not support intersection.
}
}
}
return_points.push(*t);
@ -448,7 +451,7 @@ pub mod arbitrary {
ts.sort_unstable();
ts.dedup();
ts.into_iter()
.map(Duration::from_secs)
.map(Duration::from_millis)
.zip(values.clone())
.collect::<Vec<_>>()
})

View file

@ -124,25 +124,29 @@ macro_rules! interpolate_for_num {
let t2 = second.time.as_secs_f64();
let at = time.as_secs_f64();
assert!((t1..=t2).contains(&at));
// Set t to a value in [0, 1]
let t = (at - t1) / (t2 - t1);
assert!((0.0..=1.0).contains(&t));
// We need to do stable linear interpolation
// https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p0811r3.html
let a: f64 = cast(first.value).unwrap_or_else(|| panic!("unable to cast {:?} to f64", first.value));
let b: f64 = cast(second.value).unwrap_or_else(|| panic!("unable to cast {:?} to f64", second.value));
// Set t to a value in [0, 1]
let t = (at - t1) / (t2 - t1);
assert!((0.0..=1.0).contains(&t));
let val = if (a <= 0.0 && b >= 0.0) || (a >= 0.0 && b <= 0.0) {
t * b + (1.0 - t) * a
} else if t == 1.0 {
b
if !a.is_finite() || !b.is_finite() {
// Cannot do stable interpolation for infinite values, so assume constant
// interpolation
cast(<Constant as InterpolationMethod<$ty>>::at(first, second, time).unwrap())
} else {
a + t * (b - a)
};
cast(val)
let val: f64 = if (a <= 0.0 && b >= 0.0) || (a >= 0.0 && b <= 0.0) {
t * b + (1.0 - t) * a
} else if t == 1.0 {
b
} else {
a + t * (b - a)
};
cast(val)
}
}
fn find_intersection(a: &Neighborhood<$ty>, b: &Neighborhood<$ty>) -> Option<Sample<$ty>> {
@ -169,12 +173,26 @@ macro_rules! interpolate_for_num {
// The lines may be parallel or coincident.
// We just return None
return None;
} else if !denom.is_finite() {
// Assume parallel or coincident because the time of intersection is not finite
return None;
}
let t_top = (((t1 * y2) - (y1 * t2)) * (t3 - t4)) - ((t1 - t2) * (t3 * y4 - y3 * t4));
if !t_top.is_finite() {
// Assume parallel or coincident because the time of intersection is not finite
return None;
}
let y_top = (((t1 * y2) - (y1 * t2)) * (y3 - y4)) - ((y1 - y2) * (t3 * y4 - y3 * t4));
let t = Duration::from_secs_f64(t_top / denom);
let t = Duration::try_from_secs_f64(t_top / denom).unwrap_or_else(|_| {
panic!(
"cannot convert {} / {} = {} to Duration",
t_top,
denom,
t_top / denom
)
});
let y: $ty = num_traits::cast(y_top / denom).unwrap();
Some(Sample { time: t, value: y })
}

View file

@ -75,7 +75,7 @@ pub enum Error {
/// Pushing the new value to the sampled signal makes it not strictly monotonically
/// increasing.
#[error(
"trying to create a non-monotonically signal, signal end time ({end_time:?}) > sample time point \
"trying to create a signal with non-monotonic time points, signal end time ({end_time:?}) > sample time point \
({current_sample:?})"
)]
NonMonotonicSignal {

View file

@ -1,371 +1,26 @@
use std::ops::Bound;
use std::time::Duration;
use super::utils::lemire_minmax::MonoWedge;
use super::Trace;
use crate::core::expr::*;
use crate::core::signals::{InterpolationMethod, SignalPartialOrd};
use crate::semantics::QuantitativeSemantics;
use crate::{ArgusError, ArgusResult, Signal};
use crate::{ArgusResult, Signal};
/// Boolean semantics for Signal Temporal Logic expressionsd define by an [`Expr`].
pub struct BooleanSemantics;
impl BooleanSemantics {
/// Evaluates a [Boolean expression](BoolExpr) given a [`Trace`].
pub fn eval<BoolI, NumI>(expr: &BoolExpr, trace: &impl Trace) -> ArgusResult<Signal<bool>>
pub fn eval<I>(expr: &BoolExpr, trace: &impl Trace) -> ArgusResult<Signal<bool>>
where
BoolI: InterpolationMethod<bool>,
NumI: InterpolationMethod<f64>,
I: InterpolationMethod<f64>,
{
let ret = match expr {
BoolExpr::BoolLit(val) => Signal::constant(val.0),
BoolExpr::BoolVar(BoolVar { name }) => trace
.get::<bool>(name.as_str())
.ok_or(ArgusError::SignalNotPresent)?
.clone(),
BoolExpr::Cmp(Cmp { op, lhs, rhs }) => {
use crate::core::expr::Ordering::*;
let lhs = QuantitativeSemantics::eval_num_expr::<f64, NumI>(lhs, trace)?;
let rhs = QuantitativeSemantics::eval_num_expr::<f64, NumI>(rhs, trace)?;
match op {
Eq => lhs.signal_eq::<NumI>(&rhs).unwrap(),
NotEq => lhs.signal_ne::<NumI>(&rhs).unwrap(),
Less { strict } if *strict => lhs.signal_lt::<NumI>(&rhs).unwrap(),
Less { strict: _ } => lhs.signal_le::<NumI>(&rhs).unwrap(),
Greater { strict } if *strict => lhs.signal_gt::<NumI>(&rhs).unwrap(),
Greater { strict: _ } => lhs.signal_ge::<NumI>(&rhs).unwrap(),
}
}
BoolExpr::Not(Not { arg }) => {
let arg = Self::eval::<BoolI, NumI>(arg, trace)?;
!&arg
}
BoolExpr::And(And { args }) => {
assert!(args.len() >= 2);
args.iter().map(|arg| Self::eval::<BoolI, NumI>(arg, trace)).try_fold(
Signal::const_true(),
|acc, item| {
let item = item?;
Ok(acc.and::<BoolI>(&item))
},
)?
}
BoolExpr::Or(Or { args }) => {
assert!(args.len() >= 2);
args.iter().map(|arg| Self::eval::<BoolI, NumI>(arg, trace)).try_fold(
Signal::const_false(),
|acc, item| {
let item = item?;
Ok(acc.or::<BoolI>(&item))
},
)?
}
BoolExpr::Next(Next { arg }) => {
let arg = Self::eval::<BoolI, NumI>(arg, trace)?;
compute_next(arg)?
}
BoolExpr::Oracle(Oracle { steps, arg }) => {
let arg = Self::eval::<BoolI, NumI>(arg, trace)?;
compute_oracle(arg, *steps)?
}
BoolExpr::Always(Always { arg, interval }) => {
let arg = Self::eval::<BoolI, NumI>(arg, trace)?;
compute_always::<BoolI>(arg, interval)?
}
BoolExpr::Eventually(Eventually { arg, interval }) => {
let arg = Self::eval::<BoolI, NumI>(arg, trace)?;
compute_eventually::<BoolI>(arg, interval)?
}
BoolExpr::Until(Until { lhs, rhs, interval }) => {
let lhs = Self::eval::<BoolI, NumI>(lhs, trace)?;
let rhs = Self::eval::<BoolI, NumI>(rhs, trace)?;
compute_until::<BoolI>(lhs, rhs, interval)?
}
};
Ok(ret)
let rob = QuantitativeSemantics::eval::<I>(expr, trace)?;
Ok(rob.signal_ge::<I>(&Signal::zero()).unwrap())
}
}
fn compute_next(arg: Signal<bool>) -> ArgusResult<Signal<bool>> {
compute_oracle(arg, 1)
}
fn compute_oracle(arg: Signal<bool>, steps: usize) -> ArgusResult<Signal<bool>> {
if steps == 0 {
return Ok(Signal::Empty);
}
match arg {
Signal::Empty => Ok(Signal::Empty),
sig @ Signal::Constant { value: _ } => {
// Just return the signal as is
Ok(sig)
}
Signal::Sampled {
mut values,
mut time_points,
} => {
// TODO(anand): Verify this
// Just shift the signal by `steps` timestamps
assert_eq!(values.len(), time_points.len());
if values.len() <= steps {
return Ok(Signal::Empty);
}
let expected_len = values.len() - steps;
let values = values.split_off(steps);
let _ = time_points.split_off(steps);
assert_eq!(values.len(), expected_len);
assert_eq!(values.len(), time_points.len());
Ok(Signal::Sampled { values, time_points })
}
}
}
/// Compute always for a signal
fn compute_always<I: InterpolationMethod<bool>>(
signal: Signal<bool>,
interval: &Interval,
) -> ArgusResult<Signal<bool>> {
if interval.is_empty() || interval.is_singleton() {
return Err(ArgusError::InvalidInterval {
reason: "interval is either empty or singleton",
});
}
let ret = match signal {
// if signal is empty or constant, return the signal itself.
// This works because if a signal is True everythere, then it must
// "always be true".
sig @ (Signal::Empty | Signal::Constant { value: _ }) => sig,
sig => {
use Bound::*;
if interval.is_singleton() {
// for singleton intervals, return the signal itself.
sig
} else if interval.is_untimed() {
compute_untimed_always(sig)?
} else if let (Included(a), Included(b)) = interval.into() {
compute_timed_always::<I>(sig, *a, Some(*b))?
} else if let (Included(a), Unbounded) = interval.into() {
compute_timed_always::<I>(sig, *a, None)?
} else {
unreachable!("interval should be created using Interval::new, and is_untimed checks this")
}
}
};
Ok(ret)
}
/// Compute timed always for the interval `[a, b]` (or, if `b` is `None`, `[a, ..]`.
fn compute_timed_always<I: InterpolationMethod<bool>>(
signal: Signal<bool>,
a: Duration,
b: Option<Duration>,
) -> ArgusResult<Signal<bool>> {
let z1 = !signal;
let z2 = compute_timed_eventually::<I>(z1, a, b)?;
Ok(!z2)
}
/// Compute untimed always
fn compute_untimed_always(signal: Signal<bool>) -> ArgusResult<Signal<bool>> {
let Signal::Sampled {
mut values,
time_points,
} = signal
else {
unreachable!("we shouldn't be passing non-sampled signals here")
};
// Compute the & in a expanding window fashion from the back
for i in (0..(time_points.len() - 1)).rev() {
values[i] = values[i + 1].min(values[i]);
}
Ok(Signal::Sampled { values, time_points })
}
/// Compute eventually for a signal
fn compute_eventually<I: InterpolationMethod<bool>>(
signal: Signal<bool>,
interval: &Interval,
) -> ArgusResult<Signal<bool>> {
if interval.is_empty() || interval.is_singleton() {
return Err(ArgusError::InvalidInterval {
reason: "interval is either empty or singleton",
});
}
let ret = match signal {
// if signal is empty or constant, return the signal itself.
// This works because if a signal is True everythere, then it must
// "eventually be true".
sig @ (Signal::Empty | Signal::Constant { value: _ }) => sig,
sig => {
use Bound::*;
if interval.is_singleton() {
// for singleton intervals, return the signal itself.
sig
} else if interval.is_untimed() {
compute_untimed_eventually(sig)?
} else if let (Included(a), Included(b)) = interval.into() {
compute_timed_eventually::<I>(sig, *a, Some(*b))?
} else if let (Included(a), Unbounded) = interval.into() {
compute_timed_eventually::<I>(sig, *a, None)?
} else {
unreachable!("interval should be created using Interval::new, and is_untimed checks this")
}
}
};
Ok(ret)
}
/// Compute timed eventually for the interval `[a, b]` (or, if `b` is `None`, `[a,..]`.
fn compute_timed_eventually<I: InterpolationMethod<bool>>(
signal: Signal<bool>,
a: Duration,
b: Option<Duration>,
) -> ArgusResult<Signal<bool>> {
match b {
Some(b) => {
// We want to compute the windowed max/or of the signal.
// The window is dictated by the time duration though.
let Signal::Sampled { values, time_points } = signal else {
unreachable!("we shouldn't be passing non-sampled signals here")
};
assert!(b > a);
assert!(!time_points.is_empty());
let signal_duration = *time_points.last().unwrap() - *time_points.first().unwrap();
let width = if signal_duration < (b - a) {
signal_duration
} else {
b - a
};
let mut ret_vals = Vec::with_capacity(values.len());
// For boolean signals we dont need to worry about intersections with ZERO as much as
// for quantitative signals, as linear interpolation is just a discrte switch.
let mut wedge = MonoWedge::<bool>::max_wedge(width);
for (i, value) in time_points.iter().zip(&values) {
wedge.update((i, value));
if i >= &(time_points[0] + width) {
ret_vals.push(
wedge
.front()
.map(|(_, &v)| (*i - width, v))
.unwrap_or_else(|| panic!("wedge should have at least 1 element")),
)
}
}
Signal::try_from_iter(ret_vals)
}
None => {
// Shift the signal to the left by `a` and then run the untimed eventually.
let shifted = signal.shift_left::<I>(a);
compute_untimed_eventually(shifted)
}
}
}
/// Compute untimed eventually
fn compute_untimed_eventually(signal: Signal<bool>) -> ArgusResult<Signal<bool>> {
let Signal::Sampled {
mut values,
time_points,
} = signal
else {
unreachable!("we shouldn't be passing non-sampled signals here")
};
// Compute the | in a expanding window fashion from the back
for i in (0..(time_points.len() - 1)).rev() {
values[i] = values[i + 1].max(values[i]);
}
Ok(Signal::Sampled { values, time_points })
}
/// Compute until
fn compute_until<I: InterpolationMethod<bool>>(
lhs: Signal<bool>,
rhs: Signal<bool>,
interval: &Interval,
) -> ArgusResult<Signal<bool>> {
let ret = match (lhs, rhs) {
// If either signals are empty, return empty
(sig @ Signal::Empty, _) | (_, sig @ Signal::Empty) => sig,
(lhs, rhs) => {
use Bound::*;
if interval.is_untimed() {
compute_untimed_until::<I>(lhs, rhs)?
} else if let (Included(a), Included(b)) = interval.into() {
compute_timed_until::<I>(lhs, rhs, *a, Some(*b))?
} else if let (Included(a), Unbounded) = interval.into() {
compute_timed_until::<I>(lhs, rhs, *a, None)?
} else {
unreachable!("interval should be created using Interval::new, and is_untimed checks this")
}
}
};
Ok(ret)
}
/// Compute timed until for the interval `[a, b]` (or, if `b` is `None`, `[a, ..]`.
///
/// For this, we will perform the Until rewrite defined in [1]:
/// $$
/// \varphi_1 U_{[a, b]} \varphi_2 = F_{[a,b]} \varphi_2 \land (\varphi_1 U_{[a,
/// \infty)} \varphi_2)
/// $$
///
/// $$
/// \varphi_1 U_{[a, \infty)} \varphi_2 = G_{[0,a]} (\varphi_1 U \varphi_2)
/// $$
///
/// [1]: <> (A. Donzé, T. Ferrère, and O. Maler, "Efficient Robust Monitoring for STL.")
fn compute_timed_until<I: InterpolationMethod<bool>>(
lhs: Signal<bool>,
rhs: Signal<bool>,
a: Duration,
b: Option<Duration>,
) -> ArgusResult<Signal<bool>> {
// First compute eventually [a, b]
let ev_a_b_rhs = compute_timed_eventually::<I>(rhs.clone(), a, b)?;
// Then compute untimed until
let untimed = compute_untimed_until::<I>(lhs, rhs)?;
if a.is_zero() {
Ok(ev_a_b_rhs.and::<I>(&untimed))
} else {
let g_a = compute_timed_always::<I>(untimed, Duration::ZERO, Some(a))?;
Ok(ev_a_b_rhs.and::<I>(&g_a))
}
}
/// Compute untimed until
fn compute_untimed_until<I: InterpolationMethod<bool>>(
lhs: Signal<bool>,
rhs: Signal<bool>,
) -> ArgusResult<Signal<bool>> {
let sync_points = lhs.sync_with_intersection::<I>(&rhs).unwrap();
let mut ret_samples = Vec::with_capacity(sync_points.len());
let expected_len = sync_points.len();
let mut next = false;
for (i, t) in sync_points.into_iter().enumerate().rev() {
let v1 = lhs.interpolate_at::<I>(t).unwrap();
let v2 = rhs.interpolate_at::<I>(t).unwrap();
#[allow(clippy::nonminimal_bool)]
let z = (v1 && v2) || (v1 && next);
if z == next && i < (expected_len - 2) {
ret_samples.pop();
}
ret_samples.push((t, z));
next = z;
}
Signal::<bool>::try_from_iter(ret_samples.into_iter().rev())
}
#[cfg(test)]
mod tests {
use core::time::Duration;
use std::collections::HashMap;
use itertools::assert_equal;
@ -411,7 +66,7 @@ mod tests {
let trace = MyTrace { signals };
let rob = BooleanSemantics::eval::<Linear, Linear>(&spec, &trace).unwrap();
let rob = BooleanSemantics::eval::<Linear>(&spec, &trace).unwrap();
let expected = Signal::from_iter(vec![
(Duration::from_secs_f64(0.0), false),
(Duration::from_secs_f64(0.7), false),
@ -432,18 +87,13 @@ mod tests {
let spec = ctx.make_eventually(cmp);
{
let signals = HashMap::from_iter(vec![(
"a".to_owned(),
Box::new(Signal::from_iter(vec![
(Duration::from_secs_f64(0.0), 2.5),
(Duration::from_secs_f64(0.7), 4.0),
(Duration::from_secs_f64(1.3), -1.0),
(Duration::from_secs_f64(2.1), 1.7),
])) as Box<dyn AnySignal>,
)]);
let trace = MyTrace { signals };
let rob = BooleanSemantics::eval::<Linear, Linear>(&spec, &trace).unwrap();
let rob = run_test_float_time::<Linear, _, _>(
vec![(
"a".to_owned(),
vec![((0.0), 2.5), ((0.7), 4.0), ((1.3), -1.0), ((2.1), 1.7)],
)],
&spec,
);
let Signal::Sampled { values, time_points: _ } = rob else {
panic!("boolean semantics should remain sampled");
@ -451,19 +101,13 @@ mod tests {
assert!(values.into_iter().all(|v| v));
}
{
let signals = HashMap::from_iter(vec![(
"a".to_owned(),
Box::new(Signal::from_iter(vec![
(Duration::from_secs_f64(0.0), 2.5),
(Duration::from_secs_f64(0.7), 4.0),
(Duration::from_secs_f64(1.3), 1.7),
(Duration::from_secs_f64(1.4), 0.0),
(Duration::from_secs_f64(2.1), -2.0),
])) as Box<dyn AnySignal>,
)]);
let trace = MyTrace { signals };
let rob = BooleanSemantics::eval::<Linear, Linear>(&spec, &trace).unwrap();
let rob = run_test_float_time::<Linear, _, _>(
vec![(
"a".to_owned(),
(vec![((0.0), 2.5), ((0.7), 4.0), ((1.3), 1.7), ((1.4), 0.0), ((2.1), -2.0)]),
)],
&spec,
);
println!("{:#?}", rob);
let Signal::Sampled { values, time_points: _ } = rob else {
@ -479,35 +123,115 @@ mod tests {
let Expr::Bool(spec) = crate::parse_str("G(a -> F[0,2] b)").unwrap() else {
panic!("should be bool expr")
};
let signals: HashMap<String, Box<dyn AnySignal>> = (vec![
(
"a".to_owned(),
Box::new(
vec![(1, false), (2, false), (3, false)]
.into_iter()
.map(|(t, v)| (Duration::from_secs(t), v))
.collect::<Signal<bool>>(),
) as Box<dyn AnySignal>,
),
(
"b".to_owned(),
Box::new(
vec![(1, false), (2, true), (3, false)]
.into_iter()
.map(|(t, v)| (Duration::from_secs(t), v))
.collect::<Signal<bool>>(),
) as Box<dyn AnySignal>,
),
])
.into_iter()
.collect();
let trace = MyTrace { signals };
let rob = BooleanSemantics::eval::<Constant, Constant>(&spec, &trace).unwrap();
let rob = run_test_float_time::<Constant, _, bool>(
vec![
("a".to_owned(), vec![(1.0, false), (2.0, false), (3.0, false)]),
("b".to_owned(), vec![(1.0, false), (2.0, true), (3.0, false)]),
],
&spec,
);
let Signal::Sampled { values, time_points: _ } = rob else {
panic!("boolean semantics should remain sampled");
};
assert!(values.into_iter().all(|v| v));
}
#[test]
fn smoketest_2() {
let Expr::Bool(spec) = crate::parse_str("a").unwrap() else {
panic!("should be bool expr")
};
let rob = run_test_float_time::<Linear, _, bool>(
vec![
("a".to_owned(), vec![(0.0, false), (196.864, true), (12709.888, true)]),
("b".to_owned(), vec![(0.0, false), (196.864, false), (12709.888, false)]),
],
&spec,
);
let Signal::Sampled { values, time_points: _ } = rob else {
panic!("boolean semantics should remain sampled");
};
assert_eq!(values, vec![false, true, true]);
}
#[test]
fn smoketest_3() {
// {
// sample_lists=[
// [(0.0, False), (0.001, False), (2.001, False)],
// [(0.0, False), (0.001, False), (2.001, False)]
// ],
// spec='F[0,2] a',
// interpolation_method='constant',
// }
let Expr::Bool(spec) = crate::parse_str("F[0,2] a").unwrap() else {
panic!("should be bool expr")
};
let rob = run_test_float_time::<Linear, _, bool>(
vec![
("a".to_owned(), vec![(0.0, false), (0.001, false), (2.002, false)]),
("b".to_owned(), vec![(0.0, false), (0.001, false), (2.002, false)]),
],
&spec,
);
let Signal::Sampled { values, time_points: _ } = rob else {
panic!("boolean semantics should remain sampled");
};
assert!(values.into_iter().all(|v| !v));
}
#[test]
fn smoketest_4() {
// {
// sample_lists = [
// [(0.0, False), (0.001, False), (4.002, False)],
// [(0.0, False), (0.001, False), (4.002, False)]
// ],
// spec = 'F[0,2] a',
// interpolation_method = 'constant'
// }
let Expr::Bool(spec) = crate::parse_str("F[0,2] a").unwrap() else {
panic!("should be bool expr")
};
let rob = run_test_float_time::<Linear, _, bool>(
vec![
("a".to_owned(), vec![(0.0, false), (0.001, false), (4.002, false)]),
("b".to_owned(), vec![(0.0, false), (0.001, false), (4.002, false)]),
],
&spec,
);
let Signal::Sampled { values, time_points: _ } = rob else {
panic!("boolean semantics should remain sampled");
};
assert!(values.into_iter().all(|v| !v));
}
fn run_test_float_time<Interp, I, T>(signals: I, spec: &BoolExpr) -> Signal<bool>
where
I: IntoIterator<Item = (String, Vec<(f64, T)>)>,
T: Copy + core::fmt::Debug + 'static,
Interp: InterpolationMethod<f64>,
{
let signals: HashMap<String, Box<dyn AnySignal>> = signals
.into_iter()
.map(|(name, samples)| {
(
name,
Box::new(
samples
.into_iter()
.map(|(t, v)| (Duration::from_secs_f64(t), v))
.collect::<Signal<T>>(),
) as Box<dyn AnySignal>,
)
})
.collect();
let trace = MyTrace { signals };
BooleanSemantics::eval::<Interp>(spec, &trace).unwrap()
}
}

View file

@ -1,6 +1,7 @@
use std::ops::Bound;
use std::time::Duration;
use itertools::Itertools;
use num_traits::{Num, NumCast};
use super::utils::lemire_minmax::MonoWedge;
@ -270,44 +271,69 @@ fn compute_timed_eventually<I: InterpolationMethod<f64>>(
a: Duration,
b: Option<Duration>,
) -> ArgusResult<Signal<f64>> {
let time_points = signal.time_points().unwrap().into_iter().copied().collect_vec();
let start_time = time_points.first().copied().unwrap();
let end_time = time_points.last().copied().unwrap();
// Shift the signal to the left by `a`, and interpolate at the time points.
let shifted = signal.shift_left::<I>(a);
let signal: Signal<f64> = time_points
.iter()
.map(|&t| (t, shifted.interpolate_at::<I>(t).unwrap()))
.collect();
match b {
Some(b) => {
// We want to compute the windowed max/or of the signal.
// The window is dictated by the time duration though.
let Signal::Sampled { values, time_points } = signal else {
unreachable!("we shouldn't be passing non-sampled signals here")
};
Some(b) if end_time - start_time < (b - a) => {
assert!(b > a);
assert!(!time_points.is_empty());
let signal_duration = *time_points.last().unwrap() - *time_points.first().unwrap();
let width = if signal_duration < (b - a) {
signal_duration
} else {
b - a
};
let mut ret_vals = Vec::with_capacity(values.len());
// For boolean signals we dont need to worry about intersections with ZERO as much as
// for quantitative signals, as linear interpolation is just a discrte switch.
let mut wedge = MonoWedge::<f64>::max_wedge(width);
for (i, value) in time_points.iter().zip(&values) {
wedge.update((i, value));
let time_points = signal
.sync_with_intersection::<I>(&Signal::zero())
.expect("Non-empty time points as we shouldn't be passing non-sampled signals here.");
let time_points_plus_b = time_points.iter().map(|t| end_time.min(*t + b)).collect_vec();
let time_points_minus_b = time_points
.iter()
.map(|t| start_time.max(t.saturating_sub(b)))
.collect_vec();
let time_points: Vec<Duration> = itertools::kmerge([time_points, time_points_plus_b, time_points_minus_b])
.dedup()
.collect();
assert!(!time_points.is_empty());
let width = b - a;
let mut ret_vals = Signal::<f64>::with_capacity(time_points.len());
let mut wedge = MonoWedge::<f64>::max_wedge();
let mut j: usize = 0;
for i in &time_points {
let value = signal
.interpolate_at::<I>(*i)
.expect("signal should be well defined at this point");
wedge.purge_before(i.saturating_sub(width));
wedge.update((*i, value));
if i >= &(time_points[0] + width) {
ret_vals.push(
wedge
.front()
.map(|(_, &v)| (*i - width, v))
.unwrap_or_else(|| panic!("wedge should have at least 1 element")),
)
let (new_t, new_v) = wedge
.front()
.map(|(&t, &v)| (t, v))
.unwrap_or_else(|| panic!("wedge should have at least 1 element"));
ret_vals.push(new_t, new_v)?;
j += 1;
}
}
Signal::try_from_iter(ret_vals)
}
None => {
// Shift the signal to the left by `a` and then run the untimed eventually.
let shifted = signal.shift_left::<I>(a);
compute_untimed_eventually::<I>(shifted)
// Get the rest of the values
for i in &time_points[j..] {
wedge.purge_before(*i);
let (t, val) = wedge
.front()
.map(|(&t, &v)| (t, v))
.unwrap_or_else(|| panic!("wedge should have at least 1 element"));
assert_eq!(
t, *i,
"{:?} != {:?}\n\ttime_points = {:?}, wedge.time_points = {:?}\n\tret_vals = {:?}",
t, i, time_points, wedge.time_points, ret_vals,
);
ret_vals.push(*i, val)?;
}
Ok(ret_vals)
}
_ => compute_untimed_eventually::<I>(shifted),
}
}

View file

@ -10,82 +10,169 @@
// TODO: Make a MonoWedge iterator adapter.
use std::collections::VecDeque;
use std::collections::{BTreeSet, VecDeque};
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct MonoWedge<'a, T> {
window: VecDeque<(&'a Duration, &'a T)>,
duration: Duration,
pub struct MonoWedge<T> {
window: VecDeque<(Duration, T)>,
pub(crate) time_points: BTreeSet<Duration>,
cmp: fn(&T, &T) -> bool,
}
impl<'a, T> MonoWedge<'a, T> {
pub fn new(duration: Duration, cmp: fn(&T, &T) -> bool) -> Self {
impl<T> MonoWedge<T> {
pub fn new(cmp: fn(&T, &T) -> bool) -> Self {
Self {
window: Default::default(),
time_points: Default::default(),
cmp,
duration,
}
}
}
impl<'a, T> MonoWedge<'a, T> {
pub fn update(&mut self, sample: (&'a Duration, &'a T)) {
impl<T> MonoWedge<T> {
pub fn update(&mut self, sample: (Duration, T)) {
assert!(
self.window.back().map_or(true, |v| v.0 < sample.0),
"MonoWedge window samples don't have monotonic time"
);
// Find the index to partition the inner queue based on the comparison function.
let cmp_idx = self.window.partition_point(|a| (self.cmp)(a.1, sample.1));
let cmp_idx = self.window.partition_point(|a| (self.cmp)(&a.1, &sample.1));
assert!(cmp_idx <= self.window.len());
// And delete all items in the second partition.
let _ = self.window.split_off(cmp_idx);
// Clear all older values
while let Some(item) = self.window.front() {
if *sample.0 > self.duration + *item.0 {
let _ = self.pop_front();
} else {
break;
}
}
// Add new time point
self.time_points.insert(sample.0);
// Add the new value
self.window.push_back(sample);
}
pub fn front(&self) -> Option<(&'a Duration, &'a T)> {
self.window.front().copied()
pub fn front(&self) -> Option<(&Duration, &T)> {
Some((self.time_points.first()?, &self.window.front()?.1))
}
pub fn pop_front(&mut self) -> Option<(&'a Duration, &'a T)> {
self.window.pop_front()
fn remove_older_samples(&mut self, t: Duration) {
while let Some(item) = self.window.front() {
if item.0 < t {
let _ = self.window.pop_front();
} else {
break;
}
}
}
pub fn purge_before(&mut self, t: Duration) {
self.time_points = self.time_points.split_off(&t);
self.remove_older_samples(t);
}
}
impl<'a, T> MonoWedge<'a, T>
impl<T> MonoWedge<T>
where
T: PartialOrd,
{
#[allow(dead_code)]
pub fn min_wedge(duration: Duration) -> Self {
Self::new(duration, T::lt)
pub fn min_wedge() -> Self {
Self::new(T::lt)
}
pub fn max_wedge(duration: Duration) -> Self {
Self::new(duration, T::gt)
pub fn max_wedge() -> Self {
Self::new(T::gt)
}
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use proptest::prelude::*;
use super::*;
fn run_test_min_max<T>(values: Vec<T>, width: usize)
where
T: Copy + Clone + core::cmp::PartialOrd + Ord + std::fmt::Debug,
{
// NOTE: When we convert the width from usize to Duration, the window becomes inclusive.
let expected_mins: Vec<T> = (0..values.len())
.map(|i| {
let j = usize::min(i + width + 1, values.len());
values[i..j].iter().min().copied().unwrap()
})
.collect();
assert_eq!(expected_mins.len(), values.len());
let expected_maxs: Vec<T> = (0..values.len())
.map(|i| {
let j = usize::min(i + width + 1, values.len());
values[i..j].iter().max().copied().unwrap()
})
.collect();
assert_eq!(expected_maxs.len(), values.len());
let time_points: Vec<Duration> = (0..values.len()).map(|i| Duration::from_millis(i as u64)).collect();
let start_time: Duration = time_points.first().copied().unwrap();
let width = Duration::from_millis(width as u64);
let mut min_wedge = MonoWedge::<T>::min_wedge();
let mut max_wedge = MonoWedge::<T>::max_wedge();
let mut ret_mins = Vec::with_capacity(time_points.len());
let mut ret_maxs = Vec::with_capacity(time_points.len());
let mut j: usize = 0;
// Now we do the actual updates
for (i, value) in time_points.iter().zip(&values) {
min_wedge.purge_before(i.saturating_sub(width));
min_wedge.update((*i, *value));
max_wedge.purge_before(i.saturating_sub(width));
max_wedge.update((*i, *value));
if width <= *i - start_time {
ret_mins.push(
min_wedge
.front()
.map(|(&t, &v)| (t, v))
.unwrap_or_else(|| panic!("min_wedge should have at least 1 element")),
);
ret_maxs.push(
max_wedge
.front()
.map(|(&t, &v)| (t, v))
.unwrap_or_else(|| panic!("max_wedge should have at least 1 element")),
);
j += 1;
}
}
assert_eq!(j, ret_mins.len());
assert_eq!(j, ret_maxs.len());
assert!(j <= time_points.len());
for i in &time_points[j..] {
min_wedge.purge_before(*i);
let min = min_wedge
.front()
.map(|(&t, &v)| (t, v))
.unwrap_or_else(|| panic!("min_wedge should have at least 1 element"));
assert_eq!(min.0, *i);
ret_mins.push(min);
max_wedge.purge_before(*i);
let max = max_wedge
.front()
.map(|(&t, &v)| (t, v))
.unwrap_or_else(|| panic!("max_wedge should have at least 1 element"));
assert_eq!(max.0, *i);
ret_maxs.push(max);
}
assert_eq!(time_points, ret_mins.iter().map(|s| s.0).collect_vec());
assert_eq!(time_points, ret_maxs.iter().map(|s| s.0).collect_vec());
let ret_mins: Vec<_> = ret_mins.into_iter().map(|s| s.1).collect();
let ret_maxs: Vec<_> = ret_maxs.into_iter().map(|s| s.1).collect();
assert_eq!(expected_mins, ret_mins, "window min incorrect");
assert_eq!(expected_maxs, ret_maxs, "window max incorrect");
}
prop_compose! {
fn vec_and_window()(vec in prop::collection::vec(any::<u64>(), 3..100))
(window_size in 2..vec.len(), vec in Just(vec))
@ -97,34 +184,29 @@ mod tests {
proptest! {
#[test]
fn test_rolling_minmax((vec, width) in vec_and_window()) {
// NOTE: When we convert the width from usize to Duration, the window becomes inclusive.
let expected_mins: Vec<u64> = vec.as_slice().windows(width + 1).map(|w| w.iter().min().unwrap_or_else(|| panic!("slice should have min: {:?}", w))).copied().collect();
assert_eq!(expected_mins.len(), vec.len() - width);
let expected_maxs: Vec<u64> = vec.as_slice().windows(width + 1).map(|w| w.iter().max().unwrap_or_else(|| panic!("slice should have max: {:?}", w))).copied().collect();
assert_eq!(expected_maxs.len(), vec.len() - width);
let time_points: Vec<Duration> = (0..vec.len()).map(|i| Duration::from_secs(i as u64)).collect();
let width = Duration::from_secs(width as u64);
let mut min_wedge = MonoWedge::<u64>::min_wedge(width);
let mut max_wedge = MonoWedge::<u64>::max_wedge(width);
let mut ret_mins = Vec::with_capacity(expected_mins.len());
let mut ret_maxs = Vec::with_capacity(expected_maxs.len());
// Now we do the actual updates
for (i, value) in time_points.iter().zip(&vec) {
min_wedge.update((i, value));
max_wedge.update((i, value));
if i >= &(time_points[0] + width) {
ret_mins.push(min_wedge.front().unwrap_or_else(|| panic!("min_wedge should have at least 1 element")));
ret_maxs.push(max_wedge.front().unwrap_or_else(|| panic!("max_wedge should have at least 1 element")))
}
}
let ret_mins: Vec<_> = ret_mins.into_iter().map(|s| s.1).copied().collect();
let ret_maxs: Vec<_> = ret_maxs.into_iter().map(|s| s.1).copied().collect();
assert_eq!(expected_mins, ret_mins, "window min incorrect");
assert_eq!(expected_maxs, ret_maxs, "window max incorrect");
run_test_min_max(vec, width)
}
}
#[test]
fn smoketest_1() {
let vec: Vec<u64> = vec![
14978539203261649134,
16311637665202408393,
14583675943388486036,
1550360951880186785,
14850777793052200443,
];
let width: usize = 2;
run_test_min_max(vec, width)
}
#[test]
fn smoketest_2() {
let vec: Vec<u64> = vec![0, 0, 0];
let width: usize = 2;
run_test_min_max(vec, width)
}
}

View file

@ -6,21 +6,11 @@ from pathlib import Path
import nox
import nox.registry
MICROMAMBA = shutil.which("micromamba")
CURRENT_DIR = Path(__file__).parent.resolve()
TARGET_DIR = CURRENT_DIR / "target"
COVERAGE_DIR = TARGET_DIR / "debug/coverage"
if MICROMAMBA:
BIN_DIR = CURRENT_DIR / "build" / "bin"
BIN_DIR.mkdir(exist_ok=True, parents=True)
CONDA = BIN_DIR / "conda"
if not CONDA.is_file():
CONDA.hardlink_to(MICROMAMBA)
os.environ["PATH"] = os.pathsep.join([str(BIN_DIR), os.environ["PATH"]])
nox.options.default_venv_backend = "conda"
elif shutil.which("mamba"):
if shutil.which("mamba"):
nox.options.default_venv_backend = "mamba"
else:
nox.options.default_venv_backend = "conda"
@ -141,37 +131,41 @@ def mypy(session: nox.Session):
# )
@nox.session(python=PYTHONS)
def tests(session: nox.Session):
session.conda_install("pytest", "hypothesis", "lark")
@nox.session(python=False)
def rust_tests(session: nox.Session) -> None:
session.env.update(ENV)
try:
session.run(
"cargo",
"test",
"--release",
"--workspace",
"--exclude",
"pyargus",
external=True,
)
except Exception:
...
try:
session.run(
"maturin",
"develop",
"--release",
"-m",
"./pyargus/Cargo.toml",
"-E",
"test",
silent=True,
)
with session.chdir(CURRENT_DIR / "pyargus"):
session.run("pytest", ".", "--hypothesis-explain")
except Exception:
...
session.run(
"cargo",
"test",
"--release",
"--workspace",
"--exclude",
"pyargus",
external=True,
)
@nox.session(python=PYTHONS)
def python_tests(session: nox.Session) -> None:
session.conda_install("pytest", "hypothesis", "lark", "maturin")
session.run(
"maturin",
"develop",
"--release",
"-m",
"./pyargus/Cargo.toml",
"-E",
"test",
silent=True,
)
with session.chdir(CURRENT_DIR / "pyargus"):
session.run("pytest", ".", "--hypothesis-explain")
@nox.session(python=False)
def tests(session: nox.Session):
session.notify("rust_tests")
session.notify("python_tests")
@nox.session(python=DEFAULT_PYTHON)

View file

@ -1,6 +1,6 @@
[package]
name = "pyargus"
version = "0.1.3"
version = "0.1.4"
authors.workspace = true
license.workspace = true

View file

@ -1,6 +1,6 @@
from typing import ClassVar, Literal, TypeAlias, TypeVar, final
from typing import ClassVar, Literal, TypeVar, final
from typing_extensions import Generic, Self
from typing_extensions import Generic, Self, TypeAlias
__version__: str

View file

@ -47,11 +47,11 @@ def gen_samples(
n_lists = max(1, n_lists)
timestamps = draw(
st.lists(
st.integers(min_value=0, max_value=2**32 - 1),
st.integers(min_value=0, max_value=2**32 - 1).map(lambda i: i / 1000),
unique=True,
min_size=min_size,
max_size=max_size,
).map(lambda t: list(map(float, sorted(set(t)))))
).map(lambda t: list(sorted(set(t))))
)
elements = gen_element_fn(dtype_)

View file

@ -52,7 +52,7 @@ features = ["pyo3/extension-module"]
module-name = "argus._argus"
[tool.pytest.ini_options]
addopts = "--import-mode=importlib --doctest-glob=../docs/*.rst --doctest-glob=../docs/**/*.rst"
addopts = "--import-mode=importlib --doctest-glob=../docs/*.rst --doctest-glob=../docs/**/*.rst --hypothesis-explain"
testpaths = ["tests"]
[tool.mypy]
@ -63,6 +63,9 @@ show_error_codes = true
[[tool.mypy.overrides]]
module = "mtl"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "rtamt"
ignore_missing_imports = true
[tool.ruff]

View file

@ -58,10 +58,8 @@ impl Trace for PyTrace {
fn eval_bool_semantics(expr: &PyBoolExpr, trace: &PyTrace, interpolation_method: &str) -> PyResult<Py<BoolSignal>> {
let interp = PyInterp::from_str(interpolation_method)?;
let sig = match interp {
PyInterp::Linear => BooleanSemantics::eval::<Linear, Linear>(&expr.0, trace).map_err(PyArgusError::from)?,
PyInterp::Constant => {
BooleanSemantics::eval::<Constant, Constant>(&expr.0, trace).map_err(PyArgusError::from)?
}
PyInterp::Linear => BooleanSemantics::eval::<Linear>(&expr.0, trace).map_err(PyArgusError::from)?,
PyInterp::Constant => BooleanSemantics::eval::<Constant>(&expr.0, trace).map_err(PyArgusError::from)?,
};
Python::with_gil(|py| Py::new(py, (BoolSignal, PySignal::new(sig, interp))))
}

View file

@ -237,7 +237,7 @@ macro_rules! impl_signals {
// if it is an empty signal, make it sampled. Otherwise, throw an error.
let signal: &mut Signal<$ty> = match signal {
Signal::Empty => {
super_.signal = Signal::<$ty>::new_with_capacity(1).into();
super_.signal = Signal::<$ty>::with_capacity(1).into();
(&mut super_.signal).try_into().unwrap()
}
_ => signal,

View file

@ -1,13 +1,26 @@
from typing import List, Tuple
from typing import List, Literal, Tuple
import hypothesis.strategies as st
import mtl
import rtamt
from hypothesis import given
import argus
from argus.test_utils.signals_gen import gen_samples
def _create_rtamt_spec(spec: str) -> rtamt.StlDenseTimeSpecification:
rtamt_spec = rtamt.StlDenseTimeSpecification()
rtamt_spec.name = "STL Spec"
rtamt_spec.declare_var("x", "float")
rtamt_spec.declare_var("y", "float")
rtamt_spec.add_sub_spec("a = (x > 0)")
rtamt_spec.add_sub_spec("b = (y > 0)")
rtamt_spec.spec = spec
rtamt_spec.parse()
return rtamt_spec
@given(
sample_lists=gen_samples(min_size=3, max_size=50, dtype_=bool, n_lists=2),
spec=st.one_of(
@ -24,10 +37,12 @@ from argus.test_utils.signals_gen import gen_samples
]
]
),
interpolation_method=st.one_of(st.just("constant"), st.just("linear")),
)
def test_boolean_propositional_expr(
sample_lists: List[List[Tuple[float, bool]]],
spec: str,
interpolation_method: Literal["linear", "constant"],
) -> None:
mtl_spec = mtl.parse(spec)
argus_spec = argus.parse_expr(spec)
@ -37,13 +52,13 @@ def test_boolean_propositional_expr(
mtl_data = dict(a=a, b=b)
argus_data = argus.Trace(
dict(
a=argus.BoolSignal.from_samples(a, interpolation_method="constant"),
b=argus.BoolSignal.from_samples(b, interpolation_method="constant"),
a=argus.BoolSignal.from_samples(a, interpolation_method=interpolation_method),
b=argus.BoolSignal.from_samples(b, interpolation_method=interpolation_method),
)
)
mtl_rob = mtl_spec(mtl_data, quantitative=False)
argus_rob = argus.eval_bool_semantics(argus_spec, argus_data)
argus_rob = argus.eval_bool_semantics(argus_spec, argus_data, interpolation_method=interpolation_method)
assert mtl_rob == argus_rob.at(0), f"{argus_rob=}"
@ -54,36 +69,43 @@ def test_boolean_propositional_expr(
[
st.just(spec)
for spec in [
"F a",
"G b",
"F[0,2] a",
"G[0,2] b",
"F[0,10] a",
"G[0,10] b",
"(G(a & b))",
"(F(a | b))",
"G(a -> F[0,2] b)",
# FIXME: `mtl` doesn't contract the signal domain for F[0,2] so it fails if a is True and b is False in the
# last sample point.
# "G(a -> F[0,2] b)",
"G(a -> F b)",
"(G F a -> F G b)",
"(a U b)",
"(a U[0,2] b)",
]
]
),
interpolation_method=st.one_of(st.just("constant"), st.just("linear")),
)
def test_boolean_temporal_expr(
sample_lists: List[List[Tuple[float, bool]]],
spec: str,
interpolation_method: Literal["linear", "constant"],
) -> None:
mtl_spec = mtl.parse(spec)
argus_spec = argus.parse_expr(spec)
assert isinstance(argus_spec, argus.BoolExpr)
a = sample_lists[0]
b = sample_lists[1]
a, b = sample_lists
mtl_data = dict(a=a, b=b)
argus_data = argus.Trace(
dict(
a=argus.BoolSignal.from_samples(a, interpolation_method="constant"),
b=argus.BoolSignal.from_samples(b, interpolation_method="constant"),
a=argus.BoolSignal.from_samples(a, interpolation_method=interpolation_method),
b=argus.BoolSignal.from_samples(b, interpolation_method=interpolation_method),
)
)
mtl_rob = mtl_spec(mtl_data, quantitative=False)
argus_rob = argus.eval_bool_semantics(argus_spec, argus_data)
argus_rob = argus.eval_bool_semantics(argus_spec, argus_data, interpolation_method=interpolation_method)
assert mtl_rob == argus_rob.at(0), f"{argus_rob=}"

View file

@ -39,9 +39,9 @@ def test_correctly_create_signals(data: st.DataObject) -> None:
actual_end_time = signal.end_time
assert actual_start_time is not None
assert actual_start_time == expected_start_time
assert actual_start_time == pytest.approx(expected_start_time)
assert actual_end_time is not None
assert actual_end_time == expected_end_time
assert actual_end_time == pytest.approx(expected_end_time)
a = data.draw(draw_index(xs))
assert a < len(xs)
@ -49,7 +49,10 @@ def test_correctly_create_signals(data: st.DataObject) -> None:
actual_val = signal.at(at) # type: ignore
assert actual_val is not None
assert actual_val == expected_val
if isinstance(actual_val, float):
assert actual_val == pytest.approx(expected_val)
else:
assert actual_val == expected_val
# generate one more sample
new_time = actual_end_time + 1
@ -58,7 +61,10 @@ def test_correctly_create_signals(data: st.DataObject) -> None:
get_val = signal.at(new_time)
assert get_val is not None
assert get_val == new_value
if isinstance(get_val, float):
assert get_val == pytest.approx(new_value)
else:
assert get_val == new_value
else:
assert signal.is_empty()
@ -81,7 +87,7 @@ def test_signal_create_should_fail(data: st.DataObject) -> None:
# Swap two indices in the samples
xs[b], xs[a] = xs[a], xs[b] # type: ignore
with pytest.raises(RuntimeError, match=r"trying to create a non-monotonically signal.+"):
with pytest.raises(RuntimeError, match=r"trying to create a signal with non-monotonic time points.+"):
_ = sampled_signal(xs, dtype_) # type: ignore