Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import datetime as dt
import logging
import os
import sys

import numpy as np
class DataTransfer:
    """Copy rows from a source cursor into a destination table in batches.

    The source cursor must provide ``execute(query)``, ``fetchmany(size)``
    and a ``rowcount`` attribute; the destination cursor must provide
    ``copy(sql, data)``.  Rows are fetched ``batch_size`` at a time,
    serialised into a delimiter-separated text block and pushed to the
    destination with a ``COPY ... from stdin`` statement.

    Typical use::

        transfer = DataTransfer(cur_src, cur_dst, keys, 'src', 'dst',
                                date_src_field='created')
        transfer.set_date_list()   # optional: split the load by date periods
        transfer.transfer_copy()

    If ``set_date_list`` is never called, ``transfer_copy`` loads the whole
    source table with a single query.

    Args:
        cur_source: cursor used to read the data.
        cur_dest: cursor used to write the data.
        keys_to_use: list of column names to select.
        table_src_name: name of the source table.
        table_dest_name: name of the destination table.
        date_src_field: column used for date filtering (required by the
            date based query builders).
        dt_start: lower bound of the date range.
        dt_end: upper bound of the date range; ``None`` means "today at
            the moment the instance is created".
        step: ``timedelta`` length of one date period.
        batch_size: maximum number of rows fetched per round trip.
        delimiter: field separator used in the COPY payload.
        date_format: ``strftime`` format used for dates inside queries.
        print_info_overall: log a summary line per batch / per period.
        print_info_detailed: log timings of the individual stages.
        print_info_iter: stored on the instance; not used by this class.
        print_dates: log the computed periods in ``set_date_list``.
        test_load: stop after the first batch of the first period.
        df_to_use: optional DataFrame with ``key`` and ``param`` columns;
            when given, ``keys_to_use`` is replaced by the keys whose
            ``param`` equals 1.
        file_log: path of the log file; ``None`` means a timestamped file
            under ``/tmp`` named at instantiation time.
        use_logging: when false, nothing is logged (a silent logger is
            used so that logging calls stay valid).
        log_to_stdout: additionally log to ``sys.stdout``.
    """

    _LOGGER_NAME = 'DATA_TRANSFER'
    _LOG_FORMAT = ('[%(asctime)s] {%(filename)s:'
                   '%(lineno)d} %(levelname)s - %(message)s')

    def __init__(self, cur_source, cur_dest, keys_to_use, table_src_name,
                 table_dest_name, date_src_field=None,
                 dt_start=dt.date(year=2016, month=1, day=1),
                 dt_end=None,
                 step=dt.timedelta(weeks=1), batch_size=1000000,
                 delimiter=';', date_format='%d.%m.%Y',
                 print_info_overall=True, print_info_detailed=False,
                 print_info_iter=True, print_dates=True,
                 test_load=False, df_to_use=None,
                 file_log=None,
                 use_logging=True, log_to_stdout=False):
        self.cur_source = cur_source
        self.cur_dest = cur_dest
        self.keys_to_use = keys_to_use
        self.table_src_name = table_src_name
        self.table_dest_name = table_dest_name
        self.date_src_field = date_src_field
        self.batch_size = batch_size
        self.delimiter = delimiter
        self.date_format = date_format
        self.dt_start = dt_start
        # Resolved here rather than in the signature: a default written in
        # the signature is evaluated once, when the module is imported, and
        # would go stale in a long-running process.
        self.dt_end = dt.date.today() if dt_end is None else dt_end
        self.step = step
        self.print_info_overall = print_info_overall
        self.print_info_detailed = print_info_detailed
        self.print_info_iter = print_info_iter
        self.print_dates = print_dates
        self.test_load = test_load
        self.log_to_stdout = log_to_stdout
        self.use_logging = use_logging
        if df_to_use is not None:
            self.keys_to_use = (
                df_to_use.loc[df_to_use['param'] == 1, 'key'].tolist())
        if file_log is None:
            file_log = '/tmp/data_transfer_{}.log'.format(
                dt.datetime.now().strftime('%Y_%m_%d_%H_%M'))
        self.logger = self._setup_logger(file_log)
        self.logger.info('INITIALIZATION')

    def _setup_logger(self, file_log):
        """Return the logger used by this instance.

        With ``use_logging`` disabled a non-propagating logger with a
        ``NullHandler`` is returned, so every ``self.logger`` call in the
        class is a no-op instead of an ``AttributeError``.
        """
        if not self.use_logging:
            silent = logging.getLogger(self._LOGGER_NAME + '.silent')
            silent.propagate = False
            if not silent.handlers:
                silent.addHandler(logging.NullHandler())
            return silent

        logger = logging.getLogger(self._LOGGER_NAME)
        logger.setLevel(logging.INFO)

        # The logger is shared by name between instances; only attach a
        # handler that is not already there, otherwise every message would
        # be written once per instance ever created.
        log_path = os.path.abspath(file_log)
        has_file_handler = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == log_path
            for h in logger.handlers)
        if not has_file_handler:
            handler = logging.FileHandler(file_log)
            handler.setLevel(logging.INFO)
            handler.setFormatter(logging.Formatter(self._LOG_FORMAT))
            logger.addHandler(handler)

        if self.log_to_stdout:
            has_stdout_handler = any(
                not isinstance(h, logging.FileHandler)
                and getattr(h, 'stream', None) is sys.stdout
                for h in logger.handlers)
            if not has_stdout_handler:
                log_stdout = logging.StreamHandler(sys.stdout)
                log_stdout.setLevel(logging.INFO)
                log_stdout.setFormatter(logging.Formatter(self._LOG_FORMAT))
                logger.addHandler(log_stdout)
        return logger

    def _require_date_field(self):
        """Raise ``ValueError`` if no date column was configured."""
        if self.date_src_field is None:
            raise ValueError(
                'date_src_field must be set to build a date based query')

    def _format_row(self, row):
        """Serialise one row: ``None`` becomes an empty field and any
        delimiter inside a value is replaced by a comma."""
        return self.delimiter.join(
            '' if value is None
            else str(value).replace(self.delimiter, ',')
            for value in row)

    def set_date_list(self):
        """Build ``self.date_list``: period boundaries in descending order.

        The list starts at ``dt_end`` and goes back in ``step`` increments;
        values that would fall before ``dt_start`` are clamped to it.

        Raises:
            ValueError: if ``step`` is not a positive timedelta.
        """
        if self.step <= dt.timedelta(0):
            raise ValueError(
                'step must be a positive timedelta, got {!r}'.format(
                    self.step))
        num_steps = int(
            np.ceil((self.dt_end - self.dt_start) / self.step)) + 1
        self.date_list = [max(self.dt_end - i * self.step, self.dt_start)
                          for i in range(num_steps)]
        if self.print_dates and self.date_list:
            self.logger.info('going to fetch data from periods: ')
            # Same pairing as transfer_copy: (earlier, later) boundaries,
            # listed from the most recent period to the oldest one.
            periods = zip(self.date_list[1:], self.date_list)
            self.logger.info('%s', '\n'.join(
                '{}\t{} : {}'.format(i, start, end)
                for i, (start, end) in enumerate(periods)))
            self.logger.info('-' * 80)

    def set_query_string(self, dt_start, dt_end):
        """Set ``self.query`` to select rows with
        ``dt_start < date_src_field <= dt_end``.

        NOTE(review): the lower bound is exclusive, so rows dated exactly
        ``self.dt_start`` are not selected by the oldest period - confirm
        this is intended.

        Raises:
            ValueError: if ``date_src_field`` is not set.
        """
        # Identifiers cannot be bound as query parameters; table and column
        # names are interpolated and must come from trusted configuration.
        self._require_date_field()
        self.query = (
            "select {keys} from {table_name} where {date_field} > "
            "'{start}' and {date_field} <= '{end}'".format(
                table_name=self.table_src_name,
                date_field=self.date_src_field,
                keys=', '.join(self.keys_to_use),
                start=dt_start.strftime(self.date_format),
                end=dt_end.strftime(self.date_format)))

    def set_query_string_no_start(self):
        """Set ``self.query`` to select rows with
        ``date_src_field <= self.dt_end`` (no lower bound).

        Raises:
            ValueError: if ``date_src_field`` is not set.
        """
        self._require_date_field()
        self.query = (
            "select {keys} from {table_name} where {date_field} <= '{end}'"
            .format(
                table_name=self.table_src_name,
                date_field=self.date_src_field,
                keys=', '.join(self.keys_to_use),
                end=self.dt_end.strftime(self.date_format)))

    def set_query_string_no_date(self):
        """Set ``self.query`` to select the configured keys from the whole
        source table, without any date filter."""
        self.query = "select {keys} from {table_name}".format(
            table_name=self.table_src_name,
            keys=', '.join(self.keys_to_use))

    def set_whole_string(self):
        """Serialise ``self.source_result`` into ``self.data_string``.

        A list is treated as a list of rows (one output line per row); any
        other value is treated as a single row.
        """
        start_time = dt.datetime.now()
        rows = self.source_result
        if not isinstance(rows, list):
            rows = [rows]
        self.data_string = '\n'.join(self._format_row(row) for row in rows)
        if self.print_info_detailed:
            self.logger.info(
                'creating string is finished: %.1f seconds',
                (dt.datetime.now() - start_time).total_seconds())

    def fetch_source_data(self):
        """Fetch up to ``batch_size`` rows from the source cursor into
        ``self.source_result``."""
        start_time = dt.datetime.now()
        self.source_result = self.cur_source.fetchmany(self.batch_size)
        if self.print_info_detailed:
            self.logger.info(
                'fetching data from source is finished: %.1f seconds',
                (dt.datetime.now() - start_time).total_seconds())

    def copy_to_dest(self):
        """Send ``self.data_string`` to the destination table via COPY."""
        start_time = dt.datetime.now()
        self.cur_dest.copy(
            "COPY {} from stdin DELIMITER '{}' ".format(
                self.table_dest_name, self.delimiter),
            self.data_string)
        if self.print_info_detailed:
            self.logger.info(
                'copying data to dest is finished: %.1f seconds',
                (dt.datetime.now() - start_time).total_seconds())
            self.logger.info('-' * 80)

    def transfer_batches(self):
        """Move the result of the last executed source query to the
        destination, ``batch_size`` rows at a time.

        Stops after the first short (or empty) batch, or after a single
        batch when ``test_load`` is set.
        """
        self.start_step_time = dt.datetime.now()
        num_iter_load = 0
        while True:
            start_iter_time = dt.datetime.now()
            num_iter_load += 1
            self.fetch_source_data()
            if self.source_result:
                self.set_whole_string()
                self.copy_to_dest()
            else:
                # Nothing fetched (empty table/period, or the row count is
                # an exact multiple of batch_size): skip the empty COPY.
                self.data_string = ''
            if self.print_info_overall:
                # NOTE(review): the meaning of rowcount after a select is
                # driver specific - confirm it is the figure wanted here.
                now = dt.datetime.now()
                self.logger.info(
                    '%s is finished, %s rows transfered, iter time: %.1f,'
                    ' date step time: %.1f',
                    num_iter_load,
                    self.cur_source.rowcount,
                    (now - start_iter_time).total_seconds(),
                    (now - self.start_step_time).total_seconds())
            if len(self.source_result) < self.batch_size or self.test_load:
                return

    def print_overall(self, dt_start=None, dt_end=None):
        """Log a summary of the period (or whole load) just finished."""
        self.logger.info('-' * 80)
        self.logger.info('iteration %s is finished', self.num_iter)
        if hasattr(self, 'date_list'):
            self.logger.info(
                'data from %s to %s is finished, for %.1f seconds',
                dt_start, dt_end,
                (dt.datetime.now() - self.start_step_time).total_seconds())
        self.logger.info('total rows uploaded: %s', self.cur_source.rowcount)
        self.logger.info('-' * 80)

    def transfer_copy(self):
        """Run the transfer.

        When ``set_date_list`` has been called, one query is executed per
        date period, from the most recent period to the oldest; otherwise
        a single query without a date filter is executed.
        """
        self.num_iter = 0
        if hasattr(self, 'date_list'):
            periods = zip(self.date_list[1:], self.date_list)
            for index, (dt_start, dt_end) in enumerate(periods):
                self.num_iter = index
                self.set_query_string(dt_start, dt_end)
                self.logger.info(self.query)
                self.cur_source.execute(self.query)
                self.transfer_batches()
                if self.test_load:
                    break
                if self.print_info_overall:
                    self.print_overall(dt_start, dt_end)
        else:
            self.set_query_string_no_date()
            self.logger.info(self.query)
            self.cur_source.execute(self.query)
            self.transfer_batches()
            if self.print_info_overall:
                self.print_overall()
        self.logger.info('FINISH TRANSFER')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement