##Part 1
#Iteration
class countdown_iter(object):
    def __init__(self, start):
        self.count = start
    def __iter__(self):
        return self
    def next(self):
        if self.count <= 0:
            raise StopIteration
        r = self.count
        self.count -= 1
        return r
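#A quick sketch of the iterator in use: the for statement calls __iter__()
#once, then next() repeatedly until StopIteration is raised
for i in countdown_iter(3):
    print i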
#Calling a generator function creates a generator object.
#However, it does not start running the function;
#the function only executes on next()
def countdown_gen(n):
    while n > 0:
        yield n
        n -= 1
c = countdown_gen(5)
for i in c:
    print i
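#Driving the same generator by hand makes the lazy execution visible:
c = countdown_gen(3)
print c.next() #3 -- the body runs only now, up to the first yield
print c.next() #2
print c.next() #1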
##Part 2
#Doesn't create large temporary lists
#Works like a pipeline. The key: think big...
wwwlog = open("access-log")
bytecolumn = (line.rsplit(None, 1)[1] for line in wwwlog)
bytes = (int(x) for x in bytecolumn if x != '-')
print "Total", sum(bytes)
import os
import fnmatch
import gzip, bz2
import re
def gen_find(filepat, top):
    for path, dirlist, filelist in os.walk(top):
        for name in fnmatch.filter(filelist, filepat):
            yield os.path.join(path, name)
def gen_open(filenames):
    for name in filenames:
        if name.endswith(".gz"):
            yield gzip.open(name)
        elif name.endswith(".bz2"):
            yield bz2.BZ2File(name)
        else:
            yield open(name)
def gen_cat(sources):
    for s in sources:
        for item in s:
            yield item
def gen_grep(pat, lines):
    patc = re.compile(pat)
    for line in lines:
        if patc.search(line):
            yield line
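#gen_grep in isolation, on a tiny in-memory source (illustrative input):
for line in gen_grep(r'ERROR', ["ok\n", "ERROR: bad\n"]):
    print line,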
def field_map(dictseq, name, func):
    for d in dictseq:
        d[name] = func(d[name])
        yield d
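#field_map in isolation, on a made-up record stream (illustrative data):
recs = [{'bytes': '512'}, {'bytes': '-'}]
for r in field_map(recs, 'bytes', lambda s: int(s) if s != '-' else 0):
    print r #prints {'bytes': 512}, then {'bytes': 0}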
def lines_from_dir(filepat, dirname):
    names = gen_find(filepat, dirname)
    files = gen_open(names)
    lines = gen_cat(files)
    return lines
#Pattern for Apache's Common Log Format; apache_log below relies on logpat
logpats = r'(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (\S+) (\S+)" (\S+) (\S+)'
logpat = re.compile(logpats)
def apache_log(lines):
    groups = (logpat.match(line) for line in lines)
    tuples = (g.groups() for g in groups if g)
    colnames = ('host', 'referrer', 'user', 'datetime', 'method',
                'request', 'proto', 'status', 'bytes')
    log = (dict(zip(colnames, t)) for t in tuples)
    log = field_map(log, "bytes", lambda s: int(s) if s != '-' else 0)
    log = field_map(log, "status", int)
    return log
lines = lines_from_dir("access-log*", "www")
#The beauty of generators is that you can plug filters in at almost any stage
lines = (line for line in lines if 'robots.txt' in line)
log = apache_log(lines)
for r in log:
    print r
#Some other queries (note: a generator pipeline is single-use, so rebuild
#log for each fresh pass rather than reusing the one exhausted above)
stat404 = set(r['request'] for r in log if r['status'] == 404)
large = (r for r in log if r['bytes'] > 1000000)
hosts = set(r['host'] for r in log) #collect all unique host IP addresses
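#For example, rebuilding the pipeline over the same files for a fresh pass:
log = apache_log(lines_from_dir("access-log*", "www"))
hosts = set(r['host'] for r in log)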
##Part 3
##Processing Infinite Data
#tail -f
import time
def follow(thefile):
    thefile.seek(0, 2) #Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1) #Sleep briefly
            continue
        yield line
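#Example (runs forever; assumes "access-log" is being appended to by a server):
logfile = open("access-log")
for line in follow(logfile):
    print line,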
##Part 4
##Feeding the Pipeline
import socket
def receive_connections(addr):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(addr)
    s.listen(5)
    while True:
        client = s.accept()
        yield client
#Example:
for c, a in receive_connections(("", 9000)):
    c.send("Hello World\n")
    c.close()
def receive_messages(addr, maxsize):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind(addr)
    while True:
        msg = s.recvfrom(maxsize)
        yield msg
#Example:
for msg, addr in receive_messages(("", 10000), 1024):
    print msg, "from", addr
#I/O Multiplexing
import select
import threading
def gen_events(socks):
    while True:
        rdr, wrt, err = select.select(socks, socks, socks, 0.1)
        for r in rdr:
            yield "read", r
        for w in wrt:
            yield "write", w
        for e in err:
            yield "error", e
clientset = []
def acceptor(sockset, addr):
    for c, a in receive_connections(addr):
        sockset.append(c)
acc_thr = threading.Thread(target=acceptor, args=(clientset, ("", 12000)))
acc_thr.setDaemon(True)
acc_thr.start()
for evt, s in gen_events(clientset):
    if evt == "read":
        data = s.recv(1024)
        if not data:
            print "Closing", s
            s.close()
            clientset.remove(s)
        else:
            print s, data
#Consuming a Queue
def consume_queue(thequeue):
    while True:
        item = thequeue.get()
        if item is StopIteration: break
        yield item
import Queue, threading
def consumer(q):
    for item in consume_queue(q):
        print "Consumed", item
    print "Done"
in_q = Queue.Queue()
con_thr = threading.Thread(target=consumer, args=(in_q,))
con_thr.start()
for i in xrange(100):
    in_q.put(i)
in_q.put(StopIteration)
##Part 5
##Extending the Pipeline
#Multiple consumers
#Broadcasting
def broadcast(source, consumers):
    for item in source:
        for c in consumers:
            c.send(item)
#Sadly, inside a consumer it is not possible to continue the same
#processing-pipeline idea; you can do that with threads or
#distributed processes, however
#Consumer Thread
import Queue, threading
class ConsumerThread(threading.Thread):
    def __init__(self, target):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.in_queue = Queue.Queue()
        self.target = target
    def send(self, item):
        self.in_queue.put(item)
    def generate(self):
        while True:
            item = self.in_queue.get()
            yield item
    def run(self):
        self.target(self.generate())
def find_404(log):
    for r in (r for r in log if r['status'] == 404):
        print r['status'], r['datetime'], r['request']
def bytes_transferred(log):
    total = 0
    for r in log:
        total += r['bytes']
        print "Total bytes", total
c1 = ConsumerThread(find_404)
c1.start()
c2 = ConsumerThread(bytes_transferred)
c2.start()
lines = follow(open("access-log"))
log = apache_log(lines)
broadcast(log, [c1, c2])
#Multiple Sources
def concatenate(sources):
    #same idea as gen_cat above
    for s in sources:
        for item in s:
            yield item
#or use parallel iteration
import itertools
z = itertools.izip(s1, s2, s3) #terminates when the shortest source is exhausted
#Multiplexing generators
def gen_multiplex(genlist):
    item_q = Queue.Queue()
    def run_one(source):
        for item in source:
            item_q.put(item)
    def run_all():
        thrlist = []
        for source in genlist:
            t = threading.Thread(target=run_one, args=(source,))
            t.start()
            thrlist.append(t)
        for t in thrlist: t.join()
        item_q.put(StopIteration)
    threading.Thread(target=run_all).start()
    while True:
        item = item_q.get()
        if item is StopIteration: return
        yield item
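#Example: merge two live log streams into one (hypothetical file paths):
log1 = follow(open("foo/access-log"))
log2 = follow(open("bar/access-log"))
for line in gen_multiplex([log1, log2]):
    print line,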
##Part 6
##Co-routines
#To get a co-routine to run properly, you have to prime it with a .next() call first
#That .next() step can be handled via decoration
def consumer(func):
    def start(*args, **kwargs):
        c = func(*args, **kwargs)
        c.next()
        return c
    return start
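#A minimal coroutine to show the decorator at work (hypothetical example):
@consumer
def printer():
    while True:
        item = (yield)
        print "Got", item
p = printer() #already primed by @consumer
p.send("hello") #no .next() needed before the first send()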
@consumer
def find_404():
    while True:
        r = (yield)
        if r['status'] == 404:
            print r['status'], r['datetime'], r['request']
@consumer
def bytes_transferred():
    total = 0
    while True:
        r = (yield)
        total += r['bytes']
        print "Total bytes", total
lines = follow(open('access-log'))
log = apache_log(lines)
broadcast(log, [find_404(), bytes_transferred()])
#Discussion: no threads were used here; co-routines give co-operative
#multitasking, i.e. concurrent programming without threads