Skip to content

Instantly share code, notes, and snippets.

@wallabra
Last active July 27, 2018 18:35
Show Gist options
  • Save wallabra/9c2268dfc3d51cbb8ef6669497adf61a to your computer and use it in GitHub Desktop.
Save wallabra/9c2268dfc3d51cbb8ef6669497adf61a to your computer and use it in GitHub Desktop.
IRC GAPS (Group-based Automatic Poll System) bot.
import nltk
import logging
import traceback
import textwrap
import yaml
import string
import time
from irc.bot import ServerSpec, SingleServerIRCBot
from threading import Thread
from nltk import bigrams
from nltk.stem import *
from nltk.stem.snowball import SnowballStemmer
# Module-wide English Snowball stemmer; stemmed_bigrams() runs every token through it.
stemmer = SnowballStemmer("english")
def stemmed_bigrams(sentence):
    """Return the bigrams of *sentence* after stemming each of its tokens.

    The sentence is tokenized with ``nltk.word_tokenize``; every token is
    stemmed and the stems are glued back into a single string.  Tokens that
    start with an apostrophe or consist of punctuation are attached directly
    to the preceding text, all others are preceded by a space.  Because the
    rebuilt text is a string, ``bigrams`` pairs up its consecutive characters.
    """
    pieces = []
    for token in nltk.word_tokenize(sentence):
        stem = stemmer.stem(token)
        attaches_to_previous = token.startswith("'") or token in string.punctuation
        if attaches_to_previous:
            pieces.append(stem)
        else:
            pieces.append(" " + stem)

    rebuilt = "".join(pieces).strip()
    return bigrams(rebuilt)
def similarity(sentence, sentence2):
    """Score how similar two sentences are from their shared stemmed bigrams.

    The score is the number of distinct stemmed bigrams both sentences have
    in common, divided by the number of character bigrams in the *shorter*
    of the two raw sentences.

    Sentences too short to contain a bigram (fewer than two characters)
    cannot be scored that way; they are treated as fully similar (1.0) when
    they are identical and as unrelated (0.0) otherwise, instead of raising
    ``ZeroDivisionError``.

    :param sentence: first sentence (str).
    :param sentence2: second sentence (str).
    :return: the similarity score as a float.
    """
    # A string of n characters has max(n - 1, 0) character bigrams, so the
    # bigram count of the shorter sentence follows directly from its length.
    shortest_bigram_count = max(min(len(sentence), len(sentence2)) - 1, 0)
    if shortest_bigram_count == 0:
        return 1.0 if sentence == sentence2 else 0.0

    shared = set(stemmed_bigrams(sentence)) & set(stemmed_bigrams(sentence2))
    return len(shared) / shortest_bigram_count
def group(pool, sentence_bias=0.35, group_bias=0.05):
    """Cluster the sentences of *pool* into groups of similar sentences.

    Each sentence that is not yet part of an accepted group seeds a new
    group; every other unassigned sentence whose ``similarity`` to the seed
    exceeds *sentence_bias* joins it.  A group's strength is the plain sum
    of those similarities (duplicates in *pool* each count).

    :param pool: iterable of sentences (str); it is traversed repeatedly, so
        it must be re-iterable (e.g. a list).
    :param sentence_bias: minimum similarity for a sentence to join a group.
    :param group_bias: minimum summed strength for a group to be kept.
    :return: list of ``(strength, members)`` tuples, ``members`` being a set.
    """
    groups = []
    negate = set()  # sentences already claimed by an accepted group

    for sentence in pool:
        if sentence in negate:
            continue

        members = {sentence}
        strength = []

        for sentence2 in pool:
            if sentence2 in negate:
                continue

            simi = similarity(sentence, sentence2)
            if simi > sentence_bias:
                members.add(sentence2)
                strength.append(simi)

        # NOTE(review): the threshold used to be applied to ``simi``, i.e. the
        # similarity of whichever pair happened to be compared last, which
        # made acceptance depend on vote order.  It is applied to the group's
        # summed strength instead.
        total = sum(strength)  # no averaging, just summing.
        already_known = any(members == existing for _, existing in groups)
        if total > group_bias and not already_known:
            groups.append((total, members))
            negate.update(members)

    return groups
# Constants
DEFAULT_PREFIX = "[="  # command prefix used when a server entry does not set 'prefix'
ONE_PER_USER = True  # when true, Poll.add_vote refuses a second vote from the same voter
# Variables
commands = {}  # command name -> handler function, filled by the @command decorator
docs = {}  # command name -> help text, filled by the @command decorator
last_chan = {}  # nick -> channel of that nick's most recent public message
last_interface = {}  # nick -> bot instance that received that nick's most recent public message
polls = []  # Poll objects indexed by poll ID; a finished poll's slot is set to None
class Poll(object):
    """A timed, free-text poll running in one channel.

    Creating a poll announces it on the channel and starts a background
    thread that closes the poll after *timeout* seconds, groups the votes
    with ``group`` and announces the strongest group.
    """

    def __init__(self, index, bot, channel, name, timeout):
        """Announce the poll and start its countdown.

        :param index: the poll's ID, i.e. its position in the ``polls`` list.
        :param bot: object providing ``send_message(channel, msg)`` and ``prefix``.
        :param channel: channel the poll is announced in.
        :param name: human-readable poll title.
        :param timeout: poll duration in seconds.
        """
        self.index = index
        self.channel = channel
        self.votes = []
        self.bot = bot
        self.name = name
        self.voted = set()
        self.time = time.time()
        self.timeout = timeout

        # The vote command only accepts the ID-less form while exactly one
        # poll is active.  This poll may not be in ``polls`` yet (the caller
        # appends it after construction), so look for *other* active polls
        # rather than counting entries.
        others_active = any(p is not None and p is not self for p in polls)
        if others_active:
            self.message("Poll #{1} created! Vote on it with '{0}vote {1} <vote>'. {2:.2f} seconds remaining!".format(bot.prefix, index, timeout))
        else:
            self.message("Poll #{1} created! Vote on it with '{0}vote <vote>'. {2:.2f} seconds remaining!".format(bot.prefix, index, timeout))

        Thread(target=self.timer, args=(timeout,)).start()

    def message(self, msg):
        """Send *msg* to the poll's channel through the bot."""
        self.bot.send_message(self.channel, msg)

    def add_vote(self, voter, vote):
        """Record *vote* for *voter*.

        :return: ``False`` if the voter already voted and ``ONE_PER_USER`` is
            set, ``True`` once the vote has been stored.
        """
        if voter in self.voted and ONE_PER_USER:
            return False

        self.votes.append(vote)
        self.voted.add(voter)
        return True

    def timer(self, timeout):
        """Sleep for *timeout* seconds, then close the poll and announce the result."""
        time.sleep(timeout)
        polls[self.index] = None  # mark the poll as finished

        # Rank by strength only; comparing whole (strength, set) tuples would
        # fall back to subset comparison of the member sets on ties.
        groups = sorted(group(self.votes), key=lambda entry: entry[0], reverse=True)

        self.message("Time's up for poll #{} '{}'!".format(self.index, self.name))
        if not groups:
            self.message("The poll bored {} so much that nobody voted!".format(self.channel))
            return

        strength, members = groups[0]
        # Show at most five votes of the winning group, in a stable order.
        shown = "', '".join(sorted(members)[:5])
        self.message("Winner vote group has strength {:.3f}: {}".format(strength, "'" + shown + "'"))
def command(name, doc="There is no documentation for this command."):
    """Create a decorator that registers a function as the command *name*.

    The decorated function is stored in ``commands`` under *name*, its help
    text *doc* is stored in ``docs`` under the same key, and the function
    itself is returned unchanged.
    """
    def register(handler):
        commands[name] = handler
        docs[name] = doc
        return handler

    return register
@command('help', doc="Use this command to get information about a command.")
def help(bot, conn, evt, args):
    """Reply to the sender with the help text of the command in ``args[0]``.

    Without arguments a short hint on how to use the help command is sent
    instead; an unknown command name yields a "not found" reply.
    """
    nick = evt.source.nick
    target = evt.target

    if args:
        docstr = docs.get(args[0], "No such command found!")
        bot.send_message(target, "{}: {}".format(nick, docstr))
    else:
        bot.send_message(target, "{}: Use the help command to get information about a command, for example, those decribed in the vote command.".format(nick))
@command("poll", doc="Make a poll! The voting classification is automatic, so you don't need to specify the possible answers.")
def cmd_poll(bot, conn, evt, args):
    """Start a poll: ``args[0]`` is the duration in seconds, the rest its name.

    A missing, non-numeric, negative or non-finite duration is answered with
    a syntax hint instead of creating a poll whose timer thread would fail
    and leave the poll active forever.
    """
    usage = "Syntax: {}poll <seconds> <name>".format(bot.prefix)

    if not args:
        bot.send_message(evt.target, usage)
        return

    try:
        timeout = float(args[0])
    except ValueError:
        bot.send_message(evt.target, usage)
        return

    # Rejects negatives, infinity and NaN (every comparison with NaN is false).
    if not 0 <= timeout < float("inf"):
        bot.send_message(evt.target, usage)
        return

    # Construct first, append second: the poll's ID is its index in `polls`.
    poll = Poll(len(polls), bot, evt.target, ' '.join(args[1:]), timeout)
    polls.append(poll)
@command("vote", doc="Vote on a poll! Any answer can be given, as long as it is in English.")
def cmd_vote(bot, conn, evt, args):
    """Cast a vote on a poll.

    While exactly one poll is active the whole argument list is the vote.
    Otherwise ``args[0]`` must be the poll ID and the remaining arguments
    form the vote.  Every outcome is reported back to the channel.
    """
    active_ids = [i for i, p in enumerate(polls) if p is not None]

    if len(active_ids) == 1:
        poll_id = active_ids[0]
        vote = ' '.join(args)
    else:
        try:
            poll_id = int(args[0])
        except (IndexError, ValueError):  # no argument at all, or not a number
            bot.send_message(evt.target, "Invalid ID value.")
            return
        vote = ' '.join(args[1:])

    # A negative ID would otherwise index `polls` from the end.
    if not 0 <= poll_id < len(polls):
        bot.send_message(evt.target, "No poll of ID #{} exists.".format(poll_id))
        return

    # Read the slot once: the poll's timer thread may clear it at any moment.
    poll = polls[poll_id]

    if poll is None:
        bot.send_message(evt.target, "The poll of ID #{} ended a long time ago...".format(poll_id))
    elif vote == "":
        # The command prefix is an attribute of the bot, not of the connection.
        bot.send_message(evt.target, "Syntax: {}vote {} <vote>".format(bot.prefix, poll_id))
    else:
        accepted = poll.add_vote(evt.source.nick, vote)
        remaining = poll.timeout - (time.time() - poll.time)
        if accepted:
            bot.send_message(evt.target, "Vote added with success to poll #{} '{}'! {:.2f} seconds remaining.".format(poll_id, poll.name, remaining))
        else:
            bot.send_message(evt.target, "You already voted at poll #{} '{}'! {:.2f} seconds remaining.".format(poll_id, poll.name, remaining))
class Pollmeister(SingleServerIRCBot):
    """IRC bot that runs prefixed channel messages as registered commands."""

    def __init__(self, name, nick, realname, server, port, channels, account, prefix):
        """Configure the bot for a single server.

        :param name: label of this connection.
        :param nick: IRC nickname.
        :param realname: IRC real name.
        :param server: server host name.
        :param port: server port.
        :param channels: channels to join after connecting.
        :param account: mapping with 'username' and 'password' used to
            identify with NickServ, or a falsy value to skip identification.
        :param prefix: string that marks a channel message as a command.
        """
        super().__init__([ServerSpec(server, port)], nick, realname)
        self.name = name
        self.prefix = prefix
        self.joinchans = channels
        self.account = account

    def send_message(self, channel, msg):
        """Send *msg* to *channel*, wrapped into several messages if it is long."""
        # NOTE(review): 439 presumably keeps each line under the IRC message
        # length limit once the protocol overhead is added - confirm.
        lines = textwrap.wrap(msg, 439 - len(channel))
        last = len(lines) - 1
        for i, line in enumerate(lines):
            self.connection.privmsg(channel, line)
            if i < last:
                time.sleep(0.6)  # pause between consecutive lines

    def on_pubmsg(self, connection, event):
        """Handle a channel message: remember its sender and run any command in it."""
        nick = event.source.nick
        last_chan[nick] = event.target
        last_interface[nick] = self

        text = event.arguments[0]
        if not text.startswith(self.prefix):
            return

        # str.split always yields at least one item, so cmd_name is always bound.
        cmd_name, *cmd_args = text[len(self.prefix):].split(' ')
        handler = commands.get(cmd_name)
        if handler is None:
            return

        try:
            print("Executing command: " + cmd_name)
            handler(self, connection, event, cmd_args)
        except Exception as e:
            # Top-level boundary: report the failure on the channel rather
            # than letting one broken command take the bot down.
            self.send_message(event.target, "[{}: {} processing the '{}' command! ({})]".format(nick, type(e).__name__, cmd_name, str(e)))
            traceback.print_exc()

    def on_endofmotd(self, connection, event):
        """Identify with NickServ if an account is configured, then join the channels."""
        logging.debug("Joining channels...")

        if self.account:
            # send_message is a method of the bot, not of the connection.
            self.send_message('NickServ', 'IDENTIFY {} {}'.format(self.account['username'], self.account['password']))

        def _joinchan_postwait():
            # Wait a few seconds before joining, without blocking this handler.
            time.sleep(3)
            for c in self.joinchans:
                self.connection.join(c)

        Thread(target=_joinchan_postwait).start()
if __name__ == "__main__":
    # Start one bot per server entry of irc.yml, each in its own thread, and
    # keep the main thread alive until all of them have stopped.
    conns = {}
    threads = []

    with open("irc.yml") as config_file:
        # safe_load replaces yaml.load: the configuration consists of plain
        # mappings, lists and scalars only, so the unrestricted loader (which
        # can construct arbitrary Python objects) is not needed, and
        # yaml.load() without an explicit Loader is rejected by current PyYAML.
        servers = yaml.safe_load(config_file) or []

    for s in servers:
        bot = Pollmeister(
            s['name'],
            s['nickname'],
            s['realname'],
            s['server'],
            s['port'],
            s.get('channels', ()),
            s.get('account', None),
            s.get('prefix', DEFAULT_PREFIX),
        )
        conns[s['name']] = bot

        t = Thread(target=bot.start, name="Bot: {}".format(s['name']))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment