Guest User

Untitled

a guest
Jan 19th, 2018
163
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.38 KB | None | 0 0
  1. import org.apache.spark.ml.evaluation.RegressionEvaluator
  2. import org.apache.spark.ml.recommendation.ALS
  3. import spark.implicits._
  4. import org.apache.spark.sql.functions.{udf, explode}
  5.  
  6.  
  7.  
  /** The four fields of one parsed line of the ratings file. */
  case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

  /**
   * Parses one `userId::movieId::rating::timestamp` line into a [[Rating]].
   *
   * @param str raw input line holding four `::`-separated fields
   * @return the parsed rating
   * @throws AssertionError if the line does not split into exactly four fields
   * @throws NumberFormatException if a field cannot be converted to its numeric type
   */
  def parseRating(str: String): Rating = {
    val fields = str.split("::")
    // Include the offending line in the failure so a bad row in the input
    // file can be located; a bare assert gives no hint which line was malformed.
    assert(
      fields.length == 4,
      s"expected 4 '::'-separated fields but found ${fields.length} in: $str"
    )
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
  }
  14.  
  // Load the sample ratings file as a Dataset of lines, turn every line into a
  // Rating via parseRating, and expose the result as a DataFrame.
  val ratings = {
    val rawLines = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
    val parsed = rawLines.map(parseRating)
    parsed.toDF()
  }

  // Randomly split the ratings with weights 0.8 / 0.2 into the set the model
  // is fitted on and a held-out remainder. No seed is passed to randomSplit.
  val Array(training, test) = {
    val weights = Array(0.8, 0.2)
    ratings.randomSplit(weights)
  }
  19.  
  // Configure the ALS estimator: first bind it to the columns of the ratings
  // DataFrame, then set the hyperparameters (regularization and iteration cap).
  val als = new ALS()
    .setUserCol("userId")
    .setItemCol("movieId")
    .setRatingCol("rating")
    .setRegParam(0.01)
    .setMaxIter(5)

  // Fit the recommendation model on the training split only.
  val model = als.fit(training)
  28.  
  // Ask the fitted model for 10 recommendations per user.
  val userRecommend = model.recommendForAllUsers(10)

  // Pairs up two parallel sequences (movie ids and their ratings) element by
  // element so they can be exploded into one row per pair.
  val zip = udf((xs: Seq[Int], ys: Seq[Float]) => xs.zip(ys))

  // Flatten the per-user recommendation arrays into (userId, movieId, rating)
  // rows and export them over JDBC.
  userRecommend
    .withColumn("recommends", explode(zip($"recommendations.movieId", $"recommendations.rating")))
    .select($"userId", $"recommends._1".alias("movieId"), $"recommends._2".alias("rating"))
    // format/mode/option are DataFrameWriter methods, so the chain has to go
    // through .write; calling them on the DataFrame itself does not compile.
    .write
    .format("jdbc")
    .mode("overwrite")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url", "jdbc:mysql://127.0.0.1:3306/db?user=username&password=password&useSSL=false")
    .option("dbtable", "schema.table")
    // With overwrite mode, truncate asks Spark to empty the existing table
    // rather than drop and recreate it.
    .option("truncate", "true")
    // Nothing is written until the writer is executed with save().
    .save()
Add Comment
Please, Sign In to add comment