Advertisement
Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
import json
import logging
import os
from datetime import datetime
from pprint import pformat

from airflow.models import BaseOperator, DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.decorators import apply_defaults

# Module-level logger, preferred over the root `logging` module functions.
logger = logging.getLogger(__name__)

# Change the working directory to this file's directory so that relative
# template paths (e.g. 'template.json' below) resolve next to the DAG file.
# NOTE(review): chdir at import time is a process-wide side effect — every
# other DAG parsed in the same scheduler/worker process is affected too;
# confirm this is intended rather than using an absolute template path.
os.chdir(os.path.dirname(os.path.realpath(__file__)))
class RenderJSONOperator(BaseOperator):
    """Parse a Jinja-rendered JSON template and log the result.

    Because ``file_json`` appears in ``template_fields`` and ``.json`` is in
    ``template_ext``, a value such as ``'template.json'`` is treated by
    Airflow as a template *file*: its contents are read and Jinja-rendered
    before ``execute`` runs, so ``self.file_json`` holds the rendered JSON
    text at execution time. ``params`` is templated as well, so values like
    ``'{{ ds }}'`` passed through ``params`` are resolved too.

    :param file_json: path to a JSON template file (rendered by Airflow
        into the file's contents before execution)
    :raises json.JSONDecodeError: if the rendered content is not valid JSON
    """

    # Field values ending in these extensions are read as template files.
    template_ext = ('.json',)
    # Fields Airflow Jinja-renders before execute() is called.
    template_fields = ('file_json', 'params',)

    @apply_defaults
    def __init__(self, file_json, *args, **kwargs):
        super(RenderJSONOperator, self).__init__(*args, **kwargs)
        self.file_json = file_json

    def execute(self, context):
        """Parse the rendered JSON and log it pretty-printed."""
        body = json.loads(self.file_json)
        # BUG FIX: original called logging.info(...) — the root logger —
        # bypassing the module logger defined at the top of this file.
        # Use the module logger with lazy %-formatting instead.
        logger.info("%s", pformat(body))
# Manually-triggered test DAG (schedule_interval=None): renders
# 'template.json' via RenderJSONOperator between two no-op boundary tasks.
with DAG(dag_id='dag-json-testing',
         start_date=datetime(2019, 1, 1),
         schedule_interval=None,
         params={
             'my_dag_params_date': '{{ ds }}'
         },
         max_active_runs=1) as dag:

    # No-op markers for the start and end of the pipeline.
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    # 'template.json' is Jinja-rendered by Airflow (template_ext) before
    # the operator's execute() runs; operator-level params are templated too.
    render = RenderJSONOperator(
        task_id='render_json',
        file_json='template.json',
        params={'my_operator_params_date': '{{ ds }}'},
    )

    start >> render >> end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement