Attachment 'chuck_fox_python_ipc.py'

Download

   1 # Chuck Fox  cfox04 at gmail dot com
   2 # Date: 23/03/2005 PyCon code sample
   3 #
   4 # Description:
   5 #    This module contains code for a very simple IPC mechanism that allows for pickled Python
   6 # objects to be passed between different processes using UNIX domain sockets.  The intent of
   7 # this file is to have the ability to send status information objects from a child process
   8 # up to its parent which is listening for updates.  However, this mechanism should be 
   9 # generic enough to allow for passing of objects between any 2 programs as long as they
   10 # can connect on a UNIX socket.  The interface is deliberately limited to UNIX sockets
   11 # for security reasons: passing pickled objects is generally not a safe method of network
   12 # communication.
  13 
   14 # Update: Improve the socket-level performance by moving the depickling code to the actual
  15 # poll method.  This requires shorter lock holds and means the socket code needs to do less... a good thing
  16 
  17 import os, socket, sys, string, time, cPickle, threading
  18 
  19 class ObjectSender:
  20   """ This class is responsible for pickling and sending (via a socket) an object to another process
  21   listening on a UNIX socket. Once instantiated this object can be used to send multiple objects across.
  22   Internally the object uses a UNIX domain socket that will establish a new connection with the server
  23   for each object we send across.  This class is the "client" side of the equation; please see RecvObject
  24   for the "server" side """ 
  25   
  26   def __init__ (self, sockName, sockTimeout = 30):
  27     """ Args:
  28          sockName: Name of the UNIX socket to connect to (this is a string and the socket should be open)
  29          sockTimeout: Timeout in seconds on socket operations, prevents us from blocking forever... """
  30     self.sockName = sockName
  31     self.sockTimeout = sockTimeout
  32   
  33   def sendObj (self, object, retries = 10):
  34     # This is where we actually pickle and ship out 'object' across the socket:
  35     # Returns: codes based on success/failure
  36     pickledObj = cPickle.dumps (object, cPickle.HIGHEST_PROTOCOL)
  37     
  38     # now send it across a newly opened socket:
  39     for II in range (0, retries):
  40       try:
  41         sock = socket.socket (socket.AF_UNIX)
  42         sock.settimeout (self.sockTimeout)
  43         sock.connect (self.sockName)
  44         sock.sendall (pickledObj)
  45         sock.shutdown (2) # REALLY shutdown mr. connection here 
  46         sock.close ()
  47         return ()
  48       except Exception, err:
  49        print "Error received: ", err
  50        # if we have exceeded max retries OR if the error code is not 146 (meaning "Connection Refused") we pass this error up the line
  51        if II == retries - 1 or (err [0] != 146):
  52          raise
  53        else:
  54          time.sleep (float (II + 1) / 10)  # a linearly increasing backoff delay for retries on the socket
  55     
  56 class ObjectReceiver (threading.Thread):
  57   """ ObjectReceiver is the 'server' counterpart to ObjectSender.  It binds to a named UNIX socket and listens for incoming
  58   connections from ObjectSender.  Once a connection is completed it takes all the data send and attempts to depickle it to a Python
  59   object.  If this succeeds ObjectReceiver then places the object in an internal list that may be grabbed by the calling program.  Once
  60   the calling program has grabbed the list, it is cleared out and we wait for more objects to come in from other processes. 
  61     With the optional maxCachedObjects constructor arg, the internal object cache can be setup to only hold a certain number of objects
  62   In the event of too many objects in the queue, additional objects sent across the wire will not be added to the queue. """
  63   def __init__ (self, sockName, socketTimeout = 30, sockBuffSize = 4096, maxCachedObjects = None):
  64     """ New ObjectReceivers are setup to bind to a sockName and wait for incoming 
  65     connections from other processes.
  66     Args:
  67       sockName: string name of the socket to bind to, must be available and clients will use the same name
  68       socketTimeout: This is the timeout for all socket operations to prevent hanging 
  69       sockBuffSize: Size of the receive buffer in bytes of the listening socket
  70       maxCachedObjects: The max number of objects we keep on hand between poll requests that grab the buffer, None means no limit to the list size """
  71     threading.Thread.__init__ (self)
  72     self.sockName = sockName
  73     self.sockBuffSize = sockBuffSize
  74     self.sock = socket.socket (socket.AF_UNIX)
  75     self.socketTimeout  = socketTimeout
  76     self.maxCachedObjects  = maxCachedObjects
  77     self.objCache = [] 
  78     self.pollLock = threading.Lock ()
  79     self.objEvent = threading.Event () # we use this event to allow for blocking when objCache is empty
  80     self.setDaemon (True)  # this is a daemon thread that will close with the rest of the process
  81  
  82   def run (self):
  83     # Bind and the socket and start listening
  84     self.sock.bind (self.sockName)
  85     self.sock.listen (5)
  86     while True:
  87       conn, addr = self.sock.accept ()
  88      
  89       # set conn's timeout to prevent hanging on bad behaviour
  90       conn.settimeout (self.socketTimeout)
  91    
  92       # now receive data & depickle it, we rely on the client to close out its
  93       # connection to get out of the following loop.  If that does not occur our conn socket
  94       # will timeout as well
  95       data = ""
  96       try:
  97         while True:
  98           next_data = conn.recv (self.sockBuffSize)
  99           if not next_data:
 100             conn.shutdown (2)
 101             conn.close ()
 102             break
 103           data += next_data
 104       except Exception, err:
 105         # most likely a timeout exception, ignore any data that came in
 106         continue 
 107  
 108       # if the existing objCache is allowed to have another member, try adding one:
 109       if (not self.maxCachedObjects) or (len (self.objCache) < self.maxCachedObjects):
 110         self.pollLock.acquire ()
 111         self.objCache.append (data)
 112         self.objEvent.set ()  # we have at least 1 item in the queue anybody waiting for a new object may fire 
 113         self.pollLock.release ()
 114  
 115   def poll (self): 
 116     """ This method is designed to return the contents of the objCache to the calling process. It is a (mostly) non-blocking poll, the
 117     only block is on the lock that keeps the contents of objCache trhead-safe.  The returned list consists of python objects that the
 118     calling process may use as it sees fit.  UPDATE: In the new version the actual depickling takes place in this method now """ 
 119     self.pollLock.acquire ()
 120     dataList = self.objCache
 121     self.objCache = []
 122     self.pollLock.release ()
 123     for II in range (0, len (dataList)):
 124       dataList [II] = cPickle.loads (dataList [II])
 125     self.objEvent.clear () # no objects left in the queue, wait for more
 126     return (dataList)
 127     
 128 
 129   def wait (self, timeout = None):
 130     self.objEvent.wait (timeout)
 131     # fell through timeout or got woken up, attempt a poll
 132     return (self.poll ())

Attached Files

To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.
  • [get | view] (2005-03-23 18:37:09, 43.2 KB) [[attachment:chuck_fox_python_ipc.pdf]]
  • [get | view] (2005-03-23 18:39:26, 6.7 KB) [[attachment:chuck_fox_python_ipc.py]]
  • [get | view] (2005-03-24 15:49:34, 6.7 KB) [[attachment:chuck_fox_python_ipc_p1.py]]
 All files | Selected Files: delete move to page copy to page

You are not allowed to attach a file to this page.

Unable to edit the page? See the FrontPage for instructions.