Skip to content

Instantly share code, notes, and snippets.

@blam23
Last active December 18, 2015 17:29
Show Gist options
  • Select an option

  • Save blam23/5819159 to your computer and use it in GitHub Desktop.

Select an option

Save blam23/5819159 to your computer and use it in GitHub Desktop.
Toribash <-> IRC bot
# Made by Blam of the Toribash community
# Anyone is free to use, modify and do whatever they want to this code
import socket, threading, re
import string
import datetime
print "Announce Bot 1.0"
# Shamelessly stolen from http://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
def pretty_date(time=False):
"""
Get a datetime object or a int() Epoch timestamp and return a
pretty string like 'an hour ago', 'Yesterday', '3 months ago',
'just now', etc
"""
from datetime import datetime
now = datetime.now()
if type(time) is int:
diff = now - datetime.fromtimestamp(time)
elif isinstance(time,datetime):
diff = now - time
elif not time:
diff = now - now
second_diff = diff.seconds
day_diff = diff.days
if day_diff < 0:
return ''
if day_diff == 0:
if second_diff < 10:
return "just now"
if second_diff < 60:
return str(second_diff) + " seconds ago"
if second_diff < 120:
return "a minute ago"
if second_diff < 3600:
return str( second_diff / 60 ) + " minutes ago"
if second_diff < 7200:
return "an hour ago"
if second_diff < 86400:
return str( second_diff / 3600 ) + " hours ago"
if day_diff == 1:
return "Yesterday"
if day_diff < 7:
return str(day_diff) + " days ago"
if day_diff < 31:
return str(day_diff/7) + " weeks ago"
if day_diff < 365:
return str(day_diff/30) + " months ago"
return str(day_diff/365) + " years ago"
# Shamelessly stolen from: http://stackoverflow.com/questions/5179467/equivalent-of-setinterval-in-python
def setInterval(interval):
def decorator(function):
def wrapper(*args, **kwargs):
stopped = threading.Event()
def loop(): # executed in another thread
while not stopped.wait(interval): # until stopped
function(*args, **kwargs)
t = threading.Thread(target=loop)
t.daemon = True # stop if the program exits
t.start()
return stopped
return wrapper
return decorator
class AnnounceBot(threading.Thread):
def __init__(self, channel, nickname, server, port=6667):
threading.Thread.__init__ (self)
self.channel = channel
self.nickname = nickname
self.server = server
self.port = port
self.irc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.lastAnnounce = ""
self.lastRoom = ""
self.lastName = ""
self.lastTime = datetime.datetime.now()
self.reconnect = 1
self.running = 0
def run(self):
try:
self.irc_socket.connect((self.server, self.port))
print "[IRC] Connected."
self.running = 1
self.send_line("PASS poodunnit\nNICK "+self.nickname+"\nUSER "+self.nickname+" 0 * :"+self.nickname+"\nJOIN " + self.channel + "\nMODE " + self.nickname + " +B")
self.send_line("PRIVMSG NickServ identify password")
self.loop()
except Exception, e:
print "[IRC] Error", str(e)
def linesplit(self):
buffer = self.irc_socket.recv(4096)
done = False
while not done:
if "\n" in buffer:
(line, buffer) = buffer.split("\n", 1)
yield line
else:
more = self.irc_socket.recv(4096)
if not more:
done = True
running = 0
if(self.reconnect):
self.restart()
else:
buffer = buffer+more
if buffer:
yield buffer
def loop(self):
try:
while self.running:
for line in self.linesplit():
self.parse(line.strip())
except Exception, e:
print "[IRC] Error", str(e)
if(not self.running and self.reconnect):
self.restart()
def parse(self, line):
if line[:4] == "PING":
self.send_line("PONG " + line.split() [ 1 ])
match = re.match( r':(.+)!(.+) PRIVMSG '+self.channel+r' :(.+)', line)
if match:
self.parse_chat(*match.groups())
def parse_chat(self, name, address, chat):
if chat[:5] == "!room":
room = chat[6:]
people = getBouts(room)
if(people[0] != -1):
count = len(people[2])
desc = people[3]
max = people[0]
self.chat("\x02"+room+"\x02 - "+desc+" - Players: \x02"+str(count)+"\x02/"+str(max))
if chat[:5] == "!find":
player = chat[6:]
room = getRoom(player)
player = strip(player)
if room != "":
self.chat("User '\x02" + player + "\x02' found in '\x02" + room + "\x02'")
else:
self.chat("User '\x02" + player + "\x02' is not currently playing")
if chat[:6] == "!bouts":
room = chat[7:]
people = getBouts(room)
if(people[0] != -1):
self.send_line("PRIVMSG " + name + " :Bouts in [\x02" + room + "\x02] ==> " + string.join(people[2], ", "));
elif chat[:5] == "!last":
if self.lastAnnounce == "":
self.chat("\x02\x0314,1No announcements recorded.")
else:
if(self.lastRoom != ""):
people = getBouts(self.lastRoom)
if(people[0] != -1):
count = len(people[2])
max = people[0]
self.chat("Last Announce was " + pretty_date(self.lastTime) + ": [\x02"+self.lastName+"\x02] [\x02"+str(count)+"\x02/"+str(max)+"] " + self.lastAnnounce)
return
self.chat("Last Announce was " + pretty_date(self.lastTime) + ": [\x02"+self.lastName+"\x02] " + self.lastAnnounce)
def chat(self, msg):
self.send_line("PRIVMSG " + self.channel + " " + msg);
def send_line(self, data):
if(self.running):
self.irc_socket.send(data + "\n");
def stop(self):
self.running = 0
self.reconnect = 0
self.irc_socket.close()
def restart(self):
try:
print "[IRC] Restarting.."
self.irc_socket.close()
self.irc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.run()
except Exception, e:
print "[IRC] Error while reconnecting ", str(e)
class ToriBot(threading.Thread):
def __init__(self, IRCbot, server, port):
threading.Thread.__init__ (self)
self.server = server
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.bot = IRCbot
self.reconnect = 1
def run(self):
try:
self.socket.connect((self.server, self.port))
self.running = 1
print "[TB] Connected."
self.send_line("TORIBASH 30")
self.send_line("PING")
self.stopPing = self.ping()
self.loop()
except Exception, e:
print "[TB] Error", str(e)
def linesplit(self):
buffer = self.socket.recv(4096)
done = False
while not done:
if "\n" in buffer:
(line, buffer) = buffer.split("\n", 1)
yield line
else:
more = self.socket.recv(4096)
if not more:
done = True
running = 0
if(self.reconnect):
self.restart()
else:
buffer = buffer+more
if buffer:
yield buffer
def loop(self):
try:
while self.running:
for line in self.linesplit():
self.parse(removeColors(line))
except Exception, e:
print "[TB] Error", str(e)
if(not self.running and self.reconnect):
self.restart()
@setInterval(30)
def ping(self):
self.send_line("PING")
def parse(self, line):
if line[:16] == "SAY 0; [global] ":
self.bot.lastAnnounce = line[16:]
self.bot.lastTime = datetime.datetime.now()
count = -1
match = re.search("\/joi?n?\s(\w+)", str.lower(self.bot.lastAnnounce))
if(not match):
match = re.search("\/?joi?n?\s(\w+)", str.lower(self.bot.lastAnnounce))
if(match):
self.bot.lastRoom = match.groups()[0]
people = getBouts(self.bot.lastRoom)
if(people[0] != -1):
count = len(people[2])
max = people[0]
else:
self.bot.lastRoom = ""
self.bot.lastName = getName()
if(count == -1):
self.bot.chat("[\x02"+self.bot.lastName+"\x02] \x02"+ line[16:] + "\x02")
else:
self.bot.chat("[\x02"+self.bot.lastName+"\x02] [\x02"+str(count)+"\x02/"+str(max)+"] \x02"+ line[16:] + "\x02")
def chat(self, msg):
send_line("SAY " + msg);
def send_line(self, data):
if self.running:
self.socket.send(data + "\n");
def stop(self):
self.running = 0
self.reconnect = 0
self.stopPing.set()
self.socket.close()
def restart(self):
try:
print "[TB] Restarting.."
self.socket.close()
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.run()
except Exception, e:
print "[TB] Error while reconnecting ", str(e)
def removeColors(line):
from cStringIO import StringIO
nline = StringIO()
characters = enumerate(line)
for i, c in characters:
if c == '^' and i < line.__len__()-2:
characters.next()
characters.next()
else:
nline.write(c)
return nline.getvalue()
def getBouts(name):
bsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
bsocket.connect(("176.9.64.22", 22000))
buffer = bsocket.recv(4096)
done = False
inserver = False
ip = ""
lastCount = 0
clients = None
while not done:
if "\n" in buffer:
(line, buffer) = buffer.split("\n", 1)
if(line[:4] == "INFO"):
s = line.find(" ",9)
lastCount = int(line[8:s])
if(line[:10] == "SERVER 0; "):
s = line.find(" ", 12)
room = line[s:].strip()
ip = line[10:s]
if(room == name):
inserver = True
if(inserver and line[:8] == "DESC 0; "):
desc = removeColors(line[8:])
return (lastCount, ip, clients, desc)
if(inserver and line[:11] == "CLIENTS 2; "):
clients = line[11:].split("\t")
else:
more = bsocket.recv(4096)
if not more:
return (-1, None, None)
else:
buffer = buffer+more
def strip(bout):
clanEnd = bout.find("]") + 1
return bout[clanEnd:]
def stripAndLower(bout):
return strip(bout).lower()
def getRoom(bout):
bout = stripAndLower(bout)
bsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
bsocket.connect(("176.9.64.22", 22000))
buffer = bsocket.recv(4096)
done = False
lastRoom = ""
while not done:
if "\n" in buffer:
(line, buffer) = buffer.split("\n", 1)
if(line[:10] == "SERVER 0; "):
s = line.find(" ", 12)
lastRoom = line[s:].strip()
if(line[:11] == "CLIENTS 2; "):
clients = line[11:].split("\t")
if bout in [stripAndLower(client) for client in clients]:
return lastRoom
else:
more = bsocket.recv(4096)
if not more:
return ""
else:
buffer = buffer+more
def getName():
htmlsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
htmlsocket.connect(("forum.toribash.com", 80))
htmlsocket.send("GET /tori_broadcast.php?format=json HTTP/1.1\r\nHost: forum.toribash.com\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0\r\n\r\n")
buffer = htmlsocket.recv(4096)
while True:
more = htmlsocket.recv(4096)
if not more:
match = re.search(r"[,{]\"username\":\"(\w+)\"[,}]", buffer)
if match:
return match.groups()[0]
return ""
else:
buffer = buffer + more
def main():
bot = AnnounceBot("#announce", "AnnounceBot", "irc.toribash.com")
bot.start()
toribot = ToriBot(bot, "176.9.64.22", 20200)
toribot.start()
inp = raw_input()
while inp != "":
if inp[:7] == "global ":
toribot.parse("SAY 0; [global] " + inp[7:])
elif inp == "restart IRC":
bot.send_line("QUIT :Bot is restarting")
elif inp == "restart TB":
toribot.send_line("QUIT")
elif inp == "stop":
bot.reconnect = False
toribot.reconnect = False
bot.send_line("QUIT :Bot is exiting. Goodbye!")
toribot.send_line("QUIT")
inp = raw_input()
toribot.stop()
bot.stop()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment