Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- 2017-07-23 16:16:17.281414: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2225
- Process Process-3:
- Traceback (most recent call last):
- File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
- self.run()
- File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
- self._target(*self._args, **self._kwargs)
- File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 32, in cifar10
- serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
- File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
- self._server_def.SerializeToString(), status)
- File "/home/skay/anaconda2/lib/python2.7/contextlib.py", line 24, in __exit__
- self.gen.next()
- File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
- pywrap_tensorflow.TF_GetCode(status)) UnknownError: Could not start gRPC server
- 2017-07-23 16:27:48.605617: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session fe9fd6a338e2c9a7 with config:
- 2017-07-23 16:27:48.607126: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session 3560417f98b00dea with config:
- [ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
- Process-3
- [ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
- Process-3
- [ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
- Process-3
- ERROR:tensorflow:==================================
- Object was never used (type <class 'tensorflow.python.framework.ops.Operation'>):
- <tf.Operation 'worker_0/init' type=NoOp>
- If you want to mark it as used call its "mark_used()" method.
- It was originally created here:
- ['File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 83, in <module>n proc.start()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 130, in startn self._popen = Popen(self)', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/forking.py", line 126, in __init__n code = process_obj._bootstrap()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrapn self.run()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in runn self._target(*self._args, **self._kwargs)', 'File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 49, in cifar10n init_op=tf.initialize_all_variables(),logdir='/tmp/mydir')', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 170, in wrappedn return _add_should_use_warning(fn(*args, **kwargs))', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 139, in _add_should_use_warningn wrapped = TFShouldUseWarningWrapper(x)', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 96, in __init__n stack = [s.strip() for s in traceback.format_stack()]']
- ==================================
- 2017-07-23 16:28:28.646871: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
- 2017-07-23 16:28:38.647276: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
- 2017-07-23 16:28:48.647526: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
- # build a python mutliprocess.py
- import multiprocessing
- import time
- import tensorflow as tf
- from tensorflow.contrib.training import HParams
- import os
- import psutil
- import numpy as np
- from tensorflow.python.client import device_lib
- from resnet import *
- import Queue
# Cluster layout: one parameter server and two workers, all on localhost.
cluster_spec = {
    "ps": ["localhost:2226"],
    "worker": ["localhost:2227", "localhost:2228"],
}
cluster = tf.train.ClusterSpec(cluster_spec)
# Fixed input vector fed to every worker step: [1.0, 2.0, ..., 10.0].
im_Test = np.linspace(1, 10, 10)
def model_fun(input):
    """Identity model stub: log the running process name, pass input through.

    Args:
        input: tensor (or any value); returned unchanged.

    Returns:
        The ``input`` argument, unmodified.
    """
    # Parenthesized single-argument print is valid in both Python 2 and 3,
    # matching the print(...) calls used elsewhere in this script.
    print(multiprocessing.current_process().name)
    return input
def cifar10(device, return_dict, result_t):
    """Run one cluster member (parameter server or worker) in this process.

    Args:
        device: [job_name, task_index] pair, e.g. ['ps', 0] or ['worker', 1].
        return_dict: shared manager dict; a worker records its task index
            under its job name after completing its steps.
        result_t: shared manager queue; a worker pushes ``return_dict`` onto
            it once it has run 3 session steps.

    A 'ps' process blocks forever in ``serv.join()``; a 'worker' process
    builds the graph, runs 3 steps feeding ``im_Test``, reports, and returns.
    """
    params = HParams(cluster=cluster,
                     job_name=device[0],
                     task_index=device[1])
    serv = tf.train.Server(cluster, job_name=params.job_name,
                           task_index=params.task_index)
    if params.job_name == "ps":
        # Parameter servers only serve variables; they never return.
        serv.join()
    elif params.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/replica:0/task:%d" % params.task_index,
                cluster=cluster)):
            input_img = tf.placeholder(dtype=tf.float32, shape=[10, ])
            with tf.name_scope('%s_%d' % (params.job_name, params.task_index)):
                hess_op = model_fun(input_img)
                global_step = tf.contrib.framework.get_or_create_global_step()
        # global_variables_initializer() is the non-deprecated replacement for
        # initialize_all_variables() (same op; the old name is what triggered
        # the "Object was never used" should-use warning in the log above).
        sv = tf.train.Supervisor(is_chief=(params.task_index == 0),
                                 global_step=global_step,
                                 init_op=tf.global_variables_initializer(),
                                 logdir='/tmp/mydir')
        with sv.prepare_or_wait_for_session(serv.target) as sess:
            step = 0
            while not sv.should_stop():
                hess = sess.run(hess_op, feed_dict={input_img: im_Test})
                print(np.array(hess))
                print(multiprocessing.current_process().name)
                step += 1
                if step == 3:
                    # Report which task this worker ran, then stop looping.
                    return_dict[params.job_name] = params.task_index
                    result_t.put(return_dict)
                    break
        sv.stop()
        sess.close()
    return
if __name__ == '__main__':
    logger = multiprocessing.log_to_stderr()
    manager = multiprocessing.Manager()
    result = manager.Queue()      # workers push their return_dict here
    return_dict = manager.dict()  # shared {job_name: task_index}
    processes = []
    # One process per cluster member: a parameter server plus two workers.
    devices = [['ps', 0],
               ['worker', 0],
               ['worker', 1]]
    # Start the clock once, before any process launches. The original reset
    # start_time on every loop iteration and then printed
    # (start_time - time.time()), i.e. a negative "time taken".
    start_time = time.time()
    for dev in devices:
        proc = multiprocessing.Process(target=cifar10,
                                       args=(dev, return_dict, result))
        processes.append(proc)
        proc.start()
    # NOTE(review): the 'ps' process blocks forever in serv.join(), so this
    # join never completes for it — confirm whether the ps process should be
    # terminated once the workers finish.
    for p in processes:
        p.join()
    # Drain the shared queue into a plain list for printing.
    kill = []
    while not result.empty():
        kill.append(result.get())
    print(kill)
    print("time taken = %d" % (time.time() - start_time))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement