import logging
import re
import time
import uuid

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# Assumed to come from the surrounding project: DataProcHook,
# DataProcClusterHook, Environment, limit_by_environment and
# DEFAULT_SPARK_DATAPROC_PROPERTIES are not part of stock Airflow.


class DataProcSparkOperator(BaseOperator):
    """
    Start a Cloud Dataproc cluster, run a Spark job, then shut the cluster down.
    """
    template_fields = ['arguments']
    ui_color = '#0273d4'

    @apply_defaults
    def __init__(
            self,
            main_jar=None,
            main_class=None,
            arguments=None,
            archives=None,
            files=None,
            labels=None,
            dataproc_cluster=None,
            dataproc_spark_properties=None,
            dataproc_spark_jars=None,
            project_id=None,
            dataproc_cluster_properties=None,
            gcp_conn_id='google_cloud_default',
            delegate_to=None,
            allowed_envs=Environment.ALL,
            *args,
            **kwargs):
        super(DataProcSparkOperator, self).__init__(*args, **kwargs)
        # Merge caller-supplied Spark properties over the defaults without
        # mutating the shared module-level dict.
        dataproc_properties = DEFAULT_SPARK_DATAPROC_PROPERTIES.copy()
        if dataproc_spark_properties:
            dataproc_properties.update(dataproc_spark_properties)
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.main_jar = main_jar
        self.main_class = main_class
        self.arguments = arguments
        self.archives = archives
        self.files = files
        self.labels = labels
        self.dataproc_cluster = dataproc_cluster
        self.dataproc_properties = dataproc_properties
        self.dataproc_jars = dataproc_spark_jars
        self.dataproc_cluster_properties = dataproc_cluster_properties
        self.allowed_envs = allowed_envs
    @limit_by_environment
    def execute(self, context):
        # Create an ephemeral cluster if cluster properties were supplied.
        cluster_hook = None
        cluster_created = False
        if self.dataproc_cluster_properties is not None:
            # Ensure a cluster name exists; generate a unique one if not.
            if self.dataproc_cluster is None:
                curr_time_millis = int(round(time.time() * 1000))
                self.dataproc_cluster = 'dp-%s-%d' % (uuid.uuid4().hex, curr_time_millis)
            self.dataproc_cluster_properties['cluster_name'] = self.dataproc_cluster
            cluster_hook = DataProcClusterHook(
                project_id=self.project_id,
                gcp_conn_id=self.gcp_conn_id,
                delegate_to=self.delegate_to
            )
            cluster_created = cluster_hook.create_cluster(**self.dataproc_cluster_properties)
        # Run the job on the cluster.
        try:
            hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                                delegate_to=self.delegate_to,
                                project_id=self.project_id)
            job = hook.create_job_template(self.project_id,
                                           self.task_id,
                                           self.dataproc_cluster,
                                           "sparkJob",
                                           self.dataproc_properties)
            job.set_main(self.main_jar, self.main_class)
            job.add_args(self.arguments)
            job.add_jar_file_uris(self.dataproc_jars)
            job.add_archive_uris(self.archives)
            job.add_file_uris(self.files)

            labels = self.labels

            def sanitize(label):
                # Label values must be lowercase and at most 63 characters;
                # replace anything outside [a-z0-9-] with a hyphen.
                return re.sub('[^a-z0-9-]', '-', label.lower())[:63]

            if labels is None:
                labels = {
                    'task-id': sanitize(self.task_id),
                    'dag-id': sanitize(self.dag_id),
                }
            job.add_labels(labels)
            logging.info("Submitting job:\n%s", job.pretty())
            hook.submit(job.build())
        except Exception as e:
            logging.error("Job submission failed: %s", e)
            raise
        finally:
            # Tear down the cluster if this operator created it.
            if cluster_created:
                cluster_hook.delete_cluster(self.dataproc_cluster)
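
# A minimal usage sketch, assuming the operator lives in a local module named
# dataproc_operators. The project ID, jar URI, main class, arguments and
# cluster properties below are hypothetical placeholders, not values from the
# original code; the exact keys accepted by dataproc_cluster_properties depend
# on the project's DataProcClusterHook.create_cluster signature.
from datetime import datetime

from airflow import DAG

from dataproc_operators import DataProcSparkOperator  # hypothetical module path

dag = DAG('example_spark_dag',
          start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')

spark_task = DataProcSparkOperator(
    task_id='run_spark_job',
    project_id='my-gcp-project',             # hypothetical project
    main_jar='gs://my-bucket/jars/job.jar',  # hypothetical jar URI
    main_class='com.example.SparkJob',       # hypothetical entry point
    arguments=['--date', '{{ ds }}'],        # templated via template_fields
    # Supplying cluster properties makes execute() create an ephemeral
    # cluster and tear it down in the finally block; omit them to run
    # against an existing cluster named via dataproc_cluster.
    dataproc_cluster_properties={
        'num_workers': 2,                    # hypothetical cluster shape
    },
    dag=dag,
)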