Differences between revisions 17 and 18
Revision 17 as of 2009-06-11 04:34:05
Size: 8383
Editor: 124-169-176-221
Comment:
Revision 18 as of 2009-06-11 05:15:14
Size: 11538
Editor: 124-171-199-17
Comment: Added example of concurrency with circuits


99 Concurrent Bottles of Beer

99 bottles of beer on the wall, 99 bottles of beer.
Take one down, pass it around,
Take one down, pass it around,
97 bottles of beer on the wall, 97 bottles of beer
98 bottles of beer on the wall, 98 bottles of beer

The purpose of this page is to show solutions to common concurrent problems in different styles/toolkits. It is inspired by 99 Bottles of Beer. It is not intended to demonstrate high-performance code, but rather to give potential users a sense of what typical code using the various libraries looks like.

These examples are interesting in that they give an idea of each approach's clarity, how much boilerplate code is needed, how message passing looks, and how to yield to the operating system.

Include a brief description if you add to this page. Please make sure your source is well commented - concurrency is hard!

The Problem

Implement

#!/bin/sh
tail -f /var/log/system.log |grep pants

in concurrent Python. On Unix, you can send syslog messages via logger; filenames may vary.

Errata

Solutions using readline() will exhibit bugs if less than a full line is flushed to disk. If your input file is syslog, this shouldn't be a problem, however.
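
One way to sidestep the partial-line problem is to hold incomplete data back until its newline arrives. The following is only a minimal sketch, written for Python 3 (unlike the Python 2 examples on this page); the LineAssembler class and its feed method are hypothetical names made up for illustration, not part of any library used here.

```python
class LineAssembler:
    """Collect raw chunks and hand back only complete lines."""

    def __init__(self):
        self._pending = ""  # text received so far without a trailing newline

    def feed(self, chunk):
        """Add a chunk; return the list of lines that it completed."""
        data = self._pending + chunk
        lines = data.split("\n")
        # The last element is either "" (the chunk ended on a newline)
        # or a partial line that has to wait for more data.
        self._pending = lines.pop()
        return lines


assembler = LineAssembler()
first = assembler.feed("my pa")          # partial line: nothing complete yet
second = assembler.feed("nts\nnext li")  # completes one line, starts another
```

The circuits and Twisted solutions below take the same approach: they read raw blocks and split lines themselves instead of relying on readline().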

Glyph makes the very valid point that these examples are in fact serial programs (i.e., they don't do more than one thing at a time). A better example would be following multiple files simultaneously.
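
As a rough illustration of what following multiple files might look like, here is a sketch in Python 3 (not Python 2 like the rest of the page). The helper names open_at_end and poll are hypothetical, and two temporary files stand in for real log files. It is still a single-threaded polling pass, and because it uses readline() it inherits the partial-line caveat above.

```python
import os
import tempfile


def open_at_end(path):
    """Open a file and seek to its end, as tail -f would."""
    f = open(path, "r")
    f.seek(0, os.SEEK_END)
    return f


def poll(handles):
    """One pass over every file: yield (index, line) for each new line."""
    for index, f in enumerate(handles):
        # readline() returns "" at end of file, which stops the inner loop.
        for line in iter(f.readline, ""):
            yield index, line


# Demonstration: two temporary files stand in for log files.
paths = []
for _ in range(2):
    fd, path = tempfile.mkstemp()
    os.close(fd)
    paths.append(path)

handles = [open_at_end(p) for p in paths]

with open(paths[0], "a") as log:
    log.write("no match here\n")
with open(paths[1], "a") as log:
    log.write("fancy pants\n")

matches = [(i, line.rstrip("\n")) for i, line in poll(handles) if "pants" in line]

for f in handles:
    f.close()
for p in paths:
    os.remove(p)
```

A real follower would call poll() repeatedly, sleeping briefly whenever a pass produces no lines.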

Generator

Generators implement a "pull-style" approach to concurrency.

import time
import re

def follow(fname):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            yield l

def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l

def printer(lines):
    for l in lines:
        print l.strip()

f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)

for i in p:
    pass

Coroutines

The inversion of the generator example above, coroutines use a "push-style" approach to concurrency:

import time
import re
from functools import wraps


def coroutine(func):
    @wraps(func)
    def thing(*args, **kwargs):
        gen = func(*args, **kwargs)
        gen.next() # advance to the first yield
        return gen
    return thing

# follow() is the data source: it never yields, so it is a plain
# function rather than a coroutine and needs no decorator.
def follow(fname, next):
    f = file(fname)
    f.seek(0,2) # go to the end
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            next.send(l)

@coroutine
def grep(pattern, next):
    regex = re.compile(pattern)
    while True:
        l = yield
        if regex.match(l):
            next.send(l)

@coroutine
def printer():
    while True:
        l = yield
        print l.strip()


p = printer()
g = grep('.*pants.*', p)
f = follow('/var/log/system.log', g) # runs forever, pushing lines into g
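
For readers on Python 3, where generators no longer have a .next() method, the same push-style pattern might look like the sketch below. It is an illustration under stated assumptions rather than a port of the example above: it feeds lines from an in-memory list instead of a file, matches with a plain substring test instead of a regex, and the collector coroutine is a hypothetical stand-in for printer so the result can be inspected.

```python
from functools import wraps


def coroutine(func):
    """Prime a generator-based coroutine so it is ready for send()."""
    @wraps(func)
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # Python 3 spelling of gen.next()
        return gen
    return start


@coroutine
def grep(pattern, target):
    """Forward only the lines containing pattern to the next stage."""
    while True:
        line = yield
        if pattern in line:
            target.send(line)


@coroutine
def collector(results):
    """Hypothetical sink: append each stripped line to a list."""
    while True:
        line = yield
        results.append(line.strip())


results = []
pipeline = grep("pants", collector(results))
for line in ["hello\n", "my pants\n", "bye\n"]:
    pipeline.send(line)  # push each line into the pipeline
```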

Greenlets

Greenlets are similar to coroutines.

import greenlet
import time
import re

def follow(fname, next):
    # setup
    f = file(fname)
    f.seek(0,2) # go to the end
    # do stuff
    while True:
        l = f.readline()
        if not l: # no data
            time.sleep(.1)
        else:
            next.switch(l)

def grep(pattern, next):
    # setup
    regex = re.compile(pattern)

    def do_stuff(l):
        parent = greenlet.getcurrent().parent
        while True:
            if regex.match(l):
                l = next.switch(l)
            else:
                l = parent.switch() # subtle!

    return do_stuff

def printer(l):
    # no setup
    parent = greenlet.getcurrent().parent
    # do stuff
    while True:
        print l.strip()
        l = parent.switch()

p = greenlet.greenlet(printer)
g = greenlet.greenlet(grep(".*pants.*", p))
follow("/var/log/system.log", g)

Kamaelia

import time
import re

import Axon
from Kamaelia.Chassis.Pipeline import Pipeline

# threaded due to the time.sleep() call
# No yield since a threaded component
class Follow(Axon.ThreadedComponent.threadedcomponent):
    def __init__(self, fname, **argv):
        self.fname = fname
        super(Follow,self).__init__(**argv)
    def main(self):
        f = file(self.fname)
        f.seek(0,2) # go to the end
        while not self.dataReady("control"):
            l = f.readline()
            if not l: # no data
                time.sleep(.1)
            else:
                self.send(l, "outbox")

        self.send(self.recv("control"), "signal")

class Grep(Axon.Component.component):
    # Default pattern, override in constructor with pattern="some pattern"
    # See below
    pattern = "."
    def main(self):
        regex = re.compile(self.pattern)
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                if regex.match(l):
                    self.send(l, "outbox")
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

class Printer(Axon.Component.component):
    def main(self):
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                print l.strip()
            self.pause()
            yield 1
        self.send(self.recv("control"), "signal")

Pipeline(
    Follow('/var/log/system.log'),
    Grep(".*pants.*"),
    Printer(),
).run()

Twisted

from twisted.protocols.basic import LineReceiver
from twisted.python import log

SLOW_INTERVAL = 1.0
FAST_INTERVAL = 0.001
SEEK_END = 2
BLOCKSIZE = 8192

class TailTransport(object):
    def __init__(self, fileobj, protocol):
        self.fileobj = fileobj
        self.protocol = protocol
        self.disconnecting = False

    def start(self, clock):
        self.clock = clock
        self.fileobj.seek(0, SEEK_END)
        self.protocol.makeConnection(self)
        self.tick()

    def tick(self):
        anyData = self.fileobj.read(BLOCKSIZE)
        try:
            self.protocol.dataReceived(anyData)
        except:
            log.err()
        if anyData:
            interval = FAST_INTERVAL
        else:
            interval = SLOW_INTERVAL
        self.clock.callLater(interval, self.tick)

class Grep(LineReceiver):
    delimiter = '\n'
    def __init__(self, term):
        self.term = term

    def lineReceived(self, line):
        if self.term in line:
            print line.rstrip("\n")

def main():
    from twisted.internet import reactor
    TailTransport(file("/var/log/syslog", "rb"),
                  Grep("pants")).start(reactor)
    reactor.run()

main()

Fibra

import fibra
import re

def tail(f, output):
    f.seek(0,2)
    while True:
        line = f.readline()
        yield output.push(line) if line else 0.1 #push line, or sleep.

def grep(pattern, input, output):
    regex = re.compile(pattern)
    while True:
        line = yield input.pop()
        if regex.match(line):
            yield output.push(line)

def printer(input):
    while True:
        line = yield input.pop()
        print line.strip()

schedule = fibra.schedule()
schedule.install(tail(open("/var/log/syslog.log","r"), fibra.Tube("T2G")))
schedule.install(grep(".*pants.*", fibra.Tube("T2G"), fibra.Tube("G2P")))
schedule.install(printer(fibra.Tube("G2P")))
schedule.run()

Stackless

import stackless
import time
import re

@stackless.tasklet
def tail(f, output):
    f.seek(0,2)
    while True:
        line = f.readline()
        if line:
            output.send(line)
        else:
            time.sleep(0.1)

@stackless.tasklet
def grep(pattern, input, output):
    regex = re.compile(pattern)
    while True:
        line = input.receive()
        if regex.match(line):
            output.send(line)

@stackless.tasklet
def printer(input):
    while True:
        line = input.receive()
        print line.strip()

T2G = stackless.channel()
G2P = stackless.channel()
tail(open("/var/log/syslog.log","r"), T2G)
grep(".*pants.*", T2G, G2P)
printer(G2P)
stackless.run()

circuits

import re
from time import sleep

from circuits import Event, Component, Manager, Thread

SEEK_END = 2
BLOCKSIZE = 8192

LINESEP = re.compile("\r?\n")

def splitLines(s, buffer):
    lines = LINESEP.split(buffer + s)
    return lines[:-1], lines[-1]

class Follow(Thread):

    def __init__(self, fd):
        super(Follow, self).__init__()
        self.fd = fd
        self.fd.seek(0, SEEK_END)

    def stopped(self, manager):
        if not manager == self:
            self.stop()

    def run(self):
        while self.alive:
            data = self.fd.read(BLOCKSIZE)
            if data:
                self.push(Event(data), "read", self.channel)
                sleep(0.01)
            else:
                sleep(0.1)

class LineBuffer(Component):

    def __init__(self):
        super(LineBuffer, self).__init__()
        self._data = ""

    def read(self, data):
        lines, self._data = splitLines(data, self._data)
        for line in lines:
            self.push(Event(line), "line", self.channel)

class Grep(Component):

    def __init__(self, pattern):
        super(Grep, self).__init__()
        self._pattern = pattern

    def line(self, line):
        if self._pattern in line:
            print line

m = Manager()
f = Follow(file("/tmp/foo"))
f.register(m)
LineBuffer().register(m)
Grep("pants").register(m)

f.start()
m.run()

    * A threaded Component (Follow) continuously reads from the given file and, when data is available, propagates an event containing the newly read data to a "read" channel.
    * A LineBuffer Component listens for events on the "read" channel, takes the newly read data from the Follow Component, and splits it into lines, propagating one line at a time onto a "line" channel.
    * Finally, the Grep Component listens for events on the "line" channel and searches each line for the given pattern; if the pattern is found, it prints the line.

Asynchronous I/O

This second implementation uses the standard asynchronous File Component available in circuits.io.

import re

from circuits.io import File
from circuits import Event, Component

SEEK_END = 2

LINESEP = re.compile("\r?\n")

def splitLines(s, buffer):
    lines = LINESEP.split(buffer + s)
    return lines[:-1], lines[-1]

class Follow(Component):

    def __init__(self, filename):
        super(Follow, self).__init__()
        fd = File(filename, "r")
        fd.seek(0, SEEK_END)
        fd.register(self)

class LineBuffer(Component):

    def __init__(self):
        super(LineBuffer, self).__init__()
        self._data = ""

    def read(self, data):
        lines, self._data = splitLines(data, self._data)
        for line in lines:
            self.push(Event(line), "line", self.channel)

class Grep(Component):

    def __init__(self, pattern):
        super(Grep, self).__init__()
        self._pattern = pattern

    def line(self, line):
        if self._pattern in line:
            print line

(Follow("/tmp/foo") + LineBuffer() + Grep("pants")).run()

Concurrency/99Bottles (last edited 2014-10-14 21:57:17 by ppaez)
