Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package nyse
- import com.typesafe.config.ConfigFactory
- import org.apache.hadoop.fs.{FileSystem, Path}
- import org.apache.spark.{SparkConf, SparkContext}
/**
 * Spark job that, for every trade month, keeps the stocks whose aggregated
 * trading volume ranks within the top N distinct volumes of that month
 * (dense rank: stocks with equal volume share a rank). It replaces each ticker
 * with its mapped name when the lookup file has one, and writes tab-delimited
 * `month \t name-or-ticker \t volume` lines sorted by month.
 *
 * Arguments:
 *  - args(0): input path of the trade records. Lines are comma-delimited:
 *    field 0 is the ticker, field 1 is a date whose first six characters are
 *    YYYYMM (presumably YYYYMMDD — confirm), and field 6 is the volume.
 *  - args(1): tab-delimited lookup file. Column 0 is the ticker; column 1 is
 *    the replacement name (presumably the company name).
 *  - args(2): output path. It is deleted first if it already exists.
 *  - args(3): name of the config section whose `executionMode` is used as the
 *    Spark master.
 *  - args(4): N, the number of distinct top volumes to keep per month.
 *
 * Created by itversity on 28/03/17.
 */
object TopNStocksByVolumeWithName {

  def main(args: Array[String]): Unit = {
    val usage = "Usage: TopNStocksByVolumeWithName <inputPath> <stockSymbolsPath> " +
      "<outputPath> <environment> <topN>"
    require(args.length >= 5, usage)
    val inputPath = args(0)
    val stockSymbolsPath = args(1)
    val outputPath = args(2)
    // Parse N once on the driver. The closures then capture a plain Int rather
    // than the whole args array, and a non-numeric N fails before any job runs.
    val topN = args(4).toInt

    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Top n stocks by volume").
      setMaster(appConf.getConfig(args(3)).getString("executionMode"))
    val sc = new SparkContext(conf)
    try {
      // Validate inputs up front instead of failing later inside a Spark job.
      require(pathExists(sc, inputPath), s"Input path $inputPath does not exist")
      require(pathExists(sc, stockSymbolsPath), s"Stock symbols path $stockSymbolsPath does not exist")

      // saveAsTextFile refuses to overwrite, so remove any previous output.
      // The FS is resolved from the path itself, so fully-qualified URIs work.
      val output = new Path(outputPath)
      val outputFs = FileSystem.get(output.toUri, sc.hadoopConfiguration)
      if (outputFs.exists(output))
        outputFs.delete(output, true)

      // coalesce reduces the number of tasks when the data is spread across
      // many small files
      val data = sc.textFile(inputPath).coalesce(4)

      // ticker -> name lookup, collected to the driver and broadcast to executors
      val stockSymbols = sc.textFile(stockSymbolsPath).
        map { line =>
          val fields = line.split("\t")
          (fields(0), fields(1))
        }.
        collectAsMap()
      val bv = sc.broadcast(stockSymbols)

      // These counters are updated inside transformations, so a re-executed
      // task or stage can count the same records again. Treat the reported
      // values as approximate.
      val totalRecords = sc.accumulator(0, "Total number of records")
      val noTradedRecords = sc.accumulator(0, "Number of records that are not traded")
      val noOfTopNRecords = sc.accumulator(0, "Number of records fall under top n records")

      data.
        // Key: (trade month as YYYYMM, ticker); value: volume. Volumes are Long
        // because monthly totals of heavily traded stocks can exceed Int.MaxValue.
        map { rec =>
          totalRecords += 1
          val a = rec.split(",")
          val volume = a(6).toLong
          if (volume == 0L)
            noTradedRecords += 1
          ((a(1).substring(0, 6).toInt, a(0)), volume)
        }.
        // Total volume of each stock for each month
        reduceByKey(_ + _).
        // Re-key by month: (month, (volume, ticker))
        map { case ((month, ticker), volume) => (month, (volume, ticker)) }.
        // (month, all (volume, ticker) pairs of that month)
        groupByKey().
        // Keep only the stocks within the month's top N distinct volumes
        flatMap { case (month, entries) => denseTopN(entries, topN).map(entry => (month, entry)) }.
        // sort the data by trade month
        sortByKey().
        // format as tab-delimited text, substituting the name when one is known
        map { case (month, (volume, ticker)) =>
          noOfTopNRecords += 1
          val name = bv.value.getOrElse(ticker, ticker)
          s"$month\t$name\t$volume"
        }.
        saveAsTextFile(outputPath)

      // Report the counters, which were otherwise only visible in the Spark UI.
      println(s"Total number of records: ${totalRecords.value}")
      println(s"Number of records that are not traded: ${noTradedRecords.value}")
      println(s"Number of records fall under top n records: ${noOfTopNRecords.value}")
    } finally {
      sc.stop()
    }
  }

  /**
   * Returns true when `pathString` matches at least one file or directory.
   * Glob patterns are supported. A comma-separated list of paths, which
   * `textFile` also accepts, is NOT supported by this check.
   */
  private def pathExists(sc: SparkContext, pathString: String): Boolean = {
    val path = new Path(pathString)
    val fs = FileSystem.get(path.toUri, sc.hadoopConfiguration)
    // globStatus returns null for a missing non-glob path and an empty array
    // for a glob that matches nothing.
    Option(fs.globStatus(path)).exists(_.nonEmpty)
  }

  /**
   * Dense-rank top N. Keeps every (volume, ticker) whose volume is among the
   * `n` highest distinct volumes, ordered by descending volume. Entries with
   * equal volume keep their incoming relative order (the sort is stable).
   * Returns an empty list when `n <= 0`.
   */
  private def denseTopN(entries: Iterable[(Long, String)], n: Int): List[(Long, String)] = {
    val byVolumeDesc = entries.toList.sortBy(_._1)(Ordering[Long].reverse)
    val topVolumes = byVolumeDesc.map(_._1).distinct.take(n).toSet
    byVolumeDesc.filter(entry => topVolumes.contains(entry._1))
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement