Created
August 11, 2016 20:11
-
-
Save mftrhu/b8daba9f4218fc06da86b7527be4a081 to your computer and use it in GitHub Desktop.
The beginning of a (very, very simple) MU* game
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import socket | |
import thread | |
import logging | |
import time | |
import random | |
# Network endpoint the server listens on; an empty host binds every interface.
HOST = ""
PORT = 4004

# Idle thresholds in seconds: past TIME_AFK the `who` command flags a user as
# away, past TIME_KICK the main loop disconnects them.
TIME_AFK = 300
TIME_KICK = 600

# Login banner, sent line by line to every new connection.  It is a raw
# string on purpose: the ASCII art is full of backslashes, and in a normal
# literal the one ending the second row would escape the newline and splice
# two rows of the art together.
BANNER = r"""
 _____ _             __  __ _   _ ____
|_   _(_)_ __  _   _|  \/  | | | |  _ \
  | | | | '_ \| | | | |\/| | | | | | | |
  | | | | | | | |_| | |  | | |_| | |_| |
  |_| |_|_| |_|\__, |_|  |_|\___/|____/
               |___/
Welcome to TinyMUD!
Please enter your name to login, or `new' to create a new character.
"""

# Room id in which newly created characters are placed.
STARTING_ROOM = 0

# Names that cannot be chosen when registering a new account.
not_allowed = ["new", "quit", "admin", "wizard", "god", "creator"]
# Account table: login name -> account record.
#   character  id of the account's mob in `db`, or None when it has no body
#   password   compared verbatim at login
#   tags       list of flags such as "wizard" or "invisible"
#   last_seen  time.time() of the last activity (0 = never)
# NOTE(review): passwords are stored and compared in plain text, and the
# built-in admin/test accounts ship with guessable passwords.
accounts = {
    "admin": {
        "character": 1,
        "password": "admin",
        "tags": ["wizard"],
        "last_seen": 0,
    },
    "test": {
        "character": None,
        "password": "test",
        "tags": [],
        "last_seen": 0,
    },
}
# World database: id -> entry.  Every entry has a "kind" ("room", "mob" or
# "item"), a "name" and a "description".
#   rooms  "exits" holds one destination room id (or None) per direction, in
#          the order north, south, east, west.
#   mobs   "position" is the id of the room they stand in; "wield"/"wears"
#          are the ids of the equipped items (or None).
#   items  "position" is the id of the room or mob holding them.
# NOTE(review): the "items" lists are not read by the lookup helpers, which
# derive containment from "position" alone.
db = {
    0: {
        "kind": "room",
        "name": "Limbo",
        "description": "You are floating in a featureless grey limbo.",
        "exits": [3, None, None, None],
        "items": []
    },
    1: {
        "kind": "mob",
        "name": "One",
        "description": "A genderless humanoid, tall and gangly and very, very real.",
        "position": 0,
        "wield": None,
        "wears": None,
        "str": 10,
        "hp": 10,
        "max_hp": 10
    },
    2: {
        "kind": "item",
        "name": "a red apple",
        "description": "It's big, red, and very, very juicy.",
        "position": 3,
        "type": "food",
        "hunger": -10
    },
    3: {
        "kind": "room",
        "name": "Hillfort Plaza",
        "description": "The hustle and bustle of the city of Hillfort surrounds you on all sides.",
        "exits": [None, 0, None, None],
        "items": [2, 4, 5]
    },
    4: {
        "kind": "item",
        "name": "a short sword",
        "description": "About two feet long and quite sharp.",
        "position": 3,
        "type": "weapon"
    },
    5: {
        "kind": "mob",
        "name": "a city guard",
        "description": "He's tall, broad and dressed in chainmail.",
        "position": 3,
        # Was spelled "wields"; every other mob and all the command code use
        # the key "wield".
        "wield": None,
        "wears": 6,
        "str": 10,
        "hp": 10,
        "max_hp": 10,
        "items": [6]
    },
    6: {
        "kind": "item",
        "name": "a chainmail vest",
        "description": "It gets down to your knees and it's pretty heavy.",
        "position": 5,
        "type": "armor"
    },
}
# Ids released by destroy() and waiting to be reused, and the highest id
# handed out so far.
FREE_IDS = []
ID = 6


def get_id():
    """Return an unused database id.

    A recycled id from FREE_IDS is preferred (most recently freed first);
    when none is available the global counter is advanced and its new
    value returned.
    """
    global ID
    if FREE_IDS:
        return FREE_IDS.pop()
    ID = ID + 1
    return ID
def destroy(id):
    """Delete entry `id` from the database and queue its id for reuse.

    Returns True when the entry existed and was removed, False when there
    was nothing to remove.
    """
    if id not in db:
        return False
    FREE_IDS.append(id)
    del db[id]
    return True
# (full name, abbreviation) of every direction.  The position in this list
# is also the slot of that direction in a room's "exits" list.
Directions = [("north", "n"), ("south", "s"), ("east", "e"), ("west", "w")]

# Opposite[i] is the index in Directions of the direction facing number i.
Opposite = [1, 0, 3, 2]
def has_tag(name, tag):
    """Tell whether the account `name` carries `tag`."""
    tags = accounts[name]["tags"]
    return tag in tags
def add_tag(name, tag):
    """Attach `tag` to the account `name` unless it already carries it."""
    if has_tag(name, tag):
        return
    accounts[name]["tags"].append(tag)
def remove_tag(name, tag):
    """Take `tag` away from the account `name`; do nothing if it is absent."""
    if not has_tag(name, tag):
        return
    accounts[name]["tags"].remove(tag)
### | |
def is_wizard(name):
    """Tell whether the account `name` holds the "wizard" tag."""
    wizard = has_tag(name, "wizard")
    return wizard
def get_user_room(name):
    """Return the position of the character owned by the account `name`.

    The result is None when the account is unknown, has no character, or
    the character has no "position".
    """
    if name not in accounts:
        return None
    body = db.get(accounts[name]["character"], {})
    return body.get("position", None)
def in_room(room):
    """Return, in ascending id order, the ids of all entries located in `room`.

    "Located" means the entry's "position" equals `room`; callers also pass
    a mob id here to list what that mob carries.
    """
    found = []
    for key in sorted(db):
        if db[key].get("position", None) == room:
            found.append(key)
    return found
def in_room_except(room, excluded):
    """Like in_room(), but leave the id `excluded` out of the result."""
    found = []
    for key in sorted(db):
        if key == excluded:
            continue
        if db[key].get("position", None) == room:
            found.append(key)
    return found
### | |
def prompt(client, prompt=">"):
    """Send `prompt` to `client` and read one reply from it.

    Returns the received data with surrounding whitespace stripped, or
    None when reading the reply raises socket.error.
    """
    client.send(prompt)
    try:
        reply = client.recv(1024)
    except socket.error:
        return None
    return reply.strip()
def accept(conn):
    """
    Run the login dialogue for a freshly accepted connection.

    The dialogue runs in its own thread so as not to block the main loop
    while the client types.  It walks a small state machine (login to an
    existing account, or creation of a new one with a repeated password)
    and, once the user is authenticated, sets the connection to
    non-blocking and adds the user to the users dict.

    Empty names and empty passwords are refused, so a client that
    disconnects half-way (its reads come back empty) cannot register a
    blank account.  A socket error during the dialogue closes the
    connection and removes any entry it had already added to users,
    instead of ending the thread with an unhandled exception.
    """
    def threaded():
        name, password = None, None
        try:
            for line in BANNER.splitlines():
                conn.send(line + "\n")
            state = "username"
            while True:
                if state == "username":
                    name = prompt(conn, "Username: ")
                    if name is None:
                        pass
                    elif name == "new":
                        state = "new_username"
                    elif name in accounts:
                        state = "password"
                    else:
                        conn.send("That character does not exists. Type 'new' to create a new character.\n")
                elif state == "password":
                    password = prompt(conn, "Password: ")
                    if password is None:
                        pass
                    elif password == accounts[name]["password"]:
                        state = "login"
                    else:
                        conn.send("Wrong username or password.\n")
                        state = "username"
                elif state == "new_username":
                    conn.send("Creating a new character.\n")
                    name = prompt(conn, "Username: ")
                    if name is None:
                        pass
                    elif not name:
                        conn.send("The name cannot be empty. Choose another name.\n")
                    elif name in not_allowed:
                        conn.send("%s is not allowed. Choose another name.\n" % name)
                    elif name in accounts:
                        conn.send("%s is taken. Choose another name.\n" % name)
                    else:
                        state = "new_password"
                elif state == "new_password":
                    password = prompt(conn, "Password: ")
                    if password is None:
                        pass
                    elif not password:
                        conn.send("The password cannot be empty.\n")
                    else:
                        state = "new_password_repeat"
                elif state == "new_password_repeat":
                    password_r = prompt(conn, "Repeat password: ")
                    if password_r is None:
                        pass
                    elif password_r != password:
                        conn.send("The passwords are different.\n")
                        state = "username"
                    else:
                        accounts[name] = {
                            "character": None,
                            "password": password,
                            "tags": [],
                            "last_seen": time.time()
                        }
                        state = "login"
                elif state == "login":
                    conn.setblocking(False)
                    users[name] = conn
                    conn.send("You are logged in as %s.\n" % name)
                    accounts[name]["last_seen"] = time.time()
                    do_look(name, None)
                    broadcast(name, "+++ %s arrives +++" % name)
                    break
        except socket.error:
            # The client went away in the middle of the dialogue: do not
            # leave a dead connection registered under its name.
            if name is not None and users.get(name) is conn:
                del users[name]
            conn.close()
    thread.start_new_thread(threaded, ())
def broadcast_room(name, message):
    """
    Relay `message` to the other users standing in the same room as `name`.

    The message is also printed on the server console.  The sender never
    receives it.  When `name` is None there is no sender and no room
    filter: every connected user gets the message.  Connections that fail
    while sending are skipped silently.
    """
    print(message)
    start_room = get_user_room(name)
    for to_name, conn in users.items():
        if name is not None:
            if to_name == name or get_user_room(to_name) != start_room:
                continue
        try:
            conn.send(message + "\n")
        except socket.error:
            pass
def broadcast(name, message):
    """
    Relay `message` to every connected user except `name`.

    The message is also printed on the server console.  With `name` set to
    None nobody is excluded.  Connections that fail while sending are
    skipped silently.
    """
    print(message)
    for to_name, conn in users.items():
        if name is not None and to_name == name:
            continue
        try:
            conn.send(message + "\n")
        except socket.error:
            pass
def send(name, message):
    """
    Deliver `message`, followed by a newline, to the user `name`.

    Nothing happens when `name` is not a connected user.
    """
    if name not in users:
        return
    users[name].send(message + "\n")
# Dispatch tables filled in by the @command decorator.  Both map an alias to
# a [handler, wizard_only] pair; OneLetter holds the single-character,
# non-alphabetic aliases (such as "'" or "!"), Commands holds all the others.
Commands = {}
OneLetter = {}


def command(*aliases, **kwargs):
    """Return a decorator registering a handler under each of `aliases`.

    The only keyword option read is `wizard` (default False), stored next
    to the handler to mark the command as wizard-only.  The decorated
    function itself is returned unchanged.
    """
    wizard = kwargs.get("wizard", False)

    def wrap(function):
        for alias in aliases:
            is_symbol = len(alias) == 1 and not alias.isalpha()
            table = OneLetter if is_symbol else Commands
            table[alias] = [function, wizard]
        return function

    return wrap
def parse_command(name, line):
    """Interpret one line of input from the user `name` and run it.

    The user's "last_seen" time is refreshed first.  A line starting with
    a registered one-character symbol runs that command on the rest of the
    line; otherwise the first word is matched, case-insensitively, against
    the direction names and then the command table.  Wizard-only commands
    are ignored for non-wizards.  Anything unrecognised is answered with
    "Uh?".  Returns whatever the handler that ran returns.
    """
    accounts[name]["last_seen"] = time.time()

    if line and line[0] in OneLetter:
        handler, wizard_only = OneLetter[line[0]]
        if not wizard_only or is_wizard(name):
            return handler(name, line[1:].strip())

    verb, _, params = line.strip().partition(" ")
    verb = verb.lower()

    for index, names in enumerate(Directions):
        if verb in names:
            return go_direction(name, index)

    if verb in Commands:
        handler, wizard_only = Commands[verb]
        if not wizard_only or is_wizard(name):
            return handler(name, params)

    send(name, "Uh?")
def go_direction(name, direction):
    """Move the character of the user `name` through exit number `direction`.

    `direction` is an index into Directions.  The room being left and the
    room being entered are both told about the move, and the user is shown
    the new room.  Users without a character, or facing a missing exit,
    only get an explanatory message.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    body = db[character]
    destination = db[body["position"]]["exits"][direction]
    if destination is None:
        send(name, "You can't go that way.")
        return

    heading = Directions[direction][0]
    broadcast_room(name, "%s walks %s." % (body["name"], heading))
    body["position"] = destination
    send(name, "You walk %s." % heading)
    came_from = Directions[Opposite[direction]][0]
    broadcast_room(name, "%s enters from the %s." % (body["name"], came_from))
    do_look(name, None)
@command("say", "'")
def do_say(name, what):
    """Say `what` to everybody in the same room as the user `name`."""
    own_line = "You say \"%s\"" % what
    room_line = "%s says \"%s\"" % (name, what)
    send(name, own_line)
    broadcast_room(name, room_line)
@command("shout", "!")
def do_shout(name, what):
    """Shout `what` to every connected user."""
    own_line = "You shout \"%s\"" % what
    world_line = "%s shouts \"%s\"" % (name, what)
    send(name, own_line)
    broadcast(name, world_line)
@command("emote", ":")
def do_emote(name, what):
    """Show the free-form action `what` to the user's room."""
    own_line = "You %s" % what
    room_line = "%s %s" % (name, what)
    send(name, own_line)
    broadcast_room(name, room_line)
@command("quit", "q")
def do_quit(name, params):
    """Disconnect the user `name` and tell everybody else they left."""
    connection = users[name]
    connection.close()
    del users[name]
    farewell = "--- %s leaves ---" % name
    broadcast(name, farewell)
@command("who")
def do_who(name, params):
    """List the connected users to the user `name`.

    Users tagged "invisible" are hidden, and are also left out of the
    header count so that the total does not betray their presence.  Every
    listed user is prefixed with flags: "W" for wizards and "A" for users
    idle for longer than TIME_AFK seconds.
    """
    visible = [user for user in users if not has_tag(user, "invisible")]
    send(name, "%s connected user%s" % (len(visible), "s" if len(visible) != 1 else ""))
    now = time.time()
    for user in visible:
        afk = now - accounts[user]["last_seen"] > TIME_AFK
        flags = "[%s%s]" % ("W" if is_wizard(user) else " ", "A" if afk else " ")
        send(name, flags + " " + user)
@command("shutdown", wizard=True)
def do_shutdown(name, params):
    """Stop the server (wizards only).

    Raises SystemExit, which the main loop catches as a shutdown request.
    """
    raise SystemExit()
@command("kick", wizard=True)
def do_kick(name, params):
    """Disconnect the user named in `params` (wizards only)."""
    who = params.strip()
    if who not in users:
        send(name, "No user with that name.")
        return
    send(name, "You kicked %s out of the game." % who)
    send(who, "You have been kicked out of the game.")
    kicked = users[who]
    kicked.close()
    del users[who]
    broadcast(name, "--- %s has been kicked ---" % who)
@command("invisible", "invis", wizard=True)
def do_invisible(name, params):
    """Toggle the "invisible" tag of the user `name` (wizards only)."""
    if not has_tag(name, "invisible"):
        add_tag(name, "invisible")
        send(name, "You are now invisible.")
        broadcast_room(name, "%s vanishes from sight." % name)
        return
    remove_tag(name, "invisible")
    send(name, "You are now visible.")
    broadcast_room(name, "%s appears from the empty air." % name)
@command("look", "l")
def do_look(name, params):
    """Describe the current room to the user `name`.

    Shows the room's name and description, a numbered list of everything
    else in the room (the numbers are what get/attack expect), and the
    available exits.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    room_id = db[character]["position"]
    place = db[room_id]
    send(name, place["name"])
    send(name, place["description"])

    others = in_room_except(room_id, character)
    if others:
        send(name, "You see:")
        for index, entry in enumerate(others):
            send(name, "[%s] %s" % (index, db[entry]["name"]))

    send(name, "Exits:")
    open_exits = [(slot, target)
                  for slot, target in enumerate(place["exits"])
                  if target is not None]
    if not open_exits:
        send(name, " None!")
        return
    for slot, target in open_exits:
        send(name, "[%s] %s" % (
            Directions[slot][1].upper(),
            db[target]["name"]))
@command("get", "g")
def do_get(name, params):
    """Pick up the thing numbered `params` in the room listing.

    The number is the index shown by `look`.  Only entries of kind "item"
    can be taken (mobs and other players used to be pocketable), and
    negative numbers, which Python would silently count from the end of
    the list, are refused like any other unknown number.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    room = db[character]["position"]
    items = in_room_except(room, character)
    try:
        index = int(params.strip())
        if index < 0:
            raise IndexError(index)
        item = items[index]
    except (ValueError, IndexError):
        send(name, "You don't see that here.")
        return

    if db[item]["kind"] != "item":
        send(name, "You can't pick that up.")
        return

    db[item]["position"] = character
    send(name, "You pick up %s." % db[item]["name"])
    broadcast_room(name, "%s picks up %s." % (db[character]["name"], db[item]["name"]))
@command("drop", "d")
def do_drop(name, params):
    """Drop the carried thing numbered `params` into the current room.

    The number is the index shown by `inventory`; negative numbers are
    refused.  An item that was wielded or worn is unequipped as it is
    dropped, so the character does not keep referring to something it no
    longer carries.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    body = db[character]
    room = body["position"]
    items = in_room(character)
    try:
        index = int(params.strip())
        if index < 0:
            raise IndexError(index)
        item = items[index]
    except (ValueError, IndexError):
        send(name, "You aren't carrying that.")
        return

    for slot in ("wield", "wears"):
        if body.get(slot) == item:
            body[slot] = None
    db[item]["position"] = room
    send(name, "You drop %s." % db[item]["name"])
    broadcast_room(name, "%s drops %s." % (body["name"], db[item]["name"]))
@command("inventory", "inv", "i")
def do_inventory(name, params):
    """List what the character of the user `name` carries.

    Entries are numbered (the numbers are what drop/use/remove expect)
    and marked when they are the wielded or worn item.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    inventory = in_room(character)
    send(name, "You are carrying:")
    if not inventory:
        send(name, " Nothing")
        return

    for index, item in enumerate(inventory):
        if db[character]["wield"] == item:
            flags = " (wielded)"
        elif db[character]["wears"] == item:
            flags = " (worn)"
        else:
            flags = ""
        send(name, "[%s] %s%s" % (index, db[item]["name"], flags))
@command("use")
def do_use(name, params):
    """Use the carried thing numbered `params`.

    Food is eaten (and destroyed), weapons are wielded, anything else
    cannot be used.  The number is the index shown by `inventory`;
    negative numbers are refused.  Entries without a "type" are treated as
    unusable instead of raising KeyError.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    items = in_room(character)
    try:
        index = int(params.strip())
        if index < 0:
            raise IndexError(index)
        item = items[index]
    except (ValueError, IndexError):
        send(name, "You aren't carrying that.")
        return

    item_type = db[item].get("type")
    if item_type == "food":
        send(name, "You eat %s." % db[item]["name"])
        broadcast_room(name, "%s eats %s." % (db[character]["name"], db[item]["name"]))
        destroy(item)
    elif item_type == "weapon":
        send(name, "You wield %s." % db[item]["name"])
        broadcast_room(name, "%s wields %s." % (db[character]["name"], db[item]["name"]))
        db[character]["wield"] = item
    else:
        send(name, "You can't use that.")
@command("remove")
def do_remove(name, params):
    """Stop wielding the carried thing numbered `params`.

    The number is the index shown by `inventory`; negative numbers are
    refused.  Only the wielded item can be removed.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    items = in_room(character)
    try:
        index = int(params.strip())
        if index < 0:
            raise IndexError(index)
        item = items[index]
    except (ValueError, IndexError):
        send(name, "You aren't carrying that.")
        return

    if db[character]["wield"] != item:
        send(name, "You aren't using that.")
        return
    send(name, "You sheathe %s." % db[item]["name"])
    broadcast_room(name, "%s sheathes %s." % (db[character]["name"], db[item]["name"]))
    db[character]["wield"] = None
def _roll_damage(strength):
    """Return a random hit between a tenth and a half of `strength`."""
    return random.randint(strength // 10, strength // 2)


def _owner_of(character):
    """Return the name of the account whose character is `character`, or None."""
    for account, record in accounts.items():
        if record["character"] == character:
            return account
    return None


def _slay(name, victim):
    """Remove the dead mob `victim` from the world.

    Everything it carried falls into its room, which is announced to the
    room of the acting user `name`.  The mob is then destroyed, and the
    account that owned it (if any) is left without a body.
    """
    spot = db[victim]["position"]
    for carried in in_room(victim):
        notice = "%s tumbles to the ground." % db[carried]["name"]
        send(name, notice)
        broadcast_room(name, notice)
        db[carried]["position"] = spot
    owner = _owner_of(victim)
    if owner is not None:
        # Otherwise the account would keep pointing at a deleted entry.
        accounts[owner]["character"] = None
    destroy(victim)


@command("attack", "kill", "k")
def do_attack(name, params):
    """Attack the mob numbered `params` in the room listing.

    The number is the index shown by `look`; negative numbers are
    refused.  The attacker deals random damage based on its "str".  A
    target brought to 0 hp dies and drops what it carried; a surviving
    target strikes back once, which can in turn kill the attacker.

    The retaliation used to be a recursive call that passed the target's
    display name as an account name and a database entry as the
    parameters, so it always failed with an exception; it is now resolved
    inline.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    room = db[character]["position"]
    targets = in_room_except(room, character)
    try:
        index = int(params.strip())
        if index < 0:
            raise IndexError(index)
        target = targets[index]
    except (ValueError, IndexError):
        send(name, "You don't see that here.")
        return

    if db[target]["kind"] != "mob":
        send(name, "You can't attack objects.")
        return

    attacker_name = db[character]["name"]
    target_name = db[target]["name"]
    # Connections are keyed by account name, not by the mob's display name.
    target_owner = _owner_of(target)

    send(name, "You attack %s." % target_name)
    broadcast_room(name, "%s attacks %s." % (attacker_name, target_name))
    send(target_owner, "%s attacks you!" % name)

    damage = _roll_damage(db[character]["str"])
    send(name, "You deal %s points of damage!" % damage)
    db[target]["hp"] -= damage

    if db[target]["hp"] <= 0:
        send(name, "You killed %s!" % target_name)
        broadcast_room(name, "%s killed %s!" % (attacker_name, target_name))
        _slay(name, target)
        return

    # The target survived and hits back once.
    counter = _roll_damage(db[target]["str"])
    send(name, "%s strikes back and deals %s points of damage!" % (target_name, counter))
    db[character]["hp"] -= counter
    if db[character]["hp"] <= 0:
        send(name, "You have been killed by %s!" % target_name)
        broadcast_room(name, "%s was killed by %s!" % (attacker_name, target_name))
        _slay(name, character)
@command("@character", "@char")
def do_newchar(name, params):
    """Create or reshape the character of the user `name`.

    `params` must look like "NAME = DESCRIPTION".  A user without a body
    gets a new mob in STARTING_ROOM; a user who already has one only has
    its name and description replaced.  An empty NAME is rejected with the
    usage message before anything is changed, so no nameless character
    can appear in room listings.
    """
    try:
        new_name, new_desc = params.split("=", 1)
    except ValueError:
        send(name, "Usage: @character NAME = DESCRIPTION")
        return
    new_name = new_name.strip()
    new_desc = new_desc.strip()
    if not new_name:
        send(name, "Usage: @character NAME = DESCRIPTION")
        return

    character = accounts[name]["character"]
    if character is None:
        send(name, "You shape a new body from the chaos surrounding you.")
        character = get_id()
        accounts[name]["character"] = character
        db[character] = {
            "kind": "mob",
            "name": None,
            "description": None,
            "position": STARTING_ROOM,
            "wield": None,
            "wears": None,
            "str": 10,
            "hp": 10,
            "max_hp": 10
        }
        broadcast_room(name, "A lump of chaos stuff coalesces into an humanoid form.")
    else:
        send(name, "Your body shakes and flows in a new shape.")
        broadcast_room(name, "%s's body shakes and flows in a new shape." % db[character]["name"])
    db[character]["name"] = new_name
    db[character]["description"] = new_desc
# Create the listening socket.  It is non-blocking so that the main loop
# can poll for new connections without stalling, and SO_REUSEADDR is set
# before binding.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
listen_on = (HOST, PORT)
server.bind(listen_on)
server.listen(1)
bound_to = "%s:%s" % server.getsockname()
print("Listening on %s" % bound_to)
# Main event loop.  Each pass accepts pending connections, disconnects idle
# users, then reads and executes at most one chunk of input per user.
users = {}  # username -> connection (socket instance)
while True:
    try:
        # Accept new connections until the non-blocking accept() runs dry.
        while True:
            try:
                conn, addr = server.accept()
            except socket.error:
                break
            accept(conn)
        # Kick idles.  Iterate over a snapshot because entries are deleted
        # from `users` inside the loop.
        for name, conn in list(users.items()):
            idle_for = time.time() - accounts[name]["last_seen"]
            if idle_for > TIME_KICK:
                try:
                    conn.send("--- Kicked for being idle ---\n")
                except socket.error:
                    # A dead connection must still be removed; otherwise
                    # the failing send would abort every later pass here.
                    pass
                conn.close()
                del users[name]
                broadcast(name, "--- %s kicked for being idle ---" % name)
        # Read from connections
        for name, conn in list(users.items()):
            if users.get(name) is not conn:
                # Removed earlier in this pass (quit or kicked).
                continue
            try:
                message = conn.recv(1024)
            except socket.error:
                continue
            if not message:
                # Empty string is given on disconnect.
                conn.close()
                del users[name]
                broadcast(name, "--- %s leaves ---" % name)
                continue
            try:
                parse_command(name, message)
            except Exception:
                # One failing command must not starve the users that
                # come after it in this pass.
                logging.exception("Error while running a command for %s", name)
        time.sleep(.05)
    except (SystemExit, KeyboardInterrupt):
        broadcast(None, "!!! Server is shutting down !!!")
        break
    except Exception:
        logging.exception("Unhandled error in the main loop")
        continue

# Shutdown: release every remaining socket.
for conn in list(users.values()):
    conn.close()
server.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment