Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from airflow import DAG
- from airflow.hooks.base_hook import BaseHook
- from airflow.operators.bash_operator import BashOperator
- from datetime import datetime, timedelta
- from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
- from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
# Airflow connection id holding the Slack webhook credentials.
SLACK_CONN_ID = 'slack-search'

# ---- Spark resource sizing ----
NUM_EXECUTOR = 56
EXECUTOR_CORES = 5
EXECUTOR_MEMORY = '20g'

# Batch job artifact on HDFS and its entry-point class.
JAR_FILE = 'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar'
EXECUTE_CLASS = 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay'

# Aggregation window length, in days.
AGG_PERIOD = 1
def task_fail_slack_alert(context):
    """Failure callback: post an alert to the #search-alert Slack channel.

    Reads the webhook token (stored as the connection password) from the
    Airflow connection named by SLACK_CONN_ID, then fires a
    SlackWebhookOperator synchronously with the task's context.
    """
    webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    notifier = SlackWebhookOperator(
        task_id='slack_notify',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=webhook_token,
        message="iac_keyword_stat_day failed",
        channel='#search-alert',
        username='airflow',
        dag=dag)
    return notifier.execute(context=context)
def get_from_date():
    """Return the start of the aggregation window (AGG_PERIOD days ago) as YYYYMMDD."""
    window_start = datetime.now() - timedelta(days=AGG_PERIOD)
    return window_start.strftime('%Y%m%d')
def get_to_date():
    """Return today's date as YYYYMMDD (the end of the aggregation window)."""
    now = datetime.now()
    return now.strftime('%Y%m%d')
# Defaults applied to every task in this DAG.
# BUG FIX: the Airflow argument is 'on_failure_callback' -- the previous key
# 'on_fail_callback' is not recognized by BaseOperator and was silently
# ignored, so task failures never triggered the Slack alert.
default_args = {
    'owner': 'search',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 21),
    'retries': 1,
    'retry_delay': timedelta(hours=1),  # wait an hour before the single retry
    'on_failure_callback': task_fail_slack_alert,
}
# Extra spark-submit --conf overrides: relax the network timeout and
# heartbeat interval for long-running stages.
default_conf = dict([
    ('spark.network.timeout', '800s'),
    ('spark.executor.heartbeatInterval', '60s'),
])
# Runs daily at 06:00; catchup disabled so missed intervals are not backfilled.
dag = DAG(
    'iac_keyword_stat_day',
    schedule_interval="0 6 * * *",
    default_args=default_args,
    catchup=False)
# Single task: spark-submit the KeywordStatDay batch over the last AGG_PERIOD days.
# NOTE(review): get_from_date()/get_to_date() are evaluated at DAG-parse time,
# not at task run time — this matches the original behavior; confirm it is
# intentional (templated {{ ds }} would bind to the execution date instead).
t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    java_class=EXECUTE_CLASS,
    conf=default_conf,
    num_executors=NUM_EXECUTOR,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
    dag=dag)
- {__init__.py:1580} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
- Traceback (most recent call last):
- File "/usr/local/lib/python2.7/dist-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
- result = task_copy.execute(context=context)
- File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 176, in execute
- self._hook.submit(self._application)
- File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 352, in submit
- spark_submit_cmd, returncode
- AirflowException: Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement