Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from pyspark import SparkConf, SparkContext, StorageLevel
- from operator import add
- from itertools import combinations
- from datetime import datetime
- import math
# Module-level cache of pairwise cosine similarities, keyed by an
# (item_id, item_id) tuple.  Populated in the __main__ driver via
# collectAsMap() and read by weighted_average().
item_sim = {}
# Target item id for which weighted-average rating predictions are computed.
# Presumably an Amazon ASIN from the ratings CSV -- TODO confirm.
item = 'B000035Y4P'
def get_items_items(user,items_ratings):
    """Emit one ((item_a, item_b), (rating_a, rating_b)) pair for this user.

    ``items_ratings`` is a list of (item_id, rating) tuples.  Returns None
    when the user has fewer than two entries.

    NOTE(review): the original returns from inside the combinations() loop,
    so only the FIRST item pair per user is emitted and every other
    combination is discarded -- verify this is intended.
    """
    first_pair = next(combinations(items_ratings, 2), None)
    if first_pair is None:
        # Fewer than two rated items: nothing to pair up.
        return None
    (id_a, rating_a), (id_b, rating_b) = first_pair
    return (id_a, id_b), (rating_a, rating_b)
def cosine_similarity(item_item,rating_rating):
    """Return (item_item, cosine similarity) for one item pair.

    ``rating_rating`` is a list of (rating_a, rating_b) co-rating tuples.
    Similarity is 0.0 when either vector has zero norm, and negative
    similarities are clamped to 0.0.
    """
    dot = sum(float(a) * float(b) for a, b in rating_rating)
    norm_a_sq = sum(float(a) * float(a) for a, _ in rating_rating)
    norm_b_sq = sum(float(b) * float(b) for _, b in rating_rating)
    denominator = float(math.sqrt(norm_a_sq) * math.sqrt(norm_b_sq))
    if denominator > 0.0:
        similarity = dot / denominator
    else:
        # Zero-norm vector (or no co-ratings): undefined, treat as 0.
        similarity = 0.0
    # Negative correlation is not useful for this recommender; clamp to 0.
    return item_item, max(similarity, 0.0)
def get_recent_transaction(x,y):
    """Reduce two (rating, timestamp) pairs to the more recent one.

    Used as the reduceByKey function to keep only a user's latest rating
    of an item.  On a timestamp tie, ``x`` wins (same as the original).

    Bug fixed: the original built datetime.fromtimestamp() objects just to
    compare them, but the timestamps come straight from the CSV as strings
    and fromtimestamp() raises TypeError on a str.  Comparing the epoch
    values numerically is equivalent and accepts both str and numeric input.
    """
    if float(x[1]) < float(y[1]):
        return y
    else:
        return x
def weighted_average(item_rating):
    """Return [similarity * rating, similarity] for one (item_id, rating) pair.

    Looks up the cosine similarity between this item and the module-level
    target ``item`` in the global ``item_sim`` dict; the pair may be stored
    under either key order.  A missing pair contributes 0.0, so the entry
    drops out of both the numerator and denominator of the weighted average.

    Fixed: the original tested ``in item_sim.keys()`` (builds a key list in
    Python 2) and then indexed the dict again -- replaced with nested
    dict.get() calls, one lookup per key order.
    """
    global item_sim
    global item
    other_item, rating = item_rating[0], item_rating[1]
    cos_sim = item_sim.get((other_item, item),
                           item_sim.get((item, other_item), 0.0))
    return [cos_sim * rating, cos_sim]
if __name__ == "__main__":
    # Driver: build an item-item cosine-similarity model from a ratings CSV.
    # Each row is presumably (user_id, item_id, rating, timestamp) --
    # inferred from the indexing below; TODO confirm against the data file.
    # NOTE(review): written for Python 2 / old PySpark -- the tuple-unpacking
    # lambdas below are not valid Python 3.
    path = "/home/varsha/Documents/Fall2016/bigdata/assignment2/ratings_Video_Games.csv"
    sc = SparkContext(appName = "varsha_recommendation")
    # Parse the CSV, keep only the most recent rating per (user, item) pair,
    # then re-key as (item, (user, rating)).
    df = sc.textFile(path, use_unicode=False)\
        .map(lambda line: line.split(','))\
        .map(lambda line: ((line[0],line[1]), (float(line[2]),line[3])))\
        .reduceByKey(get_recent_transaction)\
        .map(lambda x : (x[0][1],(x[0][0],x[1][0])))
    #.count()
    #print("After timestamp filtering:", df)
    #df = df.map(lambda user_item: (user_item[0], user_item[1]))\
    #   .distinct()
    #print("Distinct user item",df)
    # Items rated fewer than 25 times, to be removed from df.
    items = df.map(lambda user_item: (user_item[0], 1))\
        .reduceByKey(lambda x,y : x+y)\
        .filter(lambda keyValue: keyValue[1] < 25)
    #.collect()
    df = df.subtractByKey(items)
    #.count()
    # Re-key as (user, (item, rating)) and drop users with < 10 ratings.
    df = df.map(lambda x : (x[1][0], (x[0], x[1][1])))
    users = df.map(lambda user_item: (user_item[0], 1))\
        .reduceByKey(lambda x,y : x+y)\
        .filter(lambda keyValue: keyValue[1] < 10)
    df = df.subtractByKey(users)
    #.count()
    #print("DATA:",df.take(10))
    # Shape-preserving rebuild of the (user, (item, rating)) tuples.
    df = df.map(lambda x : (x[0], (x[1][0], x[1][1])))
    # Per-user (rating_sum, rating_count) via combineByKey, then mean rating.
    rating_sum = df.combineByKey((lambda x: (x[1],1)),
        (lambda x, y: (x[0] + y[1], x[1] + 1)),
        (lambda x, y: (x[0] + y[0], x[1] + y[1])))
    # Python 2 tuple-unpacking lambda; integer division risk if ratings
    # were ints, but they are floats here (see float(line[2]) above).
    rating_avg = rating_sum.map(lambda (key, (value_sum, count)): (key, value_sum/count)).collectAsMap()
    # Mean-center each rating by its user's average (adjusted cosine).
    normalized_df = df.map(lambda x : (x[0], [(x[1][0], x[1][1] - rating_avg[x[0]])]))
    # user -> list of (item, centered_rating) across all of that user's rows.
    user_items_ratings = normalized_df.reduceByKey(lambda x, y : x+y)
    #print("User - (item,ratings)",user_items_ratings.take(10))
    # For each user with more than one rating, emit an item pair with its
    # rating pair, then group the rating pairs by item pair.
    # NOTE(review): get_items_items returns only the FIRST combination per
    # user, so most item pairs are discarded -- verify this is intended.
    items_ratings = user_items_ratings.filter(
        lambda x: len(x[1]) > 1).map(
        lambda x: get_items_items(x[0],x[1]))\
        .map(lambda x : (x[0], [x[1]]))\
        .reduceByKey(lambda x, y : x+y)
    #print("(item, item) - (rating ,rating)", items_ratings.take(10))
    # `global` at module scope is a no-op; kept from the original.
    global item_sim
    # (item, item) -> cosine similarity, pulled to the driver as a dict.
    item_sim = items_ratings.map(
        lambda x: cosine_similarity(x[0],x[1])).collectAsMap()
    print("Weighted Avergae", {k : item_sim[k] for k in item_sim.keys()})
    '''
    weighted_sum = df.combineByKey((lambda x: weighted_average(x)),
    (lambda x, y: (x[0] + [k for k in weighted_average(y)][0], x[1] + [k for k in weighted_average(y)][1])),
    (lambda x, y: (x[0] + y[0], x[1] + y[1])))
    weighted_avg = weighted_sum.map(lambda (key, (numerator_sum, denominator)): (key, round(numerator_sum/denominator, 2)) if denominator else (key, 0)).collectAsMap()
    print("Weighted Avergae", {k : weighted_avg[k] for k in weighted_avg.keys()})
    Get all item-item pair combos
    (item1,item2) -> [(item1_rating,item2_rating),
    (item1_rating,item2_rating),
    ...]
    Calculate the cosine similarity for each item pair
    (item1,item2) -> (similarity,co_raters_count)
    item_sims = pairwise_items.map(
    lambda p: calcSim(p[0],p[1])).collect()
    #print("Size after subtracting users and items",df)
    #print("Rating Average:", normalized_df.take(10));
    support = 10
    frequent_items = df.map(lambda item_count : (item_count[1], 1))\
    .reduceByKey(add)\
    .filter(lambda item_count : item_count[1] >= support)\
    .collect()
    #.keys()\
    #.count()
    df = df.filter(lambda user_item : any(x[0] for x in frequent_items if user_item[1] == x[0]))\
    baskets = df.map(lambda x : (x[0] , [x[1]]))\
    .reduceByKey(lambda x,y : x+y)\
    .map(lambda items : sorted(items[1]))\
    .persist(StorageLevel.MEMORY_AND_DISK)
    n = float(baskets.count())
    frequent_2_items = baskets.map(lambda x : (get_k_pairs(x, frequent_items,2)))\
    .flatMap(lambda x : x)\
    .reduceByKey(add)\
    .filter(lambda pair_count : pair_count[1] >= support)\
    .collect()
    #.keys()\
    frequent_2_items = set(frequent_2_items)
    second_pass = baskets.filter(lambda user_items : any(x[0] for x in get_k_pairs(user_items, frequent_items, 2) if any(x[0]==y[0] for y in frequent_2_items)))\
    frequent_3_items = second_pass.map(lambda x : (get_k_pairs(x, frequent_2_items,3)))\
    .flatMap(lambda pair_count : pair_count )\
    .reduceByKey(add)\
    .filter(lambda pair_count : pair_count[1] >= support)\
    .collect()
    #.keys()\
    frequent_3_items = set(frequent_3_items)
    third_pass = second_pass.filter(lambda user_items : any(x[0] for x in get_k_pairs(user_items, frequent_2_items, 3) if any(x[0]==y[0] for y in frequent_3_items)))
    frequent_4_items = third_pass.map(lambda x : (get_k_pairs(x, frequent_3_items,4)))\
    .flatMap(lambda pair_count : pair_count )\
    .reduceByKey(add)\
    .filter(lambda pair_count : pair_count[1] >= support)\
    .collect()
    #.keys()\
    frequent_4_items = set(frequent_4_items)
    fourth_pass = third_pass.filter(lambda user_items : any(x[0] for x in get_k_pairs(user_items, frequent_3_items, 4) if any(x[0]==y[0] for y in frequent_4_items)))
    association_rules = []
    for items, count in frequent_4_items:
    for i, item in enumerate(items):
    rule_left = items[:i] + items[i+1:]
    denominator = [item_count[1] for item_count in frequent_3_items if item_count[0] == rule_left]
    confidence = count / denominator[0]
    support_item = [item_count[1] for item_count in frequent_items if item_count[0] == item]
    interest = confidence - (support_item[0]/ n)
    if confidence >= 0.05 and interest >= 0.02:
    association_rules.append((rule_left, item))
    f = open('apriori_op.txt', 'w')
    f.write("Frequent 1 pairs:\n")
    f.write(str(frequent_items)+"\n")
    f.write("Frequent 2 pairs:"+"\n")
    f.write(str(frequent_2_items)+"\n")
    f.write("Frequent 3 pairs:"+"\n")
    f.write(str(frequent_3_items)+"\n")
    f.write("Frequent 4 pairs:"+"\n")
    f.write(str(frequent_4_items)+"\n")
    f.write("Association Rules:"+"\n")
    f.write(str(association_rules)+"\n")
    f.close()
    #print("First Pass Baskets Count :", first_pass)
    #print("Item Count Table:", item_table)
    #print ("Frequent 3 pairs:" ,frequent_3_items)
    '''
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement