발로 짠 코드입니다. tw.py를 실행한 뒤 브라우저에서 앱 접근을 허용하고, 핀 코드가 뜨면 입력한 다음 기다리면 info.json이 생성됩니다. pic 폴더와 vid 폴더는 미리 만들어 두어야 합니다. 그다음 parser.py를 실행하면 미디어를 저장해 줍니다.
일회용 코드라 PEP8 검사도 하지 않았고 예외 처리도 전혀 하지 않았습니다.
저장 형식: 트윗아이디_사진순서.확장자
import requests | |
import json | |
def pick_download(tweet_id, index, media):
    """Choose what to download for one media entry and where to save it.

    Args:
        tweet_id: Numeric id of the tweet (``fav['id']``) that owns the media.
        index: Zero-based position of ``media`` in the tweet's media list.
        media: One media dict taken from info.json.

    Returns:
        A ``(url, path)`` tuple. Entries carrying ``video_info`` resolve to
        the variant with the highest ``bitrate`` (variants without that key
        count as 0) and are stored as ``vid/<tweet_id>_<index>.mp4``. Every
        other entry uses ``media_url`` and is stored as
        ``pic/<tweet_id>_<index>.jpg`` or ``.png``.

    Raises:
        ValueError: If a video entry has an empty ``variants`` list.
    """
    stem = '%d_%d' % (tweet_id, index)
    if 'video_info' in media:
        variants = media['video_info']['variants']
        # max() keeps the first variant on ties, like list.index(max(...)).
        best = max(variants, key=lambda variant: variant.get('bitrate', 0))
        return best['url'], 'vid/%s.mp4' % stem
    url = media['media_url']
    # Heuristic kept from the original: anything without "jpg" in its URL is
    # treated as PNG.
    extension = 'jpg' if 'jpg' in url else 'png'
    return url, 'pic/%s.%s' % (stem, extension)


def save_media(url, path):
    """Stream ``url`` into the file ``path``.

    The target directory must already exist. The file is only created once
    the server has answered with HTTP 200, so a failed request no longer
    leaves an empty or bogus media file behind.

    Returns:
        True when the file was written, False when the download was skipped
        because of a non-200 response.
    """
    # stream=True keeps memory bounded; without it the whole body is fetched
    # before iter_content() yields the first chunk.
    response = requests.get(url, stream=True)
    try:
        if response.status_code != 200:
            print('skip %s (HTTP %d)' % (path, response.status_code))
            return False
        with open(path, 'wb') as out:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    out.write(chunk)
    finally:
        response.close()
    return True


def download_all(info_path='info.json'):
    """Download every media attachment listed in ``info_path``.

    Tweets without a ``media`` entry are ignored. The path of each saved
    file is printed after it has been written.
    """
    with open(info_path) as info_file:
        favorites = json.load(info_file)
    for fav in favorites:
        for index, media in enumerate(fav.get('media') or ()):
            url, path = pick_download(fav['id'], index, media)
            if save_media(url, path):
                print(path)


if __name__ == '__main__':
    download_all()
# -*- coding: utf-8 -*- | |
import urlparse | |
import oauth2 as oauth | |
import twitter | |
import json | |
import webbrowser | |
import time | |
import os

# Application (consumer) credentials for the Twitter API.
# NOTE(review): secrets committed to source are readable by anyone with the
# file. Set TWITTER_CONSUMER_KEY / TWITTER_CONSUMER_SECRET to supply your own;
# the original embedded values remain only as a backward-compatible fallback.
consumer_key = os.environ.get('TWITTER_CONSUMER_KEY',
                              'VmzAqDGJ4oMSgMasUk2x3PHQk')
consumer_secret = os.environ.get(
    'TWITTER_CONSUMER_SECRET',
    'VjmEW4hmeHaplApi7HsiDkyhCAcTG4so6VkDj5Sbi1VgLB5kYD')
consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)

# OAuth 1.0a endpoints; authorize_url takes the request token via "%".
request_token_url = "https://api.twitter.com/oauth/request_token"
access_token_url = 'https://api.twitter.com/oauth/access_token'
authorize_url = 'https://api.twitter.com/oauth/authorize?oauth_token=%s'
class MyOuath(object):
    """PIN-based (``oauth_callback=oob``) OAuth 1.0a helper for Twitter.

    Intended call order: ``getAuthUrl()`` -> the user authorizes in a browser
    and obtains a PIN -> ``getAccessToken(pin)`` -> ``getApi()``.

    Uses the module-level ``consumer``, ``consumer_key``, ``consumer_secret``
    and the endpoint URL constants.
    """

    def __init__(self):
        # Filled by getAuthUrl(); required by getAccessToken().
        self.request_token = None
        # Filled by getAccessToken(); required by getApi().
        self.oauth_token_secret = None
        self.oauth_token = None

    def getAuthUrl(self):
        """Fetch a request token and return the URL the user must visit.

        Raises:
            Exception: If the request-token endpoint does not answer with
                HTTP 200 or the reply contains no ``oauth_token``.
        """
        client = oauth.Client(consumer)
        resp, content = client.request(
            request_token_url, "POST", body='oauth_callback=oob')
        # Fail early with a readable message instead of a KeyError below.
        if resp['status'] != '200':
            raise Exception('request token fail: HTTP %s' % resp['status'])
        request_token = dict(urlparse.parse_qsl(content))
        if 'oauth_token' not in request_token:
            raise Exception('request token fail: no oauth_token in response')
        self.request_token = request_token
        return authorize_url % request_token['oauth_token']

    def getAccessToken(self, pincode):
        """Exchange the request token plus ``pincode`` for an access token.

        On success the access token and its secret are stored on the
        instance.

        Args:
            pincode: Verifier (PIN) shown to the user after authorizing.

        Raises:
            Exception: ``'oauth fail'`` when the reply lacks the token fields,
                or a descriptive message when ``getAuthUrl()`` was not
                called first.
        """
        if not self.request_token:
            raise Exception('no request token; call getAuthUrl() first')
        token = oauth.Token(self.request_token['oauth_token'],
                            self.request_token['oauth_token_secret'])
        token.set_verifier(pincode)
        client = oauth.Client(consumer, token)
        resp, content = client.request(access_token_url, "POST")
        access_token = dict(urlparse.parse_qsl(content))
        try:
            self.oauth_token_secret = access_token['oauth_token_secret']
            self.oauth_token = access_token['oauth_token']
        except KeyError:
            # Only a missing field means the exchange failed; a bare except
            # would also swallow KeyboardInterrupt and the like.
            raise Exception('oauth fail')

    def getApi(self):
        """Return a ``twitter.Api`` client authenticated with the stored token.

        Raises:
            Exception: ``'no oauth info'`` when no access token is stored yet.
        """
        if not (self.oauth_token_secret and self.oauth_token):
            raise Exception('no oauth info')
        return twitter.Api(consumer_key=consumer_key,
                           consumer_secret=consumer_secret,
                           access_token_key=self.oauth_token,
                           access_token_secret=self.oauth_token_secret)
def getHearts(api, max_id=None):
    """Fetch one page (up to 200) of the user's favorites.

    Args:
        api: Object exposing ``GetFavorites(count=..., max_id=...)`` whose
            items provide ``AsDict()``.
        max_id: Passed through to ``GetFavorites``; ``None`` requests the
            first page.

    Returns:
        ``(tweets, next_max_id)`` where ``tweets`` is a list of dicts and
        ``next_max_id`` is the smallest ``id`` in the page. For an empty page
        the incoming ``max_id`` is returned unchanged instead of raising
        ``ValueError`` from ``min()`` on an empty sequence.
    """
    ret = [status.AsDict()
           for status in api.GetFavorites(count=200, max_id=max_id)]
    if ret:
        max_id = min(tweet['id'] for tweet in ret)
    return ret, max_id
def main():
    """Authorize via PIN, page through all favorites and write info.json.

    Paging stops as soon as a page contributes no tweet id that has not been
    seen before. Any exception while fetching a page is printed and the same
    page is retried after 60 seconds.
    """
    auth = MyOuath()
    webbrowser.open(auth.getAuthUrl())
    pin = raw_input('pin: ')
    auth.getAccessToken(pin)
    api = auth.getApi()

    seen_ids = set()
    infos = []
    max_id = None
    while True:
        try:
            rets, max_id = getHearts(api, max_id)
        except Exception as e:
            print('=======================')
            print(e)
            print('sleep 1min')
            time.sleep(60)
            continue

        # max_id is sent back as-is, so the boundary tweet of the previous
        # page can be returned again; keep each tweet only once.
        added = 0
        for tweet in rets:
            if tweet['id'] in seen_ids:
                continue
            seen_ids.add(tweet['id'])
            infos.append(tweet)
            added += 1
        print(len(seen_ids))
        if not added:
            break

    # Open the output only now, so an aborted run does not truncate an
    # existing info.json.
    with open('info.json', 'w') as f:
        f.write(json.dumps(infos))
    print('end!')


if __name__ == '__main__':
    main()