Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
  * Created by dnema00 on 9/5/2017.
  */
import java.io.File

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer}
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.util.SizeEstimator

import org.joda.time.format.DateTimeFormat
import org.joda.time.{DateTime, DateTimeZone}

// NOTE(review): this import requires a `spark` (SparkSession) already in scope,
// as in spark-shell / a notebook — this script never constructs one itself.
import spark.implicits._
// Input time series: the main transaction data, read with header + inferred schema.
// NOTE(review): `s3Bucket` and `fullfilename` must be defined earlier (outside this
// view) — presumably the S3 prefix and input CSV name; confirm against the caller.
val df = spark.read.option("header", "true").option("inferSchema", true).csv(s3Bucket + fullfilename)

// Holiday calendar; columns renamed so they line up with the time-series schema
// (DATEDIMID -> PUR_DATE for the join key, DAY_OF_THE_WEEK -> WEEKDAY).
val holiday = spark.read.option("header", "true").option("inferSchema", true).csv(s3Bucket + "HOLIDAY.csv")
  .withColumnRenamed("DATEDIMID", "PUR_DATE")
  .withColumnRenamed("DAY_OF_THE_WEEK", "WEEKDAY")

// FIX: the original joined `activationsWithDay` against `holidayDF`, neither of
// which is defined anywhere in this script — the intended frames are `df` and
// `holiday`. Left-outer join keeps non-holiday dates (HOLIDAY/WEEKDAY null there);
// the duplicate join-key column from `holiday` is dropped afterwards.
val featureSet = df.join(holiday, df("PUR_DATE") === holiday("PUR_DATE"), "left_outer")
  .drop(holiday("PUR_DATE"))
  .withColumn("WEEKDAY", upper($"WEEKDAY")) // normalize weekday casing before indexing
// Generate train and test sets, split on PUR_DATE.
// FIX: the original used `<= "2017-12-01"` for train and `>= "2017-12-01"` for
// test, so rows on the boundary date appeared in BOTH sets (train/test leakage).
// Train is now strictly before the boundary; test keeps the boundary date.
// NOTE(review): this compares PUR_DATE against string literals — assumes the
// column is a date/ISO-formatted string so lexicographic order matches date order.
val trainset = featureSet.filter($"PUR_DATE" < "2017-12-01" && $"PUR_DATE" > "2012-12-31")
val testSet = featureSet.filter($"PUR_DATE" >= "2017-12-01")

// Categorical and continuous feature columns fed into the pipeline below.
// (The `continousFeatures` spelling is kept because later stages reference it.)
val categoricalFeatures = Array("PRODUCT", "HOLIDAY", "WEEKDAY")
val continousFeatures = Array("DAY_TRANSACTION", "MONTH_TRANSACTION", "YEAR_TRANSACTION")
// --- Feature-engineering stages --------------------------------------------
// One StringIndexer per categorical column, fitted on the training data only;
// "skip" drops test rows containing labels never seen during fitting.
val catFeatureIndexer: Array[PipelineStage] = categoricalFeatures.map { col =>
  val indexer = new StringIndexer()
  indexer.setInputCol(col)
  indexer.setOutputCol("indexed_" + col)
  indexer.setHandleInvalid("skip")
  indexer.fit(trainset)
}

// Pack the continuous columns into a single vector column.
val continuousVectorAssembler: PipelineStage =
  new VectorAssembler().setInputCols(continousFeatures).setOutputCol("assembledContFeatures")

// Let VectorIndexer decide which assembled features are effectively categorical:
// anything with at most 4 distinct values is re-encoded with category indices.
val contVectorIndexer: PipelineStage =
  new VectorIndexer()
    .setInputCol("assembledContFeatures")
    .setOutputCol("vectorizedContFeatures")
    .setMaxCategories(4)

// Final model input: every indexed categorical column plus the indexed
// continuous vector, merged into one feature vector.
val featureVectors = categoricalFeatures.map("indexed_" + _) :+ "vectorizedContFeatures"
val featureAssembler =
  new VectorAssembler().setInputCols(featureVectors).setOutputCol("assembledFeatures")
// Prediction target and the intended name for the model's output column.
// NOTE(review): `predictionColumn` is never wired into the pipeline — the
// regressor writes to its default "prediction" column (the evaluator below
// reads "prediction"). Kept for interface compatibility; wire it up via
// setPredictionCol if the renamed column is actually wanted.
val targetColumn = "PURCHASES"
val predictionColumn = "PREDICTED_PURCHASES"

// Random-forest hyperparameters.
val numTrees = 20                  // use more in practice
val featureSubsetStrategy = "auto" // let the algorithm choose per tree
val maxDepth = 10
val maxBins = 200                  // must be >= the largest category cardinality
// FIX: removed a commented-out `impurity = "entropy"` — entropy is a
// classification impurity; RandomForestRegressor supports only "variance".
val impurityGain = "variance"
val checkPointInterval = 5

// Random forest regressor consuming the assembled feature vector.
val randomForestRegressor = new RandomForestRegressor()
  .setLabelCol(targetColumn)
  .setFeaturesCol("assembledFeatures")
  .setNumTrees(numTrees)
  .setFeatureSubsetStrategy(featureSubsetStrategy)
  .setImpurity(impurityGain)
  .setMaxDepth(maxDepth)
  .setMaxBins(maxBins)
  .setCheckpointInterval(checkPointInterval)
// --- Pipeline assembly, training, and evaluation ----------------------------
// Chain all transformer stages followed by the single estimator stage.
val mlPipeline = new Pipeline()
val transformerPipeline = catFeatureIndexer :+ continuousVectorAssembler :+ contVectorIndexer :+ featureAssembler
val estimatorPipeline = Array(randomForestRegressor)
val pipelineStages = transformerPipeline ++ estimatorPipeline
mlPipeline.setStages(pipelineStages)

// Train the model — fitting the pipeline also runs the indexers/assemblers.
val model = mlPipeline.fit(trainset)
model.stages.foreach(println) // quick sanity check of the fitted stages

// Make predictions on the held-out test set and score with RMSE.
val predictions = model.transform(testSet)
val evaluator = new RegressionEvaluator()
  .setLabelCol(targetColumn)
  .setPredictionCol("prediction") // the regressor's default output column
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)
spark.stop()
// FIX: removed a stray trailing "}" — nothing in this script opens a matching
// brace, so it was a paste artifact that made the file a syntax error.
Add Comment
Please, Sign In to add comment