Guest User

Untitled

a guest
Apr 24th, 2018
63
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.12 KB | None | 0 0
  1. /**
  2. * Created by dnema00 on 9/5/2017.
  3. */
  4.  
  5. import java.io.File
  6.  
  7. import org.apache.spark.ml.evaluation.RegressionEvaluator
  8. import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer}
  9. import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
  10. import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
  11. import org.apache.spark.sql.Column
  12. import org.joda.time.DateTime
  13. import org.joda.time.format.DateTimeFormat
  14. import org.joda.time.DateTimeZone
  15. import org.apache.spark.sql.SparkSession
  16. import org.apache.spark.sql.functions._
  17. import org.apache.spark.util.SizeEstimator
  18. import spark.implicits._
  19. import org.apache.spark.sql.functions._
  20.  
  // Input time series.
  // NOTE(review): s3Bucket and fullfilename are not defined in this script — presumably
  // provided by the surrounding notebook/session; confirm.
  val df = spark.read.option("header", "true").option("inferSchema", true).csv(s3Bucket + fullfilename)

  // Holiday calendar. Columns are renamed so its date key is PUR_DATE (the join key below)
  // and its day-of-week column is WEEKDAY.
  val holiday = spark.read.option("header", "true").option("inferSchema", true).csv(s3Bucket + "HOLIDAY.csv")
    .withColumnRenamed("DATEDIMID", "PUR_DATE")
    .withColumnRenamed("DAY_OF_THE_WEEK", "WEEKDAY")

  // Attach holiday attributes to every input row. The left outer join keeps input days that have
  // no holiday row (their holiday columns are null). Joining on the column name yields a single
  // PUR_DATE column, so the right-hand key does not need to be dropped afterwards.
  // NOTE(review): activationsWithDay is not defined in this script — presumably derived from df
  // elsewhere; confirm. Both sides' PUR_DATE must have the same type for rows to match.
  val featureSet = activationsWithDay
    .join(holiday, Seq("PUR_DATE"), "left_outer")
    .withColumn("WEEKDAY", upper($"WEEKDAY"))
  30.  
  // Split into train and test sets as half-open date ranges so that no day lands in both:
  //   train = [trainStartDate, trainTestCutoff), test = [trainTestCutoff, ...)
  // The bounds are ISO "yyyy-MM-dd" strings compared against PUR_DATE.
  // NOTE(review): this relies on Spark coercing the string literal for the comparison —
  // confirm PUR_DATE is a date/timestamp or an ISO date string.
  val trainStartDate = "2013-01-01"
  val trainTestCutoff = "2017-12-01"
  val trainset = featureSet.filter($"PUR_DATE" >= trainStartDate && $"PUR_DATE" < trainTestCutoff)
  val testSet = featureSet.filter($"PUR_DATE" >= trainTestCutoff)
  34.  
  35.  
  // Model inputs, grouped by how they are encoded downstream: categorical columns are
  // string-indexed; continuous columns are assembled into a single vector.
  // (`continousFeatures` keeps its existing spelling because other code references it.)
  val categoricalFeatures: Array[String] =
    Array("PRODUCT", "HOLIDAY", "WEEKDAY")
  val continousFeatures: Array[String] =
    Array("DAY_TRANSACTION", "MONTH_TRANSACTION", "YEAR_TRANSACTION")
  39.  
  40.  
  // One StringIndexer per categorical column, writing a numeric "indexed_<column>" column.
  // The indexers are left unfitted on purpose: Pipeline.fit fits them on the training data
  // together with the other stages, so the PipelineModel carries their fitted label mappings.
  // No separate pass over trainset is needed.
  // handleInvalid = "skip" drops rows whose category was not seen when the indexer was fitted.
  val catFeatureIndexer: Array[PipelineStage] = categoricalFeatures.map { feature =>
    new StringIndexer()
      .setInputCol(feature)
      .setOutputCol("indexed_" + feature)
      .setHandleInvalid("skip")
  }
  48.  
  // Pack the continuous feature columns into one vector column.
  val continuousVectorAssembler: PipelineStage =
    new VectorAssembler()
      .setOutputCol("assembledContFeatures")
      .setInputCols(continousFeatures)

  // Index the continuous vector. With maxCategories = 4, components with at most 4 distinct
  // values are treated as categorical (see Spark's VectorIndexer docs); the rest stay continuous.
  val contVectorIndexer: PipelineStage =
    new VectorIndexer()
      .setMaxCategories(4)
      .setOutputCol("vectorizedContFeatures")
      .setInputCol("assembledContFeatures")
  59.  
  60.  
  // Columns that make up the final feature vector: every indexed categorical column,
  // followed by the indexed continuous-feature vector.
  val featureVectors = categoricalFeatures.map(name => s"indexed_$name") :+ "vectorizedContFeatures"

  // Concatenate those columns into the single "assembledFeatures" vector.
  val featureAssembler = new VectorAssembler()
    .setOutputCol("assembledFeatures")
    .setInputCols(featureVectors)
  68.  
  // Label column the model learns, and the name intended for its prediction output column.
  val targetColumn: String = "PURCHASES"
  val predictionColumn: String = "PREDICTED_PURCHASES"

  // Random forest hyperparameters.
  val numTrees: Int = 20                     // kept small here; use more trees in practice
  val featureSubsetStrategy: String = "auto" // let Spark choose the per-split feature subset
  val maxDepth: Int = 10
  val maxBins: Int = 200
  // Impurity measure used to choose splits; Spark's RandomForestRegressor supports "variance".
  val impurityGain: String = "variance"
  val checkPointInterval: Int = 5
  82.  
  // Random forest regressor: learns targetColumn from the "assembledFeatures" vector and writes
  // its output to predictionColumn, which the evaluator below reads.
  // NOTE(review): per the Spark docs, checkpointInterval only takes effect when node-ID caching
  // is enabled and a checkpoint directory is set on the SparkContext; this script does
  // neither, so it is presumably inert here.
  val randomForestRegressor = new RandomForestRegressor()
    .setLabelCol(targetColumn)
    .setFeaturesCol("assembledFeatures")
    .setPredictionCol(predictionColumn)
    .setNumTrees(numTrees)
    .setFeatureSubsetStrategy(featureSubsetStrategy)
    .setImpurity(impurityGain)
    .setMaxDepth(maxDepth)
    .setMaxBins(maxBins)
    .setCheckpointInterval(checkPointInterval)

  // Feature-engineering stages, in dependency order. Each stage's output column feeds the next.
  // Not all of them are plain transformers: VectorIndexer, for example, is fitted by Pipeline.fit.
  val transformerPipeline = catFeatureIndexer :+ continuousVectorAssembler :+ contVectorIndexer :+ featureAssembler
  // The model itself goes last, consuming "assembledFeatures".
  val estimatorPipeline = Array(randomForestRegressor)
  val pipelineStages = transformerPipeline ++ estimatorPipeline
  val mlPipeline = new Pipeline().setStages(pipelineStages)

  // Train: fits the pipeline's stages on the training set, in order.
  val model = mlPipeline.fit(trainset)

  // Log the fitted stages.
  model.stages.foreach(println)

  // Score the held-out test set. Rows whose categories were unseen during training may be
  // dropped by the indexers (handleInvalid = "skip"), so RMSE covers only the rows that remain.
  val predictions = model.transform(testSet)

  val evaluator = new RegressionEvaluator()
    .setLabelCol(targetColumn)
    .setPredictionCol(predictionColumn)
    .setMetricName("rmse")
  val rmse = evaluator.evaluate(predictions)
  println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

  spark.stop()
Add Comment
Please, Sign In to add comment