Created
December 5, 2012 08:47
-
-
Save mafice/4213942 to your computer and use it in GitHub Desktop.
Minecraftサーバ管理スクリプト
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import subprocess | |
import threading | |
import urllib | |
import time | |
import re | |
import yaml | |
import requests | |
from termcolor import cprint | |
mem = "256M" | |
save_interval = 120 # sec. | |
time_fmt = "%Y/%m/%d %H:%M %Z" | |
cmd = "java -Xmx%(mem)s -Xms%(mem)s -jar minecraft_server.jar nogui" % {"mem": mem} | |
server_jar_url = "http://s3.amazonaws.com/MinecraftDownload/launcher/minecraft_server.jar" | |
server_pros = object | |
conf = {"places": {}} | |
def parseLog (log): | |
# login | |
if -1 != log.find("logged in with entity id"): | |
username = re.search("(.*?)\[[a-zA-Z0-9\/\.\:]*\]", log.split(" ")[3]).group(1) | |
send_cmd("tell %(username)s Greetings, %(username)s" % locals()) | |
# atmark command | |
if -1 != log.find("[INFO] <") and -1 != log.find("> @"): | |
command = log.split(" ")[4][1:] | |
username = re.search("<(.*)>", log.split(" ")[3]).group(1) | |
args = log.split(" ")[5:] | |
atmark_cmd(command, username, args) | |
return | |
def send_cmd (cmd): | |
global server_pros | |
server_pros.stdin.write(bytes("%s\n" % cmd, "ascii")) | |
server_pros.stdin.flush() | |
return | |
def input_handler (): | |
while True: | |
str = input("") + " " # add a space to avoid IndexError | |
if str == "stop": | |
str = "@exit" | |
if str[0] == "@": | |
atmark_cmd(str.split(" ")[0][1:], "", str.split(" ")[1].split(" ")) | |
else: | |
send_cmd(str) | |
def atmark_cmd (cmd, username, args): | |
global conf | |
if cmd == "exit": | |
atexit() | |
elif cmd == "update": | |
update() | |
elif cmd == "rem": | |
if len(args) < 3: | |
send_cmd("say too few arguments") | |
conf["places"][args[0]] = "%s %s %s" % (args[1], args[2], args[3]) | |
elif cmd == "forget": | |
conf["places"] = {} | |
elif cmd == "places": | |
for name, place in conf["places"].items(): | |
cod = place.split(" ") | |
send_cmd("say %s: %s %s %s" % (name, cod[0], cod[1], cod[2])) | |
elif cmd == "goto": | |
if len(args) < 1: | |
send_cmd("say too few arguments") | |
send_cmd("tp %s %s %s %s" % tuple([username] + conf["places"][args[0]].split(" "))) | |
elif cmd == "time": | |
send_cmd("say It is %s." % time.strftime(time_fmt)) | |
elif cmd == "help": | |
for line in ("exit", "update", "rem x y z", "forget", "places", "goto place", "time"): | |
send_cmd("say %s" % line) | |
else: | |
send_cmd("say invalid command") | |
return | |
def update (): | |
print("starting update...") | |
send_cmd("save-all") | |
send_cmd("stop") | |
time.sleep(5) | |
print("downloading a jar file...") | |
with open("minecraft_server.jar", "wb") as f: | |
f.write(requests.get(server_jar_url).content) | |
print("updated! restarting server...") | |
atexit(restart=True) | |
def atexit (**kwargs): | |
global conf | |
with open("seiyas-server.yaml", "w") as f: | |
f.write(yaml.dump(conf)) | |
send_cmd("save-all") | |
time.sleep(4) | |
send_cmd("stop") | |
time.sleep(5) | |
print("Good byte ;-)") | |
if kwargs.get("restart", False): | |
os.execl(sys.executable, " ".join(sys.argv)) | |
else: | |
os._exit(1) | |
def main (): | |
global conf, server_pros, input_handler | |
print("Welcome to Minecraft Server!") | |
server_pros = subprocess.Popen(cmd.split(" "), stdin=subprocess.PIPE, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT) | |
# reduce disk access (I don't know whether it is effective or not) | |
send_cmd("save-off") | |
threading.Timer(save_interval, lambda: send_cmd("save-all")) | |
inputHandler = threading.Thread(target=input_handler) | |
inputHandler.setDaemon(True) | |
inputHandler.start() | |
if os.path.exists("seiyas-server.yaml"): | |
with open("seiyas-server.yaml") as f: | |
conf = yaml.load(f) | |
while True: | |
line = str(server_pros.stdout.readline().rstrip(), "utf-8") | |
cprint(line, "white") | |
if line != "": | |
eof = False | |
parseLog(line) | |
else: | |
time.sleep(0.7) | |
atexit() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment