```
import pyspark.sql.functions as fx
from pyspark.sql.types import *

# read in BGEN
bgen_path = "/mnt/data/home/jeremy/data/ukbb/imputed/ukb_imp_chr22_v3.bgen"
bgen_sample_path = "/mnt/data/home/jeremy/data/ukbb/imputed/ukb49662_imp_chr1_v3_s487317.sample"
df = (spark.read.format("com.databricks.bgen")
      .options(sampleIdColumn="ID_2", sampleFilePath=bgen_sample_path)
      .load(bgen_path))
display(df.withColumn("firstGenotype", fx.expr("genotypes[0]")).drop("genotypes").limit(10))
```
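Before binning and writing, it can help to confirm what the BGEN reader actually produced. A minimal sanity-check sketch, using only standard DataFrame calls and the two columns (`contigName`, `start`) that the cells below rely on:

```
# lightweight check of the loaded BGEN DataFrame
df.printSchema()                           # confirm the reader's schema
df.select("contigName", "start").show(5)   # the columns used for binning and partitioning below
```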
# --------- NEWCELL
```
def add_position_bin(df, bin_width=10000000):
    """
    Add a bin label to each variant, used for partitioning the Delta Lake table.
    Bins are derived from the variant start position.

    :param df: DataFrame with a `start` column
    :param bin_width: int, bin width in base pairs, default = 10,000,000 (10Mb)
    """
    df = df.withColumn("bin_start", fx.col("start") - (fx.col("start") % bin_width)) \
           .withColumn("bin_end", fx.col("start") - (fx.col("start") % bin_width) + bin_width) \
           .withColumn("bin", fx.concat(fx.col("bin_start").cast(StringType()),
                                        fx.lit("-"),
                                        fx.col("bin_end").cast(StringType()))) \
           .drop("bin_start", "bin_end")
    return df

df = add_position_bin(df)
```
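As a quick check of the binning arithmetic: with the default 10Mb width, a variant at position 12,345,678 should land in the bin labelled `10000000-20000000`, and one at 9,999,999 in `0-10000000`. A minimal sketch on a throwaway DataFrame (column names mirror the cells above; the positions are made up):

```
# hypothetical test rows, only to exercise add_position_bin
test_df = spark.createDataFrame(
    [("22", 12345678), ("22", 9999999)],
    ["contigName", "start"],
)
add_position_bin(test_df).show(truncate=False)
# expected bins: 10000000-20000000 and 0-10000000
```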
# --------- NEWCELL
df.write.format("delta").partitionBy("contigName", "bin").save("/mnt/data/home/jeremy/data/ukbb/delta/")
# this is the step that fails; it produces the traceback below
```
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-4103967183024832> in <module>()
----> 1 df.write.format("delta").partitionBy("contigName", "bin").save("/mnt/data/home/jeremy/data/ukbb/delta/")
/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
736 self._jwrite.save()
737 else:
--> 738 self._jwrite.save(path)
739
740 @since(1.4)
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o350.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$$anonfun$writeFiles$1$$anonfun$apply$1.apply(TransactionalWriteEdge.scala:147)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$$anonfun$writeFiles$1$$anonfun$apply$1.apply(TransactionalWriteEdge.scala:130)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$$anonfun$writeFiles$1.apply(TransactionalWriteEdge.scala:130)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$$anonfun$writeFiles$1.apply(TransactionalWriteEdge.scala:87)
at com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:369)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:271)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
at com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:350)
at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:94)
at com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:66)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:63)
at com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:297)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:79)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:108)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:79)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge$class.writeFiles(TransactionalWriteEdge.scala:87)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:79)
at com.databricks.sql.transaction.tahoe.files.TransactionalWrite$class.writeFiles(TransactionalWrite.scala:96)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:79)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.write(WriteIntoDelta.scala:110)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta$$anonfun$run$1$$anonfun$apply$1.apply(WriteIntoDelta.scala:71)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta$$anonfun$run$1$$anonfun$apply$1.apply(WriteIntoDelta.scala:70)
at com.databricks.sql.transaction.tahoe.DeltaLog.withNewTransaction(DeltaLog.scala:382)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:70)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:69)
at com.databricks.sql.acl.CheckPermissions$.trusted(CheckPermissions.scala:781)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.run(WriteIntoDelta.scala:69)
at com.databricks.sql.transaction.tahoe.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:143)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:114)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 84 in stage 178.0 failed 4 times, most recent failure: Lost task 84.3 in stage 178.0 (TID 21018, 10.26.246.237, executor 62): ExecutorLostFailure (executor 62 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2243)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
... 71 more
```
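The root cause buried in the traceback is the nested ExecutorLostFailure ("Remote RPC client disassociated. Likely due to containers exceeding thresholds"), which usually points to executors running out of memory during the write rather than to Delta itself. One common mitigation, offered here only as a hedged sketch and not a confirmed fix, is to shuffle onto the partition columns first so each task writes a single (contigName, bin) partition instead of holding many Parquet writers open at once:

```
# sketch only: same write as above, but repartitioned on the partitionBy columns
# so each task handles one (contigName, bin) partition at a time
(df
 .repartition("contigName", "bin")
 .write.format("delta")
 .partitionBy("contigName", "bin")
 .save("/mnt/data/home/jeremy/data/ukbb/delta/"))
```

If that is not enough, shrinking bin_width (more, smaller partitions) or using executors with more memory are the other obvious knobs; which one helps depends on the width of the genotypes column, which this paste does not show.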