continue refactoring to optimize for mtl

This commit is contained in:
Marcell Vazquez-Chanlatte 2018-09-06 11:09:01 -07:00
parent 5fd66cfd2c
commit 98824c9ba1
21 changed files with 393 additions and 467 deletions

7
mtl/__init__.py Normal file
View file

@ -0,0 +1,7 @@
# flake8: noqa
from mtl.ast import TOP, BOT
from mtl.ast import (Interval, Or, And, F, G, Neg,
AtomicPred, Until, Next)
from mtl.parser import parse
from mtl.fastboolean_eval import pointwise_sat
from mtl.utils import alw, env, andf, orf

279
mtl/ast.py Normal file
View file

@ -0,0 +1,279 @@
# -*- coding: utf-8 -*-
from collections import deque, namedtuple
from functools import lru_cache
import funcy as fn
from lenses import lens, bind
import mtl
def flatten_binary(phi, op, dropT, shortT):
def f(x):
return x.args if isinstance(x, op) else [x]
args = [arg for arg in phi.args if arg is not dropT]
if any(arg is shortT for arg in args):
return shortT
elif not args:
return dropT
elif len(args) == 1:
return args[0]
else:
return op(tuple(fn.mapcat(f, phi.args)))
class AST(object):
__slots__ = ()
def __or__(self, other):
return flatten_binary(Or((self, other)), Or, BOT, TOP)
def __and__(self, other):
return flatten_binary(And((self, other)), And, TOP, BOT)
def __invert__(self):
if isinstance(self, Neg):
return self.arg
return Neg(self)
def __rshift__(self, t):
if self in (BOT, TOP):
return self
phi = self
for _ in range(t):
phi = Next(phi)
return phi
def __call__(self, trace, time=0):
return mtl.pointwise_sat(self)(trace, time)
@property
def children(self):
return tuple()
def walk(self):
"""Walk of the AST."""
pop = deque.pop
children = deque([self])
while len(children) > 0:
node = pop(children)
yield node
children.extend(node.children)
@property
def params(self):
def get_params(leaf):
if isinstance(leaf, ModalOp):
if isinstance(leaf.interval[0], Param):
yield leaf.interval[0]
if isinstance(leaf.interval[1], Param):
yield leaf.interval[1]
return set(fn.mapcat(get_params, self.walk()))
def set_params(self, val):
phi = param_lens(self)
return phi.modify(lambda x: float(val.get(x, val.get(str(x), x))))
@property
def atomic_predicates(self):
return set(AP_lens.collect()(self))
def inline_context(self, context):
phi, phi2 = self, None
def update(ap):
return context.get(ap, ap)
while phi2 != phi:
phi2, phi = phi, AP_lens.modify(update)(phi)
return phi
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
class _Top(AST):
__slots__ = ()
def __repr__(self):
return "1"
def __invert__(self):
return BOT
class _Bot(AST):
__slots__ = ()
def __repr__(self):
return "0"
def __invert__(self):
return TOP
TOP = _Top()
BOT = _Bot()
class AtomicPred(namedtuple("AP", ["id"]), AST):
__slots__ = ()
def __repr__(self):
return f"{self.id}"
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
@property
def children(self):
return tuple()
class Interval(namedtuple('I', ['lower', 'upper'])):
__slots__ = ()
def __repr__(self):
return f"[{self.lower},{self.upper}]"
class NaryOpMTL(namedtuple('NaryOp', ['args']), AST):
__slots__ = ()
OP = "?"
def __repr__(self):
return "(" + f" {self.OP} ".join(f"{x}" for x in self.args) + ")"
@property
def children(self):
return tuple(self.args)
class Or(NaryOpMTL):
__slots__ = ()
OP = "|"
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
class And(NaryOpMTL):
__slots__ = ()
OP = "&"
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
class ModalOp(namedtuple('ModalOp', ['interval', 'arg']), AST):
__slots__ = ()
OP = '?'
def __repr__(self):
if self.interval.lower == 0 and self.interval.upper == float('inf'):
return f"{self.OP}{self.arg}"
return f"{self.OP}{self.interval}{self.arg}"
@property
def children(self):
return (self.arg,)
class F(ModalOp):
__slots__ = ()
OP = "< >"
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
class G(ModalOp):
__slots__ = ()
OP = "[ ]"
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
class Until(namedtuple('ModalOp', ['arg1', 'arg2']), AST):
__slots__ = ()
def __repr__(self):
return f"({self.arg1} U {self.arg2})"
@property
def children(self):
return (self.arg1, self.arg2)
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
class Neg(namedtuple('Neg', ['arg']), AST):
__slots__ = ()
def __repr__(self):
return f"~{self.arg}"
@property
def children(self):
return (self.arg,)
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
class Next(namedtuple('Next', ['arg']), AST):
__slots__ = ()
def __repr__(self):
return f"@{self.arg}"
@property
def children(self):
return (self.arg,)
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
class Param(namedtuple('Param', ['name']), AST):
__slots__ = ()
def __repr__(self):
return self.name
def __hash__(self):
# TODO: compute hash based on contents
return hash(repr(self))
@lru_cache()
def param_lens(phi, *, getter=False):
return bind(phi).Recur(Param)
def type_pred(*args):
ast_types = set(args)
return lambda x: type(x) in ast_types
AP_lens = lens.Recur(AtomicPred)

187
mtl/boolean_eval.py Normal file
View file

@ -0,0 +1,187 @@
# TODO: figure out how to deduplicate this with robustness
# - Abstract as working on distributive lattice
import operator as op
from functools import singledispatch
import funcy as fn
import traces
import mtl
import mtl.ast
from mtl.utils import const_trace, andf, orf
TRUE_TRACE = const_trace(True)
FALSE_TRACE = const_trace(False)
def negate_trace(x):
out = x.operation(TRUE_TRACE, op.xor)
out.domain = x.domain
return out
def pointwise_sat(phi, dt=0.1):
ap_names = [z.id for z in phi.atomic_predicates]
def _eval_mtl(x, t=0):
evaluated = fn.project(x, ap_names)
return bool(eval_mtl(phi, dt)(evaluated)[t])
return _eval_mtl
@singledispatch
def eval_mtl(phi, dt):
raise NotImplementedError
def or_traces(xs):
out = orf(*xs)
out.domain = xs[0].domain
return out
@eval_mtl.register(mtl.Or)
def eval_mtl_or(phi, dt):
fs = [eval_mtl(arg, dt) for arg in phi.args]
def _eval(x):
out = or_traces([f(x) for f in fs])
out.compact()
return out
return _eval
def and_traces(xs):
out = andf(*xs)
out.domain = xs[0].domain
return out
@eval_mtl.register(mtl.And)
def eval_mtl_and(phi, dt):
fs = [eval_mtl(arg, dt) for arg in phi.args]
def _eval(x):
out = and_traces([f(x) for f in fs])
out.compact()
return out
return _eval
def apply_until(y):
periods = list(y.iterperiods())
phi2_next = False
for t, _, (phi1, phi2) in periods[::-1]:
yield (t, phi2 or (phi1 and phi2_next))
phi2_next = phi2
@eval_mtl.register(mtl.Until)
def eval_mtl_until(phi, dt):
f1, f2 = eval_mtl(phi.arg1, dt), eval_mtl(phi.arg2, dt)
def _eval(x):
y1, y2 = f1(x), f2(x)
y = y1.operation(y2, lambda a, b: (a, b))
out = traces.TimeSeries(apply_until(y), domain=y1.domain)
out.compact()
return out
return _eval
@eval_mtl.register(mtl.F)
def eval_mtl_f(phi, dt):
phi = ~mtl.G(phi.interval, ~phi.arg)
return eval_mtl(phi, dt)
@eval_mtl.register(mtl.G)
def eval_mtl_g(phi, dt):
f = eval_mtl(phi.arg, dt)
a, b = phi.interval
if b < a:
return lambda _: TRUE_TRACE
def process_intervals(x):
# Need to add last interval
intervals = fn.chain(x.iterintervals(), [(
x.last(),
(float('inf'), None),
)])
for (start, val), (end, val2) in intervals:
start2, end2 = start - b, end + a
if end2 > start2:
yield (start2, val)
if b == float('inf'):
def _eval(x):
y = f(x)
val = len(y.slice(a, b)) == 1 and y[a]
return traces.TimeSeries(
[(y.domain.start(), val)], domain=y.domain)
else:
def _eval(x):
y = f(x)
if len(y) <= 1:
return y
out = traces.TimeSeries(process_intervals(y)).slice(
y.domain.start(), y.domain.end())
out.compact()
return out
return _eval
@eval_mtl.register(mtl.Neg)
def eval_mtl_neg(phi, dt):
f = eval_mtl(phi.arg, dt)
def _eval(x):
out = negate_trace(f(x))
out.compact()
return out
return _eval
@eval_mtl.register(mtl.ast.Next)
def eval_mtl_next(phi, dt):
f = eval_mtl(phi.arg, dt)
def _eval(x):
y = f(x)
out = traces.TimeSeries(((t - dt, v) for t, v in y))
out = out.slice(y.domain.start(), y.domain.end())
out.compact()
return out
return _eval
@eval_mtl.register(mtl.AtomicPred)
def eval_mtl_ap(phi, _):
def _eval(x):
out = x[str(phi.id)]
out.compact()
return out
return _eval
@eval_mtl.register(type(mtl.TOP))
def eval_mtl_top(_, _1):
return lambda *_: TRUE_TRACE
@eval_mtl.register(type(mtl.BOT))
def eval_mtl_bot(_, _1):
return lambda *_: FALSE_TRACE

