Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
- package com.zmining.processor
- import java.text.SimpleDateFormat
- import com.google.gson
- import com.google.gson.{Gson, JsonParser}
- import com.vng.dataplatform.sdk.hdbc.HDFSConnection
- import com.vng.dataplatform.sdk.sparkjob.SparkjobResult.ErrorCode
- import com.vng.dataplatform.sdk.sparkjob.{SparkJobAbst, SparkjobResult}
- import com.vng.dataplatform.sdk.storage.StorageFactory
- import org.apache.spark.SparkConf
- /** Command:
- * select src_id, start_time, get_json_object(param,'$.listIdMiningReturn'), get_json_object(param,'$.mapLogDataMining')
- * from `GLOBAL-SEARCH-DISCOVER-V4`
- * where (command=1586 and result=0 and
- * get_json_object(param,'$.fromMining') = 'true' and
- * get_json_object(param,'$.listItemsSize') != '0')
- *
- */
- object GetListProcessor extends SparkJobAbst {
- override def execute(l: Long, hdfsConnection: HDFSConnection, sparkConf: SparkConf): SparkjobResult = {
- val sqlc = hdfsConnection.getSparkSession.sqlContext
- val query = "select src_id, start_time, param, command, result from `GLOBAL-SEARCH-DISCOVER-V4` " +
- "where (command=1586 and result=0) limit 100"
- println("Query: " + query)
- val tmt = new SimpleDateFormat("dd_MM_yyyy").parse("20_01_2019").getTime
- val df = hdfsConnection.createStatement(query, tmt).execute()
- // val gson = new Gson()
- //
- // df.filter(x => {
- // val src_id = x.getInt(0)
- // val start_time = x.getInt(1)
- // val param = x.getString(2)
- // val command = x.getInt(3)
- // val result = x.get(4)
- //
- // val paramObject = new JsonParser().parse(param).getAsJsonObject
- //
- // val fromMining = paramObject.get("fromMining").getAsBoolean
- // val listItemSize = paramObject.get("listItemsSize").getAsInt
- //
- // fromMining && listItemSize > 0
- // }).map(x => {
- // val src_id = x.getInt(0)
- // val start_time = x.getInt(1)
- // val param = x.getString(2)
- // val command = x.getInt(3)
- // val result = x.get(4)
- //
- // val paramObject = new JsonParser().parse(param).getAsJsonObject
- // val mapLogDataMining = paramObject.get("mapLogDataMining").getAsJsonObject
- //
- // }).write.parquet("/user/zmb")
- println("Size of df: " + df.count())
- new SparkjobResult(ErrorCode.SUCCESSED, null, null)
- }
- def main(args: Array[String]): Unit = {
- GetListProcessor.process(args)
- }
- }
Add Comment
Please sign in to add a comment.