Advertisement
Guest User

Untitled

a guest
Mar 20th, 2017
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 7.58 KB | None | 0 0
  1.  
  2. import com.google.common.base.Strings
  3. import org.apache.spark.{SparkConf, SparkContext}
  4.  
  5. import scala.collection.mutable
  6. import scala.util.control.Breaks._
  7.  
  8.  
  9. case class QueryLogOut(q: Option[String], smseq: Option[String],
  10.                        udid : Option[String], city: Option[String],
  11.                        mem_guid: Option[String], datetime: Option[String], typename: Option[String])
  12.  
  13. object SearchToItem {
  14.  
  15.   val dic=new mutable.HashMap[String,String]
  16.   val dicRowNum=new mutable.HashMap[String,String]
  17.   val dicType=new mutable.HashMap[String,String]
  18.   val SEARCH_TO_CART_RELATION_NUM=1
  19.  
  20.  
  21.   def main(args:Array[String]) {
  22.     val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  23.       .set("spark.kryoserializer.buffer.max", "192M")
  24.     val sc = new SparkContext(sparkConf)
  25.     val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  26.  
  27.     val newSql=
  28.       """select udid,
  29.        |                                 page_col,
  30.        |                                 col_content,
  31.        |                                 query,
  32.        |                                 city,
  33.        |                                 rownum,
  34.        |                                 mem_guid,
  35.        |                                 datetime,
  36.        |                                 type
  37.        |                             from temp.search2cart
  38.        |                              """.stripMargin
  39.  
  40.     val searchToItemDF=hiveContext.sql(newSql)
  41.  
  42.  
  43.     val searchToItemNewRDD=searchToItemDF.repartition(300,searchToItemDF.col("udid")).
  44.       map( r => proessSearchToItemNew(Option(r.get(0)),Option(r.get(1)),Option(r.get(2)),Option(r.get(3))
  45.         ,Option(r.get(4)),Option(r.get(5)),Option(r.get(6)),Option(r.get(7)),Option(r.get(8))))
  46.       .filter(_.q.getOrElse("")!="")
  47.  
  48.     val searchToItemNewDF = hiveContext.createDataFrame(searchToItemNewRDD)
  49.       .rdd.map { r => r.mkString("\t") }
  50.       .saveAsTextFile("/data/search2item")
  51.  
  52.     sc.stop()
  53.  
  54.  
  55.   }
  56.  
  57.   def proessSearchToItemNew(udid_o: Option[Any], page_col_o: Option[Any], col_pos_content_o :
  58.   Option[Any], query_o: Option[Any], city_o: Option[Any], rownum_o: Option[Any] ,
  59.                             mem_guid_o : Option[Any], datetime_o: Option[Any], typename_o: Option[Any]
  60.                            ):QueryLogOut={
  61.  
  62.  
  63.     val udid=udid_o.getOrElse("").toString
  64.     val page_col=page_col_o.getOrElse("").toString
  65.     val col_pos_content=col_pos_content_o.getOrElse("").toString
  66.     val query=query_o.getOrElse("").toString
  67.     val city=city_o.getOrElse("").toString
  68.     val rowNum=rownum_o.getOrElse("0").toString
  69.     val mem_guid=mem_guid_o.getOrElse("").toString
  70.     val datetime=datetime_o.getOrElse("").toString
  71.     val typename=typename_o.getOrElse("").toString
  72.  
  73.  
  74.     var queryLogOut= QueryLogOut(Option(""),Option(""),Option(""),Option(""),Option(""),Option(""),Option(""))
  75.  
  76.  
  77.     breakable {
  78.       page_col match {
  79.         case  "3010" =>{ //#点击「加入购物车」按钮,把商品加入购物车
  80.         val userSearchType=udid + "3035"
  81.           val userCartType=udid + page_col
  82.  
  83.           if (dic.contains(userSearchType) & dicRowNum.contains(userSearchType)){
  84.             var typename = dicType.getOrElse(userSearchType,"")
  85.             var queryword= dic.getOrElse(userSearchType,"")
  86.  
  87.             if(isInvalidProcessLog(userSearchType,rowNum)){
  88.               if(dicRowNum.contains(userCartType) & (rowNum.toInt - dicRowNum.getOrElse(userCartType,"0").toInt) == SEARCH_TO_CART_RELATION_NUM){
  89.                 queryword=dic.getOrElse(userCartType,"")
  90.                 typename=dicType.getOrElse(userCartType,"")
  91.                 dic.put(userCartType,queryword)
  92.               }else{
  93.                 queryword=""
  94.                 break()
  95.               }
  96.             }
  97.  
  98.             if (!Strings.isNullOrEmpty(col_pos_content)){
  99.               dic.put(userCartType,queryword)
  100.               dicRowNum.put(userCartType,rowNum)
  101.               dicType.put(userCartType,typename)
  102.               queryLogOut=
  103.                 QueryLogOut(Option(queryword),Option(col_pos_content),Option(udid),Option(city),Option(mem_guid),Option(datetime),Option(typename))
  104.             }
  105.           }
  106.  
  107.         }
  108.  
  109.         case  "3036" =>{ //#浏览商品
  110.         val itemType=udid + "3036"
  111.           val typeNum="-3036"
  112.           queryLogOut=processItemPage(udid,page_col,col_pos_content,query,city,rowNum,mem_guid,datetime,typename,itemType,typeNum)
  113.         }
  114.  
  115.  
  116.         case _ =>
  117.       }
  118.  
  119.     }
  120.  
  121.  
  122.     queryLogOut
  123.   }
  124.  
  125.  
  126.   def processItemPage(udid: String, page_col: String, col_pos_content : String, query: String,
  127.                       city: String, rownum: String, mem_guid : String, datetime: String, typename: String
  128.                       , ItemType: String, typeNum: String):QueryLogOut= {
  129.  
  130.     val userItemType=udid + "3036"
  131.     val userGoodsItemType=udid + "4011"
  132.     val userSearchType=udid + "3035"
  133.     val userCartType=udid + "3010"
  134.  
  135.     //    var returnBoolean=false
  136.     var typename=""
  137.     var queryword=""
  138.  
  139.     breakable {
  140.       if(dic.contains(userSearchType) & dicRowNum.contains(userSearchType)){
  141.         queryword=dic.getOrElse(userSearchType,"")
  142.         typename = dicType.getOrElse(userSearchType,"")
  143.  
  144.         if(Strings.isNullOrEmpty(queryword) | "NULL".equals(queryword) | "3".equals(typename) ){
  145.           queryword=""
  146.           break()
  147.         }
  148.  
  149.         if (isInvalidProcessLog(userSearchType,rownum) ){
  150.           if (isInvalidProcessLog(userItemType,rownum)){
  151.             if (isInvalidProcessLog(userGoodsItemType,rownum)){
  152.               if(isInvalidProcessLog(userCartType,rownum)){
  153.                 queryword=""
  154.                 break()
  155.               }else{
  156.                 queryword=dic.getOrElse(userCartType,"")
  157.                 typename = dicType.getOrElse(userCartType,"")
  158.               }
  159.             }else{
  160.               queryword=dic.getOrElse(userGoodsItemType,"")
  161.               typename = dicType.getOrElse(userGoodsItemType,"")
  162.             }
  163.           }else{
  164.             queryword=dic.getOrElse(userItemType,"")
  165.             typename = dicType.getOrElse(userItemType,"")
  166.           }
  167.         }
  168.  
  169.         if (!Strings.isNullOrEmpty(col_pos_content)){
  170.           dic.put(ItemType,queryword)
  171.           dicRowNum.put(ItemType,rownum)
  172.           dicType.put(ItemType,typename)
  173.  
  174.           typename=typename+typeNum // 1-3036 2-3036 etc.
  175.  
  176.         }else{
  177.           queryword=""
  178.         }
  179.  
  180.  
  181.       }
  182.     }
  183.  
  184.     QueryLogOut(Option(queryword),Option(col_pos_content),Option(udid),Option(city),Option(mem_guid),Option(datetime),Option(typename))
  185.  
  186.  
  187.   }
  188.  
  189.   def getType(query: String):String={
  190.  
  191.     if ((query.length==6 && query.startsWith("C") & isAllDigits(query.substring(1))) | (query.length==8 & query.substring(0,2)=="CC" & isAllDigits(query.substring(2))))
  192.       return "2"
  193.     else
  194.       return "1"
  195.  
  196.   }
  197.  
  198.   def isAllDigits(x: String) = x forall Character.isDigit
  199.  
  200.   def isValidDigitsRowNum(rowNum: String):Boolean = {
  201.     !Strings.isNullOrEmpty(rowNum) & isAllDigits(rowNum)
  202.   }
  203.  
  204.  
  205.   def isInvalidQuery(query: String):Boolean = {
  206.     Strings.isNullOrEmpty(query) | isValidSMSEQ(query)
  207.   }
  208.  
  209.   def isValidSMSEQ(sm_seq: String):Boolean = {
  210.     (sm_seq.length == 17 & sm_seq.indexOf("CM") == 6) |
  211.       (sm_seq.length >= 9 & isAllDigits(sm_seq))
  212.   }
  213.  
  214.   def isInvalidProcessLog(typeName: String,rowNum: String):Boolean = {
  215.     ( !isValidDigitsRowNum (rowNum) | Strings.isNullOrEmpty(typeName) | !dicRowNum.contains(typeName)
  216.       | (rowNum.toInt - dicRowNum.getOrElse(typeName,"0").toInt) > SEARCH_TO_CART_RELATION_NUM)
  217.   }
  218.  
  219. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement