#!/usr/bin/env python

import sys
from math import sqrt
from operator import add
from os.path import join, isfile

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp.
    """
    fields = line.strip().split("::")
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle.
    """
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]

def loadRatings(ratingsFile):
    """
    Load ratings from a file of userId::movieId::rating::timestamp lines,
    keeping only entries with a positive rating.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    with open(ratingsFile, 'r') as f:
        ratings = [r for r in (parseRating(line)[1] for line in f) if r[2] > 0]
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings

def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error) of a model over n ratings.
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    # key both predictions and actuals by (userId, movieId), then join
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
        .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
        .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g " +
              "MovieLensALS.py movieLensDataDir personalRatingsFile")
        sys.exit(1)

    # set up environment
    conf = SparkConf() \
        .setAppName("MovieLensALS") \
        .set("spark.executor.memory", "2g")
    sc = SparkContext(conf=conf)

    # load personal ratings
    myRatings = loadRatings(sys.argv[2])
    myRatingsRDD = sc.parallelize(myRatings, 1)

    # load ratings and movie titles
    movieLensHomeDir = sys.argv[1]

    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)

    # movies is a dict mapping movieId -> movieTitle
    movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())

    # your code here
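
    # --- The block below is a minimal illustrative sketch of one way to fill
    # --- in the step above; it is an assumption, not this paste's original
    # --- solution. It splits on the last digit of the timestamp (60/40),
    # --- trains ALS with one arbitrary hyperparameter setting, and assumes
    # --- the personal ratings were entered under user id 0.
    numPartitions = 4
    training = ratings.filter(lambda x: x[0] < 6) \
                      .values() \
                      .union(myRatingsRDD) \
                      .repartition(numPartitions) \
                      .cache()
    validation = ratings.filter(lambda x: x[0] >= 6).values().cache()
    numValidation = validation.count()

    # hyperparameters chosen for illustration only
    rank, numIterations, lmbda = 8, 10, 0.1
    model = ALS.train(training, rank, numIterations, lmbda)
    print("Validation RMSE = %f" % computeRmse(model, validation, numValidation))

    # recommend the top 5 movies the personal-ratings user (assumed id 0)
    # has not rated yet
    myRatedMovieIds = set(x[1] for x in myRatings)
    candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
    topRecs = sorted(model.predictAll(candidates.map(lambda x: (0, x))).collect(),
                     key=lambda x: x[2], reverse=True)[:5]
    for i, rec in enumerate(topRecs):
        print("%2d: %s" % (i + 1, movies[rec[1]]))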

    # clean up
    sc.stop()
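
# Example invocation (paths are illustrative; movieLensDataDir should contain
# the MovieLens ratings.dat and movies.dat files):
#   /path/to/spark/bin/spark-submit --driver-memory 2g \
#       MovieLensALS.py ~/ml-1m ~/personalRatings.txt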