Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- from twisted.internet import stdio, reactor, protocol, defer
- from twisted.protocols import basic
- class RoombaConsole(basic.LineReceiver):
- delimiter = '\n' # unix terminal style newlines. remove this line
- # for use with Telnet
- def connectionMade(self):
- self.sendLine("Roomba Remote Management console. Type 'help' for help.")
- self.factory = RFactory()
- self.connector = reactor.connectTCP('localhost', 8000, self.factory)
- def lineReceived(self, line):
- # Ignore blank lines
- if not line: return
- self.issueCommand(line)
- def issueCommand(self, line):
- # Parse the command
- commandParts = line.split()
- command = commandParts[0].lower()
- args = commandParts[1:]
- # Dispatch the command to the appropriate method. Note that all you
- # need to do to implement a new command is add another do_* method.
- try:
- method = getattr(self, 'do_' + command)
- except AttributeError, e:
- self.sendLine('Error: no such command.')
- else:
- try:
- method(*args)
- except Exception, e:
- self.sendLine('Error: ' + str(e))
- self.factory.deferred.addCallback(self._checkResponse)
- def do_help(self, command=None):
- """help [command]: List commands, or show help on the given command"""
- if command:
- self.sendLine(getattr(self, 'do_' + command).__doc__)
- else:
- commands = [cmd[3:] for cmd in dir(self) if cmd.startswith('do_')]
- self.sendLine("Valid commands: " +" ".join(commands))
- def do_quit(self):
- self.sendLine('Goodbye.')
- self.transport.loseConnection()
- def do_check(self, url):
- self.connector.transport.write("Peffff\n")
- def _checkResponse(self, args):
- success, line = args
- self.sendLine(line)
- self.factory.deferred = defer.Deferred()
- class Client(basic.LineReceiver):
- def connectionMade(self):
- self.cmd_success=True
- return
- def lineReceived(self, line):
- self.factory.deferred.callback((self.cmd_success, line))
- def connectionLost(self, reason):
- reactor.stop()
- class RFactory(protocol.ClientFactory):
- protocol = Client
- def __init__(self):
- self.deferred = defer.Deferred()
- if __name__ == "__main__":
- stdio.StandardIO(RoombaConsole())
- reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement