IPC tributary POC
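Proof of concept for fanning work out to child processes over IPC with tributary: the parent engine globs a set of data files, a NaivePool round-robins the filenames across eight FileDispatcher elements (each backed by its own child process), every child counts line occurrences in its file keyed by a 64-bit FNV-1a hash, and the HashAggregator service merges the counts from all children and writes them to output.txt sorted by frequency.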
from tributary.core import Engine, Message, ExecutionContext, Service
from tributary.streams import StreamElement, StreamProducer
from tributary.events import StopMessage, STOP
import tributary.ext.fs as fs
import tributary.ext.ipc as ipc
from collections import defaultdict
import operator
import logging
import pyhash
import gevent

hasher = pyhash.fnv1a_64()

class FileLogger(StreamElement):
    """Logs the filename carried by each incoming message."""

    def __init__(self, name):
        super(FileLogger, self).__init__(name)

    def process(self, message):
        self.log(message.data.get('filename', None))


def child_main(cend):
    """Main of child process.

    Waits for an initialization message from the parent, validates it,
    then starts an engine that hashes every file the parent dispatches.
    """
    meta = cend.get()
    logfile = meta.data.get('logfile', None)
    name = meta.data.get('name', None)
    ID = meta.data.get('id', None)

    # bail out if the initialization message is incomplete
    if not logfile:
        cend.put(status_message('error', 'Missing "logfile" information'))
        return
    elif not name:
        cend.put(status_message('error', 'Missing "name" information'))
        return
    elif not ID:
        cend.put(status_message('error', 'Missing "ID" information'))
        return

    # create internal process engine
    eng = Engine()

    # IPC msg handler feeding the file processor
    child = ipc.IPCSubscriber('IPC #%s' % ID, cend)
    child.add(ChildFileProcessor(name, cend, ID))
    eng.add(child)
    eng.start()

class ChildFileProcessor(StreamElement):
    """Counts line occurrences by hash for each file dispatched to this child."""

    def __init__(self, name, cend, id):
        super(ChildFileProcessor, self).__init__(name)
        self.cend = cend
        self.id = id

    def postProcess(self, msg):
        self.log('Exiting...')

    def process(self, message):
        filename = message.data.get('filename', None)
        if filename is not None:
            hashes = defaultdict(int)
            with open(filename) as fh:
                self.log('Reading: %s' % filename)
                for line in fh:
                    hashes[hasher(line)] += 1

            # ack with the per-file hash counts
            self.cend.put(status_message('ok', 'Completed: %s' % filename, hashes=hashes))


def status_message(msgtype, msg, **kwargs):
    """Compose a status message sent back over the IPC channel."""
    return Message(type=msgtype, message=msg, **kwargs)

class FileDispatcher(ipc.IPCDispatcher):
    """Forwards filenames to a child process and hands its hash counts to the aggregator."""

    def __init__(self, name, factory, num, workers):
        super(FileDispatcher, self).__init__(name, factory)
        self.id = num
        self.workers = workers

    def onConnection(self):
        self.log('Sending initialization data')
        self.pipe.put(Message(logfile='child.txt', name='Child Process #%s' % (self.id + 1), id=self.id + 1))
        self.getContext().getService('HashAggregator').open(self.name)

    def process(self, message):
        filename = message.data.get('filename', None)
        if filename is not None:
            self.pipe.put(message)

            # self.log('Waiting for response...')
            response = self.pipe.get()
            hashes = response.data.get('hashes', None)
            if hashes is not None:
                self.getContext().getService('HashAggregator').addAll(hashes)
            # self.log(response.data)

    def onClose(self):
        self.getContext().getService('HashAggregator').close(self.name)


class NaivePool(StreamElement):
    """Round-robins incoming messages across its child elements."""

    def __init__(self, name):
        super(NaivePool, self).__init__(name)
        self.offset = -1

    def process(self, msg):
        self.offset += 1
        for index, element in sorted(self._children.values()):
            if self.offset % len(self) == index:
                element.insert(msg)

class HashAggregator(Service):
    """Merges hash counts from all dispatchers and writes them to a file."""

    def __init__(self, name, outputfile):
        super(HashAggregator, self).__init__(name)
        self.hashagg = defaultdict(int)
        self.processes = 0
        self.outputfile = outputfile

    def open(self, name):
        self.processes += 1

    def close(self, name):
        self.processes -= 1

    def addAll(self, hashes):
        for k, v in hashes.iteritems():
            self.hashagg[k] += v

    def join(self):
        # wait for every dispatcher to close before writing the output
        while self.processes > 0:
            gevent.sleep(0.1)
        self.writeFile()

    def writeFile(self):
        self.log_info('Logging output')
        with open(self.outputfile, 'w') as fh:
            for k, v in self:
                fh.write('{0:<22} : {1}\n'.format(k, v))
        self.log_info('Completed output')

    def __iter__(self):
        # hashes sorted by count, most frequent first
        for k, v in sorted(self.hashagg.items(), key=lambda x: x[1], reverse=True):
            yield (k, v)

if __name__ == '__main__':
    ctx = ExecutionContext()
    agg = HashAggregator('HashAggregator', 'output.txt')
    ctx.addService(agg)

    # create engine
    engine = Engine(ctx)

    # glob the input data files and log each filename
    files = fs.GlobFileStream("Data Files", "../tributary-dist/parallel-gevent/data/*txt")
    files.add(FileLogger("Logger"))

    # pool of IPC dispatchers, one child process per worker
    factory = ipc.SimpleEngineFactory(child_main)
    pool = NaivePool('Pool')
    workers = 8
    for i in xrange(workers):
        pool.add(FileDispatcher("Dispatcher #{0}".format(i + 1), factory, i, workers))
    files.add(pool)

    engine.add(files)
    engine.start()