109 lines
3 KiB
Python
109 lines
3 KiB
Python
from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
|
|
from pypdevs.simulator import Simulator
|
|
|
|
class Root(CoupledDEVS):
|
|
def __init__(self):
|
|
CoupledDEVS.__init__(self, "Root")
|
|
self.listener_A = self.addInPort("listener_A")
|
|
self.output_A = self.addOutPort("output_A")
|
|
self.mini = self.addSubModel(Mini())
|
|
|
|
self.connectPorts(self.listener_A, self.mini.listener_B)
|
|
self.connectPorts(self.mini.output_B, self.output_A)
|
|
|
|
class Mini(CoupledDEVS):
|
|
def __init__(self):
|
|
CoupledDEVS.__init__(self, "Mini")
|
|
self.listener_B = self.addInPort("listener_B")
|
|
self.output_B = self.addOutPort("output_B")
|
|
self.model_one = self.addSubModel(Proc("C"))
|
|
self.model_two = self.addSubModel(Proc("D"))
|
|
|
|
self.connectPorts(self.listener_B, self.model_one.inport)
|
|
self.connectPorts(self.listener_B, self.model_two.inport)
|
|
self.connectPorts(self.model_one.outport, self.output_B)
|
|
|
|
class Proc(AtomicDEVS):
|
|
def __init__(self, name):
|
|
AtomicDEVS.__init__(self, "Proc_%s" % name)
|
|
self.inport = self.addInPort("listener_%s" % name)
|
|
self.outport = self.addOutPort("output_%s" % name)
|
|
self.state = None
|
|
|
|
def intTransition(self):
|
|
return None
|
|
|
|
def extTransition(self, inputs):
|
|
return inputs[self.inport][0]
|
|
|
|
def outputFnc(self):
|
|
return {self.outport: [self.state]}
|
|
|
|
def timeAdvance(self):
|
|
return 0.0 if self.state else float('inf')
|
|
|
|
model = Root()
|
|
sim = Simulator(model)
|
|
sim.setRealTime(True)
|
|
sim.setRealTimePorts({"input_A": model.listener_A,
|
|
"input_B": model.mini.listener_B,
|
|
"input_C": model.mini.model_one.inport,
|
|
"input_D": model.mini.model_two.inport,
|
|
"output_A": model.output_A,
|
|
"output_B": model.mini.output_B,
|
|
"output_C": model.mini.model_one.outport,
|
|
"output_D": model.mini.model_two.outport})
|
|
sim.setRealTimePlatformThreads()
|
|
|
|
def output_on_A(evt):
|
|
global on_A
|
|
on_A = evt[0]
|
|
|
|
def output_on_B(evt):
|
|
global on_B
|
|
on_B = evt[0]
|
|
|
|
sim.setListenPorts(model.output_A, output_on_A)
|
|
sim.setListenPorts(model.mini.output_B, output_on_B)
|
|
|
|
sim.simulate()
|
|
|
|
import time
|
|
|
|
on_A = None
|
|
on_B = None
|
|
sim.realtime_interrupt("input_A 1")
|
|
time.sleep(1)
|
|
|
|
if not (on_A == "1" and on_B == "1"):
|
|
raise Exception("Expected input on A or B output port")
|
|
on_A = None
|
|
on_B = None
|
|
|
|
sim.realtime_interrupt("input_B 2")
|
|
time.sleep(1)
|
|
|
|
if not (on_A == "2" and on_B == "2"):
|
|
print(on_A)
|
|
print(type(on_A))
|
|
print(on_B)
|
|
print(type(on_B))
|
|
raise Exception("Expected input on A and B output port")
|
|
on_A = None
|
|
on_B = None
|
|
|
|
sim.realtime_interrupt("input_C 3")
|
|
time.sleep(1)
|
|
|
|
if not (on_A == "3" and on_B == "3"):
|
|
print(on_A)
|
|
print(on_B)
|
|
raise Exception("Expected input on A or B output port")
|
|
on_A = None
|
|
on_B = None
|
|
|
|
sim.realtime_interrupt("input_D 4")
|
|
time.sleep(1)
|
|
|
|
if not (on_A == None and on_B == None):
|
|
raise Exception("Didn't expect input on A or B output port")
|