Revision 2 as of 2010-11-19 22:03:15

Clear message

State Machine Decorator Module

The code for the state machine decorator module is given below. Examples are given following the code.

import types
#import copy
import itertools

import logging
# Root logger configuration, performed as a side effect of importing this
# module: records go to the file gf_info.txt and the level is ERROR, so the
# logging.debug() calls made by this module are filtered out unless the level
# is lowered.
logging.basicConfig(
    filename="gf_info.txt",
    format = "%(levelname)-10s %(message)s",
    level = logging.ERROR )
from functools import wraps

def truncated(alist, cmprsn):
    """Yield leading items of alist until one is named cmprsn.

    Items are yielded in order; iteration stops (without yielding it) at the
    first item whose __name__ equals cmprsn.  If no item matches, every item
    is yielded.
    """
    for entry in alist:
        if entry.__name__ != cmprsn:
            yield entry
        else:
            return

class ContextBase(object):
   """Base class from which the example state machine (context) classes derive.

   It defines no attributes or behavior of its own.
   """
   pass

class _StateVariable(object):
   """Attribute of a state machine instance that maintains its state.

   State variable objects are instantiated indirectly via calls to the
   TransitionTable class's initialize method.  TransitionTable objects are
   created at the class level.

   States are classes.  State dependent behavior is obtained by temporarily
   replacing the context's __class__ with a throw-away pseudo class whose
   bases put the current state class ahead of the context's own classes, and
   restoring the original class afterwards.
   """

   def __init__(self, transTable, context):
      """Constructor - set to initial state.

      transTable -- the owning transition table; its 'initialstate' attribute
                    supplies the starting state.
      context    -- the state machine instance; its class at this moment is
                    remembered as the context class.
      """
      self.__current_state = transTable.initialstate
      self.__next_state = transTable.initialstate
      self.sTable = transTable
      self.__statestack = []
      self.__ctxClass = context.__class__

   def toNextState(self, context):
      """Transition to the next state, if the next state is different.

      In addition to the actual state transition, it invokes the onLeave
      method of the state being left and the onEnter method of the state
      being entered, when such methods exist.
      """
      if self.__next_state is self.__current_state:
         return

      # Captured once: both hook calls below build on, and restore, this class.
      cc = context.__class__
      logging.debug("Transitioning to state %s", self.__next_state.__name__)

      def callInState(methName, crnt_state):
         if not (hasattr(crnt_state, methName) or hasattr(context, methName)):
            return
         nmro = [crnt_state]
         nmro.extend(cc.__mro__)
         psudoClassName = "%s_%s" % (cc.__name__, crnt_state.__name__)
         stCls = type(psudoClassName, tuple(nmro), {})
         context.__class__ = stCls
         try:
            getattr(context, methName)()   # call the onEnter or onLeave method here
         finally:
            # Restore the class even when the hook raises, so the context is
            # never left stuck in the pseudo class.
            context.__class__ = cc

      callInState('onLeave', self.__current_state)
      self.__setState(context)
      callInState('onEnter', self.__current_state)

   def __setState(self, context):
      """Low level helper called from toNextState to make the switch."""
      cc = context.__class__
      if self.__current_state not in cc.__mro__:
         # Usual case: the context's class is not derived from the state.
         self.__current_state = self.__next_state
         return

      # The context's class derives from the current state class: build a new
      # class in which the next state takes the current state's place.
      # NOTE(review): this branch does not update the current state, unlike
      # the branch above - confirm whether that is intended.
      logging.debug("Current state %s in mro", self.__current_state.__name__)
      newmro = tuple(
         self.__next_state if anc == self.__current_state else anc
         for anc in cc.__mro__)
      cls_name = "%s_%s" % (self.__ctxClass.__name__, self.__next_state.__name__)
      context.__class__ = type(cls_name, newmro, {})

   def pushState(self, newState, context = None):
      """PushState - allows going to another state with intent of returning
         to the current one.

      The current state is saved on a stack.  When a context is given the
      transition is carried out immediately; otherwise it is only scheduled.
      """
      self.__statestack.append(self.__current_state)
      self.__next_state = newState
      # Identity test: a context object may define its own truth value.
      if context is not None:
         self.toNextState(context)

   def popState(self, context = None):
      """Pop back to the previously pushed state (pushState).

      Raises IndexError when no state has been pushed.
      """
      self.__next_state = self.__statestack.pop()
      if context is not None:
         self.toNextState(context)

   def name(self):
      """Return name of current state"""
      return self.__current_state.__name__

   def setXition(self, func):
      """Sets the state to transition to upon seeing a transition event.

      The next state is looked up by func's name in the current state's
      nextStates mapping; a None entry leaves the pending state unchanged.

      This method should only be called by the decorators impl'd in this module.
      """
      nxState = self.__current_state.nextStates[func.__name__]
      if nxState is not None:
         self.__next_state = nxState

   def getFunc(self, func, contxt):
      """Gets the state dependent action method, wrapped in a try-finally block.

      func   -- the undecorated method of the context class; its name selects
                the method to look up in the current state.
      contxt -- the state machine instance.

      Returns a callable taking (context, *args, **kwargs) which runs the
      selected method with the context temporarily switched to the state's
      pseudo class.  Raises NotImplementedError when no attribute with that
      name can be found.

      This method should only be called by the decorators impl'd in this module.
      """
      crnt = self.__current_state
      svar_name = self.sTable.inst_state_name
      svCtxt = self.__ctxClass

      cc = contxt.__class__
      pseudoclas = "%s_%s" % (cc.__name__, crnt.__name__)

      # Bases of the pseudo class: the current state first, then the context's
      # classes starting at the class recorded when the variable was created.
      if svCtxt in cc.__mro__:
         ltail = itertools.dropwhile(lambda x: x != svCtxt, cc.__mro__)
      else:
         ltail = cc.__mro__
      nmro = [crnt]
      nmro.extend(ltail)

      # Only build the (comparatively costly) class list when it will be logged.
      if logging.getLogger().isEnabledFor(logging.DEBUG):
         logging.debug("%s - %s - %s - [%s]\n", func.__name__, cc.__name__,
           svar_name, ", ".join(cls.__name__ for cls in truncated(nmro, 'TopLevelWindow')))
      stCls = type(pseudoclas, tuple(nmro), {})

      contxt.__class__ = stCls
      try:
         # A missing attribute yields None; any other error propagates.
         funky = getattr(contxt, func.__name__, None)
      finally:
         contxt.__class__ = cc   # revert...
      if funky is None:
         t = "'%s' has no attribute '%s' in state %s" % (cc.__name__, func.__name__, crnt.__name__)
         raise NotImplementedError(t)

      # function with wrapping attribute means we've recursed all the way back
      #   to the context class and need to call the func as a default.
      if hasattr(funky, "wrapping") and (funky.wrapping == self.sTable.inst_state_name):
         def funcA(*args, **kwargs):
            return func(contxt, *args, **kwargs)
         funky = funcA

      def wrappd2(self, *args, **kwargs):
         # 'self' here is the context, not the state variable.
         # try - finally in the event that funky() does something funky
         try:
            self.__class__ = stCls
            return funky(*args, **kwargs)
         finally:
            self.__class__ = cc

      return wrappd2

# -----------------------------------------------------------------------------
class TransitionTable(object):
   """Defines a state table for a state machine class.

   A state table for a class is associated with the state variable in the
   instances of the class. The name of the state variable is given in the
   constructor of the TransitionTable object.  TransitionTable objects are
   attributes of state machine classes, not instances of the state machine
   class.   A state machine class can have more than one TransitionTable.

   The 'initialstate' attribute must be assigned (to a state class) before
   initialize is called for any instance.
   """
   def __init__(self, stateVarblName):
      """Transition Table constructor - stateVarblName is the name of the
      associated instance state variable.  """
      self.inst_state_name = stateVarblName
      # Names of the transition methods, in the order they were decorated.
      self.eventList = []
      self.initialstate = None
      # Legacy misspelling of 'initialstate'; retained only so that existing
      # code reading it keeps working.
      self.initalState = None

   def initialize(self, ctxt):
      """Create a new state variable in the context.  State variable refs this
      transition table.

      Raises AttributeError when 'initialstate' has not been assigned yet.
      """
      if self.initialstate is None:
         raise AttributeError(
            "'initialstate' must be assigned on the TransitionTable for '%s' "
            "before initialize() is called" % self.inst_state_name)
      ctxt.__dict__[self.inst_state_name] = _StateVariable(self, ctxt)

   def _addEventHandler(self, funcName):
      """Notifies the current object of a method that handles a transition.

      This is called by two of the decorators implemented below.
      """
      self.eventList.append(funcName)

   def nextStates(self, subState, nslList):
      """Sets up transitions from the state specified by subState.

      subState is one of the derived state classes, subclassed from the
      context state machine class. nslList is a list of states to which
      the context will transition upon the invocation of one of the
      transition methods, given in the order in which those methods were
      decorated.  'None' may be specified instead of an actual state if the
      context is to remain in the same state upon invocation of the
      corresponding method.

      The resulting mapping is stored on subState as its 'nextStates'
      attribute.  Raises RuntimeError when the number of entries does not
      match the number of registered transition methods.
      """
      if len(nslList) != len(self.eventList):
         j = "Expected %s Got %s."%(len(self.eventList), len(nslList))
         raise RuntimeError("Wrong number of states in transition list.\n%s"%j)
      subState.nextStates = dict(zip(self.eventList, nslList))


# -----------------------------------------------------------------------------
def event(state_table):
   """Decorator marking a method as an Event ('Action') method.

   Apply it to methods of the state machine (context) class whose behavior
   depends on the current state.  States are implemented as subclasses of
   the context class; calling the decorated method dispatches to the
   implementation selected by the instance's state variable.
   """
   var_name = state_table.inst_state_name

   def decorate(method):
      def dispatch(self, *args, **kwargs):
         # Ask the instance's state variable for the state specific callable.
         action = getattr(self, var_name).getFunc(method, self)
         return action(self, *args, **kwargs)

      dispatch = wraps(method)(dispatch)
      # Tag used by the state variable to recognize un-overridden methods.
      dispatch.wrapping = var_name
      return dispatch

   return decorate


def transition(state_table):
   """Decorator for methods which cause transitions between states.

   Apply it to methods of the context (state machine) class.  Calling such a
   method may move the instance to another state; which one is defined with
   the nextStates method of the TransitionTable class.
   """
   var_name = state_table.inst_state_name

   def decorate(method):
      # Register the method name with the table at decoration time.
      state_table._addEventHandler(method.__name__)

      def trigger(self, *args, **kwargs):
         holder = getattr(self, var_name)
         holder.setXition(method)          # select the pending state
         result = method(self, *args, **kwargs)
         holder.toNextState(self)          # then perform the switch
         return result

      trigger = wraps(method)(trigger)
      trigger.wrapping = var_name
      return trigger

   return decorate

def transitionevent(state_table):
   """Decorator which is essentially the combination of the other two.

   The decorated method both invokes a state dependent method and may
   trigger a state transition.  Mostly equivalent to :
   @transition(xitionTable)
   @event(xitionTable)
   """
   var_name = state_table.inst_state_name

   def decorate(method):
      # Register the method name with the table at decoration time.
      state_table._addEventHandler(method.__name__)

      def trigger(self, *args, **kwargs):
         holder = getattr(self, var_name)
         holder.setXition(method)                   # select the pending state
         action = holder.getFunc(method, self)      # state specific callable
         result = action(self, *args, **kwargs)
         holder.toNextState(self)                   # then perform the switch
         return result

      trigger = wraps(method)(trigger)
      trigger.wrapping = var_name
      return trigger

   return decorate

Examples of Use

Simple Example

import DecoratorStateMachine as dsm
class StateContext( dsm.ContextBase):
        """Context (state machine) class of the simple example."""
        # Class-level transition table; the per-instance state variable is
        # stored under the attribute name 'myState'.
        ttable = dsm.TransitionTable('myState')

        def __init__(self):
                # Create this instance's 'myState' state variable.
                self.ttable.initialize(self)

        @dsm.transitionevent(ttable)
        def writeName(self, name):
                """Default action; the state classes supply the real behavior."""
                pass

class StateA(StateContext):
        """State in which names are written in lower case."""
        def writeName(self, name):
                # Call form of print: identical output on Python 2 for a single
                # argument, and valid syntax on Python 3.
                print(name.lower())

class StateB(StateContext):
        """State in which names are written in upper case."""
        def writeName(self, name):
                # Call form of print: identical output on Python 2 for a single
                # argument, and valid syntax on Python 3.
                print(name.upper())

class StateC(StateB):
        """State that inherits writeName unchanged from StateB."""
        pass

# Set up the transition table so that the states cycle A -> B -> C -> A.
# Each tuple holds one entry per registered transition method (here only
# writeName).
StateContext.ttable.nextStates(StateA, (StateB,))
StateContext.ttable.nextStates(StateB, (StateC,))
StateContext.ttable.nextStates(StateC, (StateA,))
# The initial state is read when an instance's state variable is created.
StateContext.ttable.initialstate = StateA

if __name__=='__main__':
   ctxt = StateContext()
   # Every call writes the name in the current state's style and then moves
   # on to the next state.
   for day in ("Monday", "Tuesday", "Wednesday", "Thursday",
               "Friday", "Saturday", "Sunday"):
      ctxt.writeName(day)
   # raw_input exists only on Python 2; fall back to input on Python 3.
   try:
      prompt = raw_input
   except NameError:
      prompt = input
   # Keep the console window open until the user presses Enter.
   x = prompt("done>")

Output

monday
TUESDAY
WEDNESDAY
thursday
FRIDAY
SATURDAY
sunday

Miss Grant's Controller

The specification for this controller comes from Martin Fowler. This example uses wxPython as well as the state machine module.

import wx
import DecoratorStateMachine as dsm
class MyFrame(wx.Frame, dsm.ContextBase):
   """Main window and state machine context of the controller example.

   The frame carries two independent state variables: 'pstate' (driven by
   xtable) for the controller and 'dstate' (driven by dtable) for the door.
   """

   xtable = dsm.TransitionTable('pstate')
   dtable = dsm.TransitionTable('dstate')

   def __init__(self):
      # Create the 'pstate' and 'dstate' state variables for this instance.
      self.xtable.initialize(self)
      self.dtable.initialize(self)

      wx.Frame.__init__(self, None, -1, "My Frame", size=(350,250))
      family = wx.SWISS
      style = wx.NORMAL
      weight = wx.BOLD
      font = wx.Font(12,family,style,weight, False, "Verdana")
      self.SetFont(font)

      panel = wx.Panel(self, -1)

      # Each button is bound to one of the decorated methods below.
      button = wx.Button(panel, -1, "Door", pos=(50,20), size=(110,35))
      self.Bind(wx.EVT_BUTTON, self.onToggleDoor, button)
      self.btnDoor = button
      self.doorOpen = False

      button = wx.Button(panel, -1, "Light", pos=(170,20), size=(110,35))
      self.Bind(wx.EVT_BUTTON, self.onLightOn, button)
      self.btnLight = button

      button = wx.Button(panel, -1, "Drawer", pos=(50,60), size=(110,35))
      self.Bind(wx.EVT_BUTTON, self.onOpenDrawer, button)
      self.btnDrawer = button

      button = wx.Button(panel, -1, "Panel", pos=(170,60), size=(110,35))
      self.Bind(wx.EVT_BUTTON, self.onClosePanel, button)
      button.Disable()
      self.btnPanel = button

      self.textArea = wx.StaticText(panel, -1, "Locked", pos=(50,100), size=(100,35))

      # onEnter called here would invoke MyFrame.onEnter (below)
      # call the current state's onEnter method indirectly through onInit()
      self.onInit()

   def onEnter(self):
      """Fallback that is only reached when no state's onEnter was selected."""
      # Call form of print: identical output on Python 2 for a single
      # argument, and valid syntax on Python 3.
      print("Shouldn't get here. Should call one of the state's onEnter functions instead.")

   @dsm.transitionevent(dtable)
   def onToggleDoor(self, event):
      """Door button handler; the door states supply the behavior."""
      pass

   @dsm.event(dtable)
   def onInit(self):
      """Run the onEnter method of the initial door state."""
      self.onEnter()            # calls onEnter for current dtable/dstate state

   # The order in which the following methods are decorated is the order of
   # the entries expected in each xtable.nextStates(...) tuple.
   @dsm.transition(xtable)
   def onOpenDoor(self):
      pass
   @dsm.transition(xtable)
   def onCloseDoor(self):
      pass
   @dsm.transition(xtable)
   def onLightOn(self, event):
      pass
   @dsm.transition(xtable)
   def onOpenDrawer(self, event):
      pass
   @dsm.transition(xtable)
   def onClosePanel(self, event):
      pass

class DoorOpen(MyFrame):
   """Door state: the door is open."""
   def onEnter(self):
      # Call form of print works on both Python 2 and Python 3.
      print(self.dstate.name())
      self.btnDoor.SetLabel("Close Door")

   def onToggleDoor(self, event):
      # Translate the generic toggle into the controller's close-door event.
      self.onCloseDoor()

class DoorClosed(MyFrame):
   """Door state: the door is closed."""
   def onEnter(self):
      # Call form of print works on both Python 2 and Python 3.
      print(self.dstate.name())
      self.btnDoor.SetLabel("Open Door")

   def onToggleDoor(self, event):
      # Translate the generic toggle into the controller's open-door event.
      self.onOpenDoor()

# Door state machine: its single transition method, onToggleDoor, flips
# between the two door states.
MyFrame.dtable.nextStates(DoorOpen, (DoorClosed,))
MyFrame.dtable.nextStates(DoorClosed, (DoorOpen,))
# State in which the 'dstate' variable of a new frame starts.
MyFrame.dtable.initialstate = DoorOpen

"""
The following classes are states for the controller.
"""
class Idle(MyFrame):
   """this is an initial state"""
   def onEnter(self):
      # Call form of print works on both Python 2 and Python 3.
      print(self.pstate.name())

class Unlocked(Idle):
   """Controller state in which the panel is unlocked."""
   def onEnter(self):
      # Call form of print works on both Python 2 and Python 3.
      print(self.pstate.name())
      self.btnPanel.Enable()
      self.btnDoor.Disable()
      self.textArea.SetLabel("Unlocked")

   def onLeave(self):
      # Undo what onEnter changed in the user interface.
      self.textArea.SetLabel("Locked")
      self.btnPanel.Disable()
      self.btnDoor.Enable()

class Active(Idle):
   """Controller state; inherits all of its behavior from Idle."""
   pass
class LightOn(Idle):
   """Controller state; inherits all of its behavior from Idle."""
   pass
class DrawerOpen(Idle):
   """Controller state; inherits all of its behavior from Idle."""
   pass

# Controller state machine.  The entries of each tuple follow the order in
# which the transition methods are decorated in MyFrame:
#   (onOpenDoor, onCloseDoor, onLightOn, onOpenDrawer, onClosePanel)
# None means "stay in the current state".
MyFrame.xtable.nextStates(Idle, (Idle, Active, Idle, Idle, Idle))
MyFrame.xtable.nextStates(Active, (Idle, Active, LightOn, DrawerOpen, None))
MyFrame.xtable.nextStates(LightOn, (Idle, None,  None, Unlocked, None ))
MyFrame.xtable.nextStates(DrawerOpen, (Idle, None, Unlocked, None, None))
MyFrame.xtable.nextStates(Unlocked, (Idle, None, None, None, Idle))
# State in which the 'pstate' variable of a new frame starts.
MyFrame.xtable.initialstate = Idle


if __name__=='__main__':
   # Create the application object, show the frame and run the GUI event loop.
   # NOTE(review): wx.PySimpleApp appears to be deprecated in later wxPython
   # releases in favour of wx.App - confirm against the targeted version.
   app = wx.PySimpleApp()
   frame = MyFrame()
   frame.Show(True)
   app.MainLoop()

There are actually two state machines in the context, each with its own state variable. One (dstate) tracks the state of the door, open or closed. The other (pstate) is for the main controller. The DoorOpen and DoorClosed states simply translate the onToggleDoor event into a call to either onCloseDoor or onOpenDoor.

Unable to edit the page? See the FrontPage for instructions.