import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.SparkConf
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.mean
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

object RunAppPooling {

  def main(args: Array[String]): Unit = {
    // Start the Spark session on a local two-core master.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .set("deploy-mode", "client")
      .set("spark.driver.bindAddress", "127.0.0.1")
      .set("spark.broadcast.compress", "false")
      .setAppName("local-spark")

    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    val filePath = "src/main/resources/train.csv"
    val modelPath = "file:///home/vagrant/custom.model"

    // Training schema: three integer feature columns plus the label.
    val schema = StructType(Array(
      StructField("IDLE_COUNT", IntegerType),
      StructField("TIMEOUTS", IntegerType),
      StructField("ACTIVE_COUNT", IntegerType),
      StructField("FACTOR_LOAD", DoubleType)))

    while (true) {
      // Read the raw training data.
      val rawTrain = spark
        .read
        .option("header", "true")
        .schema(schema)
        .csv(filePath)

      rawTrain.show()
      println(rawTrain.count())

      // Fill all NA values with 0.
      val df = rawTrain.na.fill(0)
      df.printSchema()

      // Assemble the three feature columns into a single vector.
      val vectorAssembler = new VectorAssembler()
        .setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT"))
        .setOutputCol("features_intermediate")

      // Standardize the assembled vector to zero mean and unit variance.
      val scaler = new StandardScaler()
        .setWithMean(true)
        .setWithStd(true)
        .setInputCol("features_intermediate")
        .setOutputCol("features")

      // Try to load a previously saved model; on the first pass none exists.
      val previous: Option[PipelineModel] =
        try {
          Some(PipelineModel.load(modelPath))
        } catch {
          case ie: InvalidInputException =>
            println(ie.getMessage)
            None
        }

      // A saved PipelineModel already contains the fitted assembler, scaler
      // and regression stages, so reuse it directly; re-wrapping it behind
      // another assembler/scaler pair would collide on the duplicate
      // "features_intermediate" output column at transform time.
      val cvModel = previous.getOrElse {
        val lr = new LinearRegression()
          .setMaxIter(100)
          .setRegParam(0.1)
          .setElasticNetParam(0.8)
          .setLabelCol("FACTOR_LOAD") // the label column to regress against
        // Create the pipeline with the steps and fit it on the training data.
        new Pipeline()
          .setStages(Array(vectorAssembler, scaler, lr))
          .fit(df)
      }

      // Save the model so later iterations (and runs) can pick it up.
      cvModel.write.overwrite().save(modelPath)

      // Scoring schema: the same features, no label, plus a row key.
      val testSchema = StructType(Array(
        StructField("PACKAGE_KEY", StringType),
        StructField("IDLE_COUNT", IntegerType),
        StructField("TIMEOUTS", IntegerType),
        StructField("ACTIVE_COUNT", IntegerType)))

      val rawTest = spark
        .read
        .option("header", "true")
        .schema(testSchema)
        .csv("src/main/resources/test_pooling.csv")

      // Fill all NA values with 0, then score the batch.
      val testDf = rawTest.na.fill(0)
      val scored = cvModel.transform(testDf)

      // Average predicted FACTOR_LOAD over the batch; LinearRegression writes
      // its output to the "prediction" column by default.
      val avgLoad = scored.select(mean("prediction")).collect()
      println(avgLoad(0))
    }
  }
}
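The paste ships no build file. A minimal build.sbt sketch that compiles the snippet, assuming Spark 2.4.x on Scala 2.11 (the project name and versions are guesses, not taken from the paste):

    // Hypothetical build definition; pin versions to whatever cluster you target.
    name := "spark-pooling-demo"
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"   % "2.4.3",
      "org.apache.spark" %% "spark-mllib" % "2.4.3"
    )

Because the master is hard-coded to local[2], the app runs directly via sbt run; no spark-submit is needed.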