Created
February 1, 2012 19:25
-
-
Save swarbhanu/1718761 to your computer and use it in GitHub Desktop.
IngestionWorker class
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class IngestionWorker(TransformDataProcess): | |
''' A basic transform that receives input through a subscription, | |
parses the input for an integer and adds 1 to it. If the transform | |
has an output_stream it will publish the output on the output stream. | |
This transform appends transform work in '/tmp/transform_output' | |
''' | |
def __init__(self, *args, **kwargs): | |
super(IngestionWorker,self).__init__() | |
self.db = CouchDB_DataStore() | |
self.datastore_name = 'dm_datastore' | |
def on_start(self): | |
super(IngestionWorker,self).on_start() | |
#---------------------------------------------- | |
# Start up couch | |
#---------------------------------------------- | |
# Create dm_datastore if it does not exist already | |
try: | |
self.db.create_datastore(self.datastore_name) | |
except BadRequest: | |
print 'Already exists' | |
def process(self, packet): | |
"""Processes incoming data!!!! | |
""" | |
if isinstance(packet, BlogPost): | |
post = {'id' : packet.post_id, 'title' : packet.title, 'author' : packet.author,\ | |
'updated' : time.ctime(), 'content' : packet.content } | |
db_post_id, db_post_rev = self.db.create_doc(post, None, self.datastore_name) | |
log.debug('Post titled, %s, written to couchdb, post id: %s, rev: %s'\ | |
% (BlogPost_title, db_post_id, db_post_rev)) | |
if isinstance(packet, BlogComment): | |
comment = {'id' : packet.ref_id, 'author' : packet.author,\ | |
'updated' : time.ctime(), 'content' : packet.content } | |
db_comment_id, db_comment_rev = self.db.create_doc(comment, None, self.datastore_name) | |
log.debug('Post titled, %s, written to couchdb, post id: %s, rev: %s'\ | |
% (BlogPost_title, db_comment_id, db_comment_rev)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment