Advertisement
Guest User

Untitled

a guest
Sep 23rd, 2016
460
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 5.41 KB | None | 0 0
  1. import com.facebook.dataflow.operators._
  2.  
  // Pipeline-wide defaults applied to every operator declared below: owner,
  // daily schedule, retry policy and resource pool.
  // NOTE(review): the exact semantics/units of these knobs live in
  // com.facebook.dataflow.operators and are not visible here (e.g. retry_wait
  // = 300 is presumably seconds) - confirm against the library.
  GlobalDefaults.set(
    user = "zjshen",
    schedule = "@daily",
    num_retries = 2,
    depends_on_past = false,
    pool = "di.lowpri_pipelines",
    retry_wait = 300,
    replica = false,
    create_tasks = true
  )
  13.  
  // Creates the MySQL staging table that scrape_hive_query writes into.
  // IF NOT EXISTS makes re-running this task a no-op once the table exists.
  // One row per scraped Hive query: auto-increment surrogate `id`, the ds it
  // was logged under, namespace / task / pipeline type, the query text, a
  // 32-char signature and nullable stats text. Every secondary index leads
  // with `ds`, matching the per-day filters the downstream tasks apply.
  // NOTE(review): DEFAULT CHARSET=latin1 - query text with characters outside
  // latin1 may not round-trip; confirm this is acceptable.
  val create_table = new MySQLOperator(
    dep_list = Nil, // root task: nothing upstream
    mysql_tier = "xdb.diet_query_analysis",
    mysql_query = """
      |CREATE TABLE IF NOT EXISTS `<TABLE:hive_queries_from_log>` (
      |  `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
      |  `ds` VARCHAR(20) NOT NULL,
      |  `namespace_name` VARCHAR(50) NOT NULL,
      |  `task_name` VARCHAR(500) NOT NULL,
      |  `type` VARCHAR(20) NOT NULL,
      |  `query` LONGTEXT NOT NULL,
      |  `signature` CHAR(32) NOT NULL,
      |  `query_stats` TEXT,
      |  PRIMARY KEY (
      |    `id`
      |  ),
      |  KEY `query_sig` (
      |    `ds`,
      |    `signature`
      |  ),
      |  KEY `task_index` (
      |    `ds`,
      |    `task_name`
      |  ),
      |  KEY `namespace_index` (
      |    `ds`,
      |    `namespace_name`
      |  )
      |) ENGINE=InnoDB DEFAULT CHARSET=latin1;
   """.stripMargin
  )
  46.  
  // Dependency on partition ds=<DATEID> of hive_command_log_signal (presumably
  // a sensor that blocks until the partition lands, per the operator name);
  // scrape_hive_query lists it in its dep_list.
  val wait_log_partition =
    new WaitForHiveOperator(table = "hive_command_log_signal", partition = "ds=<DATEID>")
  51.  
  // Dependency on partition ds=<DATEID> of hive_query_stats (presumably a
  // sensor that blocks until the partition lands, per the operator name);
  // scrape_hive_query lists it in its dep_list.
  val wait_stats_partition =
    new WaitForHiveOperator(table = "hive_query_stats", partition = "ds=<DATEID>")
  56.  
  // Copies the day's DATASWARM / DATABEE Hive queries from hive_command_log
  // into the MySQL table created by create_table. Where hive_query_stats has a
  // row with the same queryid for that ds, the stats are attached as a JSON
  // object; otherwise the stats columns come through as NULL.
  //
  // The SELECT list follows the MySQL column order (the operator presumably
  // inserts positionally - confirm): id (NULL, so MySQL assigns the next
  // AUTO_INCREMENT value), ds, namespace_name, task_name
  // ("<pipelineName>.<taskName>"), type, query, signature (FB_MD5 of the
  // command), query_stats.
  //
  // The NOT NULL filters must live in WHERE, not in the LEFT OUTER JOIN's ON
  // clause. In ON, a predicate on the preserved (left) side removes no rows;
  // it only nulls out `stats`. A missing pipelineName or taskName would then
  // make CONCAT(...) NULL and violate `task_name ... NOT NULL` in MySQL.
  //
  // NOTE(review): namespace_name is also NOT NULL in MySQL, but `$.namespace`
  // is not filtered here - confirm it is always present for these types.
  // NOTE(review): `id` is AUTO_INCREMENT and nothing here clears an existing
  // ds before inserting, so a retry (num_retries = 2) may duplicate rows
  // unless Hive2MySQLOperator handles that - confirm.
  val scrape_hive_query = new Hive2MySQLOperator(
    dep_list = List(create_table, wait_log_partition, wait_stats_partition),
    mysql_tier = "xdb.diet_query_analysis",
    mysql_table = "<TABLE:hive_queries_from_log>",
    replica = false,
    hive_query = """
      |SELECT
      |  NULL,
      |  log.ds,
      |  FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.namespace') AS ns,
      |  CONCAT(
      |    FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.pipelineName'),
      |    '.',
      |    FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.taskName')
      |  ) AS taskName,
      |  FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.type') AS type,
      |  log.command,
      |  FB_MD5(log.command),
      |  FB_MAKE_JSON_OBJ(MAP(
      |    'inputSize', stats.input_size,
      |    'outputSize', stats.output_size,
      |    'taskRuntimeSec', stats.runtime_total_s,
      |    'mappers', stats.mappers,
      |    'reducers', stats.reducers,
      |    'cpuSec', stats.cpu_msecs / 1000,
      |    'mapCpuSec', stats.map_cpu_msecs / 1000,
      |    'reduceCpuSec', stats.reduce_cpu_msecs / 1000,
      |    'mapRuntimeSec', stats.map_runtime_total_s,
      |    'reduceRuntimeSec', stats.reduce_runtime_total_s)
      |  ) AS query_stats
      |FROM hive_command_log log
      |LEFT OUTER JOIN (
      |  SELECT
      |    queryid,
      |    input_size,
      |    output_size,
      |    job_task_runtime_total_s AS runtime_total_s,
      |    mappers,
      |    reducers,
      |    cpu_msecs,
      |    map_cpu_msecs,
      |    reduce_cpu_msecs,
      |    map_task_runtime_total_s as map_runtime_total_s,
      |    reduce_task_runtime_total_s as reduce_runtime_total_s
      |  FROM hive_query_stats
      |  WHERE ds = '<DATEID>'
      |) stats
      |ON log.queryid = stats.queryid
      |WHERE log.ds = '<DATEID>'
      |  AND log.command_type = 'QUERY'
      |  AND FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.type') IN ('DATASWARM', 'DATABEE')
      |  AND FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.pipelineName') IS NOT NULL
      |  AND FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.taskName') IS NOT NULL
   """.stripMargin
  )
  114.  
  // Runs com.facebook.constance.cli.AnalyzerCLI with `-pq -ds <DATEID>` once
  // scrape_hive_query has finished. It uses a pinned JDK 8 binary and a
  // 512 MB initial / 4 GB max heap, and is configured through -D system
  // properties. The features table name goes through the <TABLE:...> macro.
  // NOTE(review): the meaning of `-pq` is defined by AnalyzerCLI and is not
  // visible here.
  // The pieces are joined with single spaces, so no piece may itself contain
  // whitespace.
  val parse_hive_query = new BashOperator(
    dep_list = List(scrape_hive_query),
    bash_script = Seq(
      "/usr/local/jdk-8u60-64/bin/java",
      // heap sizing
      "-Xms512m",
      "-Xmx4g",
      // classpath: the analyzer jar shipped in the constance/analyzer package
      "-cp",
      "<FBPACKAGE:constance/analyzer>/constance-analyzer-0.1.0-SNAPSHOT-jar-with-dependencies.jar",
      // JVM system properties
      "-Djava.net.preferIPv6Addresses=true",
      "-Dconstance.root.logger=<TMP_DIR:constance/logs>",
      "-Dconstance.service.thread-pool.size=40",
      "-Dconstance.hive-storage.queries-features-table-name=<TABLE:hive_query_features>",
      // main class followed by its program arguments
      "com.facebook.constance.cli.AnalyzerCLI",
      "-pq",
      "-ds",
      "<DATEID>"
    ).mkString(" ")
  )
  129.  
  // Creates the ds-partitioned Hive fact table that dump_to_hive loads.
  // IF NOT EXISTS leaves an existing table untouched.
  // Column order mirrors dump_to_hive's SELECT list. RETENTION = '30' is
  // presumably a 30-day partition-retention policy enforced by the warehouse -
  // confirm. dep_list sequences this after parse_hive_query.
  val create_hive_table = new HiveQLOperator(
    dep_list = List(parse_hive_query),
    hive_query = """
      |CREATE TABLE IF NOT EXISTS <TABLE:fct_hive_queries> (
      |  id BIGINT,
      |  namespace_name STRING,
      |  task_name STRING,
      |  pipeline_type STRING,
      |  query STRING,
      |  query_signature STRING,
      |  query_ast STRING,
      |  query_stats STRING,
      |  query_features STRING,
      |  parse_failed INT
      |) PARTITIONED BY (ds STRING)
      |  TBLPROPERTIES('RETENTION'='30');
   """.stripMargin
  )
  149.  
  // Exports the day's scraped queries from the MySQL staging table into
  // partition ds='<DATEID>' of the Hive fact table. Each query is paired with
  // the analyzer's parse result for the same id and ds, when one exists. The
  // SELECT list matches the column order declared by create_hive_table.
  //
  // Table names go through the <TABLE:...> macro, exactly as create_table,
  // scrape_hive_query and parse_hive_query refer to these tables. Bare names
  // would read a different table whenever the macro resolves to something
  // other than the literal name (presumably in test/dev runs).
  //
  // The `b.ds` filter sits in the ON clause on purpose. In WHERE it would drop
  // every query without a feature row, silently turning the LEFT JOIN into an
  // INNER JOIN; those queries should arrive with NULL query_ast /
  // query_features / parse_failed instead.
  val dump_to_hive = new MySQL2HiveOperator(
    dep_list = List(create_hive_table),
    mysql_tier = "xdb.diet_query_analysis",
    mysql_query = """
      |SELECT
      |  a.id,
      |  a.namespace_name,
      |  a.task_name,
      |  a.`type` as pipeline_type,
      |  a.query,
      |  a.signature as query_signature,
      |  b.parse_tree as query_ast,
      |  a.query_stats as query_stats,
      |  b.features as query_features,
      |  b.failed as parse_failed
      |FROM <TABLE:hive_queries_from_log> a
      |LEFT JOIN <TABLE:hive_query_features> b
      |ON a.id = b.qid AND b.ds = '<DATEID>'
      |WHERE a.ds = '<DATEID>';
   """.stripMargin,
    hive_table = "<TABLE:fct_hive_queries>",
    hive_partition = "ds='<DATEID>'"
  )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement