Created
April 29, 2018 19:08
-
-
Save Lokno/c3bac857e535abddb389d16bad7d566b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Python 3 script that reads the latest tweets that | |
| # match the search term (line 21) and appends the url to | |
| # the first video associated with each tweet to file | |
| # See line 47 for the file name | |
| # tested with Python 3.6.4 | |
| import tweepy | |
| import signal | |
| import sys | |
| import os | |
| # enter your twitter app credentials | |
| # https://apps.twitter.com/ | |
| consumer_key='INSERT_YOUR_CONSUMER_KEY' | |
| consumer_secret='INSERT_YOUR_CONSUMER_SECRET' | |
| access_token='INSERT_YOUR_ACCESS_TOKEN' | |
| access_token_secret='INSERT_YOUR_ACCESS_TOKEN_SECRET' | |
| # the desired search term for the tweet stream | |
| search_string='#Splatoon2' | |
| # takes a status object and returns the highest bitrate | |
| # video url associated with it | |
| def get_video_url( tweet ): | |
| url = None | |
| if hasattr(tweet,'extended_entities'): | |
| if 'media' in tweet.extended_entities: | |
| if len(tweet.extended_entities['media']) > 0: | |
| vid_info = None | |
| for entry in tweet.extended_entities['media']: | |
| if 'video_info' in entry: | |
| vid_info = entry['video_info'] | |
| if vid_info is not None and 'variants' in vid_info: | |
| highest_bitrate = 0 | |
| for v in vid_info['variants']: | |
| if 'bitrate' in v and 'url' in v and v['bitrate'] > highest_bitrate: | |
| highest_bitrate = v['bitrate'] | |
| url = v['url'] | |
| return url | |
| class StreamListener(tweepy.StreamListener): | |
| output_stream = None | |
| output_filename = 'videos.txt' # change this to your text file | |
| def on_status(self, status): | |
| url = get_video_url(status) | |
| if url is not None: | |
| # append to file | |
| with open(self.output_filename,'a') as output_stream: | |
| output_stream.write(str(status.id) + ',' + url + '\n') | |
| def on_error(self, status_code): | |
| if status_code == 420: | |
| return False | |
| def signal_handler(signal, frame): | |
| sys.exit(0) | |
| if __name__ == "__main__": | |
| auth = tweepy.OAuthHandler(consumer_key, consumer_secret) | |
| auth.set_access_token(access_token, access_token_secret) | |
| signal.signal(signal.SIGINT, signal_handler) | |
| stream_listener = StreamListener() | |
| stream = tweepy.Stream(auth=auth, listener=stream_listener) | |
| # replace this text with what you'd like to search for | |
| stream.filter(track=[search_string]) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment