# Run an Athena query, then move its CSV result file to another S3 location.
from datetime import datetime

from airflow.models import DAG
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator


class XComEnabledAWSAthenaOperator(AWSAthenaOperator):
    def execute(self, context):
        super(XComEnabledAWSAthenaOperator, self).execute(context)
        # just so that this gets `xcom_push`(ed)
        return self.query_execution_id


with DAG(dag_id='athena_query_and_move',
         schedule_interval=None,
         start_date=datetime(2019, 6, 7)) as dag:

    run_query = XComEnabledAWSAthenaOperator(
        task_id='run_query',
        query='SELECT * FROM UNNEST(SEQUENCE(0, 100))',
        output_location='s3://my-bucket/my-path/',
        database='my_database'
    )

    # Athena writes its results to <output_location>/<query_execution_id>.csv,
    # so the source key (which must point under output_location above) is
    # templated from the XCom pushed by run_query.
    move_results = S3FileTransformOperator(
        task_id='move_results',
        source_s3_key='s3://my-bucket/my-path/'
                      '{{ task_instance.xcom_pull(task_ids="run_query") }}.csv',
        dest_s3_key='s3://my-bucket/other-path/my-results.csv',
        transform_script='/bin/cp'  # a no-op "transform": just copy the file
    )

    move_results.set_upstream(run_query)
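
# Illustrative sketch (not part of the original paste): one way to consume the
# XCom that XComEnabledAWSAthenaOperator pushes, using a plain PythonOperator
# (Airflow 1.x import path). The task and function names below are made up for
# this example; `dag=dag` attaches the task since we are outside the `with`
# block above.
from airflow.operators.python_operator import PythonOperator


def _log_query_execution_id(**context):
    # xcom_pull returns the value that run_query's execute() returned
    qid = context['task_instance'].xcom_pull(task_ids='run_query')
    print('Athena QueryExecutionId: %s' % qid)


log_execution_id = PythonOperator(
    task_id='log_execution_id',
    python_callable=_log_query_execution_id,
    provide_context=True,  # Airflow 1.x: pass the context into the callable
    dag=dag,
)
log_execution_id.set_upstream(run_query)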