Created
June 27, 2012 22:40
-
-
Save tjb0607/3007360 to your computer and use it in GitHub Desktop.
Minecraft server list MOTD and kick message thing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python2 | |
# -*- coding: utf-8 -*- | |
# This program is free software. It comes without any warranty, to | |
# the extent permitted by applicable law. You can redistribute it | |
# and/or modify it under the terms of the Do What The Fuck You Want | |
# To Public License, Version 2, as published by Sam Hocevar. See | |
# http://sam.zoy.org/wtfpl/COPYING for more details. | |
import asyncore | |
import asynchat | |
import socket | |
import re | |
from codecs import utf_16_be_encode, utf_16_be_decode | |
from struct import pack, unpack | |
MOTD = "UP at mustanglover.dyndns.org" | |
KICKMOTD = "The server is up, but it's moved to mustanglover.dyndns.org. Go there!" | |
IP = "mustanglover.dyndns.org" | |
def get_info(host, port): | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.connect((host, port)) | |
s.send('\xfe') | |
d = s.recv(256) | |
s.close() | |
assert d[0] == '\xff' | |
d = d[3:].decode('utf-16be').split(u'\xa7') | |
return {'motd': d[0], | |
'players': int(d[1]), | |
'max_players': int(d[2])} | |
def pack_string(string): | |
''' | |
Packs a string into UCS-2 and prefixes it with its length as a short int. | |
This function can't actually handle UCS-2, therefore kick messages and | |
the MOTD can't contain special characters. | |
''' | |
string = u"".join(i if ord(i) < 65536 else u"?" for i in string) | |
return (pack(">h", len(string)) + | |
utf_16_be_encode(string, "replace")[0]) | |
def unpack_string(data): | |
''' | |
Extracts a string from a data stream. | |
Like in the pack method, UCS-2 isn't handled correctly. Since usernames | |
and hosts can't contain special characters this isn't an issue. | |
''' | |
(l,) = unpack(">h", data[:2]) | |
assert len(data) >= 2 + 2 * l | |
return utf_16_be_decode(data[2:l * 2])[0] | |
class Listener(asyncore.dispatcher): | |
''' | |
Listens for connecting clients. | |
''' | |
def __init__(self, host, port): | |
asyncore.dispatcher.__init__(self) | |
self.clients = [] | |
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.set_reuse_addr() | |
self.bind((host, port)) | |
self.listen(5) | |
print "Listening for connections on %s:%s" % (host, port) | |
def handle_accept(self): | |
''' | |
Accepts a new connections and assigns it to a new ClientTunnel. | |
''' | |
pair = self.accept() | |
if pair is None: | |
pass | |
else: | |
sock, addr = pair | |
ClientTunnel(sock, addr) | |
@staticmethod | |
def loop(): | |
''' | |
Starts the main I/O loop. | |
''' | |
asyncore.loop() | |
class ClientTunnel(asynchat.async_chat): | |
''' | |
Handles a connecting client | |
''' | |
def __init__(self, sock, addr): | |
asynchat.async_chat.__init__(self, sock) | |
self.set_terminator(None) | |
self.addr = "%s:%s" % addr | |
self.log("Incoming connection") | |
def log(self, message): | |
''' | |
Feedback to user | |
''' | |
print "%s - %s" % (self.addr, message) | |
def collect_incoming_data(self, data): | |
''' | |
Listens for data | |
''' | |
if len(data) >= 1: | |
(packetId,) = unpack(">B", data[0]) | |
if packetId == 0xfe: # Handle server list query | |
serverinfo = get_info(IP, 25565) | |
self.log("Received server list query") | |
self.kick(u"%s§%s§%s" % (MOTD, serverinfo['players'], serverinfo['max_players'])) | |
else: | |
self.kick(KICKMOTD) | |
def kick(self, reason): | |
''' | |
Kicks the client. | |
''' | |
self.log("Kicking (%s)" % reason) | |
self.push(pack(">B", 0xff) + pack_string(reason)) | |
self.close() | |
def handle_close(self): | |
self.log("Client closed connection") | |
self.close() | |
if __name__ == "__main__": | |
server = Listener('0.0.0.0', 25565) | |
Listener.loop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment