Advertisement
Guest User

Untitled

a guest
Nov 24th, 2017
83
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.82 KB | None | 0 0
  1. import json
  2. import itertools
  3.  
  4. class Operation(object):
  5.     pass
  6.  
  7.  
  8. class Map(Operation):
  9.    
  10.     def __init__(self, mapper):
  11.         self.mapper = mapper
  12.  
  13.     def compute(self, input_table):
  14.         out_table = [];
  15.         for row in input_table:
  16.             for new_row in self.mapper(row):
  17.                 out_table.append(new_row)
  18.         return out_table
  19.        
  20.  
  21. class Sort(Operation):
  22.  
  23.     def __init__(self, cols):
  24.         self.cols = cols
  25.  
  26.     def compute(self, input_table):
  27.         out_table = sorted(input_table, key=lambda row: tuple(row[x] for x in self.cols))
  28.         return out_table
  29.  
  30. class Fold(Operation):
  31.    
  32.     def __init__(self, folder, start):
  33.         self.folder = folder
  34.         self.start = start
  35.  
  36.     def compute(self, input_table):
  37.         state = self.start
  38.         for row in input_table:
  39.             state = self.folder(state, row)
  40.         return [state]
  41.  
  42. class Reduce(Operation):
  43.  
  44.     def __init__(self, reducer, key):
  45.         self.reducer = reducer
  46.         self.key = key
  47.  
  48.     def compute(self, input_table):
  49.         out_table = [];
  50.         subtable_to_reduce = [];
  51.         cur_key = [input_table[0][col] for col in self.key]
  52.         for row in input_table:
  53.             next_key = [row[col] for col in self.key]
  54.             if (cur_key != next_key):
  55.                 for new_row in self.reducer(subtable_to_reduce):
  56.                     out_table.append(new_row)
  57.                 subtable_to_reduce = []
  58.                 cur_key = next_key
  59.             subtable_to_reduce.append(row)
  60.         if len(subtable_to_reduce) != 0:
  61.             for new_row in self.reducer(subtable_to_reduce):
  62.                     out_table.append(new_row)
  63.         return out_table
  64.  
  65. class Join(Operation):
  66.    
  67.     def __init__(self, on, pred = lambda x: True, strategy = 'inner'):
  68.         self.on = on
  69.         self.strategy = strategy
  70.         self.pred = pred
  71.  
  72.     def join_rows(self, row1, row2):
  73.         new_row = row1.copy()
  74.         for key in row2:
  75.             new_key = key
  76.             while new_key in new_row:
  77.                 new_key += '.'
  78.             new_row[new_key] = row2[key]
  79.         return new_row
  80.  
  81.     def left_join(self, table1, table2):
  82.         table2_None_row = {}
  83.         for key in table2[0]:
  84.             table2_None_row[key] = None;
  85.         out_table = []
  86.         for row1 in table1:
  87.             added = False
  88.             for row2 in table2:
  89.                 if self.pred(row1, row2):
  90.                     out_table.append(self.join_rows(row1, row2))
  91.                     added = True
  92.             if not added:
  93.                 out_table.append(self.join_rows(row1, table2_None_row))      
  94.         return out_table
  95.  
  96.     def right_join(self, table1, table2):
  97.         table1_None_row = {}
  98.         for key in table1[0]:
  99.             table1_None_row[key] = None;
  100.         out_table = []
  101.         for row2 in table2:
  102.             added = False
  103.             for row1 in table1:
  104.                 if self.pred(row1, row2):
  105.                     out_table.append(self.join_rows(row1, row2))
  106.                     added = True
  107.             if not added:
  108.                 out_table.append(self.join_rows(table1_None_row, row2))      
  109.         return out_table
  110.  
  111.     def inner_join(self, table1, table2):
  112.         out_table = []
  113.         for row1, row2 in itertools.product(table1, table2):
  114.             if self.pred(row1, row2):
  115.                 out_table.append(self.join_rows(row1, row2))
  116.         return out_table
  117.  
  118.     def outer_join(self, table1, table2):
  119.         table2_None_row = {}
  120.         for key in table2[0]:
  121.             table2_None_row[key] = None;
  122.         table1_None_row = {}
  123.         for key in table1[0]:
  124.             table1_None_row[key] = None;
  125.         out_table = []
  126.         row2_added = [False] * len(table2)
  127.         for row1 in table1:
  128.             row1_added = False
  129.             for i,row2 in enumerate(table2):
  130.                 if self.pred(row1, row2):
  131.                     out_table.append(self.join_rows(row1, row2))
  132.                     row1_added = True
  133.                     row2_added[i] = True
  134.             if not row1_added:
  135.                 out_table.append(self.join_rows(row1, table2_None_row))
  136.         for i in range(len(table2)):
  137.             if not row2_added[i]:
  138.                 out_table.append(self.join_rows(table1_None_row, table2[i]))
  139.         return out_table
  140.  
  141.     def compute(self, input_table, other_table):
  142.         if (self.strategy == 'left'):
  143.             return self.left_join(input_table, other_table)
  144.         if (self.strategy == 'right'):
  145.             return self.right_join(input_table, other_table)
  146.         if (self.strategy == 'inner'):
  147.             return self.inner_join(input_table, other_table)
  148.         if (self.strategy == 'outer'):
  149.             return self.outer_join(input_table, other_table)
  150.         return None
  151.  
  152. class ComputeGraph(object):
  153.  
  154.     def __init__(self, source = 'main_input'):
  155.         self.operation_list = []
  156.         self.source = source
  157.         self.called_graphs = []
  158.  
  159.     def add_operation(self, operation):
  160.         if type(operation) == Join:
  161.             self.called_graphs.append(operation.on)
  162.         self.operation_list.append(operation)
  163.  
  164.     def compute(self, input_table, graph_outs, verbose = False):
  165.         table = input_table
  166.         for operation in self.operation_list:
  167.             if verbose:
  168.                 print(operation)
  169.             if type(operation) == Join:
  170.                 table = operation.compute(table, graph_outs[operation.on.number])
  171.             else:
  172.                 table = operation.compute(table)
  173.         return table
  174.    
  175.     def topologicalSort(self, res = [], added = {}, start = True):
  176.         for child_graph in self.called_graphs:
  177.             res = child_graph.topologicalSort(res, added, False)
  178.             if child_graph not in added:
  179.                 res.append(child_graph)
  180.                 added[child_graph] = 1
  181.         if (start):
  182.             res.append(self)
  183.         return res
  184.  
  185.     def run(self, save_result = None, verbose = False, **kwargs):
  186.         inputs = kwargs
  187.         calculation_order = self.topologicalSort()
  188.         graphs_out = []
  189.         for i,graph in enumerate(calculation_order):
  190.             graph.number = i
  191.             input = inputs[graph.source]
  192.             if type(input) != list and type(input) != dict:
  193.                 table = []
  194.                 for line in input:
  195.                     table.append(json.loads(line))
  196.             else:
  197.                 table = input
  198.             if verbose:
  199.                 print('{} is load'.format(graph.source))
  200.             out = graph.compute(table, graphs_out, verbose=verbose)
  201.             graphs_out.append(out)
  202.         if verbose:
  203.                 print('Calculations over. Save result...')  
  204.         if save_result != None:
  205.             for row in graphs_out[-1]:
  206.                 save_result.write(str(row) + '\n')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement