# Chuck Fox cfox04 at gmail dot com
# Date: 23/03/2005 PyCon code sample
#
# Description:
# This module contains code for a very simple IPC mechanism that allows for pickled Python
# objects to be passed between different processes using UNIX domain sockets. The intent of
# this file is to have the ability to send status information objects from a child process
# up to its parent which is listening for updates. However, this mechanism should be
# generic enough to allow for passing of objects between any 2 programs as long as they
# can connect on a UNIX socket. We are keeping the interface limited to UNIX sockets
# on purpose for security purposes here, this is generally not a safe method of network
# communication.
#
# Update: Improve the socket-level performance by moving the depickling code to the actual
# poll method. This requires shorter lock holds and means the socket code needs to do less... a good thing

import errno
import os
import socket
import string
import sys
import threading
import time

try:
    import cPickle as pickle  # Python 2: the C-accelerated pickler
except ImportError:
    import pickle  # Python 3: cPickle was merged into pickle


class ObjectSender:
    """Pickle objects and send them to a process listening on a UNIX socket.

    Once instantiated this object can be used to send multiple objects across.
    Internally a new UNIX domain socket connection to the server is established
    for each object sent. This class is the "client" side of the equation;
    please see ObjectReceiver for the "server" side.
    """

    def __init__(self, sockName, sockTimeout=30):
        """Args:
            sockName: Name of the UNIX socket to connect to (this is a string
                and the socket should be open)
            sockTimeout: Timeout in seconds on socket operations, prevents us
                from blocking forever...
        """
        self.sockName = sockName
        self.sockTimeout = sockTimeout

    def sendObj(self, object, retries=10):
        """Pickle 'object' and ship it across a newly opened socket.

        Args:
            object: Any picklable Python object.
            retries: Maximum number of connection attempts. At least one
                attempt is always made, even if this is zero or negative.

        Returns:
            An empty tuple on success.

        Raises:
            socket.error: If an attempt fails for any reason other than
                "Connection Refused", or if the last attempt fails.
        """
        pickledObj = pickle.dumps(object, pickle.HIGHEST_PROTOCOL)
        attempts = max(1, retries)

        for attempt in range(attempts):
            sock = socket.socket(socket.AF_UNIX)
            try:
                sock.settimeout(self.sockTimeout)
                sock.connect(self.sockName)
                sock.sendall(pickledObj)
                # The receiver reads until end-of-stream, so shut the
                # connection down explicitly to mark the end of the payload.
                sock.shutdown(socket.SHUT_RDWR)
                return ()
            except socket.error as err:
                sys.stdout.write("Error received:  %s\n" % (err,))
                # Only "Connection Refused" is worth retrying. Compare the
                # error's errno against the platform's own constant rather
                # than a hard-coded number, which differs between systems.
                refused = getattr(err, "errno", None) == errno.ECONNREFUSED
                if attempt == attempts - 1 or not refused:
                    raise
            finally:
                # Close on every path so failed attempts do not leak sockets.
                sock.close()
            # A linearly increasing backoff delay for retries on the socket.
            time.sleep(float(attempt + 1) / 10)


class ObjectReceiver(threading.Thread):
    """The 'server' counterpart to ObjectSender.

    It binds to a named UNIX socket and listens for incoming connections from
    ObjectSender. Each connection's data is stored, still pickled, in an
    internal list; poll() depickles the stored data and hands the resulting
    objects to the calling program, clearing the list so that more objects can
    come in from other processes.

    With the optional maxCachedObjects constructor arg, the internal cache can
    be set up to only hold a certain number of objects. In the event of too
    many objects in the queue, additional objects sent across the wire will
    not be added to the queue.
    """

    def __init__(self, sockName, socketTimeout=30, sockBuffSize=4096, maxCachedObjects=None):
        """Set up a receiver that will bind to sockName once the thread starts.

        Args:
            sockName: string name of the socket to bind to, must be available
                and clients will use the same name
            socketTimeout: This is the timeout for operations on accepted
                connections, to prevent hanging
            sockBuffSize: Maximum number of bytes requested per recv() call
            maxCachedObjects: The max number of objects we keep on hand between
                poll requests that grab the buffer; None (or 0) means no limit
                to the list size
        """
        threading.Thread.__init__(self)
        self.sockName = sockName
        self.sockBuffSize = sockBuffSize
        self.sock = socket.socket(socket.AF_UNIX)
        self.socketTimeout = socketTimeout
        self.maxCachedObjects = maxCachedObjects
        self.objCache = []
        self.pollLock = threading.Lock()
        # We use this event to allow for blocking when objCache is empty.
        self.objEvent = threading.Event()
        # This is a daemon thread that will close with the rest of the process.
        self.daemon = True

    def run(self):
        """Bind the socket, then accept connections and cache their payloads forever."""
        self.sock.bind(self.sockName)
        self.sock.listen(5)
        while True:
            conn, _addr = self.sock.accept()
            payload = self._receiveAll(conn)
            # Skip failed receives (None) and empty payloads: an empty string
            # is never a valid pickle and would only make poll() fail later.
            if payload:
                self._storePayload(payload)

    def _receiveAll(self, conn):
        """Read from conn until the peer closes; return the bytes, or None on error."""
        chunks = []
        try:
            # Set conn's timeout to prevent hanging on bad behaviour.
            conn.settimeout(self.socketTimeout)
            # We rely on the client closing its connection to end this loop.
            # If that does not occur our conn socket will time out instead.
            while True:
                chunk = conn.recv(self.sockBuffSize)
                if not chunk:
                    break
                chunks.append(chunk)
        except socket.error:
            # Most likely a timeout, ignore any data that came in.
            return None
        finally:
            try:
                conn.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # The peer may already be gone; that must not cost us the
                # data we have received.
                pass
            conn.close()
        return b"".join(chunks)

    def _storePayload(self, payload):
        """Append one pickled payload to objCache unless the cache is full."""
        with self.pollLock:
            limit = self.maxCachedObjects
            if limit and len(self.objCache) >= limit:
                return
            self.objCache.append(payload)
            # We have at least 1 item in the queue, anybody waiting for a new
            # object may fire.
            self.objEvent.set()

    def poll(self):
        """Return the objects received since the last poll, emptying the cache.

        This is a (mostly) non-blocking poll; the only block is on the lock
        that keeps the contents of objCache thread-safe. Depickling takes
        place here, after the lock has been released, to keep lock holds
        short.

        Returns:
            A list of Python objects, possibly empty.

        Raises:
            Whatever the unpickler raises for a payload it cannot decode.
        """
        with self.pollLock:
            pending = self.objCache
            self.objCache = []
            # No objects left in the queue, wait for more.
            self.objEvent.clear()
        # SECURITY: unpickling can execute arbitrary code. Any process that is
        # able to connect to the socket is trusted here; restrict access to
        # the socket path accordingly.
        return [pickle.loads(raw) for raw in pending]

    def wait(self, timeout=None):
        """Block until an object is cached or timeout seconds pass, then poll.

        Args:
            timeout: Seconds to wait; None waits indefinitely.

        Returns:
            The result of poll(), which is an empty list if the wait timed out.
        """
        self.objEvent.wait(timeout)
        # Fell through timeout or got woken up, attempt a poll.
        return self.poll()
Attached Files: To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.
You are not allowed to attach a file to this page.