Advertisement
Guest User

Untitled

a guest
Apr 25th, 2016
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.20 KB | None | 0 0
  1. """
  2. ETL step wrapper to extract data from mysql to S3, compressing it along the way
  3. """
  4. import dataduct
  5. from dataduct.config import Config
  6. from dataduct.steps.etl_step import ETLStep
  7. from dataduct.pipeline import CopyActivity
  8. from dataduct.pipeline import MysqlNode
  9. from dataduct.pipeline import PipelineObject
  10. from dataduct.pipeline import Precondition
  11. from dataduct.pipeline import ShellCommandActivity
  12. from dataduct.utils.helpers import exactly_one
  13. from dataduct.utils.exceptions import ETLInputError
  14. from dataduct.database import SelectStatement
  15.  
  16. config = Config()
  17. if not hasattr(config, 'mysql'):
  18. raise ETLInputError('MySQL config not specified in ETL')
  19.  
  20. MYSQL_CONFIG = config.mysql
  21.  
class ExtractMysqlGzipStep(ETLStep):
    """ETL step that extracts data from MySQL into S3, optionally
    gzip-compressing the output along the way.

    NOTE(review): an earlier docstring said "Redshift"; the code below only
    ever reads from MySQL (via MysqlNode) and writes to S3.
    """

    def __init__(self,
                 table=None,
                 sql=None,
                 host_name=None,
                 database=None,
                 output_path=None,
                 splits=1,
                 gzip=False,
                 **kwargs):
        """Constructor for the ExtractMysqlGzipStep class

        Args:
            table(str): table name to extract (mutually exclusive with sql)
            sql(str): SELECT statement to run (mutually exclusive with
                table); its first dependency is used as the table name
            host_name(str): key into the `mysql` section of the dataduct
                config holding HOST / USERNAME / PASSWORD
            database(str): database on the MySQL host to execute the query in
            output_path(str): S3 path for the step's final output node
            splits(int): number of files to split the output into
                (assumed >= 1 -- a value of 0 would divide by zero in the
                shell command below; TODO confirm callers never pass 0)
            gzip(bool): if True, the output files are gzip-compressed
            **kwargs(optional): Keyword arguments directly passed to base class
        """
        # Exactly one of table / sql must be supplied.
        if not exactly_one(table, sql):
            raise ETLInputError('Only one of table, sql needed')

        super(ExtractMysqlGzipStep, self).__init__(**kwargs)

        if table:
            # NOTE(review): table name is interpolated straight into SQL;
            # pipeline definitions are trusted input, but this is not safe
            # for untrusted callers.
            sql = 'SELECT * FROM %s;' % table
        elif sql:
            # Derive the table name from the query's first dependency.
            table = SelectStatement(sql).dependencies[0]
        else:
            # Unreachable: exactly_one() above already guarantees one is set.
            raise ETLInputError('Provide a sql statement or a table name')

        # Connection credentials for the requested host, from module config.
        host = MYSQL_CONFIG[host_name]['HOST']
        user = MYSQL_CONFIG[host_name]['USERNAME']
        password = MYSQL_CONFIG[host_name]['PASSWORD']

        # Source node: the MySQL query this step copies from.
        input_node = self.create_pipeline_object(
            object_class=MysqlNode,
            schedule=self.schedule,
            host=host,
            database=database,
            table=table,
            username=user,
            password=password,
            sql=sql,
        )

        # Serialization format shared by the S3 data nodes below.
        s3_format = self.create_pipeline_object(
            object_class=PipelineObject,
            type='TSV'
        )

        ### so - here's what's happening. DP does not like to encrypt if your next task is not Redshift
        ### so, i tricked it. Disconnected the s3data node from the cycle, and added a "fake" one
        ### that points to the same path. The fake one has a precondition (which appears to use polling)
        ### to ensure the path exists before attempting to run.
        compression = "none"
        if gzip:
            compression = "gzip"

        intermediate_node = self.create_s3_data_node(format=s3_format, compression=compression)

        fake_intermediate_node = None

        ### if we're not gzip, let's conserve number of objects created
        if gzip:
            precondition = self.create_pipeline_object(
                object_class=Precondition,
                is_directory=True
            )
            fake_intermediate_node = self.create_s3_data_node(format=s3_format, precondition=precondition)

            ### explicitly set the directory path to that of the intermediate_node
            fake_intermediate_node.fields['directoryPath'][0] = intermediate_node.fields['directoryPath'][0]

        # Stage 1: copy MySQL -> intermediate S3 node.
        self.create_pipeline_object(
            object_class=CopyActivity,
            schedule=self.schedule,
            resource=self.resource,
            worker_group=self.worker_group,
            input_node=input_node,
            output_node=intermediate_node,
            depends_on=self.depends_on,
            max_retries=self.max_retries,
        )

        # Final output node of the step.
        self._output = self.create_s3_data_node(
            self.get_output_s3_path(output_path))

        # Decompress any staged .gz parts before post-processing (gzip only).
        gunzip_part = \
            "for file in ${INPUT1_STAGING_DIR}/*.gz; do gunzip -f ${file}; done; " \
            if gzip else ""

        # Re-compress the split output parts afterwards (gzip only).
        gzip_part = "; for file in ${OUTPUT1_STAGING_DIR}/*; do gzip -f $file; done" \
            if gzip else ""

        # Stage 2 shell command. If the staging dir has no non-empty files,
        # just touch an empty part-0; otherwise (optionally gunzip,) strip
        # MySQL's \\n NULL marker and control chars, then split the stream
        # into `splits` roughly equal files named part-0000, part-0001, ...
        # This shouldn't be necessary but -
        # Mysql uses \\n as null, so we need to remove it
        command = ' '.join(["[[ -z $(find ${INPUT1_STAGING_DIR} -maxdepth 1 ! \
            -path ${INPUT1_STAGING_DIR} -name '*' -size +0) ]] \
            && touch ${OUTPUT1_STAGING_DIR}/part-0 ",
                            "|| ",
                            gunzip_part,
                            "cat ${INPUT1_STAGING_DIR}/*",
                            "| sed 's/\\\\\\\\n/NULL/g'",  # replace \\n
                            # get rid of control characters
                            "| tr -d '\\\\000'",
                            # split into `splits` number of equal sized files
                            ("| split -a 4 -d -l $((($(cat ${{INPUT1_STAGING_DIR}}/* | wc -l) + \
                            {splits} - 1) / {splits})) - ${{OUTPUT1_STAGING_DIR}}/part-"). \
                            format(splits=splits),
                            "; for f in ${INPUT1_STAGING_DIR}/*; do echo ${f}; file ${f}; done ",
                            gzip_part])

        # In gzip mode the shell step reads via the precondition-guarded
        # "fake" node (see trick above); otherwise via the real one.
        input_node = fake_intermediate_node if gzip else intermediate_node
        self.create_pipeline_object(
            object_class=ShellCommandActivity,
            input_node=input_node,
            output_node=self.output,
            command=command,
            max_retries=self.max_retries,
            resource=self.resource,
            worker_group=self.worker_group,
            schedule=self.schedule,
        )

    @classmethod
    def arguments_processor(cls, etl, input_args):
        """Parse the step arguments according to the ETL pipeline

        Args:
            etl(ETLPipeline): Pipeline object containing resources and steps
            input_args(dict): Dictionary of the step arguments for the class

        Returns:
            dict: resolved arguments for instantiating the step
        """
        input_args = cls.pop_inputs(input_args)
        step_args = cls.base_arguments_processor(etl, input_args)

        return step_args
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement