SHARE
TWEET

Untitled

Posted by a guest on Jun 18th, 2019 · 61 views · Expires: Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from airflow.models import DAG
  2. from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
  3. from airflow.operators.s3_file_transform_operator import S3FileTransformOperator
  4. from datetime import datetime
  5.  
  6. class XComEnabledAWSAthenaOperator(AWSAthenaOperator):
  7.     def execute(self, context):
  8.         super(XComEnabledAWSAthenaOperator, self).execute(context)
  9.         # just so that this gets `xcom_push`(ed)
  10.         return self.query_execution_id
  11.  
  12. with DAG(dag_id='athena_query_and_move',
  13.          schedule_interval=None,
  14.          start_date=datetime(2019, 6, 7)) as dag:
  15.  
  16.     run_query = XComEnabledAWSAthenaOperator(
  17.         task_id='run_query',
  18.         query='SELECT * FROM  UNNEST(SEQUENCE(0, 100))',
  19.         output_location='s3://my-bucket/my-path/',
  20.         database='my_database'
  21.     )
  22.    
  23.     move_results = S3FileTransformOperator(
  24.         task_id='move_results',
  25.         source_s3_key='s3://mybucket/mypath/{{ task_instance.xcom_pull(task_ids="run_query") }}.csv',
  26.         dest_s3_key='s3://mybucket/otherpath/myresults.csv',
  27.         transform_script='/bin/cp'
  28.     )
  29.    
  30.     move_results.set_upstream(run_query)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top