Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
- import org.apache.spark.ml.evaluation.RegressionEvaluator
- import org.apache.spark.ml.recommendation.ALS
- import spark.implicits._
- import org.apache.spark.sql.functions.{udf, explode}
/** One user→movie rating record from the MovieLens sample data set. */
final case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

/**
 * Parses one line of the "::"-delimited MovieLens ratings file,
 * e.g. "0::2::3::1424380312", into a [[Rating]].
 *
 * @throws IllegalArgumentException if the line does not contain exactly 4 fields
 * @throws NumberFormatException    if a field is not numeric
 */
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  // require, not assert: input validation must not disappear under -Xdisable-assertions
  require(fields.length == 4, s"expected 4 '::'-separated fields, got: $str")
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}
// Read the raw MovieLens ratings file and turn each line into a Rating row.
val rawLines = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
val ratings = rawLines.map(parseRating).toDF()

// Hold out roughly 20% of the ratings for evaluation.
val splits = ratings.randomSplit(Array(0.8, 0.2))
val Array(training, test) = splits
// Train an ALS collaborative-filtering model on the training split.
val als = new ALS()
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setMaxIter(5)    // few iterations: sample data set is tiny
  .setRegParam(0.01)
val model = als.fit(training)
// Top-10 movie recommendations per user: one row per user, with an array
// of (movieId, rating) structs in the "recommendations" column.
val userRecommend = model.recommendForAllUsers(10)

// UDF that pairs the parallel movieId/rating arrays element-wise into tuples,
// so the nested recommendation arrays can later be exploded into flat rows.
val zip = udf { (ids: Seq[Int], scores: Seq[Float]) => ids.zip(scores) }
// Flatten each user's recommendation array into (userId, movieId, rating)
// rows and persist them to MySQL over JDBC.
userRecommend
  .withColumn("recommends", explode(zip($"recommendations.movieId", $"recommendations.rating")))
  .select($"userId", $"recommends._1".alias("movieId"), $"recommends._2".alias("rating"))
  // BUG FIX: DataFrame has no .format method — the write options must go
  // through the DataFrameWriter obtained via .write.
  .write
  .format("jdbc")
  .mode("overwrite")
  .option("driver", "com.mysql.jdbc.Driver")
  // SECURITY: credentials are embedded in plaintext in the URL — move the
  // user/password into configuration or a secrets store before production use.
  .option("url", "jdbc:mysql://127.0.0.1:3306/db?user=username&password=password&useSSL=false")
  .option("dbtable", "schema.table")
  // With overwrite mode, truncate=true keeps the existing table definition
  // instead of dropping and recreating it.
  .option("truncate", "true")
  // BUG FIX: the original chain never called save(), so no write occurred.
  .save()
Add Comment
Please, Sign In to add comment