feat(argus-semantics): implement efficient streaming MinMax

This commit is contained in:
Anand Balakrishnan 2023-08-28 13:02:51 -07:00
parent 4084bb738b
commit 86cef692dc
No known key found for this signature in database
8 changed files with 353 additions and 162 deletions

View file

@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 658412106dcfc0e507ce8c292021a7c519550eb07ac47b2fd459df14c8ffce31 # shrinks to (vec, width) = ([0, 0, 0], 2)

View file

@ -3,71 +3,10 @@
//! In this crate, we are predominantly concerned with the monitoring of _offline system //! In this crate, we are predominantly concerned with the monitoring of _offline system
//! traces_, i.e., a collection of signals that have been extracted from observing and //! traces_, i.e., a collection of signals that have been extracted from observing and
//! sampling from some system. //! sampling from some system.
use argus_core::expr::BoolExpr;
use argus_core::signals::Signal;
use argus_core::ArgusResult;
pub mod eval; // pub mod eval;
pub mod semantics; // pub mod semantics;
// pub mod traits;
pub mod utils; pub mod utils;
pub use semantics::boolean::BooleanSemantics; // pub use traits::{BooleanSemantics, QuantitativeSemantics, Trace};
pub use semantics::quantitative::QuantitativeSemantics;
/// A trace is a collection of signals
///
/// # Example
///
/// An example of a `Trace` may be:
///
/// ```rust
/// use argus_core::signals::{Signal, AnySignal};
/// use argus_semantics::Trace;
///
/// struct MyTrace {
/// x: Signal<bool>,
/// y: Signal<i64>,
/// }
///
/// impl Trace for MyTrace {
/// fn signal_names(&self) -> Vec<&str> {
/// vec!["x", "y"]
/// }
///
/// fn get<T: 'static>(&self, name: &str) -> Option<&Signal<T>> {
/// let sig: &dyn AnySignal = match name {
/// "x" => &self.x,
/// "y" => &self.y,
/// _ => return None,
/// };
/// sig.as_any().downcast_ref::<Signal<T>>()
/// }
/// }
///
/// let trace = MyTrace {
/// x: Signal::constant(true),
/// y: Signal::constant(2),
/// };
/// let names = trace.signal_names();
///
/// assert!(names == &["x", "y"] || names == &["y", "x"]);
/// assert!(matches!(trace.get::<bool>("x"), Some(Signal::Constant { value: true })));
/// assert!(matches!(trace.get::<i64>("y"), Some(Signal::Constant { value: 2 })));
/// ```
pub trait Trace {
/// Get the list of signal names contained within the trace.
fn signal_names(&self) -> Vec<&str>;
/// Query a signal using its name
fn get<T: 'static>(&self, name: &str) -> Option<&Signal<T>>;
}
/// General interface for defining semantics for the [`argus-core`](argus_core) logic.
pub trait Semantics {
/// The output of applying the given semantics to an expression and trace.
type Output;
/// Any additional possible context that can be passed to the semantics evaluator.
type Context;
fn eval(expr: &BoolExpr, trace: &impl Trace, ctx: Self::Context) -> ArgusResult<Self::Output>;
}

View file

@ -1,31 +1,51 @@
use std::ops::Bound; use std::ops::Bound;
use std::time::Duration; use std::time::Duration;
use argus_core::expr::{Always, And, BoolVar, Cmp, Eventually, Interval, Next, Not, Or, Oracle, Until}; use argus_core::expr::*;
use argus_core::prelude::*; use argus_core::prelude::*;
use argus_core::signals::SignalPartialOrd;
use crate::eval::eval_num_expr; use crate::traits::{BooleanSemantics, QuantitativeSemantics, Trace};
use crate::{Semantics, Trace};
/// Boolean semantics of Argus expressions impl BooleanSemantics for BoolExpr {
pub struct BooleanSemantics; fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
match self {
impl Semantics for BooleanSemantics { BoolExpr::BoolLit(sig) => BooleanSemantics::eval(sig, trace),
type Output = Signal<bool>; BoolExpr::BoolVar(sig) => BooleanSemantics::eval(sig, trace),
type Context = (); BoolExpr::Cmp(sig) => BooleanSemantics::eval(sig, trace),
BoolExpr::Not(sig) => BooleanSemantics::eval(sig, trace),
fn eval(expr: &BoolExpr, trace: &impl Trace, ctx: Self::Context) -> ArgusResult<Self::Output> { BoolExpr::And(sig) => BooleanSemantics::eval(sig, trace),
match expr { BoolExpr::Or(sig) => BooleanSemantics::eval(sig, trace),
BoolExpr::BoolLit(val) => Ok(Signal::constant(val.0)), BoolExpr::Next(sig) => BooleanSemantics::eval(sig, trace),
BoolExpr::BoolVar(BoolVar { name }) => { BoolExpr::Oracle(sig) => BooleanSemantics::eval(sig, trace),
trace.get(name.as_str()).cloned().ok_or(ArgusError::SignalNotPresent) BoolExpr::Always(sig) => BooleanSemantics::eval(sig, trace),
BoolExpr::Eventually(sig) => BooleanSemantics::eval(sig, trace),
BoolExpr::Until(sig) => BooleanSemantics::eval(sig, trace),
} }
BoolExpr::Cmp(Cmp { op, lhs, rhs }) => { }
}
impl BooleanSemantics for BoolLit {
fn eval(&self, _trace: &impl Trace) -> ArgusResult<Signal<bool>> {
Ok(Signal::constant(self.0))
}
}
impl BooleanSemantics for BoolVar {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
trace
.get(self.name.as_str())
.cloned()
.ok_or(ArgusError::SignalNotPresent)
}
}
impl BooleanSemantics for Cmp {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
use argus_core::expr::Ordering::*; use argus_core::expr::Ordering::*;
let lhs = eval_num_expr::<f64>(lhs, trace)?;
let rhs = eval_num_expr::<f64>(rhs, trace)?; let lhs = QuantitativeSemantics::eval(self.lhs.as_ref(), trace)?;
let ret = match op { let rhs = QuantitativeSemantics::eval(self.rhs.as_ref(), trace)?;
let ret = match self.op {
Eq => lhs.signal_eq(&rhs), Eq => lhs.signal_eq(&rhs),
NotEq => lhs.signal_ne(&rhs), NotEq => lhs.signal_ne(&rhs),
Less { strict } if *strict => lhs.signal_lt(&rhs), Less { strict } if *strict => lhs.signal_lt(&rhs),
@ -35,59 +55,76 @@ impl Semantics for BooleanSemantics {
}; };
ret.ok_or(ArgusError::InvalidOperation) ret.ok_or(ArgusError::InvalidOperation)
} }
BoolExpr::Not(Not { arg }) => {
let arg = Self::eval(arg, trace, ctx)?;
Ok(!&arg)
} }
BoolExpr::And(And { args }) => {
impl BooleanSemantics for Not {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
let arg = BooleanSemantics::eval(self.arg.as_ref(), trace)?;
Ok(arg.not())
}
}
impl BooleanSemantics for And {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
let mut ret = Signal::constant(true); let mut ret = Signal::constant(true);
for arg in args.iter() { for arg in self.args.iter() {
let arg = Self::eval(arg, trace, ctx)?; let arg = Self::eval(arg, trace)?;
ret = &ret & &arg; ret = ret.and(&arg);
} }
Ok(ret)
} }
BoolExpr::Or(Or { args }) => {
let mut ret = Signal::constant(false);
for arg in args.iter() {
let arg = Self::eval(arg, trace, ctx)?;
ret = &ret | &arg;
} }
Ok(ret)
impl BooleanSemantics for Or {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
let mut ret = Signal::constant(true);
for arg in self.args.iter() {
let arg = Self::eval(arg, trace)?;
ret = ret.or(&arg);
} }
BoolExpr::Next(Next { arg }) => { }
let arg = Self::eval(arg, trace, ctx)?; }
fn compute_next(arg: Signal<bool>) -> ArgusResult<Signal<bool>> {
match arg {
Signal::Empty => Ok(Signal::Empty),
sig @ Signal::Constant { value: _ } => {
// Just return the signal as is
Ok(sig)
}
Signal::Sampled {
mut values,
mut time_points,
} => {
// TODO(anand): Verify this
// Just shift the signal by 1 timestamp
assert!(values.len() == time_points.len());
if values.len() <= 1 {
return Ok(Signal::Empty);
}
values.remove(0);
time_points.pop();
Ok(Signal::Sampled { values, time_points })
}
}
}
impl BooleanSemantics for Next {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
let arg = BooleanSemantics::eval(self.arg.as_ref(), trace)?;
compute_next(arg) compute_next(arg)
} }
BoolExpr::Oracle(Oracle { steps, arg }) => {
let arg = Self::eval(arg, trace, ctx)?;
compute_oracle(arg, *steps)
}
BoolExpr::Always(Always { arg, interval }) => {
let arg = Self::eval(arg, trace, ctx)?;
compute_always(arg, interval)
}
BoolExpr::Eventually(Eventually { arg, interval }) => {
let arg = Self::eval(arg, trace, ctx)?;
compute_eventually(arg, interval)
}
BoolExpr::Until(Until { lhs, rhs, interval }) => {
let lhs = Self::eval(lhs, trace, ctx)?;
let rhs = Self::eval(rhs, trace, ctx)?;
compute_until(lhs, rhs, interval)
}
}
}
} }
/// Compute next for a signal impl BooleanSemantics for Oracle {
fn compute_next(signal: Signal<bool>) -> ArgusResult<Signal<bool>> { fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
unimplemented!() if self.steps == 0 {
Ok(Signal::Empty)
} else {
(0..self.steps).try_fold(BooleanSemantics::eval(self.arg.as_ref(), trace)?, |arg, _| {
compute_next(arg)
})
}
} }
/// Compute oracle for a signal
fn compute_oracle(signal: Signal<bool>, steps: usize) -> ArgusResult<Signal<bool>> {
unimplemented!()
} }
/// Compute always for a signal /// Compute always for a signal
@ -122,7 +159,8 @@ fn compute_always(signal: Signal<bool>, interval: &Interval) -> ArgusResult<Sign
fn compute_timed_always(signal: Signal<bool>, a: Duration, b: Option<Duration>) -> Signal<bool> { fn compute_timed_always(signal: Signal<bool>, a: Duration, b: Option<Duration>) -> Signal<bool> {
match b { match b {
Some(b) => { Some(b) => {
// We want to compute the // We want to compute the windowed min/and of the signal.
// The window is dictated by the time duration though.
todo!() todo!()
} }
None => { None => {
@ -149,6 +187,13 @@ fn compute_untimed_always(signal: Signal<bool>) -> Signal<bool> {
Signal::Sampled { values, time_points } Signal::Sampled { values, time_points }
} }
impl BooleanSemantics for Always {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
let arg = BooleanSemantics::eval(&self.arg, trace)?;
compute_always(arg, &self.interval)
}
}
/// Compute eventually for a signal /// Compute eventually for a signal
fn compute_eventually(signal: Signal<bool>, interval: &Interval) -> ArgusResult<Signal<bool>> { fn compute_eventually(signal: Signal<bool>, interval: &Interval) -> ArgusResult<Signal<bool>> {
if interval.is_empty() || interval.is_singleton() { if interval.is_empty() || interval.is_singleton() {
@ -184,7 +229,8 @@ fn compute_eventually(signal: Signal<bool>, interval: &Interval) -> ArgusResult<
fn compute_timed_eventually(signal: Signal<bool>, a: Duration, b: Option<Duration>) -> Signal<bool> { fn compute_timed_eventually(signal: Signal<bool>, a: Duration, b: Option<Duration>) -> Signal<bool> {
match b { match b {
Some(b) => { Some(b) => {
// We want to compute the // We want to compute the windowed max/or of the signal.
// The window is dictated by the time duration though.
todo!() todo!()
} }
None => { None => {
@ -211,6 +257,13 @@ fn compute_untimed_eventually(signal: Signal<bool>) -> Signal<bool> {
Signal::Sampled { values, time_points } Signal::Sampled { values, time_points }
} }
impl BooleanSemantics for Eventually {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
let arg = BooleanSemantics::eval(&self.arg, trace)?;
compute_eventually(arg, &self.interval)
}
}
/// Compute until /// Compute until
fn compute_until(lhs: Signal<bool>, rhs: Signal<bool>, interval: &Interval) -> ArgusResult<Signal<bool>> { fn compute_until(lhs: Signal<bool>, rhs: Signal<bool>, interval: &Interval) -> ArgusResult<Signal<bool>> {
let ret = match (lhs, rhs) { let ret = match (lhs, rhs) {
@ -255,7 +308,6 @@ fn compute_until(lhs: Signal<bool>, rhs: Signal<bool>, interval: &Interval) -> A
/// Compute timed until for the interval `[a, b]` (or, if `b` is `None`, `[a, ..]`. /// Compute timed until for the interval `[a, b]` (or, if `b` is `None`, `[a, ..]`.
fn compute_timed_until(lhs: Signal<bool>, rhs: Signal<bool>, a: Duration, b: Option<Duration>) -> Signal<bool> { fn compute_timed_until(lhs: Signal<bool>, rhs: Signal<bool>, a: Duration, b: Option<Duration>) -> Signal<bool> {
// For this, we will perform the Until rewrite defined in [1]: // For this, we will perform the Until rewrite defined in [1]:
//
// $$ // $$
// \varphi_1 U_{[a, b]} \varphi_2 = F_{[a,b]} \varphi_2 \land (\varphi_1 U_{[a, // \varphi_1 U_{[a, b]} \varphi_2 = F_{[a,b]} \varphi_2 \land (\varphi_1 U_{[a,
// \infty)} \varphi_2) // \infty)} \varphi_2)
@ -265,9 +317,7 @@ fn compute_timed_until(lhs: Signal<bool>, rhs: Signal<bool>, a: Duration, b: Opt
// \varphi_1 U_{[a, \infty)} \varphi_2 = G_{[0,a]} (\varphi_1 U \varphi_2) // \varphi_1 U_{[a, \infty)} \varphi_2 = G_{[0,a]} (\varphi_1 U \varphi_2)
// $$ // $$
// //
// // [1] A. Donzé, T. Ferrère, and O. Maler, "Efficient Robust Monitoring for STL."
// [1] A. Donzé, T. Ferrère, and O. Maler, "Efficient Robust Monitoring for STL." doi:
// 10.1007/978-3-642-39799-8_19.
match b { match b {
Some(b) => { Some(b) => {
@ -292,3 +342,9 @@ fn compute_untimed_until(lhs: Signal<bool>, rhs: Signal<bool>) -> Signal<bool> {
let sync_points = lhs.sync_with_intersection(&rhs); let sync_points = lhs.sync_with_intersection(&rhs);
todo!() todo!()
} }
impl BooleanSemantics for Until {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>> {
todo!()
}
}

View file

@ -1,2 +1,2 @@
pub mod boolean; pub mod boolean;
pub mod quantitative; // pub mod quantitative;

View file

@ -6,7 +6,7 @@ use argus_core::signals::traits::{SignalAbs, SignalMinMax};
use argus_core::signals::SignalNumCast; use argus_core::signals::SignalNumCast;
use crate::eval::eval_num_expr; use crate::eval::eval_num_expr;
use crate::{Semantics, Trace}; use crate::Trace;
fn top_or_bot(sig: &Signal<bool>) -> Signal<f64> { fn top_or_bot(sig: &Signal<bool>) -> Signal<f64> {
match sig { match sig {

View file

@ -0,0 +1,62 @@
//! Traits to define semantics for temporal logic specifications
use argus_core::expr::{IsBoolExpr, IsNumExpr};
use argus_core::prelude::*;
/// A trace is a collection of signals
///
/// # Example
///
/// An example of a `Trace` may be:
///
/// ```rust
/// use argus_core::signals::{Signal, AnySignal};
/// use argus_semantics::Trace;
///
/// struct MyTrace {
/// x: Signal<bool>,
/// y: Signal<i64>,
/// }
///
/// impl Trace for MyTrace {
/// fn signal_names(&self) -> Vec<&str> {
/// vec!["x", "y"]
/// }
///
/// fn get<T: 'static>(&self, name: &str) -> Option<&Signal<T>> {
/// let sig: &dyn AnySignal = match name {
/// "x" => &self.x,
/// "y" => &self.y,
/// _ => return None,
/// };
/// sig.as_any().downcast_ref::<Signal<T>>()
/// }
/// }
///
/// let trace = MyTrace {
/// x: Signal::constant(true),
/// y: Signal::constant(2),
/// };
/// let names = trace.signal_names();
///
/// assert!(names == &["x", "y"] || names == &["y", "x"]);
/// assert!(matches!(trace.get::<bool>("x"), Some(Signal::Constant { value: true })));
/// assert!(matches!(trace.get::<i64>("y"), Some(Signal::Constant { value: 2 })));
/// ```
pub trait Trace {
/// Get the list of signal names contained within the trace.
fn signal_names(&self) -> Vec<&str>;
/// Query a signal using its name
fn get<T: 'static>(&self, name: &str) -> Option<&Signal<T>>;
}
/// Boolean semantics for a [`BoolExpr`] or type that is
/// convertable to a [`BoolExpr`]
pub trait BooleanSemantics: IsBoolExpr {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>>;
}
pub trait QuantitativeSemantics: IsNumExpr {
fn eval(&self, trace: &impl Trace) -> ArgusResult<Signal<bool>>;
}

View file

@ -1 +1 @@
pub mod lemire_minmax;

View file

@ -0,0 +1,127 @@
//! A Rusty implementation of Alex Donze's adaptation[^1] of Daniel Lemire's streaming
//! min-max algorithm[^2] for piecewise linear signals.
//!
//! [^1]: Alexandre Donzé, Thomas Ferrère, and Oded Maler. 2013. Efficient Robust
//! Monitoring for STL. In Computer Aided Verification (Lecture Notes in Computer
//! Science), Springer, Berlin, Heidelberg, 264279.
//!
//! [^2]: Daniel Lemire. 2007. Streaming Maximum-Minimum Filter Using No More than Three
//! Comparisons per Element. arXiv:cs/0610046.
use std::collections::VecDeque;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct MonoWedge<'a, T> {
window: VecDeque<(&'a Duration, &'a T)>,
duration: Duration,
cmp: fn(&T, &T) -> bool,
}
impl<'a, T> MonoWedge<'a, T> {
pub fn new(duration: Duration, cmp: fn(&T, &T) -> bool) -> Self {
Self {
window: Default::default(),
cmp,
duration,
}
}
}
impl<'a, T> MonoWedge<'a, T> {
pub fn update(&mut self, sample: (&'a Duration, &'a T)) {
assert!(
self.window.back().map_or(true, |v| v.0 < sample.0),
"MonoWedge window samples don't have monotonic time"
);
// Find the index to partition the inner queue based on the comparison function.
let cmp_idx = self.window.partition_point(|a| (self.cmp)(a.1, sample.1));
assert!(cmp_idx <= self.window.len());
// And delete all items in the second partition.
let _ = self.window.split_off(cmp_idx);
// Clear all older values
while let Some(item) = self.window.front() {
if *sample.0 > self.duration + *item.0 {
let _ = self.pop_front();
} else {
break;
}
}
// Add the new value
self.window.push_back(sample);
}
pub fn front(&self) -> Option<(&'a Duration, &'a T)> {
self.window.front().copied()
}
pub fn pop_front(&mut self) -> Option<(&'a Duration, &'a T)> {
self.window.pop_front()
}
}
impl<'a, T> MonoWedge<'a, T>
where
T: PartialOrd,
{
pub fn min_wedge(duration: Duration) -> Self {
Self::new(duration, T::lt)
}
pub fn max_wedge(duration: Duration) -> Self {
Self::new(duration, T::gt)
}
}
#[cfg(test)]
mod tests {
use proptest::prelude::*;
use super::*;
prop_compose! {
fn vec_and_window()(vec in prop::collection::vec(any::<u64>(), 3..100))
(window_size in 2..vec.len(), vec in Just(vec))
-> (Vec<u64>, usize) {
(vec, window_size)
}
}
proptest! {
#[test]
fn test_rolling_minmax((vec, width) in vec_and_window()) {
// NOTE: When we convert the width from usize to Duration, the window becomes inclusive.
let expected_mins: Vec<u64> = vec.as_slice().windows(width + 1).map(|w| w.iter().min().unwrap_or_else(|| panic!("slice should have min: {:?}", w))).copied().collect();
assert_eq!(expected_mins.len(), vec.len() - width);
let expected_maxs: Vec<u64> = vec.as_slice().windows(width + 1).map(|w| w.iter().max().unwrap_or_else(|| panic!("slice should have max: {:?}", w))).copied().collect();
assert_eq!(expected_maxs.len(), vec.len() - width);
let time_points: Vec<Duration> = (0..vec.len()).map(|i| Duration::from_secs(i as u64)).collect();
let width = Duration::from_secs(width as u64);
let mut min_wedge = MonoWedge::<u64>::min_wedge(width);
let mut max_wedge = MonoWedge::<u64>::max_wedge(width);
let mut ret_mins = Vec::with_capacity(expected_mins.len());
let mut ret_maxs = Vec::with_capacity(expected_maxs.len());
// Now we do the actual updates
for (i, value) in time_points.iter().zip(&vec) {
min_wedge.update((i, value));
max_wedge.update((i, value));
if i >= &(time_points[0] + width) {
ret_mins.push(min_wedge.front().unwrap_or_else(|| panic!("min_wedge should have at least 1 element")));
ret_maxs.push(max_wedge.front().unwrap_or_else(|| panic!("max_wedge should have at least 1 element")))
}
}
let ret_mins: Vec<_> = ret_mins.into_iter().map(|s| s.1).copied().collect();
let ret_maxs: Vec<_> = ret_maxs.into_iter().map(|s| s.1).copied().collect();
assert_eq!(expected_mins, ret_mins, "window min incorrect");
assert_eq!(expected_maxs, ret_maxs, "window max incorrect");
}
}
}