mosis24DEVS/test/testmodels/injecting.py
Yentl Van Tendeloo 66a6860316 Initial commit
2016-08-04 17:38:43 +02:00

109 lines
3 KiB
Python

from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
from pypdevs.simulator import Simulator
class Root(CoupledDEVS):
def __init__(self):
CoupledDEVS.__init__(self, "Root")
self.listener_A = self.addInPort("listener_A")
self.output_A = self.addOutPort("output_A")
self.mini = self.addSubModel(Mini())
self.connectPorts(self.listener_A, self.mini.listener_B)
self.connectPorts(self.mini.output_B, self.output_A)
class Mini(CoupledDEVS):
def __init__(self):
CoupledDEVS.__init__(self, "Mini")
self.listener_B = self.addInPort("listener_B")
self.output_B = self.addOutPort("output_B")
self.model_one = self.addSubModel(Proc("C"))
self.model_two = self.addSubModel(Proc("D"))
self.connectPorts(self.listener_B, self.model_one.inport)
self.connectPorts(self.listener_B, self.model_two.inport)
self.connectPorts(self.model_one.outport, self.output_B)
class Proc(AtomicDEVS):
def __init__(self, name):
AtomicDEVS.__init__(self, "Proc_%s" % name)
self.inport = self.addInPort("listener_%s" % name)
self.outport = self.addOutPort("output_%s" % name)
self.state = None
def intTransition(self):
return None
def extTransition(self, inputs):
return inputs[self.inport][0]
def outputFnc(self):
return {self.outport: [self.state]}
def timeAdvance(self):
return 0.0 if self.state else float('inf')
model = Root()
sim = Simulator(model)
sim.setRealTime(True)
sim.setRealTimePorts({"input_A": model.listener_A,
"input_B": model.mini.listener_B,
"input_C": model.mini.model_one.inport,
"input_D": model.mini.model_two.inport,
"output_A": model.output_A,
"output_B": model.mini.output_B,
"output_C": model.mini.model_one.outport,
"output_D": model.mini.model_two.outport})
sim.setRealTimePlatformThreads()
def output_on_A(evt):
global on_A
on_A = evt[0]
def output_on_B(evt):
global on_B
on_B = evt[0]
sim.setListenPorts(model.output_A, output_on_A)
sim.setListenPorts(model.mini.output_B, output_on_B)
sim.simulate()
import time
on_A = None
on_B = None
sim.realtime_interrupt("input_A 1")
time.sleep(1)
if not (on_A == "1" and on_B == "1"):
raise Exception("Expected input on A or B output port")
on_A = None
on_B = None
sim.realtime_interrupt("input_B 2")
time.sleep(1)
if not (on_A == "2" and on_B == "2"):
print(on_A)
print(type(on_A))
print(on_B)
print(type(on_B))
raise Exception("Expected input on A and B output port")
on_A = None
on_B = None
sim.realtime_interrupt("input_C 3")
time.sleep(1)
if not (on_A == "3" and on_B == "3"):
print(on_A)
print(on_B)
raise Exception("Expected input on A or B output port")
on_A = None
on_B = None
sim.realtime_interrupt("input_D 4")
time.sleep(1)
if not (on_A == None and on_B == None):
raise Exception("Didn't expect input on A or B output port")