Sample input: one row per (transaction, item) with the purchased quantity.

----------------------------
 trans  item_code  item_qty
----------------------------
 001    A          2
 001    B          3
 002    A          4
 002    B          6
 002    C          10
 003    D          1
----------------------------
Target output: association rules with a confidence and a quantity
coefficient (coef = sum of item2's quantity / sum of item1's quantity,
taken over the transactions that contain both items).

----------------------------
 item1  item2  conf  coef
----------------------------
 A      B      0.8   1.5
 B      A      1.0   0.67
 A      C      0.7   2.5
----------------------------
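Worked example for coef, assuming the sample data above: transactions
001 and 002 contain both A and B, so coef(A->B) = (3+6)/(2+4) = 1.5 and
coef(B->A) = 6/9 ~= 0.67; only transaction 002 contains both A and C, so
coef(A->C) = 10/4 = 2.5. The conf values shown are illustrative
thresholds rather than numbers derived from this toy data.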
#!/usr/bin/python
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.mllib.fpm import FPGrowth
import time

# read raw data from the database: one row per (transaction, product, item)
def read_data():
    sql = """select t.orderno_nosplit,
                    t.prod_code,
                    t.item_code,
                    sum(t.item_qty) as item_qty
             from ioc_fdm.fdm_dwr_ioc_fcs_pk_spu_item_f_chain t
             group by t.prod_code, t.orderno_nosplit, t.item_code"""
    data = sql_context.sql(sql)
    return data.cache()
# calculate the quantity coefficient of two items:
# sum(qty of item2) / sum(qty of item1) over transactions containing both
def qty_coef(item1, item2):
    sql = """select t1.item_code, t1.item_qty from spu_table t1
             where t1.orderno_nosplit in
                   (select t2.orderno_nosplit from spu_table t2 where t2.item_code = '%s')
             and t1.orderno_nosplit in
                   (select t3.orderno_nosplit from spu_table t3 where t3.item_code = '%s')""" % (item1, item2)
    df = sql_context.sql(sql)
    qty_item1 = df.filter(df.item_code == item1).agg({"item_qty": "sum"}).first()[0]
    qty_item2 = df.filter(df.item_code == item2).agg({"item_qty": "sum"}).first()[0]
    return float(qty_item2) / qty_item1
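The same coefficient can also be computed with the DataFrame API alone,
which avoids building a SQL string per rule. A minimal sketch, assuming
spu is the per-product DataFrame that gets registered as spu_table in
train() below (qty_coef_df is a hypothetical helper, not part of the
original script):

def qty_coef_df(spu, item1, item2):
    # transaction ids that contain item1 and item2 respectively
    t1 = spu.filter(spu.item_code == item1).select('orderno_nosplit')
    t2 = spu.filter(spu.item_code == item2).select('orderno_nosplit')
    shared = t1.intersect(t2)                   # transactions with both items
    both = spu.join(shared, 'orderno_nosplit')  # all rows of those transactions
    q1 = both.filter(both.item_code == item1).agg({"item_qty": "sum"}).first()[0]
    q2 = both.filter(both.item_code == item2).agg({"item_qty": "sum"}).first()[0]
    return float(q2) / q1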
def train(prod):
    spu = total_spu.filter(total_spu.prod_code == prod)
    print 'data length', spu.count(), time.strftime("%H:%M:%S")
    supp = 0.1
    conf = 0.7
    sql_context.registerDataFrameAsTable(spu, 'spu_table')
    sql_context.cacheTable('spu_table')
    print 'table register over', time.strftime("%H:%M:%S")
    # group the items of each transaction into one basket
    trans_sets = spu.rdd.repartition(32) \
        .map(lambda x: (x[0], x[2])) \
        .groupByKey().mapValues(list).values().cache()
    print 'trans group over', time.strftime("%H:%M:%S")
    model = FPGrowth.train(trans_sets, supp, 10)
    print 'model train over', time.strftime("%H:%M:%S")
    model_f1 = model.freqItemsets().filter(lambda x: len(x[0]) == 1)
    model_f2 = model.freqItemsets().filter(lambda x: len(x[0]) == 2)
    # collect the single-item frequencies as a dictionary: item -> freq
    model_f1_tuple = model_f1.map(lambda (U, V): (tuple(U)[0], V))
    model_f1Map = model_f1_tuple.collectAsMap()
    # broadcast the dictionary so executors can look up frequencies
    bc_model = sc.broadcast(model_f1Map)
    # confidence in both directions: conf(i->j) = freq({i,j}) / freq({i})
    model_f2_conf = model_f2.map(lambda x: (x[0][0], x[0][1],
                                            float(x[1]) / bc_model.value[x[0][0]],
                                            float(x[1]) / bc_model.value[x[0][1]]))
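    # e.g. a pair (('A','B'), 4) with frequencies {'A': 5, 'B': 4} becomes
    # ('A', 'B', 0.8, 1.0); the flatMap below then splits it into the two
    # directed rules ('A', 'B', 0.8) and ('B', 'A', 1.0)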
    print 'conf calculation over', time.strftime("%H:%M:%S")
    # expand each frequent pair into two directed rules
    model_f2_conf_flt = model_f2_conf.flatMap(
        lambda x: ((x[0], x[1], x[2]), (x[1], x[0], x[3])))
    # filter the association rules by confidence threshold
    model_f2_conf_flt_ftr = model_f2_conf_flt.filter(lambda x: x[2] >= conf)
    # calculate the quantity coefficient for the filtered rules; nested SQL
    # cannot run inside an RDD transformation, so collect the rules to a list first
    asso_list = model_f2_conf_flt_ftr.map(lambda x: list(x)).collect()
    for row in asso_list:
        row.append(qty_coef(row[0], row[1]))
    print 'coef calculation over', time.strftime("%H:%M:%S")
    # rewrite the list as a dataframe and persist it
    asso_df = sql_context.createDataFrame(asso_list, ['item1', 'item2', 'conf', 'coef'])
    sql_context.clearCache()
    path = "hdfs:/user/hive/wilber/%s" % prod
    asso_df.write.mode('overwrite').parquet(path)
if __name__ == '__main__':
    sc = SparkContext()
    sql_context = HiveContext(sc)
    prod_list = sc.textFile('hdfs:/user/hive/wilber/prod_list').collect()
    total_spu = read_data()
    print 'spu read over', time.strftime("%H:%M:%S")
    for prod in prod_list:
        print 'prod', prod
        train(prod)
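To spot-check the rules written for one product after a run (a sketch;
'001' is a hypothetical product code):

    rules = sql_context.read.parquet("hdfs:/user/hive/wilber/001")
    rules.orderBy(rules.conf.desc()).show()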