----------------------------
trans  item_code  item_qty
----------------------------
001    A          2
001    B          3
002    A          4
002    B          6
002    C          10
003    D          1
----------------------------

----------------------------
item1  item2  conf  coef
----------------------------
A      B      0.8   1.5
B      A      1.0   0.67
A      C      0.7   2.5
----------------------------
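
In the output, conf is the usual association-rule confidence, freq(item1 and item2) / freq(item1), and coef is a quantity ratio: over the transactions that contain both items, sum(item2 qty) / sum(item1 qty). With the sample data, A and B co-occur in transactions 001 and 002, so coef(A -> B) = (3 + 6) / (2 + 4) = 1.5 and coef(B -> A) = (2 + 4) / (3 + 6) = 0.67, as in the expected output above.
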
#!/usr/bin/python
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.mllib.fpm import FPGrowth
import time
# read raw data from database: one row per (product, transaction, item)
# with the summed item quantity
def read_data():
    sql = """select t.orderno_nosplit,
                    t.prod_code,
                    t.item_code,
                    sum(t.item_qty) as item_qty
             from ioc_fdm.fdm_dwr_ioc_fcs_pk_spu_item_f_chain t
             group by t.prod_code, t.orderno_nosplit, t.item_code"""
    data = sql_context.sql(sql)
    return data.cache()

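# Column order matters downstream: train() indexes rows positionally, with
# x[0] = orderno_nosplit (the transaction id) and x[2] = item_code. For the
# sample data a returned row might look like (prod_code 'P1' is hypothetical):
#   Row(orderno_nosplit=u'001', prod_code=u'P1', item_code=u'A', item_qty=2.0)
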
# calculate the quantity coefficient of two items: over the transactions
# containing both, sum(item2 qty) / sum(item1 qty)
def qty_coef(item1, item2):
    sql = """select t1.item_code, t1.item_qty from spu_table t1
             where t1.orderno_nosplit in
                 (select t2.orderno_nosplit from spu_table t2
                  where t2.item_code = '%s')
             and t1.orderno_nosplit in
                 (select t3.orderno_nosplit from spu_table t3
                  where t3.item_code = '%s')""" % (item1, item2)
    df = sql_context.sql(sql)
    qty_item1 = df.filter(df.item_code == item1).agg({"item_qty": "sum"}).first()[0]
    qty_item2 = df.filter(df.item_code == item2).agg({"item_qty": "sum"}).first()[0]
    coef = float(qty_item2) / qty_item1
    return coef

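# On the sample data, qty_coef('A', 'B') scans transactions 001 and 002 (the
# ones containing both items) and returns (3 + 6) / (2 + 4) = 1.5, matching
# the coef column in the expected output above.
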
def train(prod):
    spu = total_spu.filter(total_spu.prod_code == prod)
    print 'data length', spu.count(), time.strftime("%H:%M:%S")
    supp = 0.1
    conf = 0.7
    sql_context.registerDataFrameAsTable(spu, 'spu_table')
    sql_context.cacheTable('spu_table')
    print 'table register over', time.strftime("%H:%M:%S")

    # group items by transaction: x[0] = orderno_nosplit, x[2] = item_code
    trans_sets = spu.rdd.repartition(32).map(lambda x: (x[0], x[2])) \
                        .groupByKey().mapValues(list).values().cache()
    print 'trans group over', time.strftime("%H:%M:%S")

    model = FPGrowth.train(trans_sets, supp, 10)
    print 'model train over', time.strftime("%H:%M:%S")

    # freqItemsets() yields FreqItemset(items, freq); split by itemset size
    model_f1 = model.freqItemsets().filter(lambda x: len(x[0]) == 1)
    model_f2 = model.freqItemsets().filter(lambda x: len(x[0]) == 2)

    # register model_f1 as a dictionary keyed by the single item
    model_f1_tuple = model_f1.map(lambda (U, V): (tuple(U)[0], V))
    model_f1Map = model_f1_tuple.collectAsMap()

    # broadcast the single-item frequencies to the workers
    bc_model = sc.broadcast(model_f1Map)

    # generate association rules in both directions:
    # conf(a -> b) = freq({a, b}) / freq({a})
    model_f2_conf = model_f2.map(lambda x: (x[0][0], x[0][1],
                                            float(x[1]) / bc_model.value[x[0][0]],
                                            float(x[1]) / bc_model.value[x[0][1]]))
    print 'conf calculation over', time.strftime("%H:%M:%S")

    # one rule per direction: (item1, item2, conf) and (item2, item1, conf)
    model_f2_conf_flt = model_f2_conf.flatMap(
        lambda x: [(x[0], x[1], x[2]), (x[1], x[0], x[3])])

    # filter the association rules by the confidence threshold
    model_f2_conf_flt_ftr = model_f2_conf_flt.filter(lambda x: x[2] >= conf)

    # calculate the quantity coefficient for the filtered association rules;
    # since we cannot run nested SQL operations inside an RDD, the rules have
    # to be collected to a list first
    asso_list = model_f2_conf_flt_ftr.map(lambda x: list(x)).collect()
    print 'coef calculation over', time.strftime("%H:%M:%S")
    for row in asso_list:
        row.append(qty_coef(row[0], row[1]))

    # rewrite the list to a DataFrame and persist one parquet path per product
    asso_df = sql_context.createDataFrame(asso_list, ['item1', 'item2', 'conf', 'coef'])
    sql_context.clearCache()
    path = "hdfs:/user/hive/wilber/%s" % (prod)
    asso_df.write.mode('overwrite').parquet(path)

if __name__ == '__main__':
    sc = SparkContext()
    sql_context = HiveContext(sc)
    prod_list = sc.textFile('hdfs:/user/hive/wilber/prod_list').collect()
    total_spu = read_data()
    print 'spu read over', time.strftime("%H:%M:%S")
    for prod in prod_list:
        print 'prod', prod
        train(prod)
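
The collect-then-loop step in train() issues one Hive query per surviving rule, which is the slow path the inline comment apologizes for. A minimal sketch of an alternative, assuming the same spu DataFrame: self-join the transactions on their id so every co-occurring item pair becomes one row, then aggregate every coefficient in a single pass. The helper name all_qty_coefs is hypothetical, not part of the original script.

from pyspark.sql import functions as F

# Hypothetical alternative to per-rule qty_coef() queries: compute all
# pairwise quantity coefficients at once via a self-join.
def all_qty_coefs(spu):
    # one side of the self-join per rule position
    a = spu.select(spu.orderno_nosplit.alias('trans'),
                   spu.item_code.alias('item1'),
                   spu.item_qty.alias('qty1'))
    b = spu.select(spu.orderno_nosplit.alias('trans'),
                   spu.item_code.alias('item2'),
                   spu.item_qty.alias('qty2'))
    # pair items that share a transaction, dropping self-pairs
    pairs = a.join(b, 'trans').filter(F.col('item1') != F.col('item2'))
    # coef(item1 -> item2) = sum(qty2) / sum(qty1) over shared transactions,
    # the same ratio qty_coef() computes one pair at a time
    return (pairs.groupBy('item1', 'item2')
                 .agg((F.sum('qty2') / F.sum('qty1')).alias('coef')))

Joining this result against the filtered rules on (item1, item2) would attach coef without collecting anything to the driver; on the sample data it reproduces coef(A -> B) = (3 + 6) / (2 + 4) = 1.5.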