Advertisement
Guest User

Untitled

a guest
Apr 25th, 2019
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.81 KB | None | 0 0
  class JustGroup(Reducer):
      """Collect all rows of one group into a single list.

      The reducer emits exactly one result per group: a list containing every
      row of that group, in the order the rows were received. Nothing is
      filtered, sorted or aggregated.
      """

      def __init__(self):
          # Stateless reducer: there is nothing to configure.
          pass

      def __call__(self, group_key: Tuple[str],
                   rows: Iterable[Row]) -> OperationResult:
          """Return a one-element list whose only item is the list of rows.

          :param group_key: grouping key supplied by the caller; ignored here
          :param rows: rows of a single group; consumed exactly once
          :return: ``[[row, row, ...]]`` -- the whole group as one result item
          """
          return [list(rows)]
  9.  
  10.  
  11. # Joiners
  12.  
  class InnerJoiner(Joiner):
      """Inner join: emit only pairs of rows whose keys occur on both sides."""

      def __call__(self, keys: Sequence[str], rows_a: Iterable[Row],
                   rows_b: Iterable[Row]) -> OperationResult:
          """Yield ``unite_rows`` results for key groups present in both inputs.

          Each input is grouped by ``keys`` and the two streams of groups are
          walked together: on equal keys the cross product of the two groups
          is emitted and both sides advance; otherwise the side holding the
          smaller key advances. The join ends as soon as either side runs out.

          NOTE(review): the walk compares keys with ``<`` / ``>``, so it
          presumably expects both inputs sorted by ``keys`` -- confirm.

          :param keys: names of the columns to join on
          :param rows_a: rows of the first table
          :param rows_b: rows of the second table
          """
          def key_of(group):
              # The first row of a group stands in for the whole group's key.
              return tuple(group[0][name] for name in keys)

          left_groups = Reduce(JustGroup(), keys=keys)(rows_a)
          right_groups = Reduce(JustGroup(), keys=keys)(rows_b)

          try:
              left = next(left_groups)
              right = next(right_groups)
              left_key = key_of(left)
              right_key = key_of(right)

              while True:
                  if left_key == right_key:
                      for row_a in left:
                          for row_b in right:
                              yield self.unite_rows(row_a, row_b, keys)
                      left = next(left_groups)
                      right = next(right_groups)
                      left_key = key_of(left)
                      right_key = key_of(right)
                  elif left_key > right_key:
                      # Right side is behind: drop its unmatched group.
                      right = next(right_groups)
                      right_key = key_of(right)
                  elif left_key < right_key:
                      # Left side is behind: drop its unmatched group.
                      left = next(left_groups)
                      left_key = key_of(left)
          except StopIteration:
              # One side is exhausted, so no further matches are possible.
              return
  48.  
  class OuterJoiner(Joiner):
      """Outer join: matched pairs plus the unmatched rows of both sides."""

      def __call__(self, keys: Sequence[str], rows_a: Iterable[Row],
                   rows_b: Iterable[Row]) -> OperationResult:
          """Yield united rows for matching keys and raw rows for the rest.

          Each input is grouped by ``keys`` and the two streams of groups are
          walked together. Groups with equal keys produce the cross product of
          their rows via ``unite_rows``; a group whose key is smaller than the
          other side's current key is emitted unchanged. When one side runs
          out, everything still pending on either side is emitted unchanged.

          NOTE(review): the walk compares keys with ``<`` / ``>``, so it
          presumably expects both inputs sorted by ``keys`` -- confirm.

          :param keys: names of the columns to join on
          :param rows_a: rows of the first table
          :param rows_b: rows of the second table
          """
          def key_of(group):
              # The first row of a group stands in for the whole group's key.
              return tuple(group[0][name] for name in keys)

          left_groups = Reduce(JustGroup(), keys=keys)(rows_a)
          right_groups = Reduce(JustGroup(), keys=keys)(rows_b)

          # Each variable holds that side's fetched-but-not-yet-emitted group,
          # or None when the side has nothing pending. It is reset to None
          # *before* fetching the next group so that, if the fetch raises
          # StopIteration, the already emitted group is not emitted again.
          left = right = None
          try:
              left = next(left_groups)
              right = next(right_groups)
              left_key = key_of(left)
              right_key = key_of(right)

              while True:
                  if left_key == right_key:
                      for row_a in left:
                          for row_b in right:
                              yield self.unite_rows(row_a, row_b, keys)
                      left = right = None
                      left = next(left_groups)
                      right = next(right_groups)
                      left_key = key_of(left)
                      right_key = key_of(right)
                  elif left_key > right_key:
                      # Right group has no partner on the left: pass it on.
                      yield from right
                      right = None
                      right = next(right_groups)
                      right_key = key_of(right)
                  elif left_key < right_key:
                      # Left group has no partner on the right: pass it on.
                      yield from left
                      left = None
                      left = next(left_groups)
                      left_key = key_of(left)
          except StopIteration:
              # One side ran dry: flush whatever is pending, then the rest of
              # both group streams (at most one of them still has groups).
              if left is not None:
                  yield from left
              if right is not None:
                  yield from right
              for group in left_groups:
                  yield from group
              for group in right_groups:
                  yield from group
  98.  
  class LeftJoiner(Joiner):
      """Left join: every row of the first input, united where keys match."""

      def __call__(self, keys: Sequence[str], rows_a: Iterable[Row],
                   rows_b: Iterable[Row]) -> OperationResult:
          """Yield united rows for matching keys and raw first-input rows.

          Each input is grouped by ``keys`` and the two streams of groups are
          walked together. Groups with equal keys produce the cross product of
          their rows via ``unite_rows``; a ``rows_a`` group without a partner
          is emitted unchanged, while an unmatched ``rows_b`` group is
          dropped. When either side runs out, the remaining ``rows_a`` rows
          are emitted unchanged.

          NOTE(review): the walk compares keys with ``<`` / ``>``, so it
          presumably expects both inputs sorted by ``keys`` -- confirm.

          :param keys: names of the columns to join on
          :param rows_a: rows of the first table (all of them are kept)
          :param rows_b: rows of the second table
          """
          def key_of(group):
              # The first row of a group stands in for the whole group's key.
              return tuple(group[0][name] for name in keys)

          left_groups = Reduce(JustGroup(), keys=keys)(rows_a)
          right_groups = Reduce(JustGroup(), keys=keys)(rows_b)

          # Holds the fetched-but-not-yet-emitted left group, or None. It is
          # reset to None *before* fetching the next group so that, if the
          # fetch raises StopIteration, an emitted group is not emitted again.
          left = None
          try:
              left = next(left_groups)
              right = next(right_groups)
              left_key = key_of(left)
              right_key = key_of(right)

              while True:
                  if left_key == right_key:
                      for row_a in left:
                          for row_b in right:
                              yield self.unite_rows(row_a, row_b, keys)
                      left = None
                      left = next(left_groups)
                      right = next(right_groups)
                      left_key = key_of(left)
                      right_key = key_of(right)
                  elif left_key > right_key:
                      # Unmatched right group: skip it.
                      right = next(right_groups)
                      right_key = key_of(right)
                  elif left_key < right_key:
                      # Left group has no partner on the right: pass it on.
                      yield from left
                      left = None
                      left = next(left_groups)
                      left_key = key_of(left)
          except StopIteration:
              # Flush the pending left group, then every remaining left group.
              if left is not None:
                  yield from left
              for group in left_groups:
                  yield from group
  141.  
  class RightJoiner(Joiner):
      """Right join, expressed as a left join with the two inputs swapped."""

      def __call__(self, keys: Sequence[str], rows_a: Iterable[Row],
                   rows_b: Iterable[Row]) -> OperationResult:
          """Yield the rows of a ``LeftJoiner`` join of ``rows_b`` with ``rows_a``.

          NOTE(review): because the inputs are swapped, ``unite_rows`` receives
          ``rows_b`` rows as its first argument -- confirm that is intended.

          :param keys: names of the columns to join on
          :param rows_a: rows of the first table
          :param rows_b: rows of the second table
          """
          swapped_left_join = Join(LeftJoiner(), keys=keys)
          yield from swapped_left_join(rows_b, rows_a)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement