Differences between revisions 1 and 10 (spanning 9 versions)
Revision 1 as of 2009-05-19 16:31:19
Size: 4958
Editor: PeterFein
Comment:
Revision 10 as of 2009-06-04 18:42:24
Size: 6486
Editor: PaulBoddie
Comment: Add syntax highlighting to Twisted example.
Deletions are marked like this. Additions are marked like this.
Line 1: Line 1:
= 99 Concurrency Bottles of Beer = = 99 Concurrent Bottles of Beer =
Line 5: Line 5:
Take one down, pass it around,
Line 6: Line 7:
Take one down, pass it around, 98 bottles of beer on the wall, 98 bottles of beer
Line 11: Line 12:
Include your name and a brief description if you add to this page. Please make sure your source is well commented - concurrency is hard!

== The problem ==
Include a brief description if you add to this page. Please make sure your source is well commented - concurrency is hard!

== The Problem ==
Line 22: Line 23:

== Errata ==
Solutions using readline() will exhibit bugs if less than a full line is flushed to disk. If your input file is syslog, however, this shouldn't be a problem.

Glyph makes the very valid point that these examples are in fact serial programs (i.e., they don't do more than one thing at a time). A better example would be following multiple files simultaneously.
Line 99: Line 105:
        
Line 118: Line 124:
Line 126: Line 131:
    
Line 130: Line 135:
    
Line 133: Line 138:
        
Line 145: Line 149:
Line 151: Line 154:
# parents == main greenlet - HERE!
g_printer
= greenlet.greenlet(printer) 
g_grep = greenlet.greenlet(grep(".*pants.*", g_printer))

follow("/var/log/system.log", g_grep)
}}}

== Kamelia ==
p = greenlet.greenlet(printer)
g = greenlet.greenlet(grep(".*pants.*", p))
follow("/var/log/system.log", g)
}}}

== Kamaelia ==
Line 187: Line 188:
    pattern = "."      pattern = "."
Line 208: Line 209:
    Follow('tail -f /var/log/system.log'),     Follow('/var/log/system.log'),
Line 213: Line 214:


== Twisted ==

{{{#!python
from twisted.protocols.basic import LineReceiver
from twisted.python import log

# Delay (seconds, as taken by callLater) before the next poll when the last
# read returned no data.
SLOW_INTERVAL = 1.0
# Delay before the next poll when the last read returned data.
FAST_INTERVAL = 0.001
# "whence" value passed to fileobj.seek(): offset is relative to end of file.
SEEK_END = 2
# Maximum number of bytes requested from the file on each poll.
BLOCKSIZE = 8192

class TailTransport(object):
  """Minimal transport that feeds data appended to a file into a protocol.

  The file is polled on a timer: each tick reads up to BLOCKSIZE bytes and
  hands whatever arrived to ``protocol.dataReceived``.
  """

  def __init__(self, fileobj, protocol):
      """Remember the file to follow and the protocol that consumes it.

      fileobj  -- open file object; only seek() and read() are called on it.
      protocol -- object providing makeConnection() and dataReceived().
      """
      self.fileobj = fileobj
      self.protocol = protocol
      # Never set to True by this class.
      # NOTE(review): presumably present because protocols such as
      # LineReceiver inspect transport.disconnecting -- confirm against the
      # Twisted version in use.
      self.disconnecting = False

  def start(self, clock):
      """Seek to the end of the file, connect the protocol, begin polling.

      clock -- object providing callLater(delay, callable), e.g. the reactor.
      """
      self.clock = clock
      # Only data appended after this point is delivered (like ``tail -f``).
      self.fileobj.seek(0, SEEK_END)
      self.protocol.makeConnection(self)
      self.tick()

  def tick(self):
      """Read one block, deliver it if non-empty, and schedule the next poll."""
      anyData = self.fileobj.read(BLOCKSIZE)
      if anyData:
          # Deliver only real data; an idle poll should not reach the
          # protocol as an empty dataReceived() call.
          try:
              self.protocol.dataReceived(anyData)
          except Exception:
              # Log protocol errors and keep polling. KeyboardInterrupt and
              # SystemExit are deliberately not caught so the process can
              # still be stopped.
              log.err()
          # More data may already be waiting, so poll again almost at once.
          interval = FAST_INTERVAL
      else:
          interval = SLOW_INTERVAL
      self.clock.callLater(interval, self.tick)

class Grep(LineReceiver):
  """Line-based protocol that prints every line containing a fixed term."""

  # Lines are split on a bare newline.
  delimiter = '\n'

  def __init__(self, term):
      """Store ``term``, the substring a line must contain to be printed."""
      self.term = term

  def lineReceived(self, line):
      """Print ``line`` without trailing newlines if it contains the term."""
      if self.term not in line:
          return
      print(line.rstrip("\n"))

def main():
  """Follow /var/log/syslog and print each new line containing "pants".

  Blocks inside reactor.run() until the reactor is stopped; the log file is
  closed on the way out.
  """
  from twisted.internet import reactor
  # open() is the documented way to obtain a file object; the file() builtin
  # is legacy.
  logfile = open("/var/log/syslog", "rb")
  try:
      TailTransport(logfile, Grep("pants")).start(reactor)
      reactor.run()
  finally:
      # Release the descriptor whether the reactor stopped normally or
      # start-up raised.
      logfile.close()

# Script entry point: start following the log as soon as the example is run.
main()
}}}

99 Concurrent Bottles of Beer

99 bottles of beer on the wall, 99 bottles of beer.
Take one down, pass it around,
Take one down, pass it around,
97 bottles of beer on the wall, 97 bottles of beer
98 bottles of beer on the wall, 98 bottles of beer

Solutions to common concurrent problems in different styles/toolkits. Inspired by 99 Bottles of Beer.

Include a brief description if you add to this page. Please make sure your source is well commented - concurrency is hard!

The Problem

Implement

#!/bin/sh
tail -f /var/log/system.log |grep pants

in concurrent Python. On Unix, you can send syslog messages via logger; filenames may vary.

Errata

Solutions using readline() will exhibit bugs if less than a full line is flushed to disk. If your input file is syslog, however, this shouldn't be a problem.

Glyph makes the very valid point that these examples are in fact serial programs (i.e., they don't do more than one thing at a time). A better example would be following multiple files simultaneously.

Generator

Generators implement a "pull-style" approach to concurrency.

Toggle line numbers
   1 import time
   2 import re
   3 
   4 def follow(fname):
   5     f = file(fname)
   6     f.seek(0,2) # go to the end
   7     while True:
   8         l = f.readline()
   9         if not l: # no data
  10             time.sleep(.1)
  11         else:
  12             yield l
  13 
  14 def grep(lines, pattern):
  15     regex = re.compile(pattern)
  16     for l in lines:
  17         if regex.match(l):
  18             yield l
  19 
  20 def printer(lines):
  21     for l in lines:
  22         print l.strip()
  23 
  24 f = follow('/var/log/system.log')
  25 g = grep(f, ".*pants.*")
  26 p = printer(g)
  27 
  28 for i in p:
  29     pass

Coroutines

The inversion of the generator example above, coroutines use a "push-style" approach to concurrency:

Toggle line numbers
   1 import time
   2 import re
   3 from functools import wraps
   4 
   5 
   6 def coroutine(func):
   7     @wraps(func)
   8     def thing(*args, **kwargs):
   9         gen = func(*args, **kwargs)
  10         gen.next() # advance to the first yield
  11         return gen
  12     return thing
  13 
  14 @coroutine
  15 def follow(fname, next):
  16     f = file(fname)
  17     f.seek(0,2) # go to the end
  18     while True:
  19         l = f.readline()
  20         if not l: # no data
  21             time.sleep(.1)
  22         else:
  23             next.send(l)
  24 
  25 @coroutine
  26 def grep(pattern, next):
  27     regex = re.compile(pattern)
  28     while True:
  29         l = yield
  30         if regex.match(l):
  31             next.send(l)
  32 
  33 @coroutine
  34 def printer():
  35     while True:
  36         l = yield
  37         print l.strip()
  38 
  39 
  40 p = printer()
  41 g = grep('.*pants.*', p)
  42 f = follow('/var/log/system.log', g)

Greenlets

Greenlets are similar to coroutines.

Toggle line numbers
   1 import greenlet
   2 import time
   3 import re
   4 
   5 def follow(fname, next):
   6     # setup
   7     f = file(fname)
   8     f.seek(0,2) # go to the end
   9     # do stuff
  10     while True:
  11         l = f.readline()
  12         if not l: # no data
  13             time.sleep(.1)
  14         else:
  15             next.switch(l)
  16 
  17 def grep(pattern, next):
  18     # setup
  19     regex = re.compile(pattern)
  20 
  21     def do_stuff(l):
  22         parent = greenlet.getcurrent().parent
  23         while True:
  24             if regex.match(l):
  25                 l = next.switch(l)
  26             else:
  27                 l = parent.switch() # subtle!
  28 
  29     return do_stuff
  30 
  31 def printer(l):
  32     # no setup
  33     parent = greenlet.getcurrent().parent
  34     # do stuff
  35     while True:
  36         print l.strip()
  37         l = parent.switch()
  38 
  39 p = greenlet.greenlet(printer)
  40 g = greenlet.greenlet(grep(".*pants.*", p))
  41 follow("/var/log/system.log", g)

Kamaelia

Toggle line numbers
   1 import time
   2 import re
   3 
   4 import Axon
   5 from Kamaelia.Chassis.Pipeline import Pipeline
   6 
   7 # threaded due to the time.sleep() call
   8 # No yield since a threaded component
   9 class Follow(Axon.ThreadedComponent.threadedcomponent):
  10     def __init__(self, fname, **argv):
  11         self.fname = fname
  12         super(Follow,self).__init__(**argv)
  13     def main(self):
  14         f = file(self.fname)
  15         f.seek(0,2) # go to the end
  16         while not self.dataReady("control"):
  17             l = f.readline()
  18             if not l: # no data
  19                 time.sleep(.1)
  20             else:
  21                 self.send(l, "outbox")
  22 
  23         self.send(self.recv("control"), "signal")
  24 
  25 class Grep(Axon.Component.component):
  26     # Default pattern, override in constructor with pattern="some pattern"
  27     # See below
  28     pattern = "."
  29     def main(self):
  30         regex = re.compile(self.pattern)
  31         while not self.dataReady("control"):
  32            for l in self.Inbox("inbox"):
  33                if regex.match(l):
  34                    self.send(l, "outbox")
  35            self.pause()
  36            yield 1
  37         self.send(self.recv("control"), "signal")
  38 
  39 class Printer(Axon.Component.component):
  40     def main(self):
  41         while not self.dataReady("control"):
  42             for l in self.Inbox("inbox"):
  43                 print l.strip()
  44             self.pause()
  45             yield 1
  46         self.send(self.recv("control"), "signal")
  47 
  48 Pipeline(
  49     Follow('/var/log/system.log'),
  50     Grep(".*pants.*"),
  51     Printer(),
  52 ).run()

Twisted

Toggle line numbers
   1 from twisted.protocols.basic import LineReceiver
   2 from twisted.python import log
   3 
   4 SLOW_INTERVAL = 1.0
   5 FAST_INTERVAL = 0.001
   6 SEEK_END = 2
   7 BLOCKSIZE = 8192
   8 
   9 class TailTransport(object):
  10   def __init__(self, fileobj, protocol):
  11       self.fileobj = fileobj
  12       self.protocol = protocol
  13       self.disconnecting = False
  14 
  15   def start(self, clock):
  16       self.clock = clock
  17       self.fileobj.seek(0, SEEK_END)
  18       self.protocol.makeConnection(self)
  19       self.tick()
  20 
  21   def tick(self):
  22       anyData = self.fileobj.read(BLOCKSIZE)
  23       try:
  24           self.protocol.dataReceived(anyData)
  25       except:
  26           log.err()
  27       if anyData:
  28           interval = FAST_INTERVAL
  29       else:
  30           interval = SLOW_INTERVAL
  31       self.clock.callLater(interval, self.tick)
  32 
  33 class Grep(LineReceiver):
  34   delimiter = '\n'
  35   def __init__(self, term):
  36       self.term = term
  37 
  38   def lineReceived(self, line):
  39       if self.term in line:
  40           print line.rstrip("\n")
  41 
  42 def main():
  43   from twisted.internet import reactor
  44   TailTransport(file("/var/log/syslog", "rb"),
  45                 Grep("pants")).start(reactor)
  46   reactor.run()
  47 
  48 main()

Concurrency/99Bottles (last edited 2014-10-14 21:57:17 by ppaez)

Unable to edit the page? See the FrontPage for instructions.