Last active
April 26, 2020 08:01
-
-
Save kaxil/6511003d75b556fac1d8126f4b987239 to your computer and use it in GitHub Desktop.
Twitter Streaming API to PubSub
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def publish(client, pubsub_topic, data_lines):
    """Publish a batch of records to the given Pub/Sub topic as one message.

    Every item of ``data_lines`` is wrapped as ``{'data': item}``; the wrapped
    items are collected under a ``'messages'`` key, serialised to JSON,
    UTF-8 encoded and URL-safe base64 encoded. That single payload is handed
    to ``client.publish``.

    Args:
        client: Object exposing ``publish(topic=..., data=...)``.
        pubsub_topic: Topic identifier, passed through unchanged as ``topic``.
        data_lines: Iterable of JSON-serialisable records.

    Returns:
        Whatever ``client.publish`` returns, so the caller can wait on or
        inspect the outcome of the publish instead of it being discarded.
    """
    body = {'messages': [{'data': line} for line in data_lines]}
    # Consumers have to base64-decode the payload before parsing the JSON.
    payload = base64.urlsafe_b64encode(json.dumps(body).encode('utf8'))
    return client.publish(topic=pubsub_topic, data=payload)
class TweetStreamListener(StreamListener):
    """Listener that forwards tweets received from the stream to Pub/Sub.

    Each status is reduced to a small dict, buffered on the instance and
    published through ``publish`` once ``batch_size`` tweets are buffered.
    ``on_status`` returns ``False`` after ``total_tweets`` statuses have
    been handled.
    """

    # Created once, when the class body executes, and shared by all instances.
    client = pubsub.PublisherClient()
    pubsub_topic = client.topic_path(GCP_PROJECT_NAME, PUBSUB_TOPIC_NAME)
    # Number of buffered tweets that triggers a publish.
    batch_size = 1
    # Number of handled statuses after which on_status returns False.
    total_tweets = TOTAL_TWEETS

    def __init__(self, *args, **kwargs):
        """Initialise per-instance counters; arguments go to the base class."""
        super(TweetStreamListener, self).__init__(*args, **kwargs)
        # Kept on the instance: a class-level list would be shared (and
        # mutated in place) by every listener created from this class.
        self.count = 0
        self.tweets = []

    def write_to_pubsub(self, tweets):
        """Publish ``tweets`` to this listener's Pub/Sub topic."""
        publish(self.client, self.pubsub_topic, tweets)

    def _flush(self):
        """Publish and print the buffered tweets, then empty the buffer."""
        if not self.tweets:
            return
        self.write_to_pubsub(self.tweets)
        print(self.tweets)
        self.tweets = []

    def on_status(self, status):
        """Buffer one received status and publish when the batch is full.

        Args:
            status: Status object providing ``created_at``, ``id_str``,
                ``text``, ``source`` and a ``user`` with ``name``,
                ``screen_name``, ``location`` and ``description``.

        Returns:
            ``False`` once ``total_tweets`` statuses have been handled,
            ``True`` otherwise.
        """
        tweet = dict(
            text=status.text,
            bio=status.user.description,
            # Converted to ISO format so the timestamp can be serialised.
            created_at=status.created_at.isoformat(),
            tweet_id=status.id_str,
            location=status.user.location,
            user_name=status.user.name,
            user_screen_name=status.user.screen_name,
            source=status.source,
        )
        self.tweets.append(tweet)
        if len(self.tweets) >= self.batch_size:
            self._flush()

        self.count += 1
        if self.count >= self.total_tweets:
            # With batch_size > 1 a partially filled buffer may remain;
            # publish it so the final tweets are not silently dropped.
            self._flush()
            return False
        if self.count % 5 == 0:
            print("count is: {} at {}".format(self.count, datetime.datetime.now()))
        return True

    def on_error(self, status_code):
        """Print the error status code reported by the stream."""
        print(status_code)
if __name__ == '__main__':
    # Script entry point: authenticate with Twitter, then stream tweets that
    # match the tracked phrases into the listener.
    # print is written as a call so the line parses on Python 2 and 3 alike,
    # consistent with the other print calls in this file.
    print('....')
    auth = OAuthHandler(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET)
    auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)

    stream_listener = TweetStreamListener()
    stream = Stream(auth, stream_listener)
    stream.filter(
        track=['Royal Wedding', '#RoyalWedding', 'Prince Harry', 'Meghan Markle']
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment