Advertisement
Guest User

Untitled

a guest
Feb 9th, 2016
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.55 KB | None | 0 0
  1. import os
  2. from airflow import DAG
  3. from airflow.operators import BashOperator
  4. from airflow.operators import S3KeySensor
  5. from datetime import datetime, timedelta
  6. from datetime import date
  7.  
  # Start date handed to every task through default_args below.
  START_DATE = datetime(2016, 2, 4, 18, 0, 0)

  # Task-level defaults; passed to the DAG as default_args.
  default_args = {
      'owner': 'ubuntu',
      'depends_on_past': False,
      'start_date': START_DATE,
      # The original value here was an unterminated string literal (`['],`),
      # a SyntaxError that kept the whole DAG file from importing.
      # TODO: restore the real alert recipient(s); while this list is empty
      # no failure/retry e-mails can be delivered.
      'email': [],
      'email_on_failure': True,
      'email_on_retry': True,  # NOTE(review): inert while retries is 0
      'retries': 0,
      # Only matters if retries is raised above 0.
      'retry_delay': timedelta(hours=6),
  }
  20.  
  # Daily ETL DAG: cron '15 09 * * *' fires once a day at 09:15.
  # dagrun_timeout caps how long a single run may stay active (5 hours).
  dag = DAG(
      dag_id='spark_alltrans_etl',
      schedule_interval='15 09 * * *',
      default_args=default_args,
      dagrun_timeout=timedelta(hours=5),
  )
  26.  
  # Runs spark-submit on the remote host over ssh. The adjacent string
  # literals concatenate into one shell command line.
  # NOTE(review): the trailing '2015-12-18T00' argument looks like a fixed run
  # timestamp; confirm whether it should be templated from the execution date.
  spark_job = BashOperator(
      dag=dag,
      task_id='alltrans-spark-job',
      bash_command=(
          "ssh hadoop@xxx '"
          "source ~/.bash_profile; "
          "/usr/bin/spark-submit"
          " --deploy-mode cluster"
          " --executor-memory 20G"
          " --num-executors 5"
          " --conf spark.hadoop.validateOutputSpecs=false"
          " --class com.bigcommerce.hawking.Hawking"
          " /home/hadoop/hawking-core-assembly-0.1.jar"
          " 2015-12-18T00"
          "'"
      ),
  )
  31.  
  # No-op root task (`exit 0` always succeeds); detect_required_s3_keys uses it
  # as the default upstream for every S3 sensor.
  dummy_op = BashOperator(dag=dag, task_id='kick-off', bash_command='exit 0')
  36.  
  def detect_required_s3_keys(datasource, name, dag=dag, upstream=dummy_op):
      """Create an S3KeySensor that waits for one table's files in S3.

      The sensor wildcard-matches keys of the form
      ``<datasource>/<name>/<YYYY-MM-DD>/<name>*`` in the ``hawking-load``
      bucket, using the ``s3connection`` connection. It is placed downstream
      of ``upstream``.

      :param datasource: top-level key prefix (e.g. ``'foo'``).
      :param name: table name; used in the key pattern and as the task_id.
      :param dag: DAG to attach the sensor to (defaults to this file's DAG).
      :param upstream: task that must finish before the sensor starts
          (defaults to the ``kick-off`` no-op task).
      :return: the newly created S3KeySensor.
      """
      # The date segment is a Jinja template (bucket_key is a templated field
      # of S3KeySensor), so it is resolved per task instance at run time. The
      # previous date.today() was frozen when the DAG file was parsed, so
      # backfills and reruns of an earlier execution date polled for the wrong
      # day's files. For a daily schedule Airflow triggers the run for
      # execution_date D after that period ends, i.e. on D+1. tomorrow_ds
      # (execution_date + 1 day) is therefore the day the run fires, which
      # matches what date.today() produced for on-schedule runs.
      key_pattern = '{source}/{table}/{day}/{table}*'.format(
          source=datasource, table=name, day='{{ tomorrow_ds }}')
      task = S3KeySensor(
          task_id=name,
          bucket_key=key_pattern,
          bucket_name='hawking-load',
          s3_conn_id='s3connection',
          dag=dag,
          wildcard_match=True)
      task.set_upstream(upstream)
      return task
  47.  
  # One S3 sensor per input table (presumably the tables the Spark ETL reads;
  # confirm).
  tblinvoices = detect_required_s3_keys('foo', 'tblinvoices')
  tblinvoiceitems = detect_required_s3_keys('foo', 'tblinvoiceitems')
  tbldomains = detect_required_s3_keys('foo', 'tbldomains')

  # Gate the Spark job on every required input being present. Previously
  # spark_job had no upstream tasks, so it ran whether or not the sensors
  # had found their keys.
  for required_input in (tblinvoices, tblinvoiceitems, tbldomains):
      spark_job.set_upstream(required_input)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement