Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import itertools
class Operation(object):
    """Marker base class for the table operations defined in this module.

    It carries no state and no behavior of its own; concrete operations
    subclass it and provide their own ``compute`` method.
    """
class Map(Operation):
    """Row-wise transformation that may emit any number of rows per input row."""

    def __init__(self, mapper):
        """Store the mapper.

        mapper -- callable invoked with a single row; whatever it returns is
                  iterated and every produced item becomes an output row.
        """
        self.mapper = mapper

    def compute(self, input_table):
        """Run the mapper over each row in order and return the flattened list."""
        return [
            produced
            for source_row in input_table
            for produced in self.mapper(source_row)
        ]
class Sort(Operation):
    """Orders the rows of a table by the values of a fixed list of columns."""

    def __init__(self, cols):
        """Store the column names (in priority order) that form the sort key."""
        self.cols = cols

    def compute(self, input_table):
        """Return a new, ascending-sorted list; the input table is not modified."""

        def sort_key(row):
            # Tuple of the row's values in the configured column order.
            return tuple(row[col] for col in self.cols)

        ordered = list(input_table)
        ordered.sort(key=sort_key)
        return ordered
class Fold(Operation):
    """Collapses an entire table into one value by repeated accumulation."""

    def __init__(self, folder, start):
        """Store the accumulator function and its initial state.

        folder -- callable ``folder(state, row)`` returning the next state.
        start  -- state used before the first row is processed.
        """
        self.start = start
        self.folder = folder

    def compute(self, input_table):
        """Fold every row into the state and return it wrapped in a one-item list."""
        accumulator = self.start
        for record in input_table:
            accumulator = self.folder(accumulator, record)
        result_table = [accumulator]
        return result_table
class Reduce(Operation):
    """Applies a reducer to each run of consecutive rows sharing the same key.

    Only *adjacent* rows with equal key values are grouped together, so the
    input must already be ordered by the key columns if one group per
    distinct key is wanted.
    """

    def __init__(self, reducer, key):
        """Store the reducer and the key columns.

        reducer -- callable invoked with a list of rows (one group); whatever
                   it returns is iterated and appended to the output.
        key     -- iterable of column names whose values identify a group.
        """
        self.reducer = reducer
        self.key = key

    def _group_key(self, row):
        """Return the list of key-column values for a row."""
        return [row[col] for col in self.key]

    def compute(self, input_table):
        """Return the concatenated reducer output for every group.

        An empty input table yields an empty list (the reducer is never
        called) instead of raising IndexError.
        """
        out_table = []
        # groupby yields one group per run of equal keys, and nothing at all
        # for empty input, so no special-casing of the first row is needed.
        for _, group in itertools.groupby(input_table, key=self._group_key):
            out_table.extend(self.reducer(list(group)))
        return out_table
class Join(Operation):
    """Nested-loop join of two tables (lists of dict rows).

    Every pair of rows is tested with ``pred(row1, row2)``; matching pairs
    are merged with ``join_rows``. The strategy selects what happens to
    rows that have no match: 'inner', 'left', 'right' or 'outer'.
    """

    def __init__(self, on, pred=lambda *rows: True, strategy='inner'):
        """Store the join configuration.

        on       -- stored as-is; ComputeGraph uses it to find the graph whose
                    output is the second table.
        pred     -- callable ``pred(row1, row2)`` returning a truthy value when
                    the rows should be joined. The default accepts every pair
                    (a cross join). It takes ``*rows`` because the predicate
                    is always called with two rows.
        strategy -- one of 'inner', 'left', 'right', 'outer'.
        """
        self.on = on
        self.strategy = strategy
        self.pred = pred

    def join_rows(self, row1, row2):
        """Return a new row holding row1's items plus row2's items.

        When a key of row2 already exists in the result, '.' is appended to
        it until it is unique, so no value is overwritten. (This renaming
        relies on string concatenation, so colliding keys must be strings.)
        """
        new_row = row1.copy()
        for key in row2:
            new_key = key
            while new_key in new_row:
                new_key += '.'
            new_row[new_key] = row2[key]
        return new_row

    def _none_row(self, table):
        """Return a filler row mapping the keys of the table's first row to None.

        An empty table has no columns to fill, so the filler is empty.
        """
        if not table:
            return {}
        return dict.fromkeys(table[0])

    def left_join(self, table1, table2):
        """Keep every row of table1; unmatched rows are padded with None columns."""
        filler = self._none_row(table2)
        out_table = []
        for row1 in table1:
            added = False
            for row2 in table2:
                if self.pred(row1, row2):
                    out_table.append(self.join_rows(row1, row2))
                    added = True
            if not added:
                out_table.append(self.join_rows(row1, filler))
        return out_table

    def right_join(self, table1, table2):
        """Keep every row of table2; unmatched rows are padded with None columns."""
        filler = self._none_row(table1)
        out_table = []
        for row2 in table2:
            added = False
            for row1 in table1:
                if self.pred(row1, row2):
                    out_table.append(self.join_rows(row1, row2))
                    added = True
            if not added:
                out_table.append(self.join_rows(filler, row2))
        return out_table

    def inner_join(self, table1, table2):
        """Return only the merged rows for pairs accepted by the predicate."""
        return [
            self.join_rows(row1, row2)
            for row1, row2 in itertools.product(table1, table2)
            if self.pred(row1, row2)
        ]

    def outer_join(self, table1, table2):
        """Keep every row of both tables, padding unmatched rows with None columns.

        Matched pairs and unmatched table1 rows come first (in table1 order),
        followed by the table2 rows that matched nothing.
        """
        filler1 = self._none_row(table1)
        filler2 = self._none_row(table2)
        out_table = []
        row2_added = [False] * len(table2)
        for row1 in table1:
            row1_added = False
            for i, row2 in enumerate(table2):
                if self.pred(row1, row2):
                    out_table.append(self.join_rows(row1, row2))
                    row1_added = True
                    row2_added[i] = True
            if not row1_added:
                out_table.append(self.join_rows(row1, filler2))
        for row2, matched in zip(table2, row2_added):
            if not matched:
                out_table.append(self.join_rows(filler1, row2))
        return out_table

    def compute(self, input_table, other_table):
        """Join input_table with other_table using the configured strategy.

        Returns the joined table, or None when the strategy name is not one
        of 'left', 'right', 'inner', 'outer'.
        """
        joiners = {
            'left': self.left_join,
            'right': self.right_join,
            'inner': self.inner_join,
            'outer': self.outer_join,
        }
        joiner = joiners.get(self.strategy)
        if joiner is None:
            return None
        return joiner(input_table, other_table)
class ComputeGraph(object):
    """A linear pipeline of operations that may join against other graphs.

    Operations are applied in the order they were added. A Join operation
    references another ComputeGraph (its ``on`` attribute); that graph is
    computed first and its output is used as the second table of the join.
    """

    def __init__(self, source='main_input'):
        """Create an empty graph reading its input from the named source.

        source -- keyword under which ``run`` receives this graph's input.
        """
        self.operation_list = []
        self.source = source
        self.called_graphs = []

    def add_operation(self, operation):
        """Append an operation; for a Join, also record the graph it depends on."""
        if isinstance(operation, Join):
            self.called_graphs.append(operation.on)
        self.operation_list.append(operation)

    def compute(self, input_table, graph_outs, verbose=False):
        """Run all operations over input_table and return the resulting table.

        graph_outs -- list of already computed graph outputs, indexed by each
                      graph's ``number`` attribute (assigned in ``run``).
        verbose    -- when true, print each operation before it runs.
        """
        table = input_table
        for operation in self.operation_list:
            if verbose:
                print(operation)
            if isinstance(operation, Join):
                table = operation.compute(table, graph_outs[operation.on.number])
            else:
                table = operation.compute(table)
        return table

    def topologicalSort(self, res=None, added=None, start=True):
        """Return the graphs this graph depends on, dependencies first.

        res   -- list to extend (a fresh list when omitted).
        added -- dict of graphs already placed in res (fresh when omitted).
        start -- true for the outermost call, which appends this graph last.

        The containers are created per call: using mutable defaults here
        would make every call share (and keep growing) the same list.
        """
        if res is None:
            res = []
        if added is None:
            added = {}
        for child_graph in self.called_graphs:
            res = child_graph.topologicalSort(res, added, False)
            if child_graph not in added:
                res.append(child_graph)
                added[child_graph] = 1
        if start:
            res.append(self)
        return res

    def run(self, save_result=None, verbose=False, **kwargs):
        """Compute this graph and everything it depends on.

        save_result -- optional object with a ``write`` method; each row of
                       the final table is written as ``str(row)`` plus newline.
        verbose     -- when true, print progress messages.
        kwargs      -- one entry per source name. A list or dict is used as
                       the table directly; any other iterable is read item by
                       item and each item is parsed with ``json.loads``.

        Returns the final table of this graph.
        """
        inputs = kwargs
        calculation_order = self.topologicalSort()
        graphs_out = []
        for i, graph in enumerate(calculation_order):
            # compute() looks up join partners through this index.
            graph.number = i
            source_data = inputs[graph.source]
            if isinstance(source_data, (list, dict)):
                table = source_data
            else:
                table = [json.loads(line) for line in source_data]
            if verbose:
                print('{} is load'.format(graph.source))
            out = graph.compute(table, graphs_out, verbose=verbose)
            graphs_out.append(out)
        if verbose:
            print('Calculations over. Save result...')
        # This graph is always last in the order, so its output is last too.
        result = graphs_out[-1]
        if save_result is not None:
            for row in result:
                save_result.write(str(row) + '\n')
        return result
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement