Advertisement
Guest User

data source

a guest
Nov 21st, 2014
170
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package org.template.recommendation
  2.  
  3. import io.prediction.controller.PDataSource
  4. import io.prediction.controller.EmptyEvaluationInfo
  5. import io.prediction.controller.EmptyActualResult
  6. import io.prediction.controller.Params
  7. import io.prediction.data.storage.Event
  8. import io.prediction.data.storage.Storage
  9.  
  10. import org.apache.spark.SparkContext
  11. import org.apache.spark.SparkContext._
  12. import org.apache.spark.rdd.RDD
  13. import org.apache.spark.mllib.recommendation.Rating
  14.  
  15. import grizzled.slf4j.Logger
  16.  
  17. case class DataSourceParams(val appId: Int) extends Params
  18.  
  19. class DataSource(val dsp: DataSourceParams)
  20.   extends PDataSource[TrainingData,
  21.       EmptyEvaluationInfo, Query, EmptyActualResult] {
  22.  
  23.   @transient lazy val logger = Logger[this.type]
  24.  
  25.   override
  26.   def readTraining(sc: SparkContext): TrainingData = {
  27.     val eventsDb = Storage.getPEvents()
  28.     eventsDb.find(appId = 66).take(2).foreach(println(_))
  29.     val eventsRDD: RDD[Event] = eventsDb.find(
  30.       appId = dsp.appId,
  31.       entityType = Some("user"),
  32.       eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event
  33.       // targetEntityType is optional field of an event.
  34.       targetEntityType = Some(Some("item")))(sc)
  35.  
  36.     val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
  37.       val rating = try {
  38.         val ratingValue: Double = event.event match {
  39.           case "rate" => event.properties.get[Double]("rating")
  40.           case "buy" => 4.0 // map buy event to rating value of 4
  41.           case _ => throw new Exception(s"Unexpected event ${event} is read.")
  42.         }
  43.         // assume entityId and targetEntityId is originally Int type
  44.         Rating(event.entityId.toInt,
  45.           event.targetEntityId.get.toInt,
  46.           ratingValue)
  47.       } catch {
  48.         case e: Exception => {
  49.           logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
  50.           throw e
  51.         }
  52.       }
  53.       rating
  54.     }
  55.     new TrainingData(ratingsRDD)
  56.   }
  57. }
  58.  
  59. class TrainingData(
  60.   val ratings: RDD[Rating]
  61. ) extends Serializable {
  62.   override def toString = {
  63.     s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
  64.   }
  65. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement