Skip to content

Instantly share code, notes, and snippets.

@klen
Created April 20, 2017 11:17
Show Gist options
  • Save klen/2fa9aa605611322c88e4aeee24561fcc to your computer and use it in GitHub Desktop.
Save klen/2fa9aa605611322c88e4aeee24561fcc to your computer and use it in GitHub Desktop.
"""Provide information about opportunities."""
from collections import OrderedDict
import datetime as dt
import json
from flask import request
from flask_restler import route
from flask_restler.peewee import Filter, ModelFilters
from peewee import PostgresqlDatabase, SQL, Clause
from people_ai.api.v1.teams import TeamModelResource, api, filters as f, get_current_data_version, SQLTickMixin
from people_ai.services.salesforce.models import SalesForceOpportunity, SalesForceCompany, OpportunityCustomField
from people_ai.services.salesforce.models.ticks import OpportunityTick
from people_ai.services.site.models import TeamMembership
from people_ai.api.v1.teams.activity import ActivityLogMixin
from people_ai.api.v1.teams.opportunity import filters as f2, schemas
from people_ai.ext import db
from people_ai.utils.csv_utils import csv_response
class UnionModelFilters(ModelFilters):
"""Magic for empty opportunities."""
def filter(self, collection, resource, *args, **kwargs):
"""Filter two collections."""
if 'hide_empty_opps' in request.args:
return None, super(UnionModelFilters, self).filter(collection, resource, *args, **kwargs)
request.filters = {}
data = request.args.get('where')
if not data or self.filters is None:
return collection
try:
data = json.loads(data)
except (ValueError, TypeError):
return collection
filters = [fl for fl in self.filters if fl.fname in data]
base_filters = [fl for fl in filters if fl.name not in resource.meta.ticks_filters]
# Clean opportunities query
base_collection = collection.select(SalesForceOpportunity).group_by()
base_collection._where = None
base_collection._joins = {}
for fl in base_filters:
base_collection = fl.filter(base_collection, data, resource=resource, **kwargs)
for fl in filters:
collection = fl.filter(collection, data, resource=resource, **kwargs)
return (base_collection.filter(SalesForceOpportunity.id.not_in(
collection.select(SalesForceOpportunity.id).order_by().group_by(SalesForceOpportunity.id)
)).order_by(SalesForceOpportunity.name), collection)
@api.connect('/team/<team>/opportunity')
class OpportunityResource(TeamModelResource, ActivityLogMixin, SQLTickMixin):
"""Load opportunities."""
class Meta:
"""Tune the resource."""
model = SalesForceOpportunity
name = 'opportunity'
filters_converter = UnionModelFilters
filters = (
# Opportunity Model filters
'amount', 'probability', 'is_closed', 'is_won', 'time_spent', f.QueryFilter('name'),
f.DateTimeFilter('created_at'), f.DateTimeFilter('closed_at'), f.DateTimeFilter('last_touch'),
Filter('stage_name', 'stage'), Filter('company', 'account_id'), f.QueryFilter('type', 'opportunity_type'),
f2.OwnerFilter('owner'),
# Company Model filters
f2.AccountNameFilter('account'),
# Opportunity Tick Model filters
f2.UTCFilter('utc'), f2.MembershipFilter('membership'), f2.ActivityTypeFilter('type'),
# Revenue filters
f2.OpportunityRevenueMapping('MR'), f2.OpportunityRevenueMapping('AR'), f2.OpportunityRevenueMapping('NR'),
)
ticks_filters = 'utc', 'membership', 'type'
def filter(self, *args, **kwargs):
"""Save a base collection."""
self.base_collection, collection = super(OpportunityResource, self).filter(*args, **kwargs) # noqa
return collection
def paginate(self, offset=0, limit=None):
"""Pagination magic for two collections."""
collection, count = super(OpportunityResource, self).paginate(offset, limit)
if 'hide_empty_opps' in request.args:
return collection, count
page_size = collection.count()
offset = max(0, offset - count)
count += self.base_collection.count()
self.base_collection = self.base_collection.offset(offset + page_size).limit(limit - page_size) # noqa
return collection, count
def get(self, resource=None, **kwargs):
"""Merge two collections."""
if resource or 'hide_empty_opps' in request.args:
return super(OpportunityResource, self).get(resource, **kwargs)
return self.to_simple(list(self.collection) + list(self.base_collection), many=True, **kwargs)
def sort(self, collection, *sorting, **kwargs):
"""Put null values on last places."""
collection = super(OpportunityResource, self).sort(collection, *sorting, **kwargs)
if isinstance(db.database.obj, PostgresqlDatabase):
collection._order_by = [Clause(collection._order_by[0], SQL('NULLS LAST'))]
return collection
def get_many(self, team=None, opportunity=None, **kwargs):
"""Load opportunities for current team."""
members = self.auth.teammates().select(TeamMembership.id)
qs = SalesForceOpportunity.select(SalesForceOpportunity, SalesForceCompany).\
where(SalesForceOpportunity.team_id == team).\
join(SalesForceCompany, 'LEFT OUTER').switch(SalesForceOpportunity)
if 'aggregate' not in request.args:
return qs
versions = get_current_data_version(members)
versions = versions['$in']
return qs.select(
SalesForceOpportunity, SalesForceCompany, *schemas.OpportunitySchema.aggregate(OpportunityTick)).\
join(OpportunityTick, 'LEFT OUTER').\
where(OpportunityTick.version << versions).\
where(OpportunityTick.utc < dt.datetime.utcnow()).\
group_by(SalesForceOpportunity.id, SalesForceCompany.id)
def get_schema(self, resource=None, opportunity=None, **kwargs):
"""Provide aggregated information."""
if opportunity:
return schemas.ResourceAggregateOpportunitySchema(instance=resource)
return schemas.OpportunitySchema(instance=resource)
@route('/custom')
def custom(self, *args, **kwargs):
"""Return loaded custom fields."""
collection = OpportunityCustomField.select(OpportunityCustomField.name).join(SalesForceOpportunity). \
where(SalesForceOpportunity.team_id == self.auth.team_id).distinct()
return self.to_json_response([field.name for field in collection])
@route
def csv(self, *args, **kwargs):
"""Render CSV for opportunities."""
collection = self.filter(self.collection)
file_name = request.args.get('name', 'export.csv')
data = self.to_simple(collection, many=True, **kwargs)
if not data:
return csv_response([], file_name=file_name)
csv_data = self.get_csv_data(
data=data,
csv_map=OrderedDict([('name', 'Name'), ('id', 'ID'), ('last_touch', 'Last Touch')]),
csv_footer=[['Total Opportunities: %s' % len(data)], ['Opportunities']]
)
return csv_response(csv_data, file_name)
@route
def activity_csv(self, *args, **kwargs):
"""Render CSV for activities."""
collection = self.filter(self.collection)
opportunities = collection.select(SalesForceOpportunity.id).group_by(SalesForceOpportunity.id)
where = request.filters
where.update({'opportunity_id': {'$in': [x.id for x in opportunities]}})
query_string = {
'per_page': 0,
'where': where,
}
file_name = request.args.get('name', 'export.csv')
csv_data = self.get_related_activities_csv(
query_string=query_string,
name='Opportunity Activities'
)
return csv_response(csv_data, file_name=file_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment