import md5 import socket import errno def digest(seed, pw): if not pw: return None m = md5.new() m.update(seed) m.update(pw) return m.hexdigest() class RconClient: def __init__(self): self.sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM) self.trace = 0 self.challenge = None def connect (self, host, port): self.sock.connect ((host, port)) response = self.getWelcomeResponse() # Dig out the seed prefix = '### Digest seed: ' challpos = response.find(prefix) if challpos != -1: challpos += len(prefix) challend = response.find('\n', challpos) self.challenge = response[challpos:challend] if self.trace: print "S: challenge: ", self.challenge if self.trace: print "S:", response return response def invoke (self, cmd): if self.trace: print "C:", cmd self.sock.send ('\x02' + cmd + '\n') # This is a non-interactive command. response = self.getResponse() if self.trace: print "S:", response return response # Read data until two newlines are encountered def getWelcomeResponse (self): result = "" while 1: data = self.sock.recv(1024) result += data lflfpos = result.find('\n\n') if lflfpos != -1: break return result def getResponse (self): result = "" done = False while not done: data = self.sock.recv(1024) for c in data: if c == '\x04': done = True break result += c return result if __name__ == '__main__': from getopt import getopt import sys host = 'localhost' port = 4711 opts, args = getopt(sys.argv[1:], 'h:p:') for k, v in opts: if k == '-h': host = v elif k == '-p': port = int(v) print 'Connecting to %s, port %d..' % (host, port) try: conn = RconClient() conn.connect(host, port) while 1: pw = raw_input('Password: ') d = digest(conn.challenge, pw) result = conn.invoke('login %s' % d) if result.startswith('Authentication success'): break print 'Invalid password, please try again' while 1: cmd = raw_input('rcon> ') if cmd == 'login': print 'Already authenticated' continue if cmd == 'quit': break result = conn.invoke(cmd) print result except socket.error, detail: print 'Network error:', detail[1] except EOFError: pass