# Advertisement
# Not a member of Pastebin yet?
# Sign Up,
# it unlocks many cool features!
#!/usr/bin/env python
import logging
import random
import socket
import thread
import threading
import time
# Address and port the server socket is bound to.
HOST = ""
PORT = 4004

# Idle thresholds in seconds: past TIME_AFK a user is flagged "A" by `who`,
# past TIME_KICK the main loop disconnects them.
TIME_AFK = 300
TIME_KICK = 600

# Sent line by line to every new connection.
# This must be a raw string: the logo contains backslashes, one of them at
# the very end of a row, where an ordinary literal would read
# "\<newline>" as a line continuation and glue two rows together.
BANNER = r"""
 _____ _             __  __ _   _ ____
|_   _(_)_ __  _   _|  \/  | | | |  _ \
  | | | | '_ \| | | | |\/| | | | | | | |
  | | | | | | | |_| | |  | | |_| | |_| |
  |_| |_|_| |_|\__, |_|  |_|\___/|____/
               |___/
Welcome to TinyMUD!
Please enter your name to login, or `new' to create a new character.
"""

# Id of the room in which freshly created characters are placed.
STARTING_ROOM = 0

# Usernames that cannot be registered through the `new` dialogue.
not_allowed = ["new", "quit", "admin", "wizard", "god", "creator"]
# Login accounts keyed by username.
#   character -- id of the controlled mob in `db`, or None for no body yet
#   password  -- compared verbatim at login
#   tags      -- capability markers such as "wizard" or "invisible"
#   last_seen -- time.time() of the last activity (0 = never)
# NOTE(review): passwords are stored in plain text and a default
# admin/admin account ships with the file.
accounts = {
    "admin": dict(
        character=1,
        password="admin",
        tags=["wizard"],
        last_seen=0,
    ),
    "test": dict(
        character=None,
        password="test",
        tags=[],
        last_seen=0,
    ),
}
# World database: object id -> record. Every record has "kind" ("room",
# "mob" or "item"), "name" and "description".
#   * "position" is the id of the container: the room an object stands in,
#     or the mob carrying it (this is what in_room() matches on).
#   * a room's "exits" list is indexed like Directions
#     (north, south, east, west); None means no exit that way.
#   * mobs carry "wield"/"wears" (item id or None) plus "str", "hp", "max_hp".
db = {
    0: {
        "kind": "room",
        "name": "Limbo",
        "description": "You are floating in a featureless grey limbo.",
        "exits": [3, None, None, None],
        "items": [],
    },
    1: {
        "kind": "mob",
        "name": "One",
        "description": "A genderless humanoid, tall and gangly and very, very real.",
        "position": 0,
        "wield": None,
        "wears": None,
        "str": 10,
        "hp": 10,
        "max_hp": 10,
    },
    2: {
        "kind": "item",
        "name": "a red apple",
        "description": "It's big, red, and very, very juicy.",
        "position": 3,
        "type": "food",
        "hunger": -10,
    },
    3: {
        "kind": "room",
        "name": "Hillfort Plaza",
        "description": "The hustle and bustle of the city of Hillfort surrounds you on all sides.",
        "exits": [None, 0, None, None],
        "items": [2, 4, 5],
    },
    4: {
        "kind": "item",
        "name": "a short sword",
        "description": "About two feet long and quite sharp.",
        "position": 3,
        "type": "weapon",
    },
    5: {
        "kind": "mob",
        "name": "a city guard",
        "description": "He's tall, broad and dressed in chainmail.",
        "position": 3,
        # Was misspelled "wields"; every other mob record and every reader
        # of this slot uses "wield".
        "wield": None,
        "wears": 6,
        "str": 10,
        "hp": 10,
        "max_hp": 10,
        "items": [6],
    },
    6: {
        "kind": "item",
        "name": "a chainmail vest",
        "description": "It gets down to your knees and it's pretty heavy.",
        "position": 5,
        "type": "armor",
    },
}
# Ids released by destroy(), handed out again before new ones are minted.
FREE_IDS = []
# Highest id allocated so far.
ID = 6


def get_id():
    """Return an unused object id, recycling a freed one when available."""
    global ID
    if FREE_IDS:
        return FREE_IDS.pop()
    ID = ID + 1
    return ID
def destroy(id):
    """Delete object *id* from the database and release its id for reuse.

    Returns True when the object existed, False otherwise.
    """
    if id not in db:
        return False
    FREE_IDS.append(id)
    del db[id]
    return True
# (long name, abbreviation) per compass direction. The list index is the
# direction number used for room "exits" and by go_direction().
Directions = [("north", "n"), ("south", "s"), ("east", "e"), ("west", "w")]

# Opposite[d] is the direction number facing direction d.
Opposite = [1, 0, 3, 2]
def has_tag(name, tag):
    """Tell whether account *name* carries *tag*."""
    tags = accounts[name]["tags"]
    return tag in tags
def add_tag(name, tag):
    """Attach *tag* to account *name* unless it is already present."""
    if has_tag(name, tag):
        return
    accounts[name]["tags"].append(tag)
def remove_tag(name, tag):
    """Detach *tag* from account *name*; a missing tag is ignored."""
    if not has_tag(name, tag):
        return
    accounts[name]["tags"].remove(tag)
###
def is_wizard(name):
    """Tell whether account *name* holds the "wizard" tag."""
    wizard = has_tag(name, "wizard")
    return wizard
def get_user_room(name):
    """Return the position of the body owned by account *name*.

    Yields None for an unknown account, an account without a body, or a
    body that is not in the database.
    """
    if name not in accounts:
        return None
    body = db.get(accounts[name]["character"], {})
    return body.get("position", None)
def in_room(room):
    """Return, in ascending id order, the ids of all objects positioned in *room*.

    *room* may be any container id: a room, or a mob for its inventory.
    """
    found = []
    for key in sorted(db):
        if db[key].get("position", None) == room:
            found.append(key)
    return found
def in_room_except(room, excluded):
    """Like in_room(), but leave the object with id *excluded* out."""
    return [key for key in in_room(room) if key != excluded]
###
def prompt(client, prompt=">"):
    """Send *prompt* over *client* and read back one reply.

    Returns the received data with surrounding whitespace stripped, or None
    when the read fails with socket.error. Errors raised by the send are not
    caught here.
    """
    client.send(prompt)
    try:
        reply = client.recv(1024)
    except socket.error:
        return None
    return reply.strip()
def accept(conn):
    """
    Run the login / registration dialogue for *conn* in a background thread
    so the main loop is never blocked by a user typing credentials.

    Once the user is authenticated the connection is switched to
    non-blocking mode and registered in the ``users`` dict, a look at the
    surroundings is sent, and the arrival is broadcast.

    If the peer goes away during the dialogue the socket is closed and no
    user is registered.
    """
    def dialogue():
        """Prompt until valid credentials are given; return the username."""
        state = "username"
        name, password = None, None
        while True:
            if state == "username":
                name = prompt(conn, "Username: ")
                if name is None:
                    pass
                elif name == "new":
                    state = "new_username"
                elif name in accounts:
                    state = "password"
                else:
                    conn.send("That character does not exist. Type 'new' to create a new character.\n")
            elif state == "password":
                password = prompt(conn, "Password: ")
                if password is None:
                    pass
                elif password == accounts[name]["password"]:
                    return name
                else:
                    conn.send("Wrong username or password.\n")
                    state = "username"
            elif state == "new_username":
                conn.send("Creating a new character.\n")
                name = prompt(conn, "Username: ")
                if name is None:
                    pass
                elif not name:
                    # An empty read is also what a closed peer produces; it
                    # must never become an account name.
                    conn.send("The name cannot be empty. Choose another name.\n")
                elif name.lower() in not_allowed:
                    # Case-insensitive so "Admin" cannot sneak past.
                    conn.send("%s is not allowed. Choose another name.\n" % name)
                elif name in accounts:
                    conn.send("%s is taken. Choose another name.\n" % name)
                else:
                    state = "new_password"
            elif state == "new_password":
                password = prompt(conn, "Password: ")
                if password is None:
                    pass
                elif not password:
                    conn.send("The password cannot be empty.\n")
                else:
                    state = "new_password_repeat"
            elif state == "new_password_repeat":
                password_r = prompt(conn, "Repeat password: ")
                if password_r is None:
                    pass
                elif password_r != password:
                    conn.send("The passwords are different.\n")
                    state = "username"
                else:
                    accounts[name] = {
                        "character": None,
                        "password": password,
                        "tags": [],
                        "last_seen": time.time()
                    }
                    return name

    def threaded():
        try:
            # The dialogue relies on blocking reads; make that explicit
            # instead of depending on what accept() handed us.
            conn.setblocking(True)
            for line in BANNER.splitlines():
                conn.send(line + "\n")
            name = dialogue()
        except socket.error:
            # The peer vanished before logging in: release the socket.
            conn.close()
            return

        # A second login for the same account replaces the first one; close
        # the old socket instead of leaking it.
        previous = users.get(name)
        if previous is not None and previous is not conn:
            try:
                previous.send("--- You have logged in from another connection ---\n")
            except socket.error:
                pass
            previous.close()

        try:
            conn.setblocking(False)
            users[name] = conn
            conn.send("You are logged in as %s.\n" % name)
            accounts[name]["last_seen"] = time.time()
            do_look(name, None)
            broadcast(name, "+++ %s arrives +++" % name)
        except socket.error:
            # Lost the peer right after authentication: undo the registration.
            if users.get(name) is conn:
                del users[name]
            conn.close()

    worker = threading.Thread(target=threaded)
    # Daemon thread: a pending login must not keep the process alive on
    # shutdown.
    worker.daemon = True
    worker.start()
def broadcast_room(name, message):
    """
    Echo *message* to the server console and deliver it to every connected
    user, other than *name*, whose room equals the room of *name*.

    When *name* is None the message goes to all connected users regardless
    of room. Failed sends are ignored.
    """
    print(message)
    origin = get_user_room(name)
    for recipient, link in users.items():
        there = get_user_room(recipient)
        if name is not None and (recipient == name or there != origin):
            continue
        try:
            link.send(message + "\n")
        except socket.error:
            pass
def broadcast(name, message):
    """
    Echo *message* to the server console and deliver it to every connected
    user except *name* (to everybody when *name* is None). Failed sends are
    ignored.
    """
    print(message)
    for recipient, link in users.items():
        if name is not None and recipient == name:
            continue
        try:
            link.send(message + "\n")
        except socket.error:
            pass
def send(name, message):
    """
    Deliver *message*, newline-terminated, to the connected user *name*.

    Unknown or disconnected names are ignored. Delivery is best effort, as
    in broadcast() and broadcast_room(): a socket error is swallowed so that
    a dead peer cannot abort the command handler half-way through its state
    changes. The main loop removes users whose recv() reports a disconnect.
    """
    conn = users.get(name)
    if conn is None:
        return
    try:
        conn.send(message + "\n")
    except socket.error:
        pass
# Word commands: alias -> [handler, wizard_only].
Commands = {}
# Single non-alphabetic prefix characters (such as "'"): char -> [handler, wizard_only].
OneLetter = {}


def command(*aliases, **kwargs):
    """Decorator registering a handler under each of *aliases*.

    A one-character, non-alphabetic alias is stored in OneLetter; every
    other alias is stored in Commands. Pass ``wizard=True`` to mark the
    command as wizard-only. The decorated function is returned unchanged.
    """
    wizard = kwargs.get("wizard", False)

    def wrap(function):
        for alias in aliases:
            is_symbol = len(alias) == 1 and not alias.isalpha()
            table = OneLetter if is_symbol else Commands
            table[alias] = [function, wizard]
        return function

    return wrap
def parse_command(name, line):
    """Interpret one input *line* from user *name* and run the matching handler.

    Lookup order: a registered one-character prefix, then a compass
    direction, then a word command. Wizard-only entries are skipped for
    non-wizards. Anything unmatched is answered with "Uh?". The account's
    last-seen time is refreshed first.
    """
    accounts[name]["last_seen"] = time.time()

    if line and line[0] in OneLetter:
        handler, wizard_only = OneLetter[line[0]]
        if not wizard_only or is_wizard(name):
            return handler(name, line[1:].strip())

    verb, _, params = line.strip().partition(" ")
    verb = verb.lower()

    for index, names in enumerate(Directions):
        if verb in names:
            return go_direction(name, index)

    entry = Commands.get(verb)
    if entry is not None and (not entry[1] or is_wizard(name)):
        return entry[0](name, params)

    send(name, "Uh?")
def go_direction(name, direction):
    """Move the body of user *name* through exit number *direction*.

    The departure is announced in the old room, the arrival in the new one,
    and the mover is shown the new room.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    body = db[character]
    here = body["position"]
    destination = db[here]["exits"][direction]
    if destination is None:
        send(name, "You can't go that way.")
        return

    heading = Directions[direction][0]
    arriving_from = Directions[Opposite[direction]][0]
    broadcast_room(name, "%s walks %s." % (body["name"], heading))
    body["position"] = destination
    send(name, "You walk %s." % heading)
    broadcast_room(name, "%s enters from the %s." % (body["name"], arriving_from))
    do_look(name, None)
@command("say", "'")
def do_say(name, what):
    """Speak *what* to the users in the same room."""
    echo = 'You say "%s"' % what
    heard = '%s says "%s"' % (name, what)
    send(name, echo)
    broadcast_room(name, heard)
@command("shout", "!")
def do_shout(name, what):
    """Speak *what* to every connected user."""
    echo = 'You shout "%s"' % what
    heard = '%s shouts "%s"' % (name, what)
    send(name, echo)
    broadcast(name, heard)
@command("emote", ":")
def do_emote(name, what):
    """Show the free-form action *what* to the users in the same room."""
    echo = "You %s" % what
    seen = "%s %s" % (name, what)
    send(name, echo)
    broadcast_room(name, seen)
@command("quit", "q")
def do_quit(name, params):
    """Disconnect user *name* and announce the departure to everyone else."""
    link = users[name]
    link.close()
    del users[name]
    farewell = "--- %s leaves ---" % name
    broadcast(name, farewell)
@command("who")
def do_who(name, params):
    """List the connected users to *name*.

    Each row is prefixed with flags: "W" for wizards and "A" for users idle
    longer than TIME_AFK. Users tagged "invisible" are left out of both the
    rows and the head count, so the count no longer gives them away.
    """
    visible = [user for user in users if not has_tag(user, "invisible")]
    plural = "s" if len(visible) != 1 else ""
    send(name, "%s connected user%s" % (len(visible), plural))
    now = time.time()
    for user in visible:
        afk = now - accounts[user]["last_seen"] > TIME_AFK
        flags = "[%s%s]" % ("W" if is_wizard(user) else " ", "A" if afk else " ")
        send(name, flags + " " + user)
@command("shutdown", wizard=True)
def do_shutdown(name, params):
    """Wizard command: stop the server by raising SystemExit."""
    raise SystemExit()
@command("kick", wizard=True)
def do_kick(name, params):
    """Wizard command: disconnect the user named in *params*."""
    who = params.strip()
    if who not in users:
        send(name, "No user with that name.")
        return
    send(name, "You kicked %s out of the game." % who)
    send(who, "You have been kicked out of the game.")
    target = users[who]
    target.close()
    del users[who]
    broadcast(name, "--- %s has been kicked ---" % who)
@command("invisible", "invis", wizard=True)
def do_invisible(name, params):
    """Wizard command: toggle the "invisible" tag on the caller."""
    currently_hidden = has_tag(name, "invisible")
    if not currently_hidden:
        add_tag(name, "invisible")
        send(name, "You are now invisible.")
        broadcast_room(name, "%s vanishes from sight." % name)
        return
    remove_tag(name, "invisible")
    send(name, "You are now visible.")
    broadcast_room(name, "%s appears from the empty air." % name)
@command("look", "l")
def do_look(name, params):
    """Describe the current room to *name*: title, text, contents and exits.

    Contents are numbered; those numbers are the indexes accepted by the
    get and attack commands.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    here = db[db[character]["position"]]
    send(name, here["name"])
    send(name, here["description"])

    others = in_room_except(db[character]["position"], character)
    if others:
        send(name, "You see:")
        for number, thing in enumerate(others):
            send(name, "[%s] %s" % (number, db[thing]["name"]))

    send(name, "Exits:")
    open_exits = [
        (number, target)
        for number, target in enumerate(here["exits"])
        if target is not None
    ]
    if not open_exits:
        send(name, " None!")
        return
    for number, target in open_exits:
        send(name, "[%s] %s" % (Directions[number][1].upper(), db[target]["name"]))
@command("get", "g")
def do_get(name, params):
    """Pick up the item whose room-listing number is given in *params*.

    Only objects of kind "item" can be taken; mobs, which includes other
    players' bodies, are refused.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    room = db[character]["position"]
    items = in_room_except(room, character)
    try:
        item = items[int(params.strip())]
    except (ValueError, IndexError):
        send(name, "You don't see that here.")
        return

    # The room listing mixes items and mobs; without this check a player
    # could put the city guard (or another player) in their pocket.
    if db[item].get("kind") != "item":
        send(name, "You can't pick that up.")
        return

    db[item]["position"] = character
    send(name, "You pick up %s." % db[item]["name"])
    broadcast_room(name, "%s picks up %s." % (db[character]["name"], db[item]["name"]))
@command("drop", "d")
def do_drop(name, params):
    """Drop the carried item whose inventory number is given in *params*.

    A dropped item that was wielded or worn is taken out of that slot, so
    the body never references something it no longer carries.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    body = db[character]
    items = in_room(character)
    try:
        item = items[int(params.strip())]
    except (ValueError, IndexError):
        send(name, "You aren't carrying that.")
        return

    for slot in ("wield", "wears"):
        if body.get(slot) == item:
            body[slot] = None

    db[item]["position"] = body["position"]
    send(name, "You drop %s." % db[item]["name"])
    broadcast_room(name, "%s drops %s." % (body["name"], db[item]["name"]))
@command("inventory", "inv", "i")
def do_inventory(name, params):
    """List what the caller's body carries, marking wielded and worn items.

    The numbers shown are the indexes accepted by drop, use and remove.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    carried = in_room(character)
    send(name, "You are carrying:")
    if not carried:
        send(name, " Nothing")
        return

    for number, thing in enumerate(carried):
        if db[character]["wield"] == thing:
            suffix = " (wielded)"
        elif db[character]["wears"] == thing:
            suffix = " (worn)"
        else:
            suffix = ""
        send(name, "[%s] %s%s" % (number, db[thing]["name"], suffix))
@command("use")
def do_use(name, params):
    """Use the carried item whose inventory number is given in *params*.

    Food is eaten (and destroyed), a weapon is wielded, anything else
    cannot be used.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    try:
        item = in_room(character)[int(params.strip())]
    except (ValueError, IndexError):
        send(name, "You aren't carrying that.")
        return

    # .get(): a carried entry without a "type" is simply unusable instead
    # of raising KeyError out of the command.
    item_type = db[item].get("type")
    item_name = db[item]["name"]
    if item_type == "food":
        send(name, "You eat %s." % item_name)
        broadcast_room(name, "%s eats %s." % (db[character]["name"], item_name))
        destroy(item)
    elif item_type == "weapon":
        send(name, "You wield %s." % item_name)
        broadcast_room(name, "%s wields %s." % (db[character]["name"], item_name))
        db[character]["wield"] = item
    else:
        send(name, "You can't use that.")
@command("remove")
def do_remove(name, params):
    """Stop wielding the carried item whose inventory number is in *params*."""
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    # Only the index lookup can raise the handled errors, so only it sits
    # in the try body.
    try:
        item = in_room(character)[int(params.strip())]
    except (ValueError, IndexError):
        send(name, "You aren't carrying that.")
        return

    if db[character]["wield"] != item:
        send(name, "You aren't using that.")
        return

    send(name, "You sheathe %s." % db[item]["name"])
    broadcast_room(name, "%s sheathes %s." % (db[character]["name"], db[item]["name"]))
    db[character]["wield"] = None
def _owner_of(character):
    """Return the account name whose body is *character*, or None for an NPC."""
    for account, record in accounts.items():
        if record["character"] == character:
            return account
    return None


def _roll_damage(strength):
    """Roll one blow: a random integer from strength // 10 to strength // 2."""
    # Floor division gives the same integer bounds on Python 2 and 3.
    return random.randint(strength // 10, strength // 2)


def _kill(name, victim):
    """Remove the dead mob *victim* from the world.

    Its belongings fall into its room (announced to *name* and to the room
    of *name*), the owning account, if any, is left without a body, and the
    database entry is destroyed.
    """
    place = db[victim]["position"]
    for thing in in_room(victim):
        notice = "%s tumbles to the ground." % db[thing]["name"]
        send(name, notice)
        broadcast_room(name, notice)
        db[thing]["position"] = place
    # Unlink before the id is freed; otherwise the account keeps pointing at
    # an id that get_id() may hand to somebody else.
    owner = _owner_of(victim)
    if owner is not None:
        accounts[owner]["character"] = None
    destroy(victim)


@command("attack", "kill", "k")
def do_attack(name, params):
    """Attack the mob whose room-listing number is given in *params*.

    The caller strikes once. A target that survives strikes back once (the
    original retaliation called do_attack() with a character name and a
    dict, which always raised). Whoever drops to 0 hp or below dies: the
    belongings fall to the floor and the body is destroyed.
    """
    character = accounts[name]["character"]
    if character is None:
        send(name, "You don't have a body yet.")
        return

    room = db[character]["position"]
    try:
        target = in_room_except(room, character)[int(params.strip())]
    except (ValueError, IndexError):
        send(name, "You don't see that here.")
        return

    if db[target]["kind"] != "mob":
        send(name, "You can't attack objects.")
        return

    attacker_name = db[character]["name"]
    target_name = db[target]["name"]
    # Account controlling the target, None when it is a plain NPC.
    victim = _owner_of(target)

    send(name, "You attack %s." % target_name)
    broadcast_room(name, "%s attacks %s." % (attacker_name, target_name))
    if victim is not None:
        send(victim, "%s attacks you!" % attacker_name)

    damage = _roll_damage(db[character]["str"])
    send(name, "You deal %s points of damage!" % damage)
    db[target]["hp"] -= damage

    if db[target]["hp"] <= 0:
        send(name, "You killed %s!" % target_name)
        broadcast_room(name, "%s killed %s!" % (attacker_name, target_name))
        _kill(name, target)
        return

    # The survivor hits back exactly once, which keeps one command bounded.
    counter = _roll_damage(db[target]["str"])
    db[character]["hp"] -= counter
    send(name, "%s hits you for %s points of damage!" % (target_name, counter))
    if victim is not None:
        send(victim, "You hit %s for %s points of damage!" % (attacker_name, counter))

    if db[character]["hp"] <= 0:
        send(name, "You have been killed by %s!" % target_name)
        broadcast_room(name, "%s killed %s!" % (target_name, attacker_name))
        _kill(name, character)
@command("@character", "@char")
def do_newchar(name, params):
    """Create or reshape the caller's body from "NAME = DESCRIPTION".

    Without a body a new mob is created in STARTING_ROOM; with one, the
    existing mob is kept. Either way its name and description are replaced.
    """
    pieces = params.split("=", 1)
    if len(pieces) != 2:
        send(name, "Usage: @character NAME = DESCRIPTION")
        return
    new_name, new_desc = pieces

    character = accounts[name]["character"]
    if character is not None:
        send(name, "Your body shakes and flows in a new shape.")
        broadcast_room(name, "%s's body shakes and flows in a new shape." % db[character]["name"])
    else:
        send(name, "You shape a new body from the chaos surrounding you.")
        character = get_id()
        accounts[name]["character"] = character
        db[character] = dict(
            kind="mob",
            name=None,
            description=None,
            position=STARTING_ROOM,
            wield=None,
            wears=None,
            str=10,
            hp=10,
            max_hp=10,
        )
        broadcast_room(name, "A lump of chaos stuff coalesces into an humanoid form.")

    db[character]["name"] = new_name.strip()
    db[character]["description"] = new_desc.strip()
# Set up the server socket.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Non-blocking, so accept() raises instead of waiting when nobody is queued.
server.setblocking(False)
server.bind((HOST, PORT))
server.listen(1)
print("Listening on %s" % ("%s:%s" % server.getsockname()))

# Main event loop.
users = {}  # username -> connection (socket instance)
while True:
    try:
        # Accept new connections
        while True:
            try:
                conn, addr = server.accept()
            except socket.error:
                break
            accept(conn)

        # Kick idles. Iterate over a snapshot because entries are deleted.
        for name, conn in list(users.items()):
            idle_for = time.time() - accounts[name]["last_seen"]
            if idle_for > TIME_KICK:
                try:
                    conn.send("--- Kicked for being idle ---\n")
                except socket.error:
                    # The notice is a courtesy. If it raised, the user would
                    # never be removed and every later tick would fail here
                    # before any input is read.
                    pass
                conn.close()
                users.pop(name, None)
                broadcast(name, "--- %s kicked for being idle ---" % name)

        # Read from connections
        for name, conn in list(users.items()):
            try:
                message = conn.recv(1024)
            except socket.error:
                continue
            if not message:
                # Empty string is given on disconnect: release the socket
                # as well as the registration.
                conn.close()
                users.pop(name, None)
                broadcast(name, "--- %s leaves ---" % name)
                continue
            parse_command(name, message)
        time.sleep(.05)
    except (SystemExit, KeyboardInterrupt):
        broadcast(None, "!!! Server is shutting down !!!")
        break
    except Exception as e:
        print(e)
        continue

# Orderly shutdown: release every client socket and the listening socket.
for conn in list(users.values()):
    conn.close()
server.close()
# Advertisement
# Add Comment
# Please, Sign In to add comment
# Advertisement