Skip to content

Instantly share code, notes, and snippets.

@bussiere
Forked from anonymous/Bot KVirc
Created November 5, 2010 22:22
Show Gist options
  • Save bussiere/664969 to your computer and use it in GitHub Desktop.
Save bussiere/664969 to your computer and use it in GitHub Desktop.
###########################################################################
# IRCBot: A simple library for simple IRC bots
# Copyright (c) 2009 Whidou <[email protected]>
###########################################################################
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###########################################################################
import socket, re, threading
from time import sleep, time
class IRCBot:
    """A small threaded IRC bot driven by a table of regex-triggered handlers.

    ``orders`` maps a regular-expression pattern to a callable. Every line
    received from the server is searched with each pattern; on a match the
    callable is invoked as ``handler(line, (start, end))`` where the tuple
    is the span of the match inside the line.

    Three hooks can be supplied (or overridden in a subclass): ``init`` is
    called once after the NICK/USER/JOIN commands have been queued,
    ``timer`` is called periodically from its own thread, and ``end`` is
    called just before the bot leaves the channel.
    """

    def __init__(self, network, chan, name, orders=None, init=None, timer=None,
                 end=None, partmessage="", port=6667):
        """Store the configuration and create the (unconnected) socket.

        network     -- host name of the IRC server
        chan        -- channel to join and to talk on
        name        -- nickname, also used for the USER command fields
        orders      -- dict of {regex pattern: handler}; a fresh dict is
                       created when omitted so instances never share one
        init, timer, end -- optional replacement hooks (see load_methods)
        partmessage -- message sent with the final PART command
        port        -- TCP port of the IRC server
        """
        self.chan = chan
        self.network = network
        self.port = port
        self.name = name
        self.partmessage = partmessage
        # A literal {} default would be one dict shared by every instance.
        self.load_orders(orders if orders is not None else {})
        self.load_methods(init, timer, end)
        self.lasttime = time()
        self.irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.queue = []
        self.isup = False
        self.stop()

    def mainloop(self, freq=1, timer=60):
        """Connect, join the channel and serve until the bot is stopped.

        freq  -- delay in seconds between two polls of the socket
        timer -- period in seconds between two calls of the timer hook

        Does nothing when the bot is already running. Blocks until the
        receiving loop ends, then runs the shutdown sequence.
        """
        if self.go:
            return
        # Connect before flipping the flags so that a failed connection
        # does not leave the bot marked as running.
        self.irc.connect((self.network, self.port))
        self.go = True
        self.isup = True
        loop = threading.Thread(target=self.check_loop, args=(freq,))
        loop.start()
        self.sendToServ('NICK %s' % self.name)
        self.sendToServ('USER %s _%s __%s :%s' % ((self.name,) * 4))
        self.sendToServ('JOIN %s' % self.chan)
        self.init()
        threading.Thread(target=self.clock, args=(timer,)).start()
        send_thread = threading.Thread(target=self.send)
        send_thread.start()
        try:
            loop.join()
        finally:
            # Also runs when join() is interrupted, so the worker threads
            # are told to stop and the socket is released.
            self._shutdown(send_thread)

    def _shutdown(self, send_thread):
        """Run the end hook, queue PART, let the sender finish, close the socket."""
        self.stop()
        try:
            self.end()
            self.sendToServ("PART %s :%s" % (self.chan, self.partmessage))
        finally:
            # Nothing can be queued once isup is False; send() drains what
            # is left before its thread terminates.
            self.isup = False
            send_thread.join()
            try:
                self.irc.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # The peer may already have dropped the connection.
                pass
            finally:
                self.irc.close()

    def check_loop(self, freq):
        """Poll the socket every ``freq`` seconds.

        The loop ends when the bot is stopped or when no order has matched
        for 600 seconds.
        """
        # NOTE(review): every tick starts a new thread that calls recv();
        # on a quiet connection those threads presumably pile up until data
        # arrives. Kept as-is to preserve the original polling design.
        while self.go and (time() - self.lasttime) < 600:
            try:
                threading.Thread(target=self.check).start()
            except (RuntimeError, threading.ThreadError):
                # Could not start another thread; try again on the next tick.
                pass
            sleep(freq)

    def check(self):
        """Read pending data and dispatch each line to the matching orders."""
        data = self.irc.recv(4096)
        if not data:
            # An empty read means the connection was closed.
            self.stop()
            return
        if not isinstance(data, str):
            # Sockets deliver bytes; the patterns in ``orders`` are text.
            data = data.decode("utf-8", "replace")
        for line in re.split("\r|\n", data):
            # Iterate over a snapshot so a handler may edit the orders.
            for pattern, handler in list(self.orders.items()):
                match = re.search(pattern, line)
                if match:
                    self.lasttime = time()
                    handler(line, match.span())

    def sendToChan(self, message):
        """Send a message on the chosen channel, one PRIVMSG per line."""
        for line in re.split("\r|\n", str(message)):
            self.sendToServ("PRIVMSG %s :%s" % (self.chan, line))

    def sendToServ(self, line):
        """Queue a raw line for the server; ignored while the bot is down."""
        if self.isup:
            self.queue.append("%s\n" % str(line))

    def send(self):
        """Send the queued messages, once per second, while the bot is up."""
        while self.isup:
            self._flush_queue()
            sleep(1)
        # Lines queued just before isup was cleared (end hook, PART) would
        # otherwise never reach the server.
        self._flush_queue()

    def _flush_queue(self):
        """Write every queued line to the socket, oldest first."""
        while self.queue:
            self.irc.sendall(self._encode(self.queue.pop(0)))

    @staticmethod
    def _encode(line):
        """Return ``line`` as bytes, encoding text as UTF-8."""
        if isinstance(line, bytes):
            return line
        return line.encode("utf-8")

    def stop(self, *args):
        """Ask the loops to halt; extra arguments are ignored so that this
        method can be used directly as an order handler."""
        self.go = False

    def clock(self, timer=60):
        """Call the timer hook in a new thread every ``timer`` seconds."""
        t = 0
        while self.go:
            sleep(1)
            t = (t + 1) % int(timer)
            if not t:
                threading.Thread(target=self.timer).start()

    def init(self):
        """A dummy init hook"""
        pass

    def timer(self):
        """A dummy timer hook"""
        pass

    def end(self):
        """A dummy end hook"""
        pass

    def load_orders(self, orders):
        """Use a new set of orders ({regex pattern: handler})."""
        self.orders = orders

    def load_methods(self, init=None, timer=None, end=None):
        """Replace the init/timer/end hooks with the callables given."""
        if init:
            self.init = init
        if timer:
            self.timer = timer
        if end:
            self.end = end
if __name__ == "__main__":
    class MyBot(IRCBot):  # Example bot built on top of IRCBot
        def __init__(self):
            chan = "#COA"
            IRCBot.__init__(self,            # Initialise the base class
                            "irc.nanami.fr",  # with the desired settings
                            chan,
                            "Altaïr",
                            {"PING": self.pong,    # Reactions to the
                             "JOIN": self.hello,   # received messages
                             "[Bb]ye": self.stop,
                             # Listen on the channel the bot actually joins;
                             # a pattern naming another channel never matches.
                             "PRIVMSG %s :" % re.escape(chan): self.recv})
            self.partmessage = "I'll be back !"

        def pong(self, msg, match):
            """Answer server PINGs to keep the connection alive."""
            self.sendToServ('PONG ' + msg.split()[1])

        def hello(self, msg, match):
            """Example handler: greet whoever joins, except the bot itself."""
            # The nickname sits between the leading ':' and the first '!'.
            name = msg[1:msg.find("!")]
            if name != self.name:
                self.sendToChan("salut %s" % name)

        def init(self):
            """Say something once the join has been requested."""
            self.sendToChan("re")

        def recv(self, msg, match):
            """Reply to any message posted on the channel."""
            self.sendToChan("Peut être.")

        def timer(self):
            """Periodic message."""
            self.sendToChan("^^")

        def end(self):
            """Farewell message sent before leaving."""
            self.sendToChan("@++")

    MyBot().mainloop()  # Release the beast
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment