Skip to content

Instantly share code, notes, and snippets.

@jeffreyolchovy
Created May 26, 2011 18:08
Show Gist options
  • Save jeffreyolchovy/993676 to your computer and use it in GitHub Desktop.
Save jeffreyolchovy/993676 to your computer and use it in GitHub Desktop.
Complex Example of PubSub using the Flow Platform for Feedgen
import sys, json
import flow
class Context(object):
    """Set up and exercise the 'fgc' example application through a flow client.

    Constructing a Context saves, in order: the application, its top-level
    buckets, one bucket per entry of CATEGORIES (nested by lineage), and one
    flow.Track per category leading from the shared ``leads`` bucket to that
    category's bucket, filtered on the category name.
    """

    # Category name -> record holding the name of its parent category
    # (None for a root). Nested bucket paths are derived from this table.
    CATEGORIES = {
        'a': {'parent': None},
        'b': {'parent': 'a'},
        'c': {'parent': 'a'},
        'd': {'parent': 'b'},
        'e': {'parent': 'd'},
        'f': {'parent': 'd'},
        'g': {'parent': 'd'},
        'h': {'parent': None},
        'i': {'parent': None},
        'j': {'parent': 'i'},
    }

    def __init__(self, client):
        """Keep *client* and create the application, its buckets and tracks.

        client: object providing ``set_logger_file``, ``save`` and
            ``http_get`` (the script in this file passes a flow.RestClient).
        """
        client.set_logger_file('fgc2.out')
        self.client = client
        self.create_application()
        self.create_application_flows()
        self.create_application_tracks()

    def create_application(self):
        """Save the 'fgc' application and keep the saved object on self."""
        # NOTE(review): the email value looks like a redacted placeholder
        # rather than a real address -- confirm before relying on it.
        application = flow.Application(
            name='fgc', description='...', email='[email protected]',
            url='http://fgc.com')
        self.application = self.client.save(application)

    def create_application_flows(self):
        """Save the top-level buckets, then one bucket per category."""
        categories = flow.Bucket(
            name='categories', description='category registry',
            path=self._app_path('categories'))
        category = flow.Bucket(
            name='category', description='root of category hierarchy',
            path=self._app_path('category'))
        vendors = flow.Bucket(
            name='vendor', description='root of vendor flows',
            path=self._app_path('vendor'))
        leads = flow.Bucket(
            name='leads', description='point of entry for all incoming leads',
            path=self._app_path('leads'),
            template=[{'name': 'category', 'class': 'string', 'required': True}])
        for bucket in (categories, category, vendors, leads):
            self.client.save(bucket)
        # Sorted for a deterministic creation order; in this table every
        # parent name sorts before the names of its children.
        for name in sorted(Context.CATEGORIES):
            self.create_category(name)

    def create_application_tracks(self):
        """Save one track per category from the leads bucket to its bucket.

        Each track carries the filter string ``category == "<name>"``.
        """
        source = self._app_path('leads')
        for name in Context.CATEGORIES:
            filter_string = 'category == "%s"' % name
            # 'from' is a Python keyword, so the arguments go through a dict.
            self.client.save(flow.Track(**{
                'to': self._category_path(name),
                'from': source,
                'filterString': filter_string}))

    def create_category(self, name):
        """Save the bucket for category *name* at its lineage path.

        Returns whatever ``client.save`` returns. Raises KeyError when
        *name* is not a key of CATEGORIES.
        """
        path = self._category_path(name)
        return self.client.save(
            flow.Bucket(name=name, description='...', path=path))

    def create_vendor(self, name, categories=None):
        """Save a vendor bucket plus one track per subscribed category.

        name: vendor name, used as the last path segment.
        categories: iterable of CATEGORIES keys; a track is saved from each
            category bucket to the vendor bucket. None means no categories.

        Returns the saved vendor bucket.
        """
        path = self._app_path('vendor/%s' % name)
        bucket = self.client.save(
            flow.Bucket(name=name, description='...', path=path))
        for category in (categories or ()):
            self.client.save(flow.Track(**{
                'to': bucket.path,
                'from': self._category_path(category)}))
        return bucket

    def create_lead(self, category):
        """Save a drop in the leads bucket whose 'category' is *category*."""
        drop = flow.Drop(
            path=self._app_path('leads'),
            elems={'category': {'type': 'string', 'value': category}})
        return self.client.save(drop)

    def vendor_has_lead(self, vendor, lead):
        """Return True when *lead*'s id appears among *vendor*'s drops.

        Fetches ``/drop/<vendor.id>`` through the client, parses the response
        as JSON and compares ``lead.id`` with the 'id' of every entry in the
        response 'body'. Every entry is examined before concluding False, and
        an empty body yields False.
        """
        response = json.loads(self.client.http_get('/drop/%s' % vendor.id))
        return any(drop['id'] == lead.id for drop in response['body'])

    def _app_path(self, suffix):
        """Return ``/apps/<application name>/<suffix>``."""
        return '/apps/%s/%s' % (self.application.name, suffix)

    def _category_path(self, name):
        """Return the bucket path of category *name*, nested by its lineage."""
        lineage = self._calc_lineage(Context.CATEGORIES[name], [name])
        return self._app_path('category/%s' % '/'.join(lineage))

    def _calc_lineage(self, record, lineage=None):
        """Return the ancestors of *record*, root first, followed by *lineage*.

        record: a CATEGORIES value (a dict with a 'parent' key).
        lineage: optional names to append after the ancestors, typically the
            category's own name. It is copied, never modified.

        Returns a new list on every call. Raises KeyError when a parent name
        is missing from CATEGORIES and ValueError when the parent links form
        a cycle.
        """
        tail = [] if lineage is None else list(lineage)
        ancestors = []  # collected nearest-parent first, reversed below
        visited = set()
        parent = record['parent']
        while parent is not None:
            if parent in visited:
                raise ValueError(
                    'cycle in category hierarchy at %r' % (parent,))
            visited.add(parent)
            ancestors.append(parent)
            parent = Context.CATEGORIES[parent]['parent']
        ancestors.reverse()
        return ancestors + tail
if __name__ == '__main__':
    # Usage: <script> KEY SECRET
    # Builds the application, subscribes two vendors to categories, publishes
    # two leads and verifies which vendor ends up with which lead.
    if len(sys.argv) < 3:
        # Fail with a usage message instead of a bare IndexError.
        sys.exit('usage: %s KEY SECRET' % sys.argv[0])
    KEY, SECRET = sys.argv[1], sys.argv[2]
    ACTOR = '000000000000000000000001'

    client = flow.RestClient(KEY, SECRET, ACTOR)
    ctx = Context(client)

    vendor_a = ctx.create_vendor('vendor_a', ['a', 'f'])
    vendor_b = ctx.create_vendor('vendor_b', ['f'])
    lead_a = ctx.create_lead('a')
    lead_b = ctx.create_lead('f')

    # (vendor, lead, should the vendor hold the lead?, failure message),
    # checked in order; the first mismatch aborts the run.
    expectations = (
        (vendor_a, lead_a, True, 'Vendor A should have received Lead A'),
        (vendor_a, lead_b, True, 'Vendor A should have received Lead B'),
        (vendor_b, lead_b, True, 'Vendor B should have received Lead B'),
        (vendor_b, lead_a, False, 'Vendor B should NOT have received Lead A'),
    )
    for vendor, lead, expected, message in expectations:
        if bool(ctx.vendor_has_lead(vendor, lead)) != expected:
            raise Exception(message)
@jeffreyolchovy
Copy link
Author

Leads, as of now, can only be published to one category. This will be changed upon the next update of the Flow platform.

I will continue to update this Gist with: 1. the enhancement stated above; and 2. use cases for the "category registry" flow (to be used when end-users perform keyword searches against categories, vendors, etc.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment