Advertisement
Guest User

Untitled

a guest
Dec 3rd, 2016
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.22 KB | None | 0 0
  1. from pyspark import SparkConf, SparkContext, StorageLevel
  2. from operator import add
  3. from itertools import combinations
  4. from datetime import datetime
  5. import math
  6.  
# Item-item cosine similarities keyed by (item_id, item_id) tuples; filled in
# by the __main__ section (collectAsMap) and read by weighted_average().
item_sim = {}
# Hard-coded target item id (looks like an Amazon ASIN) that
# weighted_average() scores other items against -- TODO confirm intent.
item = 'B000035Y4P'
  9. def get_items_items(user,items_ratings):
  10.  
  11. for item1,item2 in combinations(items_ratings,2):
  12. return (item1[0],item2[0]),(item1[1],item2[1])
  13.  
  14. def cosine_similarity(item_item,rating_rating):
  15.  
  16. xy = 0.0
  17. xx = 0.0
  18. yy = 0.0
  19. for ratings in rating_rating:
  20. xx += float(ratings[0]) * float(ratings[0])
  21. yy += float(ratings[1]) * float(ratings[1])
  22. xy += float(ratings[0]) * float(ratings[1])
  23.  
  24. denominator = float(math.sqrt(xx) * math.sqrt(yy))
  25. if denominator > 0.0:
  26. cosine_sim = xy / denominator
  27. else:
  28. cosine_sim = 0.0
  29. if cosine_sim < 0:
  30. cosine_sim = 0.0
  31.  
  32. return item_item , cosine_sim
  33.  
  34. def get_recent_transaction(x,y):
  35. t1 = datetime.fromtimestamp(x[1])
  36. t2 = datetime.fromtimestamp(y[1])
  37. if (t1 - t2).total_seconds() < 0 :
  38. #print "second"
  39. return y
  40. else:
  41. return x
  42.  
  43. def weighted_average(item_rating):
  44. global item_sim
  45. global item
  46. cos_sim = 0.0
  47. if (item_rating[0],item) in item_sim.keys():
  48. cos_sim = item_sim[(item_rating[0],item)]
  49. elif (item, item_rating[0]) in item_sim.keys():
  50. cos_sim = item_sim[(item, item_rating[0])]
  51. return [cos_sim * item_rating[1] , cos_sim]
if __name__ == "__main__":

    # Input: ratings CSV with rows of user_id,item_id,rating,timestamp --
    # TODO confirm column order (inferred from the split/indexing below).
    path = "/home/varsha/Documents/Fall2016/bigdata/assignment2/ratings_Video_Games.csv"

    sc = SparkContext(appName = "varsha_recommendation")

    # Parse the CSV, keep only the most recent transaction for each
    # (user, item) pair, then re-key as (item_id, (user_id, rating)).
    df = sc.textFile(path, use_unicode=False)\
        .map(lambda line: line.split(','))\
        .map(lambda line: ((line[0],line[1]), (float(line[2]),line[3])))\
        .reduceByKey(get_recent_transaction)\
        .map(lambda x : (x[0][1],(x[0][0],x[1][0])))\
        #.count()


    #print("After timestamp filtering:", df)


    #df = df.map(lambda user_item: (user_item[0], 1))\
    # .distinct()

    #print("Distinct user item",df)



    # Items with fewer than 25 ratings; removed from df below so only
    # frequently-rated items survive.
    items = df.map(lambda user_item: (user_item[0], 1))\
        .reduceByKey(lambda x,y : x+y)\
        .filter(lambda keyValue: keyValue[1] < 25)
    #.collect()

    df = df.subtractByKey(items)\
    #.count()

    # Re-key as (user_id, (item_id, rating)) to count ratings per user.
    df = df.map(lambda x : (x[1][0], (x[0], x[1][1])))

    # Users with fewer than 10 ratings; removed from df below.
    users = df.map(lambda user_item: (user_item[0], 1))\
        .reduceByKey(lambda x,y : x+y)\
        .filter(lambda keyValue: keyValue[1] < 10)

    df = df.subtractByKey(users)\
    #.count()
    #print("DATA:",df.take(10))


    # Identity re-map (keeps the (user, (item, rating)) shape).
    df = df.map(lambda x : (x[0], (x[1][0], x[1][1])))

    # Per-user (rating_sum, rating_count); values are (item, rating) so
    # x[1]/y[1] pick out the rating component.
    rating_sum = df.combineByKey((lambda x: (x[1],1)),
        (lambda x, y: (x[0] + y[1], x[1] + 1)),
        (lambda x, y: (x[0] + y[0], x[1] + y[1])))

    # Per-user mean rating, collected to the driver. NOTE: the tuple
    # parameter in this lambda is Python 2-only syntax (removed in Py3).
    rating_avg = rating_sum.map(lambda (key, (value_sum, count)): (key, value_sum/count)).collectAsMap()

    # Mean-center each rating by its user's average (standard cosine
    # similarity normalization for collaborative filtering).
    normalized_df = df.map(lambda x : (x[0], [(x[1][0], x[1][1] - rating_avg[x[0]])]))


    # Group each user's (item, centered_rating) entries into one list.
    user_items_ratings = normalized_df.reduceByKey(lambda x, y : x+y)

    #print("User - (item,ratings)",user_items_ratings.take(10))

    # For users with at least two items, build an item pair (note:
    # get_items_items returns only the FIRST combination) and gather the
    # co-rating tuples per (item, item) key.
    items_ratings = user_items_ratings.filter(
        lambda x: len(x[1]) > 1).map(
        lambda x: get_items_items(x[0],x[1]))\
        .map(lambda x : (x[0], [x[1]]))\
        .reduceByKey(lambda x, y : x+y)

    #print("(item, item) - (rating ,rating)", items_ratings.take(10))

    # `global` is redundant here (we are already at module scope) but
    # harmless; item_sim is the dict read by weighted_average().
    global item_sim
    item_sim = items_ratings.map(
        lambda x: cosine_similarity(x[0],x[1])).collectAsMap()


    # The dict comprehension just copies item_sim; "Avergae" typo is in a
    # runtime string and is left as-is.
    print("Weighted Avergae", {k : item_sim[k] for k in item_sim.keys()})
    # Everything below is dead code parked inside a module-level string
    # literal -- it is never executed.
    '''
weighted_sum = df.combineByKey((lambda x: weighted_average(x)),
(lambda x, y: (x[0] + [k for k in weighted_average(y)][0], x[1] + [k for k in weighted_average(y)][1])),
(lambda x, y: (x[0] + y[0], x[1] + y[1])))

weighted_avg = weighted_sum.map(lambda (key, (numerator_sum, denominator)): (key, round(numerator_sum/denominator, 2)) if denominator else (key, 0)).collectAsMap()

print("Weighted Avergae", {k : weighted_avg[k] for k in weighted_avg.keys()})






Get all item-item pair combos
(item1,item2) -> [(item1_rating,item2_rating),
(item1_rating,item2_rating),
...]





Calculate the cosine similarity for each item pair
(item1,item2) -> (similarity,co_raters_count)


item_sims = pairwise_items.map(
lambda p: calcSim(p[0],p[1])).collect()
#print("Size after subtracting users and items",df)
#print("Rating Average:", normalized_df.take(10));




support = 10

frequent_items = df.map(lambda item_count : (item_count[1], 1))\
.reduceByKey(add)\
.filter(lambda item_count : item_count[1] >= support)\
.collect()
#.keys()\
#.count()

df = df.filter(lambda user_item : any(x[0] for x in frequent_items if user_item[1] == x[0]))\

baskets = df.map(lambda x : (x[0] , [x[1]]))\
.reduceByKey(lambda x,y : x+y)\
.map(lambda items : sorted(items[1]))\
.persist(StorageLevel.MEMORY_AND_DISK)




n = float(baskets.count())


frequent_2_items = baskets.map(lambda x : (get_k_pairs(x, frequent_items,2)))\
.flatMap(lambda x : x)\
.reduceByKey(add)\
.filter(lambda pair_count : pair_count[1] >= support)\
.collect()
#.keys()\

frequent_2_items = set(frequent_2_items)

second_pass = baskets.filter(lambda user_items : any(x[0] for x in get_k_pairs(user_items, frequent_items, 2) if any(x[0]==y[0] for y in frequent_2_items)))\




frequent_3_items = second_pass.map(lambda x : (get_k_pairs(x, frequent_2_items,3)))\
.flatMap(lambda pair_count : pair_count )\
.reduceByKey(add)\
.filter(lambda pair_count : pair_count[1] >= support)\
.collect()
#.keys()\

frequent_3_items = set(frequent_3_items)

third_pass = second_pass.filter(lambda user_items : any(x[0] for x in get_k_pairs(user_items, frequent_2_items, 3) if any(x[0]==y[0] for y in frequent_3_items)))
frequent_4_items = third_pass.map(lambda x : (get_k_pairs(x, frequent_3_items,4)))\
.flatMap(lambda pair_count : pair_count )\
.reduceByKey(add)\
.filter(lambda pair_count : pair_count[1] >= support)\
.collect()
#.keys()\


frequent_4_items = set(frequent_4_items)

fourth_pass = third_pass.filter(lambda user_items : any(x[0] for x in get_k_pairs(user_items, frequent_3_items, 4) if any(x[0]==y[0] for y in frequent_4_items)))

association_rules = []
for items, count in frequent_4_items:
for i, item in enumerate(items):
rule_left = items[:i] + items[i+1:]
denominator = [item_count[1] for item_count in frequent_3_items if item_count[0] == rule_left]
confidence = count / denominator[0]
support_item = [item_count[1] for item_count in frequent_items if item_count[0] == item]
interest = confidence - (support_item[0]/ n)
if confidence >= 0.05 and interest >= 0.02:
association_rules.append((rule_left, item))



f = open('apriori_op.txt', 'w')
f.write("Frequent 1 pairs:\n")
f.write(str(frequent_items)+"\n")
f.write("Frequent 2 pairs:"+"\n")
f.write(str(frequent_2_items)+"\n")
f.write("Frequent 3 pairs:"+"\n")
f.write(str(frequent_3_items)+"\n")
f.write("Frequent 4 pairs:"+"\n")
f.write(str(frequent_4_items)+"\n")
f.write("Association Rules:"+"\n")
f.write(str(association_rules)+"\n")
f.close()

#print("First Pass Baskets Count :", first_pass)

#print("Item Count Table:", item_table)



#print ("Frequent 3 pairs:" ,frequent_3_items)
    '''
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement