Created
May 16, 2017 13:57
-
-
Save pallih/96d1726f83a063710b32e45a367af6f0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import random | |
import tweepy | |
from tweepy.models import Status | |
import requests | |
import lxml.html | |
import json | |
import dataset | |
DB_CONNECT = "" | |
consumer_key = "" | |
consumer_secret = "" | |
access_token = "" | |
access_token_secret = "" | |
wrong_messages = ["Því miður þá er {} ekki gulur bíll samkvæmt mínum kokkabókum, heldur {}. Ekkert stig.", | |
"Hmmm... {} er {} bíll en ekki gulur. Sorrí. Ekkert stig.", | |
"Nei, nei. {} er {} bíll. Við erum að leita að gulum bílum. Ekkert stig.", | |
"{} er {} bíll. Ekki gulur. Ekkert stig.", | |
"Mér finnst {} ekki vera gulur bíll. Meira svona {} bíll. Ekkert stig", | |
"Fyrsta lögmál gula bílsins er að {} er ekki gulur bíll. Ekkert stig", | |
"Gulur bíll? Nei, nei. {} er ekkert gulur. Hann er {}."] | |
nothing_found_messages = ["Ég er ekki viss um að {} sé bílnúmer. Í það minnsta finn ég ekkert slíkt. Sorrí, ekkert stig.", | |
"{} er eitthvað skrítið bílnúmer. Ég fann ekkert. Leitaði víða. Sorrí, ekkert stig.", | |
"Neibbs. {} er ekki bílnúmer á mínu heimili. Ekkert stig.", | |
"Ertu viss um að {} sé bílnúmer? Ég fann engan bíl með því númeri..."] | |
yellow_messages = ["Vúhú! {} er gulur bíll! Eitt stig til þín! Þú ert núna með {} stig.", | |
"Blístur og fagnaðarlæti! {} er gulur bíll. Eitt stig til þín! Þú ert núna með {} stig.", | |
"Já! {} er heiðgul bifreið. Eitt stig til þín! Þú ert núna með {} stig.", | |
"{} er gulur sem sólin! Eitt stig til þín! Þú ert núna með {} stig.", | |
"Loksins! {} er gulur bíll. Eitt stig til þín! Þú ert núna með {} stig.", | |
"Jáá! Þú ert á hraðri leið á toppinn! {} er gulur sem þorskur! Þú ert núna með {} stig.", | |
"GUUUUUUUUUULUUUUUUUUUR BÍÍÍÍÍÍÍÍÍLLLLL! {} gæti ekki verið gulari. Þú ert núna með {} stig."] | |
successful_tweets = ["Jibbí! @{} fann gulan bíl og fékk stig!", | |
"GULUR BÍLL! @{} fann gulan bíl!", | |
"Enn einn guli bíllinn! @{} fær stig!", | |
"Hvað eru eiginlega margir gulir bílar? @{} fær stig!", | |
"Gleði og hamingja! @{} fann gulan bíl!", | |
"Þessi bíll er heiðgulur! @{} fær stig!", | |
"Allt að gerast! @{} fann gulan bíl!", | |
"Þvílíkur dagur! @{} fann gulan bíl!", | |
"Ég gleðst í hjarta mér því @{} fann gulan bíl!", | |
"GUUUUUUUUULUUUUUUUR BÍÍÍÍÍÍÍÍÍLLLLL! @{} fann gulan bíl!", | |
"Þessi dagur verður bara betri og betri! @{} fann gulan bíl!"] | |
class StreamListener(tweepy.StreamListener): | |
def send_dm(self, screen_name, text): | |
api.send_direct_message(screen_name=screen_name, text=text) | |
def send_tweet(self, text): | |
api.update_status(status=text) | |
def lookup_number(self, text, status): | |
reply_to = status.direct_message["sender_screen_name"] | |
if reply_to == u'gulurbill': | |
# do not loop endlessly | |
return | |
table = db['users'] | |
user = table.find_one(twitter_handle=reply_to) | |
if not user: | |
scored_yellow = json.dumps(list()) | |
table.upsert(dict(twitter_handle=reply_to, score=0, scored_yellow=scored_yellow), ['twitter_handle']) | |
lookup_url = "http://www.samgongustofa.is/umferd/okutaeki/okutaekjaskra/uppfletting?vq={}" | |
r = requests.get(lookup_url.format(text.strip())) | |
if u"Ekkert fannst" in r.text: | |
dm_text = random.choice(nothing_found_messages) | |
dm_text = dm_text.format(text).decode('utf-8') | |
self.send_dm(screen_name=reply_to, text=dm_text) | |
return | |
root = lxml.html.fromstring(r.text) | |
tegund = root.xpath("//div[@class='vehicleinfo']/div[@class='boxbody']/ul/li[contains(., 'Tegund')]/span")[0].text.strip() | |
stada = root.xpath("//div[@class='vehicleinfo']/div[@class='boxbody']/ul/li[contains(., 'Sta')]/span")[0].text.strip() | |
if stada == u"Afskráð": | |
dm_text = "{} er afskráð bifreið. Ekkert stig".format(text) | |
self.send_dm(screen_name=reply_to, text=dm_text) | |
return | |
if stada == u"Úr umferð": | |
dm_text = "{} er bifreið sem hefur verið tekin úr umferð. Ekkert stig".format(text) | |
self.send_dm(screen_name=reply_to, text=dm_text) | |
return | |
color = tegund.split('(')[1].replace(')', '').strip().lower() | |
if len(color) > 1: | |
if color == u"gulur": | |
user["score"] = user["score"] + 1 | |
user["scored_yellow"] = json.loads(user["scored_yellow"]) | |
if (user["scored_yellow"]) and (text in user["scored_yellow"]): | |
# check if already scored | |
dm_text = "{} er vissulega gulur bíll... en þú hefur nú þegar fengið stig fyrir hann. Ekkert stig til þín!".format(text) | |
self.send_dm(screen_name=reply_to, text=dm_text) | |
return | |
else: | |
# update and send dm | |
user["scored_yellow"] = json.dumps(user["scored_yellow"] + [text]) | |
dm_text = random.choice(yellow_messages) | |
dm_text = dm_text.format(text, str(user["score"])) | |
table.upsert(user, ['twitter_handle']) | |
db.commit() | |
tweet_text = random.choice(successful_tweets) | |
tweet_text = tweet_text.format(reply_to) | |
to_tweet = random.choice(range(0, 5)) | |
if to_tweet == 4: | |
try: | |
self.send_tweet(tweet_text) | |
except: | |
pass | |
self.send_dm(screen_name=reply_to, text=dm_text) | |
return | |
else: | |
dm_text = random.choice(wrong_messages) | |
dm_text = dm_text.format(text, color.encode('utf-8')) | |
self.send_dm(screen_name=reply_to, text=dm_text) | |
return | |
def on_data(self, raw_data): | |
data = json.loads(raw_data) | |
if 'direct_message' in data: | |
status = Status.parse(self.api, data) | |
if self.on_direct_message(status) is False: | |
return False | |
elif 'event' in data: | |
if data["event"] == 'follow': | |
if self.on_follow(data) is False: | |
return False | |
def get_top_5(self): | |
dm_text = "Topp 5:\n" | |
result = db.query('SELECT * FROM users ORDER BY score DESC LIMIT 5') | |
for row in result: | |
dm_text += "@" + row['twitter_handle'] + " (" + str(row['score']) + ')\n' | |
return dm_text | |
def get_help(self): | |
dm_text = "Gulur bíll - hjálp\n" | |
dm_text += "Sendu bílnúmer til mín og ef bíllinn er gulur þá færðu stig. " | |
dm_text += "Dagurinn þinn verður betri. Það mun stytta upp. Vinda mun lægja. Allt verður gott.\n" | |
dm_text += "Þú getur líka sent til mín TOPP3 til að fá stöðu efstu þriggja eða " | |
dm_text += "MÍNSTAÐA til að fá senda þína stöðu.\n" | |
return dm_text | |
def get_position(self, twitter_handle): | |
table = db['users'] | |
user = table.find_one(twitter_handle=twitter_handle) | |
all_users = db.query('SELECT * FROM users ORDER BY score DESC') | |
position = 1 | |
total = 1 | |
for i, row in enumerate(all_users): | |
if twitter_handle == row["twitter_handle"]: | |
position = position + i | |
total = total + i | |
dm_text = "Þú ert með {} stig og ert í sæti nr. {} af {}" | |
dm_text = dm_text.format(str(user["score"]), str(position), str(total)) | |
return dm_text | |
def on_direct_message(self, status): | |
text = status.direct_message["text"].strip("-").replace(" ", "").upper() | |
reply_to = status.direct_message["sender_screen_name"] | |
if reply_to == u'gulurbill': | |
# do not loop endlessly | |
return | |
if text == u"TOPP3" or text == u"TOPP5" or text == "TOPP": | |
dm_text = self.get_top_5() | |
self.send_dm(screen_name=reply_to, text=dm_text) | |
return | |
elif text == u"HJÁLP": | |
dm_text = self.get_help() | |
self.send_dm(screen_name=reply_to, text=dm_text) | |
return | |
elif text == u"MÍNSTAÐA": | |
dm_text = self.get_position(reply_to) | |
self.send_dm(screen_name=reply_to, text=dm_text) | |
return | |
else: | |
self.lookup_number(text.encode('utf-8'), status) | |
def on_error(self, status_code): | |
if status_code == 420: | |
return False | |
def on_follow(self, data): | |
user_who_followed = data["source"]["screen_name"] | |
if user_who_followed == u'gulurbill': | |
return | |
else: | |
table = db['users'] | |
user = table.find_one(twitter_handle=user_who_followed) | |
if not user: | |
scored_yellow = json.dumps(list()) | |
table.upsert(dict(twitter_handle=user_who_followed, score=0, scored_yellow=scored_yellow), ['twitter_handle']) | |
api.create_friendship(id=user_who_followed) | |
return | |
if __name__ == '__main__': | |
db = dataset.connect(DB_CONNECT) | |
auth = tweepy.OAuthHandler(consumer_key, consumer_secret) | |
auth.set_access_token(access_token, access_token_secret) | |
api = tweepy.API(auth) | |
myStreamListener = StreamListener() | |
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener) | |
myStream.userstream(async=False) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment