chuuupa

storage

Oct 19th, 2021
558
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from datetime import date
  2. import os
  3. import re
  4. import pandas as pd
  5. import pyarrow as pa
  6. import pyarrow.parquet as pq
  7. import itertools
  8.  
  9. from typing import List, Union, Dict
  10. from collections import defaultdict
  11.  
  12. from mapping import CAST_TYPES_INDEXES, PARQUET_SCHEMAS
  13. from environment_handler import Environment
  14.  
  15.  
  16. class StorageData:
  17.     _tmp_storage = os.path.join(Environment.parquets_dir, 'tmp')
  18.     _cdate: str = Environment.change_date().replace('-', '')
  19.     _logger = Environment.get_logger('Storage')
  20.  
  21.     os.makedirs(_tmp_storage, exist_ok=True)
  22.  
  23.  
  24.     @staticmethod
  25.     def _transpose_table(table: List[List[str]]) -> List[List[str]]:
  26.         return list(map(list, itertools.zip_longest(*table)))
  27.  
  28.  
  29.     @staticmethod
  30.     def _remove_dublicates(table: List[List[str]]) -> List[List[str]]:
  31.         return list(map(list, set(map(tuple, table))))
  32.  
  33.  
  34.     @staticmethod
  35.     def _cast_types(
  36.         method: str,
  37.         table_name: str,
  38.         table: List[List[str]]
  39.     ) -> List[List[Union[str, int]]]:
  40.  
  41.         for _type, numcols in CAST_TYPES_INDEXES[method].get(table_name, {}).items():
  42.             if isinstance(numcols, int):
  43.                 numcols = [numcols]
  44.  
  45.             if _type == 'bool':
  46.                 for numcol in numcols:
  47.                     table[numcol] = list(map(
  48.                         lambda x: 1 if x is not None and x.lower() in ('true', '1') else (
  49.                             0 if x is not None and x.lower() in ('false', '0') else None
  50.                         ),
  51.                         table[numcol]
  52.                     ))
  53.            
  54.             elif _type == 'float':
  55.                 for numcol in numcols:
  56.                     table[numcol] = list(map(
  57.                         lambda x: None if (x is None or x == 'N/A') else x.replace(',', '.'),
  58.                         table[numcol]
  59.                     ))
  60.            
  61.             elif _type == 'int':
  62.                 for numcol in numcols:
  63.                     table[numcol] = list(map(lambda x: None if (x == 'N/A') else x, table[numcol]))
  64.  
  65.         return table
  66.  
  67.  
  68.     @classmethod
  69.     def _save_partition(
  70.         cls,
  71.         method: str,
  72.         table_name: str,
  73.         table: List[List[Union[str, int]]],
  74.         ipart: int
  75.     ) -> None:
  76.  
  77.         suffix = 'part' + str(ipart)
  78.         filename = '_'.join(
  79.             ['Spark', method, table_name, cls._cdate, suffix]
  80.         ) + '.parquet'
  81.  
  82.         try:
  83.             schema = PARQUET_SCHEMAS[method][table_name]
  84.         except KeyError:
  85.             raise Exception(
  86.                 f'Parquet schema not found for {table_name} table for method {method}')
  87.                
  88.         table = pa.Table.from_arrays([pa.array(col) for col in table], schema=schema)
  89.         pq.write_table(table, os.path.join(cls._tmp_storage, filename))
  90.  
  91.         cls._logger.info('Saved: ' + filename)
  92.  
  93.    
  94.     @classmethod
  95.     def save_partitions(
  96.         cls,
  97.         method: str,
  98.         partitions: Dict[str, List[List[str]]],
  99.         ipart: int
  100.     ) -> None:
  101.         for table_name, table in partitions.items():
  102.             table = cls._cast_types(
  103.                 method,
  104.                 table_name,
  105.                 cls._transpose_table(cls._remove_dublicates(table))
  106.             )
  107.             cls._save_partition(method, table_name, table, ipart)
  108.  
  109.  
  110.     @classmethod
  111.     def _save_table(
  112.         cls,
  113.         method: str,
  114.         table_name: str,
  115.         table: List[List[Union[str, int]]]
  116.     ) -> None:
  117.  
  118.         filename = '_'.join(['Spark', method, table_name, cls._cdate]) + '.parquet'
  119.  
  120.         try:
  121.             schema = PARQUET_SCHEMAS[method][table_name]
  122.         except KeyError:
  123.             raise Exception(
  124.                 f'Parquet schema not found for {table_name} table for method {method}')
  125.  
  126.         table = pa.Table.from_arrays(list(map(pa.array, table)), schema=schema)
  127.         pq.write_table(table, os.path.join(Environment.parquets_dir, filename))
  128.  
  129.         cls._logger.info('Saved: ' + filename)
  130.  
  131.  
  132.     @classmethod
  133.     def save_tables(
  134.         cls,
  135.         method: str,
  136.         tables: Dict[str, List[List[str]]]
  137.     ) -> None:
  138.         for table_name, table in tables.items():
  139.             table = cls._cast_types(
  140.                 method,
  141.                 table_name,
  142.                 cls._transpose_table(cls._remove_dublicates(table))
  143.             )
  144.             cls._save_table(method, table_name, table)
  145.  
  146.  
  147.     @classmethod
  148.     def concat_partitions(cls, method: str) -> None:
  149.         pattern = rf'Spark_{method}_([a-zA-Z0-9]*?)_{cls._cdate}_part(\d+).parquet'
  150.  
  151.         matching = defaultdict(list)
  152.  
  153.         for filename in os.listdir(cls._tmp_storage):
  154.             match = re.fullmatch(pattern, filename)
  155.             if match:
  156.                 group = match.groups()
  157.                 matching[group[0]].append(group[1])
  158.  
  159.         for table_name, part_indexes in matching.items():
  160.             parts = []
  161.  
  162.             filename = '_'.join(['Spark', method, table_name, cls._cdate])
  163.  
  164.             for i in part_indexes:
  165.                 full_path = os.path.join(cls._tmp_storage, filename + f'_part{i}.parquet')
  166.                 parts.append(pq.read_table(full_path))
  167.            
  168.             full_table = pa.concat_tables(parts)
  169.             pq.write_table(full_table, os.path.join(Environment.parquets_dir, filename + '.parquet'))
  170.  
  171.             cls._logger.info(f"Concatenated {len(part_indexes)} partitions into {filename + '.parquet'}")
  172.  
  173.    
  174.     @classmethod
  175.     def cleanup_tmp_dir(cls):
  176.         for filename in os.listdir(cls._tmp_storage):
  177.             os.remove(os.path.join(cls._tmp_storage, filename))
  178.        
  179.         cls._logger.info('Temp parquets storage cleared')
  180.  
RAW Paste Data