Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
@defer.inlineCallbacks
def test_GetPublicTimeline(self):
    """Send a GetPublicTimeline request and wait for a 20-entry reply.

    The actor addresses the request to its own id and names itself as the
    'callBack' target. The predicate handed to the assertion helper
    accepts a response only when the timeline holds exactly 20 entries.
    """
    # Actors built by the factory should speak the TwitterConsumer protocol.
    # NOTE(review): this rebinds a class attribute and is never restored
    # here -- confirm other tests do not rely on the previous value.
    protocols.AmqpActorFactory.protocol = TwitterConsumer

    factory = protocols.AmqpFactory()
    # Port 5672 on localhost -- presumably a local AMQP broker; verify
    # against the test environment.
    connector = reactor.connectTCP("localhost", 5672, factory)

    actor_factory = factory.channels[0].actors[0]
    actor = actor_factory.buildProtocol()
    actor_id = actor.getId()

    # Do not send anything until the actor signals it is ready.
    yield actor.actorReady

    def has_full_page(obj, public_timeline):
        # Only the timeline length is checked; ``obj`` is ignored.
        return len(public_timeline) == 20

    # Register the expectation before sending the request.
    # NOTE(review): judging by its name, the helper fails the test if no
    # matching 'callBack' arrives in time -- confirm in the helper itself.
    pending = self.assertEqualAtLeastOneResponseBeforeTimeout(
        TwitterConsumer, 'callBack', has_full_page)

    topic = actor_id + '.GetPublicTimeline'
    payload = {'callBack': actor_id}
    actor.send(topic, payload)

    yield pending
Add Comment
Please, Sign In to add comment