Created
June 9, 2016 19:31
-
-
Save aoleg94/55121f1c806376cf283421703a600976 to your computer and use it in GitHub Desktop.
vk_api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ python vkfast.py
Python 2.7.11+ (default, Apr 17 2016, 14:00:29)
[GCC 5.3.1 20160413] on linux2
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> res = vk.messages.getDialogs()
>>> print res.response[1].body
<last chat's last message>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cookielib | |
import urllib2 | |
import urllib | |
from urlparse import urlparse | |
from HTMLParser import HTMLParser | |
import json | |
class FormParser(HTMLParser):
    """Collect the first ``<form>`` of an HTML page so it can be re-submitted.

    After ``feed()`` / ``close()`` the instance exposes:

    * ``url`` -- the form's ``action`` attribute, or ``None`` when the form
      has no ``action`` (callers test ``url is None``).
    * ``method`` -- the upper-cased ``method`` attribute, ``"GET"`` by default.
    * ``params`` -- ``name -> value`` for every hidden, text and password
      ``<input>`` inside the form (a missing or bare ``value`` becomes ``""``).
    * ``captcha_url`` -- ``src`` of an ``<img id="captcha">`` found inside the
      form, else ``None``.
    * ``form_parsed`` -- ``True`` once the first form has been closed.

    Only the first form is recorded; later forms on the page are ignored.

    Raises:
        RuntimeError: on a ``<form>`` nested inside the form being parsed, or
            on a ``</form>`` seen before any form was opened.
    """

    # Input types whose values are copied into ``params``.
    _FIELD_TYPES = ("hidden", "text", "password")

    def __init__(self):
        # Explicit base-class call kept as in the original code.
        HTMLParser.__init__(self)
        self.url = None
        self.params = {}
        self.in_form = False
        self.form_parsed = False
        self.method = "GET"
        self.captcha_url = None

    def handle_starttag(self, tag, attrs):
        tag = tag.lower()
        if tag == "form":
            if self.form_parsed:
                # Only the first form is of interest; skip any later one.
                return
            if self.in_form:
                raise RuntimeError("Already in form")
            self.in_form = True
        if not self.in_form:
            return
        attrs = dict((name.lower(), value) for name, value in attrs)
        if tag == "form":
            # A form without ``action`` leaves url as None so that callers'
            # ``parser.url is None`` checks can report it, instead of a
            # KeyError escaping from the parser.
            self.url = attrs.get("action")
            if attrs.get("method"):
                self.method = attrs["method"].upper()
        elif tag == "input" and "type" in attrs and "name" in attrs:
            # Compare the type case-insensitively so type="HIDDEN" is kept.
            input_type = (attrs["type"] or "").lower()
            if input_type in self._FIELD_TYPES:
                # A bare ``value`` attribute parses as None; submit "".
                self.params[attrs["name"]] = attrs.get("value") or ""
        elif tag == "img" and attrs.get("id") == "captcha":
            self.captcha_url = attrs.get("src")

    def handle_endtag(self, tag):
        tag = tag.lower()
        if tag == "form":
            if not self.in_form:
                if self.form_parsed:
                    # Closing tag of a later form whose start tag was
                    # deliberately skipped above -- ignore it as well.
                    return
                raise RuntimeError("Unexpected end of <form>")
            self.in_form = False
            self.form_parsed = True
def auth_user1(email, password, client_id, scope, opener, captcha=None):
    """Fetch the VK OAuth login page and submit the credentials once.

    Args:
        email: value placed in the login form's ``email`` field.
        password: value placed in the login form's ``pass`` field.
        client_id: VK application id put into the authorize URL.
        scope: iterable of permission names, joined with commas.
        opener: urllib2-style opener used for both requests (it carries the
            session cookies between them).
        captcha: optional captcha answer, sent as ``captcha_key``.

    Returns:
        ``(body, url)`` of the response to the submitted form, where ``url``
        is the address the opener ended up on after redirects.

    Raises:
        RuntimeError: if the page has no form with ``email`` and ``pass``
            inputs.
        NotImplementedError: if the login form is not submitted via POST.
    """
    # Build the query with urlencode so every value is escaped properly, and
    # request the page over HTTPS: the form this page returns decides where
    # the password is sent, so it must not be fetched in clear text.
    query = urllib.urlencode([
        ("redirect_uri", "http://oauth.vk.com/blank.html"),
        ("response_type", "token"),
        ("client_id", client_id),
        ("scope", ",".join(scope)),
        ("display", "wap"),
    ])
    response = opener.open("https://oauth.vk.com/oauth/authorize?" + query)
    doc = response.read()
    parser = FormParser()
    parser.feed(doc)
    parser.close()
    if not parser.form_parsed or parser.url is None or "pass" not in parser.params or \
            "email" not in parser.params:
        raise RuntimeError("Something wrong")
    # Keep the hidden fields the server supplied; fill in the credentials.
    parser.params["email"] = email
    parser.params["pass"] = password
    if captcha is not None:
        parser.params["captcha_key"] = captcha
    if parser.method == "POST":
        response = opener.open(parser.url, urllib.urlencode(parser.params))
    else:
        raise NotImplementedError("Method '%s'" % parser.method)
    return response.read(), response.geturl()
def auth_user(email, password, client_id, scope, opener):
    """Log in, asking the user on the terminal to solve captchas when needed.

    Calls ``auth_user1`` and, for as long as the returned page contains
    ``captcha_key``, downloads the captcha image to a temporary ``.jpg`` file,
    prints its path on stderr, reads the answer from stdin and retries.

    Args:
        email, password, client_id, scope, opener: passed to ``auth_user1``.

    Returns:
        The ``(body, url)`` pair of the first attempt that is not answered
        with a captcha page.

    Raises:
        RuntimeError: if a captcha is requested but the page has no
            ``<img id="captcha">`` inside its form.
    """
    import sys
    import tempfile

    doc, url = auth_user1(email, password, client_id, scope, opener)
    while 'captcha_key' in doc:
        parser = FormParser()
        parser.feed(doc)
        parser.close()
        if parser.captcha_url is None:
            raise RuntimeError("Captcha requested but no captcha image found")
        res = opener.open(parser.captcha_url)
        # NamedTemporaryFile creates the file safely (mktemp only invents a
        # name); delete=False keeps it on disk so the user can open it, and
        # leaving the ``with`` block flushes and closes it before the prompt.
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as image:
            image.write(res.read())
        # Fill in the path -- the original prompt printed a literal "%s".
        sys.stderr.write(
            "Open captcha image at %s and type letters here:" % image.name)
        doc, url = auth_user1(email, password, client_id, scope, opener, raw_input())
    return doc, url
def give_access(doc, opener):
    """Submit the form contained in *doc* as-is and return the resulting URL.

    Used when the login response is not the final redirect: the first form of
    the page is posted back with the fields the server pre-filled.

    Args:
        doc: HTML text of the page holding the form.
        opener: urllib2-style opener used to send the request.

    Returns:
        The URL the opener ended up on after submitting the form.

    Raises:
        RuntimeError: if no complete form with an ``action`` was found.
        NotImplementedError: if the form is not submitted via POST.
    """
    parser = FormParser()
    parser.feed(doc)
    parser.close()
    if not parser.form_parsed or parser.url is None:
        raise RuntimeError("Something wrong")
    if parser.method == "POST":
        response = opener.open(parser.url, urllib.urlencode(parser.params))
    else:
        # The original referenced an undefined name ``params`` here, turning
        # the intended NotImplementedError into a NameError.
        raise NotImplementedError("Method '%s'" % parser.method)
    return response.geturl()
def auth(email, password, client_id, scope):
    """Run the whole OAuth flow and return the granted credentials.

    Args:
        email: account login.
        password: account password.
        client_id: VK application id.
        scope: one permission name, or a list/tuple of them.

    Returns:
        ``(access_token, user_id)`` as found in the fragment of the final
        redirect URL.

    Raises:
        RuntimeError: if the flow does not end on ``/blank.html`` or the
            fragment lacks ``access_token`` / ``user_id``.
    """
    # Accept tuples too; a single bare value is wrapped into a list.
    if not isinstance(scope, (list, tuple)):
        scope = [scope]
    opener = urllib2.build_opener(
        urllib2.HTTPCookieProcessor(cookielib.CookieJar()),
        urllib2.HTTPRedirectHandler())
    doc, url = auth_user(email, password, client_id, scope, opener)
    if urlparse(url).path != "/blank.html":
        # Need to give access to requested scope
        url = give_access(doc, opener)
    if urlparse(url).path != "/blank.html":
        raise RuntimeError("Expected success here")
    # partition() never fails: an empty fragment or a pair without "=" gives
    # an empty value instead of an IndexError, so the check below reports the
    # problem, and a value containing "=" is kept whole.
    answer = {}
    for kv_pair in urlparse(url).fragment.split("&"):
        key, _, value = kv_pair.partition("=")
        answer[key] = value
    if "access_token" not in answer or "user_id" not in answer:
        raise RuntimeError("Missing some values in answer")
    return answer["access_token"], answer["user_id"]
def call_api(method, params, token):
    """Call a VK API method over HTTPS and return the decoded JSON reply.

    Args:
        method: API method name placed in the URL path, e.g.
            ``"messages.getDialogs"``.
        params: request parameters -- a dict, a list of ``(key, value)``
            pairs, or a single ``(key, value)`` pair.
        token: access token, appended as the ``access_token`` parameter.

    Returns:
        Whatever ``json.loads`` produces for the response body.
    """
    if isinstance(params, dict):
        # On Python 2 items() returns a new list, so the append below does
        # not touch the caller's dict.
        query_pairs = params.items()
    elif isinstance(params, list):
        # Copy so the caller's list is left unmodified.
        query_pairs = list(params)
    else:
        query_pairs = [params]
    query_pairs.append(("access_token", token))
    query = urllib.urlencode(query_pairs)
    reply = urllib2.urlopen("https://api.vk.com/method/%s?%s" % (method, query))
    return json.loads(reply.read())
def transform(o):
    """Wrap a decoded JSON container in its attribute-friendly shim.

    A dict becomes a ``DictShim`` and a list becomes a ``ListShim``; any other
    value is returned unchanged.
    """
    if isinstance(o, dict):
        wrapped = DictShim(o)
    elif isinstance(o, list):
        wrapped = ListShim(o)
    else:
        wrapped = o
    return wrapped
class DictShim(dict):
    """A dict whose keys can also be read as attributes (``d.key``).

    Values fetched through attribute access are passed through ``transform``
    so nested dicts and lists get the same treatment.
    """

    def __getattr__(self, n):
        # __getattr__ must signal a missing name with AttributeError;
        # letting the KeyError escape breaks getattr(obj, name, default) and
        # other code that probes for optional attributes.
        try:
            value = self[n]
        except KeyError:
            raise AttributeError(n)
        return transform(value)
class ListShim(list):
    """A list that hands out its elements wrapped by ``transform``.

    Indexing, iteration and slicing all return wrapped elements, so
    ``for item in shim`` yields the same kind of objects as ``shim[i]``.
    """

    def __getitem__(self, n):
        return transform(list.__getitem__(self, n))

    def __iter__(self):
        # list's own iterator does not go through __getitem__, so without
        # this override a for-loop would yield the raw, unwrapped elements.
        for item in list.__iter__(self):
            yield transform(item)

    def __getslice__(self, i, j):
        # Python 2 only: simple slices (shim[a:b]) are routed here instead of
        # __getitem__. Python 3 never calls this method.
        return ListShim(list.__getslice__(self, i, j))
class NameAccumulator(object):
    """Builds a dotted API method name through chained attribute access.

    Each attribute access returns a child accumulator whose ``name`` is the
    parent's name plus ``"." + attribute``; calling an accumulator invokes
    the API method it names with the keyword arguments as parameters.
    """

    def __init__(self, parent, name):
        self.token = parent.token
        self.name = parent.name + name
        self.__cache = {}

    def __call__(self, **kw):
        # name always starts with "."; drop it to get the API method name.
        return transform(call_api(self.name[1:], kw, self.token))

    def __getattr__(self, n):
        # Underscore-prefixed names are Python protocol or private lookups,
        # not API names. Rejecting them also prevents infinite recursion when
        # the (name-mangled) cache attribute itself is not set yet.
        if n.startswith("_"):
            raise AttributeError(n)
        cache = self.__cache
        if n not in cache:
            cache[n] = NameAccumulator(self, '.' + n)
        return cache[n]
class VK(object):
    """Entry point of the API wrapper: ``vk.<section>.<method>(**params)``.

    Construction performs the login via ``auth`` and stores the resulting
    ``token`` and ``user_id``. Attribute access returns ``NameAccumulator``
    objects that build the dotted method name and perform the call.
    """

    def __init__(self, user, pwd, clid, scope):
        self.token, self.user_id = auth(user, pwd, clid, scope)
        # Empty root name; accumulators append ".<attribute>" to it.
        self.name = ''
        self.__cache = {}

    def __getattr__(self, n):
        # Underscore-prefixed names are Python protocol or private lookups,
        # not API sections. Rejecting them also prevents infinite recursion
        # when the (name-mangled) cache attribute is not set, e.g. because
        # __init__ failed or was bypassed.
        if n.startswith("_"):
            raise AttributeError(n)
        cache = self.__cache
        if n not in cache:
            cache[n] = NameAccumulator(self, '.' + n)
        return cache[n]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# Interactive VK console: logs in through vk_api and opens a Python shell in
# which the authenticated client is available as ``vk``.
import vk_api
# The three values below are template placeholders -- replace them with real
# values before running.
APPID=<vk app id>
UID='<email>'
# The password is stored base64-encoded and decoded at start-up.
# NOTE: base64 is an encoding, not encryption; .decode('base64') is Python 2.
PID='<password in base64>'.decode('base64')
# The list holds the permission scopes requested for the access token.
vk=vk_api.VK(UID,PID,APPID,[
    'photos',
    'messages',
    'friends',
    'audio',
    'video',
    'docs',
    'notes',
    'pages',
    'status',
    'wall',
    'groups',
    'notifications',
    'stats'
])
import code
# Start the interactive shell with every module-level name (vk, UID, PID,
# APPID, ...) available in its namespace.
code.interact(local=locals())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment