Skip to content

Instantly share code, notes, and snippets.

@night-crawler
Created August 12, 2013 18:26
Show Gist options
  • Select an option

  • Save night-crawler/6213578 to your computer and use it in GitHub Desktop.

Select an option

Save night-crawler/6213578 to your computer and use it in GitHub Desktop.
A simple telnet-websocket proxy on python using Tornado. Tested with "Sphere of Worlds MUD". Replaces IAC and GA.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
from tornado.web import RequestHandler, Application
from tornado.websocket import WebSocketHandler
from tornado.ioloop import IOLoop
import socket
import threading
import Queue
import select
import time
class AsyncSocket(threading.Thread):
def __init__(self, host, port, read_callback, disconnect_callback=None):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.bufsize = 512
self.socket = None
self.alive = threading.Event()
self.disconnect_callback = disconnect_callback
self.read_callback = read_callback
self.write_queue = Queue.Queue()
def join(self, timeout=None):
self.socket.close()
self.alive.clear()
threading.Thread.join(self, timeout)
def connect(self, host=None, port=None):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((host or self.host, port or self.port))
self.alive.set()
def put(self, data):
if self.alive.isSet():
self.write_queue.put(data)
else:
raise Exception('Disconnected')
def run(self):
self.connect()
while self.alive.isSet():
sread, swrite, sexc = select.select([], [self.socket], [], 0.01)
for sock in swrite:
if sock == self.socket:
while True:
cmd_list = []
try:
cmd_list.append(self.write_queue.get(block=False))
except Queue.Empty:
break
if cmd_list:
sock.send(''.join(cmd_list))
sread, swrite, sexc = select.select([self.socket], [], [], 0.01)
for sock in sread:
if sock == self.socket:
try:
data = sock.recv(self.bufsize)
if not data:
# self.write_queue.task_done()
# self.write_queue.join()
raise Exception('Socket disconnect')
self.read_callback(data)
except Exception as e:
if self.disconnect_callback:
self.disconnect_callback(e)
print 'Socket exception: ', e
ON_CONNECT = """
\x1B[1;36mJSMC \x1B[1;30mis a MUD client written in JavaScript. It allows you to play MUD. Seriously. :(
\x1B[1;33mCurrently you are \x1B[1;31mnot \x1B[1;30mconnected to MUD.
Use `\x1B[1;34m#connect host:port encoding\x1B[1;30m` to connect.
Example: \x1B[1;34m'\x1B[0;35m#connect sow.igrohost.ru:5555 cp1251\x1B[1;34m'
Also, you have to select a right encoding in MUD, in your case - `1`.\x1B[1;30m
\x1B[1;36mJSMC \x1B[1;30m- это мад-клиент на JavaScript. Он позволяет играть в мад. :(
\x1B[1;33mНа данный момент вы \x1B[1;31mНЕ \x1B[1;30mподключены к MUD.
Используйте команду `\x1B[1;34m#connect host:port encoding\x1B[1;30m` для подключения.
Пример: \x1B[1;34m'\x1B[0;35m#connect sow.igrohost.ru:5555 cp1251\x1B[1;34m'
Также, вам нужно выбрать верную кодировку в маде, в вашем случае - `1`.\x1B[1;30m
\x1B[0m
"""
TELNET_GA = chr(249)
TELNET_IAC = chr(255)
class MessagesHandler(WebSocketHandler):
def open(self, thread_id):
self.write_message('Successfully connected to Tornado server\n' + ON_CONNECT)
self.encoding = 'cp1251'
self.sock = None
self.replace_ga = True
# self.sock = AsyncSocket('sow.igrohost.ru', 5555, self.process_server_response, self.server_disconnect_handler)
# self.sock.start()
def process_server_response(self, data):
if self.replace_ga:
i = 0
while True:
i = data.find(TELNET_IAC + TELNET_GA, i)
if i == -1:
break
if i-1 >= 0:
if data[i-1] != TELNET_IAC:
data = data[:i] + "\n" + data[i+2:]
continue
i += 2
data = data.replace(TELNET_IAC*2, TELNET_IAC)
# i = 0
# while True:
# i = data.find(TELNET_IAC, i)
# if i == -1:
# break
#
# print ord(data[i]), ord(data[i+1]), ord(data[i+2]), data[i:].decode(self.encoding)
# i += 1
self.write_message(data.decode(self.encoding, 'replace'))
def process_command(self, command):
match = re.match(r'#connect (.+):(\d+) (.+)', command)
if match:
if self.sock:
self.write_message('Nope')
return False
self.encoding = match.group(3).strip()
self.sock = AsyncSocket(match.group(1), int(match.group(2)), self.process_server_response, self.server_disconnect_handler)
self.sock.start()
return True
if not self.sock:
self.write_message(ON_CONNECT)
return True
if command.startswith('#disconnect'):
self.close_server_socket()
return True
return False
def close_server_socket(self):
if self.sock is None:
return
# self.sock.join()
self.sock.socket.close()
self.sock.alive.clear()
self.sock = None
def server_disconnect_handler(self, msg):
try:
self.close_server_socket() # cannot join
except Exception as e:
print e
self.write_message('Disconnected: %s' % msg)
def on_message(self, message):
if not self.process_command(message):
try:
encoded_message = message.encode(self.encoding)
encoded_message = encoded_message.replace(TELNET_IAC, TELNET_IAC*2)
self.sock.put(encoded_message)
except Exception as e:
print e
def on_close(self):
self.close_server_socket()
print "WebSocket closed"
application = Application([
(r'/(?P<thread_id>\d+)/', MessagesHandler),
])
if __name__ == "__main__":
application.listen(8888, '0.0.0.0')
IOLoop.instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment