Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import argparse
import asyncio
import os
import unittest
from unittest.mock import Mock

from artiq.frontend.artiq_ctlmgr import ControllerManager
from artiq.frontend.artiq_master import main as master_main
from artiq.protocols.pc_rpc import AsyncioClient, Server
- get_argparser = unittest.mock.Mock()
- get_argparser.parse_args.return_value = {
- "repository": "/home/fallen/dev/artiq/examples/master/repository",
- "ddb": "/home/fallen/dev/artiq/examples/master/ddb.pyon",
- "pdb": "/home/fallen/dev/artiq/examples/master/pdb.pyon",
- "git": False,
- "bind": "::1",
- "port_control": 3251,
- "port_notify": 3250,
- }
- @asyncio.coroutine
- def master():
- master_main()
- class CtlMgrCase(unittest.TestCase):
- def setUp(self):
- print("TOTO")
- if os.name == "nt":
- self.loop = asyncio.ProactorEventLoop()
- else:
- self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(self.loop)
- self.ctlmgr = ControllerManager("::1", 3250, 1.0)
- self.ctlmgr.start()
- class CtlMgrRPC:
- retry_now = self.ctlmgr.retry_now
- rpc_target = CtlMgrRPC()
- self.rpc_server = Server({"ctlmgr": rpc_target}, builtin_terminate=True)
- self.loop.run_until_complete(self.rpc_server.start("::1", 3249))
- yield from master()
- def test_crash(self):
- print("TITI")
- self.loop.run_until_complete(asyncio.sleep(5))
- def tearDown(self):
- print("TATA")
- self.loop.run_until_complete(self.ctlmgr.stop())
- self.loop.run_until_complete(self.rpc_server.stop())
- self.loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement