Advertisement
Guest User

Untitled

a guest
Jun 17th, 2014
208
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.54 KB | None | 0 0
  1. package org.apache.bigtop.bigpetstore.recommend
  2.  
  3. import org.apache.mahout.cf.taste.hadoop.als.RecommenderJob
  4. import org.apache.mahout.cf.taste.hadoop.als.ParallelALSFactorizationJob
  5. import java.io.File
  6. import parquet.org.codehaus.jackson.map.DeserializerFactory.Config
  7. import org.apache.hadoop.conf.Configuration
  8. import org.apache.hadoop.conf.Configurable
  9. import org.apache.hadoop.util.ToolRunner
  10. import org.apache.mahout.cf.taste.hadoop.als.SharingMapper
  11.  
  12. // We don't need to wrap these two jobs in ToolRunner.run calls since the only
  13. // thing that we are doing right now is calling the run() methods of RecommenderJob
  14. // and ParallelALSFactorizationJob. Both of these classes have a main() method that
  15. // internally calls ToolRunner.run with all the command line args passed. So, if
  16. // we want to run this code from the command line, we can easily do so by running
  17. // the main methods of the ParallelALSFactorizationJob, followed by running the
  18. // main method of RecommenderJob. That would also take care of the multiple-jvm
  19. // instance issue mentioned in the comments below, so the call to resetFlagInSharedAlsMapper() would not be needed.
  /**
   * Runs Mahout's ALS factorization job followed by its recommender job, both
   * through Hadoop's `ToolRunner`, inside the current JVM.
   *
   * @param inputFile                passed as `--input` to the factorization job
   * @param factorizationOutputDir   passed as `--output` to the factorization job; the
   *                                 recommender job then reads the `userRatings/`, `U/`
   *                                 and `M/` sub-directories beneath it
   * @param recommendationsOutputDir passed as `--output` to the recommender job
   */
  class ItemRecommender(private val inputFile: String,
          private val factorizationOutputDir: String,
          private val recommendationsOutputDir: String) {
    private val recommenderJob = new RecommenderJob
    private val factorizationJob = new ParallelALSFactorizationJob

    // Deliberately a `def`: every evaluation yields a new path stamped with the
    // current wall-clock time in milliseconds.
    private def tempDir: String = s"/tmp/mahout_${System.currentTimeMillis()}"

    /** Launches the ALS factorization job and returns the exit code reported by `ToolRunner.run`. */
    private def performAlsFactorization(): Int = {
      val factorizationArgs = Array(
        "--input", inputFile,
        "--output", factorizationOutputDir,
        "--lambda", "0.1",
        "--tempDir", tempDir,
        "--implicitFeedback", "false",
        "--alpha", "0.8",
        "--numFeatures", "2",
        "--numIterations", "5",
        "--numThreadsPerSolver", "1")
      ToolRunner.run(factorizationJob, factorizationArgs)
    }

    /** Launches the recommender job over the factorization output and returns its exit code. */
    private def generateRecommendations(): Int = {
      val recommenderArgs = Array(
        "--input", s"$factorizationOutputDir/userRatings/",
        "--userFeatures", s"$factorizationOutputDir/U/",
        "--itemFeatures", s"$factorizationOutputDir/M/",
        "--numRecommendations", "2",
        "--output", recommendationsOutputDir,
        "--maxRating", "5")
      ToolRunner.run(recommenderJob, recommenderArgs)
    }

    /**
     * Runs the factorization job and then the recommender job.
     *
     * Both jobs share static state through `SharingMapper` which is not cleaned up
     * between runs; running them back to back in the same JVM without a reset
     * results in a class-cast exception. The reset is therefore performed after the
     * factorization step whether it succeeded, failed or threw.
     *
     * @return the factorization job's exit code when it is non-zero (the recommender
     *         job is then not started, since its inputs are the factorization
     *         output); otherwise the recommender job's exit code
     */
    def recommend: Int = {
      val factorizationExitCode =
        try performAlsFactorization()
        finally resetFlagInSharedAlsMapper()

      if (factorizationExitCode != 0) factorizationExitCode
      else generateRecommendations()
    }

    // Necessary for local execution in the same JVM only. If performAlsFactorization()
    // and generateRecommendations() are run in separate JVM instances, this is taken
    // care of automatically. To run both as one task we have to clear the static state
    // they leave behind, and there is no legitimate way of doing that directly, hence
    // the reflective call to the non-public static `reset` method of SharingMapper.
    // This clean-up should have been performed by ParallelALSFactorizationJob itself
    // once the job is finished.
    // TODO: remove this when a better way comes along, or ParallelALSFactorizationJob
    // takes responsibility.
    private def resetFlagInSharedAlsMapper(): Unit = {
      val resetMethod = classOf[SharingMapper[_, _, _, _, _]].getDeclaredMethod("reset")
      resetMethod.setAccessible(true)
      // Static method: no receiver instance.
      resetMethod.invoke(null)
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement