Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- #
- import eventlet
- eventlet.monkey_patch()
- import logging
- from oslo_service import service
- import oslo_messaging as om
- from oslo_config import cfg
- import pdb
- import traceback
- import os
- import time
# Configure the root logger first, then grab a handle to it.  Each record
# shows the timestamp and the id of the emitting thread ahead of the message.
logging.basicConfig(
    format='%(asctime)s %(thread)d %(message)s',
    level=logging.INFO,
)
LOG = logging.getLogger()
- from threading import Lock, Event, current_thread, Thread
class TaskServer3(service.Service):
    """oslo.service ``Service`` that runs an oslo.messaging RPC server.

    The instance registers itself as the only RPC endpoint (see
    :meth:`test`).  Every lifecycle hook logs the process id, the current
    thread and the instance so the order in which a launcher drives
    ``start``/``stop``/``wait``/``reset`` can be followed in the log.
    """

    def __init__(self):
        """Create the service; no RPC server exists until :meth:`start`."""
        super(TaskServer3, self).__init__()
        # Guards every read and write of self._server.
        self._lock = Lock()
        # Set by stop() after the RPC server has been torn down; wait()
        # blocks on it.
        self._stopped = Event()
        self._server = None
        LOG.info("TaskServer Created (%s) %s", os.getpid(), self)

    def start(self):
        """Create and start the RPC server.

        Logs an error and returns without side effects on the server if one
        is already running.
        """
        # Diagnostic: show who is calling start().
        traceback.print_stack()
        LOG.info("TaskServer start() called (%s) [%s] %s", os.getpid(),
                 current_thread(), self)
        super(TaskServer3, self).start()
        with self._lock:
            if self._server is not None:
                LOG.error("RPC Server already started!")
                return
            # Re-arm the event so that wait() blocks again when the service
            # is started after an earlier stop().
            self._stopped.clear()
            self.target = om.Target(exchange="testX",
                                    topic="testT",
                                    server="testS")
            self._server = om.get_rpc_server(
                transport=om.get_rpc_transport(cfg.CONF),
                target=self.target,
                endpoints=[self],
                executor="eventlet")
            LOG.info("TaskServer calling RPC server start (%s) [%s] %s",
                     os.getpid(), current_thread(), self)
            self._server.start()
            LOG.info("rpc server started (%s) [%s] %s", os.getpid(),
                     current_thread(), self)
        LOG.info("TaskServer Start completed (%s) [%s] %s", os.getpid(),
                 current_thread(), self)

    def stop(self):
        """Stop the RPC server, wait for it to finish and release waiters.

        Logs an error and returns if no server is running.
        """
        LOG.info("TaskServer stop() called (%s) [%s] %s", os.getpid(),
                 current_thread(), self)
        super(TaskServer3, self).stop()
        with self._lock:
            if self._server is None:
                LOG.error("stopping non-existing server")
                return
            LOG.info("TaskServer calling RPC server stop (%s) [%s] %s",
                     os.getpid(), current_thread(), self)
            self._server.stop()
            LOG.info("TaskServer calling RPC server wait (%s) [%s] %s",
                     os.getpid(), current_thread(), self)
            self._server.wait()
            LOG.info("rpc server destroyed (%s) [%s] %s", os.getpid(),
                     current_thread(), self)
            self._server = None
            self._stopped.set()
        LOG.info("TaskServer Stopped (%s) [%s] %s", os.getpid(),
                 current_thread(), self)

    def reset(self):
        """Log the reset request; no state is changed."""
        LOG.info("TaskServer reset() called (%s) %s", os.getpid(), self)

    def wait(self):
        """Block until the RPC server has finished and stop() has completed.

        Logs an error (with a stack trace) and returns immediately if no
        server is running.
        """
        LOG.info("TaskServer wait called (%s) [%s] %s", os.getpid(),
                 current_thread(), self)
        super(TaskServer3, self).wait()
        # Take a snapshot of the server under the lock, then block WITHOUT
        # holding it: stop() needs the same lock to shut the server down and
        # set self._stopped, and it also resets self._server to None.
        with self._lock:
            server = self._server
        if server is None:
            LOG.error("Waiting for non-existing server")
            traceback.print_stack()
            return
        LOG.info("TaskServer calling RPC server wait (%s) [%s] %s",
                 os.getpid(), current_thread(), self)
        server.wait()
        LOG.info("TaskServer call to RPC server wait done (%s) [%s] %s",
                 os.getpid(), current_thread(), self)
        self._stopped.wait()
        LOG.info("TaskServer wait done (%s) [%s] %s", os.getpid(),
                 current_thread(), self)

    ### RPC CALLS ###

    def test(self, ctxt):
        """RPC endpoint: log the call and return ``True``.

        :param ctxt: request context supplied by the RPC layer (unused).
        """
        LOG.info("RPC test call (%s)", os.getpid())
        return True
def main(original_sequence=True, workers=2):
    """Launch ``TaskServer3`` through ``service.launch`` and wait on it.

    :param original_sequence: when true (the default), hand a fresh
        ``TaskServer3`` to the launcher and wait.  When false, run the
        alternative sequence: create the service, launch it, call its
        ``start()`` explicitly, then wait on the launcher.
    :param workers: value passed as ``workers`` to ``service.launch``.
    """
    cfg.CONF.transport_url = "rabbit://127.0.0.1:5672"

    if original_sequence:
        LOG.info("Original sequence (%s)", os.getpid())
        LOG.info("Creating a launcher")
        launcher = service.launch(cfg.CONF, TaskServer3(), workers=workers)
    else:
        LOG.info("New sequence (%s)", os.getpid())
        LOG.info("Creating a TaskServer")
        task_server = TaskServer3()
        LOG.info("Creating a launcher")
        launcher = service.launch(cfg.CONF, task_server, workers=workers)
        LOG.info("Calling 'start'")
        task_server.start()
        LOG.info("Calling 'launcher.wait'")

    # Both sequences finish the same way: block on the launcher, then log.
    launcher.wait()
    LOG.info("Main thread exiting")
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement