Last active
October 12, 2016 23:51
-
-
Save arivero/1e356ffff5c4cb447965cd47f3baf5b6 to your computer and use it in GitHub Desktop.
pokemon notification bot for Telegram/pkalarm somehow extended
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#Setup Logging | |
import logging | |
log = logging.getLogger(__name__) | |
#Python modules | |
import io | |
import json | |
#Local modules | |
from ..alarm import Alarm | |
from ..utils import * | |
#External modules | |
import telepot | |
from telepot.namedtuple import InlineKeyboardMarkup,InlineKeyboardButton, ReplyKeyboardMarkup, KeyboardButton, ReplyKeyboardHide, ForceReply | |
from geopy.distance import VincentyDistance | |
from s2sphere import LatLng | |
import motionless | |
import shapely | |
import shapely.geometry | |
from urllib2 import urlopen | |
import requests | |
from peewee import * | |
db = SqliteDatabase('botUsers.db') | |
class BaseModel(Model): | |
class Meta: | |
database = db | |
#las tres clases deberian ser foreigkeyfield(user) pero no tenemos definida la clase/tabla User :-D | |
class homebase(BaseModel): | |
user=CharField(unique=True,primary_key=True) | |
latitud=DecimalField() | |
longitud=DecimalField() | |
class distancia(BaseModel): | |
user=CharField(unique=True,primary_key=True) | |
metros=IntegerField() | |
class favorites(BaseModel): | |
user=CharField() | |
pokemon=IntegerField() | |
class Meta: | |
primary_key = CompositeKey('user','pokemon') | |
def investiga(position, previa=None, descartar=False): | |
Vdist=VincentyDistance(meters=250*1.4142) | |
radial=VincentyDistance(meters=200) | |
currentShape=shapely.geometry.Polygon([(p[0],p[1]) for p in [radial.destination(point=(position[0],position[1],230),bearing=b) for b in range(0,360,15)]]) | |
if previa: | |
if descartar: | |
newShape=previa.difference(currentShape) | |
else: | |
newShape=currentShape.intersection(previa) | |
if not newShape.is_empty: | |
currentShape=newShape | |
center=(currentShape.centroid.x,currentShape.centroid.y,230) | |
p1=Vdist.destination(point=center,bearing=45+180) | |
p2=Vdist.destination(point=center,bearing=45) | |
gmap = motionless.DecoratedMap(size_x=600, size_y=600,scale=2,fillcolor='gray',pathcolor="0x0000EE20",region=True, | |
lat=center[0],lon=center[1],zoom=17) #17 is 1.1943 m/pixel??? | |
for p in currentShape.boundary.coords: | |
gmap.add_path_latlon(p[0],p[1]) | |
urlsmall=gmap.generate_url() #or with simplify() | |
cursor=db.execute_sql('select printf("%.4f",latitude) lat, printf("%.4f",longitude) lon from spawns where latitude > ? and longitude > ? and latitude < ? and longitude < ? order by random()', | |
[p1[0],p1[1],p2[0],p2[1]]) | |
totalmarkers=0 | |
container=currentShape.buffer(90.0/10000/1000*25.0) #el buffer deberia ser distinto en lat que en long, pero bueno... | |
reserve=[] | |
for lat,lon in cursor: | |
color='red' | |
size = 'tiny' | |
marker=motionless.LatLonMarker(lat, lon, color=color, size=size) | |
if container.contains(shapely.geometry.Point(float(lat),float(lon))): | |
gmap.add_marker(marker) | |
totalmarkers+=1 | |
else: | |
reserve.append(marker) | |
for m in reserve: | |
gmap.add_marker(m) | |
totalmarkers+=1 | |
if totalmarkers>100: | |
break | |
try: | |
url=gmap.generate_url() | |
except: | |
url=urlsmall | |
print "url len is", len(url), len(urlsmall),totalmarkers | |
return url,currentShape | |
class Telegram_Alarm(Alarm): | |
#Gather settings and create alarm | |
def __init__(self, settings): | |
db.connect() | |
db.create_tables([homebase, favorites,distancia], safe=True) | |
self.bot_token = settings['bot_token'] | |
self.avisados=set() | |
self.exclusion={} #dict of booleans | |
self.connect() | |
self.chat_id = settings['chat_id'] | |
self.send_map = parse_boolean(settings.get('send_map', "True")) | |
self.title = settings.get('title', "A wild <pkmn> has appeared!") | |
self.body = settings.get('body', "<gmaps> \n #<pkmn> available until <24h_time> (<time_left>).") | |
log.info("Telegram Alarm intialized.") | |
self.client.sendMessage(self.chat_id, 'Relanzando PokeAlarm! Se olvidan posiciones previas') | |
self.client.message_loop(self.handle) | |
self.shapes={} | |
#(Re)establishes Telegram connection | |
def connect(self): | |
self.client = telepot.Bot(self.bot_token) | |
def handle(self,msg): | |
print "al bot:",msg | |
if telepot.flavor(msg) == 'callback_query': | |
cbid,fromuser_id,data = telepot.glance(msg,flavor='callback_query') #ahora tambien hay un chat_instance | |
chat_id,origin_id =telepot.origin_identifier(msg) | |
self.exclusion[fromuser_id]= (data=="exclusion") | |
self.client.answerCallbackQuery(cbid, | |
"Estas en modo "+data+". Responde al mensaje para dar una nueva posicion con la que operar", | |
show_alert=not (fromuser_id in self.avisados) ) | |
#reply_markup=ForceReply(selective=True)) | |
self.avisados.add(fromuser_id) | |
return | |
print "al bot: 2",msg['chat'] | |
print "al bot: 3",msg['chat']['id'],msg['chat']['id'] < 0 | |
if "location" in msg: | |
if msg['chat']['id'] < 0 : #es para analizar!! | |
chat_id=msg['chat']['id'] | |
origen=msg['reply_to_message']['message_id'] | |
self.client.sendChatAction(chat_id,'find_location') #upload_photo upload_document typing | |
respuesta,shape=investiga((msg['location']['latitude'],msg['location']['longitude']),self.shapes.get(origen),self.exclusion.get(msg['from']['id'])) | |
print respuesta | |
datafile=io.BytesIO(urlopen(respuesta).read()) | |
datafile.name='mapa.png' | |
url = 'https://api.telegram.org/bot'+self.bot_token+'/sendPhoto' | |
markup = {'inline_keyboard':[ | |
[{'text':'Corroborar (lo veo)', 'callback_data':'interseccion'}], | |
[{'text':'Descartar (no lo veo)', 'callback_data':'exclusion'}] | |
]} | |
data={ 'chat_id':msg['chat']['id'], | |
'caption': "Zona del avistamiento", | |
'reply_markup':json.dumps(markup)} | |
#files = {'photo':open('mapa.png', 'rb')} | |
files = {'photo': datafile} | |
r = requests.post(url, files=files, data=data) | |
print r.json() | |
if r.json()["ok"]: | |
self.shapes[r.json()["result"]["message_id"]]=shape | |
#self.client.sendMessage(chat_id,"hecho") | |
else: | |
dato,created=homebase.get_or_create(user=msg['from']['id'],defaults={'latitud':41.648088, 'longitud':-0.893720}) | |
dato.latitud=msg['location']['latitude'] | |
dato.longitud=msg['location']['longitude'] | |
dato.save() | |
self.client.sendMessage(msg['from']['id'],'posicion actualizada') | |
elif telepot.flavor(msg)=="chat": | |
content_type, chat_type, chat_id = telepot.glance(msg) | |
if content_type=="text": | |
texto=msg['text'].strip().lower() | |
command=texto.split(" ",1)[0].split("@",1)[0] | |
line=texto.split(" ") | |
if command<>"/investiga": | |
try: | |
if command in ["/desde","/pokemention","/hasta","/help","/start","/settings","/status"]: | |
self.client.sendMessage(msg['from']['id'],"Respuesta al comando "+command) | |
else: | |
self.client.sendMessage(msg['from']['id'],"Error, comando desconocido. Consulta /help") | |
except Exception as inst: | |
print "Error", type(inst) | |
self.client.sendMessage(chat_id,'no puedo contestarte.\n Abre primero una conversacion en privado.') | |
try: | |
print inst | |
print inst.description() | |
print inst.args | |
print inst.json() | |
except: | |
print "oh, well" | |
if command=="/status": | |
data="" | |
with open('/home/alejandro/PokemomMap2/donepokes.txt', 'r') as f: | |
data=f.read() | |
self.client.sendMessage(chat_id,data) | |
if command=="/pokemention": | |
if len(line) < 3 or (line[1] != "add" and line[1]!="remove"): | |
self.client.sendMessage(chat_id, '/pokemention {add|remove} Pokemon_id_number(s)') | |
else: | |
for token in line[2:]: | |
try: | |
pokemon=int(token) | |
except: | |
pokemon=999 | |
if pokemon > 151 or pokemon <1: | |
self.client.sendMessage(msg['from']['id'], 'usa numeros entre 1 y 151 separados por espacios') | |
else: | |
if line[1]=="add": | |
favorites.get_or_create(user=msg['from']['id'],pokemon=pokemon) | |
self.client.sendMessage(msg['from']['id'], 'Te mencionaremos si tienes nick y avistamos algun ' + get_pkmn_name(pokemon)) | |
elif line[1]=="remove": | |
favorites.delete().where(favorites.user==msg['from']['id'],favorites.pokemon==pokemon).execute() | |
self.client.sendMessage(msg['from']['id'], get_pkmn_name(pokemon)+' eliminado de tu lista') | |
self.client.sendMessage(msg['from']['id'], 'Recuerda que puedes usar /desde y /hasta para configurar radio de menciones') | |
if command=="/desde": | |
markup = ReplyKeyboardMarkup(keyboard=[ | |
[KeyboardButton(text='Location', request_location=True)], | |
],one_time_keyboard=True) | |
self.client.sendMessage(chat_id, 'La posicion solo se puede especificar en chat privado') | |
self.client.sendMessage(msg['from']['id'], 'me tienes que indicar tu posicion con el boton que sale ahora en el teclado... si consigo sacarlo. Si no, vuelve a enviarme el comando /desde',reply_markup=markup) | |
if command=="/hasta": | |
metros=-1 | |
try: | |
metros=int(line[1]) | |
except: | |
self.client.sendMessage(msg['from']['id'], 'la distancia debe ser un numero entero, en metros') | |
if metros>=0: | |
dato,created=distancia.get_or_create(user=msg['from']['id'],defaults={'metros':5000}) | |
dato.metros=metros | |
dato.save() | |
mensaje= 'distancia cambiada, '+str(metros)+' metros\n' | |
try: | |
pos=homebase.get(user=msg['from']['id']) | |
except: | |
mensaje+= 'Tienes que establecer tu base con el comando /desde en chat privado con el bot' | |
self.client.sendMessage(msg['from']['id'],mensaje) | |
if command=="/start": | |
mensaje=""" | |
Hola. Este bot tiene una funcion que se puede usar en cualquier grupo, /investiga, y otras | |
que son solo para configurar notificaciones en PokeRaresZgz. Haz /help para mas info""" | |
self.client.sendMessage(msg['from']['id'],mensaje) | |
if command=="/help": | |
mensaje=""" | |
/settings lista los settings personales | |
/investiga en un grupo, arranca la busqueda de puntos de spawns | |
/pokemention define los pokemones para los que deseas aparecer en la lista de menciones | |
/desde punto de origen del radio de menciones | |
/hasta distancia del radio de menciones""" | |
self.client.sendMessage(msg['from']['id'],mensaje) | |
if command=="/settings": | |
mensaje="Configuracion de menciones en PokeRaresZgz:" | |
pks=favorites.select().where(favorites.user==msg['from']['id']).execute() | |
for p in pks: | |
mensaje+="\n siguiendo "+str(p.pokemon)+" "+get_pkmn_name(int(p.pokemon)) | |
try: | |
dist=distancia.get(distancia.user==msg['from']['id']) | |
mensaje+="\n distancia hasta "+str(dist.metros)+" metros" | |
except: | |
mensaje+="\n distancia por defecto (5000 metros)" | |
try: | |
latlon=homebase.get(homebase.user==msg['from']['id']) | |
mensaje+="\n base latitud="+str(latlon.latitud)+" longitud="+str(latlon.longitud) | |
except: | |
latlon=False | |
mensaje+="\n base por defecto (Glorieta de los Zagries)" | |
self.client.sendMessage(msg['from']['id'],mensaje) | |
if latlon: | |
self.client.sendLocation(msg['from']['id'],float(latlon.latitud),float(latlon.longitud)) | |
if command=="/investiga": | |
self.client.sendMessage(chat_id, | |
'Contesta aqui dando una posicion', | |
reply_to_message_id=msg['message_id'], | |
reply_markup=ForceReply(selective=True)) | |
#Send Pokemon Info | |
def pokemon_alert(self, pkinfo): | |
print "alert",pkinfo | |
name = get_pkmn_name(pkinfo["id"]) | |
if pkinfo["spawnpoint_id"]<>'0': | |
cursor=db.execute_sql('select 100.0*x/n from sample where point_id=? and pokemon_id=?', | |
[pkinfo["spawnpoint_id"],pkinfo["id"]]) | |
resultcursor=[r for r in cursor] | |
if len(resultcursor) > 0: | |
porcentaje=float(resultcursor[0][0]) | |
if porcentaje > 10.0: | |
print "pokemon de nido",name, pkinfo["spawnpoint_id"] | |
#self.client.sendMessage(self.chat_id,"Bah, "+name+ "en nido.") | |
return | |
title = replace(self.title, pkinfo) | |
if pkinfo["24h_time"]=="02:00:00": | |
body=replace("Avistado #<pkmn> en un radio de 200 metros del ojeador" ,pkinfo) | |
else: | |
body = replace(self.body, pkinfo) | |
args = { | |
'chat_id': self.chat_id, | |
'text': '<b>' + title + '</b> \n' + body, | |
'parse_mode': 'HTML', | |
'disable_web_page_preview': 'False', | |
} | |
try_sending(log, self.connect, "Telegram", self.client.sendMessage, args) | |
concerned= favorites.select(favorites.user,distancia.metros,homebase.latitud,homebase.longitud | |
).join(distancia,JOIN.LEFT_OUTER,on=(distancia.user==favorites.user) | |
).join(homebase,JOIN.LEFT_OUTER,on=(homebase.user==distancia.user) | |
).where((favorites.pokemon==132) | (favorites.pokemon==pkinfo["id"]) | |
).group_by(favorites.user | |
).dicts().execute() | |
concernedWithDefaults=[(float(user['latitud']) if user['latitud'] else 41.648088, | |
float(user['longitud']) if user['longitud'] else -0.893720, | |
int(user['metros']) if user['metros'] else 5000, | |
user['user'] ) for user in concerned] | |
print concernedWithDefaults,pkinfo | |
concernedDist=[(6371008.8 * LatLng.from_degrees(float(pkinfo['lat']),float(pkinfo['lng'])).get_distance(LatLng.from_degrees(t[0],t[1])).radians, | |
t[2],t[3]) for t in concernedWithDefaults] | |
concernedDist.sort() | |
print concernedDist | |
concerned=[(t[2],t[0]) for t in concernedDist if t[0]<t[1]] | |
mensaje="Visto "+name+"! " | |
if len(concerned)>0: | |
primero=True | |
for userid,dist in concerned: | |
result= self.client.getChatMember(self.chat_id,userid) | |
print result | |
if result['status'] in ["creator","administrator","member"]: | |
try: | |
username='@'+result['user']['username'] | |
except: | |
print "falla con ", result['user']['first_name']+ ' @'+str(userid) | |
username="" | |
mensaje+= " "+ username | |
if primero and username<>"" and dist < 500: | |
primero=False | |
self.client.sendMessage(self.chat_id,"A menos de 500m, "+username+"!") | |
else: | |
mensaje+= "Pero nadie lo quiere :-(" | |
self.client.sendMessage(self.chat_id,mensaje) | |
if (self.send_map is True) and (len(concerned)>0): | |
locargs = { | |
'chat_id': self.chat_id, | |
'latitude': pkinfo['lat'], | |
'longitude': pkinfo['lng'], | |
'disable_notification':'True' | |
} | |
try_sending(log, self.connect, "Telegram (loc)", self.client.sendLocation, locargs) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment