157 lines
6.5 KiB
Python
157 lines
6.5 KiB
Python
import abc
|
|
from difflib import ndiff
|
|
|
|
from lib.controller import Controller, pretty_time
|
|
from lib.tracer import Tracer
|
|
from lib.yakindu_helpers import YakinduTimerServiceAdapter, trace_output_events
|
|
|
|
class AbstractEnvironmentState:
|
|
# should return the new state after handling the event
|
|
@abc.abstractmethod
|
|
def handle_event(self, event_name, param):
|
|
pass
|
|
# should compare states *by value*
|
|
@abc.abstractmethod
|
|
def __eq__(self, other):
|
|
pass
|
|
|
|
# # Can we ignore event in 'trace' at position 'idx' with respect to idempotency?
|
|
# def can_ignore(trace, idx):
|
|
# (timestamp, event_name, value) = trace[idx]
|
|
# if event_name in IDEMPOTENT:
|
|
# # If the same event occurred earlier, with the same parameter value, then this event can be ignored:
|
|
# for (earlier_timestamp, earlier_event_name, earlier_value) in reversed(trace[0:idx]):
|
|
# if (earlier_event_name, earlier_value) == (event_name, value):
|
|
# # same event name and same parameter value (timestamps allowed to differ)
|
|
# return True
|
|
# elif event_name == earlier_event_name:
|
|
# # same event name, but different parameter value:
|
|
# # stop looking into the past:
|
|
# break
|
|
# # If the same event occurs later event, but with the same timestamp, this event is overwritten and can be ignored:
|
|
# for (later_timestamp, later_event_name, later_value) in trace[idx+1:]:
|
|
# if (later_timestamp, later_event_name) == (timestamp, event_name):
|
|
# # if a later event with same name and timestamp occurs, ours will be overwritten:
|
|
# return True
|
|
# if later_timestamp != timestamp:
|
|
# # no need to look further into the future:
|
|
# break
|
|
# return False
|
|
|
|
def postprocess_trace(trace, environment_class):
|
|
env_state = environment_class()
|
|
filtered_trace = []
|
|
# Remove events that have no effect:
|
|
for timestamp, event_name, param in trace:
|
|
new_env_state = env_state.handle_event(event_name, param)
|
|
if new_env_state != env_state:
|
|
# event had an effect
|
|
filtered_trace.append((timestamp, event_name, param))
|
|
env_state = new_env_state
|
|
return filtered_trace
|
|
|
|
def compare_traces(expected, actual):
|
|
i = 0
|
|
while i < len(expected) and i < len(actual):
|
|
# Compare tuples:
|
|
if expected[i] != actual[i]:
|
|
print("Traces differ!")
|
|
# print("expected: (%i, \"%s\", %s)" % expected[i])
|
|
# print("actual: (%i, \"%s\", %s)" % actual[i])
|
|
return False
|
|
i += 1
|
|
if len(expected) != len(actual):
|
|
print("Traces have different length:")
|
|
print("expected length: %i" % len(expected))
|
|
print("actual length: %i" % len(actual))
|
|
return False
|
|
print("Traces match.")
|
|
return True
|
|
|
|
def run_scenario(input_trace, expected_output_trace, statechart_class, environment_class, verbose=False):
|
|
controller = Controller()
|
|
sc = statechart_class()
|
|
tracer = Tracer(verbose=False)
|
|
controller.input_tracers.append((sc, tracer.record_input_event))
|
|
trace_output_events(controller, sc, callback=tracer.record_output_event)
|
|
sc.timer_service = YakinduTimerServiceAdapter(controller)
|
|
|
|
# Put entire input trace in event queue, ready to go!
|
|
for tup in input_trace:
|
|
(timestamp, event_name, value) = tup
|
|
controller.add_input(sc, event_name, timestamp, value)
|
|
|
|
sc.enter() # enter default state(s)
|
|
|
|
if len(expected_output_trace) > 0:
|
|
last_output_event_timestamp = expected_output_trace[-1][0]
|
|
else:
|
|
last_output_event_timestamp = 0
|
|
|
|
# Blocking synchronous call:
|
|
controller.run_until(last_output_event_timestamp)
|
|
|
|
actual_output_trace = tracer.output_events
|
|
|
|
clean_expected = postprocess_trace(expected_output_trace, environment_class)
|
|
clean_actual = postprocess_trace(actual_output_trace, environment_class)
|
|
|
|
def print_diff():
|
|
# The diff printed will be a diff of the 'raw' traces, not of the cleaned up traces
|
|
# A diff of the cleaned up traces would be confusing to the user.
|
|
have_plus = False
|
|
have_minus = False
|
|
have_useless = False
|
|
for diffline in ndiff(
|
|
[str(tup)+'\n' for tup in expected_output_trace],
|
|
[str(tup)+'\n' for tup in actual_output_trace],
|
|
charjunk=None,
|
|
):
|
|
symbol = diffline[0]
|
|
if symbol == '+':
|
|
have_plus = True
|
|
if symbol == '-':
|
|
have_minus = True
|
|
if symbol == '?':
|
|
continue
|
|
rest = diffline[2:-1] # drop last character (=newline)
|
|
useless_line = (
|
|
symbol == '-' and rest not in [str(tup) for tup in clean_expected]
|
|
or symbol == '+' and rest not in [str(tup) for tup in clean_actual]
|
|
# or symbol == ' ' and rest not in [str(tup) for tup in clean_actual]
|
|
)
|
|
if useless_line:
|
|
print(" (%s) %s" % (symbol, rest))
|
|
have_useless = True
|
|
else:
|
|
print(" %s %s" % (symbol, rest))
|
|
|
|
if have_minus or have_plus or have_useless:
|
|
print("Legend:")
|
|
if have_minus:
|
|
print(" -: expected, but did not happen")
|
|
if have_plus:
|
|
print(" +: happened, but was not expected")
|
|
if have_useless:
|
|
print(" (-) or (+): indicates a \"useless event\" (because it has no effect), either in expected output (-) or in actual output (+).")
|
|
print("\n\"Useless events\" are ignored by the comparison algorithm, and will never cause your test to fail. In this assignment, your solution is allowed to contain useless events.")
|
|
|
|
if not compare_traces(clean_expected, clean_actual):
|
|
# even though we compared the 'normalized' traces, we print the *raw* traces, not to confuse the user!
|
|
print("Raw diff between expected and actual output event trace:")
|
|
print_diff()
|
|
return False
|
|
elif verbose:
|
|
print_diff()
|
|
return True
|
|
|
|
def run_scenarios(scenarios, statechart_class, environment_class, verbose=True):
|
|
ok = True
|
|
for scenario in scenarios:
|
|
print(f"Running scenario: {scenario["name"]}")
|
|
ok = run_scenario(scenario["input_events"], scenario["output_events"], statechart_class, environment_class, verbose=verbose) and ok
|
|
print("--------")
|
|
if ok:
|
|
print("All scenarios passed.")
|
|
else:
|
|
print("Some scenarios failed.")
|