Created
February 3, 2015 18:17
-
-
Save MikeRixWolfe/349f71f13d87d8f1e052 to your computer and use it in GitHub Desktop.
andy-bot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import sys | |
import socket | |
import time | |
import re | |
import glob | |
import traceback | |
import collections | |
sys.path += ['plugins'] # so 'import hook' works without duplication | |
os.chdir(sys.path[0] or '.') # do stuff relative to the install directory | |
irc_prefix_rem = re.compile(r'(.*?) (.*?) (.*)').match | |
irc_noprefix_rem = re.compile(r'()(.*?) (.*)').match | |
irc_netmask_rem = re.compile(r':?([^!@]*)!?([^@]*)@?(.*)').match | |
irc_param_ref = re.compile(r'(?:^|(?<= ))(:.*|[^ ]+)').findall | |
class Bot(object): | |
pass | |
class IRC(object): | |
def __init__(self, server='localhost', port=6667, | |
nick='Andy', channel='#test'): | |
self.channel = channel | |
self.server = server | |
self.port = port | |
self.nick = nick | |
self.socket = socket.socket(socket.AF_INET, socket.TCP_NODELAY) | |
self.connect() | |
self.parse_loop() | |
def connect(self): | |
print 'Connecting...' | |
self.conn = self.socket.connect((self.server, self.port)) | |
print 'Connected!' | |
self.cmd('NICK', [self.nick]) | |
self.cmd('USER', [self.nick, '3', '*', self.nick]) | |
self.cmd('JOIN', [self.channel]) | |
def parse_loop(self): | |
while True: | |
data = self.socket.recv(4096) | |
msg = decode(data) | |
if msg.startswith(':'): # has a prefix | |
prefix, command, params = irc_prefix_rem(msg).groups() | |
else: | |
prefix, command, params = irc_noprefix_rem(msg).groups() | |
nick, user, host = irc_netmask_rem(prefix).groups() | |
paramlist = irc_param_ref(params) | |
lastparam = '' | |
if paramlist: | |
if paramlist[-1].startswith(':'): | |
paramlist[-1] = paramlist[-1][1:] | |
lastparam = paramlist[-1] | |
if command == 'PING': | |
self.cmd('PONG', paramlist) | |
else: | |
print '%s %s <%s> %r' % (time.strftime("%H:%M:%S", | |
time.localtime()), paramlist[0], | |
nick, lastparam) | |
try: | |
handle(self, [msg, prefix, command, params, nick, user, | |
host, paramlist, lastparam]) | |
except Exception: | |
self.msg(paramlist[0], "I've encountered an error " | |
"executing %s" % paramlist[1]) | |
traceback.print_exc() | |
def msg(self, target, text): | |
self.cmd('PRIVMSG', [target, text]) | |
def cmd(self, command, params=None): | |
if params: | |
params[-1] = ':' + params[-1] | |
command = (command + ' ' + ' '.join(params)) | |
print '>>> %r' % unicode(command) | |
self.socket.send(command.encode('utf-8', 'replace') + '\r\n') | |
class Input(dict): | |
def __init__(self, conn, raw, prefix, command, params, | |
nick, user, host, paraml, msg): | |
chan = paraml[0].lower() | |
if chan == conn.nick.lower(): # is a PM | |
chan = nick | |
def say(msg): | |
conn.msg(chan, msg) | |
def reply(msg): | |
if chan == nick: # PMs don't need prefixes | |
conn.msg(chan, msg) | |
else: | |
conn.msg(chan, nick + ': ' + msg) | |
dict.__init__(self, conn=conn, raw=raw, prefix=prefix, command=command, | |
params=params, nick=nick, user=user, host=host, | |
paraml=paraml, msg=msg, server=conn.server, chan=chan, | |
say=say, reply=reply, lastparam=paraml[-1]) | |
# make dict keys accessible as attributes | |
def __getattr__(self, key): | |
return self[key] | |
def __setattr__(self, key, value): | |
self[key] = value | |
def decode(txt): | |
txt = txt.replace('\n', '').replace('\r', '') | |
for codec in ('utf-8', 'iso-8859-1', 'shift_jis', 'cp1252'): | |
try: | |
return txt.decode(codec) | |
except UnicodeDecodeError: | |
continue | |
return txt.decode('utf-8', 'ignore') | |
def reload(): | |
bot.plugs = collections.defaultdict(list) | |
fileset = set(glob.glob(os.path.join('plugins', '*.py'))) | |
for filename in fileset: | |
print filename | |
try: | |
code = compile(open(filename, 'U').read(), filename, 'exec') | |
namespace = {} | |
eval(code, namespace) | |
except Exception: | |
traceback.print_exc() | |
continue | |
for obj in namespace.itervalues(): | |
if hasattr(obj, '_hook'): # check for magic | |
for type, data in obj._hook: | |
bot.plugs[type] += [data] | |
bot.commands = {} | |
for plug in bot.plugs['command']: | |
name = plug[1]['name'].lower() | |
bot.commands[name] = plug | |
print "Command loaded: %s" % name | |
def handle(conn, out): | |
inp = Input(conn, *out) | |
if inp.lastparam[0] == '.': | |
if len(inp.lastparam.split()) != 1: | |
prefix, suffix = inp.lastparam.split(' ', 1) | |
else: | |
prefix, suffix = inp.lastparam, '' | |
command = prefix[1:] | |
if command in bot.commands: | |
input = Input(conn, *out) | |
input.trigger = prefix | |
input.inp_unstripped = suffix | |
input.inp = input.inp_unstripped.strip() | |
func, args = bot.commands[command] | |
run(func, input) | |
def run(func, input): | |
args = func._args | |
if 'inp' not in input: | |
input.inp = input.paraml | |
if args: | |
if 'input' in args: | |
input.input = input | |
if 0 in args: | |
out = func(input.inp, **input) | |
else: | |
kw = dict((key, input[key]) for key in args if key in input) | |
out = func(input.inp, **kw) | |
else: | |
out = func(input.inp) | |
if out is not None: | |
input.reply(unicode(out)) | |
if __name__ == '__main__': | |
bot = Bot() | |
reload() | |
bot.conn = IRC('dot', 6667, 'Andy', '#geekboy') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment