Created
November 23, 2012 08:41
-
-
Save pfote/4134562 to your computer and use it in GitHub Desktop.
my old bot.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
################################################## | |
# little programming exercise using twisted irc | |
# ibot is an infobot whose whole purpose is to
# autorespond to a defined set of keywords with | |
# a stored phrase | |
# $Id: bot.py,v 1.4 2003/12/14 13:38:29 pfote Exp $ | |
################################################## | |
# Revision number sliced out of the CVS keyword string ("1.4" for the text below).
__version__ = "$Revision: 1.4 $"[11:-2]
# Mode flag that IrcMode gives to the default user when its store is empty.
HAS_BOTADMIN = 'a'
import ConfigParser,sys,time,string | |
from twisted.protocols import irc | |
from twisted.internet import reactor, protocol | |
from twisted.persisted.dirdbm import Shelf | |
from botstrings import * | |
from pprint import pformat # for debugging | |
import re | |
import time | |
class IrcAuth:
    """Persistent list of trusted nicks.

    Entries are kept in a twisted ``Shelf`` keyed by nick.  The value is a
    regular expression the caller-supplied hostmask has to match, or
    None / '' when the nick is trusted regardless of hostmask.
    """

    def __init__(self, dir='auth'):
        """Open the store in directory *dir* and seed it when it is empty."""
        self.auth = Shelf(dir)
        if len(self.auth) == 0:
            # bootstrap entry, otherwise nobody could administer a fresh bot
            self.add('pfote', '.*infosys.de')

    def add(self, nick, hostmask=None):
        """Store (or overwrite) the hostmask pattern for *nick*."""
        self.auth[nick] = hostmask
        print(self.list())

    def delete(self, nick):
        """Remove *nick* from the store; unknown nicks are ignored."""
        # has_key is the Shelf API, kept as is
        if self.auth.has_key(nick):
            del self.auth[nick]

    def count(self):
        """Return the number of stored nicks."""
        return len(self.auth)

    def list(self):
        """Return the stored nicks as a printable string."""
        return str(self.auth.keys())

    def __repr__(self):
        entries = ["%s (%s)" % (nick, self.auth[nick])
                   for nick in self.auth.keys()]
        return ",".join(entries)

    def verify(self, nick, hostmask):
        """Return 1 when *nick* is trusted for *hostmask*, else 0.

        The stored pattern is applied with ``re.match``, i.e. anchored at the
        start of *hostmask*.  A stored pattern that is not a valid regular
        expression never matches instead of raising ``re.error``.
        """
        if not self.auth.has_key(nick):
            return 0
        print("verify %s with %s " % (nick, hostmask))
        pattern = self.auth[nick]
        if pattern is None or pattern == '':
            return 1
        try:
            matched = re.match(pattern, hostmask)
        except re.error:
            return 0
        if matched:
            return 1
        return 0
class IrcMode:
    """Persistent per-nick mode flags.

    Flags are kept in a twisted ``Shelf`` keyed by nick; the value is a
    string of the flags the nick holds.  A nick without flags has no entry.
    """

    def __init__(self, dir='modes'):
        """Open the store in directory *dir* and seed it when it is empty."""
        self.mode = Shelf(dir)
        if len(self.mode) == 0:
            self.add('pfote', HAS_BOTADMIN)

    def add(self, nick, mode=None):
        """Give *mode* to *nick*; a missing or empty *mode* is a no-op."""
        if not mode:
            # storing None or '' would leave an entry the look-ups choke on
            return
        # has_key is the Shelf API, kept as is
        if self.mode.has_key(nick):
            print("%s exists in mode hash" % nick)
            if self.mode[nick].find(mode) == -1:
                self.mode[nick] = self.mode[nick] + mode
                print("adding mode %s to %s" % (mode, nick))
        else:
            self.mode[nick] = mode
        # list() prints by itself and returns None, so do not print its result
        self.list()

    def remove(self, nick, mode):
        """Take *mode* away from *nick*; drop the entry once it is empty.

        Does nothing when *nick* is unknown or does not hold *mode*.
        """
        if not self.has_mode(nick, mode):
            return
        remaining = self.mode[nick].replace(mode, '')
        if remaining == '':
            del self.mode[nick]
        else:
            self.mode[nick] = remaining

    def has_mode(self, nick, mode):
        "see if a nick has a special mode"
        if not self.mode.has_key(nick):
            return False
        return self.mode[nick].find(mode) != -1

    def list(self):
        """Print every stored nick together with its flags."""
        for nick in self.mode.keys():
            print("%s => %s" % (nick, self.mode[nick]))
class InfoStorage:
    """container class for the data the infobot should handle

    Keys and values are kept in a twisted ``Shelf``.  Every method that takes
    a key strips surrounding whitespace from it first, so ' foo ' and 'foo'
    name the same entry.
    """

    def __init__(self, dir='dict'):
        """Open the store in directory *dir*."""
        self.dict = Shelf(dir)

    def add(self, key, data):
        """Store *data* under *key* unless the key already exists."""
        key = key.strip()
        # test the stripped key: it is the one the entry is stored under
        if not self.dict.has_key(key):
            self.dict[key] = data.strip()

    def haskey(self, key):
        """Tell whether an entry for *key* exists."""
        return self.dict.has_key(key.strip())

    def get(self, key):
        """Return the text stored under *key*."""
        return self.dict[key.strip()]

    def count(self):
        """Return the number of stored entries."""
        return len(self.dict)

    def delete(self, key):
        """Remove the entry for *key*; unknown keys are ignored."""
        key = key.strip()
        if self.dict.has_key(key):
            del self.dict[key]

    def searchkeys(self, subkey):
        """substring search on all stored keys,
        returns a list with the matching keys"""
        keys = self.dict.keys()
        print("searching for %s in %s" % (subkey, " ".join(keys)))
        return [k for k in keys if k.find(subkey) != -1]

    def searchdata(self, subkey):
        """substring search on all stored values,
        returns a list with the keys of the matching entries"""
        keys = self.dict.keys()
        print("searching for %s in %s" % (subkey, " ".join(keys)))
        return [k for k in keys if self.dict[k].find(subkey) != -1]
class MessageLogger:
    """
    Writes timestamped lines to an already opened file object.  Kept apart
    from the protocol class so that logging stays independent of IRC logic.
    """

    def __init__(self, file):
        self.file = file

    def log(self, message):
        """Write *message*, prefixed with the local [HH:MM:SS], and flush."""
        now = time.localtime(time.time())
        stamp = time.strftime("[%H:%M:%S]", now)
        out = self.file
        out.write('%s %s\n' % (stamp, message))
        out.flush()

    def close(self):
        """Close the underlying file object."""
        self.file.close()
class BotProtocol(irc.IRCClient):
    """IRC client protocol implementing the infobot.

    Lines of the form ``key = text`` from trusted users are stored, lines of
    the form ``key?`` are answered from the store, and lines starting with
    the factory's command prefix are dispatched to the matching
    ``cmd_<name>`` method.  The docstrings of the ``cmd_*`` methods are sent
    to the channel by the help commands, so they are user-facing text.
    """

    def __init__(self):
        self.logger = None
        self.nickname = "ibot"
        self.is_joined = 0
        # nick -> {'time': epoch seconds of last activity, 'said': last line}
        self.seen = {}

    # ------------------------------------------------------------ helpers

    def _ensureLogger(self):
        """Open the log file named in the config unless it is open already."""
        if self.logger is None:
            self.logger = MessageLogger(open(self.factory.config['logfile'], "a"))
            self.log = self.logger.log

    def _channelName(self):
        """Return the configured channel name with a leading '#'."""
        channel = self.factory.channel
        if channel[:1] != "#":
            channel = "#" + channel
        return channel

    def _touchSeen(self, nick, said):
        """Record that *nick* was active just now; *said* is the line or ''."""
        if nick not in self.seen:
            self.seen[nick] = {}
        self.seen[nick]['time'] = time.time()
        self.seen[nick]['said'] = said

    def _isBotAdmin(self, nick):
        """Tell whether *nick* holds the bot admin mode."""
        try:
            return self.factory.mode.has_mode(nick, HAS_BOTADMIN)
        except KeyError:
            # nick has no entry in the mode store
            return 0

    def _commandHelp(self, name):
        """Return the help text for command *name*, None if it is unknown.

        An entry in ``docstrings`` takes precedence over the docstring of
        the command method.
        """
        handler = getattr(self, 'cmd_' + name, None)
        if handler is None:
            return None
        if docstrings.has_key('cmd_' + name):
            return docstrings['cmd_' + name]
        return handler.__doc__

    # ------------------------------------------------- connection handling

    def connectionMade(self):
        self._ensureLogger()
        irc.IRCClient.connectionMade(self)
        if self.nickname != self.factory.config['botname']:
            self.nickname = self.factory.config['botname']
            self.setNick(self.nickname)
        self.log("[connected at %s]" % time.asctime(time.localtime(time.time())))

    def nickservIdent(self, sender, passwd):
        """sends IDENT request to NickServ"""
        self.sendLine(":%s PRIVMSG NickServ :IDENTIFY %s" % (sender, passwd))
        # never write the password itself into the log file
        self.log(":%s PRIVMSG NickServ :IDENTIFY %s" % (sender, "********"))

    def signedOn(self):
        """gets called when we signed in .. it joins into the chan and retrieves
        a list of users"""
        self.join(self.factory.channel)
        reactor.callLater(2, self.callUserlist)

    def callUserlist(self):
        """Ask the server for the users that are on the configured channel."""
        self.sendLine("WHO %s" % self._channelName())

    def connectionLost(self, reason):
        self._ensureLogger()
        irc.IRCClient.connectionLost(self, reason)
        t = time.asctime(time.localtime(time.time()))
        self.log("[disconnected at %s] %s" % (t, reason))
        self.logger.close()
        self.logger = None

    # -------------------------------------------------- callbacks for events

    def kickedFrom(self, channel, kicker, message):
        """gets called when the bot itself was kicked from the chan"""
        self.log("kickedFrom(%s,%s,%s) called" % (channel, kicker, message))
        self.is_joined = 0
        # try to get back in
        self.rejoin(channel)

    def userKicked(self, kickee, channel, kicker, message):
        self.log("userKicked(%s,%s,%s,%s) called" % (kickee, channel, kicker, message))

    def rejoin(self, channel):
        """Send JOIN for *channel* every 2 seconds until joined() has fired."""
        if self.is_joined:
            return
        self.log("attempt to join the chan %s" % (channel))
        self.join(channel)
        # the answer to JOIN arrives asynchronously, so look again later
        self.log("will retry joining %s in 2s unless joined by then" % (channel))
        reactor.callLater(2, self.rejoin, channel)

    def joined(self, channel):
        self.log("[I have joined %s]" % channel)
        self.is_joined = 1

    def privmsg(self, user, channel, msg):
        """Handle a message sent to the channel or directly to the bot."""
        # server-originated prefixes carry no '!', so do not unpack two parts
        nick = user.split('!', 1)[0]
        self.log("<%s> %s" % (nick, msg))
        if channel == self.nickname:
            privmsg = 1
        else:
            privmsg = 0
            self._touchSeen(nick, msg)

        # msg[:1] instead of msg[0]: the message may be empty
        if msg[:1] == self.factory.prefix:
            # could be a command
            items = msg.split(" ")
            handler = getattr(self, "cmd_" + items[0][1:], None)
            if handler is not None:
                handler(msg=" ".join(items[1:]), user=user, channel=channel,
                        nick=nick, privmesg=privmsg)

        # see if its a storage request
        m = re.match(r"^([\w\s]+)=(.*)$", msg)
        if m and self.factory.auth.verify(nick, user):
            if not self.factory.info.haskey(m.group(1)):
                self.factory.info.add(m.group(1), m.group(2))
                self.notice(nick, "%s stored" % m.group(1))
            else:
                self.notice(nick, "%s exists, use !del to delete first" % m.group(1))

        # see if its a single line with a keyword
        m = re.match(r"(.*)\?$", msg)
        if m and self.factory.info.haskey(m.group(1)):
            self.say(self.factory.channel,
                     "%s: %s" % (m.group(1), self.factory.info.get(m.group(1))))

        # have we been addressed ?  the nick is escaped because it may hold
        # characters that are special in a regular expression
        ms = r"^%s[\:\,\s](.*)\?$" % re.escape(self.nickname)
        m = re.match(ms, msg)
        if m and self.factory.info.haskey(m.group(1)):
            self.say(self.factory.channel,
                     "%s: %s" % (m.group(1).strip(), self.factory.info.get(m.group(1))))

    def action(self, user, channel, msg):
        nick = user.split('!', 1)[0]
        self.log("* %s %s" % (nick, msg))

    # ---------------------------------------------------------- irc callbacks

    def noticed(self, user, channel, msg):
        """Log the notice; answer NickServ with IDENTIFY once if configured."""
        nick = user.split('!', 1)[0]
        self.log("<%s> %s" % (nick, msg))
        # the factory may not carry these attributes when no password is set
        pending = getattr(self.factory, 'identifyPending', 0)
        passwd = getattr(self.factory, 'nickservpw', None)
        if nick == 'NickServ' and pending == 1:
            # could be a ident request
            if passwd:
                self.nickservIdent(self.nickname, passwd)
                self.factory.identifyPending = 0
                self.log("[IDENTIFY to NICKSERV %s]" % time.asctime(time.localtime(time.time())))

    def irc_NICK(self, prefix, params):
        """When an IRC user changes their nickname
        """
        old_nick = prefix.split('!')[0]
        new_nick = params[0]
        self.log("%s is now known as %s" % (old_nick, new_nick))

    def listCommands(self):
        """Send the name and docstring of every cmd_* method to the channel."""
        channel = self.factory.channel
        for _cmd in dir(self):
            if _cmd[:4] == 'cmd_':
                c = getattr(self, _cmd, None)
                if c is not None:
                    self.say(channel, "%s: %s" % (_cmd[4:], c.__doc__))

    def display_seen(self, nick):
        """Tell the channel when *nick* was last active and what it said."""
        channel = self.factory.channel
        if nick not in self.seen:
            self.say(channel, "dont know anything about %s" % nick)
            return
        entry = self.seen[nick]
        when = time.strftime(STRFTIME_FORMAT, time.localtime(entry['time']))
        if entry['said']:
            self.say(channel, BOT_LASTSEENSAYING % (nick, when, entry['said']))
        else:
            self.say(channel, BOT_LASTSEEN % (nick, when))

    def userJoined(self, nick, channel):
        if channel == self._channelName():
            self._touchSeen(nick, '')

    # --------------------------------------------------------------- commands

    def cmd_addmode(self, **params):
        """addmode <nick> <mode> adds a mode to a nick .... needs 'a' mode"""
        if not self._isBotAdmin(params['nick']):
            return
        args = params['msg'].split()
        if len(args) < 2:
            return 0
        target, mode = args[0], args[1]
        self.factory.mode.add(target, mode)
        self.say(self.factory.channel,
                 "%s has added mode %s to %s" % (params['nick'], mode, target))

    def cmd_delmode(self, **params):
        """delmode <nick> <mode> removes a mode from a nick"""
        if not self._isBotAdmin(params['nick']):
            return
        args = params['msg'].split()
        if len(args) < 2:
            return 0
        target, mode = args[0], args[1]
        try:
            self.factory.mode.remove(target, mode)
        except KeyError:
            # target has no modes at all, nothing was removed
            return 0
        self.say(self.factory.channel,
                 "%s has removed mode %s from %s" % (params['nick'], mode, target))

    def cmd_seen(self, **params):
        "!seen <nickname> lists last occurrance of that nick"
        self.display_seen(params['msg'].strip())

    def cmd_del(self, **params):
        """!del <keyword> deletes a key from the info database"""
        nick = params['user'].split('!', 1)[0]
        key = params['msg']
        if self.factory.auth.verify(nick, params['user']):
            self.notice(nick, "deleting %s" % key)
            self.factory.info.delete(key)

    def cmd_addauth(self, **params):
        """<nick> <hostmask> (adding a trusted user)"""
        nick = params['user'].split('!', 1)[0]
        if not self.factory.auth.verify(nick, params['user']):
            return
        args = params['msg'].split(' ', 1)
        if len(args) < 2 or not args[0] or not args[1].strip():
            self.notice(nick, "usage: addauth <nick> <hostmask>")
            return
        self.factory.auth.add(args[0], args[1])

    def cmd_listauth(self, **params):
        """list all known user nicks """
        nick = params['user'].split('!', 1)[0]
        if self.factory.auth.verify(nick, params['user']):
            reply = "trusted users are: %s" % self.factory.auth
        else:
            reply = "you are not authorised to ask this"
        if params['privmesg'] == 1:
            # answer the sender directly; say() is for channel targets
            self.msg(nick, reply)
        else:
            self.say(self.factory.channel, reply)

    def cmd_delauth(self, **params):
        """deletes one from the auth list .. beware, nicknames are case sensitive"""
        nick = params['user'].split('!', 1)[0]
        user = params['msg'].split(' ', 1)[0]
        if self.factory.auth.verify(nick, params['user']):
            try:
                self.factory.auth.delete(user)
            except KeyError:
                self.notice(nick, "%s is not in the auth list" % user)

    def cmd_helpall(self, **params):
        """lists all available commands with descriptions"""
        channel = self.factory.channel
        for _cmd in dir(self):
            if _cmd[:4] == 'cmd_':
                name = _cmd[4:]
                self.say(channel, "%s: %s" % (name, self._commandHelp(name)))

    def cmd_help(self, **params):
        "!help gives list of commands, !help <cmd> gives details"
        channel = self.factory.channel
        topic = params.get('msg', '').strip()
        if not topic:
            names = [" !" + _cmd[4:] for _cmd in dir(self) if _cmd[:4] == 'cmd_']
            self.say(channel, "commands implemented: %s" % "".join(names))
            self.say(channel, BOT_GENERICHELP)
            return
        if getattr(self, 'cmd_' + topic, None) is not None:
            self.say(channel, "%s: %s" % (topic, self._commandHelp(topic)))

    def cmd_version(self, **params):
        """prints version,copyright info and uptime"""
        channel = self.factory.channel
        t = time.strftime(STRFTIME_FORMAT, time.localtime(self.factory.started))
        self.say(channel, BOT_VERSION % (__version__, t))

    def cmd_search(self, **params):
        """does a substring search of all stored keywords"""
        keys = self.factory.info.searchkeys(params['msg'])
        print("searchkeys returned %s" % (keys,))
        if len(keys) == 0:
            self.say(self.factory.channel, BOT_KEYNOTFOUND)
        else:
            self.say(self.factory.channel, BOT_KEYSFOUND % ",".join(keys))

    def cmd_searchdata(self, **params):
        """does a substring search of all stored descriptions"""
        keys = self.factory.info.searchdata(params['msg'])
        print("searchkeys returned %s" % (keys,))
        if len(keys) == 0:
            self.say(self.factory.channel, BOT_KEYNOTFOUND)
        else:
            self.say(self.factory.channel,
                     BOT_KEYSCONTAIN % (params['msg'], ",".join(keys)))

    def cmd_die(self, **params):
        "let the robot die (only auth users)"
        nick = params['user'].split('!', 1)[0]
        if self.factory.auth.verify(nick, params['user']):
            self.quit("bye bye")
            reactor.stop()

    def cmd_status(self, **params):
        """status report"""
        t = time.strftime(STRFTIME_FORMAT, time.localtime(self.factory.started))
        self.say(self.factory.channel,
                 BOT_STATUSMSG % (t, self.factory.info.count(), self.factory.auth.count()))

    def irc_RPL_WHOREPLY(self, prefix, params):
        """cb to a WHO REPLY """
        # only the channel (index 1) and the nick (index 5) are used
        if len(params) < 6:
            return
        chan, nick = params[1], params[5]
        if chan == self._channelName():
            self._touchSeen(nick, '')
class ChatBot(protocol.ClientFactory):
    """main bot object

    Holds the configuration and the stores (auth, info, mode) that are
    shared by the protocol instances built for each connection.
    """

    # the class of the protocol to build
    protocol = BotProtocol

    def __init__(self, config):
        """Set the bot up from *config*, a dict of option name -> value.

        'channel' and 'botname' are required.  'auth' (directory of the
        auth store), 'nickservpw' and 'prefix' (command prefix, '!' when
        absent) are optional.
        """
        self.config = config
        self.channel = config['channel']
        self.botname = config['botname']

        auth_dir = config.get('auth')
        if auth_dir is None:
            self.auth = IrcAuth()
        else:
            self.auth = IrcAuth(auth_dir)
        self.info = InfoStorage()
        self.started = time.time()

        # always define both attributes, so readers of them do not fail with
        # AttributeError when no password is configured
        self.nickservpw = config.get('nickservpw')
        self.identifyPending = 0
        if self.nickservpw is not None:
            self.identifyPending = 1

        self.prefix = config.get('prefix', '!')
        self.mode = IrcMode()

    def clientConnectionLost(self, connector, reason):
        """If we get disconnected, reconnect to server."""
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        """Report the failure and stop the reactor."""
        print("connection failed: %s" % (reason,))
        reactor.stop()
def printUsage():
    """Print the command line synopsis and exit with status 1."""
    script = sys.argv[0]
    print("usage: %s <configfile>" % script)
    sys.exit(1)
def main():
    """Read the config file named on the command line and run the bot.

    Every problem with the arguments or the config file ends in
    printUsage(), which terminates the process.
    """
    if len(sys.argv) != 2:
        printUsage()
    configFile = sys.argv[1]
    config = ConfigParser.ConfigParser()
    mainConfig = None
    # Only I/O and parser errors are caught here.  A bare except would also
    # swallow the SystemExit raised by printUsage().
    try:
        fp = open(configFile)
        try:
            config.readfp(fp)
        finally:
            fp.close()
        if config.has_section('IRC'):
            mainConfig = dict(config.items('IRC'))
    except (IOError, ConfigParser.Error):
        print("error parsing config file %s" % configFile)
        printUsage()
    if mainConfig is None:
        print("incomplete config file %s" % configFile)
        printUsage()
    # options read unconditionally by main, ChatBot and BotProtocol
    required = ('server', 'serverport', 'channel', 'botname', 'logfile')
    missing = [name for name in required if name not in mainConfig]
    if missing:
        print("incomplete config file %s (missing: %s)" % (configFile, ", ".join(missing)))
        printUsage()
    f = ChatBot(mainConfig)
    reactor.connectTCP(mainConfig['server'], int(mainConfig['serverport']), f)
    reactor.run()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment