Advertisement
Guest User

Untitled

a guest
Oct 7th, 2015
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.62 KB | None | 0 0
  1. from twisted.internet import reactor
  2. from twisted.internet.defer import gatherResults
  3. from twisted.internet.threads import deferToThread
  4. from twisted.trial.unittest import TestCase
  5. from time import sleep
  6.  
  7. class Case(TestCase):
  8. def test_threads(self):
  9. return start()
  10.  
  11. def counter(x):
  12. c = 0
  13. while True:
  14. c += x
  15. print c
  16. sleep(1)
  17.  
  18. def start():
  19. print reactor.getThreadPool().started
  20. d1 = deferToThread(counter, 2)
  21. d2 = deferToThread(counter, 5)
  22. return gatherResults([d1, d2], consumeErrors=True)
  23.  
  24. if __name__ == '__main__':
  25. reactor.callWhenRunning(start)
  26. reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement