Guest User

Untitled

a guest
Jun 20th, 2018
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.51 KB | None | 0 0
  1.  
  2. @defer.inlineCallbacks
  3. def test_GetPublicTimeline(self):
  4. protocols.AmqpActorFactory.protocol = TwitterConsumer
  5. f = protocols.AmqpFactory()
  6. c = reactor.connectTCP("localhost",5672,f)
  7. af = f.channels[0].actors[0]
  8. ap = af.buildProtocol()
  9. apid = ap.getId()
  10. yield ap.actorReady
  11. def func(obj, public_timeline):
  12. return len(public_timeline) == 20
  13. d = self.assertEqualAtLeastOneResponseBeforeTimeout(TwitterConsumer,'callBack',func)
  14. ap.send(apid+'.GetPublicTimeline',{'callBack':apid})
  15. yield d
Add Comment
Please, Sign In to add comment