Guest User

Untitled

a guest
Jun 24th, 2018
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.12 KB | None | 0 0
  1. #
  2. # Broker peering simulation (part 1) in Python
  3. # Prototypes the state flow
  4. #
  5. # Author : Piero Cornice
  6. # Contact: root(at)pieroland(dot)net
  7. #
  8.  
  9. import zmq
  10. import time
  11. import random
  12.  
  13. def main(args):
  14.  
  15. myself = args[1]
  16. print "Hello, I am", myself
  17.  
  18. context = zmq.Context()
  19.  
  20. # State Back-End
  21. statebe = context.socket(zmq.PUB)
  22.  
  23. # State Front-End
  24. statefe = context.socket(zmq.SUB)
  25. statefe.setsockopt(zmq.SUBSCRIBE, '')
  26.  
  27. bind_address = "ipc://" + myself + "-state.ipc"
  28. statebe.bind(bind_address)
  29.  
  30. for i in range(len(args) - 2):
  31. endpoint = "ipc://" + args[i + 2] + "-state.ipc"
  32. statefe.connect(endpoint)
  33. time.sleep(1.0)
  34.  
  35. poller = zmq.Poller()
  36. poller.register(statefe, zmq.POLLIN)
  37.  
  38. while True:
  39.  
  40. ########## Solution with poll() ##########
  41. socks = dict(poller.poll(1000))
  42.  
  43. try:
  44. # Handle incoming status message
  45. if socks[statefe] == zmq.POLLIN:
  46. msg = statefe.recv_multipart()
  47. print 'Received:', msg
  48.  
  49. except KeyError:
  50. # Send our address and a random value
  51. # for worker availability
  52. msg = []
  53. msg.append(bind_address)
  54. msg.append(str(random.randrange(1, 10)))
  55. statebe.send_multipart(msg)
  56. ##################################
  57.  
  58. ######### Solution with select() #########
  59. # (pollin, pollout, pollerr) = zmq.select([statefe], [], [], 1)
  60. #
  61. # if len(pollin) > 0 and pollin[0] == statefe:
  62. # # Handle incoming status message
  63. # msg = statefe.recv_multipart()
  64. # print 'Received:', msg
  65. #
  66. # else:
  67. # # Send our address and a random value
  68. # # for worker availability
  69. # msg = []
  70. # msg.append(bind_address)
  71. # msg.append(str(random.randrange(1, 10)))
  72. # statebe.send_multipart(msg)
  73. ##################################
  74.  
  75. poller.unregister(statefe)
  76. time.sleep(1.0)
  77.  
  78.  
  79. if __name__ == '__main__':
  80. import sys
  81.  
  82. if len(sys.argv) < 2:
  83. print "Usage: peering.py <myself> <peer_1> ... <peer_N>"
  84. raise SystemExit
  85.  
  86. main(sys.argv)
Add Comment
Please, Sign In to add comment