Last active
February 19, 2019 08:44
-
-
Save korniltsev/85c5572a9ff9301297822b58b1479da4 to your computer and use it in GitHub Desktop.
OkApy.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import urllib | |
import gzip | |
import cStringIO | |
urllib.quote_plus = urllib.quote | |
class Request: | |
def __init__(self, method, params): | |
self.method = method | |
self.params = params | |
def supplier(self, key, value): | |
self.params[key] = {"supplier": value} | |
def pretty_print(r): | |
print json.dumps(json.loads(r), sort_keys=True, indent=4, separators=(',', ': ')) | |
def okid(it): | |
return str(it ^ 265224201205) | |
def batch_to_json(requests): | |
jsondata = [] | |
for r in requests: | |
it = { | |
"method": r.method, | |
"params": r.params, | |
} | |
jsondata += [it] | |
dumps = json.dumps(jsondata) | |
print dumps | |
return dumps | |
APP_OK_ANDROID = "CBAFJIICABABABABA" | |
UA_ANDROID = "OKAndroid/18.6.25 debug b357 (Android 7.0; ru_RU; samsung SM-G920F Build/NRD90M.G920FXXU5EQHD; xxxhdpi 640dpi 1440x2560)" | |
URL_PROD = "https://api.ok.ru" | |
URL_TEST = "https://apitest.ok.ru" | |
class OkApi: | |
def __init__(self, app=APP_OK_ANDROID, ua=UA_ANDROID, url=URL_PROD): | |
self.app = app | |
self.ua = ua | |
self.api = url | |
self.session_key = None | |
# todo session expired support | |
def execute(self, request): | |
url = self.api + "/api/" + request.method.replace('.', '/') | |
args = dict(request.params) | |
args['application_key'] = self.app | |
if self.session_key is not None: | |
args['session_key'] = self.session_key | |
headers = { | |
"User-Agent": self.ua, | |
'Content-Encoding': 'gzip', | |
} | |
encoded = urllib.urlencode(args) | |
compressed = self.compress(encoded) | |
return requests.post(url, data=compressed, headers=headers) | |
def batch(self, rs): | |
jsdata = batch_to_json(rs) | |
url = self.api + "/api/batch/execute" | |
args = { | |
'application_key': self.app, | |
'methods': jsdata, | |
} | |
if self.session_key is not None: | |
args['session_key'] = self.session_key | |
headers = { | |
"User-Agent": self.ua, | |
'Content-Encoding': 'gzip', | |
} | |
encoded = urllib.urlencode(args) | |
compressed = self.compress(encoded) | |
return requests.post(url, data=compressed, headers=headers) | |
def compress(self, data): | |
buffer = cStringIO.StringIO() | |
writer = gzip.GzipFile(None, 'wb', 6, buffer) | |
writer.write(data) | |
writer.close() | |
buffer.seek(0) | |
ret = buffer.read() | |
return ret | |
def set_session(self, session): | |
if session is None: | |
self.session_key = None | |
else: | |
self.session_key = session['session_key'] | |
def login(self, u, p): | |
loginreq = Request("auth.login", { | |
'user_name': u, | |
'password': p, | |
# 'gen_token': 'true', | |
}) | |
session = self.execute(loginreq) | |
session_dict = json.loads(session.text) | |
self.set_session(session_dict) | |
pretty_print(session.text) | |
return session_dict | |
def loginByToken(self, auth_token): | |
loginreq = Request("auth.loginByToken", { | |
'token': auth_token, | |
}) | |
# todo session restore? | |
return self.execute(loginreq) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment