Advertisement
Guest User

Untitled

a guest
Mar 29th, 2017
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.88 KB | None | 0 0
  1. package nyse
  2.  
  3. import com.typesafe.config.ConfigFactory
  4. import org.apache.hadoop.fs.{FileSystem, Path}
  5. import org.apache.spark.{SparkConf, SparkContext}
  6.  
  /**
   * Spark job that finds, for every trade month, the stocks whose total traded volume falls
   * within the top N distinct monthly volumes (dense-rank semantics: stocks with equal volume
   * share a rank). The result is written as tab-delimited text lines
   * `tradeMonth<TAB>stockName<TAB>totalVolume`, sorted by trade month.
   *
   * Positional arguments:
   *  - `args(0)`: input path of comma-delimited daily trade records. Field 0 is the stock
   *    ticker, the first six characters of field 1 are the trade month (yyyyMM), and
   *    field 6 is the traded volume.
   *  - `args(1)`: tab-delimited lookup file mapping ticker (field 0) to stock name (field 1).
   *  - `args(2)`: output path; it is deleted first if it already exists.
   *  - `args(3)`: name of the config section whose `executionMode` value is used as the
   *    Spark master.
   *  - `args(4)`: N, the number of distinct top volumes to keep per month (must be > 0).
   *
   * Originally created by itversity on 28/03/17.
   */
  object TopNStocksByVolumeWithName {

    private val Usage =
      "Usage: TopNStocksByVolumeWithName <inputPath> <stockSymbolsPath> <outputPath> " +
        "<configSection> <topN>"

    def main(args: Array[String]): Unit = {
      require(args.length >= 5, Usage)
      val inputPath = args(0)
      val stockSymbolsPath = args(1)
      val outputPath = args(2)
      // Parse N once on the driver so a bad value fails fast, before any task is launched.
      val topN = args(4).toInt
      require(topN > 0, s"topN must be positive, got $topN")

      val appConf = ConfigFactory.load()
      val conf = new SparkConf().
        setAppName("Top n stocks by volume").
        setMaster(appConf.getConfig(args(3)).getString("executionMode"))
      val sc = new SparkContext(conf)
      try run(sc, inputPath, stockSymbolsPath, outputPath, topN)
      finally sc.stop()
    }

    /** Validates the paths, runs the top-N pipeline and saves the result to `outputPath`. */
    private def run(
        sc: SparkContext,
        inputPath: String,
        stockSymbolsPath: String,
        outputPath: String,
        topN: Int): Unit = {
      prepareOutput(sc, inputPath, outputPath)

      // coalesce is used to reduce the number of tasks when the input is spread across
      // many small files
      val data = sc.textFile(inputPath).coalesce(4)

      val bv = sc.broadcast(loadStockNames(sc, stockSymbolsPath))

      // NOTE: these accumulators are updated inside transformations. Spark may apply such
      // updates more than once if a task or stage is re-executed, so treat the counts as
      // approximate.
      val totalRecords = sc.accumulator(0, "Total number of records")
      val noTradedRecords = sc.accumulator(0, "Number of records that are not traded")
      val noOfTopNRecords = sc.accumulator(0, "Number of records fall under top n records")

      data.
        // Key by (trade month as yyyyMM, stock ticker); the value is that day's volume.
        // Volumes are Long because a month's total for a busy stock can exceed Int.MaxValue.
        map { rec =>
          totalRecords += 1
          val fields = rec.split(",")
          val volume = fields(6).toLong
          if (volume == 0) noTradedRecords += 1
          ((fields(1).substring(0, 6).toInt, fields(0)), volume)
        }.
        // Total volume for each stock in each month
        reduceByKey(_ + _).
        // Re-key by trade month: (tradeMonth, (volume, ticker))
        map { case ((tradeMonth, ticker), volume) => (tradeMonth, (volume, ticker)) }.
        // One group of (volume, ticker) pairs per trade month
        groupByKey().
        // Keep only the stocks within the top N distinct volumes of each month
        flatMap { case (tradeMonth, stocks) =>
          denseRankTopN(stocks, topN).map(stock => (tradeMonth, stock))
        }.
        // Sort the output by trade month
        sortByKey().
        // Format as tradeMonth<TAB>stock name (the ticker if no name is known)<TAB>volume
        map { case (tradeMonth, (volume, ticker)) =>
          noOfTopNRecords += 1
          s"$tradeMonth\t${bv.value.getOrElse(ticker, ticker)}\t$volume"
        }.
        saveAsTextFile(outputPath)
    }

    /**
     * Fails if `inputPath` matches no files. Deletes `outputPath` if it already exists,
     * because saveAsTextFile fails when its output directory is already present.
     */
    private def prepareOutput(sc: SparkContext, inputPath: String, outputPath: String): Unit = {
      val hadoopConf = sc.hadoopConfiguration

      val input = new Path(inputPath)
      // globStatus (unlike exists) also accepts the glob patterns that textFile supports.
      // It returns null or an empty array when nothing matches.
      val inputFs = FileSystem.get(input.toUri, hadoopConf)
      if (!Option(inputFs.globStatus(input)).exists(_.nonEmpty))
        throw new IllegalArgumentException(s"Input path does not exist: $inputPath")

      val output = new Path(outputPath)
      val outputFs = FileSystem.get(output.toUri, hadoopConf)
      if (outputFs.exists(output)) outputFs.delete(output, true)
    }

    /**
     * Loads the ticker -> stock name lookup onto the driver. Lines with fewer than two
     * tab-separated fields are skipped; such tickers fall back to being printed as-is.
     */
    private def loadStockNames(
        sc: SparkContext,
        stockSymbolsPath: String): scala.collection.Map[String, String] =
      sc.textFile(stockSymbolsPath).
        map(_.split("\t")).
        collect { case Array(ticker, name, _*) => (ticker, name) }.
        collectAsMap()

    /**
     * Keeps every (volume, ticker) pair whose volume is among the `topN` highest distinct
     * volumes (dense rank). The result is ordered by volume descending, and pairs with equal
     * volume keep their relative input order.
     */
    private def denseRankTopN(
        stocks: Iterable[(Long, String)],
        topN: Int): List[(Long, String)] = {
      val byVolumeDesc = stocks.toList.sortBy(_._1)(Ordering[Long].reverse)
      val topVolumes = byVolumeDesc.map(_._1).distinct.take(topN).toSet
      byVolumeDesc.filter { case (volume, _) => topVolumes.contains(volume) }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement