import os
from datetime import datetime, timedelta
from datetime import date

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import S3KeySensor
START_DATE = datetime(2016, 2, 4, 18, 0, 0)
default_args = {
    'owner': 'ubuntu',
    'depends_on_past': False,
    'start_date': START_DATE,
    'email': [],  # alert recipient address omitted in the original paste
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0,
    'retry_delay': timedelta(minutes=60 * 6),
}
# Runs daily at 09:15; a single DAG run times out after 5 hours.
dag = DAG(
    dag_id='spark_alltrans_etl',
    default_args=default_args,
    schedule_interval='15 09 * * *',
    dagrun_timeout=timedelta(minutes=60 * 5))
# Submit the Spark ETL job on the remote Hadoop master over SSH.
spark_job = BashOperator(
    task_id='alltrans-spark-job',
    bash_command="""ssh hadoop@xxx 'source ~/.bash_profile; /usr/bin/spark-submit --deploy-mode cluster --executor-memory 20G --num-executors 5 --conf spark.hadoop.validateOutputSpecs=false --class com.bigcommerce.hawking.Hawking /home/hadoop/hawking-core-assembly-0.1.jar 2015-12-18T00'""",
    dag=dag)
# No-op task that serves as a common upstream anchor for the S3 key sensors.
dummy_op = BashOperator(
    task_id='kick-off',
    bash_command='exit 0',
    dag=dag)
def detect_required_s3_keys(datasource, name, dag=dag, upstream=dummy_op):
    """Create an S3KeySensor that waits for today's export of `name` to land in S3."""
    task = S3KeySensor(
        task_id=name,
        bucket_key="{0}/{1}/{2}/{1}*".format(datasource, name, date.today().strftime('%Y-%m-%d')),
        bucket_name='hawking-load',
        s3_conn_id='s3connection',
        dag=dag,
        wildcard_match=True)
    task.set_upstream(upstream)
    return task
tblinvoices = detect_required_s3_keys('foo', 'tblinvoices')
tblinvoiceitems = detect_required_s3_keys('foo', 'tblinvoiceitems')
tbldomains = detect_required_s3_keys('foo', 'tbldomains')
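# Note (assumption, not in the original paste): as written, the Spark job has no upstream
# dependencies, so it runs independently of the sensors. If it is meant to wait for all
# three S3 key sensors, it could presumably be wired up like this:
# spark_job.set_upstream([tblinvoices, tblinvoiceitems, tbldomains])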