removed robustess calculations + finish porting to traces library
This commit is contained in:
parent
3a7ac439f1
commit
4871f993c0
5 changed files with 6 additions and 282 deletions
|
|
@ -7,4 +7,3 @@ from stl.parser import parse
|
||||||
from stl.fastboolean_eval import pointwise_sat
|
from stl.fastboolean_eval import pointwise_sat
|
||||||
from stl.synth import lex_param_project
|
from stl.synth import lex_param_project
|
||||||
from stl.types import STL
|
from stl.types import STL
|
||||||
from stl.robustness import pointwise_robustness
|
|
||||||
|
|
|
||||||
|
|
@ -1,72 +0,0 @@
|
||||||
# TODO: technically incorrect on 0 robustness since conflates < and >
|
|
||||||
|
|
||||||
from functools import singledispatch
|
|
||||||
from operator import sub, add
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
from lenses import lens
|
|
||||||
|
|
||||||
import stl.ast
|
|
||||||
|
|
||||||
oo = float('inf')
|
|
||||||
|
|
||||||
@singledispatch
|
|
||||||
def pointwise_robustness(stl):
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
|
|
||||||
@pointwise_robustness.register(stl.Or)
|
|
||||||
def _(stl):
|
|
||||||
return lambda x, t: max(pointwise_robustness(arg)(x, t) for arg in stl.args)
|
|
||||||
|
|
||||||
|
|
||||||
@pointwise_robustness.register(stl.And)
|
|
||||||
def _(stl):
|
|
||||||
return lambda x, t: min(pointwise_robustness(arg)(x, t) for arg in stl.args)
|
|
||||||
|
|
||||||
|
|
||||||
@pointwise_robustness.register(stl.F)
|
|
||||||
def _(stl):
|
|
||||||
lo, hi = stl.interval
|
|
||||||
return lambda x, t: max((pointwise_robustness(stl.arg)(x, t + t2)
|
|
||||||
for t2 in x[lo:hi].index), default=-oo)
|
|
||||||
|
|
||||||
|
|
||||||
@pointwise_robustness.register(stl.G)
|
|
||||||
def _(stl):
|
|
||||||
lo, hi = stl.interval
|
|
||||||
return lambda x, t: min((pointwise_robustness(stl.arg)(x, t + t2)
|
|
||||||
for t2 in x[lo:hi].index), default=oo)
|
|
||||||
|
|
||||||
|
|
||||||
@pointwise_robustness.register(stl.Neg)
|
|
||||||
def _(stl):
|
|
||||||
return lambda x, t: -pointwise_robustness(stl.arg)(x, t)
|
|
||||||
|
|
||||||
|
|
||||||
op_lookup = {
|
|
||||||
">": sub,
|
|
||||||
">=": sub,
|
|
||||||
"<": lambda x, y: sub(y, x),
|
|
||||||
"<=": lambda x, y: sub(y, x),
|
|
||||||
"=": lambda a, b: -abs(a - b),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@pointwise_robustness.register(stl.LinEq)
|
|
||||||
def _(stl):
|
|
||||||
op = op_lookup[stl.op]
|
|
||||||
return lambda x, t: op(eval_terms(stl, x, t), stl.const)
|
|
||||||
|
|
||||||
|
|
||||||
def eval_terms(lineq, x, t):
|
|
||||||
psi = lens(lineq).terms.each_().modify(eval_term(x, t))
|
|
||||||
return sum(psi.terms)
|
|
||||||
|
|
||||||
|
|
||||||
def eval_term(x, t):
|
|
||||||
def _eval_term(term):
|
|
||||||
coeff = float(term.coeff) if term.coeff.is_number else term.coeff
|
|
||||||
return coeff*np.interp(t, x.index, x[term.id.name])
|
|
||||||
return _eval_term
|
|
||||||
|
|
@ -1,140 +0,0 @@
|
||||||
# TODO: technically incorrect on 0 robustness since conflates < and >
|
|
||||||
|
|
||||||
from functools import singledispatch
|
|
||||||
from collections import namedtuple
|
|
||||||
|
|
||||||
import sympy as sym
|
|
||||||
from numpy import arange
|
|
||||||
from funcy import pairwise
|
|
||||||
from lenses import lens
|
|
||||||
|
|
||||||
import stl.ast
|
|
||||||
from stl.ast import t_sym
|
|
||||||
from stl.utils import walk, f_neg_or_canonical_form
|
|
||||||
from stl.robustness import op_lookup
|
|
||||||
|
|
||||||
Param = namedtuple("Param", ["L", "h", "B", "eps"])
|
|
||||||
|
|
||||||
@singledispatch
|
|
||||||
def node_base(_, _1, _2):
|
|
||||||
return sym.E
|
|
||||||
|
|
||||||
|
|
||||||
@node_base.register(stl.ast.Or)
|
|
||||||
def _(phi, eps, _1):
|
|
||||||
return len(phi.args)**(1/eps)
|
|
||||||
|
|
||||||
|
|
||||||
@node_base.register(stl.ast.F)
|
|
||||||
def _(phi, eps, L):
|
|
||||||
lo, hi = phi.interval
|
|
||||||
return sym.ceiling((hi - lo)*L/eps)**(2/eps)
|
|
||||||
|
|
||||||
|
|
||||||
def sample_rate(eps, L):
|
|
||||||
return eps / L
|
|
||||||
|
|
||||||
|
|
||||||
def admissible_params(phi, eps, L):
|
|
||||||
h = sample_rate(eps, L)
|
|
||||||
B = max(node_base(n, eps, L) for n in walk(phi))
|
|
||||||
return B, h
|
|
||||||
|
|
||||||
|
|
||||||
def symbolic_params(phi, eps, L):
|
|
||||||
L = sym.Dummy("L")
|
|
||||||
eps = sym.Dummy("\epsilon")
|
|
||||||
return Param(
|
|
||||||
L=L,
|
|
||||||
h=sample_rate(eps, L),
|
|
||||||
B=sym.Dummy("B")(eps, sym.Dummy("\phi")),
|
|
||||||
eps=eps,
|
|
||||||
)
|
|
||||||
|
|
||||||
def smooth_robustness(phi, *, L=None, eps=None):
|
|
||||||
phi = f_neg_or_canonical_form(phi)
|
|
||||||
p = symbolic_params(phi, eps, L)
|
|
||||||
lo, hi = beta(phi, p), alpha(phi, p)
|
|
||||||
subs = {}
|
|
||||||
if L is not None:
|
|
||||||
subs[p.L] = L
|
|
||||||
if eps is not None:
|
|
||||||
subs[p.eps] = eps
|
|
||||||
if L is not None and eps is not None:
|
|
||||||
B, h = admissible_params(phi, eps, L)
|
|
||||||
subs[p.B] = B
|
|
||||||
subs[p.h] = h
|
|
||||||
lo, hi = lo.subs(subs), hi.subs(subs)
|
|
||||||
else:
|
|
||||||
B = p.B
|
|
||||||
|
|
||||||
return sym.log(lo, B), sym.log(hi, B)
|
|
||||||
|
|
||||||
|
|
||||||
# Alpha implementation
|
|
||||||
|
|
||||||
@singledispatch
|
|
||||||
def alpha(stl, p):
|
|
||||||
raise NotImplementedError("Call canonicalization function")
|
|
||||||
|
|
||||||
def eval_terms(lineq):
|
|
||||||
return sum(map(eval_term, lineq.terms))
|
|
||||||
|
|
||||||
|
|
||||||
def eval_term(term):
|
|
||||||
return term.coeff*sym.Function(term.id.name)(t_sym)
|
|
||||||
|
|
||||||
|
|
||||||
@alpha.register(stl.LinEq)
|
|
||||||
def _(phi, p):
|
|
||||||
op = op_lookup[phi.op]
|
|
||||||
x = op(eval_terms(phi), phi.const)
|
|
||||||
return p.B**x
|
|
||||||
|
|
||||||
|
|
||||||
@alpha.register(stl.Neg)
|
|
||||||
def _(phi, p):
|
|
||||||
return 1/beta(phi.arg, p)
|
|
||||||
|
|
||||||
|
|
||||||
@alpha.register(stl.Or)
|
|
||||||
def _(phi, p):
|
|
||||||
return sum(alpha(psi, p) for psi in phi.args)
|
|
||||||
|
|
||||||
|
|
||||||
def F_params(phi, p, r):
|
|
||||||
hi, lo = phi.interval
|
|
||||||
N = sym.ceiling((hi - lo) / p.h)
|
|
||||||
x = lambda k: r.subs({t_sym: t_sym+k+lo})
|
|
||||||
i = sym.Dummy("i")
|
|
||||||
return N, i, x
|
|
||||||
|
|
||||||
|
|
||||||
@alpha.register(stl.F)
|
|
||||||
def _(phi, p):
|
|
||||||
N, i, x = F_params(phi, p, alpha(phi.arg, p))
|
|
||||||
x_ij = sym.sqrt(p.B**(p.L*p.h)*x(i)*x(i+1))
|
|
||||||
return sym.summation(x_ij, (i, 0, N-1))
|
|
||||||
|
|
||||||
# Beta implementation
|
|
||||||
|
|
||||||
@singledispatch
|
|
||||||
def beta(phi, p):
|
|
||||||
raise NotImplementedError("Call canonicalization function")
|
|
||||||
|
|
||||||
beta.register(stl.LinEq)(alpha)
|
|
||||||
|
|
||||||
@beta.register(stl.Neg)
|
|
||||||
def _(phi, p):
|
|
||||||
return 1/alpha(phi.arg, p)
|
|
||||||
|
|
||||||
|
|
||||||
@beta.register(stl.Or)
|
|
||||||
def _(phi, p):
|
|
||||||
return alpha(phi, p)/len(phi.args)
|
|
||||||
|
|
||||||
|
|
||||||
@beta.register(stl.F)
|
|
||||||
def _(phi, p):
|
|
||||||
N, i, x = F_params(phi, p, beta(phi.arg, p))
|
|
||||||
return sym.summation(x(i), (i, 0, N))
|
|
||||||
|
|
@ -1,56 +0,0 @@
|
||||||
import stl
|
|
||||||
import stl.boolean_eval
|
|
||||||
import stl.robustness
|
|
||||||
import stl.smooth_robustness
|
|
||||||
import pandas as pd
|
|
||||||
from nose2.tools import params
|
|
||||||
import unittest
|
|
||||||
from sympy import Symbol
|
|
||||||
|
|
||||||
oo = float('inf')
|
|
||||||
|
|
||||||
ex1 = ("2*A > 3", -1)
|
|
||||||
ex2 = ("F[0, 1](2*A > 3)", 5)
|
|
||||||
ex3 = ("F[1, 0](2*A > 3)", -oo)
|
|
||||||
ex4 = ("G[1, 0](2*A > 3)", oo)
|
|
||||||
ex5 = ("(A < 0)", -1)
|
|
||||||
ex6 = ("G[0, 0.1](A < 0)", -1)
|
|
||||||
x = pd.DataFrame([[1,2], [1,4], [4,2]], index=[0,0.1,0.2],
|
|
||||||
columns=["A", "B"])
|
|
||||||
|
|
||||||
|
|
||||||
class TestSTLRobustness(unittest.TestCase):
|
|
||||||
@params(ex1, ex2, ex3, ex4, ex5, ex6)
|
|
||||||
def test_robustness_sign(self, phi_str, _):
|
|
||||||
phi = stl.parse(phi_str)
|
|
||||||
stl_eval = stl.boolean_eval.pointwise_sat(phi)
|
|
||||||
stl_eval2 = stl.robustness.pointwise_robustness(phi)
|
|
||||||
r = stl_eval2(x, 0)
|
|
||||||
assert (r == 0 or ((r > 0) == stl_eval(x, 0)))
|
|
||||||
|
|
||||||
|
|
||||||
@params(ex1, ex2, ex3, ex4, ex5, ex6)
|
|
||||||
def test_robustness_value(self, phi_str, r):
|
|
||||||
phi = stl.parse(phi_str)
|
|
||||||
r1 = stl.robustness.pointwise_robustness(phi)(x, 0)
|
|
||||||
r2 = stl.robustness.pointwise_robustness(~phi)(x, 0)
|
|
||||||
self.assertEqual(r1, r)
|
|
||||||
self.assertEqual(r1, -r2)
|
|
||||||
|
|
||||||
|
|
||||||
@params(ex1, ex2, ex3, ex4, ex5, ex6)
|
|
||||||
def test_eps_robustness(self, phi_str, r):
|
|
||||||
phi = stl.parse(phi_str)
|
|
||||||
r = stl.robustness.pointwise_robustness(phi)(x, 0)
|
|
||||||
lo, hi = stl.smooth_robustness.smooth_robustness(phi, L=1, eps=0.1)
|
|
||||||
# hi - lo <= eps
|
|
||||||
# lo <= r <= hi
|
|
||||||
#raise NotImplementedError
|
|
||||||
|
|
||||||
|
|
||||||
@params(ex1, ex2, ex3, ex4, ex5, ex6)
|
|
||||||
def test_interval_polarity(self, phi_str, r):
|
|
||||||
phi = stl.parse(phi_str)
|
|
||||||
lo, hi = stl.smooth_robustness.smooth_robustness(phi, L=1, eps=0.1)
|
|
||||||
# hi - lo > 0
|
|
||||||
#raise NotImplementedError
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
import stl
|
import stl
|
||||||
import stl.robustness
|
|
||||||
import stl.synth
|
import stl.synth
|
||||||
import pandas as pd
|
import traces
|
||||||
from nose2.tools import params
|
from nose2.tools import params
|
||||||
import unittest
|
import unittest
|
||||||
from sympy import Symbol
|
from sympy import Symbol
|
||||||
|
|
@ -20,8 +19,11 @@ ex5 = ("F[0, b?](A > 0)", ("b?",), {"b?": (0.1, 5)},
|
||||||
ex6 = ("(A > a?) or (A > b?)", ("a?", "b?",), {"a?": (0, 2), "b?": (0, 2)},
|
ex6 = ("(A > a?) or (A > b?)", ("a?", "b?",), {"a?": (0, 2), "b?": (0, 2)},
|
||||||
{"a?": False, "b?": False}, {"a?": 2, "b?": 1})
|
{"a?": False, "b?": False}, {"a?": 2, "b?": 1})
|
||||||
|
|
||||||
x = pd.DataFrame([[1,2], [1,4], [4,2]], index=[0,0.1,0.2],
|
x = {
|
||||||
columns=["A", "B"])
|
"A": traces.TimeSeries([(0, 1), (0.1, 1), (0.2, 4)]),
|
||||||
|
"B": traces.TimeSeries([(0, 2), (0.1, 4), (0.2, 2)]),
|
||||||
|
"C": traces.TimeSeries([(0, True), (0.1, True), (0.2, False)]),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class TestSTLRobustness(unittest.TestCase):
|
class TestSTLRobustness(unittest.TestCase):
|
||||||
|
|
@ -31,15 +33,6 @@ class TestSTLRobustness(unittest.TestCase):
|
||||||
val2 = stl.synth.lex_param_project(
|
val2 = stl.synth.lex_param_project(
|
||||||
phi, x, order=order, ranges=ranges, polarity=polarity)
|
phi, x, order=order, ranges=ranges, polarity=polarity)
|
||||||
|
|
||||||
phi2 = stl.utils.set_params(phi, val2)
|
|
||||||
phi3 = stl.utils.set_params(phi, val)
|
|
||||||
|
|
||||||
stl_eval = stl.robustness.pointwise_robustness(phi2)
|
|
||||||
stl_eval2 = stl.robustness.pointwise_robustness(phi3)
|
|
||||||
|
|
||||||
# check that the robustnesses are almost the same
|
|
||||||
self.assertAlmostEqual(stl_eval(x, 0), stl_eval2(x, 0), delta=0.01)
|
|
||||||
|
|
||||||
# check that the valuations are almost the same
|
# check that the valuations are almost the same
|
||||||
for var in order:
|
for var in order:
|
||||||
self.assertAlmostEqual(val2[var], val[var], delta=0.01)
|
self.assertAlmostEqual(val2[var], val[var], delta=0.01)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue