Untitled

a guest
Jun 22nd, 2018
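@defer.inlineCallbacks
def test_event_retry(self):
    """Test that the factory reconnects."""
    # failIfNotCalledBeforeTimeout is a helper not shown in this paste;
    # presumably it returns a Deferred that fails the test unless
    # AmqpFactory.retry is called before a timeout.
    d = self.failIfNotCalledBeforeTimeout(protocols.AmqpFactory,
                                          'retry')
    f = protocols.AmqpFactory()
    # connectTCP returns immediately; the connection itself is
    # established asynchronously by the reactor.
    c = reactor.connectTCP("localhost", 5672, f)
    # Drop the connection so the factory has a reason to retry.
    f.protocol.transport.loseConnection()
    f.stopTrying()
    yield d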