feat!(core): Change Signal to be a sumtype

We want to be able to reason about if a signal is empty, constant, or sampled
at compile time without using any trait objects. Moreover, the core Argus
library shouldn't care about how it deals with interfacing with other languages
like Python. Thus, we remove the need for having an `AnySignal` type and what
not.
This commit is contained in:
Anand Balakrishnan 2023-04-14 10:53:38 -07:00
parent a6a3805107
commit 4431b79bcd
No known key found for this signature in database
10 changed files with 442 additions and 966 deletions

View file

@ -16,30 +16,20 @@ pub mod num_ops;
pub mod traits;
mod utils;
use std::ops::{RangeFull, RangeInclusive};
use std::ops::{Bound, RangeBounds};
use std::time::Duration;
pub use bool_ops::*;
pub use cast::*;
pub use cmp_ops::*;
use itertools::Itertools;
pub use num_ops::*;
use num_traits::NumCast;
use utils::intersect_bounds;
use self::traits::{BaseSignal, LinearInterpolatable};
use self::traits::LinearInterpolatable;
use crate::{ArgusResult, Error};
/// All supported signal types in Argus
#[derive(Debug, Clone, derive_more::From)]
pub enum AnySignal {
Bool(Signal<bool>),
ConstBool(ConstantSignal<bool>),
Int(Signal<i64>),
ConstInt(ConstantSignal<i64>),
UInt(Signal<u64>),
ConstUInt(ConstantSignal<u64>),
Float(Signal<f64>),
ConstFloat(ConstantSignal<f64>),
}
#[derive(Debug, Clone, Copy)]
pub enum InterpolationMethod {
Linear,
@ -76,31 +66,116 @@ pub struct Sample<T> {
pub value: T,
}
/// A signal is a sequence of time points ([`Duration`](core::time::Duration)) and
/// corresponding value samples.
#[derive(Default, Debug, Clone)]
pub struct Signal<T> {
pub(crate) values: Vec<T>,
pub(crate) time_points: Vec<Duration>,
/// A typed Signal
///
/// A Signal can either be empty, constant throughout its domain, or sampled at a
/// finite set of strictly monotonically increasing time points.
#[derive(Default, Clone, Debug)]
pub enum Signal<T> {
#[default]
Empty,
Constant {
value: T,
},
Sampled {
values: Vec<T>,
time_points: Vec<Duration>,
},
}
impl<T> Signal<T> {
/// Create a new empty signal
pub fn new() -> Self {
Self {
values: Default::default(),
time_points: Default::default(),
}
Self::Empty
}
/// Create a new constant signal
pub fn constant(value: T) -> Self {
Self::Constant { value }
}
/// Create a new empty signal with the specified capacity
pub fn new_with_capacity(size: usize) -> Self {
Self {
Self::Sampled {
values: Vec::with_capacity(size),
time_points: Vec::with_capacity(size),
}
}
/// Get the bounds of the signal.
///
/// Returns `None` if the signal is empty (either [`Signal::Empty`] or
/// [`Signal::Sampled`] with no samples.
pub fn bounds(&self) -> Option<(Bound<Duration>, Bound<Duration>)> {
use core::ops::Bound::*;
match self {
Signal::Empty => None,
Signal::Constant { value: _ } => Some((Unbounded, Unbounded)),
Signal::Sampled { values: _, time_points } => {
if time_points.is_empty() {
None
} else {
Some((Included(time_points[0]), Included(*time_points.last().unwrap())))
}
}
}
}
/// Check if the signal is empty
pub fn is_empty(&self) -> bool {
use core::ops::Bound::*;
let bounds = match self.bounds() {
Some(b) => b,
None => return true,
};
match (bounds.start_bound(), bounds.end_bound()) {
(Included(start), Included(end)) => start > end,
(Included(start), Excluded(end)) | (Excluded(start), Included(end)) | (Excluded(start), Excluded(end)) => {
start >= end
}
(Unbounded, Unbounded) => false,
bound => unreachable!("Argus doesn't support signals with bound {:?}", bound),
}
}
/// Get the time at which the given signal starts.
pub fn start_time(&self) -> Option<Bound<Duration>> {
self.bounds().map(|b| b.0)
}
/// Get the time at which the given signal ends.
pub fn end_time(&self) -> Option<Bound<Duration>> {
self.bounds().map(|b| b.1)
}
/// Push a new sample to the signal at the given time point
///
/// The method enforces the invariant that the time points of the signal must have
/// strictly monotonic increasing values, otherwise it returns an error without
/// adding the sample point.
/// Moreover, it is an error to `push` a value to an [`Empty`](Signal::Empty) or
/// [`Constant`](Signal::Constant) signal.
pub fn push(&mut self, time: Duration, value: T) -> ArgusResult<()> {
match self {
Signal::Empty | Signal::Constant { value: _ } => Err(Error::InvalidPushToSignal),
Signal::Sampled { values, time_points } => {
let last_time = time_points.last();
match last_time {
Some(last_t) if last_t >= &time => Err(Error::NonMonotonicSignal {
end_time: *last_t,
current_sample: time,
}),
_ => {
time_points.push(time);
values.push(value);
Ok(())
}
}
}
}
}
/// Create an iterator over the pairs of time points and values of the signal.
pub fn iter(&self) -> impl Iterator<Item = (&Duration, &T)> {
self.into_iter()
@ -121,152 +196,202 @@ impl<T> Signal<T> {
}
Ok(signal)
}
}
impl<T> BaseSignal for Signal<T> {
type Value = T;
type Bounds = RangeInclusive<Duration>;
/// Get the value of the signal at the given time point
///
/// If there exists a sample at the given time point then `Some(value)` is returned.
/// Otherwise, `None` is returned. If the goal is to interpolate the value at the
/// a given time, see [`interpolate_at`](Self::interpolate_at).
pub fn at(&self, time: Duration) -> Option<&T> {
match self {
Signal::Empty => None,
Signal::Constant { value } => Some(value),
Signal::Sampled { values, time_points } => {
assert_eq!(
time_points.len(),
values.len(),
"invariant: number of time points must equal number of samples"
);
// if there are no sample points, then there is no sample point (nor neighboring
// sample points) to return
if time_points.is_empty() {
return None;
}
fn at(&self, time: Duration) -> Option<&Self::Value> {
assert_eq!(
self.time_points.len(),
self.values.len(),
"invariant: number of time points must equal number of samples"
);
// if there are no sample points, then there is no sample point (nor neighboring
// sample points) to return
if self.time_points.is_empty() {
return None;
}
// We will use binary search to find the appropriate index
match self.time_points.binary_search(&time) {
Ok(idx) => self.values.get(idx),
Err(_) => None,
}
}
fn interpolate_at(&self, time: Duration, interp: InterpolationMethod) -> Option<Self::Value>
where
Self::Value: Copy + LinearInterpolatable,
{
assert_eq!(
self.time_points.len(),
self.values.len(),
"invariant: number of time points must equal number of samples"
);
// if there are no sample points, then there is no sample point (nor neighboring
// sample points) to return
if self.time_points.is_empty() {
return None;
}
// We will use binary search to find the appropriate index
let hint_idx = match self.time_points.binary_search(&time) {
Ok(idx) => return self.values.get(idx).copied(),
Err(idx) => idx,
};
// We have an hint as to where the sample _should have been_.
// So, lets check if there is a preceding and/or following sample.
let (first, second) = if hint_idx == 0 {
// Sample appears before the start of the signal
// So, let's return just the following sample, which is the first sample
// (since we know that the signal is non-empty).
let preceding = None;
let following = Some(Sample {
time: self.time_points[hint_idx],
value: self.values[hint_idx],
});
(preceding, following)
} else if hint_idx == self.time_points.len() {
// Sample appears past the end of the signal
// So, let's return just the preceding sample, which is the last sample
// (since we know the signal is non-empty)
let preceding = Some(Sample {
time: self.time_points[hint_idx - 1],
value: self.values[hint_idx - 1],
});
let following = None;
(preceding, following)
} else {
// The sample should exist within the signal.
assert!(self.time_points.len() >= 2, "There should be at least 2 elements");
let preceding = Some(Sample {
time: self.time_points[hint_idx - 1],
value: self.values[hint_idx - 1],
});
let following = Some(Sample {
time: self.time_points[hint_idx],
value: self.values[hint_idx],
});
(preceding, following)
};
interp.at(time, &first, &second)
}
fn bounds(&self) -> Self::Bounds {
let first = self.time_points.first();
let last = self.time_points.last();
match (first, last) {
(None, None) => Duration::from_secs(1)..=Duration::from_secs(0),
(Some(first), Some(last)) => *first..=*last,
(..) => unreachable!("there is either 0 time points or some time points"),
}
}
fn push(&mut self, time: Duration, value: Self::Value) -> ArgusResult<bool> {
assert_eq!(self.time_points.len(), self.values.len());
let last_time = self.time_points.last();
match last_time {
Some(last_t) if last_t >= &time => Err(Error::NonMonotonicSignal {
end_time: *last_t,
current_sample: time,
}),
_ => {
self.time_points.push(time);
self.values.push(value);
Ok(true)
// We will use binary search to find the appropriate index
match time_points.binary_search(&time) {
Ok(idx) => values.get(idx),
Err(_) => None,
}
}
}
}
}
#[derive(Debug, Clone)]
pub struct ConstantSignal<T> {
pub value: T,
}
impl<T> ConstantSignal<T> {
pub fn new(value: T) -> Self {
Self { value }
}
}
impl<T> BaseSignal for ConstantSignal<T> {
type Value = T;
type Bounds = RangeFull;
fn at(&self, _time: Duration) -> Option<&Self::Value> {
Some(&self.value)
}
fn bounds(&self) -> Self::Bounds {
..
}
fn interpolate_at(&self, _time: Duration, _interp: InterpolationMethod) -> Option<Self::Value>
/// Interpolate the value of the signal at the given time point
///
/// If there exists a sample at the given time point then `Some(value)` is returned
/// with the value of the signal at the point. Otherwise, a the
/// [`InterpolationMethod`] is used to compute the value. If the given interpolation
/// method cannot be used at the given time (for example, if we use
/// [`InterpolationMethod::Linear`] and the `time` point is outside the signal
/// domain), then a `None` is returned.
pub fn interpolate_at(&self, time: Duration, interp: InterpolationMethod) -> Option<T>
where
Self::Value: Copy + LinearInterpolatable,
T: Copy + LinearInterpolatable,
{
Some(self.value)
match self {
Signal::Empty => None,
Signal::Constant { value } => Some(*value),
Signal::Sampled { values, time_points } => {
assert_eq!(
time_points.len(),
values.len(),
"invariant: number of time points must equal number of samples"
);
// if there are no sample points, then there is no sample point (nor neighboring
// sample points) to return
if time_points.is_empty() {
return None;
}
// We will use binary search to find the appropriate index
let hint_idx = match time_points.binary_search(&time) {
Ok(idx) => return values.get(idx).copied(),
Err(idx) => idx,
};
// We have an hint as to where the sample _should have been_.
// So, lets check if there is a preceding and/or following sample.
let (first, second) = if hint_idx == 0 {
// Sample appears before the start of the signal
// So, let's return just the following sample, which is the first sample
// (since we know that the signal is non-empty).
let preceding = None;
let following = Some(Sample {
time: time_points[hint_idx],
value: values[hint_idx],
});
(preceding, following)
} else if hint_idx == time_points.len() {
// Sample appears past the end of the signal
// So, let's return just the preceding sample, which is the last sample
// (since we know the signal is non-empty)
let preceding = Some(Sample {
time: time_points[hint_idx - 1],
value: values[hint_idx - 1],
});
let following = None;
(preceding, following)
} else {
// The sample should exist within the signal.
assert!(time_points.len() >= 2, "There should be at least 2 elements");
let preceding = Some(Sample {
time: time_points[hint_idx - 1],
value: values[hint_idx - 1],
});
let following = Some(Sample {
time: time_points[hint_idx],
value: values[hint_idx],
});
(preceding, following)
};
interp.at(time, &first, &second)
}
}
}
fn push(&mut self, _time: Duration, _value: Self::Value) -> ArgusResult<bool> {
Ok(false)
pub fn time_points(&self) -> Option<Vec<&Duration>> {
match self {
Signal::Empty => None,
Signal::Constant { value: _ } => Vec::new().into(),
Signal::Sampled { values: _, time_points } => time_points.iter().collect_vec().into(),
}
}
/// Return a list consisting of all the points where the two signals should be
/// sampled and synchronized for operations.
pub fn sync_points<'a>(&'a self, other: &'a Self) -> Option<Vec<&'a Duration>> {
use core::ops::Bound::*;
if self.is_empty() || other.is_empty() {
return None;
}
match (self, other) {
(Signal::Empty, _) | (_, Signal::Empty) => None,
(Signal::Constant { value: _ }, Signal::Constant { value: _ }) => Vec::new().into(),
(Signal::Constant { value: _ }, Signal::Sampled { values: _, time_points })
| (Signal::Sampled { values: _, time_points }, Signal::Constant { value: _ }) => {
time_points.iter().collect_vec().into()
}
(
Signal::Sampled {
values: _,
time_points: lhs,
},
Signal::Sampled {
values: _,
time_points: rhs,
},
) => {
let bounds = match intersect_bounds(&self.bounds()?, &other.bounds()?) {
(Included(start), Included(end)) => start..=end,
(..) => unreachable!(),
};
itertools::merge(lhs, rhs)
.filter(|time| bounds.contains(time))
.dedup()
.collect_vec()
.into()
}
}
}
/// Augment synchronization points with time points where signals intersect
pub fn sync_with_intersection(&self, other: &Signal<T>) -> Option<Vec<Duration>>
where
T: PartialOrd + Copy + LinearInterpolatable + NumCast,
{
use core::cmp::Ordering::*;
let sync_points: Vec<&Duration> = self.sync_points(other)?.into_iter().collect();
// This will contain the new signal with an initial capacity of twice the input
// signals sample points (as that is the upper limit of the number of new points
// that will be added
let mut return_points = Vec::<Duration>::with_capacity(sync_points.len() * 2);
// this will contain the last sample point and ordering
let mut last_sample = None;
// We will now loop over the sync points, compare across signals and (if
// an intersection happens) we will have to compute the intersection point
for t in sync_points {
let lhs = self.at(*t).expect("value must be present at given time");
let rhs = other.at(*t).expect("values must be present at given time");
let ord = lhs.partial_cmp(rhs).unwrap();
// We will check for any intersections between the current sample and the
// previous one before we push the current sample time
if let Some((tm1, last)) = last_sample {
// Check if the signals crossed, this will happen essentially if the last
// and the current are opposites and were not Equal.
if let (Less, Greater) | (Greater, Less) = (last, ord) {
// Find the point of intersection between the points.
let a = utils::Neighborhood {
first: self.at(tm1).copied().map(|value| Sample { time: tm1, value }),
second: self.at(*t).copied().map(|value| Sample { time: *t, value }),
};
let b = utils::Neighborhood {
first: other.at(tm1).copied().map(|value| Sample { time: tm1, value }),
second: other.at(*t).copied().map(|value| Sample { time: *t, value }),
};
let intersect = utils::find_intersection(&a, &b);
return_points.push(intersect.time);
}
}
return_points.push(*t);
last_sample = Some((*t, ord));
}
return_points.shrink_to_fit();
Some(return_points)
}
}
@ -322,45 +447,19 @@ pub mod arbitrary {
}
/// Generate an arbitrary constant signal
pub fn constant_signal<T>() -> impl Strategy<Value = ConstantSignal<T>>
pub fn constant_signal<T>() -> impl Strategy<Value = Signal<T>>
where
T: Arbitrary,
T: Arbitrary + Copy,
{
any::<T>().prop_map(ConstantSignal::new)
}
/// Generate an arbitrary boolean signal
pub fn any_bool_signal(size: impl Into<SizeRange>) -> impl Strategy<Value = AnySignal> {
prop_oneof![
constant_signal::<bool>().prop_map(AnySignal::from),
sampled_signal::<bool>(size).prop_map(AnySignal::from),
]
}
/// Generate an arbitrary numeric signal
pub fn any_num_signal(size: impl Into<SizeRange> + Clone) -> impl Strategy<Value = AnySignal> {
prop_oneof![
constant_signal::<i64>().prop_map(AnySignal::from),
constant_signal::<u64>().prop_map(AnySignal::from),
constant_signal::<f64>().prop_map(AnySignal::from),
sampled_signal::<i64>(size.clone()).prop_map(AnySignal::from),
sampled_signal::<u64>(size.clone()).prop_map(AnySignal::from),
sampled_signal::<f64>(size).prop_map(AnySignal::from),
]
any::<T>().prop_map(Signal::constant)
}
/// Generate an arbitrary signal
pub fn any_signal(size: impl Into<SizeRange> + Clone) -> impl Strategy<Value = AnySignal> {
prop_oneof![
constant_signal::<bool>().prop_map(AnySignal::from),
constant_signal::<i64>().prop_map(AnySignal::from),
constant_signal::<u64>().prop_map(AnySignal::from),
constant_signal::<f64>().prop_map(AnySignal::from),
sampled_signal::<bool>(size.clone()).prop_map(AnySignal::from),
sampled_signal::<i64>(size.clone()).prop_map(AnySignal::from),
sampled_signal::<u64>(size.clone()).prop_map(AnySignal::from),
sampled_signal::<f64>(size).prop_map(AnySignal::from),
]
pub fn signal<T>(size: impl Into<SizeRange>) -> impl Strategy<Value = Signal<T>>
where
T: Arbitrary + Copy,
{
prop_oneof![constant_signal::<T>(), sampled_signal::<T>(size),]
}
}
@ -387,8 +486,8 @@ mod tests {
// Get the value of the sample at a given index
let (at, val) = samples[idx];
assert_eq!(signal.start_time(), Bound::Included(start_time));
assert_eq!(signal.end_time(), Bound::Included(end_time));
assert_eq!(signal.start_time(), Some(Bound::Included(start_time)));
assert_eq!(signal.end_time(), Some(Bound::Included(end_time)));
assert_eq!(signal.at(at), Some(&val));
assert_eq!(signal.at(end_time + Duration::from_secs(1)), None);
assert_eq!(signal.at(start_time - Duration::from_secs(1)), None);
@ -500,10 +599,10 @@ mod tests {
proptest! {
|(sig1 in arbitrary::constant_signal::<$ty>(), sig2 in arbitrary::constant_signal::<$ty>())| {
let new_sig = &sig1 $op &sig2;
let v1 = sig1.value;
let v2 = sig2.value;
let v = new_sig.value;
assert_eq!(v1 $op v2, v);
match (sig1, sig2, new_sig) {
(Signal::Constant { value: v1 }, Signal::Constant { value: v2 }, Signal::Constant { value: v }) => assert_eq!(v1 $op v2, v),
(s1, s2, s3) => panic!("{:?}, {:?} = {:?}", s1, s2, s3),
}
}
}
};