98
mtl/fastboolean_eval.py Normal file
View file

@ -0,0 +1,98 @@
from functools import reduce, singledispatch
from operator import and_, or_
import funcy as fn
from bitarray import bitarray
import mtl.ast
oo = float('inf')
def get_times(x, tau, lo, hi):
end = min(v.domain.end() for v in x.values())
lo, hi = map(float, (lo, hi))
hi = hi + tau if hi + tau <= end else end
lo = lo + tau if lo + tau <= end else end
if lo > hi:
return []
elif hi == lo:
return [lo]
all_times = fn.cat(v.slice(lo, hi).items() for v in x.values())
return sorted(set(fn.pluck(0, all_times)))
def pointwise_sat(mtl):
f = pointwise_satf(mtl)
return lambda x, t: bool(int(f(x, [t]).to01()))
@singledispatch
def pointwise_satf(mtl):
raise NotImplementedError
def bool_op(mtl, conjunction=False):
binop = and_ if conjunction else or_
fs = [pointwise_satf(arg) for arg in mtl.args]
return lambda x, t: reduce(binop, (f(x, t) for f in fs))
@pointwise_satf.register(mtl.Or)
def pointwise_satf_or(mtl):
return bool_op(mtl, conjunction=False)
@pointwise_satf.register(mtl.And)
def pointwise_satf_and(mtl):
return bool_op(mtl, conjunction=True)
def temporal_op(mtl, lo, hi, conjunction=False):
fold = bitarray.all if conjunction else bitarray.any
f = pointwise_satf(mtl.arg)
def sat_comp(x, t):
return bitarray(fold(f(x, get_times(x, tau, lo, hi))) for tau in t)
return sat_comp
@pointwise_satf.register(mtl.F)
def pointwise_satf_f(mtl):
lo, hi = mtl.interval
return temporal_op(mtl, lo, hi, conjunction=False)
@pointwise_satf.register(mtl.G)
def pointwise_satf_g(mtl):
lo, hi = mtl.interval
return temporal_op(mtl, lo, hi, conjunction=True)
@pointwise_satf.register(mtl.Neg)
def pointwise_satf_neg(mtl):
return lambda x, t: ~pointwise_satf(mtl.arg)(x, t)
@pointwise_satf.register(mtl.AtomicPred)
def pointwise_satf_(phi):
return lambda x, t: bitarray(x[str(phi.id)][tau] for tau in t)
@pointwise_satf.register(mtl.Until)
def pointwise_satf_until(phi):
raise NotImplementedError
@pointwise_satf.register(type(mtl.TOP))
def pointwise_satf_top(_):
return lambda _, t: bitarray([True] * len(t))
@pointwise_satf.register(type(mtl.BOT))
def pointwise_satf_bot(_):
return lambda _, t: bitarray([False] * len(t))

24
mtl/hypothesis.py Normal file
View file

@ -0,0 +1,24 @@
import hypothesis.strategies as st
from hypothesis_cfg import ContextFreeGrammarStrategy
import mtl
GRAMMAR = {
'phi': (
('Unary', 'phi'),
('(', 'phi', 'Binary', 'phi', ')'),
('AP', ), ('0', ), ('1', )
),
'Unary': (('~', ), ('G', 'Interval'), ('F', 'Interval'), ('X', )),
'Interval': (('', ), ('[1, 3]', )),
'Binary': (
(' | ', ), (' & ', ), (' -> ', ), (' <-> ',), (' ^ ',),
(' U ',),
),
'AP': (('ap1', ), ('ap2', ), ('ap3', ), ('ap4', ), ('ap5', )),
}
MetricTemporalLogicStrategy = st.builds(
lambda term: mtl.parse(''.join(term)),
ContextFreeGrammarStrategy(GRAMMAR, max_length=14, start='phi')
)

141
mtl/parser.py Normal file
View file

@ -0,0 +1,141 @@
# -*- coding: utf-8 -*-
import operator as op
from functools import partialmethod, reduce
from parsimonious import Grammar, NodeVisitor
from mtl import ast
from mtl.utils import iff, implies, xor, timed_until
MTL_GRAMMAR = Grammar(u'''
phi = (neg / paren_phi / next / bot / top
/ xor_outer / iff_outer / implies_outer / and_outer / or_outer
/ timed_until / until / g / f / AP)
paren_phi = "(" __ phi __ ")"
neg = ("~" / "¬") __ phi
next = ("@" / "X") __ phi
and_outer = "(" __ and_inner __ ")"
and_inner = (phi __ ("" / "and" / "&") __ and_inner) / phi
or_outer = "(" __ or_inner __ ")"
or_inner = (phi __ ("" / "or" / "|") __ or_inner) / phi
implies_outer = "(" __ implies_inner __ ")"
implies_inner = (phi __ ("" / "->") __ implies_inner) / phi
iff_outer = "(" __ iff_inner __ ")"
iff_inner = (phi __ ("" / "<->" / "iff") __ iff_inner) / phi
xor_outer = "(" __ xor_inner __ ")"
xor_inner = (phi __ ("" / "^" / "xor") __ xor_inner) / phi
f = ("< >" / "F") interval? __ phi
g = ("[ ]" / "G") interval? __ phi
until = "(" __ phi _ "U" _ phi __ ")"
timed_until = "(" __ phi _ "U" interval _ phi __ ")"
interval = "[" __ const_or_unbound __ "," __ const_or_unbound __ "]"
const_or_unbound = const / "inf" / id
AP = ~r"[a-z\d]+"
bot = "0"
top = "1"
id = ~r"[a-z\d]+"
const = ~r"[-+]?(\d*\.\d+|\d+)"
_ = ~r"\s"+
__ = ~r"\s"*
EOL = "\\n"
''')
oo = float('inf')
class MTLVisitor(NodeVisitor):
def __init__(self, H=oo):
super().__init__()
self.default_interval = ast.Interval(0.0, H)
def binop_inner(self, _, children):
if isinstance(children[0], ast.AST):
return children
((left, _, _, _, right),) = children
return [left] + right
def binop_outer(self, _, children, *, binop):
return reduce(binop, children[2])
def visit_const_or_unbound(self, node, children):
child = children[0]
return ast.Param(child) if isinstance(child, str) else float(node.text)
visit_and_inner = binop_inner
visit_iff_inner = binop_inner
visit_implies_inner = binop_inner
visit_or_inner = binop_inner
visit_xor_inner = binop_inner
visit_and_outer = partialmethod(binop_outer, binop=op.and_)
visit_iff_outer = partialmethod(binop_outer, binop=iff)
visit_implies_outer = partialmethod(binop_outer, binop=implies)
visit_or_outer = partialmethod(binop_outer, binop=op.or_)
visit_xor_outer = partialmethod(binop_outer, binop=xor)
def generic_visit(self, _, children):
return children
def children_getter(self, _, children, i):
return children[i]
visit_phi = partialmethod(children_getter, i=0)
visit_paren_phi = partialmethod(children_getter, i=2)
def visit_bot(self, *_):
return ast.BOT
def visit_top(self, *_):
return ast.TOP
def visit_interval(self, _, children):
_, _, left, _, _, _, right, _, _ = children
return ast.Interval(left, right)
def get_text(self, node, _):
return node.text
visit_op = get_text
def unary_temp_op_visitor(self, _, children, op):
_, i, _, phi = children
i = self.default_interval if not i else i[0]
return op(i, phi)
visit_f = partialmethod(unary_temp_op_visitor, op=ast.F)
visit_g = partialmethod(unary_temp_op_visitor, op=ast.G)
def visit_until(self, _, children):
_, _, phi1, _, _, _, phi2, _, _ = children
return ast.Until(phi1, phi2)
def visit_timed_until(self, _, children):
_, _, phi1, _, _, itvl, _, phi2, _, _ = children
return timed_until(phi1, phi2, itvl.lower, itvl.upper)
def visit_id(self, name, _):
return name.text
def visit_AP(self, *args):
return ast.AtomicPred(self.visit_id(*args))
def visit_neg(self, _, children):
return ~children[2]
def visit_next(self, _, children):
return ast.Next(children[2])
def parse(mtl_str: str, rule: str = "phi", H=oo) -> "MTL":
return MTLVisitor(H).visit(MTL_GRAMMAR[rule].parse(mtl_str))

31
mtl/test_ast.py Normal file
View file

@ -0,0 +1,31 @@
import mtl
from mtl.hypothesis import MetricTemporalLogicStrategy
from hypothesis import given
@given(MetricTemporalLogicStrategy)
def test_identities(phi):
assert mtl.TOP == mtl.TOP | phi
assert mtl.BOT == mtl.BOT & phi
assert mtl.TOP == phi | mtl.TOP
assert mtl.BOT == phi & mtl.BOT
assert phi == phi & mtl.TOP
assert phi == phi | mtl.BOT
assert mtl.TOP == mtl.TOP & mtl.TOP
assert mtl.BOT == mtl.BOT | mtl.BOT
assert mtl.TOP == mtl.TOP | mtl.BOT
assert mtl.BOT == mtl.TOP & mtl.BOT
assert ~mtl.BOT == mtl.TOP
assert ~mtl.TOP == mtl.BOT
assert ~~mtl.BOT == mtl.BOT
assert ~~mtl.TOP == mtl.TOP
assert (phi & phi) & phi == phi & (phi & phi)
assert (phi | phi) | phi == phi | (phi | phi)
assert ~~phi == phi
def test_walk():
phi = mtl.parse(
'(([ ][0, 1] ap1 & < >[1,2] ap2) | (@ap1 U ap2))')
assert len(list((~phi).walk())) == 11

127
mtl/test_boolean_eval.py Normal file
View file

@ -0,0 +1,127 @@
import hypothesis.strategies as st
import traces
from hypothesis import given
from pytest import raises
import mtl
import mtl.boolean_eval
import mtl.fastboolean_eval
from mtl.hypothesis import MetricTemporalLogicStrategy
"""
TODO: property based test that fasteval should be the same as slow
TODO: property based test that x |= phi == ~(x |= ~phi)
TODO: property based test that ~~phi == phi
TODO: property based test that ~~~phi == ~phi
TODO: property based test that ~phi => phi
TODO: property based test that phi => phi
TODO: property based test that phi <=> phi
TODO: property based test that phi & psi => phi
TODO: property based test that psi => phi | psi
TODO: property based test that (True U psi) => F(psi)
TODO: property based test that G(psi) = ~F(~psi)
TODO: Automatically generate input time series.
"""
x = {
"x":
traces.TimeSeries([(0, 1), (0.1, 1), (0.2, 4)], domain=(0, 10)),
"y":
traces.TimeSeries([(0, 2), (0.1, 4), (0.2, 2)], domain=(0, 10)),
"ap1":
traces.TimeSeries([(0, True), (0.1, True), (0.2, False)], domain=(0, 10)),
"ap2":
traces.TimeSeries([(0, False), (0.2, True), (0.5, False)], domain=(0, 10)),
"ap3":
traces.TimeSeries([(0, True), (0.1, True), (0.3, False)], domain=(0, 10)),
"ap4":
traces.TimeSeries(
[(0, False), (0.1, False), (0.3, False)], domain=(0, 10)),
"ap5":
traces.TimeSeries([(0, False), (0.1, False), (0.3, True)], domain=(0, 10)),
"ap6":
traces.TimeSeries([(0, True)], domain=(0, 10)),
}
@given(st.just(mtl.ast.Next(mtl.BOT) | mtl.ast.Next(mtl.TOP)))
def test_eval_smoke_tests(phi):
mtl_eval9 = mtl.boolean_eval.pointwise_sat(mtl.ast.Next(phi))
mtl_eval10 = mtl.boolean_eval.pointwise_sat(~mtl.ast.Next(phi))
assert mtl_eval9(x, 0) != mtl_eval10(x, 0)
phi4 = mtl.parse('~ap4')
mtl_eval11 = mtl.boolean_eval.pointwise_sat(phi4)
assert mtl_eval11(x, 0)
phi5 = mtl.parse('G[0.1, 0.03] ~ap4')
mtl_eval12 = mtl.boolean_eval.pointwise_sat(phi5)
assert mtl_eval12(x, 0)
phi6 = mtl.parse('G[0.1, 0.03] ~ap5')
mtl_eval13 = mtl.boolean_eval.pointwise_sat(phi6)
assert mtl_eval13(x, 0)
assert mtl_eval13(x, 0.4)
phi7 = mtl.parse('G ~ap4')
mtl_eval14 = mtl.boolean_eval.pointwise_sat(phi7)
assert mtl_eval14(x, 0)
phi8 = mtl.parse('F ap5')
mtl_eval15 = mtl.boolean_eval.pointwise_sat(phi8)
assert mtl_eval15(x, 0)
phi9 = mtl.parse('(ap1 U ap2)')
mtl_eval16 = mtl.boolean_eval.pointwise_sat(phi9)
assert mtl_eval16(x, 0)
phi10 = mtl.parse('(ap2 U ap2)')
mtl_eval17 = mtl.boolean_eval.pointwise_sat(phi10)
assert not mtl_eval17(x, 0)
with raises(NotImplementedError):
mtl.boolean_eval.eval_mtl(None, None)
@given(MetricTemporalLogicStrategy)
def test_temporal_identities(phi):
mtl_eval = mtl.boolean_eval.pointwise_sat(phi)
mtl_eval2 = mtl.boolean_eval.pointwise_sat(~phi)
assert mtl_eval2(x, 0) == (not mtl_eval(x, 0))
mtl_eval3 = mtl.boolean_eval.pointwise_sat(~~phi)
assert mtl_eval3(x, 0) == mtl_eval(x, 0)
mtl_eval4 = mtl.boolean_eval.pointwise_sat(phi & phi)
assert mtl_eval4(x, 0) == mtl_eval(x, 0)
mtl_eval5 = mtl.boolean_eval.pointwise_sat(phi & ~phi)
assert not mtl_eval5(x, 0)
mtl_eval6 = mtl.boolean_eval.pointwise_sat(phi | ~phi)
assert mtl_eval6(x, 0)
mtl_eval7 = mtl.boolean_eval.pointwise_sat(mtl.ast.Until(mtl.TOP, phi))
mtl_eval8 = mtl.boolean_eval.pointwise_sat(mtl.env(phi))
assert mtl_eval7(x, 0) == mtl_eval8(x, 0)
@given(st.just(mtl.BOT))
def test_fastboolean_equiv(phi):
mtl_eval = mtl.fastboolean_eval.pointwise_sat(mtl.alw(phi, lo=0, hi=4))
mtl_eval2 = mtl.fastboolean_eval.pointwise_sat(~mtl.env(~phi, lo=0, hi=4))
assert mtl_eval2(x, 0) == mtl_eval(x, 0)
mtl_eval3 = mtl.fastboolean_eval.pointwise_sat(~mtl.alw(~phi, lo=0, hi=4))
mtl_eval4 = mtl.fastboolean_eval.pointwise_sat(mtl.env(phi, lo=0, hi=4))
assert mtl_eval4(x, 0) == mtl_eval3(x, 0)
def test_fastboolean_smoketest():
phi = mtl.parse(
'(((G[0, 4] ap6 & F[2, 1] ap1) | ap2) & G[0,0](ap2))')
mtl_eval = mtl.fastboolean_eval.pointwise_sat(phi)
assert not mtl_eval(x, 0)
with raises(NotImplementedError):
mtl.fastboolean_eval.pointwise_sat(mtl.ast.AST())
def test_callable_interface():
phi = mtl.parse(
'(((G[0, 4] ap6 & F[2, 1] ap1) | ap2) & G[0,0](ap2))')
assert not phi(x, 0)

20
mtl/test_params.py Normal file
View file

@ -0,0 +1,20 @@
import mtl
from mtl.hypothesis import MetricTemporalLogicStrategy
import hypothesis.strategies as st
from hypothesis import given
@given(st.integers(), st.integers(), st.integers())
def test_params1(a, b, c):
phi = mtl.parse('G[a, b] x')
assert {x.name for x in phi.params} == {'a', 'b'}
phi2 = phi.set_params({'a': a, 'b': b})
assert phi2.params == set()
assert phi2 == mtl.parse(f'G[{a}, {b}](x)')
@given(MetricTemporalLogicStrategy)
def test_hash_stable(phi):
assert hash(phi) == hash(mtl.parse(str(phi)))

22
mtl/test_parser.py Normal file
View file

@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
from hypothesis import event, given
import mtl
from mtl.hypothesis import MetricTemporalLogicStrategy
@given(MetricTemporalLogicStrategy)
def test_invertable_repr(phi):
event(str(phi))
assert str(phi) == str(mtl.parse(str(phi)))
@given(MetricTemporalLogicStrategy)
def test_hash_inheritance(phi):
assert hash(repr(phi)) == hash(phi)
def test_sugar_smoke():
mtl.parse('(x <-> x)')
mtl.parse('(x -> x)')
mtl.parse('(x ^ x)')

113
mtl/test_utils.py Normal file
View file

