Guest User

Untitled

a guest
Jan 22nd, 2019
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.30 KB | None | 0 0
  1. package com.zmining.processor
  2.  
  3. import java.text.SimpleDateFormat
  4.  
  5. import com.google.gson
  6. import com.google.gson.{Gson, JsonParser}
  7. import com.vng.dataplatform.sdk.hdbc.HDFSConnection
  8. import com.vng.dataplatform.sdk.sparkjob.SparkjobResult.ErrorCode
  9. import com.vng.dataplatform.sdk.sparkjob.{SparkJobAbst, SparkjobResult}
  10. import com.vng.dataplatform.sdk.storage.StorageFactory
  11. import org.apache.spark.SparkConf
  12.  
  /** Spark job that runs a fixed query against `GLOBAL-SEARCH-DISCOVER-V4` and prints the row count.
    *
    * The query actually executed selects `src_id, start_time, param, command, result` for rows
    * with `command=1586 and result=0`, capped at 100 rows. It applies no predicate on the JSON
    * stored in `param`.
    *
    * Reference command carried over from the original header (this is NOT what `execute` runs):
    * {{{
    * select src_id, start_time, get_json_object(param,'$.listIdMiningReturn'), get_json_object(param,'$.mapLogDataMining')
    * from `GLOBAL-SEARCH-DISCOVER-V4`
    * where (command=1586 and result=0 and
    * get_json_object(param,'$.fromMining') = 'true' and
    * get_json_object(param,'$.listItemsSize') != '0')
    * }}}
    */
  object GetListProcessor extends SparkJobAbst {

    /** Runs the query for the hard-coded date `20_01_2019`, prints the row count and reports success.
      *
      * @param l              not read by this implementation
      * @param hdfsConnection connection used to create and execute the statement
      * @param sparkConf      not read by this implementation
      * @return a result built from `ErrorCode.SUCCESSED` and two `null` arguments
      */
    override def execute(l: Long, hdfsConnection: HDFSConnection, sparkConf: SparkConf): SparkjobResult = {
      // Not referenced below; kept so the accessor chain is still evaluated exactly as before.
      // NOTE(review): can be dropped if getSparkSession has no side effects — confirm.
      val sqlContext = hdfsConnection.getSparkSession.sqlContext

      val query = Seq(
        "select src_id, start_time, param, command, result",
        "from `GLOBAL-SEARCH-DISCOVER-V4`",
        "where (command=1586 and result=0)",
        "limit 100"
      ).mkString(" ")
      println(s"Query: $query")

      // Epoch milliseconds of the hard-coded day, parsed with the JVM default time zone.
      val dayFormat = new SimpleDateFormat("dd_MM_yyyy")
      val timestampMillis = dayFormat.parse("20_01_2019").getTime
      val queryResult = hdfsConnection.createStatement(query, timestampMillis).execute()

      // Disabled draft, kept for reference: keep rows whose `param` JSON has fromMining == true and
      // listItemsSize > 0, read `mapLogDataMining` from each, then write parquet to /user/zmb.
      //
      // val gson = new Gson()
      //
      // queryResult.filter(x => {
      //   val src_id = x.getInt(0)
      //   val start_time = x.getInt(1)
      //   val param = x.getString(2)
      //   val command = x.getInt(3)
      //   val result = x.get(4)
      //
      //   val paramObject = new JsonParser().parse(param).getAsJsonObject
      //
      //   val fromMining = paramObject.get("fromMining").getAsBoolean
      //   val listItemSize = paramObject.get("listItemsSize").getAsInt
      //
      //   fromMining && listItemSize > 0
      // }).map(x => {
      //   val src_id = x.getInt(0)
      //   val start_time = x.getInt(1)
      //   val param = x.getString(2)
      //   val command = x.getInt(3)
      //   val result = x.get(4)
      //
      //   val paramObject = new JsonParser().parse(param).getAsJsonObject
      //   val mapLogDataMining = paramObject.get("mapLogDataMining").getAsJsonObject
      //
      // }).write.parquet("/user/zmb")

      println(s"Size of df: ${queryResult.count()}")
      new SparkjobResult(ErrorCode.SUCCESSED, null, null)
    }

    /** Program entry point: hands the command-line arguments to `process`
      * (not defined in this file; presumably inherited from `SparkJobAbst`).
      *
      * @param args command-line arguments, passed through unchanged
      */
    def main(args: Array[String]): Unit =
      process(args)
  }
Add Comment
Please, Sign In to add comment