Advertisement
Guest User

Untitled

a guest
Nov 21st, 2014
163
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.13 KB | None | 0 0
  1. package org.template.recommendation
  2.  
  3. import io.prediction.controller.PDataSource
  4. import io.prediction.controller.EmptyEvaluationInfo
  5. import io.prediction.controller.EmptyActualResult
  6. import io.prediction.controller.Params
  7. import io.prediction.data.storage.Event
  8. import io.prediction.data.storage.Storage
  9.  
  10. import org.apache.spark.SparkContext
  11. import org.apache.spark.SparkContext._
  12. import org.apache.spark.rdd.RDD
  13. import org.apache.spark.mllib.recommendation.Rating
  14.  
  15. import grizzled.slf4j.Logger
  16.  
  /** Parameters for [[DataSource]].
    *
    * @param appId ID of the app whose events [[DataSource]] queries from the event store
    */
  case class DataSourceParams(appId: Int) extends Params
  18.  
  /** Data source that reads user-to-item "rate" and "buy" events and converts
    * them into MLlib [[Rating]]s.
    *
    * @param dsp data source parameters; `dsp.appId` selects the app to query
    */
  class DataSource(val dsp: DataSourceParams)
    extends PDataSource[TrainingData,
        EmptyEvaluationInfo, Query, EmptyActualResult] {

    @transient lazy val logger = Logger[this.type]

    /** Builds the training data from the events stored for `dsp.appId`.
      *
      * The query asks for events with entity type "user", target entity type
      * "item" and event name "rate" or "buy". A "rate" event contributes its
      * "rating" property; a "buy" event is mapped to a fixed rating of 4.0.
      * Any event that cannot be converted is logged and the exception is
      * rethrown from the function passed to `map`.
      *
      * @param sc Spark context passed to the event store query
      * @return training data wrapping the RDD of ratings
      */
    override
    def readTraining(sc: SparkContext): TrainingData = {
      val eventsDb = Storage.getPEvents()
      // Always query with the configured app ID (an Int) and pass the
      // SparkContext in the second argument list.
      val eventsRDD: RDD[Event] = eventsDb.find(
        appId = dsp.appId,
        entityType = Some("user"),
        eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event
        // targetEntityType is optional field of an event.
        targetEntityType = Some(Some("item")))(sc)

      val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
        // The try block is the value of the mapping function: either a Rating
        // or a rethrown exception after logging the offending event.
        try {
          val ratingValue: Double = event.event match {
            case "rate" => event.properties.get[Double]("rating")
            case "buy" => 4.0 // map buy event to rating value of 4
            case _ => throw new Exception(s"Unexpected event ${event} is read.")
          }
          // assume entityId and targetEntityId is originally Int type;
          // a missing target ID or a non-numeric ID fails here and is
          // reported by the catch clause below.
          Rating(event.entityId.toInt,
            event.targetEntityId.get.toInt,
            ratingValue)
        } catch {
          case e: Exception => {
            logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
            throw e
          }
        }
      }
      new TrainingData(ratingsRDD)
    }
  }
  58.  
  /** Container for the ratings produced by the data source.
    *
    * @param ratings RDD of ratings used for training
    */
  class TrainingData(val ratings: RDD[Rating]) extends Serializable {

    /** Summarises the data as the rating count followed by up to two sample
      * ratings. Calls `count()` and `take(2)` on the RDD each time it is invoked.
      */
    override def toString: String = {
      val total = ratings.count()
      val sample = ratings.take(2).toList
      s"ratings: [${total}] (${sample}...)"
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement