}}}
{{{
99 bottles of beer on the wall, 99 bottles of beer.
Take one down, pass it around,
Take one down, pass it around,
97 bottles of beer on the wall, 97 bottles of beer
98 bottles of beer on the wall, 98 bottles of beer
}}}
The purpose of this page is to show solutions to common concurrent problems in different styles/toolkits. Inspired by [[http://99-bottles-of-beer.net/ | 99 Bottles of Beer]]. It is not intended to demonstrate high-performance code, but rather to give potential users a sense of what typical code using the various libraries looks like.
These example are interesting, in that they provide an idea of clarity, how much boiler plate code is needed, how message passing looks, and how to yield to the operating system.
Include a brief description if you add to this page. Please make sure your source is well commented - concurrency is hard!
<>
== The Problem ==
Implement
{{{#!sh
#!/bin/sh
tail -f /var/log/system.log |grep pants
}}}
in concurrent Python. On unix, you can send syslog messages via `logger`; filenames may vary.
== Errata ==
Solutions using readline() will exhibit bugs if less than a full line is flushed to disk. If your input file is syslog, this shouldn't be a problem however.
Glyph makes the very valid point that these examples are in fact serial programs (ie, they don't do more than one thing at a time). A better example would be following multiple files simultaneously.
== Solutions ==
=== Generator ===
Generators implement a "pull-style" approach to concurrency.
{{{#!python
import time
import re
def follow(fname):
f = file(fname)
f.seek(0,2) # go to the end
while True:
l = f.readline()
if not l: # no data
time.sleep(.1)
else:
yield l
def grep(lines, pattern):
regex = re.compile(pattern)
for l in lines:
if regex.match(l):
yield l
def printer(lines):
for l in lines:
print l.strip()
f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
pass
}}}
=== Coroutines ===
The inversion of the generator example above, coroutines use a "push-style" approach to concurrency:
{{{#!python
import time
import re
from functools import wraps
def coroutine(func):
@wraps(func)
def thing(*args, **kwargs):
gen = func(*args, **kwargs)
gen.next() # advance to the first yield
return gen
return thing
@coroutine
def follow(fname, next):
f = file(fname)
f.seek(0,2) # go to the end
while True:
l = f.readline()
if not l: # no data
time.sleep(.1)
else:
next.send(l)
@coroutine
def grep(pattern, next):
regex = re.compile(pattern)
while True:
l = yield
if regex.match(l):
next.send(l)
@coroutine
def printer():
while True:
l = yield
print l.strip()
p = printer()
g = grep('.*pants.*', p)
f = follow('/var/log/system.log', g)
}}}
=== Greenlets ===
Greenlets are similar to coroutines.
{{{#!python
import greenlet
import time
import re
def follow(fname, next):
# setup
f = file(fname)
f.seek(0,2) # go to the end
# do stuff
while True:
l = f.readline()
if not l: # no data
time.sleep(.1)
else:
next.switch(l)
def grep(pattern, next):
# setup
regex = re.compile(pattern)
def do_stuff(l):
parent = greenlet.getcurrent().parent
while True:
if regex.match(l):
l = next.switch(l)
else:
l = parent.switch() # subtle!
return do_stuff
def printer(l):
# no setup
parent = greenlet.getcurrent().parent
# do stuff
while True:
print l.strip()
l = parent.switch()
p = greenlet.greenlet(printer)
g = greenlet.greenlet(grep(".*pants.*", p))
follow("/var/log/system.log", g)
}}}
=== Gevent ===
[[http://www.gevent.org | Gevent]] builds user-level threads on top of greenlets.
{{{#!python
import re
import gevent
from gevent.queue import Queue
def follow(fname, dest):
# setup
f = file(fname)
f.seek(0,2) # go to the end
# do stuff
while True:
l = f.readline()
if not l: # no data
gevent.sleep(.1)
else:
dest.put(l)
def grep(pattern, source, dest):
# setup
regex = re.compile(pattern)
def do_stuff():
while True:
l = source.get()
if regex.match(l):
dest.put(l)
return do_stuff
def printer(source):
while True:
line = source.get()
print line.strip()
source_queue = Queue()
filtered_queue = Queue()
p = gevent.spawn(printer, filtered_queue)
g = gevent.spawn(grep(".*pants.*", source_queue, filtered_queue))
follow("/var/log/system.log", source_queue)
}}}
=== Kamaelia ===
{{{#!python
import time
import re
import Axon
from Kamaelia.Chassis.Pipeline import Pipeline
# threaded due to the time.sleep() call
# No yield since a threaded component
class Follow(Axon.ThreadedComponent.threadedcomponent):
def __init__(self, fname, **argv):
self.fname = fname
super(Follow,self).__init__(**argv)
def main(self):
f = file(self.fname)
f.seek(0,2) # go to the end
while not self.dataReady("control"):
l = f.readline()
if not l: # no data
time.sleep(.1)
else:
self.send(l, "outbox")
self.send(self.recv("control"), "signal")
class Grep(Axon.Component.component):
# Default pattern, override in constructor with pattern="some pattern"
# See below
pattern = "."
def main(self):
regex = re.compile(self.pattern)
while not self.dataReady("control"):
for l in self.Inbox("inbox"):
if regex.match(l):
self.send(l, "outbox")
self.pause()
yield 1
self.send(self.recv("control"), "signal")
class Printer(Axon.Component.component):
def main(self):
while not self.dataReady("control"):
for l in self.Inbox("inbox"):
print l.strip()
self.pause()
yield 1
self.send(self.recv("control"), "signal")
Pipeline(
Follow('/var/log/system.log'),
Grep(".*pants.*"),
Printer(),
).run()
}}}
=== Twisted ===
{{{#!python
from twisted.protocols.basic import LineReceiver
from twisted.python import log
SLOW_INTERVAL = 1.0
FAST_INTERVAL = 0.001
SEEK_END = 2
BLOCKSIZE = 8192
class TailTransport(object):
def __init__(self, fileobj, protocol):
self.fileobj = fileobj
self.protocol = protocol
self.disconnecting = False
def start(self, clock):
self.clock = clock
self.fileobj.seek(0, SEEK_END)
self.protocol.makeConnection(self)
self.tick()
def tick(self):
anyData = self.fileobj.read(BLOCKSIZE)
try:
self.protocol.dataReceived(anyData)
except:
log.err()
if anyData:
interval = FAST_INTERVAL
else:
interval = SLOW_INTERVAL
self.clock.callLater(interval, self.tick)
class Grep(LineReceiver):
delimiter = '\n'
def __init__(self, term):
self.term = term
def lineReceived(self, line):
if self.term in line:
print line.rstrip("\n")
def main():
from twisted.internet import reactor
TailTransport(file("/var/log/syslog", "rb"),
Grep("pants")).start(reactor)
reactor.run()
main()
}}}
=== Fibra ===
{{{#!python
import fibra
import re
def tail(f, output):
f.seek(0,2)
while True:
line = f.readline()
yield output.push(line) if line else 0.1 #push line, or sleep.
def grep(pattern, input, output):
regex = re.compile(pattern)
while True:
line = yield input.pop()
if regex.match(line):
yield output.push(line)
def printer(input):
while True:
line = yield input.pop()
print line.strip()
schedule = fibra.schedule()
schedule.install(tail(open("/var/log/syslog.log","r"), fibra.Tube("T2G")))
schedule.install(grep(".*pants.*", fibra.Tube("T2G"), fibra.Tube("G2P")))
schedule.install(printer(fibra.Tube("G2P")))
schedule.run()}}}
=== Stackless ===
{{{#!python
import stackless
import time
import re
@stackless.tasklet
def tail(f, output):
f.seek(0,2)
while True:
line = f.readline()
if line:
output.send(line)
else:
time.sleep(0.1)
@stackless.tasklet
def grep(pattern, input, output):
regex = re.compile(pattern)
while True:
line = input.receive()
if regex.match(line):
output.send(line)
@stackless.tasklet
def printer(input):
while True:
line = input.receive()
print line.strip()
T2G = stackless.channel()
G2P = stackless.channel()
tail(open("/var/log/syslog.log","r"), T2G)
grep(".*pants.*", T2G, G2P)
printer(G2P)
stackless.run()}}}
=== circuits ===
{{{#!python
import sys
from circuits.io import File
from circuits import Component
from circuits.net.protocols import LP
class Tail(Component):
def init(self, filename):
(File(filename, "r", autoclose=False) + LP()).register(self).seek(0, 2)
class Grep(Component):
def init(self, pattern):
self.pattern = pattern
def line(self, line):
if self.pattern in line:
print line
(Tail(sys.argv[1]) + Grep(sys.argv[2])).run()
}}}
=== pprocess ===
This example needs pprocess 0.5. The activity functions are similar to the generator (and other) solutions, and the differences lie in the use of the `multigrep` function, which is invoked to provide `grep` functionality for each pattern in a separate process, and in the way the `multigrep` function itself follows several files using the `multifollow` callable (the `follow` function invoked in a separate process). A channel is used in the `follow` function to communicate new lines which are then consumed via a queue in the `grep` function, which in turn communicates matching lines via a channel which are then consumed by the `printer` function.
{{{#!python
import pprocess
import time
import re
def follow(ch, fname):
f = file(fname)
f.seek(0,2) # go to the end
while True:
l = f.readline()
if not l: # no data
time.sleep(.1)
else:
ch.send(l)
def grep(ch, lines, pattern):
regex = re.compile(pattern)
for l in lines:
if regex.match(l):
ch.send(l)
def printer(lines):
for l in lines:
print l.strip()
def multigrep(ch, pattern):
queue = pprocess.Queue(continuous=1)
multifollow = queue.manage(follow)
# Launch concurrent following activities.
multifollow('/var/log/system.log')
multifollow('/var/log/other.log')
multifollow('/var/log/another.log')
# Handle incoming lines using the specified pattern.
grep(ch, queue, pattern)
# Permit multiple simultaneous grep activities.
queue = pprocess.Queue(continuous=1)
multigrep = queue.manage(multigrep)
# Launch concurrent grep activities.
multigrep(".*pants.*")
multigrep(".*trousers.*")
multigrep(".*shorts.*")
# Print incoming lines.
p = printer(queue)
}}}
=== pypes ===
Here is a simple example based on the pypes framework. It should look similar to the Stackless example above. Pypes abstracts away the semantics of tasklets and channels and provides a model for looser coupling. This makes connecting components at runtime easier which is necessary since at the point in which the component is created, it has no idea what other components it might be interacting with.
{{{#!python
# load the pypes framework
from pkg_resources import require
require('pypes')
import re
import time
from pypes.pipeline import Dataflow
from pypes.component import Component
class Tail(Component):
__metatype__ = 'ADAPTER'
def __init__(self, fp):
Component.__init__(self)
self.fp = fp
def run(self):
self.fp.seek(0,2)
while True:
self.receive('in')
line = self.fp.readline()
if line:
self.send('out', line.strip())
else:
self.yield_ctrl()
class Grep(Component):
__metatype__ = 'TRANSFORMER'
def __init__(self, pattern):
Component.__init__(self)
self.regex = re.compile(pattern)
def run(self):
while True:
for line in self.receive_all('in'):
if self.regex.match(line):
self.send('out', line)
self.yield_ctrl()
class Printer(Component):
__metatype__ = 'PUBLISHER'
def __init__(self):
Component.__init__(self)
def run(self):
while True:
for data in self.receive_all('in'):
print data
self.yield_ctrl()
tail = Tail(open('/var/log/system.log', 'r'))
grep = Grep('.*pants.*')
printer = Printer()
pipe = Dataflow({
tail: {grep:('out','in')},
grep: {printer:('out', 'in')}
})
while True:
pipe.send(None)
time.sleep(0.1)
}}}