Advertisement
Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
import com.facebook.dataflow.operators._

// Pipeline-wide defaults applied to every operator declared below unless the
// operator overrides them explicitly.
GlobalDefaults.set(
  user="zjshen",                  // pipeline owner
  schedule="@daily",              // one run per ds partition
  num_retries=2,                  // retry each failed task twice
  depends_on_past=false,          // runs for different ds are independent
  pool="di.lowpri_pipelines",     // low-priority scheduling pool
  retry_wait=300,                 // seconds between retries — TODO confirm unit
  replica=false,                  // write to the MySQL primary, not a replica
  create_tasks=true
)
// Idempotently create the MySQL staging table that holds one row per Hive
// query scraped from the daily command log. The `query_sig` / `task_index` /
// `namespace_index` keys all lead with `ds` so per-day lookups stay indexed.
val create_table = new MySQLOperator(
  dep_list=List(),
  mysql_tier="xdb.diet_query_analysis",
  mysql_query=
    """
      |CREATE TABLE IF NOT EXISTS `<TABLE:hive_queries_from_log>` (
      |  `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
      |  `ds` VARCHAR(20) NOT NULL,
      |  `namespace_name` VARCHAR(50) NOT NULL,
      |  `task_name` VARCHAR(500) NOT NULL,
      |  `type` VARCHAR(20) NOT NULL,
      |  `query` LONGTEXT NOT NULL,
      |  `signature` CHAR(32) NOT NULL,
      |  `query_stats` TEXT,
      |  PRIMARY KEY (
      |    `id`
      |  ),
      |  KEY `query_sig` (
      |    `ds`,
      |    `signature`
      |  ),
      |  KEY `task_index` (
      |    `ds`,
      |    `task_name`
      |  ),
      |  KEY `namespace_index` (
      |    `ds`,
      |    `namespace_name`
      |  )
      |) ENGINE=InnoDB DEFAULT CHARSET=latin1;
    """.stripMargin
)
// Block downstream tasks until today's command-log signal partition lands.
val wait_log_partition = new WaitForHiveOperator(
  partition="ds=<DATEID>",
  table="hive_command_log_signal"
)
// Block downstream tasks until today's query-stats partition lands.
val wait_stats_partition = new WaitForHiveOperator(
  partition="ds=<DATEID>",
  table="hive_query_stats"
)
// Scrape today's Hive queries (Dataswarm/Databee only) out of the command log,
// left-join their runtime stats, and load the result into the MySQL staging
// table. The leading NULL maps onto the AUTO_INCREMENT `id` column.
val scrape_hive_query = new Hive2MySQLOperator(
  dep_list=List(create_table, wait_log_partition, wait_stats_partition),
  mysql_tier="xdb.diet_query_analysis",
  mysql_table="<TABLE:hive_queries_from_log>",
  replica=false,
  hive_query=
    // NOTE(review): the `... IS NOT NULL` predicate in the LEFT OUTER JOIN's
    // ON clause only suppresses stat matches for rows with a null taskName —
    // it does NOT drop those rows from the output. If the intent was to skip
    // queries without a taskName entirely, it belongs in the WHERE clause.
    // Left as-is pending confirmation.
    """
      |SELECT
      |  NULL,
      |  log.ds,
      |  FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.namespace') AS ns,
      |  CONCAT(
      |    FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.pipelineName'),
      |    '.',
      |    FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.taskName')
      |  ) AS taskName,
      |  FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.type') AS type,
      |  log.command,
      |  FB_MD5(log.command),
      |  FB_MAKE_JSON_OBJ(MAP(
      |    'inputSize', stats.input_size,
      |    'outputSize', stats.output_size,
      |    'taskRuntimeSec', stats.runtime_total_s,
      |    'mappers', stats.mappers,
      |    'reducers', stats.reducers,
      |    'cpuSec', stats.cpu_msecs / 1000,
      |    'mapCpuSec', stats.map_cpu_msecs / 1000,
      |    'reduceCpuSec', stats.reduce_cpu_msecs / 1000,
      |    'mapRuntimeSec', stats.map_runtime_total_s,
      |    'reduceRuntimeSec', stats.reduce_runtime_total_s)
      |  ) AS query_stats
      |FROM hive_command_log log
      |LEFT OUTER JOIN (
      |  SELECT
      |    queryid,
      |    input_size,
      |    output_size,
      |    job_task_runtime_total_s AS runtime_total_s,
      |    mappers,
      |    reducers,
      |    cpu_msecs,
      |    map_cpu_msecs,
      |    reduce_cpu_msecs,
      |    map_task_runtime_total_s as map_runtime_total_s,
      |    reduce_task_runtime_total_s as reduce_runtime_total_s
      |  FROM hive_query_stats
      |  WHERE ds = '<DATEID>'
      |) stats
      |ON log.queryid = stats.queryid AND
      |FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.taskName') IS NOT NULL
      |WHERE log.command_type='QUERY'
      |AND (
      |  FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.type') = 'DATASWARM'
      |  OR FB_JSON_PATH_EXTRACTOR(log.hive_query_source_json, '$.type') = 'DATABEE'
      |) AND log.ds='<DATEID>'
    """.stripMargin
)
// Run the Constance analyzer CLI over today's scraped queries (-pq = parse
// queries — TODO confirm flag meaning) to populate the query-features table
// read by dump_to_hive. All JVM -D flags precede the main class, as required.
val parse_hive_query = new BashOperator(
  dep_list=List(scrape_hive_query),
  bash_script="/usr/local/jdk-8u60-64/bin/java " +
    "-Xms512m -Xmx4g " +
    // Classpath is the packaged fat jar (dependencies bundled).
    "-cp <FBPACKAGE:constance/analyzer>/" +
    "constance-analyzer-0.1.0-SNAPSHOT-jar-with-dependencies.jar " +
    "-Djava.net.preferIPv6Addresses=true " +
    "-Dconstance.root.logger=<TMP_DIR:constance/logs> " +
    "-Dconstance.service.thread-pool.size=40 " +
    // Output table for extracted per-query features.
    "-Dconstance.hive-storage.queries-features-table-name=" +
    "<TABLE:hive_query_features> " +
    "com.facebook.constance.cli.AnalyzerCLI -pq " +
    "-ds <DATEID>"
)
// Idempotently create the final Hive fact table, partitioned by ds with a
// 30-day retention. Column set mirrors what dump_to_hive selects out of MySQL.
val create_hive_table = new HiveQLOperator(
  dep_list=List(parse_hive_query),
  hive_query=
    """
      |CREATE TABLE IF NOT EXISTS <TABLE:fct_hive_queries> (
      |  id BIGINT,
      |  namespace_name STRING,
      |  task_name STRING,
      |  pipeline_type STRING,
      |  query STRING,
      |  query_signature STRING,
      |  query_ast STRING,
      |  query_stats STRING,
      |  query_features STRING,
      |  parse_failed INT
      |) PARTITIONED BY (ds STRING)
      |  TBLPROPERTIES('RETENTION'='30');
    """.stripMargin
)
// Export today's scraped queries plus their parsed AST/features from MySQL
// into the partitioned Hive fact table.
//
// BUG FIX: the original query put `b.ds = '<DATEID>'` in the WHERE clause.
// A WHERE predicate on the right-hand table of a LEFT JOIN rejects the
// NULL-extended rows, silently turning the LEFT JOIN into an INNER JOIN —
// any query with no hive_query_features row (e.g. one the analyzer never
// processed) was dropped, defeating both the LEFT JOIN and the
// `parse_failed` column. The partition predicate now lives in the ON
// clause, preserving left-join semantics.
val dump_to_hive = new MySQL2HiveOperator(
  dep_list=List(create_hive_table),
  mysql_tier="xdb.diet_query_analysis",
  mysql_query=
    """
      |SELECT
      |  a.id,
      |  a.namespace_name,
      |  a.task_name,
      |  a.`type` as pipeline_type,
      |  a.query,
      |  a.signature as query_signature,
      |  b.parse_tree as query_ast,
      |  a.query_stats as query_stats,
      |  b.features as query_features,
      |  b.failed as parse_failed
      |FROM hive_queries_from_log a
      |LEFT JOIN hive_query_features b
      |ON a.id = b.qid AND b.ds = '<DATEID>'
      |WHERE a.ds = '<DATEID>';
    """.stripMargin,
  hive_table="<TABLE:fct_hive_queries>",
  hive_partition="ds='<DATEID>'"
)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement