Advertisement
Guest User

Untitled

a guest
Apr 26th, 2017
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.27 KB | None | 0 0
  1. """Benchmark tensorflow distributed by adding vector of ones on worker2
  2. to variable on worker1 as fast as possible.
  3.  
  4. On 2014 macbook, TensorFlow 0.10 this shows
  5.  
  6. Local rate: 2175.28 MB per second
  7. Distributed rate: 107.13 MB per second
  8.  
  9. """
  10.  
  11. import subprocess
  12. import tensorflow as tf
  13. import time
  14. import sys
  15.  
  16. flags = tf.flags
  17. flags.DEFINE_integer("iters", 10, "Maximum number of additions")
  18. flags.DEFINE_integer("data_mb", 100, "size of vector in MBs")
  19. flags.DEFINE_string("port1", "12222", "port of worker1")
  20. flags.DEFINE_string("port2", "12223", "port of worker2")
  21. flags.DEFINE_string("task", "", "internal use")
  22. FLAGS = flags.FLAGS
  23.  
  24. # setup local cluster from flags
  25. host = "127.0.0.1:"
  26. cluster = {"worker": [host+FLAGS.port1, host+FLAGS.port2]}
  27. clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def()
  28.  
  29. def default_config():
  30. optimizer_options = tf.OptimizerOptions(opt_level=tf.OptimizerOptions.L0)
  31. config = tf.ConfigProto(
  32. graph_options=tf.GraphOptions(optimizer_options=optimizer_options))
  33. config.log_device_placement = False
  34. config.allow_soft_placement = False
  35. return config
  36.  
  37. def create_graph(device1, device2):
  38. """Create graph that keeps variable on device1 and
  39. vector of ones/addition op on device2"""
  40.  
  41. tf.reset_default_graph()
  42. dtype=tf.int32
  43. params_size = 250*1000*FLAGS.data_mb # 1MB is 250k integers
  44.  
  45. with tf.device(device1):
  46. params = tf.get_variable("params", [params_size], dtype,
  47. initializer=tf.zeros_initializer)
  48. with tf.device(device2):
  49. # constant node gets placed on device1 because of simple_placer
  50. # update = tf.constant(1, shape=[params_size], dtype=dtype)
  51. update = tf.get_variable("update", [params_size], dtype,
  52. initializer=tf.ones_initializer)
  53. add_op = params.assign_add(update)
  54.  
  55. init_op = tf.initialize_all_variables()
  56. return init_op, add_op
  57.  
  58. def run_benchmark(sess, init_op, add_op):
  59. """Returns MB/s rate of addition."""
  60.  
  61. sess.run(init_op)
  62. sess.run(add_op.op) # warm-up
  63. start_time = time.time()
  64. for i in range(FLAGS.iters):
  65. # change to add_op.op to make faster
  66. sess.run(add_op)
  67. elapsed_time = time.time() - start_time
  68. return float(FLAGS.iters)*FLAGS.data_mb/elapsed_time
  69.  
  70.  
  71. def run_benchmark_local():
  72. ops = create_graph(None, None)
  73. sess = tf.Session(config=default_config())
  74. return run_benchmark(sess, *ops)
  75.  
  76.  
  77. def run_benchmark_distributed():
  78. ops = create_graph("/job:worker/task:0", "/job:worker/task:1")
  79.  
  80. # launch distributed service
  81. def runcmd(cmd): subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT)
  82. runcmd("python %s --task=0"%(sys.argv[0]))
  83. runcmd("python %s --task=1"%(sys.argv[0]))
  84. time.sleep(1)
  85.  
  86. sess = tf.Session("grpc://"+host+FLAGS.port1, config=default_config())
  87. return run_benchmark(sess, *ops)
  88.  
  89. if __name__=='__main__':
  90. if not FLAGS.task:
  91.  
  92. rate1 = run_benchmark_local()
  93. rate2 = run_benchmark_distributed()
  94.  
  95. print("Adding data in %d MB chunks" %(FLAGS.data_mb))
  96. print("Local rate: %.2f MB per second" %(rate1,))
  97. print("Distributed rate: %.2f MB per second" %(rate2,))
  98.  
  99. else: # Launch TensorFlow server
  100. server = tf.train.Server(clusterspec, config=default_config(),
  101. job_name="worker",
  102. task_index=int(FLAGS.task))
  103. server.join()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement