reimplement evaluation using discrete-signals library

This commit is contained in:
Marcell Vazquez-Chanlatte 2018-09-24 00:55:43 -07:00
parent cfbdabf517
commit dd6bd4b6be
12 changed files with 241 additions and 447 deletions

View file

@ -1,7 +1,6 @@
# flake8: noqa
from mtl.ast import TOP, BOT
from mtl.ast import (Interval, Or, And, F, G, Neg,
AtomicPred, Until, Next)
from mtl.ast import (Interval, And, G, Neg,
AtomicPred, WeakUntil, Next)
from mtl.parser import parse
from mtl.fastboolean_eval import pointwise_sat
from mtl.utils import alw, env, andf, orf
from mtl.utils import alw, env, andf, orf, until

View file

@ -6,14 +6,12 @@ import attr
import funcy as fn
from lenses import bind
import mtl
def flatten_binary(phi, op, dropT, shortT):
def f(x):
return x.args if isinstance(x, op) else [x]
args = [arg for arg in phi.args if arg is not dropT]
args = [arg for arg in phi.args if arg != dropT]
if any(arg is shortT for arg in args):
return shortT
@ -26,7 +24,7 @@ def flatten_binary(phi, op, dropT, shortT):
def _or(exp1, exp2):
return flatten_binary(Or((exp1, exp2)), Or, BOT, TOP)
return ~(~exp1 & ~exp2)
def _and(exp1, exp2):
@ -34,21 +32,18 @@ def _and(exp1, exp2):
def _neg(exp):
if isinstance(exp, _Bot):
return _Top()
elif isinstance(exp, _Top):
return _Bot()
elif isinstance(exp, Neg):
if isinstance(exp, Neg):
return exp.arg
return Neg(exp)
def _eval(exp, trace, time=0):
return mtl.pointwise_sat(exp)(trace, time)
def _eval(exp, trace, time=0, *, dt=0.1, quantitative=True):
from mtl import evaluator
return evaluator.pointwise_sat(exp, dt)(trace, time, quantitative)
def _timeshift(exp, t):
if exp in (BOT, TOP):
if exp == BOT:
return exp
for _ in range(t):
@ -142,22 +137,12 @@ def _update_itvl(itvl, lookup):
return Interval(*map(_update_param, itvl))
@ast_class
class _Top:
def __repr__(self):
return "TRUE"
@ast_class
class _Bot:
def __repr__(self):
return "FALSE"
TOP = _Top()
BOT = _Bot()
@ast_class
class AtomicPred:
id: str
@ -187,10 +172,6 @@ class NaryOpMTL:
return tuple(self.args)
class Or(NaryOpMTL):
OP = "|"
class And(NaryOpMTL):
OP = "&"
@ -211,21 +192,17 @@ class ModalOp:
return (self.arg,)
class F(ModalOp):
OP = "< >"
class G(ModalOp):
OP = "[ ]"
OP = "G"
@ast_class
class Until:
class WeakUntil:
arg1: "Node"
arg2: "Node"
def __repr__(self):
return f"({self.arg1} U {self.arg2})"
return f"({self.arg1} W {self.arg2})"
@property
def children(self):
@ -249,7 +226,7 @@ class Next:
arg: "Node"
def __repr__(self):
return f"@{self.arg}"
return f"X{self.arg}"
@property
def children(self):
@ -259,3 +236,7 @@ class Next:
def type_pred(*args):
ast_types = set(args)
return lambda x: type(x) in ast_types
BOT = _Bot()
TOP = ~BOT

View file

@ -1,183 +0,0 @@
# TODO: figure out how to deduplicate this with robustness
# - Abstract as working on distributive lattice
import operator as op
from functools import singledispatch
import funcy as fn
import traces
import mtl
import mtl.ast
from mtl.utils import const_trace, andf, orf
TRUE_TRACE = const_trace(True)
FALSE_TRACE = const_trace(False)
def negate_trace(x):
return x.operation(TRUE_TRACE, op.xor)
def pointwise_sat(phi, dt=0.1):
def _eval_mtl(x, t=0):
return bool(eval_mtl(phi, dt)(x)[t])
return _eval_mtl
@singledispatch
def eval_mtl(phi, dt):
raise NotImplementedError
def or_traces(xs):
return orf(*xs)
@eval_mtl.register(mtl.Or)
def eval_mtl_or(phi, dt):
fs = [eval_mtl(arg, dt) for arg in phi.args]
def _eval(x):
out = or_traces([f(x) for f in fs])
out.compact()
return out
return _eval
def and_traces(xs):
return andf(*xs)
@eval_mtl.register(mtl.And)
def eval_mtl_and(phi, dt):
fs = [eval_mtl(arg, dt) for arg in phi.args]
def _eval(x):
out = and_traces([f(x) for f in fs])
out.compact()
return out
return _eval
def apply_until(y):
if len(y) == 1:
left, right = y.first_value()
yield (0, min(left, right))
return
periods = list(y.iterperiods())
phi2_next = False
for t, _, (phi1, phi2) in periods[::-1]:
yield (t, phi2 or (phi1 and phi2_next))
phi2_next = phi2
@eval_mtl.register(mtl.Until)
def eval_mtl_until(phi, dt):
f1, f2 = eval_mtl(phi.arg1, dt), eval_mtl(phi.arg2, dt)
def _eval(x):
y1, y2 = f1(x), f2(x)
y = y1.operation(y2, lambda a, b: (a, b))
out = traces.TimeSeries(apply_until(y))
out.compact()
return out
return _eval
@eval_mtl.register(mtl.F)
def eval_mtl_f(phi, dt):
phi = ~mtl.G(phi.interval, ~phi.arg)
return eval_mtl(phi, dt)
@eval_mtl.register(mtl.G)
def eval_mtl_g(phi, dt):
f = eval_mtl(phi.arg, dt)
a, b = phi.interval
if b < a:
return lambda _: TRUE_TRACE
def process_intervals(x):
# Need to add last interval
intervals = fn.chain(x.iterintervals(), [(
x.first_item(),
(float('inf'), None),
)])
for (start, val), (end, val2) in intervals:
start2, end2 = start - b, end + a
if end2 > start2:
yield (start2, val)
if b == float('inf'):
def _eval(x):
y = f(x).slice(a, b)
y.compact()
val = len(y) == 1 and y[a]
return const_trace(val)
else:
def _eval(x):
y = f(x)
if len(y) <= 1:
return y
out = traces.TimeSeries(process_intervals(y)).slice(
y.first_key(), float('inf')
)
out.compact()
return out
return _eval
@eval_mtl.register(mtl.Neg)
def eval_mtl_neg(phi, dt):
f = eval_mtl(phi.arg, dt)
def _eval(x):
out = negate_trace(f(x))
out.compact()
return out
return _eval
@eval_mtl.register(mtl.ast.Next)
def eval_mtl_next(phi, dt):
f = eval_mtl(phi.arg, dt)
def _eval(x):
y = f(x)
out = traces.TimeSeries(((t - dt, v) for t, v in y))
out = out.slice(0, float('inf'))
out.compact()
return out
return _eval
@eval_mtl.register(mtl.AtomicPred)
def eval_mtl_ap(phi, _):
def _eval(x):
out = x[str(phi.id)]
out.compact()
return out
return _eval
@eval_mtl.register(type(mtl.TOP))
def eval_mtl_top(_, _1):
return lambda *_: TRUE_TRACE
@eval_mtl.register(type(mtl.BOT))
def eval_mtl_bot(_, _1):
return lambda *_: FALSE_TRACE

152
mtl/evaluator.py Normal file
View file

@ -0,0 +1,152 @@
# TODO: figure out how to deduplicate this with robustness
# - Abstract as working on distributive lattice
import operator as op
from collections import defaultdict
from functools import reduce, singledispatch
import funcy as fn
from discrete_signals import signal
from mtl import ast
OO = float('inf')
CONST_FALSE = signal([(0, -1)], start=-OO, end=OO, tag=ast.BOT)
CONST_TRUE = signal([(0, 1)], start=-OO, end=OO, tag=ast.TOP)
def to_signal(ts_mapping):
start = min(fn.pluck(0, fn.cat(ts_mapping.values())))
assert start >= 0
signals = (signal(v, start, OO, tag=k) for k, v in ts_mapping.items())
return reduce(op.or_, signals)
def interp(sig, t, tag=None):
# TODO: return function that interpolates the whole signal.
sig = sig.project({tag})
key = sig.data.iloc[sig.data.bisect_right(t) - 1]
return sig[key][tag]
def dense_compose(sig1, sig2, init=None):
sig12 = sig1 | sig2
tags = sig12.tags
def _dense_compose():
state = {tag: init for tag in tags}
for t, val in sig12.items():
state = {k: val.get(k, state[k]) for k in tags}
yield t, state
data = list(_dense_compose())
return sig12.evolve(data=data)
def booleanize_signal(sig):
return sig.transform(lambda mapping: defaultdict(
lambda: None, {k: 2*int(v) - 1 for k, v in mapping.items()}
))
def pointwise_sat(phi, dt=0.1):
f = eval_mtl(phi, dt)
def _eval_mtl(x, t=0, quantitative=False):
sig = to_signal(x)
if not quantitative:
sig = booleanize_signal(sig)
res = interp(f(sig), t, phi)
return res if quantitative else res > 0
return _eval_mtl
@singledispatch
def eval_mtl(phi, dt):
raise NotImplementedError
@eval_mtl.register(ast.And)
def eval_mtl_and(phi, dt):
fs = [eval_mtl(arg, dt) for arg in phi.args]
def _eval(x):
sigs = [f(x) for f in fs]
sig = reduce(lambda x, y: dense_compose(x, y, init=OO), sigs)
return sig.map(lambda v: min(v.values()), tag=phi)
return _eval
def apply_weak_until(left_key, right_key, sig):
prev, max_right = OO, -OO
for t in reversed(sig.times()):
left, right = interp(sig, t, left_key), interp(sig, t, right_key)
max_right = max(max_right, right)
prev = max(right, min(left, prev), -max_right)
yield (t, prev)
@eval_mtl.register(ast.WeakUntil)
def eval_mtl_until(phi, dt):
f1, f2 = eval_mtl(phi.arg1, dt), eval_mtl(phi.arg2, dt)
def _eval(x):
sig = dense_compose(f1(x), f2(x), init=-OO)
data = apply_weak_until(phi.arg1, phi.arg2, sig)
return signal(data, x.start, OO, tag=phi)
return _eval
@eval_mtl.register(ast.G)
def eval_mtl_g(phi, dt):
f = eval_mtl(phi.arg, dt)
a, b = phi.interval
if b < a:
return lambda x: CONST_TRUE.retag({ast.TOP: phi})
def _min(val):
return min(val[phi.arg])
def _eval(x):
return f(x).rolling(a, b).map(_min, tag=phi)
return _eval
@eval_mtl.register(ast.Neg)
def eval_mtl_neg(phi, dt):
f = eval_mtl(phi.arg, dt)
def _eval(x):
return f(x).map(lambda v: -v[phi.arg], tag=phi)
return _eval
@eval_mtl.register(ast.Next)
def eval_mtl_next(phi, dt):
f = eval_mtl(phi.arg, dt)
def _eval(x):
return (f(x) << dt).retag({phi.arg: phi})
return _eval
@eval_mtl.register(ast.AtomicPred)
def eval_mtl_ap(phi, _):
def _eval(x):
return x.project({phi.id}).retag({phi.id: phi})
return _eval
@eval_mtl.register(type(ast.BOT))
def eval_mtl_bot(_, _1):
return lambda x: CONST_FALSE

View file

@ -1,98 +0,0 @@
from functools import reduce, singledispatch
from operator import and_, or_
import funcy as fn
from bitarray import bitarray
import mtl.ast
oo = float('inf')
def get_times(x, tau, lo, hi):
end = min(v.last_key() for v in x.values())
lo, hi = map(float, (lo, hi))
hi = hi + tau if hi + tau <= end else end
lo = lo + tau if lo + tau <= end else end
if lo > hi:
return []
elif hi == lo:
return [lo]
all_times = fn.cat(v.slice(lo, hi).items() for v in x.values())
return sorted(set(fn.pluck(0, all_times)))
def pointwise_sat(mtl):
f = pointwise_satf(mtl)
return lambda x, t: bool(int(f(x, [t]).to01()))
@singledispatch
def pointwise_satf(mtl):
raise NotImplementedError
def bool_op(mtl, conjunction=False):
binop = and_ if conjunction else or_
fs = [pointwise_satf(arg) for arg in mtl.args]
return lambda x, t: reduce(binop, (f(x, t) for f in fs))
@pointwise_satf.register(mtl.Or)
def pointwise_satf_or(mtl):
return bool_op(mtl, conjunction=False)
@pointwise_satf.register(mtl.And)
def pointwise_satf_and(mtl):
return bool_op(mtl, conjunction=True)
def temporal_op(mtl, lo, hi, conjunction=False):
fold = bitarray.all if conjunction else bitarray.any
f = pointwise_satf(mtl.arg)
def sat_comp(x, t):
return bitarray(fold(f(x, get_times(x, tau, lo, hi))) for tau in t)
return sat_comp
@pointwise_satf.register(mtl.F)
def pointwise_satf_f(mtl):
lo, hi = mtl.interval
return temporal_op(mtl, lo, hi, conjunction=False)
@pointwise_satf.register(mtl.G)
def pointwise_satf_g(mtl):
lo, hi = mtl.interval
return temporal_op(mtl, lo, hi, conjunction=True)
@pointwise_satf.register(mtl.Neg)
def pointwise_satf_neg(mtl):
return lambda x, t: ~pointwise_satf(mtl.arg)(x, t)
@pointwise_satf.register(mtl.AtomicPred)
def pointwise_satf_(phi):
return lambda x, t: bitarray(x[str(phi.id)][tau] for tau in t)
@pointwise_satf.register(mtl.Until)
def pointwise_satf_until(phi):
raise NotImplementedError
@pointwise_satf.register(type(mtl.TOP))
def pointwise_satf_top(_):
return lambda _, t: bitarray([True] * len(t))
@pointwise_satf.register(type(mtl.BOT))
def pointwise_satf_bot(_):
return lambda _, t: bitarray([False] * len(t))

View file

@ -4,12 +4,13 @@ from functools import partialmethod, reduce
from parsimonious import Grammar, NodeVisitor
from mtl import ast
from mtl.utils import iff, implies, xor, timed_until
from mtl.utils import iff, implies, xor, timed_until, until
from mtl.utils import env, alw
MTL_GRAMMAR = Grammar(u'''
phi = (neg / paren_phi / next / bot / top
/ xor_outer / iff_outer / implies_outer / and_outer / or_outer
/ timed_until / until / g / f / AP)
/ timed_until / until / weak_until / g / f / AP)
paren_phi = "(" __ phi __ ")"
neg = ("~" / "¬") __ phi
@ -32,6 +33,7 @@ xor_inner = (phi __ ("⊕" / "^" / "xor") __ xor_inner) / phi
f = ("< >" / "F") interval? __ phi
g = ("[ ]" / "G") interval? __ phi
weak_until = "(" __ phi _ "W" _ phi __ ")"
until = "(" __ phi _ "U" _ phi __ ")"
timed_until = "(" __ phi _ "U" interval _ phi __ ")"
interval = "[" __ const_or_unbound __ "," __ const_or_unbound __ "]"
@ -97,7 +99,7 @@ class MTLVisitor(NodeVisitor):
return ast.BOT
def visit_top(self, *_):
return ast.TOP
return ~ast.BOT
def visit_interval(self, _, children):
_, _, left, _, _, _, right, _, _ = children
@ -110,15 +112,19 @@ class MTLVisitor(NodeVisitor):
def unary_temp_op_visitor(self, _, children, op):
_, i, _, phi = children
i = self.default_interval if not i else i[0]
return op(i, phi)
lo, hi = self.default_interval if not i else i[0]
return op(phi, lo=lo, hi=hi)
visit_f = partialmethod(unary_temp_op_visitor, op=ast.F)
visit_g = partialmethod(unary_temp_op_visitor, op=ast.G)
visit_f = partialmethod(unary_temp_op_visitor, op=env)
visit_g = partialmethod(unary_temp_op_visitor, op=alw)
def visit_weak_until(self, _, children):
_, _, phi1, _, _, _, phi2, _, _ = children
return ast.WeakUntil(phi1, phi2)
def visit_until(self, _, children):
_, _, phi1, _, _, _, phi2, _, _ = children
return ast.Until(phi1, phi2)
return until(phi1, phi2)
def visit_timed_until(self, _, children):
_, _, phi1, _, _, itvl, _, phi2, _, _ = children

View file

@ -28,4 +28,4 @@ def test_identities(phi):
def test_walk():
phi = mtl.parse(
'(([ ][0, 1] ap1 & < >[1,2] ap2) | (@ap1 U ap2))')
assert len(list((~phi).walk())) == 11
assert len(list((~phi).walk())) == 19

View file

@ -1,121 +1,59 @@
import hypothesis.strategies as st
import traces
from hypothesis import given
from pytest import raises
import mtl
import mtl.boolean_eval
import mtl.fastboolean_eval
from mtl.hypothesis import MetricTemporalLogicStrategy
"""
TODO: property based test that fasteval should be the same as slow
TODO: property based test that x |= phi == ~(x |= ~phi)
TODO: property based test that ~~phi == phi
TODO: property based test that ~~~phi == ~phi
TODO: property based test that ~phi => phi
TODO: property based test that phi => phi
TODO: property based test that phi <=> phi
TODO: property based test that phi & psi => phi
TODO: property based test that psi => phi | psi
TODO: property based test that (True U psi) => F(psi)
TODO: property based test that G(psi) = ~F(~psi)
TODO: Automatically generate input time series.
"""
x = {
"ap1": traces.TimeSeries([(0, True), (0.1, True), (0.2, False)]),
"ap2":
traces.TimeSeries([(0, False), (0.2, True), (0.5, False)]),
"ap3":
traces.TimeSeries([(0, True), (0.1, True), (0.3, False)]),
"ap4":
traces.TimeSeries([(0, False), (0.1, False), (0.3, False)]),
"ap5":
traces.TimeSeries([(0, False), (0.1, False), (0.3, True)]),
"ap6":
traces.TimeSeries([(0, True), (float('inf'), True)]),
"ap1": [(0, True), (0.1, True), (0.2, False)],
"ap2": [(0, False), (0.2, True), (0.5, False)],
"ap3": [(0, True), (0.1, True), (0.3, False)],
"ap4": [(0, False), (0.1, False), (0.3, False)],
"ap5": [(0, False), (0.1, False), (0.3, True)],
"ap6": [(0, True), (float('inf'), True)],
}
@given(st.just(mtl.ast.Next(mtl.BOT) | mtl.ast.Next(mtl.TOP)))
def test_eval_smoke_tests(phi):
mtl_eval9 = mtl.boolean_eval.pointwise_sat(mtl.ast.Next(phi))
mtl_eval10 = mtl.boolean_eval.pointwise_sat(~mtl.ast.Next(phi))
assert mtl_eval9(x, 0) != mtl_eval10(x, 0)
phi4 = mtl.parse('~ap4')
mtl_eval11 = mtl.boolean_eval.pointwise_sat(phi4)
assert mtl_eval11(x, 0)
phi5 = mtl.parse('G[0.1, 0.03] ~ap4')
mtl_eval12 = mtl.boolean_eval.pointwise_sat(phi5)
assert mtl_eval12(x, 0)
assert mtl.parse('~ap4')(x, 0, quantitative=False)
assert mtl.parse('G[0.1, 0.03] ~ap4')(x, 0, quantitative=False)
phi6 = mtl.parse('G[0.1, 0.03] ~ap5')
mtl_eval13 = mtl.boolean_eval.pointwise_sat(phi6)
assert mtl_eval13(x, 0)
assert mtl_eval13(x, 0.4)
assert phi6(x, 0, quantitative=False)
assert phi6(x, 0.2, quantitative=False)
phi7 = mtl.parse('G ~ap4')
mtl_eval14 = mtl.boolean_eval.pointwise_sat(phi7)
assert mtl_eval14(x, 0)
phi8 = mtl.parse('F ap5')
mtl_eval15 = mtl.boolean_eval.pointwise_sat(phi8)
assert mtl_eval15(x, 0)
phi9 = mtl.parse('(ap1 U ap2)')
mtl_eval16 = mtl.boolean_eval.pointwise_sat(phi9)
assert mtl_eval16(x, 0)
phi10 = mtl.parse('(ap2 U ap2)')
mtl_eval17 = mtl.boolean_eval.pointwise_sat(phi10)
assert not mtl_eval17(x, 0)
with raises(NotImplementedError):
mtl.boolean_eval.eval_mtl(None, None)
assert mtl.parse('G ~ap4')(x, 0, quantitative=False)
assert mtl.parse('F ap5')(x, 0, quantitative=False)
assert mtl.parse('(ap1 U ap2)')(x, 0, quantitative=False)
assert not mtl.parse('(ap2 U ap2)')(x, 0, quantitative=False)
@given(MetricTemporalLogicStrategy)
def test_temporal_identities(phi):
mtl_eval = mtl.boolean_eval.pointwise_sat(phi)
mtl_eval2 = mtl.boolean_eval.pointwise_sat(~phi)
assert mtl_eval2(x, 0) == (not mtl_eval(x, 0))
mtl_eval3 = mtl.boolean_eval.pointwise_sat(~~phi)
assert mtl_eval3(x, 0) == mtl_eval(x, 0)
mtl_eval4 = mtl.boolean_eval.pointwise_sat(phi & phi)
assert mtl_eval4(x, 0) == mtl_eval(x, 0)
mtl_eval5 = mtl.boolean_eval.pointwise_sat(phi & ~phi)
assert not mtl_eval5(x, 0)
mtl_eval6 = mtl.boolean_eval.pointwise_sat(phi | ~phi)
assert mtl_eval6(x, 0)
mtl_eval7 = mtl.boolean_eval.pointwise_sat(mtl.ast.Until(mtl.TOP, phi))
mtl_eval8 = mtl.boolean_eval.pointwise_sat(mtl.env(phi))
assert mtl_eval7(x, 0) == mtl_eval8(x, 0)
def test_temporal_identity1(phi):
assert ((~phi) >> 1)(x, 0, quantitative=False) \
== (~(phi >> 1))(x, 0, quantitative=False)
@given(st.just(mtl.BOT))
def test_fastboolean_equiv(phi):
mtl_eval = mtl.fastboolean_eval.pointwise_sat(mtl.alw(phi, lo=0, hi=4))
mtl_eval2 = mtl.fastboolean_eval.pointwise_sat(~mtl.env(~phi, lo=0, hi=4))
assert mtl_eval2(x, 0) == mtl_eval(x, 0)
mtl_eval3 = mtl.fastboolean_eval.pointwise_sat(~mtl.alw(~phi, lo=0, hi=4))
mtl_eval4 = mtl.fastboolean_eval.pointwise_sat(mtl.env(phi, lo=0, hi=4))
assert mtl_eval4(x, 0) == mtl_eval3(x, 0)
@given(MetricTemporalLogicStrategy)
def test_temporal_identity2(phi):
assert phi(x, 0, quantitative=False) \
== (not (~phi)(x, 0, quantitative=False))
def test_fastboolean_smoketest():
phi = mtl.parse(
'(((G[0, 4] ap6 & F[2, 1] ap1) | ap2) & G[0,0](ap2))')
mtl_eval = mtl.fastboolean_eval.pointwise_sat(phi)
assert not mtl_eval(x, 0)
with raises(NotImplementedError):
mtl.fastboolean_eval.pointwise_sat(None)
@given(MetricTemporalLogicStrategy)
def test_temporal_identity3(phi):
assert (phi & phi)(x, 0, quantitative=False) \
== phi(x, 0, quantitative=False)
def test_callable_interface():
phi = mtl.parse(
'(((G[0, 4] ap6 & F[2, 1] ap1) | ap2) & G[0,0](ap2))')
assert not phi(x, 0)
@given(MetricTemporalLogicStrategy)
def test_temporal_identity4(phi):
assert not (phi & ~phi)(x, 0, quantitative=False)
assert (phi | ~phi)(x, 0, quantitative=False)
@given(MetricTemporalLogicStrategy)
def test_temporal_identity5(phi):
assert mtl.until(mtl.TOP, phi)(x, 0, quantitative=False) \
== mtl.env(phi)(x, 0, quantitative=False)

View file

@ -60,8 +60,6 @@ def test_discretize():
phi2 = mtl.utils.discretize(phi, dt, distribute=True)
phi3 = mtl.utils.discretize(phi2, dt, distribute=True)
assert phi2 == phi3
assert phi2 == mtl.parse(
'(~(@a | @@a) & ~(@@a | @@@a))')
phi = mtl.TOP
assert mtl.utils.is_discretizable(phi, dt)

View file

@ -2,18 +2,18 @@ import operator as op
from functools import reduce, wraps
from math import isfinite
import traces
from discrete_signals import signal
import numpy as np
import mtl.ast
from mtl.ast import (And, F, G, Interval, Neg, Or, Next, Until,
AtomicPred, _Top, _Bot)
from mtl.ast import (And, G, Interval, Neg, Next, WeakUntil,
AtomicPred, _Bot)
oo = float('inf')
def const_trace(x):
return traces.TimeSeries([(0, x), (oo, x)])
return signal([(0, x)], start=0, end=oo)
def require_discretizable(func):
@ -29,9 +29,9 @@ def require_discretizable(func):
def scope(phi, dt, *, _t=0, horizon=oo):
if isinstance(phi, Next):
_t += dt
elif isinstance(phi, (G, F)):
elif isinstance(phi, G):
_t += phi.interval.upper
elif isinstance(phi, Until):
elif isinstance(phi, WeakUntil):
_t += float('inf')
_scope = max((scope(c, dt, _t=_t) for c in phi.children), default=_t)
@ -51,19 +51,19 @@ def discretize(phi, dt, distribute=False, *, horizon=None):
def _discretize(phi, dt, horizon):
if isinstance(phi, (AtomicPred, _Top, _Bot)):
if isinstance(phi, (AtomicPred, _Bot)):
return phi
if not isinstance(phi, (F, G, Until)):
if not isinstance(phi, (G, WeakUntil)):
children = tuple(_discretize(arg, dt, horizon) for arg in phi.children)
if isinstance(phi, (And, Or)):
if isinstance(phi, And):
return phi.evolve(args=children)
elif isinstance(phi, (Neg, Next)):
return phi.evolve(arg=children[0])
raise NotImplementedError
elif isinstance(phi, Until):
elif isinstance(phi, WeakUntil):
raise NotImplementedError
# Only remaining cases are G and F
@ -91,19 +91,19 @@ def _distribute_next(phi, i=0):
children = tuple(_distribute_next(c, i) for c in phi.children)
if isinstance(phi, (And, Or)):
if isinstance(phi, And):
return phi.evolve(args=children)
elif isinstance(phi, (Neg, Next)):
return phi.evolve(arg=children[0])
def is_discretizable(phi, dt):
if any(c for c in phi.walk() if isinstance(c, Until)):
if any(c for c in phi.walk() if isinstance(c, WeakUntil)):
return False
return all(
_interval_discretizable(c.interval, dt) for c in phi.walk()
if isinstance(c, (F, G)))
if isinstance(c, G))
# EDSL
@ -113,7 +113,7 @@ def alw(phi, *, lo=0, hi=float('inf')):
def env(phi, *, lo=0, hi=float('inf')):
return F(Interval(lo, hi), phi)
return ~alw(~phi, lo=lo, hi=hi)
def andf(*args):
@ -140,5 +140,9 @@ def next(phi, i=1):
return phi >> i
def until(phi, psi):
return mtl.ast.WeakUntil(phi, psi) & env(psi)
def timed_until(phi, psi, lo, hi):
return env(psi, lo=lo, hi=hi) & alw(mtl.ast.Until(phi, psi), lo=0, hi=lo)
return env(psi, lo=lo, hi=hi) & alw(until(phi, psi), lo=0, hi=lo)

View file

@ -1,11 +1,9 @@
-e git://github.com/mvcisback/hypothesis-cfg@master#egg=hypothesis-cfg
bitarray==0.8.1
funcy==1.9.1
lenses==0.4.0
parsimonious==0.7.0
traces==0.4.1
discrete-signals==0.7.1
hypothesis==3.32.1
numpy==1.15.0
pytest==3.2.3
pytest-bpdb==0.1.4

View file

@ -12,8 +12,7 @@ setup(
'funcy',
'parsimonious',
'lenses',
'bitarray',
'traces',
'discrete-signals',
],
packages=find_packages(),
)