@ -0,0 +1,113 @@
import mtl
from mtl.hypothesis import MetricTemporalLogicStrategy
from hypothesis import given
from pytest import raises
CONTEXT = {
mtl.parse('ap1'): mtl.parse('x'),
mtl.parse('ap2'): mtl.parse('(y U z)'),
mtl.parse('ap3'): mtl.parse('x'),
mtl.parse('ap4'): mtl.parse('(x -> y -> z)'),
mtl.parse('ap5'): mtl.parse('(ap1 <-> y <-> z)'),
}
APS = set(CONTEXT.keys())
@given(MetricTemporalLogicStrategy)
def test_f_neg_or_canonical_form(phi):
phi2 = mtl.utils.f_neg_or_canonical_form(phi)
phi3 = mtl.utils.f_neg_or_canonical_form(phi2)
assert phi2 == phi3
assert not any(
isinstance(x, (mtl.ast.G, mtl.ast.And)) for x in phi2.walk())
def test_f_neg_or_canonical_form_not_implemented():
with raises(NotImplementedError):
mtl.utils.f_neg_or_canonical_form(mtl.ast.AST())
def test_inline_context_rigid():
phi = mtl.parse('G ap1')
phi2 = phi.inline_context(CONTEXT)
assert phi2 == mtl.parse('G x')
phi = mtl.parse('G ap5')
phi2 = phi.inline_context(CONTEXT)
assert phi2 == mtl.parse('G(x <-> y <-> z)')
@given(MetricTemporalLogicStrategy)
def test_inline_context(phi):
phi2 = phi.inline_context(CONTEXT)
assert not (APS & phi2.atomic_predicates)
@given(MetricTemporalLogicStrategy, MetricTemporalLogicStrategy)
def test_timed_until_smoke_test(phi1, phi2):
mtl.utils.timed_until(phi1, phi2, lo=2, hi=20)
def test_discretize():
dt = 0.3
phi = mtl.parse('@ ap1')
assert mtl.utils.is_discretizable(phi, dt)
phi2 = mtl.utils.discretize(phi, dt)
phi3 = mtl.utils.discretize(phi2, dt)
assert phi2 == phi3
phi = mtl.parse('G[0.3, 1.2] F[0.6, 1.5] ap1')
assert mtl.utils.is_discretizable(phi, dt)
phi2 = mtl.utils.discretize(phi, dt)
phi3 = mtl.utils.discretize(phi2, dt)
assert phi2 == phi3
phi = mtl.parse('G[0.3, 1.4] F[0.6, 1.5] ap1')
assert not mtl.utils.is_discretizable(phi, dt)
phi = mtl.parse('G[0.3, 1.2] F ap1')
assert not mtl.utils.is_discretizable(phi, dt)
phi = mtl.parse('G[0.3, 1.2] (ap1 U ap2)')
assert not mtl.utils.is_discretizable(phi, dt)
phi = mtl.parse('G[0.3, 0.6] ~F[0, 0.3] a')
assert mtl.utils.is_discretizable(phi, dt)
phi2 = mtl.utils.discretize(phi, dt, distribute=True)
phi3 = mtl.utils.discretize(phi2, dt, distribute=True)
assert phi2 == phi3
assert phi2 == mtl.parse(
'(~(@a | @@a) & ~(@@a | @@@a))')
phi = mtl.TOP
assert mtl.utils.is_discretizable(phi, dt)
phi2 = mtl.utils.discretize(phi, dt)
phi3 = mtl.utils.discretize(phi2, dt)
assert phi2 == phi3
phi = mtl.BOT
assert mtl.utils.is_discretizable(phi, dt)
phi2 = mtl.utils.discretize(phi, dt)
phi3 = mtl.utils.discretize(phi2, dt)
assert phi2 == phi3
def test_scope():
dt = 0.3
phi = mtl.parse('@ap1')
assert mtl.utils.scope(phi, dt) == 0.3
phi = mtl.parse('(@@ap1 | ap2)')
assert mtl.utils.scope(phi, dt) == 0.6
phi = mtl.parse('G[0.3, 1.2] F[0.6, 1.5] ap1')
assert mtl.utils.scope(phi, dt) == 1.2 + 1.5
phi = mtl.parse('G[0.3, 1.2] F ap1')
assert mtl.utils.scope(phi, dt) == float('inf')
phi = mtl.parse('G[0.3, 1.2] (ap1 U ap2)')
assert mtl.utils.scope(phi, dt) == float('inf')

172
mtl/utils.py Normal file
View file

@ -0,0 +1,172 @@
import operator as op
from functools import reduce, wraps
from math import isfinite
import traces
import numpy as np
from lenses import bind
import mtl.ast
from mtl.ast import (And, F, G, Interval, Neg, Or, Next, Until,
AtomicPred, _Top, _Bot)
oo = float('inf')
def f_neg_or_canonical_form(phi):
if isinstance(phi, (AtomicPred, _Top, _Bot)):
return phi
children = [f_neg_or_canonical_form(s) for s in phi.children]
if isinstance(phi, (And, G)):
children = [Neg(s) for s in children]
children = tuple(sorted(children, key=str))
if isinstance(phi, Or):
return Or(children)
elif isinstance(phi, And):
return Neg(Or(children))
elif isinstance(phi, Neg):
return Neg(*children)
elif isinstance(phi, Next):
return Next(*children)
elif isinstance(phi, Until):
return Until(*children)
elif isinstance(phi, F):
return F(phi.interval, *children)
elif isinstance(phi, G):
return Neg(F(phi.interval, *children))
else:
raise NotImplementedError
def const_trace(x, start=0):
return traces.TimeSeries([(start, x)], domain=traces.Domain(start, oo))
def require_discretizable(func):
@wraps(func)
def _func(phi, dt, *args, **kwargs):
if 'horizon' not in kwargs:
assert is_discretizable(phi, dt)
return func(phi, dt, *args, **kwargs)
return _func
def scope(phi, dt, *, _t=0, horizon=oo):
if isinstance(phi, Next):
_t += dt
elif isinstance(phi, (G, F)):
_t += phi.interval.upper
elif isinstance(phi, Until):
_t += float('inf')
_scope = max((scope(c, dt, _t=_t) for c in phi.children), default=_t)
return min(_scope, horizon)
# Code to discretize a bounded MTL formula
@require_discretizable
def discretize(phi, dt, distribute=False, *, horizon=None):
if horizon is None:
horizon = oo
phi = _discretize(phi, dt, horizon)
return _distribute_next(phi) if distribute else phi
def _discretize(phi, dt, horizon):
if isinstance(phi, (AtomicPred, _Top, _Bot)):
return phi
if not isinstance(phi, (F, G, Until)):
children = tuple(_discretize(arg, dt, horizon) for arg in phi.children)
if isinstance(phi, (And, Or)):
return bind(phi).args.set(children)
elif isinstance(phi, (Neg, Next)):
return bind(phi).arg.set(children[0])
raise NotImplementedError
elif isinstance(phi, Until):
raise NotImplementedError
# Only remaining cases are G and F
upper = min(phi.interval.upper, horizon)
l, u = round(phi.interval.lower / dt), round(upper / dt)
psis = (next(_discretize(phi.arg, dt, horizon - i), i)
for i in range(l, u + 1))
opf = andf if isinstance(phi, G) else orf
return opf(*psis)
def _interval_discretizable(itvl, dt):
l, u = itvl.lower / dt, itvl.upper / dt
if not (isfinite(l) and isfinite(u)):
return False
return np.isclose(l, round(l)) and np.isclose(u, round(u))
def _distribute_next(phi, i=0):
if isinstance(phi, AtomicPred):
return mtl.utils.next(phi, i=i)
elif isinstance(phi, Next):
return _distribute_next(phi.arg, i=i+1)
children = tuple(_distribute_next(c, i) for c in phi.children)
if isinstance(phi, (And, Or)):
return bind(phi).args.set(children)
elif isinstance(phi, (Neg, Next)):
return bind(phi).arg.set(children[0])
def is_discretizable(phi, dt):
if any(c for c in phi.walk() if isinstance(c, Until)):
return False
return all(
_interval_discretizable(c.interval, dt) for c in phi.walk()
if isinstance(c, (F, G)))
# EDSL
def alw(phi, *, lo=0, hi=float('inf')):
return G(Interval(lo, hi), phi)
def env(phi, *, lo=0, hi=float('inf')):
return F(Interval(lo, hi), phi)
def andf(*args):
return reduce(op.and_, args) if args else mtl.TOP
def orf(*args):
return reduce(op.or_, args) if args else mtl.TOP
def implies(x, y):
return ~x | y
def xor(x, y):
return (x | y) & ~(x & y)
def iff(x, y):
return (x & y) | (~x & ~y)
def next(phi, i=1):
return phi >> i
def timed_until(phi, psi, lo, hi):
return env(psi, lo=lo, hi=hi) & alw(mtl.ast.Until(phi, psi), lo=0, hi=lo)