Created
February 21, 2014 04:57
-
-
Save jangler/9129022 to your computer and use it in GitHub Desktop.
anagram game IRC bot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Anagram game IRC bot. | |
Requires http://tools.suckless.org/sic. | |
Public commands: | |
- !start | |
- !score [NICK] | |
- !totalscore [NICK] | |
Private commands: | |
- !stop PASSWORD | |
- !quit PASSWORD | |
""" | |
from os import fork | |
from pickle import load, dump | |
from re import match | |
from random import choice, randint | |
from subprocess import Popen, PIPE | |
from threading import Thread | |
from time import sleep | |
NICK = 'ANAGRAP' | |
CHANNEL = '#anagrap' | |
NETWORK = 'irc.esper.net' | |
PORT = 6667 | |
CMD = 'sic', '-h', NETWORK, '-p', str(PORT), '-n', NICK | |
PASSWORD = hex(randint(0x10000000, 0xffffffff))[2:] | |
WORD_FILE = '/usr/share/dict/words' | |
SCORE_FILE = 'total_scores.p' | |
MIN_LENGTH, MAX_LENGTH = 10, 20 | |
ENTRY_TIMER, VOTE_TIMER = 120, 20 | |
INTERMISSION_SHORT, INTERMISSION_LONG = 10, 60 | |
game_running = False | |
period = None | |
phrase = None | |
entries = dict() | |
votes = dict() | |
scores = dict() | |
total_scores = dict() | |
round_number = 0 | |
def new_phrase(): | |
global phrase | |
phrase = '' | |
while len(phrase) < MIN_LENGTH: | |
word = choice(WORD_TUPLE) + ' ' | |
if len(phrase) + len(word) <= MAX_LENGTH: | |
phrase += word | |
phrase = phrase.strip() | |
def send(sic, msg, recipient=CHANNEL): | |
"""Send a message through the bot.""" | |
sic.stdin.write((':m %s %s\n' % (recipient, msg)).encode()) | |
sic.stdin.flush() | |
sleep(1) | |
with open(WORD_FILE, 'r') as f: | |
WORD_TUPLE = f.read().split() | |
WORD_SET = set(WORD_TUPLE) | |
try: | |
with open(SCORE_FILE, 'rb') as f: | |
total_scores = load(f) | |
except Exception as e: | |
print(e) | |
def save_scores(): | |
with open(SCORE_FILE, 'wb') as f: | |
dump(total_scores, f) | |
def print_scores(sic, score_dict): | |
string = '' | |
for nick, score in sorted(score_dict.items(), key=lambda x: x[1], | |
reverse=True): | |
string += '%s: %d, ' % (nick, score) | |
send(sic, '--- %s' % string[:-2]) | |
def entry_period(sic): | |
global entries, period | |
entries = dict() | |
period = entry_period | |
timer = ENTRY_TIMER | |
if game_running: | |
new_phrase() | |
send(sic, '--- Phrase: "%s". PM anagrams to the bot.' % phrase) | |
while game_running and timer > 0: | |
sleep(1) | |
timer -= 1 | |
if timer in (10, 30, 60): | |
send(sic, '--- %s seconds left for entries!' % timer) | |
def vote_period(sic): | |
global period, game_running, votes, entries | |
period = vote_period | |
timer = VOTE_TIMER | |
votes = dict() | |
if game_running: | |
if entries: | |
string = '--- Entries: ' | |
entries = tuple(enumerate(entries.items())) | |
for i, entry in entries: | |
string += '[%d] "%s", ' % (i, entry[1]) | |
string = string[:-2] | |
string += '. PM votes to the bot.' | |
send(sic, string) | |
else: | |
send(sic, '--- No entries.') | |
game_running = False | |
while game_running and timer > 0: | |
sleep(1) | |
timer -= 1 | |
if timer in (10, 30, 60): | |
send(sic, '--- %s seconds left for votes!' % timer) | |
if game_running: | |
if votes.values(): | |
counts = [0] * len(entries) | |
for _, index in votes.items(): | |
counts[index] += 1 | |
string = '--- Votes are in! ' | |
for i, v in enumerate(counts): | |
if v > 0: | |
nick, entry = entries[i][1][:2] | |
string += '%s \"%s\" +%d, ' % (nick, entry, v) | |
if nick not in scores: | |
scores[nick] = 0 | |
total_scores[nick] = 0 | |
scores[nick] += v | |
total_scores[nick] += v | |
send(sic, string[:-2]) | |
save_scores() | |
print_scores(sic, scores) | |
else: | |
send(sic, '--- No votes.') | |
game_running = False | |
def intermission(sic): | |
global period, round_number | |
period = intermission | |
round_number += 1 | |
if round_number % 5 == 0: | |
timer = INTERMISSION_LONG | |
else: | |
timer = INTERMISSION_SHORT | |
if game_running: | |
send(sic, '--- %d-second intermission.' % timer) | |
while game_running and timer > 0: | |
sleep(1) | |
timer -= 1 | |
if timer in (10, 30, 60): | |
send(sic, '--- %s seconds left for intermission!' % timer) | |
def start_game(sic): | |
"""Start a game of anagrap.""" | |
global game_running | |
send(sic, '--- Game started!') | |
game_running = True | |
while game_running: | |
entry_period(sic) | |
vote_period(sic) | |
intermission(sic) | |
send(sic, '--- Game stopped.') | |
def join_channel(sic): | |
"""Join the channel once connected. Returns success value.""" | |
while True: | |
try: | |
line = sic.stdout.readline().decode('utf8') | |
except UnicodeDecodeError: | |
continue | |
if not line: | |
break | |
if ' >< MODE (' in line: | |
sic.stdin.write((':j %s\n' % CHANNEL).encode()) | |
sic.stdin.flush() | |
return True | |
return False | |
def loop_input(sic): | |
"""Loop over and handle input from IRC.""" | |
while True: | |
try: | |
line = sic.stdout.readline().decode('utf8') | |
except UnicodeDecodeError: | |
continue | |
if not line: | |
break | |
mo = match('(\\S+).+<(.+?)> (.+)', line) | |
if mo: | |
chan, nick, msg = mo.group(1), mo.group(2), mo.group(3) | |
if chan == NICK: | |
if not handle_pm(sic, nick, msg): | |
break | |
else: | |
handle_chanmsg(sic, nick, msg) | |
def submit(sic, nick, entry): | |
phrase_chars = [x for x in phrase.lower() if x.isalnum()] | |
for char in [x for x in entry.lower() if x.isalnum()]: | |
if char in phrase_chars: | |
phrase_chars.remove(char) | |
else: | |
send(sic, 'Invalid anagram.', nick) | |
return | |
if phrase_chars: | |
send(sic, 'Invalid anagram.', nick) | |
return | |
if nick in entries: | |
send(sic, 'Submission changed.', nick) | |
else: | |
send(sic, 'Submission accepted.', nick) | |
entries[nick] = entry | |
def vote(sic, nick, string): | |
try: | |
index = int(string) | |
except ValueError: | |
send(sic, 'Invalid vote.', nick) | |
return | |
if index >= 0 and index < len(entries): | |
if entries[index][1][0] == nick: | |
send(sic, 'Can\'t vote for yourself!', nick) | |
elif nick in votes: | |
send(sic, 'Vote changed.', nick) | |
else: | |
send(sic, 'Vote accepted.', nick) | |
votes[nick] = index | |
else: | |
send(sic, 'Vote out of range!', nick) | |
def handle_chanmsg(sic, nick, msg): | |
tokens = msg.split() | |
if len(tokens) >= 1: | |
if tokens[0].lower() == '!start': | |
if not game_running: | |
Thread(target=start_game, args=[sic]).start() | |
return True | |
elif tokens[0].lower() == '!score': | |
if len(tokens) == 1: | |
print_scores(sic, scores) | |
elif len(tokens) == 2: | |
if tokens[1] in scores: | |
send(sic, '%s\'s score: %d' % (tokens[1], scores[nick])) | |
else: | |
send(sic, '%s\'s score: %d' % (tokens[1], 0)) | |
else: | |
send(sic, 'usage: !score [NICK]') | |
elif tokens[0].lower() == '!totalscore': | |
if len(tokens) == 1: | |
print_scores(sic, total_scores) | |
elif len(tokens) == 2: | |
if tokens[1] in total_scores: | |
send(sic, '%s\'s score: %d' % (tokens[1], total_scores[nick])) | |
else: | |
send(sic, '%s\'s score: %d' % (tokens[1], 0)) | |
else: | |
send(sic, 'usage: !totalscore [NICK]') | |
def handle_pm(sic, nick, msg): | |
"""Process a private message command.""" | |
global game_running, scores | |
tokens = msg.split() | |
if len(tokens) >= 1: | |
if tokens[0].lower() == '!quit': | |
if len(tokens) == 2: | |
if tokens[1] == PASSWORD: | |
return False | |
else: | |
send(sic, 'Incorrect password.', nick) | |
else: | |
send(sic, 'usage: !quit PASSWORD', nick) | |
elif tokens[0].lower() == '!stop': | |
if len(tokens) == 2: | |
if tokens[1] == PASSWORD: | |
game_running = False | |
scores = dict() | |
else: | |
send(sic, 'Incorrect password.', nick) | |
else: | |
send(sic, 'usage: !stop PASSWORD', nick) | |
elif period == entry_period: | |
submit(sic, nick, ' '.join(tokens)) | |
elif period == vote_period: | |
vote(sic, nick, tokens[0]) | |
return True | |
def run(): | |
"""Run the IRC bot.""" | |
print('password:', PASSWORD) | |
with Popen(CMD, stdin=PIPE, stdout=PIPE) as sic: | |
if not join_channel(sic): | |
return | |
loop_input(sic) | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment