# muMLE/tutorial/05_advanced_transformation.py
#
# 213 lines
# 7.8 KiB
# Python

# In this tutorial, we implement the semantics of Petri Nets by means of model transformation.
# Compared to the previous tutorial, it only introduces one more feature: pivots.
# Consider the following Petri Net language meta-model:
# Textual definition of the meta-model:
# - two classes, Place and Transition;
# - two association types for arcs, P2T (Place -> Transition) and T2P (Transition -> Place);
# - a non-optional Integer attribute "tokens" on Place, and a non-optional Integer
#   attribute "weight" on each of the two arc types.
# Each attribute carries a constraint requiring its value to be >= 0.
mm_cs = """
Place:Class
Transition:Class
Place_tokens:AttributeLink (Place -> Integer) {
optional = False;
name = "tokens";
constraint = `get_value(get_target(this)) >= 0`;
}
P2T:Association (Place -> Transition)
T2P:Association (Transition -> Place)
P2T_weight:AttributeLink (P2T -> Integer) {
optional = False;
name = "weight";
constraint = `get_value(get_target(this)) >= 0`;
}
T2P_weight:AttributeLink (T2P -> Integer) {
optional = False;
name = "weight";
constraint = `get_value(get_target(this)) >= 0`;
}
"""
# We now create the following Petri Net:
# https://upload.wikimedia.org/wikipedia/commons/4/4d/Two-boundedness-cb.png
# Textual definition of the model instance: four places (p1 and p2 start with
# 0 tokens, cp1 and cp2 with 2 tokens), three transitions (t1, t2, t3) and
# eight anonymous arcs, all with weight 1.
m_cs = """
p1:Place { tokens = 0; }
p2:Place { tokens = 0; }
cp1:Place { tokens = 2; }
cp2:Place { tokens = 2; }
t1:Transition
t2:Transition
t3:Transition
:T2P (t1 -> p1) { weight = 1; }
:P2T (p1 -> t2) { weight = 1; }
:T2P (t2 -> cp1) { weight = 1; }
:P2T (cp1 -> t1) { weight = 1; }
:T2P (t2 -> p2) { weight = 1; }
:P2T (p2 -> t3) { weight = 1; }
:T2P (t3 -> cp2) { weight = 1; }
:P2T (cp2 -> t2) { weight = 1; }
"""
# The usual...
from state.devstate import DevState
from bootstrap.scd import bootstrap_scd
from util import loader
from transformation.ramify import ramify
from transformation.matcher import match_od
from transformation.cloner import clone_od
from transformation import rewriter
from concrete_syntax.textual_od.renderer import render_od
from concrete_syntax.common import indent
from api.od import ODAPI
# The state object that every subsequent call in this script operates on.
state = DevState()
# Bootstrap the meta-meta-model into the state (presumably the SCD language, going by the function name).
mmm = bootstrap_scd(state)
# Parse the meta-model against the meta-meta-model, then the model against the meta-model.
mm = loader.parse_and_check(state, mm_cs, mmm, "mm")
m = loader.parse_and_check(state, m_cs, mm, "m")
# Ramified version of the meta-model; all patterns below are parsed against it
# (note the RAM_ prefix on the type names used in the patterns).
mm_ramified = ramify(state, mm)
# We will now implement Petri Net operational semantics by means of model transformation.
# Look for any transition:
# Pattern consisting of a single node named 't' of type RAM_Transition.
lhs_transition_cs = """
t:RAM_Transition
"""
# But, if that transition has an incoming arc (P2T) from a place with not enough tokens, the transition cannot fire. We can express this as a pattern:
# (The condition attached to the arc evaluates to True when the source place
# holds fewer tokens than the weight of the arc.)
lhs_transition_disabled_cs = """
t:RAM_Transition
p:RAM_Place
:RAM_P2T (p -> t) {
condition = ```
place = get_source(this)
tokens = get_slot_value(place, "tokens")
weight = get_slot_value(this, "weight")
tokens < weight # True means: cannot fire
```;
}
"""
# Parse these patterns:
# (note that they are parsed against mm_ramified, not against mm)
lhs_transition = loader.parse_and_check(state, lhs_transition_cs, mm_ramified, "lhs_transition")
lhs_transition_disabled = loader.parse_and_check(state, lhs_transition_disabled_cs, mm_ramified, "lhs_transition_disabled")
# To find enabled transitions, we first match our first pattern (looking for a transition), and then we try to 'grow' that match with our second, "Negative Application Condition" (NAC) pattern. If growing the match with the second pattern is possible, we abort and look for another transition.
# To grow a match, we use the 'pivot'-argument of the match-function. A pivot is a partial match that needs to be grown.
# This results in the following generator function:
def find_enabled_transitions(m):
    """Yield one match per enabled transition of model `m`.

    Every match of the `lhs_transition` pattern is used as the pivot for the
    `lhs_transition_disabled` (NAC) pattern. The transition match is yielded
    only when the NAC pattern cannot be matched even once.
    """
    for candidate in match_od(state, m, mm, lhs_transition, mm_ramified):
        # Try to grow the candidate with the NAC pattern.
        nac_matches = match_od(state, m, mm, lhs_transition_disabled, mm_ramified, pivot=candidate)  # <-- notice the pivot :)
        # A single NAC match is enough to know the transition is disabled,
        # so we stop consuming the generator at the first hit.
        is_disabled = any(True for _ in nac_matches)
        if not is_disabled:
            # we've found an enabled transition:
            yield candidate
# Let's see if it works:
# Collect all matches produced by the generator into a list, then print that list.
enabled = list(find_enabled_transitions(m))
print("enabled PN transitions:", enabled)
# Next, to fire a transition:
# - we decrement the number of tokens of every incoming place
# - we increment the number of tokens of every outgoing place
# We do this also by growing our match: given an enabled transition (already matched), we match for *any* incoming place, and rewrite that place to reduce its tokens. Next, we look for *any* outgoing place, and increment its tokens.
# Decrement incoming
# LHS of the "decrement incoming" rule: the transition 't', a place 'inplace'
# and a P2T arc 'inarc' going from that place to the transition.
# The condition on RAM_tokens is the constant `True`.
lhs_incoming_cs = """
t:RAM_Transition # <-- we already know this transition is enabled
inplace:RAM_Place {
RAM_tokens = `True`; # this needs to be here, otherwise, the rewriter will try to create a new attribute rather than update the existing one
}
inarc:RAM_P2T (inplace -> t)
"""
# RHS of the "decrement incoming" rule: same structure as the LHS, but the
# RAM_tokens expression computes the new token count of 'inplace' as its
# current value minus the weight of the matched incoming arc ('inarc').
# The embedded print reports the removal; it previously said "adding ... to",
# which contradicted the subtraction performed on the next line.
rhs_incoming_cs = """
t:RAM_Transition
inplace:RAM_Place {
RAM_tokens = ```
weight = get_slot_value(matched("inarc"), "weight")
print("removing", weight, "tokens from", get_name(this))
get_value(this) - weight
```;
}
inarc:RAM_P2T (inplace -> t)
"""
# Increment outgoing
# LHS of the "increment outgoing" rule: the transition 't', a place 'outplace'
# and a T2P arc 'outarc' going from the transition to that place.
# The condition on RAM_tokens is the constant `True`.
lhs_outgoing_cs = """
t:RAM_Transition
outplace:RAM_Place {
RAM_tokens = `True`; # this needs to be here, otherwise, the rewriter will try to create a new attribute rather than update the existing one
}
outarc:RAM_T2P (t -> outplace)
"""
# RHS of the "increment outgoing" rule: same structure as the LHS, but the
# RAM_tokens expression computes the new token count of 'outplace' as its
# current value plus the weight of the matched outgoing arc ('outarc').
# The embedded print reports the addition; it previously said
# "removing ... from", which contradicted the addition on the next line.
rhs_outgoing_cs = """
t:RAM_Transition
outplace:RAM_Place {
RAM_tokens = ```
weight = get_slot_value(matched("outarc"), "weight")
print("adding", weight, "tokens to", get_name(this))
get_value(this) + weight
```;
}
outarc:RAM_T2P (t -> outplace)
"""
# Parse all the patterns
# Each of the four firing patterns is parsed against the ramified meta-model
# and registered under the name given as the last argument.
lhs_incoming = loader.parse_and_check(state, lhs_incoming_cs, mm_ramified, "lhs_incoming")
rhs_incoming = loader.parse_and_check(state, rhs_incoming_cs, mm_ramified, "rhs_incoming")
lhs_outgoing = loader.parse_and_check(state, lhs_outgoing_cs, mm_ramified, "lhs_outgoing")
rhs_outgoing = loader.parse_and_check(state, rhs_outgoing_cs, mm_ramified, "rhs_outgoing")
# Firing is really simple:
def fire_transition(m, transition_match):
    """Fire the transition bound in `transition_match` on model `m`.

    The transition match is used as pivot for two rules, applied one after
    the other: first the "incoming" rule, then the "outgoing" rule. For each
    rule, every grown match of its LHS is rewritten with its RHS.
    """
    rules_in_order = (
        (lhs_incoming, rhs_incoming),  # handle all incoming places first
        (lhs_outgoing, rhs_outgoing),  # then all outgoing places
    )
    for lhs, rhs in rules_in_order:
        for grown_match in match_od(state, m, mm, lhs, mm_ramified, pivot=transition_match):
            rewriter.rewrite(state, rhs, mm_ramified, grown_match, m, mm)
def show_petri_net(m):
    """Return a text drawing of the Petri Net with its current token counts.

    The "tokens" slot of the objects named p1, p2, cp1 and cp2 is read from
    model `m` and the four values are inserted into a fixed diagram.
    """
    api = ODAPI(state, m, mm)

    def tokens_of(place_name):
        # Look the place up by name and read its "tokens" slot.
        return api.get_slot_value(api.get(place_name), "tokens")

    p1, p2, cp1, cp2 = [tokens_of(name) for name in ("p1", "p2", "cp1", "cp2")]
    return f"""
t1 t2 t3
┌─┐ p1 ┌─┐ p2 ┌─┐
│ │ │ │ │ │
│ ├─────► ( {p1} )─────►│ │─────► ( {p2} )─────►│ │
└─┘ └─┘ └─┘
▲ │ ▲ │
│ │ │ │
│ │ │ │
│ │ │ │
│ │ │ │
└───────( {cp1} )◄──────┘ └──────( {cp2} )◄───────┘
cp1 cp2 """
# Let's see if it works:
# Interactive simulation: as long as at least one transition is enabled,
# show the net, wait for ENTER, fire the first enabled transition and
# recompute the list of enabled transitions.
while enabled:
    first_match = enabled[0]
    print(show_petri_net(m))
    print("\nenabled PN transitions:", enabled)
    to_fire = first_match['t']
    print("press ENTER to fire", to_fire)
    input()
    print("firing transition:", to_fire)
    fire_transition(m, first_match)
    enabled = list(find_enabled_transitions(m))
# That's it!