daily pastebin goal
65%
SHARE
TWEET

Untitled

a guest Jan 23rd, 2019 73 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from typing import Any
  2. from unittest import TestCase
  3. from unittest.mock import MagicMock
  4.  
  5. #
  6. #  Library files
  7. #
  8.  
  9. class TransitionCondition:
  10.     def __init__(self, fromState: type, toState: type):
  11.         self.toState = toState
  12.         self.fromState = fromState
  13.  
  14.     def matches(self, fromState: type, toState: type):
  15.         return (self.fromState == Any or fromState == self.fromState) and (self.toState == Any or self.toState == toState)
  16.  
  17.  
  18. class Transition:
  19.     def __init__(self, condition: TransitionCondition, fnct):
  20.         self.fnct = fnct
  21.         self.condition = condition
  22.  
  23.     def apply(self, fromState: type, toState: type):
  24.         if self.condition.matches(fromState, toState):
  25.             self.fnct()
  26.  
  27.  
  28. class FSM:
  29.     def __init__(self):
  30.         self.state = None
  31.         self.transitions = []
  32.  
  33.     def goto(self, stateClass: type):
  34.         previous_state = self.state
  35.         self.state = stateClass(self)
  36.         for transition in self.transitions: transition.apply(type(previous_state), stateClass)
  37.  
  38.     def apply(self, message):
  39.         self.state.apply(message)
  40.  
  41.     def __lshift__(self, other):
  42.         self.apply(other)
  43.  
  44.     def onTransition(self, condition: TransitionCondition):
  45.         def registrer_transition(funct):
  46.             self.transitions.append(Transition(condition, funct))
  47.         return registrer_transition
  48.  
  49.  
  50. class WithRShift(type):
  51.     def __rshift__(self, other: type):
  52.         return TransitionCondition(self, other)
  53.  
  54.     def __rrshift__(self, other: type):
  55.         return TransitionCondition(other, self)
  56.  
  57.  
  58. class State(metaclass=WithRShift):
  59.     def __init__(self, fsm: FSM):
  60.         self.fsm: FSM = fsm
  61.  
  62.     def goto(self, state):
  63.         self.fsm.goto(state)
  64.  
  65.     def apply(self, message):
  66.         raise NotImplemented(message)
  67.  
  68.     def __repr__(self): return self.__class__.__name__
  69.  
  70.     def __str__(self): return repr(self)
  71.  
  72.  
  73. #
  74. #  Example of an FSM
  75. #
  76.  
  77. class MyFSM(FSM):
  78.  
  79.     class Stopped(State):
  80.         def apply(self, message):
  81.             raise ValueError
  82.  
  83.     class Going(State):
  84.         def apply(self, message):
  85.             if isinstance(message, str) and message == "stop":
  86.                 self.goto(MyFSM.Stopped)
  87.             else:
  88.                 raise ValueError
  89.  
  90.     class Standing(State):
  91.         def apply(self, message):
  92.             if isinstance(message, str) and message == "go":
  93.                 self.goto(MyFSM.Going)
  94.             else:
  95.                 raise ValueError
  96.  
  97.     class MyData:
  98.         message: str
  99.  
  100.     def __init__(self, message_subscriber):
  101.         super(self.__class__, self).__init__()
  102.         self.message_recipient = message_subscriber
  103.         self.data = self.MyData()
  104.         self.goto(self.Standing)
  105.  
  106.         @self.onTransition(self.Standing >> self.Going)
  107.         def _():
  108.             self.data.message = "I am going"
  109.             self.report_state()
  110.  
  111.         @self.onTransition(Any >> self.Stopped)
  112.         def _():
  113.             self.data.message = "I am stopped"
  114.             self.report_state()
  115.  
  116.     def report_state(self):
  117.         print(f"My state is {self.state}, I have to say this: {self.data.message}")
  118.         self.message_recipient(self.data.message)
  119.  
  120.  
  121. class TestFSM(TestCase):
  122.  
  123.     def test_me(self):
  124.         message_subscriber = MagicMock()
  125.         fsm = MyFSM(message_subscriber)
  126.         self.assertIsInstance(fsm.state, MyFSM.Standing)
  127.  
  128.         fsm.apply("go")
  129.         self.assertIsInstance(fsm.state, fsm.Going)
  130.         message_subscriber.assert_called_with("I am going")
  131.  
  132.         fsm << "stop"
  133.         self.assertIsInstance(fsm.state, fsm.Stopped)
  134.         message_subscriber.assert_called_with("I am stopped")
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top