Advertisement
Guest User

Untitled

a guest
Apr 30th, 2017
537
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.45 KB | None | 0 0
  1. # -*- coding: UTF-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import division
  4. from __future__ import print_function
  5. from __future__ import unicode_literals
  6.  
  7. __author__ = "d01"
  8. __email__ = "jungflor@gmail.com"
  9. __copyright__ = "Copyright (C) 2017, Florian JUNG"
  10. __license__ = "MIT"
  11. __version__ = "0.1.0"
  12. __date__ = "2017-05-01"
  13. # Created: 2017-04-27 23:27
  14.  
  15. import threading
  16.  
  17. from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
  18. from flotils import get_logger
  19. from flotils.runable import SignalStopWrapper
  20. from alexander_fw import ReactorModule
  21. from alexander_fw.dto import InputMessage
  22.  
  23.  
  24. logger = get_logger()
  25.  
  26.  
  27. class SimpleEcho(WebSocket):
  28.  
  29. def handleMessage(self):
  30. if instance.socket is None:
  31. instance.socket = self
  32. instance.on_msg(self.data)
  33.  
  34. def handleConnected(self):
  35. # logger.debug(self.address, "connected")
  36. print(self.address, "connected")
  37.  
  38. def handleClose(self):
  39. # logger.debug(self.address, "closed")
  40. print(self.address, "closed")
  41.  
  42.  
  43. class WebCommunicator(ReactorModule):
  44.  
  45. def __init__(self, settings=None):
  46. if settings is None:
  47. settings = {}
  48. super(WebCommunicator, self).__init__(settings)
  49. web_sett = settings.get('web', {})
  50. self._host = web_sett.get("host", "")
  51. self._port = web_sett.get("port", 8080)
  52. self.socket = None
  53. """ :type : SimpleEcho """
  54.  
  55. def _thread_wrapper(self, func, args=None):
  56. """
  57. Wrap function for exception handling with threaded calls
  58.  
  59. :param func: Function to call
  60. :type func: callable
  61. :rtype: None
  62. """
  63. if args is None:
  64. args = ()
  65. try:
  66. func(*args)
  67. except:
  68. self.exception("Threaded execution failed")
  69.  
  70. def _setup(self):
  71. # Setup listeners
  72. self._register(
  73. "communicator_web", "say", "WebCommunicator"
  74. )
  75.  
  76. def communicate(self, msg):
  77. msg.source = "communicator_web"
  78. self.emit(msg, self.get_exchange("manager_intent"))
  79.  
  80. def react_nameko(self, exchange, routing_key, msg):
  81. pass
  82.  
  83. def react(self, exchange, routing_key, msg):
  84. # self.debug("{}-{}: {}".format(exchange, routing_key, msg))
  85.  
  86. if exchange == "communicator_web":
  87. if routing_key == "say":
  88. try:
  89. self._do_say(msg)
  90. except:
  91. self.exception("Failed to say")
  92.  
  93. def on_msg(self, message):
  94. """
  95. Receive message
  96.  
  97. :param message: Received message
  98. :type message: unicode
  99. :rtype: None
  100. """
  101. from pprint import pformat
  102. self.debug(pformat(message))
  103. im = InputMessage()
  104. im.data = message
  105. im.metadata = None
  106. self.communicate(im)
  107.  
  108. def _do_say(self, msg):
  109. self.socket.sendMessage("{}".format(msg.result))
  110.  
  111. def start(self, blocking=False):
  112. self._setup()
  113. try:
  114. self.socket = None
  115. server = SimpleWebSocketServer(self._host, self._port, SimpleEcho)
  116. a_thread = threading.Thread(
  117. target=self._thread_wrapper,
  118. args=(server.serveforever, (), )
  119. )
  120. a_thread.daemon = True
  121. a_thread.start()
  122. except:
  123. self.exception("Failed to run receive loop")
  124. self.stop()
  125. return
  126. super(WebCommunicator, self).start(blocking)
  127.  
  128. def stop(self):
  129. super(WebCommunicator, self).stop()
  130.  
  131.  
  132. class Wrapper(WebCommunicator, SignalStopWrapper):
  133.  
  134. def __init__(self, settings=None):
  135. if settings is None:
  136. settings = {}
  137. super(Wrapper, self).__init__(settings)
  138.  
  139.  
  140. if __name__ == "__main__":
  141. import argparse
  142. import logging.config
  143. from flotils.logable import default_logging_config
  144. from alexander_fw import setup_kombu
  145.  
  146. parser = argparse.ArgumentParser()
  147. parser.add_argument(
  148. "--debug", action="store_true",
  149. help="Use debug level output"
  150. )
  151. parser.add_argument(
  152. "--config", nargs="?", default="settings.yaml",
  153. help="Config file"
  154. )
  155. logging.config.dictConfig(default_logging_config)
  156. args = parser.parse_args()
  157.  
  158. if args.debug:
  159. logging.getLogger().setLevel(logging.DEBUG)
  160.  
  161. setup_kombu()
  162.  
  163. instance = Wrapper({
  164. 'settings_file': args.config
  165. })
  166.  
  167. try:
  168. instance.start(True)
  169. except KeyboardInterrupt:
  170. pass
  171. finally:
  172. instance.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement