Attachment 'chuck_fox_python_ipc_p1.py'
Download
Toggle line numbers
1 # Chuck Fox cfox04 at gmail dot com
2 # Date: 23/03/2005 PyCon code sample
3 #
4 # Description:
5 # This module contains code for a very simple IPC mechanism that allows for pickled Python
6 # objects to be passed between different processes using UNIX domain sockets. The intent of
7 # this file is to have the ability to send status information objects from a child process
8 # up to its parent which is listening for updates. However, this mechanism should be
9 # generic enough to allow for passing of objects between any 2 programs as long as they
10 # can connect on a UNIX socket. We are keeping the interface limited to UNIX sockets
11 # on purpose for security purposes here, this is generally not a safe method of network
12 # communication.
13
14 # Update: Improve the socket-level performanance by moving the depickling code to the actual
15 # poll method. This requires shorter lock holds and means the socket code needs to do less... a good thing
16
17 import os, socket, sys, string, time, cPickle, threading
18
19 class ObjectSender:
20 """ This class is responsible for pickling and sending (via a socket) an object to another process
21 listening on a UNIX socket. Once instantiated this object can be used to send multiple objects across.
22 Internally the object uses a UNIX domain socket that will establish a new connection with the server
23 for each object we send across. This class is the "client" side of the equation; please see RecvObject
24 for the "server" side """
25
26 def __init__ (self, sockName, sockTimeout = 30):
27 """ Args:
28 sockName: Name of the UNIX socket to connect to (this is a string and the socket should be open)
29 sockTimeout: Timeout in seconds on socket operations, prevents us from blocking forever... """
30 self.sockName = sockName
31 self.sockTimeout = sockTimeout
32
33 def sendObj (self, object, retries = 10):
34 # This is where we actually pickle and ship out 'object' across the socket:
35 # Returns: codes based on success/failure
36 pickledObj = cPickle.dumps (object, cPickle.HIGHEST_PROTOCOL)
37
38 # now send it across a newly opened socket:
39 for II in range (0, retries):
40 try:
41 sock = socket.socket (socket.AF_UNIX)
42 sock.settimeout (self.sockTimeout)
43 sock.connect (self.sockName)
44 sock.sendall (pickledObj)
45 sock.shutdown (2) # REALLY shutdown mr. connection here
46 sock.close ()
47 return ()
48 except Exception, err:
49 print "Error received: ", err
50 # if we have exceeded max retries OR if the error code is not 146 (meaning "Connection Refused") we pass this error up the line
51 if II == retries - 1 or (err [0] != 146):
52 raise
53 else:
54 time.sleep (float (II + 1) / 10) # a linearly increasing backoff delay for retries on the socket
55
56 class ObjectReceiver (threading.Thread):
57 """ ObjectReceiver is the 'server' counterpart to ObjectSender. It binds to a named UNIX socket and listens for incoming
58 connections from ObjectSender. Once a connection is completed it takes all the data send and attempts to depickle it to a Python
59 object. If this succeeds ObjectReceiver then places the object in an internal list that may be grabbed by the calling program. Once
60 the calling program has grabbed the list, it is cleared out and we wait for more objects to come in from other processes.
61 With the optional maxCachedObjects constructor arg, the internal object cache can be setup to only hold a certain number of objects
62 In the event of too many objects in the queue, additional objects sent across the wire will not be added to the queue. """
63 def __init__ (self, sockName, socketTimeout = 30, sockBuffSize = 4096, maxCachedObjects = None):
64 """ New ObjectReceivers are setup to bind to a sockName and wait for incoming
65 connections from other processes.
66 Args:
67 sockName: string name of the socket to bind to, must be available and clients will use the same name
68 socketTimeout: This is the timeout for all socket operations to prevent hanging
69 sockBuffSize: Size of the receive buffer in bytes of the listening socket
70 maxCachedObjects: The max number of objects we keep on hand between poll requests that grab the buffer, None means no limit to the list size """
71 threading.Thread.__init__ (self)
72 self.sockName = sockName
73 self.sockBuffSize = sockBuffSize
74 self.sock = socket.socket (socket.AF_UNIX)
75 self.socketTimeout = socketTimeout
76 self.maxCachedObjects = maxCachedObjects
77 self.objCache = []
78 self.pollLock = threading.Lock ()
79 self.objEvent = threading.Event () # we use this event to allow for blocking when objCache is empty
80 self.setDaemon (True) # this is a daemon thread that will close with the rest of the process
81
82 def run (self):
83 # Bind and the socket and start listening
84 self.sock.bind (self.sockName)
85 self.sock.listen (5)
86 while True:
87 conn, addr = self.sock.accept ()
88
89 # set conn's timeout to prevent hanging on bad behaviour
90 conn.settimeout (self.socketTimeout)
91
92 # now receive data & depickle it, we rely on the client to close out its
93 # connection to get out of the following loop. If that does not occur our conn socket
94 # will timeout as well
95 data = ""
96 try:
97 while True:
98 next_data = conn.recv (self.sockBuffSize)
99 if not next_data:
100 conn.shutdown (2)
101 conn.close ()
102 break
103 data += next_data
104 except Exception, err:
105 # most likely a timeout exception, ignore any data that came in
106 continue
107
108 # if the existing objCache is allowed to have another member, try adding one:
109 if (not self.maxCachedObjects) or (len (self.objCache) < self.maxCachedObjects):
110 self.pollLock.acquire ()
111 self.objCache.append (data)
112 self.objEvent.set () # we have at least 1 item in the queue anybody waiting for a new object may fire
113 self.pollLock.release ()
114
115 def poll (self):
116 """ This method is designed to return the contents of the objCache to the calling process. It is a (mostly) non-blocking poll, the
117 only block is on the lock that keeps the contents of objCache trhead-safe. The returned list consists of python objects that the
118 calling process may use as it sees fit. UPDATE: In the new version the actual depickling takes place in this method now """
119 self.pollLock.acquire ()
120 dataList = self.objCache
121 self.objCache = []
122 self.objEvent.clear () # no objects left in the queue, wait for more
123 self.pollLock.release ()
124 for II in range (0, len (dataList)):
125 dataList [II] = cPickle.loads (dataList [II])
126 return (dataList)
127
128
129 def wait (self, timeout = None):
130 self.objEvent.wait (timeout)
131 # fell through timeout or got woken up, attempt a poll
132 return (self.poll ())
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.You are not allowed to attach a file to this page.