Advertisement
Guest User

Untitled

a guest
Jul 17th, 2017
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.99 KB | None | 0 0
  1. #!/usr/bin/env python
  2.  
  3. from __future__ import print_function
  4. from pyspark import SparkContext
  5. from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
  6. from pyspark.mllib.evaluation import RegressionMetrics
  7. import math
  8.  
  9. if __name__ == "__main__":
  10. sc = SparkContext(appName="PythonCollaborativeFilteringExample")
  11. # $example on$
  12. # Load and parse the data
  13. data = sc.textFile("preference_not0.csv")
  14.  
  15. f = open("outputRecord.txt", "w")
  16.  
  17. # Splitting the data
  18. ratings = data.map(lambda l: l.split(','))\
  19. .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
  20.  
  21. train, validation, test, adjustment = ratings.randomSplit([6, 2, 2, 0]) # splitting data into testing data, validation data and training data
  22.  
  23. train.cache() # caching data for quick optimization
  24. validation.cache()
  25. test.cache()
  26.  
  27. validationForPredict = validation.map(lambda x: (x[0], x[1]))
  28. actualReformatted = validation.map(lambda x: ((x[0], x[1]), x[2]))
  29.  
  30. # Build the recommendation model using Alternating Least Squares
  31. #rank = 10
  32. #numIterations = 10
  33. #model = ALS.train(train, rank, numIterations)
  34.  
  35. iterations = [5, 7, 10]
  36. regularizationParameter = 0.1
  37. ranks = [10, 12, 15]
  38. RMSEs = [0, 0, 0, 0, 0, 0, 0, 0, 0]
  39. err = 0
  40. tolerance = 0.03
  41.  
  42. minRMSE = float('inf')
  43. bestIteration = -1
  44. bestRank = -1
  45. ptr1 = "output \n"
  46.  
  47. #validating hyper-parameters
  48. for rank in ranks:
  49. for iteration in iterations:
  50. model = ALS.trainImplicit(train, rank, iteration, lambda_=regularizationParameter)
  51. predictedRatings = model.predictAll(validationForPredict)
  52. predictedReformatted = predictedRatings.map(lambda x: ((x[0], x[1]), x[2]))
  53.  
  54. predictionAndObservations = (predictedReformatted.join(actualReformatted).map(lambda x: x[1]))
  55.  
  56. metrics = RegressionMetrics(predictionAndObservations)
  57. RMSE = metrics.rootMeanSquaredError
  58. RMSEs[err] = RMSE
  59. err += 1
  60.  
  61. #print ("For rank %s and iteration %s, the RMSE is %s") % (rank, iteration, RMSE)
  62. ptr1 = ptr1 + "For rank " + str(rank) + " and iterations " + str(iteration) + " the RMSE is " + str(RMSE) + " \n"
  63. if RMSE < minRMSE:
  64. minRMSE = RMSE
  65. bestIteration = iteration
  66. bestRank = rank
  67.  
  68. #print ("The best model was trained with rank %s and iteration %s") % (bestRank, bestIteration)
  69. ptr2 = "The best model was trained with rank " + str(bestRank) + " and iteration " + str(bestIteration) + " \n"
  70.  
  71. bestModel = ALS.trainImplicit(train, bestRank, iterations=bestIteration, lambda_=regularizationParameter)
  72.  
  73. testForPredicting = test.map(lambda x: (x[0], x[1]))
  74. testReformatted = test.map(lambda x: ((x[0], x[1]), x[2]))
  75.  
  76. predictedTest = bestModel.predictAll(testForPredicting)
  77. predictedTestReformatted = predictedTest.map(lambda x: ((x[0], x[1]), x[2]))
  78.  
  79. predictionAndObservationTest = (predictedTestReformatted.join(testReformatted). map(lambda x: x[1]))
  80.  
  81. metrics = RegressionMetrics(predictionAndObservationTest)
  82. testRMSE = metrics.rootMeanSquaredError
  83.  
  84. print ("The Model had a RMSE on the test set of " + str(testRMSE))
  85. ptr3 = "The Model had a RMSE on the test set of " + str(testRMSE)
  86.  
  87. f.write(ptr1 + ptr2 + ptr3)
  88.  
  89. # Evaluate the model on training data
  90. #testdata = test.map(lambda p: (p[0], p[1]))
  91. #predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
  92.  
  93. # Keep the it if there is already a data, if not put in the data camputed by ALS
  94. #ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
  95.  
  96. #RMSE = math.sqrt(ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
  97. #print("Root Mean Squared Error = " + str(RMSE))
  98.  
  99. # Save and load model
  100. bestModel.save(sc, "target/tmp/myCollaborativeFilter")
  101. sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
  102. # $example off$
  103.  
  104. f.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement