Size: 4958
Comment:
|
Size: 6486
Comment: Add syntax highlighting to Twisted example.
|
Deletions are marked like this. | Additions are marked like this. |
Line 1: | Line 1: |
= 99 Concurrency Bottles of Beer = | = 99 Concurrent Bottles of Beer = |
Line 5: | Line 5: |
Take one down, pass it around, | |
Line 6: | Line 7: |
Take one down, pass it around, | 98 bottles of beer on the wall, 98 bottles of beer |
Line 11: | Line 12: |
Include your name and a brief description if you add to this page. Please make sure your source is well commented - concurrency is hard! == The problem == |
Include a brief description if you add to this page. Please make sure your source is well commented - concurrency is hard! == The Problem == |
Line 22: | Line 23: |
== Errata == Solutions using readline() will exhibit bugs if less than a full line is flushed to disk. If your input file is syslog, this shouldn't be a problem however. Glyph makes the very valid point that these examples are in fact serial programs (ie, they don't do more than one thing at a time). A better example would be following multiple files simultaneously. |
|
Line 99: | Line 105: |
Line 118: | Line 124: |
Line 126: | Line 131: |
Line 130: | Line 135: |
Line 133: | Line 138: |
Line 145: | Line 149: |
Line 151: | Line 154: |
# parents == main greenlet - HERE! g_printer = greenlet.greenlet(printer) g_grep = greenlet.greenlet(grep(".*pants.*", g_printer)) follow("/var/log/system.log", g_grep) }}} == Kamelia == |
p = greenlet.greenlet(printer) g = greenlet.greenlet(grep(".*pants.*", p)) follow("/var/log/system.log", g) }}} == Kamaelia == |
Line 187: | Line 188: |
pattern = "." | pattern = "." |
Line 208: | Line 209: |
Follow('tail -f /var/log/system.log'), | Follow('/var/log/system.log'), |
Line 213: | Line 214: |
== Twisted == {{{#!python from twisted.protocols.basic import LineReceiver from twisted.python import log SLOW_INTERVAL = 1.0 FAST_INTERVAL = 0.001 SEEK_END = 2 BLOCKSIZE = 8192 class TailTransport(object): def __init__(self, fileobj, protocol): self.fileobj = fileobj self.protocol = protocol self.disconnecting = False def start(self, clock): self.clock = clock self.fileobj.seek(0, SEEK_END) self.protocol.makeConnection(self) self.tick() def tick(self): anyData = self.fileobj.read(BLOCKSIZE) try: self.protocol.dataReceived(anyData) except: log.err() if anyData: interval = FAST_INTERVAL else: interval = SLOW_INTERVAL self.clock.callLater(interval, self.tick) class Grep(LineReceiver): delimiter = '\n' def __init__(self, term): self.term = term def lineReceived(self, line): if self.term in line: print line.rstrip("\n") def main(): from twisted.internet import reactor TailTransport(file("/var/log/syslog", "rb"), Grep("pants")).start(reactor) reactor.run() main() }}} |
99 Concurrent Bottles of Beer
99 bottles of beer on the wall, 99 bottles of beer. Take one down, pass it around, Take one down, pass it around, 97 bottles of beer on the wall, 97 bottles of beer 98 bottles of beer on the wall, 98 bottles of beer
Solutions to common concurrent problems in different styles/toolkits. Inspired by 99 Bottles of Beer.
Include a brief description if you add to this page. Please make sure your source is well commented - concurrency is hard!
The Problem
Implement
#!/bin/sh tail -f /var/log/system.log |grep pants
in concurrent Python. On unix, you can send syslog messages via logger; filenames may vary.
Errata
Solutions using readline() will exhibit bugs if less than a full line is flushed to disk. However, if your input file is syslog, this shouldn't be a problem.
Glyph makes the very valid point that these examples are in fact serial programs (i.e., they don't do more than one thing at a time). A better example would be following multiple files simultaneously.
Generator
Generators implement a "pull-style" approach to concurrency.
1 import time
2 import re
3
def follow(fname):
    """Yield each complete line appended to ``fname``, like ``tail -f``.

    The file is opened, positioned at its end, and polled every 0.1
    seconds for new data.  The generator never finishes on its own;
    closing it closes the underlying file.

    Args:
        fname: path of the file to follow.

    Yields:
        Each newly appended line, including its trailing newline.
    """
    with open(fname) as f:
        f.seek(0, 2)  # go to the end
        pending = ""
        while True:
            chunk = f.readline()
            if not chunk:  # no data
                time.sleep(.1)
                continue
            # readline() returns a fragment when the writer has flushed
            # only part of a line; hold it until the newline arrives.
            pending += chunk
            if pending.endswith("\n"):
                yield pending
                pending = ""
13
def grep(lines, pattern):
    """Yield the items of ``lines`` that match ``pattern``.

    Args:
        lines: any iterable of strings.
        pattern: regular expression source; it is compiled once and
            applied with ``match()``, i.e. anchored at the start of
            each line.

    Yields:
        Every line for which the match succeeds, unchanged.
    """
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l
19
def printer(lines):
    """Print every item of ``lines`` with surrounding whitespace stripped.

    This is a plain function, not a generator: it consumes ``lines``
    completely before returning ``None``.
    """
    for l in lines:
        # Single-argument print() behaves the same on Python 2 and 3.
        print(l.strip())
23
if __name__ == "__main__":
    f = follow('/var/log/system.log')
    g = grep(f, ".*pants.*")
    # printer() is an ordinary function that returns None, so this call
    # itself pulls every line through the pipeline; there is nothing
    # left to iterate over afterwards.
    printer(g)
Coroutines
The inversion of the generator example above, coroutines use a "push-style" approach to concurrency:
1 import time
2 import re
3 from functools import wraps
4
5
def coroutine(func):
    """Decorate a generator function so that calling it returns a primed
    generator.

    The wrapper creates the generator and advances it once, so it is
    already suspended at its first ``yield`` and can accept ``send()``
    immediately.

    Args:
        func: a generator function.

    Returns:
        A wrapper with ``func``'s metadata that returns the primed
        generator.
    """
    @wraps(func)
    def thing(*args, **kwargs):
        gen = func(*args, **kwargs)
        # The next() builtin works on Python 2.6+ and 3; the .next()
        # method exists on Python 2 only.
        next(gen)  # advance to the first yield
        return gen
    return thing
13
def follow(fname, next):
    """Push every complete line appended to ``fname`` into ``next``.

    The file is opened, positioned at its end, and polled every 0.1
    seconds.  This function only returns by way of an exception; the
    file is closed when that happens.

    It is deliberately not decorated with ``@coroutine``: the body has
    no ``yield``, so it is a plain function that drives the pipeline
    rather than a generator to be primed.

    Args:
        fname: path of the file to follow.
        next: an already-primed coroutine (any object with ``send``)
            that receives each line, trailing newline included.
    """
    with open(fname) as f:
        f.seek(0, 2)  # go to the end
        pending = ""
        while True:
            chunk = f.readline()
            if not chunk:  # no data
                time.sleep(.1)
                continue
            # readline() returns a fragment when the writer has flushed
            # only part of a line; hold it until the newline arrives.
            pending += chunk
            if pending.endswith("\n"):
                next.send(pending)
                pending = ""
24
@coroutine
def grep(pattern, next):
    """Coroutine that forwards matching lines to ``next``.

    Args:
        pattern: regular expression source, compiled once and applied
            with ``match()`` (anchored at the start of the line).
        next: an already-primed coroutine that receives, via ``send``,
            every line that matches.
    """
    regex = re.compile(pattern)
    while True:
        l = yield  # wait for the upstream stage to send a line
        if regex.match(l):
            next.send(l)
32
@coroutine
def printer():
    """Coroutine that prints each line sent to it, whitespace-stripped."""
    while True:
        l = yield  # wait for the upstream stage to send a line
        # Single-argument print() behaves the same on Python 2 and 3.
        print(l.strip())
38
39
if __name__ == "__main__":
    # Build the pipeline back to front: each stage needs its target.
    p = printer()
    g = grep('.*pants.*', p)
    # follow() pushes lines into the pipeline and does not return, so
    # its result is not bound to a name.
    follow('/var/log/system.log', g)
Greenlets
Greenlets are similar to coroutines.
1 import greenlet
2 import time
3 import re
4
def follow(fname, next):
    """Switch into ``next`` with every complete line appended to ``fname``.

    The file is opened, positioned at its end, and polled every 0.1
    seconds.  This function only returns by way of an exception; the
    file is closed when that happens.

    Args:
        fname: path of the file to follow.
        next: an object with a ``switch`` method that receives each
            line, trailing newline included.
    """
    # setup
    with open(fname) as f:
        f.seek(0, 2)  # go to the end
        pending = ""
        # do stuff
        while True:
            chunk = f.readline()
            if not chunk:  # no data
                time.sleep(.1)
                continue
            # readline() returns a fragment when the writer has flushed
            # only part of a line; hold it until the newline arrives.
            pending += chunk
            if pending.endswith("\n"):
                next.switch(pending)
                pending = ""
16
def grep(pattern, next):
    """Build the run function for a greenlet that filters lines.

    Args:
        pattern: regular expression source, compiled once here and
            applied with ``match()`` (anchored at the start of the line).
        next: the object switched into with every matching line.

    Returns:
        A one-argument function suitable for ``greenlet.greenlet(...)``.
        Its argument is the first line switched in.
    """
    # setup
    regex = re.compile(pattern)

    def do_stuff(l):
        parent = greenlet.getcurrent().parent
        while True:
            if regex.match(l):
                # Hand the line downstream; whatever is switched back
                # into this greenlet becomes the next line to test.
                l = next.switch(l)
            else:
                # No match: give control straight back to the parent and
                # wait there for the next line.
                l = parent.switch()  # subtle!

    return do_stuff
30
def printer(l):
    """Run function for a greenlet that prints each line it is given.

    Args:
        l: the first line switched in; later lines arrive as the return
            value of ``parent.switch()``.
    """
    # no setup
    parent = greenlet.getcurrent().parent
    # do stuff
    while True:
        # Single-argument print() behaves the same on Python 2 and 3.
        print(l.strip())
        l = parent.switch()
38
if __name__ == "__main__":
    # Both greenlets are created here, so this (main) greenlet is their
    # parent; follow() runs in it and switches into the grep greenlet.
    p = greenlet.greenlet(printer)
    g = greenlet.greenlet(grep(".*pants.*", p))
    follow("/var/log/system.log", g)
Kamaelia
1 import time
2 import re
3
4 import Axon
5 from Kamaelia.Chassis.Pipeline import Pipeline
6
# threaded due to the time.sleep() call
# No yield since a threaded component
class Follow(Axon.ThreadedComponent.threadedcomponent):
    """Component that sends each complete line appended to a file to "outbox".

    It polls the file every 0.1 seconds until a message is waiting on
    "control", then forwards that message to "signal".
    """

    def __init__(self, fname, **argv):
        """Remember the path to follow; other keywords go to the base class."""
        self.fname = fname
        super(Follow, self).__init__(**argv)

    def main(self):
        """Follow ``self.fname`` from its current end until told to stop."""
        with open(self.fname) as f:
            f.seek(0, 2)  # go to the end
            pending = ""
            while not self.dataReady("control"):
                chunk = f.readline()
                if not chunk:  # no data
                    time.sleep(.1)
                    continue
                # readline() returns a fragment when the writer has
                # flushed only part of a line; hold it until the newline
                # arrives.
                pending += chunk
                if pending.endswith("\n"):
                    self.send(pending, "outbox")
                    pending = ""

        # Pass the message that stopped us on to the next component.
        self.send(self.recv("control"), "signal")
24
class Grep(Axon.Component.component):
    """Component that forwards matching "inbox" lines to "outbox".

    It runs until a message is waiting on "control", then forwards that
    message to "signal".
    """

    # Default pattern, override in constructor with pattern="some pattern"
    # See below
    pattern = "."

    def main(self):
        """Filter inbox lines with ``self.pattern`` (applied via ``match()``)."""
        regex = re.compile(self.pattern)
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                if regex.match(l):
                    self.send(l, "outbox")
            # NOTE(review): pause() presumably idles this component until
            # new messages arrive - confirm against the Axon docs.
            self.pause()
            yield 1
        # Pass the message that stopped us on to the next component.
        self.send(self.recv("control"), "signal")
38
class Printer(Axon.Component.component):
    """Component that prints every "inbox" line, whitespace-stripped.

    It runs until a message is waiting on "control", then forwards that
    message to "signal".
    """

    def main(self):
        """Print inbox lines until a control message arrives."""
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                # Single-argument print() behaves the same on Python 2 and 3.
                print(l.strip())
            # NOTE(review): pause() presumably idles this component until
            # new messages arrive - confirm against the Axon docs.
            self.pause()
            yield 1
        # Pass the message that stopped us on to the next component.
        self.send(self.recv("control"), "signal")
47
if __name__ == "__main__":
    Pipeline(
        Follow('/var/log/system.log'),
        # Grep documents its pattern as a keyword override
        # (pattern="..."); passed positionally it would presumably be
        # ignored and the default "." used - TODO confirm against Axon.
        Grep(pattern=".*pants.*"),
        Printer(),
    ).run()
Twisted
1 from twisted.protocols.basic import LineReceiver
2 from twisted.python import log
3
# Seconds to wait before the next poll when the last read returned nothing.
SLOW_INTERVAL = 1.0
# Seconds to wait before the next poll when the last read returned data.
FAST_INTERVAL = 0.001
# "whence" value for seek(): the offset is relative to the end of the file.
SEEK_END = 2
# Maximum amount of data requested from the file per poll.
BLOCKSIZE = 8192
8
class TailTransport(object):
    """Feed data appended to a file into a protocol by polling.

    The object passes itself to the protocol's ``makeConnection``, so it
    stands in for a transport.
    """

    def __init__(self, fileobj, protocol):
        """Store the open file to tail and the protocol that receives data."""
        self.fileobj = fileobj
        self.protocol = protocol
        # NOTE(review): presumably read by the protocol (LineReceiver
        # consults transport.disconnecting) - confirm.
        self.disconnecting = False

    def start(self, clock):
        """Seek to the end of the file, connect the protocol, begin polling.

        Args:
            clock: object providing ``callLater(seconds, callable)``,
                e.g. the reactor.
        """
        self.clock = clock
        self.fileobj.seek(0, SEEK_END)
        self.protocol.makeConnection(self)
        self.tick()

    def tick(self):
        """Read one block, deliver it, and schedule the next poll."""
        anyData = self.fileobj.read(BLOCKSIZE)
        if anyData:
            try:
                self.protocol.dataReceived(anyData)
            except Exception:
                # Log protocol errors and keep tailing; interpreter-exit
                # exceptions are left to propagate.
                log.err()
            # More data may be waiting, so poll again almost at once.
            interval = FAST_INTERVAL
        else:
            # Nothing new: skip the protocol call and back off.
            interval = SLOW_INTERVAL
        self.clock.callLater(interval, self.tick)
32
class Grep(LineReceiver):
    """Line protocol that prints every received line containing ``term``."""

    # Lines in the tailed file end with a bare newline.
    delimiter = '\n'

    def __init__(self, term):
        """Remember the substring to look for."""
        self.term = term

    def lineReceived(self, line):
        """Print ``line`` when it contains the search term."""
        if self.term in line:
            # Single-argument print() behaves the same on Python 2 and 3.
            print(line.rstrip("\n"))
41
def main():
    """Tail /var/log/syslog and print lines containing "pants" until the
    reactor stops."""
    # Imported here so the reactor is only installed when main() runs.
    from twisted.internet import reactor
    # open() is the documented way to open a file; the file() builtin
    # does not exist on Python 3.
    TailTransport(open("/var/log/syslog", "rb"),
                  Grep("pants")).start(reactor)
    reactor.run()
47
# Only start the reactor when run as a script, not when imported.
if __name__ == "__main__":
    main()