"""Implementation of statechart e.
Generated by itemis CREATE code generator.
"""

import os
import queue
import sys

# The bundled runtime library lives in '../lib' relative to this file; it must
# be on sys.path before the yakindu import below can be resolved.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../lib')))

from yakindu.rx import Observable


class E:
    """Implementation of the state machine E.

    The machine has one composite state, ``OrthogonalState``, with two
    regions: ``r1`` (holding ``StateA``) and ``r2`` (holding ``StateB``).
    Slot 0 of the state vector tracks ``r1``; slot 1 tracks ``r2``.

    ``timer_service`` must be assigned before ``enter()`` or ``run_cycle()``
    is called; both raise ``ValueError`` otherwise.
    """

    class State:
        """State Enum

        Plain integer ids. ``is_state_active`` relies on their ordering for
        its range check on the composite state.
        """
        (
            main_region_orthogonal_state,
            main_region_orthogonal_state_r1state_a,
            main_region_orthogonal_state_r2state_b,
            null_state
        ) = range(4)

    def __init__(self):
        """Declares all necessary variables including list of states, histories etc.
        """
        self.v = None
        self.x = None
        self.x_observable = Observable()
        self.y = None
        self.y_observable = Observable()
        # Pending events, stored as zero-argument callables (see time_elapsed).
        self.in_event_queue = queue.Queue()

        # enumeration of all states:
        self.__State = E.State
        self.__state_conf_vector_changed = None
        # One slot per orthogonal region, all initially inactive.
        self.__state_vector = [self.State.null_state] * 2

        # for timed statechart:
        self.timer_service = None
        self.__time_events = [None]

        # initializations:
        # Default init sequence for statechart E
        self.v = 0
        # Guard flag: True while enter(), exit() or run_cycle() is running.
        self.__is_executing = False
        self.__state_conf_vector_position = None

    def is_active(self):
        """Checks if the state machine is active.

        Returns:
            True if at least one state vector slot holds something other
            than ``null_state``.
        """
        # State ids are ints, so compare by value rather than by identity.
        inactive = self.__State.null_state
        return self.__state_vector[0] != inactive or self.__state_vector[1] != inactive

    def is_final(self):
        """Checks if the statemachine is final.

        Always returns 'false' since this state machine can never become final.
        """
        return False

    def is_state_active(self, state):
        """Checks if the state is currently active.

        Args:
            state: One of the ``E.State`` ids.

        Returns:
            True if ``state`` is active, False otherwise (including for ids
            that are not handled here, such as ``null_state``).
        """
        if state == self.__State.main_region_orthogonal_state:
            # The composite state counts as active while slot 0 holds any id
            # in the range from the composite itself up to its last substate.
            return (self.__State.main_region_orthogonal_state
                    <= self.__state_vector[0]
                    <= self.__State.main_region_orthogonal_state_r2state_b)
        if state == self.__State.main_region_orthogonal_state_r1state_a:
            return self.__state_vector[0] == self.__State.main_region_orthogonal_state_r1state_a
        if state == self.__State.main_region_orthogonal_state_r2state_b:
            return self.__state_vector[1] == self.__State.main_region_orthogonal_state_r2state_b
        return False

    def time_elapsed(self, event_id):
        """Add time events to in event queue

        Event ids outside ``range(1)`` are ignored. For a valid id the event
        is queued and a run-to-completion step is started; if a step is
        already running, the queued event is picked up by that step's loop.

        NOTE(review): the nesting of ``run_cycle()`` under the id check was
        reconstructed from a whitespace-collapsed source -- confirm against
        the generator output.
        """
        if event_id in range(1):
            self.in_event_queue.put(lambda: self.raise_time_event(event_id))
            self.run_cycle()

    def raise_time_event(self, event_id):
        """Raise timed events using the event_id.
        """
        self.__time_events[event_id] = True

    def __execute_queued_event(self, func):
        """Run a queued event callable."""
        func()

    def __get_next_event(self):
        """Return the next queued event callable, or None if the queue is empty."""
        if not self.in_event_queue.empty():
            return self.in_event_queue.get()
        return None

    def __entry_action_main_region_orthogonal_state_r2_state_b(self):
        """Entry action for state 'StateB'..
        """
        # Entry action for state 'StateB'.
        # Arms time event 0 with a value of 1 * 1000 -- presumably
        # milliseconds, with False meaning non-periodic; confirm against the
        # timer service implementation.
        self.timer_service.set_timer(self, 0, (1 * 1000), False)

    def __exit_action_main_region_orthogonal_state_r2_state_b(self):
        """Exit action for state 'StateB'..
        """
        # Exit action for state 'StateB'.
        self.timer_service.unset_timer(self, 0)

    def __enter_sequence_main_region_orthogonal_state_default(self):
        """'default' enter sequence for state OrthogonalState.
        """
        # Enter both regions, r1 first.
        self.__enter_sequence_main_region_orthogonal_state_r1_default()
        self.__enter_sequence_main_region_orthogonal_state_r2_default()

    def __enter_sequence_main_region_orthogonal_state_r1_state_a_default(self):
        """'default' enter sequence for state StateA.
        """
        self.__state_vector[0] = self.State.main_region_orthogonal_state_r1state_a
        self.__state_conf_vector_position = 0
        self.__state_conf_vector_changed = True

    def __enter_sequence_main_region_orthogonal_state_r2_state_b_default(self):
        """'default' enter sequence for state StateB.
        """
        # The entry action (timer start) runs before the state is recorded.
        self.__entry_action_main_region_orthogonal_state_r2_state_b()
        self.__state_vector[1] = self.State.main_region_orthogonal_state_r2state_b
        self.__state_conf_vector_position = 1
        self.__state_conf_vector_changed = True

    def __enter_sequence_main_region_default(self):
        """'default' enter sequence for region main region.
        """
        self.__react_main_region__entry_default()

    def __enter_sequence_main_region_orthogonal_state_r1_default(self):
        """'default' enter sequence for region r1.
        """
        self.__react_main_region_orthogonal_state_r1__entry_default()

    def __enter_sequence_main_region_orthogonal_state_r2_default(self):
        """'default' enter sequence for region r2.
        """
        self.__react_main_region_orthogonal_state_r2__entry_default()

    def __exit_sequence_main_region_orthogonal_state_r1_state_a(self):
        """Default exit sequence for state StateA.
        """
        # Slot 0 falls back to the enclosing composite state.
        self.__state_vector[0] = self.State.main_region_orthogonal_state
        self.__state_conf_vector_position = 0

    def __exit_sequence_main_region_orthogonal_state_r2_state_b(self):
        """Default exit sequence for state StateB.
        """
        # Slot 1 falls back to the enclosing composite state, then the exit
        # action (timer stop) runs.
        self.__state_vector[1] = self.State.main_region_orthogonal_state
        self.__state_conf_vector_position = 1
        self.__exit_action_main_region_orthogonal_state_r2_state_b()

    def __exit_sequence_main_region(self):
        """Default exit sequence for region main region.
        """
        # Leave whichever leaf state is active in each region.
        if self.__state_vector[0] == self.State.main_region_orthogonal_state_r1state_a:
            self.__exit_sequence_main_region_orthogonal_state_r1_state_a()
        if self.__state_vector[1] == self.State.main_region_orthogonal_state_r2state_b:
            self.__exit_sequence_main_region_orthogonal_state_r2_state_b()

    def __react_main_region_orthogonal_state_r1__entry_default(self):
        """Default react sequence for initial entry .
        """
        self.__enter_sequence_main_region_orthogonal_state_r1_state_a_default()

    def __react_main_region_orthogonal_state_r2__entry_default(self):
        """Default react sequence for initial entry .
        """
        self.__enter_sequence_main_region_orthogonal_state_r2_state_b_default()

    def __react_main_region__entry_default(self):
        """Default react sequence for initial entry .
        """
        self.__enter_sequence_main_region_orthogonal_state_default()

    def __react(self, transitioned_before):
        """Implementation of __react function.

        State machine level reactions; there are none, so the argument is
        returned unchanged.
        """
        return transitioned_before

    def __main_region_orthogonal_state_react(self, transitioned_before):
        """Implementation of __main_region_orthogonal_state_react function.

        The reactions of state OrthogonalState: it has no transitions of its
        own and always delegates to the machine level reactions.
        """
        return self.__react(transitioned_before)

    def __main_region_orthogonal_state_r1_state_a_react(self, transitioned_before):
        """Implementation of __main_region_orthogonal_state_r1_state_a_react function.

        The reactions of state StateA. If no transition has been taken yet
        (``transitioned_before < 0``) and ``v == 0``, StateA is exited,
        ``x_observable.next()`` is called and StateA is re-entered.

        Returns:
            0 if the transition was taken, otherwise ``transitioned_before``.
        """
        transitioned_after = transitioned_before
        if transitioned_after < 0 and self.v == 0:
            self.__exit_sequence_main_region_orthogonal_state_r1_state_a()
            self.x_observable.next()
            self.__enter_sequence_main_region_orthogonal_state_r1_state_a_default()
            transitioned_after = 0
        return transitioned_after

    def __main_region_orthogonal_state_r2_state_b_react(self, transitioned_before):
        """Implementation of __main_region_orthogonal_state_r2_state_b_react function.

        The reactions of state StateB. If ``transitioned_before < 1`` and
        time event 0 is raised, StateB is exited, ``y_observable.next()`` is
        called, the time event is consumed and StateB is re-entered (which
        restarts its timer).

        Returns:
            1 if the transition was taken; otherwise the result of the
            parent state's local reactions.
        """
        transitioned_after = transitioned_before
        if transitioned_after < 1 and self.__time_events[0]:
            self.__exit_sequence_main_region_orthogonal_state_r2_state_b()
            self.y_observable.next()
            self.__time_events[0] = False
            self.__enter_sequence_main_region_orthogonal_state_r2_state_b_default()
            self.__main_region_orthogonal_state_react(0)
            transitioned_after = 1
        # If no transition was taken
        if transitioned_after == transitioned_before:
            # then execute local reactions.
            transitioned_after = self.__main_region_orthogonal_state_react(transitioned_before)
        return transitioned_after

    def __clear_in_events(self):
        """Implementation of __clear_in_events function.
        """
        self.__time_events[0] = False

    def __micro_step(self):
        """Implementation of __micro_step function.

        Lets the active state of region r1 react, then region r2.
        """
        transitioned = -1
        self.__state_conf_vector_position = 0
        if self.__state_vector[0] == self.State.main_region_orthogonal_state_r1state_a:
            transitioned = self.__main_region_orthogonal_state_r1_state_a_react(transitioned)
        # Region r2 only reacts if the r1 reaction left the vector position
        # below slot 1.
        if self.__state_conf_vector_position < 1:
            if self.__state_vector[1] == self.State.main_region_orthogonal_state_r2state_b:
                self.__main_region_orthogonal_state_r2_state_b_react(transitioned)

    def run_cycle(self):
        """Implementation of run_cycle function.

        Performs a 'run to completion' step: takes one queued event (if any),
        then repeats micro steps until the event queue is empty. A call made
        while a step is already in progress returns immediately.

        Raises:
            ValueError: If ``timer_service`` has not been set.
        """
        if self.timer_service is None:
            raise ValueError('Timer service must be set.')

        if self.__is_executing:
            return
        self.__is_executing = True
        try:
            pending = self.__get_next_event()
            if pending is not None:
                self.__execute_queued_event(pending)
            while True:
                self.__micro_step()
                self.__clear_in_events()
                pending = self.__get_next_event()
                if pending is None:
                    break
                self.__execute_queued_event(pending)
        finally:
            # Always release the guard; otherwise an exception raised during
            # a step would leave every later call returning silently.
            self.__is_executing = False

    def enter(self):
        """Implementation of enter function.

        Activates the state machine by running the default enter sequence.
        Returns immediately if another operation is in progress.

        Raises:
            ValueError: If ``timer_service`` has not been set.
        """
        if self.timer_service is None:
            raise ValueError('Timer service must be set.')

        if self.__is_executing:
            return
        self.__is_executing = True
        try:
            # Default enter sequence for statechart E
            self.__enter_sequence_main_region_default()
        finally:
            # Release the guard even if an entry action raises.
            self.__is_executing = False

    def exit(self):
        """Implementation of exit function.

        Deactivates the state machine: runs the default exit sequence and
        resets both state vector slots to ``null_state``. Returns immediately
        if another operation is in progress.
        """
        if self.__is_executing:
            return
        self.__is_executing = True
        try:
            # Default exit sequence for statechart E
            self.__exit_sequence_main_region()
            self.__state_vector[0] = self.State.null_state
            self.__state_vector[1] = self.State.null_state
            self.__state_conf_vector_position = 1
        finally:
            # Release the guard even if an exit action raises.
            self.__is_executing = False

    def trigger_without_event(self):
        """Implementation of triggerWithoutEvent function.
        """
        self.run_cycle()