mosis24DEVS/test/testmodels/trafficLightModel.py
Yentl Van Tendeloo 66a6860316 Initial commit
2016-08-04 17:38:43 +02:00

339 lines
9.6 KiB
Python

# -*- coding: Latin-1 -*-
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
# trafficLight.py --- simple Traffic Light example
# --------------------------------
# October 2005
# Hans Vangheluwe
# McGill University (Montréal)
# --------------------------------
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
# Add the directory where pydevs lives to Python's import path
import sys
# Import code for DEVS model representation:
from pypdevs.infinity import *
from pypdevs.DEVS import *
# Import for uniform random number generators
from random import uniform
from random import randint
# ====================================================================== #
class TrafficLightMode:
"""Encapsulates the system's state
"""
###
def __init__(self, current="red"):
"""Constructor (parameterizable).
"""
self.set(current)
def set(self, value="red"):
self.__colour=value
def get(self):
return self.__colour
def __str__(self):
return self.get()
class TrafficLight(AtomicDEVS):
"""A traffic light
"""
###
def __init__(self, name=None):
"""Constructor (parameterizable).
"""
# Always call parent class' constructor FIRST:
AtomicDEVS.__init__(self, name)
# STATE:
# Define 'state' attribute (initial sate):
self.state = TrafficLightMode("red")
# PORTS:
# Declare as many input and output ports as desired
# (usually store returned references in local variables):
self.INTERRUPT = self.addInPort(name="INTERRUPT")
self.OBSERVED = self.addOutPort(name="OBSERVED")
###
def extTransition(self, inputs):
"""External Transition Function."""
# Compute the new state 'Snew' based (typically) on current
# State, Elapsed time parameters and calls to 'self.peek(self.IN)'.
#input = self.peek(self.INTERRUPT)
input = inputs[self.INTERRUPT][0]
state = self.state.get()
if input == "toManual":
if state == "manual":
# staying in manual mode
return TrafficLightMode("manual")
if state in ("red", "green", "yellow"):
return TrafficLightMode("manual")
else:
raise DEVSException(\
"unknown state <%s> in TrafficLight external transition function"\
% state)
if input == "toAutonomous":
if state == "manual":
return TrafficLightMode("red")
else:
raise DEVSException(\
"unknown state <%s> in TrafficLight external transition function"\
% state)
raise DEVSException(\
"unknown input <%s> in TrafficLight external transition function"\
% input)
###
def intTransition(self):
"""Internal Transition Function.
"""
state = self.state.get()
if state == "red":
return TrafficLightMode("green")
elif state == "green":
return TrafficLightMode("yellow")
elif state == "yellow":
return TrafficLightMode("red")
else:
raise DEVSException(\
"unknown state <%s> in TrafficLight internal transition function"\
% state)
###
def outputFnc(self):
"""Output Funtion.
"""
# A colourblind observer sees "grey" instead of "red" or "green".
# BEWARE: ouput is based on the OLD state
# and is produced BEFORE making the transition.
# We'll encode an "observation" of the state the
# system will transition to !
# Send messages (events) to a subset of the atomic-DEVS'
# output ports by means of the 'poke' method, i.e.:
# The content of the messages is based (typically) on current State.
state = self.state.get()
if state == "red":
return {self.OBSERVED: ["grey"]}
#self.poke(self.OBSERVED, "grey")
# NOT self.poke(self.OBSERVED, "grey")
elif state == "green":
return {self.OBSERVED: ["yellow"]}
#self.poke(self.OBSERVED, "yellow")
# NOT self.poke(self.OBSERVED, "grey")
elif state == "yellow":
return {self.OBSERVED: ["grey"]}
#self.poke(self.OBSERVED, "grey")
# NOT self.poke(self.OBSERVED, "yellow")
else:
raise DEVSException(\
"unknown state <%s> in TrafficLight external transition function"\
% state)
###
def timeAdvance(self):
"""Time-Advance Function.
"""
# Compute 'ta', the time to the next scheduled internal transition,
# based (typically) on current State.
state = self.state.get()
if state == "red":
return 3
elif state == "green":
return 2
elif state == "yellow":
return 1
elif state == "manual":
return INFINITY
else:
raise DEVSException(\
"unknown state <%s> in TrafficLight time advance transition function"\
% state)
# ====================================================================== #
class PolicemanMode:
"""Encapsulates the Policeman's state
"""
###
def __init__(self, current="idle"):
"""Constructor (parameterizable).
"""
self.set(current)
def set(self, value="idle"):
self.__mode=value
def get(self):
return self.__mode
def __str__(self):
return self.get()
class Policeman(AtomicDEVS):
"""A policeman producing "toManual" and "toAutonomous" events:
"toManual" when going from "idle" to "working" mode
"toAutonomous" when going from "working" to "idle" mode
"""
###
def __init__(self, name=None):
"""Constructor (parameterizable).
"""
# Always call parent class' constructor FIRST:
AtomicDEVS.__init__(self, name)
# STATE:
# Define 'state' attribute (initial sate):
self.state = PolicemanMode("idle")
# ELAPSED TIME:
# Initialize 'elapsed time' attribute if required
# (by default, value is 0.0):
self.elapsed = 0
# PORTS:
# Declare as many input and output ports as desired
# (usually store returned references in local variables):
self.OUT = self.addOutPort(name="OUT")
###
# Autonomous system (no input ports),
# so no External Transition Function required
#
###
def intTransition(self):
"""Internal Transition Function.
The policeman works forever, so only one mode.
"""
state = self.state.get()
if state == "idle":
return PolicemanMode("working")
elif state == "working":
return PolicemanMode("idle")
else:
raise DEVSException(\
"unknown state <%s> in Policeman internal transition function"\
% state)
###
def outputFnc(self):
"""Output Funtion.
"""
# Send messages (events) to a subset of the atomic-DEVS'
# output ports by means of the 'poke' method, i.e.:
# The content of the messages is based (typically) on current State.
state = self.state.get()
if state == "idle":
return {self.OUT: ["toManual"]}
#self.poke(self.OUT, "toManual")
elif state == "working":
return {self.OUT: ["toAutonomous"]}
#self.poke(self.OUT, "toAutonomous")
else:
raise DEVSException(\
"unknown state <%s> in Policeman output function"\
% state)
###
def timeAdvance(self):
"""Time-Advance Function.
"""
# Compute 'ta', the time to the next scheduled internal transition,
# based (typically) on current State.
state = self.state.get()
if state == "idle":
return 200
elif state == "working":
return 100
else:
raise DEVSException(\
"unknown state <%s> in Policeman time advance function"\
% state)
# ====================================================================== #
class TrafficSystem(CoupledDEVS):
def __init__(self, name=None):
""" A simple traffic system consisting of a Policeman and a TrafficLight.
"""
# Always call parent class' constructor FIRST:
CoupledDEVS.__init__(self, name)
# Declare the coupled model's output ports:
# Autonomous, so no output ports
#self.OUT = self.addOutPort(name="OUT")
# Declare the coupled model's sub-models:
# The Policeman generating interrupts
self.policeman = self.addSubModel(Policeman(name="policeman"))
# The TrafficLight
self.trafficLight = self.addSubModel(TrafficLight(name="trafficLight"))
# Only connect ...
self.connectPorts(self.policeman.OUT, self.trafficLight.INTERRUPT)
#self.connectPorts(self.trafficLight.OBSERVED, self.OUT)
def select(self, immList):
"""Give the Policeman highest priority.
Note how the technique used below can
be generalized to encode priorities based on model type.
To distinguish between models of the same type,
the name or unique ID should be used.
"""
# return the first Policeman instance in the immList
for i in range(len(immList)):
#if immList[i].__class__ == TrafficLight:
if immList[i].__class__ == Policeman:
return immList[i]
# if no Policeman instances found, return the last entry
return immList[-1]
# Alternative: randomly choose among imminent submodels
#return immList[randint(0,len(immList))]
# If the select method is not defined, the immList[0]
# will be chosen by default.
# ====================================================================== #