mfgnik

Untitled

Apr 23rd, 2020
1,116
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.56 KB | None | 0 0
  1. import typing as tp
  2. from . import operations as ops
  3. from . import external_sort
  4. import json
  5.  
  6.  
  7. class Graph:
  8.     """Computational graph implementation"""
  9.     def __init__(self, rows_name: str, operation_stack: tp.List[ops.Operation], join_list: tp.List['Graph']):
  10.         """Constructor of class
  11.        :param rows_name: name of rows in graph
  12.        :param operation_stack: stack with all operations
  13.        :param join_list: all graph which joined with this
  14.        """
  15.         self.rows_name = rows_name
  16.         self.operation_stack = operation_stack
  17.         self.join_list = join_list
  18.  
  19.     @classmethod
  20.     def graph_from_iter(cls, name: str) -> 'Graph':
  21.         """Construct new graph which reads data from row iterator (in form of sequence of Rows
  22.        from 'kwargs' passed to 'run' method) into graph data-flow
  23.        :param name: name of kwarg to use as data source
  24.        """
  25.         return cls(name, [], [])
  26.  
  27.     @classmethod
  28.     def graph_from_graph(cls, graph: 'Graph') -> 'Graph':
  29.         """Construct new graph from another graph
  30.        :param graph: graph to read from
  31.        """
  32.         return cls(graph.rows_name, graph.operation_stack[:], [graph.graph_from_graph(graph) for graph in
  33.                                                                graph.join_list])
  34.  
  35.     def map(self, mapper: ops.Mapper) -> 'Graph':
  36.         """Construct new graph extended with map operation with particular mapper
  37.        :param mapper: mapper to use
  38.        """
  39.         graph = Graph.graph_from_graph(self)
  40.         graph.operation_stack.append(ops.Map(mapper))
  41.         return graph
  42.  
  43.     def reduce(self, reducer: ops.Reducer, keys: tp.List[str]) -> 'Graph':
  44.         """Construct new graph extended with reduce operation with particular reducer
  45.        :param reducer: reducer to use
  46.        :param keys: keys for grouping
  47.        """
  48.         graph = Graph.graph_from_graph(self)
  49.         graph.operation_stack.append(ops.Reduce(reducer, keys))
  50.         return graph
  51.  
  52.     def sort(self, keys: tp.Sequence[str]) -> 'Graph':
  53.         """Construct new graph extended with sort operation
  54.        :param keys: sorting keys (typical is tuple of strings)
  55.        """
  56.         graph = Graph.graph_from_graph(self)
  57.         graph.operation_stack.append(external_sort.ExternalSort(keys))
  58.         return graph
  59.  
  60.     def join(self, joiner: ops.Joiner, join_graph: 'Graph', keys: tp.Sequence[str]) -> 'Graph':
  61.         """Construct new graph extended with join operation with another graph
  62.        :param joiner: join strategy to use
  63.        :param join_graph: other graph to join with
  64.        :param keys: keys for grouping
  65.        """
  66.         graph = Graph.graph_from_graph(self)
  67.         graph.operation_stack.append(ops.Join(joiner, keys))
  68.         graph.join_list.append(join_graph)
  69.         return graph
  70.  
  71.     def run(self, **kwargs: tp.Any) -> tp.List[ops.TRow]:
  72.         """Single method to start execution; data sources passed as kwargs"""
  73.         rows: tp.Iterable[ops.TRow] = iter([])
  74.         if callable(kwargs[self.rows_name]):
  75.             rows = kwargs[self.rows_name]()
  76.         elif isinstance(kwargs[self.rows_name], str):
  77.             with open(kwargs[self.rows_name]) as rows_file:
  78.                 rows = (json.loads(row) for row in rows_file)
  79.         self.join_list.reverse()
  80.         for operation in self.operation_stack:
  81.             second_rows = None
  82.             if isinstance(operation, ops.Join):
  83.                 second_rows = self.join_list.pop().run(**kwargs)
  84.             rows = (row for row in operation(rows, second_rows))
  85.         return list(rows)
Advertisement
Add Comment
Please, Sign In to add comment