Skip to content

Instantly share code, notes, and snippets.

@korniltsev
Last active February 19, 2019 08:44
Show Gist options
  • Save korniltsev/85c5572a9ff9301297822b58b1479da4 to your computer and use it in GitHub Desktop.
Save korniltsev/85c5572a9ff9301297822b58b1479da4 to your computer and use it in GitHub Desktop.
OkApy.py
import requests
import json
import urllib
import gzip
import cStringIO
urllib.quote_plus = urllib.quote
class Request:
def __init__(self, method, params):
self.method = method
self.params = params
def supplier(self, key, value):
self.params[key] = {"supplier": value}
def pretty_print(r):
print json.dumps(json.loads(r), sort_keys=True, indent=4, separators=(',', ': '))
def okid(it):
return str(it ^ 265224201205)
def batch_to_json(requests):
jsondata = []
for r in requests:
it = {
"method": r.method,
"params": r.params,
}
jsondata += [it]
dumps = json.dumps(jsondata)
print dumps
return dumps
APP_OK_ANDROID = "CBAFJIICABABABABA"
UA_ANDROID = "OKAndroid/18.6.25 debug b357 (Android 7.0; ru_RU; samsung SM-G920F Build/NRD90M.G920FXXU5EQHD; xxxhdpi 640dpi 1440x2560)"
URL_PROD = "https://api.ok.ru"
URL_TEST = "https://apitest.ok.ru"
class OkApi:
def __init__(self, app=APP_OK_ANDROID, ua=UA_ANDROID, url=URL_PROD):
self.app = app
self.ua = ua
self.api = url
self.session_key = None
# todo session expired support
def execute(self, request):
url = self.api + "/api/" + request.method.replace('.', '/')
args = dict(request.params)
args['application_key'] = self.app
if self.session_key is not None:
args['session_key'] = self.session_key
headers = {
"User-Agent": self.ua,
'Content-Encoding': 'gzip',
}
encoded = urllib.urlencode(args)
compressed = self.compress(encoded)
return requests.post(url, data=compressed, headers=headers)
def batch(self, rs):
jsdata = batch_to_json(rs)
url = self.api + "/api/batch/execute"
args = {
'application_key': self.app,
'methods': jsdata,
}
if self.session_key is not None:
args['session_key'] = self.session_key
headers = {
"User-Agent": self.ua,
'Content-Encoding': 'gzip',
}
encoded = urllib.urlencode(args)
compressed = self.compress(encoded)
return requests.post(url, data=compressed, headers=headers)
def compress(self, data):
buffer = cStringIO.StringIO()
writer = gzip.GzipFile(None, 'wb', 6, buffer)
writer.write(data)
writer.close()
buffer.seek(0)
ret = buffer.read()
return ret
def set_session(self, session):
if session is None:
self.session_key = None
else:
self.session_key = session['session_key']
def login(self, u, p):
loginreq = Request("auth.login", {
'user_name': u,
'password': p,
# 'gen_token': 'true',
})
session = self.execute(loginreq)
session_dict = json.loads(session.text)
self.set_session(session_dict)
pretty_print(session.text)
return session_dict
def loginByToken(self, auth_token):
loginreq = Request("auth.loginByToken", {
'token': auth_token,
})
# todo session restore?
return self.execute(loginreq)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment