Advertisement
Guest User

Untitled

a guest
Jun 24th, 2019
175
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.40 KB | None | 0 0
  1. from airflow import DAG
  2. from airflow.hooks.base_hook import BaseHook
  3. from airflow.operators.bash_operator import BashOperator
  4. from datetime import datetime, timedelta
  5. from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
  6. from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
  7.  
# Airflow connection id used for Slack alerts: looked up with
# BaseHook.get_connection() for the webhook token and also passed as
# http_conn_id to SlackWebhookOperator.
SLACK_CONN_ID = 'slack-search'

#---- Spark resource size
# Forwarded to SparkSubmitOperator as num_executors / executor_cores /
# executor_memory.
NUM_EXECUTOR = 56
EXECUTOR_CORES = 5
EXECUTOR_MEMORY = '20g'

# Application jar and main class handed to SparkSubmitOperator
# (application / java_class).
JAR_FILE = 'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar'
EXECUTE_CLASS = 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay'
# Number of days get_from_date() subtracts from the current time.
AGG_PERIOD = 1
  18.  
  19. def task_fail_slack_alert(context):
  20. slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
  21. slack_msg = "iac_keyword_stat_day failed"
  22. failed_alert = SlackWebhookOperator(
  23. task_id='slack_notify',
  24. http_conn_id=SLACK_CONN_ID,
  25. webhook_token=slack_webhook_token,
  26. message=slack_msg,
  27. channel='#search-alert',
  28. username='airflow',
  29. dag=dag)
  30. return failed_alert.execute(context=context)
  31.  
  32. def get_from_date():
  33. return (datetime.now() - timedelta(days=AGG_PERIOD)).strftime('%Y%m%d')
  34.  
  35. def get_to_date():
  36. return datetime.now().strftime('%Y%m%d')
  37.  
  38. default_args = {
  39. 'owner': 'search',
  40. 'depends_on_past': False,
  41. 'start_date': datetime(2019, 6, 21),
  42. 'retries': 1,
  43. 'retry_delay': timedelta(hours=1),
  44. 'on_fail_callback': task_fail_slack_alert,
  45. }
  46.  
# Spark properties handed to SparkSubmitOperator through `conf`; each entry
# ends up as a `--conf key=value` pair on the spark-submit command line.
default_conf = {
    'spark.network.timeout': '800s',
    'spark.executor.heartbeatInterval': '60s'
}
  51.  
# DAG 'iac_keyword_stat_day', scheduled with cron "0 6 * * *" (daily at
# 06:00) and created with catchup=False; tasks inherit default_args.
dag = DAG('iac_keyword_stat_day', catchup=False, default_args=default_args, schedule_interval="0 6 * * *")
  53.  
# Single task of the DAG: runs EXECUTE_CLASS from JAR_FILE via spark-submit
# with the resource sizes and Spark properties defined above.
# NOTE(review): get_from_date() / get_to_date() are called here, i.e. when
# this module is executed to build the DAG, not inside the task's execute().
# The --from/--to values therefore reflect the time the file was parsed.
# Confirm this is intended; templated application_args may be the safer
# choice if the dates should follow the run's execution date — TODO verify.
t1 = SparkSubmitOperator(
    task_id='spark_submit',
    application=JAR_FILE,
    conf=default_conf,
    java_class=EXECUTE_CLASS,
    executor_cores=EXECUTOR_CORES,
    executor_memory=EXECUTOR_MEMORY,
    num_executors=NUM_EXECUTOR,
    application_args=["--from", get_from_date(), "--to", get_to_date()],
    dag=dag)
  64.  
  65. {__init__.py:1580} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
  66. Traceback (most recent call last):
  67. File "/usr/local/lib/python2.7/dist-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
  68. result = task_copy.execute(context=context)
  69. File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 176, in execute
  70. self._hook.submit(self._application)
  71. File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 352, in submit
  72. spark_submit_cmd, returncode
  73. AirflowException: Cannot execute: ['spark-submit', '--master', 'yarn', '--conf', 'spark.executor.heartbeatInterval=60s', '--conf', 'spark.network.timeout=800s', '--num-executors', '56', '--executor-cores', '5', '--executor-memory', '20g', '--name', u'airflow-spark', '--class', 'com.ebaykorea.search.pichu.batch.iac.KeywordStatDay', '--queue', u'default', u'hdfs://itemcachs102am:8020/apps/search/search-pichu-131.jar', u'--from', u'20190624', u'--to', u'20190625']. Error code is: 1.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement