SHARE
TWEET

Untitled

a guest Jul 17th, 2017 40 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #!/usr/bin/env python
  2.  
  3. from __future__ import print_function
  4. from pyspark import SparkContext
  5. from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
  6. from pyspark.mllib.evaluation import RegressionMetrics
  7. import math
  8.  
  9. if __name__ == "__main__":
  10.     sc = SparkContext(appName="PythonCollaborativeFilteringExample")
  11.     # $example on$
  12.     # Load and parse the data
  13.     data = sc.textFile("preference_not0.csv")
  14.  
  15.     f = open("outputRecord.txt", "w")
  16.    
  17.     # Splitting the data
  18.     ratings = data.map(lambda l: l.split(','))\
  19.         .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
  20.  
  21.     train, validation, test, adjustment = ratings.randomSplit([6, 2, 2, 0]) # splitting data into testing data, validation data and training data
  22.  
  23.     train.cache() # caching data for quick optimization
  24.     validation.cache()
  25.     test.cache()
  26.    
  27.     validationForPredict = validation.map(lambda x: (x[0], x[1]))
  28.     actualReformatted = validation.map(lambda x: ((x[0], x[1]), x[2]))
  29.  
  30.     # Build the recommendation model using Alternating Least Squares
  31.     #rank = 10
  32.     #numIterations = 10
  33.     #model = ALS.train(train, rank, numIterations)
  34.  
  35.     iterations = [5, 7, 10]
  36.     regularizationParameter = 0.1
  37.     ranks = [10, 12, 15]
  38.     RMSEs = [0, 0, 0, 0, 0, 0, 0, 0, 0]
  39.     err = 0
  40.     tolerance = 0.03
  41.  
  42.     minRMSE = float('inf')
  43.     bestIteration = -1
  44.     bestRank = -1
  45.     ptr1 = "output \n"
  46.  
  47.     #validating hyper-parameters
  48.     for rank in ranks:
  49.         for iteration in iterations:
  50.             model = ALS.trainImplicit(train, rank, iteration, lambda_=regularizationParameter)
  51.             predictedRatings = model.predictAll(validationForPredict)
  52.             predictedReformatted = predictedRatings.map(lambda x: ((x[0], x[1]), x[2]))
  53.  
  54.             predictionAndObservations = (predictedReformatted.join(actualReformatted).map(lambda x: x[1]))
  55.            
  56.             metrics = RegressionMetrics(predictionAndObservations)
  57.             RMSE = metrics.rootMeanSquaredError
  58.             RMSEs[err] = RMSE
  59.             err += 1
  60.            
  61.             #print ("For rank %s and iteration %s, the RMSE is %s") % (rank, iteration, RMSE)
  62.             ptr1 = ptr1 +  "For rank " + str(rank) +  " and iterations " + str(iteration) + " the RMSE is " + str(RMSE) + " \n"
  63.             if RMSE < minRMSE:
  64.                 minRMSE = RMSE
  65.                 bestIteration = iteration
  66.                 bestRank = rank
  67.  
  68.     #print ("The best model was trained with rank %s and iteration %s") % (bestRank, bestIteration)
  69.     ptr2 = "The best model was trained with rank " + str(bestRank) + " and iteration " + str(bestIteration) + " \n"
  70.  
  71.     bestModel = ALS.trainImplicit(train, bestRank, iterations=bestIteration, lambda_=regularizationParameter)
  72.  
  73.     testForPredicting = test.map(lambda x: (x[0], x[1]))
  74.     testReformatted = test.map(lambda x: ((x[0], x[1]), x[2]))
  75.  
  76.     predictedTest = bestModel.predictAll(testForPredicting)
  77.     predictedTestReformatted = predictedTest.map(lambda x: ((x[0], x[1]), x[2]))
  78.  
  79.     predictionAndObservationTest = (predictedTestReformatted.join(testReformatted). map(lambda x: x[1]))
  80.  
  81.     metrics = RegressionMetrics(predictionAndObservationTest)
  82.     testRMSE = metrics.rootMeanSquaredError
  83.  
  84.     print ("The Model had a RMSE on the test set of " + str(testRMSE))
  85.     ptr3 = "The Model had a RMSE on the test set of " + str(testRMSE)
  86.  
  87.     f.write(ptr1 + ptr2 + ptr3)
  88.  
  89.     # Evaluate the model on training data
  90.     #testdata = test.map(lambda p: (p[0], p[1]))
  91.     #predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
  92.    
  93.     # Keep the it if there is already a data, if not put in the data camputed by ALS
  94.     #ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
  95.  
  96.     #RMSE = math.sqrt(ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
  97.     #print("Root Mean Squared Error = " + str(RMSE))
  98.  
  99.     # Save and load model
  100.     bestModel.save(sc, "target/tmp/myCollaborativeFilter")
  101.     sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
  102.     # $example off$
  103.  
  104.     f.close()
RAW Paste Data
Top