Created
May 26, 2011 18:08
-
-
Save jeffreyolchovy/993676 to your computer and use it in GitHub Desktop.
Complex Example of PubSub using the Flow Platform for Feedgen
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys, json | |
import flow | |
class Context(object):
    """Set up the 'fgc' application on the Flow platform and drive lead routing.

    On construction the context registers the application, creates its
    buckets (one per category, nested by lineage, plus the 'categories',
    'category', 'vendor' and 'leads' roots) and saves one filtered track
    per category from the 'leads' bucket to that category's bucket.

    NOTE(review): the semantics of ``flow.Bucket``, ``flow.Track`` and
    ``flow.Drop`` come from the external ``flow`` package and are not visible
    here; the descriptions below only state what this class passes to them.
    """

    # Category registry: each key is a category name; 'parent' names the
    # parent category, or is None for a root of the hierarchy.
    CATEGORIES = {
        'a': {'parent': None},
        'b': {'parent': 'a'},
        'c': {'parent': 'a'},
        'd': {'parent': 'b'},
        'e': {'parent': 'd'},
        'f': {'parent': 'd'},
        'g': {'parent': 'd'},
        'h': {'parent': None},
        'i': {'parent': None},
        'j': {'parent': 'i'},
    }

    def __init__(self, client):
        """Store *client* and create the application, its buckets and tracks.

        *client* must provide ``set_logger_file``, ``save`` and ``http_get``.
        """
        client.set_logger_file('fgc2.out')
        self.client = client
        self.create_application()
        self.create_application_flows()
        self.create_application_tracks()

    def _app_path(self, suffix):
        """Return ``/apps/<application name>/<suffix>``."""
        return '/apps/%s/%s' % (self.application.name, suffix)

    def _category_path(self, name):
        """Return the bucket path of category *name*, nested under its ancestors.

        Raises KeyError if *name* is not a key of ``CATEGORIES``.
        """
        lineage = self._calc_lineage(Context.CATEGORIES[name], [name])
        return self._app_path('category/%s' % '/'.join(lineage))

    def create_application(self):
        """Save the 'fgc' application and keep the saved object on ``self``."""
        # NOTE(review): the email value looks like a scraping/obfuscation
        # placeholder rather than a real address -- confirm the intended value.
        application = flow.Application(
            name='fgc', description='...', email='[email protected]',
            url='http://fgc.com')
        self.application = self.client.save(application)

    def create_application_flows(self):
        """Save the four root buckets, then one bucket per category."""
        categories = flow.Bucket(
            name='categories', description='category registry',
            path=self._app_path('categories'))
        category = flow.Bucket(
            name='category', description='root of category hierarchy',
            path=self._app_path('category'))
        vendors = flow.Bucket(
            name='vendor', description='root of vendor flows',
            path=self._app_path('vendor'))
        leads = flow.Bucket(
            name='leads', description='point of entry for all incoming leads',
            path=self._app_path('leads'),
            template=[{'name': 'category', 'class': 'string', 'required': True}])

        for bucket in (categories, category, vendors, leads):
            self.client.save(bucket)

        # Sorted order is kept from the original; with the current registry
        # every parent name sorts before its children.
        for name in sorted(Context.CATEGORIES):
            self.create_category(name)

    def create_application_tracks(self):
        """Save one track per category from 'leads' to that category's bucket.

        Each track carries the filter string ``category == "<name>"``.
        """
        source = self._app_path('leads')
        for name in Context.CATEGORIES:
            # 'from' is a Python keyword, so the arguments are passed as a dict.
            track = flow.Track(**{
                'to': self._category_path(name),
                'from': source,
                'filterString': 'category == "%s"' % name,
            })
            self.client.save(track)

    def create_category(self, name):
        """Save and return the bucket for category *name*."""
        return self.client.save(flow.Bucket(
            name=name, description='...', path=self._category_path(name)))

    def create_vendor(self, name, categories=()):
        """Save a vendor bucket and a track into it from each listed category.

        *categories* is an iterable of category names; each must be a key of
        ``CATEGORIES`` (KeyError otherwise). Returns the saved vendor bucket.
        """
        bucket = self.client.save(flow.Bucket(
            name=name, description='...',
            path=self._app_path('vendor/%s' % name)))
        for category in categories:
            self.client.save(flow.Track(**{
                'to': bucket.path,
                'from': self._category_path(category),
            }))
        return bucket

    def create_lead(self, category):
        """Save and return a drop in the 'leads' bucket tagged with *category*."""
        drop = flow.Drop(
            path=self._app_path('leads'),
            elems={'category': {'type': 'string', 'value': category}})
        return self.client.save(drop)

    def vendor_has_lead(self, vendor, lead):
        """Return True if a drop with ``lead.id`` is listed for *vendor*.

        Fetches ``/drop/<vendor.id>``, parses the response as JSON and scans
        every entry of its 'body' list; returns False when none matches
        (including when the list is empty).
        """
        response = json.loads(self.client.http_get('/drop/%s' % vendor.id))
        return any(drop['id'] == lead.id for drop in response['body'])

    def _calc_lineage(self, record, lineage=None):
        """Prepend the ancestors of *record* to *lineage*, root first.

        *record* is a ``CATEGORIES`` value. When *lineage* is given it is
        extended in place and returned (callers pass ``[name]`` to get the
        full path including the category itself); when omitted a fresh list
        is used, so repeated calls never share state.

        Raises ValueError if the parent links form a cycle.
        """
        if lineage is None:
            lineage = []
        visited = set()
        parent = record['parent']
        while parent is not None:
            if parent in visited:
                raise ValueError(
                    'cycle in category hierarchy at %r' % (parent,))
            visited.add(parent)
            lineage.insert(0, parent)
            parent = Context.CATEGORIES[parent]['parent']
        return lineage
if __name__ == '__main__':
    # Usage: <script> KEY SECRET
    # Fail with a readable message instead of an IndexError when the
    # credentials are missing from the command line.
    if len(sys.argv) < 3:
        sys.exit('usage: %s KEY SECRET' % sys.argv[0])

    KEY = sys.argv[1]
    SECRET = sys.argv[2]
    ACTOR = '000000000000000000000001'

    client = flow.RestClient(KEY, SECRET, ACTOR)
    ctx = Context(client)

    # vendor_a is linked to categories 'a' and 'f'; vendor_b only to 'f'.
    vendor_a = ctx.create_vendor('vendor_a', ['a', 'f'])
    vendor_b = ctx.create_vendor('vendor_b', ['f'])

    lead_a = ctx.create_lead('a')
    lead_b = ctx.create_lead('f')

    # Each vendor should see exactly the leads of the categories it is
    # linked to.
    if not ctx.vendor_has_lead(vendor_a, lead_a):
        raise Exception('Vendor A should have received Lead A')
    if not ctx.vendor_has_lead(vendor_a, lead_b):
        raise Exception('Vendor A should have received Lead B')
    if not ctx.vendor_has_lead(vendor_b, lead_b):
        raise Exception('Vendor B should have received Lead B')
    if ctx.vendor_has_lead(vendor_b, lead_a):
        raise Exception('Vendor B should NOT have received Lead A')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Currently, a lead can be published to only one category. This will change with the next update of the Flow platform.
I will continue to update this Gist with: 1. the enhancement stated above; and 2. use cases for the "category registry" flow (to be used when end-users perform keyword searches against categories, vendors, etc.)