Advertisement
Guest User

Untitled

a guest
Jul 17th, 2019
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.28 KB | None | 0 0
  1. import json
  2. import logging
  3. import os
  4. from datetime import datetime
  5. from pprint import pformat
  6.  
  7. from airflow.models import BaseOperator, DAG
  8. from airflow.operators.dummy_operator import DummyOperator
  9. from airflow.utils.decorators import apply_defaults
  10.  
  11. logger = logging.getLogger(__name__)
  12.  
  13. os.chdir(os.path.dirname(os.path.realpath(__file__)))
  14.  
  15.  
  16. class RenderJSONOperator(BaseOperator):
  17. template_ext = ('.json',)
  18. template_fields = ('file_json', 'params',)
  19.  
  20. @apply_defaults
  21. def __init__(self,
  22. file_json,
  23. *args,
  24. **kwargs):
  25. super(RenderJSONOperator, self).__init__(*args, **kwargs)
  26. self.file_json = file_json
  27.  
  28. def execute(self, context):
  29. body = json.loads(self.file_json)
  30. logging.info(pformat(body))
  31.  
  32.  
  33. with DAG(dag_id='dag-json-testing',
  34. start_date=datetime(2019, 1, 1),
  35. schedule_interval=None,
  36. params={
  37. 'my_dag_params_date': '{{ ds }}'
  38. },
  39. max_active_runs=1) as dag:
  40. start, end = DummyOperator(task_id='start'), DummyOperator(task_id='end')
  41.  
  42. render = RenderJSONOperator(
  43. task_id='render_json',
  44. file_json='template.json',
  45. params={
  46. 'my_operator_params_date': '{{ ds }}'
  47. },
  48. )
  49.  
  50. start >> render >> end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement