import itertools
import math
import typing as tp
from abc import abstractmethod, ABC
from string import punctuation
from heapq import nlargest

from .utils_functions import get_distance, parse_datetime

TRow = tp.Dict[str, tp.Any]
TRowsIterable = tp.Iterable[TRow]
TRowsGenerator = tp.Generator[TRow, None, None]

class Operation(ABC):
    """Base class for operations"""

    @abstractmethod
    def __call__(self, rows: TRowsIterable, *args: tp.Any, **kwargs: tp.Any) -> TRowsGenerator:
        pass


# Operations

class Mapper(ABC):
    """Base class for mappers"""

    @abstractmethod
    def __call__(self, row: TRow) -> TRowsGenerator:
        """
        :param row: one table row
        """
        pass


class Map(Operation):
    """Implementation of map operation"""

    def __init__(self, mapper: Mapper) -> None:
        self.mapper = mapper

    def __call__(self, rows: TRowsIterable, *args: tp.Any, **kwargs: tp.Any) -> TRowsGenerator:
        for row in rows:
            yield from self.mapper(row)

class Reducer(ABC):
    """Base class for reducers"""

    @abstractmethod
    def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
        """
        :param group_key: names of the columns the rows are grouped by
        :param rows: table rows
        """
        pass

class Reduce(Operation):
    """Implementation of reduce operation"""

    def __init__(self, reducer: Reducer, keys: tp.List[str]) -> None:
        self.reducer = reducer
        self.keys = keys

    def __call__(self, rows: TRowsIterable, *args: tp.Any, **kwargs: tp.Any) -> TRowsGenerator:
        for group_key, group_rows in self.group_by(rows):
            yield from self.reducer(group_key, group_rows)

    def group_by(self, rows: TRowsIterable) -> tp.Generator[tp.Tuple[tp.Tuple[str, ...], TRowsGenerator], None, None]:
        """Group consecutive rows that share the same values in self.keys.

        The input is expected to be sorted by those keys.
        """
        result_rows: tp.List[TRow] = []
        values: tp.Tuple[tp.Any, ...] = tuple()
        for row in rows:
            new_values = tuple(row[key] for key in self.keys)
            if not result_rows or values == new_values:
                result_rows.append(row)
            else:
                yield tuple(self.keys), (grouped_row for grouped_row in result_rows)
                result_rows = [row]
            values = new_values
        if result_rows:  # do not emit an empty group for empty input
            yield tuple(self.keys), (grouped_row for grouped_row in result_rows)

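# Note (added for clarity, not part of the original paste): Reduce only groups
# *consecutive* rows, so the stream has to be pre-sorted by the same keys.
# A minimal sketch of the assumed usage, with made-up column names:
#
#     rows = sorted(rows, key=lambda r: r['user_id'])
#     for out_row in Reduce(Count('visits'), keys=['user_id'])(rows):
#         ...  # one row per user_id with a 'visits' counter
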
class Joiner(ABC):
    """Base class for joiners"""

    def __init__(self, suffix_a: str = '_1', suffix_b: str = '_2') -> None:
        self._a_suffix = suffix_a
        self._b_suffix = suffix_b

    @abstractmethod
    def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
        """
        :param keys: join keys
        :param rows_a: left table rows
        :param rows_b: right table rows
        """
        pass


class Join(Operation):
    """Implementation of join operation"""

    def __init__(self, joiner: Joiner, keys: tp.Sequence[str]) -> None:
        self.keys = keys
        self.joiner = joiner

    def __call__(self, rows: TRowsIterable, *args: tp.Any, **kwargs: tp.Any) -> TRowsGenerator:
        # the right-hand table is expected as the first extra positional argument
        yield from self.joiner(self.keys, rows, *args)

# Dummy operators

class DummyMapper(Mapper):
    """Yield exactly the row passed"""

    def __call__(self, row: TRow) -> TRowsGenerator:
        yield row


class FirstReducer(Reducer):
    """Yield only first row from passed ones"""

    def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
        for row in rows:
            yield row
            break

# Mappers

class FilterPunctuation(Mapper):
    """Leave only non-punctuation symbols"""

    def __init__(self, column: str):
        """
        :param column: name of column to process
        """
        self.column = column

    def __call__(self, row: TRow) -> TRowsGenerator:
        result_row = row.copy()
        result_row[self.column] = ''.join(x for x in row[self.column] if x not in punctuation)
        yield result_row

class Split(Mapper):
    """Split a row into multiple rows by separator"""

    def __init__(self, column: str, separator: tp.Optional[str] = None) -> None:
        """
        :param column: name of column to split
        :param separator: string to separate by (defaults to any whitespace)
        """
        self.column = column
        self.separator = separator

    def __call__(self, row: TRow) -> TRowsGenerator:
        parts = row[self.column].split(self.separator)
        for part in parts:
            result_row = row.copy()
            result_row[self.column] = part
            yield result_row

class BinaryOperation(Mapper):
    """Apply a binary operation to a given pair of columns"""

    def __init__(self, columns: tp.Sequence[str], result_column: str = 'product',
                 operation: tp.Callable[[tp.Any, tp.Any], tp.Any] = lambda x, y: x * y,
                 delete: bool = True) -> None:
        """
        :param columns: names of the two columns to apply the operation to
        :param result_column: name of column to save the result in
        :param operation: operation to apply
        :param delete: flag indicating whether the source columns should be deleted
        """
        self.columns = columns
        self.result_column = result_column
        self.operation = operation
        self.delete = delete

    def __call__(self, row: TRow) -> TRowsGenerator:
        result_row = row.copy()
        result_row[self.result_column] = self.operation(row[self.columns[0]], row[self.columns[1]])
        if self.delete:
            for column in self.columns:
                del result_row[column]
        yield result_row

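# Illustrative example (column names are made up, not from the original paste):
#
#     mapper = BinaryOperation(['price', 'count'], result_column='total',
#                              operation=lambda x, y: x * y, delete=False)
#     next(mapper({'price': 2, 'count': 3}))  # -> {'price': 2, 'count': 3, 'total': 6}
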
class Filter(Mapper):
    """Remove records that don't satisfy some condition"""

    def __init__(self, condition: tp.Callable[[TRow], bool]) -> None:
        """
        :param condition: rows for which the condition is false are removed
        """
        self.condition = condition

    def __call__(self, row: TRow) -> TRowsGenerator:
        if self.condition(row):
            yield row

class Project(Mapper):
    """Leave only mentioned columns"""

    def __init__(self, columns: tp.Sequence[str]) -> None:
        """
        :param columns: names of columns
        """
        self.columns = columns

    def __call__(self, row: TRow) -> TRowsGenerator:
        result_row: TRow = {column: row[column] for column in self.columns}
        yield result_row

class UnaryOperation(Mapper):
    """Apply unary operation to given column"""

    def __init__(self, target_column: str, result_column: str, operation: tp.Callable[[tp.Any], tp.Any]) -> None:
        """
        :param target_column: name of target column
        :param result_column: name of result column
        :param operation: operation to apply
        """
        self.target_column = target_column
        self.result_column = result_column
        self.operation = operation

    def __call__(self, row: TRow) -> TRowsGenerator:
        result_row: TRow = row.copy()
        result_row[self.result_column] = self.operation(result_row[self.target_column])
        yield result_row

class HaversineDistance(Mapper):
    """Mapper to calculate length of edge"""

    def __init__(self, length_column: str, start_column: str, end_column: str) -> None:
        """
        :param length_column: name of column with length of edge
        :param start_column: name of column with start coordinates
        :param end_column: name of column with end coordinates
        """
        self.length_column = length_column
        self.start_column = start_column
        self.end_column = end_column

    def __call__(self, row: TRow) -> TRowsGenerator:
        result_row: TRow = row.copy()
        latitude_start, longitude_start = result_row.pop(self.start_column)
        latitude_end, longitude_end = result_row.pop(self.end_column)
        result_row[self.length_column] = get_distance(latitude_start, longitude_start, latitude_end, longitude_end)
        yield result_row

# Reducers

class TopN(Reducer):
    """Calculate top N by value"""

    def __init__(self, column_max: str, rows_count: int) -> None:
        """
        :param column_max: column name to get top by
        :param rows_count: number of top values to extract
        """
        self.column_max = column_max
        self.rows_count = rows_count

    def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
        top_rows = nlargest(self.rows_count, rows, key=lambda current_row: current_row[self.column_max])
        for row in top_rows:
            yield row

class TermFrequency(Reducer):
    """Calculate frequency of values in column"""

    def __init__(self, words_column: str, result_column: str = 'tf') -> None:
        """
        :param words_column: name for column with words
        :param result_column: name for result column
        """
        self.words_column = words_column
        self.result_column = result_column

    def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
        amount = 0
        frequency: tp.Dict[str, int] = {}
        group_key_values: tp.Dict[str, tp.Any] = {}
        for row in rows:
            frequency[row[self.words_column]] = frequency.get(row[self.words_column], 0) + 1
            if not group_key_values:
                for key in group_key:
                    group_key_values[key] = row[key]
            amount += 1
        for word in frequency:
            result_row: TRow = {}
            result_row.update(group_key_values)
            result_row[self.words_column] = word
            result_row[self.result_column] = frequency[word] / amount
            yield result_row

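# Illustrative example (made-up column names): for group_key=('doc_id',) and rows
#     {'doc_id': 1, 'word': 'a'}, {'doc_id': 1, 'word': 'a'}, {'doc_id': 1, 'word': 'b'}
# TermFrequency('word') yields
#     {'doc_id': 1, 'word': 'a', 'tf': 2 / 3} and {'doc_id': 1, 'word': 'b', 'tf': 1 / 3}.
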
class Count(Reducer):
    """Count rows passed and yield single row as a result"""

    def __init__(self, column: str) -> None:
        """
        :param column: name of column to store the count in
        """
        self.column = column

    def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
        result_row: TRow = {}
        for row in rows:
            if not result_row:
                result_row[self.column] = 0
                result_row.update({key: row[key] for key in group_key})
            result_row[self.column] += 1
        yield result_row

class Sum(Reducer):
    """Sum values in column passed and yield single row as a result"""

    def __init__(self, column: str) -> None:
        """
        :param column: name of column to sum
        """
        self.column = column

    def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
        result_row: TRow = {}
        for row in rows:
            if not result_row:
                result_row[self.column] = 0
                result_row.update({key: row[key] for key in group_key})
            result_row[self.column] += row[self.column]
        yield result_row

class IdfCalculator(Reducer):
    """Calculate idf value and yield single row as a result"""

    def __init__(self, column: str) -> None:
        """
        :param column: name of column to calculate idf from
        """
        self.column = column

    def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
        result_row: TRow = {}
        idf_column = 'idf'
        amount = 0
        for row in rows:
            if not result_row:
                result_row[idf_column] = row[self.column]
                result_row.update({key: row[key] for key in group_key})
            amount += 1
        # idf = log(value in `column` / number of rows in the group)
        result_row[idf_column] /= amount
        result_row[idf_column] = math.log(result_row[idf_column])
        yield result_row

class SpeedReducer(Reducer):
    """Calculate average speed over all rows in the group and yield single row as a result"""

    def __init__(self, enter_time_column: str, leave_time_column: str, length_column: str, speed_column: str) -> None:
        """
        :param enter_time_column: name of column with enter time
        :param leave_time_column: name of column with leave time
        :param length_column: name of column with length of edge
        :param speed_column: name of column that stores result
        """
        self.enter_time_column = enter_time_column
        self.leave_time_column = leave_time_column
        self.length_column = length_column
        self.speed_column = speed_column

    def __call__(self, group_key: tp.Tuple[str, ...], rows: TRowsIterable) -> TRowsGenerator:
        total_time = 0.0
        total_length = 0.0
        result_row: TRow = {}
        for row in rows:
            if not result_row:
                result_row = row.copy()
            time_start = parse_datetime(row[self.enter_time_column])
            time_end = parse_datetime(row[self.leave_time_column])
            total_time += (time_end - time_start).total_seconds()
            total_length += row[self.length_column]
        # speed = total length / total time, converted from per-second to per-hour units
        result_row[self.speed_column] = total_length / total_time * 3600
        yield result_row

# Joiners

def merge_rows(row_a: TRow, row_b: TRow, keys: tp.Sequence[str], suffix_a: str, suffix_b: str) -> TRow:
    """Merge two rows for joiners; return an empty dict if the key values differ.

    :param row_a: row from first table
    :param row_b: row from second table
    :param keys: keys for join
    :param suffix_a: suffix to add to a column name of the first table on duplicates
    :param suffix_b: suffix to add to a column name of the second table on duplicates
    """
    result_row: TRow = {key: row_a[key] for key in keys}
    for key in keys:
        if result_row[key] != row_b[key]:
            return {}
    for key in row_a:
        if key in keys:
            continue
        if key in row_b:
            result_row[f'{key}{suffix_a}'] = row_a[key]
            result_row[f'{key}{suffix_b}'] = row_b[key]
        else:
            result_row[key] = row_a[key]
    for key in row_b:
        if key in row_a:
            continue
        result_row[key] = row_b[key]
    return result_row

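# Illustrative example of merge_rows (made-up values, not from the original paste):
#
#     merge_rows({'id': 1, 'x': 10}, {'id': 1, 'x': 20, 'y': 3}, ['id'], '_1', '_2')
#     # -> {'id': 1, 'x_1': 10, 'x_2': 20, 'y': 3}
#
# Rows whose key values differ produce {}, which the joiners treat as "no match".
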
class InnerJoiner(Joiner):
    """Join with inner strategy"""

    def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
        # rows_b may be a one-shot generator, so cache it to iterate it once per row of rows_a
        rows_b_cache = list(rows_b)
        for row_a in rows_a:
            for row_b in rows_b_cache:
                result_row = merge_rows(row_a, row_b, keys, self._a_suffix, self._b_suffix)
                if result_row:
                    yield result_row

class OuterJoiner(Joiner):
    """Join with outer strategy"""

    def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
        # left part: every row of rows_a, joined with matches from rows_b where they exist
        rows_a, rows_a_copy = itertools.tee(rows_a)
        rows_b, rows_b_copy = itertools.tee(rows_b)
        yield from Join(LeftJoiner(suffix_a=self._a_suffix, suffix_b=self._b_suffix), keys=keys)(rows_a_copy, rows_b_copy)
        # right part: rows of rows_b that found no match in rows_a
        for row_b in rows_b:
            key_found = False
            rows_a, rows_a_copy = itertools.tee(rows_a)
            for row_a in rows_a_copy:
                result_row = merge_rows(row_a, row_b, keys, self._a_suffix, self._b_suffix)
                if result_row:
                    key_found = True
                    break
            if not key_found:
                yield row_b

class LeftJoiner(Joiner):
    """Join with left strategy"""

    def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
        # rows_b may be a one-shot generator, so cache it to iterate it once per row of rows_a
        rows_b_cache = list(rows_b)
        for row_a in rows_a:
            key_found = False
            for row_b in rows_b_cache:
                result_row: TRow = merge_rows(row_a, row_b, keys, self._a_suffix, self._b_suffix)
                if result_row:
                    key_found = True
                    yield result_row
            if not key_found:
                yield row_a

class RightJoiner(Joiner):
    """Join with right strategy"""

    def __call__(self, keys: tp.Sequence[str], rows_a: TRowsIterable, rows_b: TRowsIterable) -> TRowsGenerator:
        yield from Join(LeftJoiner(suffix_a=self._b_suffix, suffix_b=self._a_suffix), keys=keys)(rows_b, rows_a)

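# A minimal word-count sketch of how these operations are assumed to compose.
# The function below is illustrative only (it is not called anywhere and its
# column names are made up); note that Reduce relies on the rows being sorted
# by the grouping keys.
def _example_word_count() -> None:
    docs = [
        {'doc_id': 1, 'text': 'hello, hello world'},
        {'doc_id': 2, 'text': 'world'},
    ]
    rows: TRowsIterable = Map(FilterPunctuation('text'))(docs)
    rows = Map(Split('text'))(rows)
    rows = sorted(rows, key=lambda row: row['text'])  # group_by expects sorted input
    for result in Reduce(Count('count'), keys=['text'])(rows):
        print(result)  # e.g. {'count': 2, 'text': 'hello'}
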