Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class JustGroup(Reducer):
    """Pass a key group through unchanged as a single list of its rows.

    Nothing is aggregated or filtered: the rows handed in for one group are
    collected into one list, and that list is returned as the only element
    of the result. The joiners use it to obtain each key group as a unit.
    """

    def __init__(self):
        pass

    def __call__(self, group_key: Tuple[str],
                 rows: Iterable[Row]) -> OperationResult:
        """Return ``[list_of_rows]`` for the given group.

        :param group_key: key of the group; not used here
        :param rows: rows belonging to the group; consumed once, in order
        :return: a one-element list whose single item is the list of all
            rows from ``rows``
        """
        # list() drains the iterable directly; a comprehension that copies
        # element by element adds nothing.
        return [list(rows)]
# Joiners
class InnerJoiner(Joiner):
    """Join strategy that emits only rows whose key occurs on both sides.

    Each input is passed through ``Reduce(JustGroup(), keys=keys)`` and the
    two resulting streams are walked in step: whichever side currently has
    the smaller key is advanced until the keys agree.
    NOTE(review): this walk presumes both inputs arrive ordered by ``keys``
    ascending and that the reduce step yields one list of rows per key
    group -- confirm against ``Reduce`` and the callers.
    """

    def __call__(self, keys: Sequence[str], rows_a: Iterable[Row],
                 rows_b: Iterable[Row]) -> OperationResult:
        """Yield ``self.unite_rows(row_a, row_b, keys)`` for matching pairs.

        :param keys: column names the join is performed on
        :param rows_a: rows of the first (left) input
        :param rows_b: rows of the second (right) input
        :return: generator over the united rows; for every key present on
            both sides, each left row is combined with each right row
        """
        def key_of(group):
            # The join key of a group is read from its first row.
            return tuple(group[0][name] for name in keys)

        left_groups = Reduce(JustGroup(), keys=keys)(rows_a)
        right_groups = Reduce(JustGroup(), keys=keys)(rows_b)

        try:
            left = next(left_groups)
            right = next(right_groups)
            left_key = key_of(left)
            right_key = key_of(right)

            while True:
                if left_key == right_key:
                    # Same key on both sides: emit the cross product, then
                    # move both streams forward.
                    for left_row in left:
                        for right_row in right:
                            yield self.unite_rows(left_row, right_row, keys)
                    left = next(left_groups)
                    right = next(right_groups)
                    left_key = key_of(left)
                    right_key = key_of(right)
                elif left_key > right_key:
                    # Right side is behind: skip its unmatched group.
                    right = next(right_groups)
                    right_key = key_of(right)
                elif left_key < right_key:
                    # Left side is behind: skip its unmatched group.
                    left = next(left_groups)
                    left_key = key_of(left)
        except StopIteration:
            # One of the streams ran out, so no further match is possible.
            return
class OuterJoiner(Joiner):
    """Join strategy that keeps every row from both sides.

    Groups with the same key on both sides are combined through
    ``self.unite_rows``; rows of a group that has no counterpart are yielded
    unchanged.
    NOTE(review): the walk presumes both inputs arrive ordered by ``keys``
    ascending and that ``Reduce(JustGroup(), keys=keys)`` yields one list of
    rows per key group -- confirm against ``Reduce`` and the callers.
    """

    def __call__(self, keys: Sequence[str], rows_a: Iterable[Row],
                 rows_b: Iterable[Row]) -> OperationResult:
        """Yield united rows for matching keys and raw rows for the rest.

        :param keys: column names the join is performed on
        :param rows_a: rows of the first (left) input
        :param rows_b: rows of the second (right) input
        :return: generator over the joined rows
        """
        def key_of(group):
            # The join key of a group is read from its first row.
            return tuple(group[0][name] for name in keys)

        left_groups = Reduce(JustGroup(), keys=keys)(rows_a)
        right_groups = Reduce(JustGroup(), keys=keys)(rows_b)

        # A "pending" flag is True while the current group of that side has
        # been fetched but not yet emitted; it tells the exhaustion handler
        # below whether that group still has to be flushed.
        left_pending = False
        right_pending = False

        try:
            left = next(left_groups)
            left_pending = True
            right = next(right_groups)
            right_pending = True
            left_key = key_of(left)
            right_key = key_of(right)

            while True:
                if left_key == right_key:
                    for left_row in left:
                        for right_row in right:
                            yield self.unite_rows(left_row, right_row, keys)
                    # Both current groups are consumed; clear the flags
                    # before fetching so a failed fetch leaves them cleared.
                    left_pending = False
                    right_pending = False
                    left = next(left_groups)
                    left_pending = True
                    right = next(right_groups)
                    right_pending = True
                    left_key = key_of(left)
                    right_key = key_of(right)
                elif left_key > right_key:
                    # Right group has no partner on the left: emit as is.
                    yield from right
                    right_pending = False
                    right = next(right_groups)
                    right_pending = True
                    right_key = key_of(right)
                elif left_key < right_key:
                    # Left group has no partner on the right: emit as is.
                    yield from left
                    left_pending = False
                    left = next(left_groups)
                    left_pending = True
                    left_key = key_of(left)
        except StopIteration:
            # One stream is exhausted: flush whatever group is still held,
            # then everything that remains in either stream.
            if left_pending:
                yield from left
            if right_pending:
                yield from right
            for group in left_groups:
                yield from group
            for group in right_groups:
                yield from group
class LeftJoiner(Joiner):
    """Join strategy that keeps every row of the left input.

    Left groups whose key also occurs on the right are combined through
    ``self.unite_rows``; left groups without a counterpart are yielded
    unchanged, and unmatched right groups are dropped.
    NOTE(review): the walk presumes both inputs arrive ordered by ``keys``
    ascending and that ``Reduce(JustGroup(), keys=keys)`` yields one list of
    rows per key group -- confirm against ``Reduce`` and the callers.
    """

    def __call__(self, keys: Sequence[str], rows_a: Iterable[Row],
                 rows_b: Iterable[Row]) -> OperationResult:
        """Yield united rows for matching keys plus unmatched left rows.

        :param keys: column names the join is performed on
        :param rows_a: rows of the left input; all of them are represented
            in the output
        :param rows_b: rows of the right input
        :return: generator over the joined rows
        """
        def key_of(group):
            # The join key of a group is read from its first row.
            return tuple(group[0][name] for name in keys)

        left_groups = Reduce(JustGroup(), keys=keys)(rows_a)
        right_groups = Reduce(JustGroup(), keys=keys)(rows_b)

        # True while the current left group has been fetched but not yet
        # emitted, so the exhaustion handler knows whether to flush it.
        left_pending = False

        try:
            left = next(left_groups)
            left_pending = True
            right = next(right_groups)
            left_key = key_of(left)
            right_key = key_of(right)

            while True:
                if left_key == right_key:
                    for left_row in left:
                        for right_row in right:
                            yield self.unite_rows(left_row, right_row, keys)
                    # The left group is consumed; clear the flag before
                    # fetching so a failed fetch leaves it cleared.
                    left_pending = False
                    left = next(left_groups)
                    left_pending = True
                    right = next(right_groups)
                    left_key = key_of(left)
                    right_key = key_of(right)
                elif left_key > right_key:
                    # Right group has no partner on the left: skip it.
                    right = next(right_groups)
                    right_key = key_of(right)
                elif left_key < right_key:
                    # Left group has no partner on the right: emit as is.
                    yield from left
                    left_pending = False
                    left = next(left_groups)
                    left_pending = True
                    left_key = key_of(left)
        except StopIteration:
            # One stream is exhausted: flush the held left group, if any,
            # and then all remaining left groups.
            if left_pending:
                yield from left
            for group in left_groups:
                yield from group
class RightJoiner(Joiner):
    """Join strategy implemented as a left join with the inputs swapped."""

    def __call__(self, keys: Sequence[str], rows_a: Iterable[Row],
                 rows_b: Iterable[Row]) -> OperationResult:
        """Yield the rows of ``Join(LeftJoiner(), keys=keys)(rows_b, rows_a)``.

        :param keys: column names the join is performed on
        :param rows_a: rows of the first (left) input
        :param rows_b: rows of the second (right) input; passed as the
            first argument of the delegated left join
        :return: generator over the rows produced by the delegated join
        """
        # NOTE(review): the work is done by a freshly default-constructed
        # LeftJoiner that sees rows_b as its first input. If unite_rows
        # treats the two sides differently, or this instance carries its
        # own configuration, the result may not reflect it -- confirm
        # against Joiner and Join.
        swapped_left_join = Join(LeftJoiner(), keys=keys)
        yield from swapped_left_join(rows_b, rows_a)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement