Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- from airflow import models
- from airflow.operators.bash_operator import BashOperator
- from airflow.operators.bigquery_operator import BigQueryOperator
DAG_NAME = 'test_workflow'

# Midnight (naive datetime) of "yesterday" after shifting the clock by +9 hours.
# NOTE(review): the +9h offset looks like a UTC->JST adjustment on a naive
# timestamp — confirm the scheduler clock is UTC before relying on it.
_shifted_now = datetime.datetime.today() + datetime.timedelta(hours=9)
yesterday = datetime.datetime.combine(
    (_shifted_now - datetime.timedelta(days=1)).date(),
    datetime.time.min,
)

# Defaults applied to every task created inside the DAG below.
default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    # GCP project id is read from an Airflow Variable at parse time.
    'project_id': models.Variable.get('gcp_project'),
}
with models.DAG(
        dag_id=DAG_NAME,
        schedule_interval="@daily",
        default_args=default_dag_args) as dag:

    # Marker task: gives the workflow an explicit entry point in the graph.
    start = BashOperator(
        task_id='start',
        bash_command='echo Start Workflow'
    )

    # Query test_a and write the result into a date-suffixed copy of test_b.
    bq_to_bq = BigQueryOperator(
        task_id='query',
        write_disposition='WRITE_TRUNCATE',      # overwrite destination on reruns
        create_disposition='CREATE_IF_NEEDED',   # create the table on first run
        allow_large_results=True,
        # NOTE: from Airflow 1.10 onward this parameter is named `sql`.
        bql="SELECT id, name, address FROM `your_project.data_set.test_a`",
        use_legacy_sql=False,
        # Original used a redundant one-arg format() wrapper around an str;
        # an f-string builds the identical table name directly.
        destination_dataset_table=(
            f"your_project.data_set.test_b{yesterday.strftime('%Y%m%d')}"
        ),
    )

    start >> bq_to_bq
Add Comment
Please, Sign In to add comment