Skip to content

Instantly share code, notes, and snippets.

@swarbhanu
Created February 1, 2012 19:25
Show Gist options
  • Save swarbhanu/1718761 to your computer and use it in GitHub Desktop.
Save swarbhanu/1718761 to your computer and use it in GitHub Desktop.
IngestionWorker class
class IngestionWorker(TransformDataProcess):
    """Transform process that stores incoming blog packets in CouchDB.

    Each packet handed to ``process`` is inspected: ``BlogPost`` and
    ``BlogComment`` packets are converted to plain dictionaries and written
    as documents to the datastore named by ``self.datastore_name``. Packets
    of any other type are ignored.
    """

    def __init__(self, *args, **kwargs):
        """Create the worker and its CouchDB datastore handle.

        NOTE(review): ``args`` and ``kwargs`` are accepted but not forwarded
        to the base class -- confirm against TransformDataProcess whether
        they should be passed through.
        """
        super(IngestionWorker, self).__init__()
        self.db = CouchDB_DataStore()
        self.datastore_name = 'dm_datastore'

    def on_start(self):
        """Run the base start-up, then make sure the datastore exists."""
        super(IngestionWorker, self).on_start()

        # Create dm_datastore if it does not exist already.
        try:
            self.db.create_datastore(self.datastore_name)
        except BadRequest:
            # Presumably raised when the datastore is already present
            # (verify against CouchDB_DataStore); treated as non-fatal.
            # Parenthesised form prints the same text on Python 2 and 3.
            print('Already exists')

    def process(self, packet):
        """Write a blog post or blog comment packet to the datastore.

        :param packet: incoming message; only ``BlogPost`` and
            ``BlogComment`` instances are stored, anything else is skipped.
        """
        if isinstance(packet, BlogPost):
            post = {
                'id': packet.post_id,
                'title': packet.title,
                'author': packet.author,
                'updated': time.ctime(),
                'content': packet.content,
            }
            db_post_id, db_post_rev = self.db.create_doc(
                post, None, self.datastore_name)
            # The original referenced the undefined name ``BlogPost_title``
            # here, which raised NameError after the document was written.
            log.debug(
                'Post titled, %s, written to couchdb, post id: %s, rev: %s',
                packet.title, db_post_id, db_post_rev)

        if isinstance(packet, BlogComment):
            comment = {
                'id': packet.ref_id,
                'author': packet.author,
                'updated': time.ctime(),
                'content': packet.content,
            }
            db_comment_id, db_comment_rev = self.db.create_doc(
                comment, None, self.datastore_name)
            # Comments carry no title in the fields used here, so the
            # referenced id is logged instead of the undefined
            # ``BlogPost_title``.
            log.debug(
                'Comment with ref id, %s, written to couchdb, '
                'comment id: %s, rev: %s',
                packet.ref_id, db_comment_id, db_comment_rev)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment