Guest User

Untitled

a guest
Dec 10th, 2018
99
0
Never
Not a member of Pastebin yet? Sign up — it unlocks many cool features!
text 1.18 KB | None | 0 0
  1. import datetime
  2. from airflow import models
  3. from airflow.operators.bash_operator import BashOperator
  4. from airflow.operators.bigquery_operator import BigQueryOperator
  5.  
  6. DAG_NAME = 'test_workflow'
  7.  
  8. yesterday = datetime.datetime.combine(
  9.  datetime.datetime.today() - datetime.timedelta(days=1) + datetime.timedelta(hours=9),
  10.  datetime.datetime.min.time())
  11.  
  12. default_dag_args = {
  13.  'start_date': yesterday,
  14.  'email_on_failure': False,
  15.  'email_on_retry': False,
  16.  'retries': 1,
  17.  'retry_delay': datetime.timedelta(minutes=5),
  18.  'project_id': models.Variable.get('gcp_project')
  19. }
  20.  
  21. with models.DAG(
  22.  dag_id=DAG_NAME,
  23.  schedule_interval="@daily",
  24.  default_args=default_dag_args) as dag:
  25.  
  26. start = BashOperator(
  27. task_id='start',
  28. bash_command='echo Start Workflow'
  29. )
  30.  
  31. bq_to_bq = BigQueryOperator(
  32.   task_id='query',
  33. write_disposition='WRITE_TRUNCATE',
  34. create_disposition='CREATE_IF_NEEDED',
  35. allow_large_results=True,
  36. bql="SELECT id, name, address FROM `your_project.data_set.test_a`", # version1.10以降はsql引数
  37. use_legacy_sql=False,
  38. destination_dataset_table='your_project.data_set.test_b' + format(yesterday.strftime("%Y%m%d"))
  39.   )
  40.  
  41. start >> bq_to_bq
Add Comment
Please sign in to add a comment.