Last active
August 29, 2015 14:17
-
-
Save eliquious/f3636f3b8048d496acc9 to your computer and use it in GitHub Desktop.
Tributary with IPC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tributary | |
from tributary.core import Engine, Message | |
from tributary.streams import StreamElement, StreamProducer | |
from tributary.events import StopMessage, STOP | |
import tributary.ext.fs as fs | |
import decimal | |
import time | |
import logging | |
import pyhash | |
import gipc | |
class FileLogger(StreamElement):
    """Stream element that logs the filename carried by each message."""

    def __init__(self, name):
        """Initialise the element under ``name`` via the base class."""
        super(FileLogger, self).__init__(name)

    def process(self, message):
        """Log the message's ``filename`` entry (``None`` when absent)."""
        payload = message.data
        self.log(payload.get('filename', None))
class IPCProducer(StreamProducer):
    """Producer that reads messages from an IPC handle and forwards them."""

    def __init__(self, name, cend):
        """Keep ``cend``, the handle this producer reads messages from."""
        super(IPCProducer, self).__init__(name)
        self.cend = cend

    def process(self, message=None):
        """Receive messages from IPC until a STOP message arrives.

        Each message whose channel is not ``STOP`` is scattered to the
        child nodes. The first ``STOP`` message stops this producer and
        ends the loop. The ``message`` argument is not read.
        """
        while self.running:
            incoming = self.cend.get()
            if incoming.channel != STOP:
                # send to child nodes
                self.scatter(incoming)
                continue
            self.stop()
            break

    def handleException(self, ex):
        """Log the failure under the 'IPC' label; ``ex`` itself is unused."""
        self.log_exception('IPC')
class ChildFileProcessor(StreamElement):
    """Reads the file named in each message and reports back over ``cend``.

    For every message carrying a ``filename`` exactly one status message
    is put on ``cend``: ``'ok'`` once the file has been read to the end,
    or ``'error'`` if it could not be opened or read.
    """

    def __init__(self, name, cend, id):
        """Store the IPC handle ``cend`` and this processor's ``id``."""
        super(ChildFileProcessor, self).__init__(name)
        self.cend = cend
        self.id = id

    def process(self, message):
        """Read every line of the message's file, then acknowledge it.

        Messages without a ``filename`` entry are ignored. I/O errors are
        reported on ``cend`` and then re-raised unchanged.
        """
        filename = message.data.get('filename', None)
        if filename is None:
            return

        try:
            with open(filename) as fh:
                self.log('Reading: %s' % filename)
                # The lines are only consumed; nothing is done with them.
                for _ in fh:
                    pass
        except (IOError, OSError) as err:
            # Without a reply the peer waiting on this handle would block
            # forever, so report the failure before propagating it.
            self.cend.put(
                status_message('error', 'Failed: %s (%s)' % (filename, err)))
            raise

        # ack
        self.cend.put(status_message('ok', 'Completed: %s' % filename))
def status_message(msgtype, msg):
    """Compose a status Message.

    ``msgtype`` is passed as the ``type`` keyword and ``msg`` as the
    ``message`` keyword of the Message constructor.
    """
    fields = {'type': msgtype, 'message': msg}
    return Message(**fields)
def file_processor(cend):
    """Main of child process.

    Reads one metadata message from ``cend`` and expects its data to hold
    ``logfile``, ``name`` and ``id`` entries. If any of them is missing or
    falsy, an ``'error'`` status naming the first missing field is put on
    ``cend`` and the function returns without starting anything.
    Otherwise it builds an Engine with an IPCProducer feeding a
    ChildFileProcessor and starts it.
    """
    meta = cend.get()
    logfile = meta.data.get('logfile', None)
    name = meta.data.get('name', None)
    ID = meta.data.get('id', None)

    # Report only the first missing field, in the same order as before,
    # and stop: starting the engine with missing metadata makes no sense.
    required = (('logfile', logfile), ('name', name), ('ID', ID))
    for label, value in required:
        if not value:
            cend.put(
                status_message('error', 'Missing "%s" information' % label))
            return

    # create internal process engine
    eng = Engine()

    # IPC msg handler
    ipc = IPCProducer('IPC #%s' % ID, cend)
    ipc.add(ChildFileProcessor(name, cend, ID))
    eng.add(ipc)
    eng.start()
# Module-level FNV-1a 64-bit hasher; FileDispatcher uses it to decide
# which dispatcher handles a given filename.
hasher = pyhash.fnv1a_64()


class FileDispatcher(StreamElement):
    """Forwards its share of filename messages to a dedicated child process.

    Each dispatcher starts one child running ``file_processor`` and talks
    to it over a duplex gipc pipe. A message is forwarded only when the
    hash of its filename, modulo ``partitions``, equals this dispatcher's
    ``num``.
    """

    def __init__(self, name, num, partitions=8):
        """Start the child process and send it its metadata message.

        name       -- element name
        num        -- zero-based slot this dispatcher is responsible for
        partitions -- modulus applied to the filename hash (default 8,
                      the value that used to be hard-coded)
        """
        super(FileDispatcher, self).__init__(name)
        self.name = name
        self.id = num
        self.partitions = partitions

        cend, pend = gipc.pipe(duplex=True)
        self.pend = pend
        self.child = gipc.start_process(file_processor, args=(cend,))

        # init child; the id is sent 1-based because file_processor
        # treats a falsy id as missing
        self.pend.put(Message(logfile='child.txt', name='Child Process #%s' % (num+1), id=num+1))

    def process(self, message):
        """Forward the message to the child if its filename hashes here.

        Messages without a ``filename`` entry are ignored.
        """
        filename = message.data.get('filename', None)
        if filename is None:
            return

        bucket = hasher(filename) % self.partitions
        if bucket == self.id:
            self.pend.put(message)
            # Block until the child replies; the reply itself is not used.
            self.pend.get()

    def postProcess(self, message=None):
        """Send StopMessage to the child and wait for it to exit."""
        self.log('Stopping child process')
        self.pend.put(StopMessage)
        self.log('Waiting for child process to stop')
        self.child.join()
        self.log('Child process stopped')
if __name__ == '__main__':
    # Build the pipeline: one glob-based file stream fanned out to eight
    # dispatchers occupying slots 0 through 7.
    engine = Engine()
    files = fs.GlobFileStream("Data Files", "./data/*txt")
    # files.add(FileLogger("Logger"))

    for slot in range(8):
        files.add(FileDispatcher("Dispatcher #%d" % (slot + 1), slot))

    engine.add(files)
    engine.start()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import random | |
import cStringIO | |
import string | |
# data domain
# NOTE(review): string.octdigits is '01234567', so '8' and '9' never occur
# in the generated data -- confirm this is intended rather than
# string.digits.
alphabet = string.octdigits + 'abcdef'


def generate_source_file(filename, count, size):
    """Generate ``count`` random strings and write them to ``filename``.

    Each string holds ``size`` distinct characters sampled without
    replacement from ``alphabet``, so ``random.sample`` raises ValueError
    when ``size`` exceeds ``len(alphabet)``. The strings are written one
    per line, with no trailing newline.

    Returns the list of generated strings (the "truth").
    """
    # ``range`` rather than ``xrange`` so this runs on Python 2 and 3.
    # The data is built before the file is opened, so a failure here does
    # not leave an empty file behind.
    possible = [''.join(random.sample(alphabet, size)) for _ in range(count)]

    # write to file
    with open(filename, 'w') as fh:
        fh.write('\n'.join(possible))

    # return truth
    return possible
def generate_sample_file(filename, sample):
    """Write the strings in ``sample`` to ``filename``, one per line.

    Entries are joined with newlines; no trailing newline is written.
    Any existing file is overwritten.
    """
    with open(filename, 'w') as out:
        text = '\n'.join(sample)
        out.write(text)
def main(directory, filecount, combinations, chars, sample):
    """Executes the data creation and processing.

    directory    -- folder that receives the sample files; created
                    (including missing parents) when it does not exist
    filecount    -- number of sample files to write, named ``0.txt``,
                    ``1.txt``, ... inside ``directory``
    combinations -- number of strings written to ``source.txt`` in the
                    current working directory
    chars        -- length of each generated string
    sample       -- number of source strings drawn, without replacement,
                    for each sample file; ``random.sample`` raises
                    ValueError when it exceeds ``combinations``
    """
    if not os.path.exists(directory):
        # makedirs rather than mkdir so nested paths work too
        os.makedirs(directory)

    # generate source file
    possible = generate_source_file('source.txt', combinations, chars)

    # generate sample files (``range`` keeps this runnable on Python 3)
    for i in range(filecount):
        target = os.path.join(directory, '%s.txt' % i)
        generate_sample_file(target, random.sample(possible, sample))
if __name__ == '__main__':
    # Default run of the generator with its fixed settings.
    settings = dict(
        directory='./data',
        filecount=500,
        combinations=1000000,
        chars=10,
        sample=100000,
    )
    main(**settings)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment