Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from typing import Any
- from unittest import TestCase
- from unittest.mock import MagicMock
- #
- # Library files
- #
- class TransitionCondition:
- def __init__(self, fromState: type, toState: type):
- self.toState = toState
- self.fromState = fromState
- def matches(self, fromState: type, toState: type):
- return (self.fromState == Any or fromState == self.fromState) and (self.toState == Any or self.toState == toState)
- class Transition:
- def __init__(self, condition: TransitionCondition, fnct):
- self.fnct = fnct
- self.condition = condition
- def apply(self, fromState: type, toState: type):
- if self.condition.matches(fromState, toState):
- self.fnct()
- class FSM:
- def __init__(self):
- self.state = None
- self.transitions = []
- def goto(self, stateClass: type):
- previous_state = self.state
- self.state = stateClass(self)
- for transition in self.transitions: transition.apply(type(previous_state), stateClass)
- def apply(self, message):
- self.state.apply(message)
- def __lshift__(self, other):
- self.apply(other)
- def onTransition(self, condition: TransitionCondition):
- def registrer_transition(funct):
- self.transitions.append(Transition(condition, funct))
- return registrer_transition
- class WithRShift(type):
- def __rshift__(self, other: type):
- return TransitionCondition(self, other)
- def __rrshift__(self, other: type):
- return TransitionCondition(other, self)
- class State(metaclass=WithRShift):
- def __init__(self, fsm: FSM):
- self.fsm: FSM = fsm
- def goto(self, state):
- self.fsm.goto(state)
- def apply(self, message):
- raise NotImplemented(message)
- def __repr__(self): return self.__class__.__name__
- def __str__(self): return repr(self)
- #
- # Example of a FSM
- #
- class MyFSM(FSM):
- class Stopped(State):
- def apply(self, message):
- raise ValueError
- class Going(State):
- def apply(self, message):
- if isinstance(message, str) and message == "stop":
- self.goto(MyFSM.Stopped)
- else:
- raise ValueError
- class Standing(State):
- def apply(self, message):
- if isinstance(message, str) and message == "go":
- self.goto(MyFSM.Going)
- else:
- raise ValueError
- class MyData:
- message: str
- def __init__(self, message_subscriber):
- super(self.__class__, self).__init__()
- self.message_recipient = message_subscriber
- self.data = self.MyData()
- self.goto(self.Standing)
- @self.onTransition(self.Standing >> self.Going)
- def _():
- self.data.message = "I am going"
- self.report_state()
- @self.onTransition(Any >> self.Stopped)
- def _():
- self.data.message = "I am stopped"
- self.report_state()
- def report_state(self):
- print(f"My state is {self.state}, I have to say this: {self.data.message}")
- self.message_recipient(self.data.message)
- class TestFSM(TestCase):
- def test_me(self):
- message_subscriber = MagicMock()
- fsm = MyFSM(message_subscriber)
- self.assertIsInstance(fsm.state, MyFSM.Standing)
- fsm.apply("go")
- self.assertIsInstance(fsm.state, fsm.Going)
- message_subscriber.assert_called_with("I am going")
- fsm << "stop"
- self.assertIsInstance(fsm.state, fsm.Stopped)
- message_subscriber.assert_called_with("I am stopped")
Add Comment
Please, Sign In to add comment