Created
May 12, 2015 10:47
-
-
Save silverweed/ef4c075e8aa3f93aa461 to your computer and use it in GitHub Desktop.
Minimalistic pokepon client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# simple Pokepon client | |
# by silverweed | |
import telnetlib | |
import getpass | |
import hashlib | |
import sys | |
import signal | |
from threading import Thread | |
import re | |
err = sys.stderr.write | |
termnone = "\033[0m" | |
termcolors = { | |
'red': "\033[1;31m", | |
'blue': "\033[1;34m", | |
'purple': "\033[1;35m", | |
'green': "\033[1;32m", | |
'yellow': "\033[1;33m", | |
'cyan': "\033[1;36m", | |
'white': "\033[1;37m", | |
'gray': "\033[1;30m" | |
} | |
termbold = '\033[1m' | |
termdim = '\033[2m' | |
termemph = '\033[4m' | |
def log(msg): | |
sys.stderr.write('[PyClient::log] ' + str(msg) + "\n") | |
def stripHTML(msg): | |
buf = [] | |
in_tag = False | |
for c in msg: | |
if c == '>': | |
if in_tag: | |
in_tag = False | |
else: | |
buf.append(c) | |
elif c == '<': | |
if not in_tag: | |
in_tag = True | |
else: | |
if not in_tag: | |
buf.append(c) | |
return ''.join(buf) | |
def convertHTML(msg): | |
re_color = re.compile(r'<font color="(?P<color>[a-z]+)">(?P<content>.*?)</font>') | |
re_bold = re.compile(r'<b>(.*?)</b>') | |
re_italic = re.compile(r'<(?:i|em)>(.*?)</(?:i|em)>') | |
re_br = re.compile('<br/?>') | |
re_a = re.compile("""<a .*href=["'](?P<url>.*)["']>(?P<name>.*?)</a>""") | |
regexes = { re_color: 'color', re_bold: 'bold', re_italic: 'italic', re_br: 'br', re_a: 'a' } | |
def process(retype, match): | |
if retype == 'color': | |
return termcolors[match.group('color')] + match.group('content') + termnone | |
elif retype == 'bold': | |
return termbold + match.group(1) + termnone | |
elif retype == 'italic': | |
return termemph + match.group(1) + termnone | |
elif retype == 'br': | |
return '\n' | |
elif retype == 'a': | |
return termdim + '[' + match.group('name') + '](' + termnone + \ | |
termemph + match.group('url') + termdim + ')' + termnone | |
matched = True | |
while matched: | |
matched = False | |
for regex in regexes: | |
match = regex.search(msg) | |
if match != None: | |
matched = True | |
msg = re.sub(regex, process(regexes[regex], match), msg, count=1) | |
return msg | |
class Listener(Thread): | |
def __init__(self, client): | |
Thread.__init__(self) | |
self.client = client | |
def run(self): | |
while True: | |
msg = self.client.connection.read_until(b"\n").decode('utf-8') | |
for ex in reversed(self.client.executors): | |
ret = ex.execute(msg) | |
if ret == 0: | |
pass | |
elif ret == 1: | |
break | |
elif ret == 2: | |
self.client.connection.close() | |
return | |
class CommunicationExecutor: | |
def __init__(self, client): | |
self.client = client | |
def execute(self, msg): | |
if msg[0] != Client.CMN_PREFIX: | |
return 0 | |
cmd, *args = msg[1:].split() | |
if verbose: | |
log("cmd: "+cmd+", args: "+''.join(args)) | |
if cmd == 'ping': | |
self.client.cmn('pong') | |
elif cmd == 'youros': | |
self.client.cmn('myos PythonClient 0.1') | |
elif cmd == 'ok': | |
log('Handshake completed.') | |
elif cmd == 'drop': | |
log(*args) | |
log('Server dropped connection') | |
return 2 | |
elif cmd == 'motd': | |
print('<<<MOTD\n' + convertHTML(' '.join(args)) + '\n<<<END OF MOTD') | |
elif cmd == 'html' or cmd == 'htmlconv': | |
print(convertHTML(' '.join(args))) | |
elif cmd == 'givepasswd': | |
pwd = getpass.getpass("Enter password > ") | |
self.client.cmn('passwd ' + self.client.hashpwd(pwd)) | |
elif cmd == 'disconnect': | |
self.client.disconnect() | |
return 2 | |
return 1 | |
class DefaultExecutor: | |
def __init__(self, client): | |
self.client = client | |
def execute(self, msg): | |
sys.stdout.write(convertHTML(msg)) | |
return 1 | |
class Client: | |
CMN_PREFIX = '!' | |
CMD_PREFIX = '/' | |
def __init__(self, ip, port): | |
self.ip = ip | |
self.port = port | |
def connect(self): | |
try: | |
self.executors = [DefaultExecutor(self)] | |
self.executors.append(CommunicationExecutor(self)) | |
self.listener = Listener(self) | |
self.connection = telnetlib.Telnet(self.ip, self.port) | |
self.listener.start() | |
log('Started listener') | |
self.run() | |
except Exception as e: | |
err(str(e)) | |
def disconnect(self): | |
self.cmd('disconnect') | |
try: | |
self.connection.close() | |
except Exception: | |
pass | |
log('Disconnected.') | |
def run(self): | |
for line in sys.stdin: | |
self.process(line) | |
def process(self, line): | |
self.send_raw(line) | |
def send_raw(self, line): | |
self.connection.write((line + "\n").encode('utf-8')) | |
def cmd(self, msg): | |
self.connection.write((Client.CMD_PREFIX + msg + "\n").encode('utf-8')) | |
def cmn(self, msg): | |
self.connection.write((Client.CMN_PREFIX + msg + "\n").encode('utf-8')) | |
def hashpwd(self, pwd): | |
algo = hashlib.sha1() | |
algo.update(pwd.encode('utf-8')) | |
hashed = algo.hexdigest() | |
log("hashed: "+str(hashed)) | |
return str(map( | |
lambda c: 'x' if c == '\n' or c == '\f' or c == '\r' else c, | |
str(hashed))) | |
verbose = False | |
if __name__ == '__main__': | |
from argparse import ArgumentParser | |
parser = ArgumentParser() | |
parser.add_argument('-p', '--port', help='port to connect to', default=12344) | |
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', default=False) | |
parser.add_argument('ip', help='The IP to connect to', default='pokepon.center') | |
opts = parser.parse_args() | |
verbose = opts.verbose | |
log('Creating client') | |
client = Client(opts.ip, opts.port) | |
def sigint_handler(client): | |
def handler(signum, frame): | |
if client != None: | |
client.disconnect() | |
sys.exit(0) | |
return handler | |
signal.signal(signal.SIGINT, sigint_handler(client)) | |
client.connect() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment