213 lines
7.8 KiB
Python
213 lines
7.8 KiB
Python
# In this tutorial, we implement the semantics of Petri Nets by means of model transformation.
|
|
# Compared to the previous tutorial, it only introduces one more feature: pivots.
|
|
# Consider the following Petri Net language meta-model:
|
|
|
|
mm_cs = """
|
|
Place:Class
|
|
Transition:Class
|
|
|
|
Place_tokens:AttributeLink (Place -> Integer) {
|
|
optional = False;
|
|
name = "tokens";
|
|
constraint = `get_value(get_target(this)) >= 0`;
|
|
}
|
|
|
|
P2T:Association (Place -> Transition)
|
|
T2P:Association (Transition -> Place)
|
|
|
|
P2T_weight:AttributeLink (P2T -> Integer) {
|
|
optional = False;
|
|
name = "weight";
|
|
constraint = `get_value(get_target(this)) >= 0`;
|
|
}
|
|
|
|
T2P_weight:AttributeLink (T2P -> Integer) {
|
|
optional = False;
|
|
name = "weight";
|
|
constraint = `get_value(get_target(this)) >= 0`;
|
|
}
|
|
"""
|
|
|
|
# We now create the following Petri Net:
|
|
# https://upload.wikimedia.org/wikipedia/commons/4/4d/Two-boundedness-cb.png
|
|
|
|
m_cs = """
|
|
p1:Place { tokens = 0; }
|
|
p2:Place { tokens = 0; }
|
|
cp1:Place { tokens = 2; }
|
|
cp2:Place { tokens = 2; }
|
|
|
|
t1:Transition
|
|
t2:Transition
|
|
t3:Transition
|
|
|
|
:T2P (t1 -> p1) { weight = 1; }
|
|
:P2T (p1 -> t2) { weight = 1; }
|
|
:T2P (t2 -> cp1) { weight = 1; }
|
|
:P2T (cp1 -> t1) { weight = 1; }
|
|
|
|
:T2P (t2 -> p2) { weight = 1; }
|
|
:P2T (p2 -> t3) { weight = 1; }
|
|
:T2P (t3 -> cp2) { weight = 1; }
|
|
:P2T (cp2 -> t2) { weight = 1; }
|
|
"""
|
|
|
|
# The usual...
|
|
|
|
from state.devstate import DevState
|
|
from bootstrap.scd import bootstrap_scd
|
|
from util import loader
|
|
from transformation.ramify import ramify
|
|
from transformation.matcher import match_od
|
|
from transformation.cloner import clone_od
|
|
from transformation import rewriter
|
|
from concrete_syntax.textual_od.renderer import render_od
|
|
from concrete_syntax.common import indent
|
|
from api.od import ODAPI
|
|
|
|
state = DevState()
|
|
mmm = bootstrap_scd(state)
|
|
mm = loader.parse_and_check(state, mm_cs, mmm, "mm")
|
|
m = loader.parse_and_check(state, m_cs, mm, "m")
|
|
|
|
mm_ramified = ramify(state, mm)
|
|
|
|
|
|
# We will now implement Petri Net operational semantics by means of model transformation.
|
|
|
|
|
|
# Look for any transition:
|
|
|
|
lhs_transition_cs = """
|
|
t:RAM_Transition
|
|
"""
|
|
|
|
# But, if that transition has an incoming arc (P2T) from a place with not enough tokens, the transition cannot fire. We can express this as a pattern:
|
|
|
|
lhs_transition_disabled_cs = """
|
|
t:RAM_Transition
|
|
p:RAM_Place
|
|
:RAM_P2T (p -> t) {
|
|
condition = ```
|
|
place = get_source(this)
|
|
tokens = get_slot_value(place, "tokens")
|
|
weight = get_slot_value(this, "weight")
|
|
tokens < weight # True means: cannot fire
|
|
```;
|
|
}
|
|
"""
|
|
|
|
# Parse these patterns:
|
|
lhs_transition = loader.parse_and_check(state, lhs_transition_cs, mm_ramified, "lhs_transition")
|
|
lhs_transition_disabled = loader.parse_and_check(state, lhs_transition_disabled_cs, mm_ramified, "lhs_transition_disabled")
|
|
|
|
# To find enabled transitions, we first match our first pattern (looking for a transition), and then we try to 'grow' that match with our second, "Negative Application Condition" (NAC) pattern. If growing the match with the second pattern is possible, we abort and look for another transition.
|
|
# To grow a match, we use the 'pivot'-argument of the match-function. A pivot is a partial match that needs to be grown.
|
|
# This results in the following generator function:
|
|
|
|
def find_enabled_transitions(m):
|
|
for match in match_od(state, m, mm, lhs_transition, mm_ramified):
|
|
for match_nac in match_od(state, m, mm, lhs_transition_disabled, mm_ramified, pivot=match): # <-- notice the pivot :)
|
|
# transition is disabled
|
|
break # find next transition
|
|
else:
|
|
# we've found an enabled transition:
|
|
yield match
|
|
|
|
# Let's see if it works:
|
|
|
|
enabled = list(find_enabled_transitions(m))
|
|
print("enabled PN transitions:", enabled)
|
|
|
|
|
|
# Next, to fire a transition:
|
|
# - we decrement the number of tokens of every incoming place
|
|
# - we increment the number of tokens of every outgoing place
|
|
# We do this also by growing our match: given an enabled transition (already matched), we match for *any* incoming place, and rewrite that place to reduce its tokens. Next, we look for *any* outgoing place, and increment its tokens.
|
|
|
|
# Decrement incoming
|
|
lhs_incoming_cs = """
|
|
t:RAM_Transition # <-- we already know this transition is enabled
|
|
inplace:RAM_Place {
|
|
RAM_tokens = `True`; # this needs to be here, otherwise, the rewriter will try to create a new attribute rather than update the existing one
|
|
}
|
|
inarc:RAM_P2T (inplace -> t)
|
|
"""
|
|
rhs_incoming_cs = """
|
|
t:RAM_Transition
|
|
inplace:RAM_Place {
|
|
RAM_tokens = ```
|
|
weight = get_slot_value(matched("inarc"), "weight")
|
|
print("adding", weight, "tokens to", get_name(this))
|
|
get_value(this) - weight
|
|
```;
|
|
}
|
|
inarc:RAM_P2T (inplace -> t)
|
|
"""
|
|
|
|
# Increment outgoing
|
|
lhs_outgoing_cs = """
|
|
t:RAM_Transition
|
|
outplace:RAM_Place {
|
|
RAM_tokens = `True`; # this needs to be here, otherwise, the rewriter will try to create a new attribute rather than update the existing one
|
|
}
|
|
outarc:RAM_T2P (t -> outplace)
|
|
"""
|
|
rhs_outgoing_cs = """
|
|
t:RAM_Transition
|
|
outplace:RAM_Place {
|
|
RAM_tokens = ```
|
|
weight = get_slot_value(matched("outarc"), "weight")
|
|
print("removing", weight, "tokens from", get_name(this))
|
|
get_value(this) + weight
|
|
```;
|
|
}
|
|
outarc:RAM_T2P (t -> outplace)
|
|
"""
|
|
|
|
# Parse all the patterns
|
|
lhs_incoming = loader.parse_and_check(state, lhs_incoming_cs, mm_ramified, "lhs_incoming")
|
|
rhs_incoming = loader.parse_and_check(state, rhs_incoming_cs, mm_ramified, "rhs_incoming")
|
|
lhs_outgoing = loader.parse_and_check(state, lhs_outgoing_cs, mm_ramified, "lhs_outgoing")
|
|
rhs_outgoing = loader.parse_and_check(state, rhs_outgoing_cs, mm_ramified, "rhs_outgoing")
|
|
|
|
# Firing is really simple:
|
|
def fire_transition(m, transition_match):
|
|
for match_incoming in match_od(state, m, mm, lhs_incoming, mm_ramified, pivot=transition_match):
|
|
rewriter.rewrite(state, rhs_incoming, mm_ramified, match_incoming, m, mm)
|
|
for match_outgoing in match_od(state, m, mm, lhs_outgoing, mm_ramified, pivot=transition_match):
|
|
rewriter.rewrite(state, rhs_outgoing, mm_ramified, match_outgoing, m, mm)
|
|
|
|
def show_petri_net(m):
|
|
odapi = ODAPI(state, m, mm)
|
|
p1 = odapi.get_slot_value(odapi.get("p1"), "tokens")
|
|
p2 = odapi.get_slot_value(odapi.get("p2"), "tokens")
|
|
cp1 = odapi.get_slot_value(odapi.get("cp1"), "tokens")
|
|
cp2 = odapi.get_slot_value(odapi.get("cp2"), "tokens")
|
|
return f"""
|
|
t1 t2 t3
|
|
┌─┐ p1 ┌─┐ p2 ┌─┐
|
|
│ │ --- │ │ --- │ │
|
|
│ ├─────► ( {p1} )─────►│ │─────► ( {p2} )─────►│ │
|
|
└─┘ --- └─┘ --- └─┘
|
|
▲ │ ▲ │
|
|
│ │ │ │
|
|
│ │ │ │
|
|
│ │ │ │
|
|
│ --- │ │ --- │
|
|
└───────( {cp1} )◄──────┘ └──────( {cp2} )◄───────┘
|
|
--- ---
|
|
cp1 cp2 """
|
|
|
|
# Let's see if it works:
|
|
while len(enabled) > 0:
|
|
print(show_petri_net(m))
|
|
print("\nenabled PN transitions:", enabled)
|
|
to_fire = enabled[0]['t']
|
|
print("press ENTER to fire", to_fire)
|
|
input()
|
|
print("firing transition:", to_fire)
|
|
fire_transition(m, enabled[0])
|
|
enabled = list(find_enabled_transitions(m))
|
|
|
|
# That's it!
|