mtl-aas/mtl/utils.py
Marcell Vazquez-Chanlatte 39fe118cb3 remove numpy dependency
2019-02-11 16:35:57 -08:00

113 lines
2.9 KiB
Python

import operator as op
from functools import reduce, wraps
from math import isfinite
from discrete_signals import signal
from mtl import ast
from mtl.ast import (And, G, Neg, Next, WeakUntil,
AtomicPred, _Bot)
# Positive infinity. Used in this module as the "unbounded" value for
# horizons and as the end time passed to ``signal`` in ``const_trace``.
oo = float('inf')
def const_trace(x):
    """Return a signal built from the single sample ``(0, x)``.

    The signal is created with ``start=0`` and ``end=oo``.
    """
    samples = [(0, x)]
    return signal(samples, start=0, end=oo)
def require_discretizable(func):
    """Decorate ``func(phi, dt, ...)`` so ``phi`` is checked before the call.

    Unless the caller passes a ``horizon`` keyword whose value is not
    ``None``, ``is_discretizable(phi, dt)`` must hold; otherwise the wrapped
    function is never invoked.

    Args:
        func: Callable taking ``phi`` and ``dt`` as its first two positional
            arguments.

    Returns:
        The wrapped callable (metadata preserved via ``functools.wraps``).

    Raises:
        AssertionError: If the check is performed and fails.
    """
    @wraps(func)
    def _func(phi, dt, *args, **kwargs):
        # ``discretize`` treats ``horizon=None`` the same as an omitted
        # horizon, so an explicit ``None`` must not bypass the check.
        needs_check = kwargs.get('horizon') is None
        # Raise explicitly instead of using ``assert`` so the validation
        # survives ``python -O``; the exception type is kept for callers.
        if needs_check and not is_discretizable(phi, dt):
            raise AssertionError(
                f"formula is not discretizable with dt={dt!r}")
        return func(phi, dt, *args, **kwargs)

    return _func
def scope(phi, dt, *, _t=0, horizon=oo):
    """Return the largest accumulated time offset in ``phi``, capped.

    Walking down the formula, the running offset grows by ``dt`` at a
    ``Next`` node, by ``phi.interval.upper`` at a ``G`` node and by infinity
    at a ``WeakUntil`` node; every other node leaves it unchanged.  The
    result is the maximum offset over all children (or the node's own offset
    when it has no children), limited to ``horizon``.

    Args:
        phi: Formula node exposing ``children``.
        dt: Time step added for each ``Next``.
        _t: Offset accumulated so far (used by the recursion).
        horizon: Upper cap applied to the returned value.
    """
    if isinstance(phi, Next):
        offset = _t + dt
    elif isinstance(phi, G):
        offset = _t + phi.interval.upper
    elif isinstance(phi, WeakUntil):
        offset = _t + float('inf')
    else:
        offset = _t

    # Children are evaluated without a horizon; only the top-level result
    # of each call is capped.
    child_scopes = [scope(child, dt, _t=offset) for child in phi.children]
    furthest = max(child_scopes) if child_scopes else offset
    return min(furthest, horizon)
# Code to discretize a bounded MTL formula
@require_discretizable
def discretize(phi, dt, distribute=False, *, horizon=None):
    """Discretize ``phi`` with time step ``dt``.

    Args:
        phi: Formula to discretize.
        dt: Time step.
        distribute: When true, the result is additionally passed through
            ``_distribute_next``.
        horizon: Optional time bound; ``None`` is treated as infinity.

    Returns:
        The discretized formula.
    """
    bound = oo if horizon is None else horizon
    discretized = _discretize(phi, dt, bound)
    if distribute:
        return _distribute_next(discretized)
    return discretized
def _discretize(phi, dt, horizon):
    """Recursively replace every ``G`` node in ``phi`` by a conjunction.

    ``AtomicPred`` and ``_Bot`` nodes are returned unchanged; ``And``,
    ``Neg`` and ``Next`` nodes are rebuilt around their discretized
    children.  A ``G`` node over ``[lower, upper]`` becomes the conjunction
    of its discretized argument shifted (``>> i``) by every step ``i`` from
    ``round(lower / dt)`` to ``round(min(upper, horizon) / dt)`` inclusive.

    Args:
        phi: Formula node to discretize.
        dt: Time step.
        horizon: Remaining time budget, in the same units as the interval
            bounds.

    Raises:
        NotImplementedError: For ``WeakUntil`` and any other unsupported
            node type.
    """
    if isinstance(phi, (AtomicPred, _Bot)):
        return phi

    if not isinstance(phi, (G, WeakUntil)):
        # NOTE(review): the horizon is passed through unchanged even under
        # ``Next``; confirm whether it should shrink by ``dt`` there.
        children = tuple(_discretize(arg, dt, horizon) for arg in phi.children)
        if isinstance(phi, And):
            return phi.evolve(args=children)
        elif isinstance(phi, (Neg, Next)):
            return phi.evolve(arg=children[0])

        raise NotImplementedError

    elif isinstance(phi, WeakUntil):
        raise NotImplementedError

    # The only remaining case is G.
    upper = min(phi.interval.upper, horizon)
    first, last = round(phi.interval.lower / dt), round(upper / dt)

    # Step ``i`` lies ``i * dt`` time units ahead, so that much of the
    # horizon (which is in time units, not steps) has been consumed.
    psis = (_discretize(phi.arg, dt, horizon - i * dt) >> i
            for i in range(first, last + 1))
    return andf(*psis)
def _interval_discretizable(itvl, dt, *, tol=1e-3):
    """Return whether both bounds of ``itvl`` are whole multiples of ``dt``.

    Args:
        itvl: Object with numeric ``lower`` and ``upper`` attributes.
        dt: Time step.
        tol: Largest allowed distance between ``bound / dt`` and the nearest
            integer; absorbs floating point error such as ``0.3 / 0.1``.

    Returns:
        ``True`` if both bounds are finite and within ``tol`` of a multiple
        of ``dt``, ``False`` otherwise.
    """
    lower_steps, upper_steps = itvl.lower / dt, itvl.upper / dt
    if not (isfinite(lower_steps) and isfinite(upper_steps)):
        return False

    # The deviation must be compared against a tolerance: returning it
    # directly would make exact multiples (deviation 0.0) look falsy.
    deviation = max(abs(lower_steps - round(lower_steps)),
                    abs(upper_steps - round(upper_steps)))
    return deviation < tol
def _distribute_next(phi, i=0):
    """Push ``Next`` operators in ``phi`` down to the atomic predicates.

    Each ``Next`` encountered increments the pending shift ``i``; when an
    ``AtomicPred`` is reached it is returned as ``phi >> i``.  ``And`` and
    ``Neg`` nodes are rebuilt around their processed children.

    Args:
        phi: Formula node to rewrite.
        i: Number of ``Next`` operators accumulated so far.

    Raises:
        NotImplementedError: For node types other than ``AtomicPred``,
            ``_Bot``, ``Next``, ``And`` and ``Neg`` (previously such nodes
            silently produced ``None``).
    """
    if isinstance(phi, AtomicPred):
        return phi >> i
    elif isinstance(phi, _Bot):
        # Treated as a leaf, as in ``_discretize``.  The pending shift is
        # dropped on the assumption that shifting the constant in time
        # leaves it unchanged.
        return phi
    elif isinstance(phi, Next):
        return _distribute_next(phi.arg, i=i + 1)

    children = tuple(_distribute_next(c, i) for c in phi.children)

    if isinstance(phi, And):
        return phi.evolve(args=children)
    elif isinstance(phi, Neg):
        return phi.evolve(arg=children[0])

    raise NotImplementedError
def is_discretizable(phi, dt):
    """Return whether ``phi`` can be discretized with time step ``dt``.

    The formula is rejected if it contains any ``WeakUntil`` node, or any
    ``G`` node whose interval fails ``_interval_discretizable``.

    Args:
        phi: Formula exposing ``walk()``, which yields its nodes.
        dt: Time step.

    Returns:
        ``True`` if every visited node is acceptable, ``False`` otherwise.
    """
    # Single pass; test the node *type* rather than the node's truthiness.
    for node in phi.walk():
        if isinstance(node, WeakUntil):
            return False
        if isinstance(node, G) and not _interval_discretizable(
                node.interval, dt):
            return False
    return True
def andf(*args):
    """Fold ``args`` together with ``&``; return ``ast.TOP`` for no args."""
    if not args:
        return ast.TOP

    head, *tail = args
    conjunction = head
    for term in tail:
        conjunction = conjunction & term
    return conjunction
def orf(*args):
    """Fold ``args`` together with ``|``.

    With no arguments the identity of disjunction (false) is returned, so
    that an empty "or" is not trivially satisfied.

    NOTE(review): relies on ``ast.BOT`` being the false constant exported
    next to ``ast.TOP`` — confirm against ``mtl.ast``.
    """
    return reduce(op.or_, args) if args else ast.BOT