Advertisement
Not a member of Pastebin yet?
Sign Up — it unlocks many cool features!
// Trains (or refreshes) a linear-regression model that predicts pool FACTOR_LOAD
// from connection counters, then scores a test file — repeated in an endless loop.
object RunAppPooling {

  /**
   * Entry point. Builds a local SparkSession, then repeatedly:
   *   1. reads the training CSV and fits a pipeline (assembler -> scaler -> regression),
   *      reusing a previously saved PipelineModel as the final stage when one exists;
   *   2. persists the fitted model for the next iteration;
   *   3. scores the pooling test CSV and prints the mean predicted load.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    import scala.util.Try

    // Local two-core session; bind explicitly to loopback so it runs inside a VM.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .set("deploy-mode", "client")
      .set("spark.driver.bindAddress", "127.0.0.1")
      .set("spark.broadcast.compress", "false")
      .setAppName("local-spark")
    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    val filePath  = "src/main/resources/train.csv"
    val modelPath = "file:///home/vagrant/custom.model"

    // Training schema: three integer counters plus the regression label.
    // Loop-invariant, so built once instead of every iteration.
    val schema = StructType(
      Array(
        StructField("IDLE_COUNT", IntegerType),
        StructField("TIMEOUTS", IntegerType),
        StructField("ACTIVE_COUNT", IntegerType),
        StructField("FACTOR_LOAD", DoubleType)))

    // Scoring schema: same counters keyed by package, no label column.
    val testSchema = StructType(
      Array(
        StructField("PACKAGE_KEY", StringType),
        StructField("IDLE_COUNT", IntegerType),
        StructField("TIMEOUTS", IntegerType),
        StructField("ACTIVE_COUNT", IntegerType)))

    while (true) {
      // Read the raw training data.
      val dfRaw = spark.read
        .option("header", "true")
        .schema(schema)
        .csv(filePath)
      dfRaw.show()
      println(dfRaw.count())

      // Missing counters are treated as zero activity.
      val df = dfRaw.na.fill(0)
      df.printSchema()

      // Assemble the three counters into a single feature vector.
      val vectorAssembler = new VectorAssembler()
        .setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT"))
        .setOutputCol("features_intermediate")

      import org.apache.spark.ml.feature.StandardScaler
      val scaler = new StandardScaler()
        .setWithMean(true)
        .setWithStd(true)
        .setInputCol("features_intermediate")
        .setOutputCol("features")

      // Load the previously saved model if one exists; None on the first run.
      // Replaces the original `var lr1 = null` + catch(InvalidInputException).
      val savedModel: Option[PipelineModel] = Try(PipelineModel.load(modelPath)).toOption

      val pipeline = savedModel match {
        case Some(model) =>
          // Reuse the already-fitted model as the final (transformer) stage.
          new Pipeline().setStages(Array(vectorAssembler, scaler, model))
        case None =>
          // First run: train an elastic-net linear regression from scratch.
          val lr = new LinearRegression()
            .setMaxIter(100)
            .setRegParam(0.1)
            .setElasticNetParam(0.8)
            .setLabelCol("FACTOR_LOAD")
          new Pipeline().setStages(Array(vectorAssembler, scaler, lr))
      }

      // Fit on the training data and persist for the next iteration.
      val cvModel = pipeline.fit(df)
      cvModel.write.overwrite().save(modelPath)

      // Read and score the test data; NAs again treated as zero.
      val dfRaw1 = spark.read
        .option("header", "true")
        .schema(testSchema)
        .csv("src/main/resources/test_pooling.csv")
      val df1 = dfRaw1.na.fill(0)
      val extracted = cvModel.transform(df1)

      import org.apache.spark.sql.functions._
      // BUG FIX: the original selected mean(df("FACTOR_LOAD")) — a column of the
      // *training* DataFrame. `extracted` has no FACTOR_LOAD column (see testSchema),
      // so that select cannot resolve. Average the model's prediction column instead.
      val meanPrediction = extracted.select(mean(col("prediction"))).collect()
      println(meanPrediction.apply(0))
    }
  }
}
Advertisement
Add Comment
Please Sign In to add a comment
Advertisement