Advertisement
Guest User

Untitled

a guest
Mar 29th, 2017
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.25 KB | None | 0 0
import datetime as dt
import logging
import os
import sys

import numpy as np

  6. class DataTransfer():
  7. def __init__(self, cur_source, cur_dest, keys_to_use, table_src_name,
  8. table_dest_name, date_src_field=None,
  9. dt_start=dt.date(year=2016, month=1, day=1),
  10. dt_end=dt.datetime.now().date(),
  11. step=dt.timedelta(weeks=1), batch_size=1000000,
  12. delimiter=';', date_format='%d.%m.%Y',
  13. print_info_overall=True, print_info_detailed=False,
  14. print_info_iter=True, print_dates=True,
  15. test_load=False, df_to_use=None,
  16. file_log=('/tmp/data_transfer_{}.log'.format(
  17. dt.datetime.now().strftime('%Y_%m_%d_%H_%M'))),
  18. use_logging=True, log_to_stdout=False):
  19. self.cur_source = cur_source
  20. self.cur_dest = cur_dest
  21. self.keys_to_use = keys_to_use
  22. self.table_src_name = table_src_name
  23. self.table_dest_name = table_dest_name
  24. self.date_src_field = date_src_field
  25. self.batch_size = batch_size
  26. self.delimiter = delimiter
  27. self.date_format = date_format
  28. self.dt_start = dt_start
  29. self.dt_end = dt_end
  30. self.step = step
  31. self.print_info_overall = print_info_overall
  32. self.print_info_detailed = print_info_detailed
  33. self.print_info_iter = print_info_iter
  34. self.print_dates = print_dates
  35. self.test_load = test_load
  36. self.log_to_stdout = log_to_stdout
  37. self.use_logging = use_logging
  38.  
  39. if df_to_use is not None:
  40. self.keys_to_use = df_to_use.loc[df_to_use['param'] == 1, 'key'].tolist()
  41.  
  42. if self.use_logging:
  43. self.logger = logging.getLogger('DATA_TRANSFER')
  44. self.logger.setLevel(logging.INFO)
  45. # create a file handler
  46. handler = logging.FileHandler(file_log)
  47. handler.setLevel(logging.INFO)
  48. # create a logging format
  49. formatter = logging.Formatter('[%(asctime)s] {%(filename)s:'
  50. '%(lineno)d} %(levelname)s - %(message)s')
  51. handler.setFormatter(formatter)
  52. # add the handlers to the logger
  53. self.logger.addHandler(handler)
  54. if self.log_to_stdout:
  55. log_stdout = logging.StreamHandler(sys.stdout)
  56. log_stdout.setLevel(logging.INFO)
  57. formatter_stdout = logging.Formatter('[%(asctime)s] {%(filename)s:'
  58. '%(lineno)d} %(levelname)s - %(message)s')
  59. log_stdout.setFormatter(formatter_stdout)
  60. self.logger.addHandler(log_stdout)
  61.  
  62. self.logger.info('INITIALIZATION')
  63.  
  64.  
  65. def set_date_list(self):
  66. num_steps = int(np.ceil((self.dt_end - self.dt_start) / self.step)) + 1
  67. self.date_list = ([self.dt_end - i*self.step
  68. if self.dt_end - i*self.step > self.dt_start
  69. else self.dt_start for i in range(num_steps)])
  70.  
  71. if self.print_dates and self.date_list:
  72. self.logger.info('going to fetch data from periods: ')
  73. self.logger.info('\n'.join(['{}\t{} : {}'.format(i, start, end)
  74. for i, (start, end) in
  75. enumerate(zip(self.date_list, self.date_list[1:]))]))
  76. self.logger.info('-'*80)
  77.  
  78. def set_query_string(self, dt_start, dt_end):
  79. self.query = ("select {keys} from {table_name} where {date_field} > "
  80. "'{start}' and {date_field} <= '{end}'".format(**{
  81. 'table_name': self.table_src_name,
  82. 'date_field': self.date_src_field,
  83. 'keys' : ', '.join(self.keys_to_use),
  84. 'start' : dt_start.strftime(self.date_format),
  85. 'end' : dt_end.strftime(self.date_format)
  86. })
  87. )
  88.  
  89. def set_query_string_no_start(self):
  90. self.query = ("select {keys} from {table_name} where {date_field} <= '{end}'".
  91. format(**{
  92. 'table_name': self.table_src_name,
  93. 'date_field': self.date_src_field,
  94. 'keys' : ', '.join(self.keys_to_use),
  95. 'end' : self.dt_end.strftime(self.date_format)
  96. })
  97. )
  98.  
  99. def set_query_string_no_date(self):
  100. self.query = ("select {keys} from {table_name}".
  101. format(**{
  102. 'table_name': self.table_src_name,
  103. 'keys' : ', '.join(self.keys_to_use)
  104. })
  105. )
  106.  
  107. def set_whole_string(self):
  108. if isinstance(self.source_result, list):
  109. self.data_string = '\n'.join([self.delimiter.join(map(lambda x: str(x).replace(self.delimiter, ',')
  110. if x is not None else '', row)) for row in self.source_result])
  111. else:
  112. self.data_string = self.delimiter.join(map(lambda x: str(x) if x
  113. is not None else '', self.source_result))
  114.  
  115. def fetch_source_data(self):
  116. start_time = dt.datetime.now()
  117. self.source_result = self.cur_source.fetchmany(self.batch_size)
  118. if self.print_info_detailed:
  119. self.logger.info('fetching data from source is finished: {:.1f} seconds'.
  120. format((dt.datetime.now() - start_time).total_seconds()))
  121.  
  122. def copy_to_dest(self):
  123. start_time = dt.datetime.now()
  124. if self.print_info_detailed:
  125. string_end_time = dt.datetime.now()
  126. self.logger.info('creating string is finished: {:.1f} seconds'.
  127. format((string_end_time - start_time).total_seconds()))
  128.  
  129.  
  130. self.cur_dest.copy("COPY {} from stdin DELIMITER '{}' ".
  131. format(self.table_dest_name, self.delimiter), self.data_string)
  132. if self.print_info_detailed:
  133. self.logger.info('copying data to dest is finished: {:.1f} seconds'.format(
  134. (dt.datetime.now() - start_time).total_seconds()))
  135. self.logger.info('-'*80)
  136.  
  137. def transfer_batches(self):
  138. self.start_step_time = dt.datetime.now()
  139. num_iter_load = 0
  140. while True:
  141. start_iter_time = dt.datetime.now()
  142. num_iter_load += 1
  143. self.fetch_source_data()
  144. self.set_whole_string()
  145. self.copy_to_dest()
  146. if self.print_info_overall:
  147. self.logger.info('{} is finished, {} rows transfered, iter time: {:.1f},'
  148. ' date step time: {:.1f}'.format(
  149. num_iter_load,
  150. self.cur_source.rowcount,
  151. (dt.datetime.now() - start_iter_time).total_seconds(),
  152. (dt.datetime.now() - self.start_step_time).total_seconds()
  153. ))
  154. if len(self.source_result) < self.batch_size or self.test_load:
  155. return
  156.  
  157. def print_overall(self, dt_start=None, dt_end=None):
  158. self.logger.info('-'*80)
  159. self.logger.info('iteration {} is finished'.format(self.num_iter))
  160. if 'date_list' in dir(self):
  161. self.logger.info('data from {} to {} is finished, for {:.1f} seconds'.format(
  162. dt_start, dt_end, (dt.datetime.now() - self.start_step_time).
  163. total_seconds()))
  164. self.logger.info('total rows uploaded: {}'.format(self.cur_source.rowcount))
  165. self.logger.info('-'*80)
  166.  
  167. def transfer_copy(self):
  168. start_overall_time = dt.datetime.now()
  169. # check whether date_list is setted otherwise query without date
  170. self.num_iter = 0
  171. if 'date_list' in dir(self):
  172. for self.num_iter, (dt_start, dt_end) in enumerate(zip(self.date_list[1:],
  173. self.date_list)):
  174. self.set_query_string(dt_start, dt_end)
  175. self.logger.info(self.query)
  176. self.cur_source.execute(self.query)
  177. self.transfer_batches()
  178.  
  179. if self.test_load:
  180. break
  181.  
  182. if self.print_info_overall:
  183. self.print_overall(dt_start, dt_end)
  184. else:
  185. self.set_query_string_no_date()
  186. self.logger.info(self.query)
  187. self.cur_source.execute(self.query)
  188. self.transfer_batches()
  189.  
  190. if self.print_info_overall:
  191. self.print_overall()
  192. self.logger.info('FINISH TRANSFER')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement