Last active
July 27, 2018 18:35
-
-
Save wallabra/9c2268dfc3d51cbb8ef6669497adf61a to your computer and use it in GitHub Desktop.
IRC GAPS (Group-based Automatic Poll System) bot.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import nltk | |
import logging | |
import traceback | |
import textwrap | |
import yaml | |
import string | |
import time | |
from irc.bot import ServerSpec, SingleServerIRCBot | |
from threading import Thread | |
from nltk import bigrams | |
from nltk.stem import * | |
from nltk.stem.snowball import SnowballStemmer | |
# Shared English Snowball stemmer used to normalise every token before comparison.
stemmer = SnowballStemmer("english")


def stemmed_bigrams(sentence):
    """Return the bigrams of the stemmed, re-joined form of ``sentence``.

    The sentence is tokenised with ``nltk.word_tokenize`` and every token is
    stemmed. Tokens are then glued back into one string: a token that starts
    with an apostrophe, or that is found inside ``string.punctuation``, is
    attached directly to the preceding text, while every other token is
    preceded by a single space. ``bigrams`` is applied to the resulting
    string, so the pairs are made of adjacent characters.
    """
    pieces = []
    for token in nltk.word_tokenize(sentence):
        stem = stemmer.stem(token)
        glue_to_previous = token.startswith("'") or token in string.punctuation
        pieces.append(stem if glue_to_previous else " " + stem)
    return bigrams("".join(pieces).strip())
def similarity(sentence, sentence2):
    """Score how alike two sentences are.

    The score is the number of distinct stemmed bigrams the two sentences
    have in common, divided by the bigram count of the shorter of the two
    raw sentences.

    Args:
        sentence: First sentence.
        sentence2: Second sentence.

    Returns:
        A float score. When either sentence is too short to contain a single
        bigram, the score is 1.0 if both sentences are equal and 0.0
        otherwise, instead of dividing by zero.
    """
    # Normalise by the shorter of the two sentences; both must be measured,
    # otherwise a short second sentence never influences the score.
    shortest = min(
        len(tuple(bigrams(sentence))),
        len(tuple(bigrams(sentence2))),
    )
    if shortest == 0:
        # One-character (or empty) input has no bigrams at all; fall back to
        # exact equality so identical short answers still match each other.
        return 1.0 if sentence == sentence2 else 0.0

    shared = set(stemmed_bigrams(sentence)) & set(stemmed_bigrams(sentence2))
    return len(shared) / shortest
def group(pool, sentence_bias=0.35, group_bias=0.05):
    """Greedily cluster the sentences of ``pool`` by mutual similarity.

    Each sentence that is not yet part of an accepted cluster is used as a
    seed. Every other unclaimed sentence whose similarity to the seed exceeds
    ``sentence_bias`` joins the seed's cluster and contributes its score to
    the cluster strength.

    Args:
        pool: Iterable of sentences (it is iterated more than once).
        sentence_bias: Minimum similarity to the seed for joining a cluster.
        group_bias: Threshold applied when deciding whether to keep a cluster.

    Returns:
        A list of ``(strength, members)`` tuples, where ``strength`` is the
        sum of the accepted similarity scores and ``members`` is a set.
    """
    clusters = []
    claimed = set()  # sentences that already belong to an accepted cluster

    for seed in pool:
        if seed in claimed:
            continue

        members = {seed}
        scores = []
        for candidate in pool:
            if candidate in claimed:
                continue
            score = similarity(seed, candidate)
            if score > sentence_bias:
                members.add(candidate)
                scores.append(score)

        # NOTE(review): `score` is whatever the last inner iteration left
        # behind, i.e. the similarity between the seed and the last unclaimed
        # sentence of the pool, not the cluster strength. Kept as-is;
        # confirm whether `sum(scores)` was intended.
        unseen = all(existing != members for _, existing in clusters)
        if unseen and score > group_bias:
            clusters.append((sum(scores), members))  # no averaging, just summing.
            claimed.update(members)

    return clusters
# Constants
# Command prefix used for a server whose configuration entry sets no 'prefix'.
DEFAULT_PREFIX = "[="
# When true, Poll.add_vote refuses a second vote from a voter who already voted.
ONE_PER_USER = True
# Variables
# Command name -> handler function; filled in by the `command` decorator.
commands = {}
# Command name -> documentation string; filled in by the `command` decorator.
docs = {}
# Nick -> target of the last public message seen from that nick.
last_chan = {}
# Nick -> bot instance that received the last public message from that nick.
last_interface = {}
# Polls indexed by their ID; the slot of a finished poll is replaced by None.
polls = []
class Poll(object):
    """A timed, free-text poll running in one channel.

    Creating a poll announces it in the channel and starts a background
    thread that closes the poll after ``timeout`` seconds, groups the votes
    and announces the strongest group.
    """

    def __init__(self, index, bot, channel, name, timeout):
        """Announce the poll and start its countdown.

        Args:
            index: Position this poll has (or will have) in the module-level
                ``polls`` list; also used as the public poll ID.
            bot: Object providing ``send_message(channel, msg)`` and a
                ``prefix`` attribute.
            channel: Channel the poll is announced in.
            name: Human-readable poll title.
            timeout: Poll duration in seconds.
        """
        self.index = index
        self.channel = channel
        self.votes = []
        self.bot = bot
        self.name = name
        self.voted = set()
        self.time = time.time()
        self.timeout = timeout

        # The poll is appended to `polls` only after this constructor
        # returns, so the list holds the *other* polls at this point. The
        # short "vote <vote>" form is accepted only while exactly one poll is
        # active, i.e. when no other poll is running right now.
        others_active = any(p is not None and p is not self for p in polls)
        if not others_active:
            self.message("Poll #{1} created! Vote on it with '{0}vote <vote>'. {2:.2f} seconds remaining!".format(bot.prefix, index, timeout))
        else:
            self.message("Poll #{1} created! Vote on it with '{0}vote {1} <vote>'. {2:.2f} seconds remaining!".format(bot.prefix, index, timeout))

        Thread(target=self.timer, args=(timeout,)).start()

    def message(self, msg):
        """Send ``msg`` to the poll's channel through the owning bot."""
        self.bot.send_message(self.channel, msg)

    def add_vote(self, voter, vote):
        """Record ``vote`` for ``voter``.

        Returns:
            False when ``ONE_PER_USER`` is enabled and the voter has already
            voted; True when the vote was stored.
        """
        if voter in self.voted and ONE_PER_USER:
            return False

        self.votes.append(vote)
        self.voted.add(voter)
        return True

    def timer(self, timeout):
        """Wait ``timeout`` seconds, close the poll and announce the result."""
        time.sleep(timeout)
        polls[self.index] = None  # mark the poll as finished

        # Sort on the strength only: comparing whole (strength, set) tuples
        # would fall back to subset comparison of the sets on ties.
        groups = sorted(group(self.votes), key=lambda entry: entry[0], reverse=True)

        self.message("Time's up for poll #{} '{}'!".format(self.index, self.name))
        if not groups:
            self.message("The poll bored {} so much that nobody voted!".format(self.channel))
        else:
            strength, members = groups[0]
            # Quote at most five votes of the winning group.
            sample = "', '".join(tuple(members)[:5])
            self.message("Winner vote group has strength {:.3f}: '{}'".format(strength, sample))
def command(name, doc="There is no documentation for this command."):
    """Build a decorator that registers a function as the command ``name``.

    The decorated function is stored in ``commands`` and ``doc`` is stored
    in ``docs``, both under ``name``. The function itself is returned
    unchanged.
    """
    def register(handler):
        docs[name] = doc
        commands[name] = handler
        return handler

    return register
@command('help', doc="Use this command to get information about a command.")
def help(bot, conn, evt, args):
    """Reply with the documentation of the command named in ``args[0]``.

    Without arguments a general hint about the help command is sent instead.
    The reply goes to ``evt.target`` and is addressed to the requesting nick.
    """
    nick = evt.source.nick
    if args:
        reply = docs.get(args[0], "No such command found!")
    else:
        # NOTE(review): "decribed" is misspelled in this user-facing text.
        reply = "Use the help command to get information about a command, for example, those decribed in the vote command."
    bot.send_message(evt.target, "{}: {}".format(nick, reply))
@command("poll", doc="Make a poll! The voting classification is automatic, so you don't need to specify the possible answers.")
def cmd_poll(bot, conn, evt, args):
    """Create a poll from ``<seconds> <question...>``.

    ``args[0]`` is the poll duration in seconds and the remaining arguments
    form the poll title. A missing, non-numeric, non-positive or non-finite
    duration is answered with a usage message instead of creating a poll.
    """
    usage = "Syntax: {}poll <seconds> <question>".format(bot.prefix)
    if not args:
        bot.send_message(evt.target, usage)
        return

    try:
        timeout = float(args[0])
    except ValueError:
        bot.send_message(evt.target, "Invalid poll duration '{}'. {}".format(args[0], usage))
        return

    # NaN fails both comparisons and infinity fails the upper one. Either of
    # them, like a negative value, would make time.sleep() raise inside the
    # poll's timer thread and leave the poll open forever.
    if not 0 < timeout < float("inf"):
        bot.send_message(evt.target, "The poll duration must be a positive number of seconds. {}".format(usage))
        return

    # The Poll must be built before it is appended: its index is the
    # position it is about to take in `polls`.
    polls.append(Poll(len(polls), bot, evt.target, ' '.join(args[1:]), timeout))
@command("vote", doc="Vote on a poll! Any answer can be given, as long as it is in English.")
def cmd_vote(bot, conn, evt, args):
    """Register a vote from the sender of ``evt``.

    While exactly one poll is active, all arguments form the vote text.
    Otherwise ``args[0]`` must be the numeric poll ID and the remaining
    arguments form the vote. Every outcome is reported back to
    ``evt.target``.
    """
    active_ids = [i for i, p in enumerate(polls) if p is not None]
    single_poll = len(active_ids) == 1

    if single_poll:
        poll_id = active_ids[0]
        vote = ' '.join(args)
    else:
        if not args:
            bot.send_message(evt.target, "Syntax: {}vote <poll id> <vote>".format(bot.prefix))
            return
        try:
            poll_id = int(args[0])
        except ValueError:
            bot.send_message(evt.target, "Invalid ID value.")
            return
        vote = ' '.join(args[1:])

    # Reject negative IDs explicitly: they would otherwise index `polls`
    # from the end and hit an unrelated poll.
    if poll_id < 0 or poll_id >= len(polls):
        bot.send_message(evt.target, "No poll of ID #{} exists.".format(poll_id))
        return

    # Read the slot once; the poll's timer thread may set it to None at any time.
    poll = polls[poll_id]
    if poll is None:
        bot.send_message(evt.target, "The poll of ID #{} ended a long time ago...".format(poll_id))
    elif vote == "":
        # The prefix is an attribute of the bot, not of the connection.
        if single_poll:
            bot.send_message(evt.target, "Syntax: {}vote <vote>".format(bot.prefix))
        else:
            bot.send_message(evt.target, "Syntax: {}vote {} <vote>".format(bot.prefix, poll_id))
    else:
        remaining = poll.timeout - (time.time() - poll.time)
        if poll.add_vote(evt.source.nick, vote):
            bot.send_message(evt.target, "Vote added with success to poll #{} '{}'! {:.2f} seconds remaining.".format(poll_id, poll.name, remaining))
        else:
            bot.send_message(evt.target, "You already voted at poll #{} '{}'! {:.2f} seconds remaining.".format(poll_id, poll.name, remaining))
class Pollmeister(SingleServerIRCBot):
    """IRC bot that dispatches prefixed channel messages to registered commands."""

    def __init__(self, name, nick, realname, server, port, channels, account, prefix):
        """Configure the bot for a single server.

        Args:
            name: Label of this bot/server entry.
            nick: Nickname to use.
            realname: Real name to use.
            server: Server host name.
            port: Server port.
            channels: Channels to join once the MOTD has been received.
            account: Optional mapping with 'username' and 'password' keys
                used to identify with NickServ; falsy to skip identification.
            prefix: String that marks a channel message as a bot command.
        """
        super().__init__([ServerSpec(server, port)], nick, realname)
        self.name = name
        self.prefix = prefix
        self.joinchans = channels
        self.account = account

    def send_message(self, channel, msg):
        """Send ``msg`` to ``channel``, wrapped into several lines if needed.

        The text is wrapped to ``439 - len(channel)`` characters per line
        (presumably to stay under the IRC line length limit -- TODO confirm
        the constant), with a 0.6 second pause between consecutive lines.
        """
        wrapped = textwrap.wrap(msg, 439 - len(channel))
        for i, line in enumerate(wrapped):
            self.connection.privmsg(channel, line)
            if i < len(wrapped) - 1:
                time.sleep(0.6)

    def on_pubmsg(self, connection, event):
        """Handle a public channel message and run the command it names, if any."""
        nick = event.source.nick
        last_chan[nick] = event.target
        last_interface[nick] = self

        text = event.arguments[0]
        if not text.startswith(self.prefix):
            return

        cmd_name, *cmd_args = text[len(self.prefix):].split(' ')
        if cmd_name not in commands:
            return

        try:
            print("Executing command: " + cmd_name)
            commands[cmd_name](self, connection, event, cmd_args)
        except Exception as e:
            # Top-level boundary for command handlers: report the failure in
            # the channel and keep the bot running.
            self.send_message(event.target, "[{}: {} processing the '{}' command! ({})]".format(nick, type(e).__name__, cmd_name, str(e)))
            traceback.print_exc()

    def on_endofmotd(self, connection, event):
        """Identify with NickServ (if configured) and join the channels."""
        logging.debug("Joining channels...")

        if self.account:
            # The connection object offers `privmsg`; `send_message` exists
            # only on this bot class, so calling it on the connection fails.
            self.connection.privmsg('NickServ', 'IDENTIFY {} {}'.format(self.account['username'], self.account['password']))

        def _joinchan_postwait():
            # Wait three seconds before joining the configured channels.
            time.sleep(3)
            for c in self.joinchans:
                self.connection.join(c)

        Thread(target=_joinchan_postwait).start()
if __name__ == "__main__":
    # Start one bot per server entry of irc.yml, each in its own thread, and
    # wait for all of them.
    conns = {}
    threads = []

    # safe_load replaces the original bare yaml.load(): it only builds plain
    # Python data, which is all this configuration needs, whereas yaml.load
    # without an explicit Loader can construct arbitrary objects and is
    # rejected by current PyYAML releases. The file is closed deterministically.
    with open("irc.yml") as config_file:
        servers = yaml.safe_load(config_file) or []  # an empty file yields None

    for s in servers:
        conns[s['name']] = Pollmeister(s['name'], s['nickname'], s['realname'], s['server'], s['port'], s.get('channels', ()), s.get('account', None), s.get('prefix', DEFAULT_PREFIX))
        t = Thread(target=conns[s['name']].start, name="Bot: {}".format(s['name']))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment