mfgnik

Untitled

Apr 14th, 2020
533
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 15.92 KB | None | 0 0
  1. import itertools
  2. import math
  3. import typing as tp
  4. from abc import abstractmethod, ABC
  5. from string import punctuation
  6. from heapq import nlargest
  7.  
  8. from .utils_functions import get_distance, parse_datetime
  9.  
  10. TRow = tp.Dict[str, tp.Any]
  11. TRowsIterable = tp.Iterable[TRow]
  12. TRowsGenerator = tp.Generator[TRow, None, None]
  13.  
  14.  
  15. class Operation(ABC):
  16.     """Base class for operations"""
  17.     @abstractmethod
  18.     def __call__(self, rows: TRowsIterable, *args: tp.Any, **kwargs: tp.Any) -> TRowsGenerator:
  19.         pass
  20.  
  21.  
  22. # Operations
  23.  
  24.  
  25. class Mapper(ABC):
  26.     """Base class for mappers"""
  27.     @abstractmethod
  28.     def __call__(self, row: TRow) -> TRowsGenerator:
  29.         """
  30.        :param row: one table row
  31.        """
  32.         pass
  33.  
  34.  
  35. class Map(Operation):
  36.     """Implementation of map operation"""
  37.     def __init__(self, mapper: Mapper) -> None:
  38.         self.mapper = mapper
  39.  
  40.     def __call__(self, rows: TRowsIterable, *args: tp.Any, **kwargs: tp.Any) -> TRowsGenerator:
  41.         for row in rows:
  42.             yield from self.mapper(row)
  43.  
  44.  
  45. class Reducer(ABC):
  46.     """Base class for reducers"""
  47.     @abstractmethod
  48.     def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
  49.         """
  50.        :param rows: table rows
  51.        """
  52.         pass
  53.  
  54.  
  55. class Reduce(Operation):
  56.     """Implementation of reduce operation"""
  57.     def __init__(self, reducer: Reducer, keys: tp.List[str]) -> None:
  58.         self.reducer = reducer
  59.         self.keys = keys
  60.  
  61.     def __call__(self, rows: TRowsIterable, *args: tp.Any, **kwargs: tp.Any) -> TRowsGenerator:
  62.         for keys, values in self.group_by(rows):
  63.             yield from self.reducer(keys, values)
  64.  
  65.     def group_by(self, rows: TRowsIterable) -> tp.Generator[tp.Tuple[tp.Tuple[str, ...], TRowsGenerator], None, None]:
  66.         """Method to groupby rows by keys"""
  67.         result_rows: tp.List[TRow] = []
  68.         values: tp.Tuple[str, ...] = tuple()
  69.         for row in rows:
  70.             if not values:
  71.                 values = tuple(row[key] for key in self.keys)
  72.             new_values = tuple(row[key] for key in self.keys)
  73.             if values == new_values:
  74.                 result_rows.append(row)
  75.             else:
  76.                 yield tuple(self.keys), (row for row in result_rows)
  77.                 values = new_values
  78.                 result_rows = [row]
  79.         yield tuple(self.keys), (row for row in result_rows)
  80.  
  81.  
  82. class Joiner(ABC):
  83.     """Base class for joiners"""
  84.     def __init__(self, suffix_a: str = '_1', suffix_b: str = '_2') -> None:
  85.         self._a_suffix = suffix_a
  86.         self._b_suffix = suffix_b
  87.  
  88.     @abstractmethod
  89.     def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
  90.         """
  91.        :param keys: join keys
  92.        :param rows_a: left table rows
  93.        :param rows_b: right table rows
  94.        """
  95.         pass
  96.  
  97.  
  98. class Join(Operation):
  99.     """Implementation of join operation"""
  100.     def __init__(self, joiner: Joiner, keys: tp.Sequence[str]):
  101.         self.keys = keys
  102.         self.joiner = joiner
  103.  
  104.     def __call__(self, rows: TRowsIterable, *args: tp.Any, **kwargs: tp.Any) -> TRowsGenerator:
  105.         yield from self.joiner(self.keys, rows, *args)
  106.  
  107.  
  108. # Dummy operators
  109.  
  110.  
  111. class DummyMapper(Mapper):
  112.     """Yield exactly the row passed"""
  113.     def __call__(self, row: TRow) -> TRowsGenerator:
  114.         yield row
  115.  
  116.  
  117. class FirstReducer(Reducer):
  118.     """Yield only first row from passed ones"""
  119.     def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
  120.         for row in rows:
  121.             yield row
  122.             break
  123.  
  124.  
  125. # Mappers
  126.  
  127.  
  128. class FilterPunctuation(Mapper):
  129.     """Left only non-punctuation symbols"""
  130.     def __init__(self, column: str):
  131.         """
  132.        :param column: name of column to process
  133.        """
  134.         self.column = column
  135.  
  136.     def __call__(self, row: TRow) -> TRowsGenerator:
  137.         result_row = row.copy()
  138.         result_row[self.column] = ''.join(x for x in row[self.column] if x not in punctuation)
  139.         yield result_row
  140.  
  141.  
  142. class Split(Mapper):
  143.     """Split row on multiple rows by separator"""
  144.     def __init__(self, column: str, separator: tp.Optional[str] = None) -> None:
  145.         """
  146.        :param column: name of column to split
  147.        :param separator: string to separate by
  148.        """
  149.         self.column = column
  150.         self.separator = separator
  151.  
  152.     def __call__(self, row: TRow) -> TRowsGenerator:
  153.         parts = row[self.column].split(self.separator)
  154.         for part in parts:
  155.             result_row = row.copy()
  156.             result_row[self.column] = part
  157.             yield result_row
  158.  
  159.  
  160. class BinaryOperation(Mapper):
  161.     """Apply binary operation to given pair of columns"""
  162.     def __init__(self, columns: tp.Sequence[str], result_column: str = 'product',
  163.                  operation: tp.Callable[[int, int], tp.Union[int, float]] = lambda x, y: x * y,
  164.                  delete: bool = True) -> None:
  165.         """
  166.        :param columns: column names to do operation
  167.        :param result_column: column names to save result in
  168.        :param operation: operation to apply
  169.        :param delete: flag indicating if old columns should be deleted
  170.        """
  171.         self.columns = columns
  172.         self.result_column = result_column
  173.         self.operation = operation
  174.         self.delete = delete
  175.  
  176.     def __call__(self, row: TRow) -> TRowsGenerator:
  177.         result_row = row.copy()
  178.         result_row[self.result_column] = self.operation(row[self.columns[0]], row[self.columns[1]])
  179.         if self.delete:
  180.             for column in self.columns:
  181.                 del result_row[column]
  182.         yield result_row
  183.  
  184.  
  185. class Filter(Mapper):
  186.     """Remove records that don't satisfy some condition"""
  187.     def __init__(self, condition: tp.Callable[[TRow], bool]) -> None:
  188.         """
  189.        :param condition: if condition is not true - remove record
  190.        """
  191.         self.condition = condition
  192.  
  193.     def __call__(self, row: TRow) -> TRowsGenerator:
  194.         if self.condition(row):
  195.             yield row
  196.  
  197.  
  198. class Project(Mapper):
  199.     """Leave only mentioned columns"""
  200.     def __init__(self, columns: tp.Sequence[str]) -> None:
  201.         """
  202.        :param columns: names of columns
  203.        """
  204.         self.columns = columns
  205.  
  206.     def __call__(self, row: TRow) -> TRowsGenerator:
  207.         result_row: TRow = {}
  208.         for column in self.columns:
  209.             result_row[column] = row[column]
  210.         yield result_row
  211.  
  212.  
  213. class UnaryOperation(Mapper):
  214.     """Apply unary operation to given column"""
  215.     def __init__(self, target_column: str, result_column: str, operation: tp.Callable[[tp.Any], tp.Any]) -> None:
  216.         """
  217.        :param target_column: name of target column
  218.        :param result_column: name of result column
  219.        :param operation: operation to apply
  220.        """
  221.         self.target_column = target_column
  222.         self.result_column = result_column
  223.         self.operation = operation
  224.  
  225.     def __call__(self, row: TRow) -> TRowsGenerator:
  226.         result_row: TRow = row.copy()
  227.         result_row[self.result_column] = self.operation(result_row[self.target_column])
  228.         yield result_row
  229.  
  230.  
  231. class HaversineDistance(Mapper):
  232.     """Mapper to calculate length of edge"""
  233.     def __init__(self, length_column: str, start_column: str, end_column: str) -> None:
  234.         """
  235.        :param length_column: name of column with length of edge
  236.        :param start_column: name of column with start coordinates
  237.        :param end_column: name of column with end coordinates
  238.        """
  239.         self.length_column = length_column
  240.         self.start_column = start_column
  241.         self.end_column = end_column
  242.  
  243.     def __call__(self, row: TRow) -> TRowsGenerator:
  244.         result_row: TRow = row.copy()
  245.         latitude_start, longitude_start = result_row.pop(self.start_column)
  246.         latitude_end, longitude_end = result_row.pop(self.end_column)
  247.         result_row[self.length_column] = get_distance(latitude_start, longitude_start, latitude_end, longitude_end)
  248.         yield result_row
  249.  
  250.  
  251. # Reducers
  252.  
  253.  
  254. class TopN(Reducer):
  255.     """Calculate top N by value"""
  256.     def __init__(self, column_max: str, rows_count: int) -> None:
  257.         """
  258.        :param column_max: column name to get top by
  259.        :param rows_count: number of top values to extract
  260.        """
  261.         self.column_max = column_max
  262.         self.rows_count = rows_count
  263.  
  264.     def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
  265.         rows = nlargest(self.rows_count, rows, key=lambda current_row: current_row[self.column_max])
  266.         for row in rows:
  267.             yield row
  268.  
  269.  
  270. class TermFrequency(Reducer):
  271.     """Calculate frequency of values in column"""
  272.     def __init__(self, words_column: str, result_column: str = 'tf') -> None:
  273.         """
  274.        :param words_column: name for column with words
  275.        :param result_column: name for result column
  276.        """
  277.         self.words_column = words_column
  278.         self.result_column = result_column
  279.  
  280.     def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
  281.         amount = 0
  282.         frequency: tp.Dict[str, int] = {}
  283.         group_key_values: tp.Dict[str, tp.Any] = {}
  284.         for row in rows:
  285.             frequency[row[self.words_column]] = frequency.get(row[self.words_column], 0) + 1
  286.             if not group_key_values:
  287.                 for key in group_key:
  288.                     group_key_values[key] = row[key]
  289.             amount += 1
  290.         for word in frequency:
  291.             result_row: TRow = {}
  292.             result_row.update(group_key_values)
  293.             result_row[self.words_column] = word
  294.             result_row[self.result_column] = frequency[word] / amount
  295.             yield result_row
  296.  
  297.  
  298. class Count(Reducer):
  299.     """Count rows passed and yield single row as a result"""
  300.     def __init__(self, column: str) -> None:
  301.         """
  302.        :param column: name of column to count
  303.        """
  304.         self.column = column
  305.  
  306.     def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
  307.         result_row: TRow = {}
  308.         for row in rows:
  309.             if not result_row:
  310.                 result_row[self.column] = 0
  311.                 result_row.update({key: row[key] for key in group_key})
  312.             result_row[self.column] += 1
  313.         yield result_row
  314.  
  315.  
  316. class Sum(Reducer):
  317.     """Sum values in column passed and yield single row as a result"""
  318.     def __init__(self, column: str) -> None:
  319.         """
  320.        :param column: name of column to sum
  321.        """
  322.         self.column = column
  323.  
  324.     def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
  325.         result_row: TRow = {}
  326.         for row in rows:
  327.             if not result_row:
  328.                 result_row[self.column] = 0
  329.                 result_row.update({key: row[key] for key in group_key})
  330.             result_row[self.column] += row[self.column]
  331.         yield result_row
  332.  
  333.  
  334. class IdfCalculator(Reducer):
  335.     """Sum values in column passed and yield single row as a result"""
  336.     def __init__(self, column: str) -> None:
  337.         """
  338.        :param column: name of column to calculate idf
  339.        """
  340.         self.column = column
  341.  
  342.     def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
  343.         result_row: TRow = {}
  344.         idf_column = 'idf'
  345.         amount = 0
  346.         for row in rows:
  347.             if not result_row:
  348.                 result_row[idf_column] = row[self.column]
  349.                 result_row.update({key: row[key] for key in group_key})
  350.             amount += 1
  351.         result_row[idf_column] /= amount
  352.         result_row[idf_column] = math.log(result_row[idf_column])
  353.         yield result_row
  354.  
  355.  
  356. class SpeedReducer(Reducer):
  357.     def __init__(self, enter_time_column: str, leave_time_column: str, length_column: str, speed_column: str) -> None:
  358.         """
  359.        :param enter_time_column: name of column with enter time
  360.        :param leave_time_column: name of column with leave time
  361.        :param length_column: name of column with length of edge
  362.        :param speed_column: name of column that stores result
  363.        """
  364.         self.enter_time_column = enter_time_column
  365.         self.leave_time_column = leave_time_column
  366.         self.length_column = length_column
  367.         self.speed_column = speed_column
  368.  
  369.     def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
  370.         total_time = 0
  371.         total_length = 0
  372.         result_row: TRow = {}
  373.         for row in rows:
  374.             if not result_row:
  375.                 result_row = row.copy()
  376.             time_start = parse_datetime(row[self.enter_time_column])
  377.             time_end = parse_datetime(row[self.leave_time_column])
  378.             total_time += (time_end - time_start).total_seconds()
  379.             total_length += row[self.length_column]
  380.         result_row[self.speed_column] = total_length / total_time * 3600
  381.         yield result_row
  382.  
  383.  
  384. #  Joiners
  385.  
  386.  
  387. def merge_rows(row_a: TRow, row_b: TRow, keys: tp.Sequence[str], suffix_a: str, suffix_b: str) -> TRow:
  388.     """Function to merge rows in joiners
  389.    :param row_a row from first table
  390.    :param row_b row from second table
  391.    :param keys keys for join
  392.    :param suffix_a suffix to add to column name of first table when duplicate
  393.    :param suffix_b suffix to add to column name of second table when duplicate
  394.    """
  395.     result_row: TRow = {key: row_a[key] for key in keys}
  396.     for key in keys:
  397.         if result_row[key] != row_b[key]:
  398.             return {}
  399.     for key in row_a:
  400.         if key in keys:
  401.             continue
  402.         if key in row_b:
  403.             result_row[f'{key}{suffix_a}'] = row_a[key]
  404.             result_row[f'{key}{suffix_b}'] = row_b[key]
  405.         else:
  406.             result_row[key] = row_a[key]
  407.     for key in row_b:
  408.         if key in row_a:
  409.             continue
  410.         result_row[key] = row_b[key]
  411.     return result_row
  412.  
  413.  
  414. class InnerJoiner(Joiner):
  415.     """Join with inner strategy"""
  416.     def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
  417.         for row_a in rows_a:
  418.             for row_b in rows_b:
  419.                 result_row = merge_rows(row_a, row_b, keys, self._a_suffix, self._b_suffix)
  420.                 if result_row:
  421.                     yield result_row
  422.  
  423.  
  424. class OuterJoiner(Joiner):
  425.     """Join with outer strategy"""
  426.     def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
  427.         rows_a, rows_a_copy = itertools.tee(rows_a)
  428.         yield from Join(LeftJoiner(suffix_a=self._a_suffix, suffix_b=self._b_suffix), keys=keys)(rows_a_copy, rows_b)
  429.         for row_b in rows_b:
  430.             key_found = False
  431.             rows_a, rows_a_copy = itertools.tee(rows_a)
  432.             for row_a in rows_a_copy:
  433.                 result_row = merge_rows(row_a, row_b, keys, self._a_suffix, self._b_suffix)
  434.                 if result_row:
  435.                     key_found = True
  436.                     break
  437.             if not key_found:
  438.                 yield row_b
  439.  
  440.  
  441. class LeftJoiner(Joiner):
  442.     """Join with left strategy"""
  443.     def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
  444.         for row_a in rows_a:
  445.             key_found = False
  446.             for row_b in rows_b:
  447.                 result_row: TRow = merge_rows(row_a, row_b, keys, self._a_suffix, self._b_suffix)
  448.                 if result_row:
  449.                     key_found = True
  450.                     yield result_row
  451.             if not key_found:
  452.                 yield row_a
  453.  
  454.  
  455. class RightJoiner(Joiner):
  456.     """Join with right strategy"""
  457.     def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
  458.         yield from Join(LeftJoiner(suffix_a=self._b_suffix, suffix_b=self._a_suffix), keys=keys)(rows_b, rows_a)
Advertisement
Add Comment
Please, Sign In to add comment