Last active
September 28, 2016 13:53
-
-
Save neara/dc6c96ee94065571a873 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import string | |
import random | |
import hashlib | |
import logging | |
import urlparse | |
from _ssl import SSLError | |
from itertools import izip_longest | |
import facebook | |
import requests | |
from django.core import signing | |
from django.conf import settings | |
from django.http import HttpResponse | |
from django.core.handlers.wsgi import WSGIRequest | |
from django.core.exceptions import PermissionDenied | |
from django.core.signing import SignatureExpired, BadSignature | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Default number of characters in a generated slug.
SLUG_LENGTH = 8


def create_slug(length=SLUG_LENGTH):
    """
    Build a random slug made of lowercase ASCII letters and digits.

    Each character is drawn independently, so characters may repeat and
    ``length`` may be larger than the 36-character alphabet. The result is
    random, not guaranteed unique: callers that need uniqueness must check
    for collisions themselves. The ``random`` module is not suitable for
    security-sensitive tokens.

    :param length: number of characters in the slug
    :return: str of exactly ``length`` characters
    """
    # ascii_lowercase is locale-independent and exists on Python 2 and 3.
    alphabet = string.ascii_lowercase + string.digits
    return "".join(random.choice(alphabet) for _ in range(length))
def get_request_from_args(args):
    """
    Find the WSGI request among a sequence of positional arguments.

    :param args: iterable of arbitrary objects (typically a view's ``*args``)
    :return: the last item that is a ``WSGIRequest`` (or a subclass of it),
             or None when there is none
    """
    request = None
    for arg in args:
        # isinstance (rather than an exact type comparison) also accepts
        # WSGIRequest subclasses.
        if isinstance(arg, WSGIRequest):
            request = arg
    return request
def create_md5(seed):
    """
    Return the hexadecimal MD5 digest of ``seed``.

    :param seed: bytes-like object, or text which is encoded as UTF-8 first
    :return: 32-character lowercase hex string
    """
    # hashlib only accepts bytes-like input; encode text explicitly so that
    # non-ASCII characters do not raise.
    if not isinstance(seed, (bytes, bytearray, memoryview)):
        seed = seed.encode('utf-8')
    return hashlib.md5(seed).hexdigest()
def get_clean_url(url):
    """
    Normalise a URL by forcing the http scheme and dropping the query
    string and fragment.

    :param url: full url
    :return: url in http scheme without query/fragment
    """
    parts = urlparse.urlsplit(url)
    stripped = parts._replace(scheme='http', query='', fragment='')
    return urlparse.urlunsplit(stripped)
def get_or_init(ModelClass, **kwargs):
    """
    Fetch the instance matching ``kwargs``, or build a new unsaved one.

    :param ModelClass: class exposing ``objects.get`` and ``DoesNotExist``
    :param kwargs: lookup values; reused as constructor arguments when no
                   matching instance exists
    :return: the instance returned by ``objects.get``, or a new
             ``ModelClass(**kwargs)`` that has not been saved
    """
    try:
        return ModelClass.objects.get(**kwargs)
    except ModelClass.DoesNotExist:
        return ModelClass(**kwargs)
def create_or_update(ModelClass, params, **data):
    """
    Look up (or initialise) an instance by ``params``, apply ``data`` to it
    and save it.

    :param ModelClass: the model class to look up
    :param params: dict(field_name=value, ...) used for the lookup and, when
                   nothing matches, for constructing the new instance
    :param data: field_name=value pairs set on the instance before saving
    :return: the saved instance
    """
    instance = get_or_init(ModelClass, **params)
    for field_name in data:
        setattr(instance, field_name, data[field_name])
    instance.save()
    return instance
def get_app_access_token(timeout=10):
    """
    Request an app access token from the Facebook Graph API using the
    client-credentials grant.

    :param timeout: seconds to wait for the server before ``requests``
                    raises a timeout error; without it the call could block
                    indefinitely
    :return: the raw response body exactly as received
    """
    args = {'grant_type': 'client_credentials',
            'client_id': settings.FACEBOOK_APP_ID,
            'client_secret': settings.FACEBOOK_APP_SECRET}
    r = requests.get(url='https://graph.facebook.com/oauth/access_token',
                     params=args, timeout=timeout)
    # NOTE(review): the HTTP status is not checked, so an error response body
    # is returned as-is; callers must validate what they receive.
    return r.content
class VerifyRequest(object):
    """
    A decorator class for secure api endpoints. Handles request verification.

    Before calling the wrapped function it reads the ``data`` and ``salt``
    query parameters from the WSGIRequest found in the positional arguments
    and checks that ``data`` is a valid, unexpired signature whose payload is
    one of ``settings.CLIENT_IDS``.

    Responses:
      * missing ``salt`` or unknown client id -> raises PermissionDenied
      * missing ``data``                      -> 401
      * signature older than 60 seconds       -> 408
      * invalid signature                     -> 400
    """

    def __init__(self, f):
        self.func = f
        # Kept so existing attribute access keeps working. Per-request values
        # are deliberately NOT stored here: one decorator instance is shared
        # by every request, so instance state could leak between requests.
        self.data = None
        self.salt = None

    def __call__(self, *args, **kwargs):
        data = None
        salt = None
        for arg in args:
            if isinstance(arg, WSGIRequest):
                data = arg.GET.get('data', None)
                salt = arg.GET.get('salt', None)

        if not salt:
            raise PermissionDenied

        if not data:
            return HttpResponse('Unauthorized Access Denied!', status=401)

        try:
            client_id = signing.loads(data, key=settings.API_KEY,
                                      salt=salt, max_age=60)
            if client_id not in settings.CLIENT_IDS:
                raise PermissionDenied
            # NOTE(review): the wrapped call stays inside the try block, as in
            # the original, so signature errors raised by the view itself are
            # also mapped to 408/400.
            return self.func(*args, **kwargs)
        except SignatureExpired:
            # Must precede BadSignature: SignatureExpired is its subclass.
            return HttpResponse('Request Expired', status=408)
        except BadSignature:
            return HttpResponse('Bad Request', status=400)
def grouper(iterable, n, fillvalue=None):
    """
    Collect items into tuples of length ``n``.

    :param iterable: source of items
    :param n: size of each tuple
    :param fillvalue: value used to pad the last tuple when the input runs out
    :return: iterator of n-tuples
    """
    # The very same iterator object is repeated n times, so every output
    # tuple pulls n consecutive items from it.
    shared = iter(iterable)
    return izip_longest(*([shared] * n), fillvalue=fillvalue)
def list_splitter(mega_list, chunk_size):
    """
    Split ``mega_list`` into consecutive chunks and yield each chunk as a set.

    Because chunks are sets, duplicates within a chunk collapse and item
    order is not preserved; items must be hashable.

    :param mega_list: iterable of hashable items
    :param chunk_size: maximum number of items taken for each chunk
    :return: generator of sets
    """
    # A private sentinel pads the last chunk. Unlike a literal such as 'x',
    # it can never collide with (and silently drop) a genuine item.
    padding = object()
    for chunk in grouper(mega_list, chunk_size, padding):
        members = set(chunk)
        members.discard(padding)
        yield members
def in_chunks(iterable, size=100):
    """
    Yield consecutive slices of ``iterable`` holding at most ``size`` items.

    :param iterable: a sequence supporting ``len()`` and slicing
                     (list, tuple, str, ...)
    :param size: maximum length of each slice; must be at least 1
    :return: generator of slices, each of the same type slicing produces
    :raises ValueError: if ``size`` is smaller than 1 (raised when the
                        generator is first advanced)
    """
    if size < 1:
        raise ValueError(
            'size must be a positive integer, got {!r}'.format(size))
    total = len(iterable)
    start = 0
    # A plain loop keeps the function independent of xrange/range, so it
    # runs unchanged on Python 2 and Python 3.
    while start < total:
        yield iterable[start:start + size]
        start += size
def secure_json_response(data, salt='veryfunny!gimmea'):
    """
    Placeholder for a signed and encrypted JSON response.

    The implementation is currently disabled: the function ignores its
    arguments and returns None.

    :param data: str
    :param salt: str, unused while the implementation is disabled
    :return: None
    """
    # TODO: disabled draft kept for reference. It signs the payload and then
    # encrypts the signed value with AES in CFB mode:
    #
    # cipher = AES.new(settings.API_KEY, AES.MODE_CFB, salt)
    #
    # signed_resp = signing.dumps(data, key=settings.API_KEY, salt=salt)
    # signed_encrypted_resp = cipher.encrypt(signed_resp)
    #
    # return HttpResponse(signed_encrypted_resp, content_type='application/json')
    return None
def dict_fetch_all(sql, params=None):
    """
    Run a raw SQL query on the default connection and return every row as a
    dict keyed by column name.

    :param sql: SQL statement to execute; put placeholders in it and pass the
                values through ``params`` rather than formatting them into
                the string
    :param params: optional query parameters forwarded to ``cursor.execute``
    :return: list of dicts, one per row
    """
    from django.db import connection

    cursor = connection.cursor()
    try:
        if params is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, params)
        # Column names are the same for every row, so compute them once.
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        # Release the cursor even when the query or the fetch fails.
        cursor.close()
def stress_test_graphapi(cycle=2, timeout_limit=15):
    """
    Manual probe that times Facebook Graph API requests for a fixed page's
    posts and prints the results.

    Each cycle starts with a 5 second timeout. An SSLError is reported as a
    timeout and the request is retried with a timeout 2 seconds longer, until
    the request succeeds or the timeout exceeds ``timeout_limit``.

    :param cycle: number of measurement cycles to run
    :param timeout_limit: largest timeout (seconds) tried within one cycle
    :return: None after printing the timeout and duration logs, or the
             ``args`` of the first non-SSL exception raised by a request
    """
    page_id = '20669912712'
    path = page_id + '/posts'
    fields = ("id,likes.summary(true),comments.summary(true)"
              ",from,type,updated_time,full_picture,link,"
              "source,message,shares,description")
    params = {'fields': fields, 'limit': 100}
    token = settings.FACEBOOK_APP_ACCESS_TOKEN

    timeout_log = []  # timeout value each cycle ended with
    time_log = []     # duration of each successful request, in seconds

    remaining = cycle
    while remaining > 0:
        timeout = 5
        while timeout <= timeout_limit:
            try:
                graph = facebook.GraphAPI(token, timeout=timeout)
                started = time.time()
                graph.request(path, args=params)
                time_log.append(time.time() - started)
                break
            except SSLError:
                print('timeout {}'.format(timeout))
            except Exception as e:
                return e.args
            timeout += 2
        timeout_log.append(timeout)
        remaining -= 1

    print(timeout_log)
    print(time_log)
def _json_decoder(data):
    """
    Recursively decode the values of ``data`` in place.

    String values are first tried as datetimes, then as JSON documents.
    Dict values (including freshly decoded ones) are decoded recursively,
    and every item of a list value is decoded the same way.

    NOTE(review): ``parse`` is neither defined nor imported in the visible
    part of this file - presumably dateutil's parser; confirm. ``unicode``
    exists only on Python 2.

    :param data: dict to decode; it is modified in place
    :return: the same dict
    """
    for key, raw in data.items():
        if type(raw) in (str, unicode):
            try:
                # try to parse datetime
                data[key] = parse(raw)
            except ValueError:
                # this is not a datetime object
                pass
            else:
                continue
            # see if this is a json obj
            data[key] = deserialize_json(raw)

        current = data[key]
        if type(current) is dict:
            # go deeper and decode next nest level
            data[key] = _json_decoder(current)
        elif type(current) is list:
            # check if this is a list of json obj that needs to be decoded
            for idx, item in enumerate(current):
                current[idx] = deserialize_json(item)
                if type(current[idx]) is dict:
                    current[idx] = _json_decoder(current[idx])
    return data
def deserialize_json(obj):
    """
    Decode ``obj`` as JSON when possible.

    :param obj: candidate JSON document (normally a string)
    :return: the decoded value, or ``obj`` itself unchanged when it is not
             valid JSON or not a type ``json.loads`` accepts
    """
    # The module does not import json at the top, so bring it into scope here.
    import json

    try:
        return json.loads(obj)
    except (ValueError, TypeError):
        # this is not a JSON object
        return obj
def _deserialize(msg):
    """
    Decode a message: parse it as JSON and, when the result is a dict,
    recursively decode its values.

    :param msg: raw message, normally a JSON string
    :return: the decoded dict; or, when the message is not JSON or its top
             level is not a dict, whatever ``deserialize_json`` returned
    """
    decoded_msg = deserialize_json(msg)
    if not isinstance(decoded_msg, dict):
        # _json_decoder needs a dict; non-JSON input and non-dict JSON values
        # are handed back as-is instead of raising AttributeError.
        return decoded_msg
    return _json_decoder(decoded_msg)
def set_attributes(source, destination_obj):
    """
    Copies attributes from source model to destination object.

    Every entry of ``source.__dict__`` that the destination also has (under
    the same name, or under the name without a trailing ``_id``) is handled
    as follows:

      * ``<name>_id`` with a truthy value, where the destination's ``<name>``
        is a Model instance: copy recursively from ``source.<name>``;
      * a value that is itself an object with a ``__dict__``, where the
        destination attribute is a Model instance: copy recursively;
      * anything else: assign the value onto the destination.

    NOTE(review): ``source.__dict__`` also holds non-field entries
    (presumably Django's ``_state`` on model instances); these are copied
    too whenever the destination has an attribute of the same name - confirm
    this is intended.

    :param source: object whose instance attributes are copied
    :param destination_obj: object that receives the values (modified in place)
    """
    # Imported here, like the other database import in this module, because
    # the module does not import Model at the top.
    from django.db.models import Model

    for key, value in source.__dict__.items():
        is_fk_column = key.endswith('_id')
        # Strip only a trailing '_id'; splitting on the first '_id' anywhere
        # in the name would truncate keys that merely contain it.
        fk_name = key[:-len('_id')] if is_fk_column else key
        fk_name_in_destination = hasattr(destination_obj, fk_name)

        if not hasattr(destination_obj, key) and not fk_name_in_destination:
            continue

        if fk_name_in_destination:
            destination_attr = getattr(destination_obj, fk_name)
        else:
            destination_attr = getattr(destination_obj, key)

        if value and is_fk_column and isinstance(destination_attr, Model):
            sub_source = getattr(source, fk_name)
            set_attributes(sub_source, destination_attr)
        elif hasattr(value, '__dict__') and isinstance(destination_attr, Model):
            # check if this key is represented as another object
            set_attributes(value, destination_attr)
        else:
            setattr(destination_obj, key, value)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment