|
#!/usr/bin/env python3 |
|
import concurrent.futures |
|
|
|
from twitter import TwitterError |
|
import twitter |
|
from requests_oauthlib import OAuth1Session |
|
import webbrowser |
|
import yaml |
|
|
|
REQUEST_TOKEN_URL = 'https://api.twitter.com/oauth/request_token' |
|
ACCESS_TOKEN_URL = 'https://api.twitter.com/oauth/access_token' |
|
AUTHORIZATION_URL = 'https://api.twitter.com/oauth/authorize' |
|
SIGNIN_URL = 'https://api.twitter.com/oauth/authenticate' |
|
|
|
|
|
def confirm(message, default=None): |
|
if default is None: |
|
t = input(message) |
|
else: |
|
if default: |
|
default_input = 'y' |
|
else: |
|
default_input = 'n' |
|
t = input('%s [%s]' % (message, default_input)) |
|
if t == '' or t is None and default is not None: |
|
return default |
|
while t != 'y' and t != 'n': |
|
t = input('Type y or n: ') |
|
if t == 'y': |
|
return True |
|
else: |
|
return False |
|
|
|
|
|
def get_access_token(ck, cs): |
|
oauth_client = OAuth1Session(client_key=ck, client_secret=cs, callback_uri='oob') |
|
|
|
print('\nRequesting temp token from Twitter...\n') |
|
|
|
try: |
|
resp = oauth_client.fetch_request_token(REQUEST_TOKEN_URL) |
|
except ValueError as e: |
|
raise 'Invalid response from Twitter requesting temp token: {0}'.format(e) |
|
|
|
url = oauth_client.authorization_url(AUTHORIZATION_URL) |
|
|
|
print('I will try to start a browser to visit the following Twitter page ' |
|
'if a browser will not start, copy the URL to your browser ' |
|
'and retrieve the pincode to be used ' |
|
'in the next step to obtaining an Authentication Token: \n' |
|
'\n\t{0}'.format(url)) |
|
|
|
webbrowser.open(url) |
|
pincode = input('\nEnter your pincode? ') |
|
|
|
print('\nGenerating and signing request for an access token...\n') |
|
|
|
oauth_client = OAuth1Session(client_key=ck, client_secret=cs, |
|
resource_owner_key=resp.get('oauth_token'), |
|
resource_owner_secret=resp.get('oauth_token_secret'), |
|
verifier=pincode) |
|
try: |
|
resp = oauth_client.fetch_access_token(ACCESS_TOKEN_URL) |
|
except ValueError as e: |
|
raise 'Invalid response from Twitter requesting temp token: {0}'.format(e) |
|
|
|
return resp.get('oauth_token'), resp.get('oauth_token_secret') |
|
|
|
|
|
def load_credentials(): |
|
try: |
|
with open('.twitter_credentials.yml') as f: |
|
c = yaml.load(f) |
|
return c['consumer_key'], c['consumer_secret'], c['access_token'], \ |
|
c['access_token_secret'] |
|
except IOError: |
|
return None |
|
|
|
|
|
def get_credentials(): |
|
ck = input('Input consumer key: ') |
|
cs = input('Input consumer secret: ') |
|
|
|
if confirm('Do you have access token and secret already?', default=False): |
|
at = input('Input access token: ') |
|
ats = input('Input access token secret: ') |
|
else: |
|
at, ats = get_access_token(ck, cs) |
|
return ck, cs, at, ats |
|
|
|
|
|
credentials = load_credentials() |
|
|
|
if not credentials or confirm('Do you want to switch to a new user?', default=False): |
|
credentials = get_credentials() |
|
with open('.twitter_credentials.yml', 'w') as f: |
|
yaml.dump({ |
|
'consumer_key': credentials[0], |
|
'consumer_secret': credentials[1], |
|
'access_token': credentials[2], |
|
'access_token_secret': credentials[3], |
|
}, f, default_flow_style=False) |
|
|
|
consumer_key, consumer_secret, access_token, access_token_secret = credentials |
|
|
|
api = twitter.Api(consumer_key=consumer_key, consumer_secret=consumer_secret, access_token_key=access_token, |
|
access_token_secret=access_token_secret) |
|
|
|
friend_ids_cursor = -1 |
|
friend_ids = [] |
|
|
|
print('Getting following list...') |
|
while friend_ids_cursor != 0: |
|
friend_ids_cursor, _, ids = api.GetFriendIDsPaged(cursor=friend_ids_cursor) |
|
friend_ids += ids |
|
print('You have %d followings' % len(friend_ids)) |
|
|
|
follower_ids_cursor = -1 |
|
follower_ids = [] |
|
|
|
print('Getting followers list') |
|
while follower_ids_cursor != 0: |
|
follower_ids_cursor, _, ids = api.GetFollowerIDsPaged(cursor=follower_ids_cursor) |
|
follower_ids += ids |
|
print('You have %d followers' % len(follower_ids)) |
|
|
|
no_mutual_followers = set(follower_ids) - set(friend_ids) |
|
|
|
print('You have %d followers you haven\'t followed.' % len(no_mutual_followers)) |
|
|
|
unblock = confirm('Unblock those users after removed from followers list?', default=True) |
|
|
|
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) |
|
|
|
cancelled = False |
|
|
|
block_failed_ids = [] |
|
unblock_failed_ids = [] |
|
|
|
|
|
def remove_follower(uid): |
|
if cancelled: |
|
return |
|
try: |
|
print('blocking %d' % uid) |
|
api.CreateBlock(uid) |
|
except TwitterError: |
|
block_failed_ids.append(uid) |
|
if unblock: |
|
try: |
|
print('unblocking %d' % uid) |
|
api.DestroyBlock(uid) |
|
except TwitterError: |
|
unblock_failed_ids.append(uid) |
|
|
|
|
|
try: |
|
for user_id in no_mutual_followers: |
|
executor.submit(remove_follower, user_id) |
|
executor.shutdown(wait=True) |
|
except (KeyboardInterrupt, SystemExit): |
|
cancelled = True |
|
print('Interrupted, exiting...') |
|
|
|
with open('block_failed_ids.list', 'w') as f: |
|
for user_id in block_failed_ids: |
|
f.write('%d\n' % user_id) |
|
|
|
with open('unblock_failed_ids.list', 'w') as f: |
|
for user_id in unblock_failed_ids: |
|
f.write('%d\n' % user_id) |