Advertisement
Not a member of Pastebin yet?
Sign up:
it unlocks many cool features!
- def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
- val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
- val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
- val defaultParallelism = sc.defaultParallelism
- val files = listStatus(context).asScala
- val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
- val bytesPerCore = totalBytes / defaultParallelism
- val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
- super.setMaxSplitSize(maxSplitSize)
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement