- num_partitions=10
- using cached dataframes for key: (8757658986945, 'file:///dbfs/horovod_spark_estimator/a86a5662-cc24-4519-97cf-e2b4c21617b0/intermediate_train_data', 'file:///dbfs/horovod_spark_estimator/a86a5662-cc24-4519-97cf-e2b4c21617b0/intermediate_val_data', None)
- train_data_path=file:///dbfs/horovod_spark_estimator/a86a5662-cc24-4519-97cf-e2b4c21617b0/intermediate_train_data.0
- train_rows=1000
- val_data_path=file:///dbfs/horovod_spark_estimator/a86a5662-cc24-4519-97cf-e2b4c21617b0/intermediate_val_data.0
- val_rows=0
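The lines above show Horovod's data-preparation step: the input DataFrame is materialized as Parquet under the store prefix and split into train and validation sets. val_rows=0 means no validation split was requested, so no validation metrics will be computed. A quick sanity check, assuming a live SparkSession and the exact path from this log:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    val_path = ("file:///dbfs/horovod_spark_estimator/"
                "a86a5662-cc24-4519-97cf-e2b4c21617b0/intermediate_val_data.0")
    # Counts the rows Horovod wrote for the validation split; 0 here, matching val_rows=0.
    print(spark.read.parquet(val_path).count())

Passing validation=<fraction or column name> to the estimator populates this split (see the full sketch at the end of this paste).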
- [1,0]<stdout>:Shared lib path is pointing to: <CDLL '/usr/local/lib/python3.8/dist-packages/horovod/torch/mpi_lib_v2.cpython-38-x86_64-linux-gnu.so', handle 179d3200 at 0x7face728db50>
- [1,0]<stdout>:Training parameters: Epochs: 1
- [1,0]<stdout>:Train rows: 1000, Train batch size: 32, Train_steps_per_epoch: 31
- [1,0]<stdout>:Shuffle: True, Random seed: None
- [1,0]<stdout>:Checkpoint file: /tmp/tmpnawcah3h/checkpoint.tf, Logs dir: /tmp/tmpnawcah3h/logs
- [1,0]<stdout>:
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/fs_utils.py:88: FutureWarning: pyarrow.localfs is deprecated as of 2.0.0, please use pyarrow.fs.LocalFileSystem instead.
- [1,0]<stderr>: self._filesystem = pyarrow.localfs
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/etl/dataset_metadata.py:402: FutureWarning: Specifying the 'metadata_nthreads' argument is deprecated as of pyarrow 8.0.0, and the argument will be removed in a future version
- [1,0]<stderr>: dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10)
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/etl/dataset_metadata.py:362: FutureWarning: 'ParquetDataset.common_metadata' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version.
- [1,0]<stderr>: if not dataset.common_metadata:
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/reader.py:420: FutureWarning: Specifying the 'metadata_nthreads' argument is deprecated as of pyarrow 8.0.0, and the argument will be removed in a future version
- [1,0]<stderr>: self.dataset = pq.ParquetDataset(dataset_path, filesystem=pyarrow_filesystem,
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/unischema.py:317: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Specify 'use_legacy_dataset=False' while constructing the ParquetDataset, and then use the '.fragments' attribute instead.
- [1,0]<stderr>: meta = parquet_dataset.pieces[0].get_metadata()
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/unischema.py:321: FutureWarning: 'ParquetDataset.partitions' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Specify 'use_legacy_dataset=False' while constructing the ParquetDataset, and then use the '.partitioning' attribute instead.
- [1,0]<stderr>: for partition in (parquet_dataset.partitions or []):
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/etl/dataset_metadata.py:253: FutureWarning: 'ParquetDataset.metadata' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version.
- [1,0]<stderr>: metadata = dataset.metadata
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/etl/dataset_metadata.py:254: FutureWarning: 'ParquetDataset.common_metadata' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version.
- [1,0]<stderr>: common_metadata = dataset.common_metadata
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/etl/dataset_metadata.py:350: FutureWarning: 'ParquetDataset.pieces' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Specify 'use_legacy_dataset=False' while constructing the ParquetDataset, and then use the '.fragments' attribute instead.
- [1,0]<stderr>: futures_list = [thread_pool.submit(_split_piece, piece, dataset.fs.open) for piece in dataset.pieces]
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/etl/dataset_metadata.py:350: FutureWarning: 'ParquetDataset.fs' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Specify 'use_legacy_dataset=False' while constructing the ParquetDataset, and then use the '.filesystem' attribute instead.
- [1,0]<stderr>: futures_list = [thread_pool.submit(_split_piece, piece, dataset.fs.open) for piece in dataset.pieces]
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/etl/dataset_metadata.py:334: FutureWarning: ParquetDatasetPiece is deprecated as of pyarrow 5.0.0 and will be removed in a future version.
- [1,0]<stderr>: return [pq.ParquetDatasetPiece(piece.path, open_file_func=fs_open,
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/arrow_reader_worker.py:140: FutureWarning: 'ParquetDataset.fs' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Specify 'use_legacy_dataset=False' while constructing the ParquetDataset, and then use the '.filesystem' attribute instead.
- [1,0]<stderr>: parquet_file = ParquetFile(self._dataset.fs.open(piece.path))
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/arrow_reader_worker.py:288: FutureWarning: 'ParquetDataset.partitions' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Specify 'use_legacy_dataset=False' while constructing the ParquetDataset, and then use the '.partitioning' attribute instead.
- [1,0]<stderr>: partition_names = self._dataset.partitions.partition_names if self._dataset.partitions else set()
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/arrow_reader_worker.py:291: FutureWarning: 'ParquetDataset.partitions' attribute is deprecated as of pyarrow 5.0.0 and will be removed in a future version. Specify 'use_legacy_dataset=False' while constructing the ParquetDataset, and then use the '.partitioning' attribute instead.
- [1,0]<stderr>: table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions)
- [1,0]<stderr>:/usr/local/lib/python3.8/dist-packages/petastorm/pytorch.py:339: UserWarning: The given NumPy array is not writable, and PyTorch does not support non-writable tensors. This means writing to this tensor will result in undefined behavior. You may want to copy the array to protect its data or make it writable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at ../torch/csrc/utils/tensor_numpy.cpp:199.)
- [1,0]<stderr>: row_as_dict[k] = self.transform_fn(v)
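The FutureWarnings above come from petastorm calling pre-5.0 pyarrow APIs and are harmless for this run. The UserWarning about a non-writable NumPy array is also benign; if desired, it can be avoided by copying the array before tensor conversion, e.g. inside a transformation_fn. A minimal sketch of the issue and the remedy (the byte buffer stands in for a value petastorm yields):

    import numpy as np
    import torch

    arr = np.frombuffer(b"\x00" * 8, dtype=np.float32)  # backed by a read-only buffer
    t = torch.as_tensor(arr.copy())  # copying makes the data writable, so no warning
    print(t)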
- [1,0]<stderr>:Traceback (most recent call last):
- [1,0]<stderr>: File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
- [1,0]<stderr>: return _run_code(code, main_globals, None,
- [1,0]<stderr>: File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
- [1,0]<stderr>: exec(code, run_globals)
- [1,0]<stderr>: File "/usr/local/lib/python3.8/dist-packages/horovod/spark/task/mpirun_exec_fn.py", line 52, in <module>
- [1,0]<stderr>: main(codec.loads_base64(sys.argv[1]), codec.loads_base64(sys.argv[2]))
- [1,0]<stderr>: File "/usr/local/lib/python3.8/dist-packages/horovod/spark/task/mpirun_exec_fn.py", line 45, in main
- [1,0]<stderr>: task_exec(driver_addresses, settings, 'OMPI_COMM_WORLD_RANK', 'OMPI_COMM_WORLD_LOCAL_RANK')
- [1,0]<stderr>: File "/usr/local/lib/python3.8/dist-packages/horovod/spark/task/__init__.py", line 61, in task_exec
- [1,0]<stderr>: result = fn(*args, **kwargs)
- [1,0]<stderr>: File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 432, in train
- [1,0]<stderr>: 'train': _train(epoch)
- [1,0]<stderr>: File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 373, in _train
- [1,0]<stderr>: inputs, labels, sample_weights = prepare_batch(row)
- [1,0]<stderr>: File "/usr/local/lib/python3.8/dist-packages/horovod/spark/torch/remote.py", line 306, in prepare_batch
- [1,0]<stderr>: for col, shape in zip(feature_columns, input_shapes)]
- [1,0]<stderr>:TypeError: 'NoneType' object is not iterable
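This TypeError is the root cause of everything that follows. In horovod/spark/torch/remote.py, prepare_batch evaluates zip(feature_columns, input_shapes), and input_shapes is None here, which suggests the TorchEstimator was built without its input_shapes argument. A two-line reproduction of the failure mode (the column name is a placeholder):

    feature_columns = ["features"]  # placeholder feature column
    input_shapes = None             # the unset estimator parameter
    # zip() rejects None, exactly as in the worker traceback above:
    pairs = [(col, shape) for col, shape in zip(feature_columns, input_shapes)]
    # TypeError: 'NoneType' object is not iterable

The fix is to pass input_shapes, one shape per feature column, when constructing the estimator; a full sketch appears at the end of this paste.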
- -------------------------------------------------------
- Primary job terminated normally, but 1 process returned
- a non-zero exit code. Per user-direction, the job has been aborted.
- -------------------------------------------------------
- [5accf7ff04b7:71520] PMIX ERROR: BAD-PARAM in file src/dstore/pmix_esh.c at line 491
- --------------------------------------------------------------------------
- mpirun detected that one or more processes exited with non-zero status, thus causing
- the job to be terminated. The first process to do so was:
- Process name: [[49903,1],0]
- Exit code: 1
- --------------------------------------------------------------------------
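The PMIX BAD-PARAM message and the mpirun abort are downstream symptoms: rank 0 exited with status 1 because of the TypeError above, mpirun then tore the job down, and Horovod cancelled the Spark job group, producing the Py4JJavaError below.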
- Exception in thread Thread-1834:
- Traceback (most recent call last):
- File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
- self.run()
- File "/usr/lib/python3.8/threading.py", line 870, in run
- self._target(*self._args, **self._kwargs)
- File "/usr/local/lib/python3.8/dist-packages/horovod/spark/runner.py", line 142, in run_spark
- result = procs.mapPartitionsWithIndex(mapper).collect()
- File "/content/spark-3.1.3-bin-hadoop2.7/python/pyspark/rdd.py", line 949, in collect
- sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
- File "/content/spark-3.1.3-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
- File "/content/spark-3.1.3-bin-hadoop2.7/python/pyspark/sql/utils.py", line 111, in deco
- return f(*a, **kw)
- File "/content/spark-3.1.3-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
- py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
- : org.apache.spark.SparkException: Job 43 cancelled part of cancelled job group horovod.spark.run.9
- at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
- at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:2199)
- at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleJobGroupCancelled$4(DAGScheduler.scala:1093)
- at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
- at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
- at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:1092)
- at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2452)
- at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
- at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
- at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
- at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
- at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
- at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
- at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
- at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
- at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
- at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
- at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
- at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
- at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
- at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
- at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:498)
- at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
- at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
- at py4j.Gateway.invoke(Gateway.java:282)
- at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
- at py4j.commands.CallCommand.execute(CallCommand.java:79)
- at py4j.GatewayConnection.run(GatewayConnection.java:238)
- at java.lang.Thread.run(Thread.java:750)
- ---------------------------------------------------------------------------
- RuntimeError Traceback (most recent call last)
- <ipython-input-132-7c87619631ab> in <module>
- 1 get_ipython().run_line_magic('cd', '/content/deep-loglizer')
- ----> 2 torch_estimator.fit(class_df)
- 10 frames
- /usr/local/lib/python3.8/dist-packages/horovod/spark/common/estimator.py in fit(self, df, params)
- 33 `HorovodModel` transformer wrapping the trained model.
- 34 """
- ---> 35 return super(HorovodEstimator, self).fit(df, params)
- 36
- 37 def fit_on_parquet(self, params=None, dataset_idx=None):
- /content/spark-3.1.3-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
- 159 return self.copy(params)._fit(dataset)
- 160 else:
- --> 161 return self._fit(dataset)
- 162 else:
- 163 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
- /usr/local/lib/python3.8/dist-packages/horovod/spark/common/estimator.py in _fit(self, df)
- 78 train_rows, val_rows, metadata, avg_row_size = util.get_dataset_properties(dataset_idx)
- 79 self._check_metadata_compatibility(metadata)
- ---> 80 return self._fit_on_prepared_data(
- 81 backend, train_rows, val_rows, metadata, avg_row_size, dataset_idx)
- 82
- /usr/local/lib/python3.8/dist-packages/horovod/spark/torch/estimator.py in _fit_on_prepared_data(self, backend, train_rows, val_rows, metadata, avg_row_size, dataset_idx)
- 287
- 288 trainer = remote.RemoteTrainer(self, metadata, last_checkpoint_state, run_id, dataset_idx)
- --> 289 handle = backend.run(trainer,
- 290 args=(serialized_model, optimizer_cls, model_opt_state_serialized,
- 291 train_rows, val_rows, avg_row_size),
- /usr/local/lib/python3.8/dist-packages/horovod/spark/common/backend.py in run(self, fn, args, kwargs, env)
- 81 del full_env['CUDA_VISIBLE_DEVICES']
- 82
- ---> 83 return horovod.spark.run(fn, args=args, kwargs=kwargs,
- 84 num_proc=self._num_proc, env=full_env,
- 85 **self._kwargs)
- /usr/local/lib/python3.8/dist-packages/horovod/spark/runner.py in run(fn, args, kwargs, num_proc, start_timeout, use_mpi, use_gloo, extra_mpi_args, env, stdout, stderr, verbose, nics, prefix_output_with_timestamp, executable)
- 288
- 289 # Run the job
- --> 290 _launch_job(use_mpi, use_gloo, settings, driver, env, stdout, stderr, executable)
- 291 except:
- 292 # Terminate Spark job.
- /usr/local/lib/python3.8/dist-packages/horovod/spark/runner.py in _launch_job(use_mpi, use_gloo, settings, driver, env, stdout, stderr, executable)
- 153 nics = driver.get_common_interfaces()
- 154 executable = executable or sys.executable
- --> 155 run_controller(use_gloo, lambda: gloo_run(executable, settings, nics, driver, env, stdout, stderr),
- 156 use_mpi, lambda: mpi_run(executable, settings, nics, driver, env, stdout, stderr),
- 157 False, lambda: None,
- /usr/local/lib/python3.8/dist-packages/horovod/runner/launch.py in run_controller(use_gloo, gloo_run, use_mpi, mpi_run, use_jsrun, js_run, verbosity)
- 772 js_run()
- 773 else:
- --> 774 mpi_run()
- 775 elif gloo_built(verbose=verbose):
- 776 gloo_run()
- /usr/local/lib/python3.8/dist-packages/horovod/spark/runner.py in <lambda>()
- 154 executable = executable or sys.executable
- 155 run_controller(use_gloo, lambda: gloo_run(executable, settings, nics, driver, env, stdout, stderr),
- --> 156 use_mpi, lambda: mpi_run(executable, settings, nics, driver, env, stdout, stderr),
- 157 False, lambda: None,
- 158 settings.verbose)
- /usr/local/lib/python3.8/dist-packages/horovod/spark/mpi_run.py in mpi_run(executable, settings, nics, driver, env, stdout, stderr)
- 54 codec.dumps_base64(driver.addresses()),
- 55 codec.dumps_base64(settings))
- ---> 56 hr_mpi_run(settings, nics, env, command, stdout=stdout, stderr=stderr)
- /usr/local/lib/python3.8/dist-packages/horovod/runner/mpi_run.py in mpi_run(settings, nics, env, command, stdout, stderr)
- 250 exit_code = safe_shell_exec.execute(mpirun_command, env=env, stdout=stdout, stderr=stderr)
- 251 if exit_code != 0:
- --> 252 raise RuntimeError("mpirun failed with exit code {exit_code}".format(exit_code=exit_code))
- 253 else:
- 254 os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)
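The driver-side RuntimeError ("mpirun failed with exit code 1") merely surfaces the worker failure; the actionable error remains the TypeError in prepare_batch. Below is a hedged end-to-end sketch of a corrected estimator call. The model, column names, shapes, and store path are illustrative assumptions, not the original notebook's values; the essential change is supplying input_shapes (plus explicit feature_cols/label_cols, and validation to avoid the val_rows=0 seen earlier):

    import torch
    import torch.nn as nn
    from horovod.spark.common.store import Store
    from horovod.spark.torch import TorchEstimator

    store = Store.create("file:///dbfs/horovod_spark_estimator")

    model = nn.Sequential(nn.Linear(10, 2))  # placeholder: 10 features -> 2 classes
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.CrossEntropyLoss()

    torch_estimator = TorchEstimator(
        num_proc=1,
        store=store,
        model=model,
        optimizer=optimizer,
        loss=lambda input, target: loss_fn(input, target.long()),
        feature_cols=["features"],   # assumed single vector column in class_df
        label_cols=["label"],        # assumed label column
        input_shapes=[[-1, 10]],     # one shape per feature column; -1 is the batch dim
        validation=0.1,              # hold out 10% so val_rows is non-zero
        batch_size=32,
        epochs=1,
        verbose=1,
    )

    # class_df is the DataFrame from the traceback above.
    torch_model = torch_estimator.fit(class_df)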