import logging
import re
import time
import uuid

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# NOTE: DataProcHook, DataProcClusterHook, Environment, limit_by_environment and
# DEFAULT_SPARK_DATAPROC_PROPERTIES are project-specific helpers assumed to be
# importable from elsewhere in this codebase.


class DataProcSparkOperator(BaseOperator):
    """
    Start a Cloud DataProc cluster, run a Spark job, then shut down the cluster.
    """
    template_fields = ['arguments']
    ui_color = '#0273d4'

    @apply_defaults
    def __init__(
            self,
            main_jar=None,
            main_class=None,
            arguments=None,
            archives=None,
            files=None,
            labels=None,
            dataproc_cluster=None,
            dataproc_spark_properties=None,
            dataproc_spark_jars=None,
            project_id=None,
            dataproc_cluster_properties=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            allowed_envs=Environment.ALL,
            *args,
            **kwargs):
        super(DataProcSparkOperator, self).__init__(*args, **kwargs)
        dataproc_properties = DEFAULT_SPARK_DATAPROC_PROPERTIES.copy()
        if dataproc_spark_properties:
            dataproc_properties.update(dataproc_spark_properties)

        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.main_jar = main_jar
        self.main_class = main_class
        self.arguments = arguments
        self.archives = archives
        self.files = files
        self.labels = labels
        self.dataproc_cluster = dataproc_cluster
        self.dataproc_properties = dataproc_properties
        self.dataproc_jars = dataproc_spark_jars
        self.dataproc_cluster_properties = dataproc_cluster_properties
        self.allowed_envs = allowed_envs

    @limit_by_environment
    def execute(self, context):
        # Create a cluster if requested
        cluster_hook = None
        cluster_created = False
        if self.dataproc_cluster_properties is not None:
            # Ensure a cluster name exists; generate a random one if not
            if self.dataproc_cluster is None:
                curr_time_millis = int(round(time.time() * 1000))
                self.dataproc_cluster = 'dp-%s-%d' % (uuid.uuid4().hex, curr_time_millis)

            self.dataproc_cluster_properties['cluster_name'] = self.dataproc_cluster
            cluster_hook = DataProcClusterHook(
                project_id=self.project_id,
                gcp_conn_id=self.gcp_conn_id,
                delegate_to=self.delegate_to
            )
            cluster_created = cluster_hook.create_cluster(**self.dataproc_cluster_properties)

        # Run the job on the cluster
        try:
            hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                                delegate_to=self.delegate_to,
                                project_id=self.project_id)
            job = hook.create_job_template(self.project_id,
                                           self.task_id,
                                           self.dataproc_cluster,
                                           "sparkJob",
                                           self.dataproc_properties)

            job.set_main(self.main_jar, self.main_class)
            job.add_args(self.arguments)
            job.add_jar_file_uris(self.dataproc_jars)
            job.add_archive_uris(self.archives)
            job.add_file_uris(self.files)
            labels = self.labels

            def sanitize(l):
                # Dataproc labels allow only lowercase letters, digits and hyphens, max 63 chars
                return re.sub('[^a-z0-9-]', '-', l.lower())[:63]

            if labels is None:
                labels = {
                    'task-id': sanitize(self.task_id),
                    'dag-id': sanitize(self.dag_id),
                }
            job.add_labels(labels)

            logging.info("Submitting job: \n{}".format(job.pretty()))
            hook.submit(job.build())

        except Exception as e:
            logging.error("job submission failed %s", e)
            raise

        finally:
            # Tear down the cluster when we're done
            if cluster_created:
                cluster_hook.delete_cluster(self.dataproc_cluster)
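

# --- Usage sketch (not part of the original paste) ---
# A minimal example of how this operator might be wired into a DAG. The import
# assumptions, project/bucket names and cluster properties below are hypothetical
# placeholders; the cluster-properties keys are whatever
# DataProcClusterHook.create_cluster() accepts in this codebase.
from datetime import datetime

from airflow import DAG

dag = DAG('example_dataproc_spark',
          start_date=datetime(2017, 6, 1),
          schedule_interval='@daily')

spark_task = DataProcSparkOperator(
    task_id='run_spark_job',
    dag=dag,
    project_id='my-gcp-project',                   # hypothetical GCP project
    main_jar='gs://my-bucket/jars/spark-job.jar',  # hypothetical jar URI
    main_class='com.example.SparkJob',             # hypothetical entry point
    arguments=['--date', '{{ ds }}'],              # templated via template_fields
    # Supplying cluster properties makes execute() spin up an ephemeral cluster
    # and delete it in the finally block once the job has been submitted.
    dataproc_cluster_properties={'num_workers': 2, 'zone': 'us-central1-a'},
)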