Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import typing as tp
- from . import operations as ops
- from . import external_sort
- import json
class Graph:
    """Computational graph implementation.

    A graph describes a data-flow as three parts:
    - a named data source (``rows_name``, looked up in the kwargs of ``run``);
    - an ordered stack of operations applied to that source;
    - the list of other graphs whose output is consumed by the ``ops.Join``
      operations of the stack, in the order the joins were added.

    Every builder method (``map``, ``reduce``, ``sort``, ``join``) returns a new
    graph built from a copy of ``self`` and leaves ``self`` unchanged. A
    partially built graph can therefore serve as the common prefix of several
    pipelines.
    """

    def __init__(self, rows_name: str, operation_stack: tp.List[ops.Operation], join_list: tp.List['Graph']):
        """Constructor of class
        :param rows_name: name of the ``run`` kwarg used as this graph's data source
        :param operation_stack: operations applied, in order, to the source rows
        :param join_list: graphs joined with this one; the i-th graph feeds the
            i-th ``ops.Join`` operation found in ``operation_stack``
        """
        self.rows_name = rows_name
        self.operation_stack = operation_stack
        self.join_list = join_list

    @classmethod
    def graph_from_iter(cls, name: str) -> 'Graph':
        """Construct new graph which reads data from the ``run`` kwarg ``name``.

        At run time the kwarg may be either of two things:
        - a callable returning an iterable of rows, called on each run of this
          graph;
        - a ``str`` path to a file holding one JSON document per line.
        :param name: name of kwarg to use as data source
        """
        return cls(name, [], [])

    @classmethod
    def graph_from_graph(cls, graph: 'Graph') -> 'Graph':
        """Construct new graph as a copy of another graph.

        The operation stack is shallow-copied (operation objects are shared).
        Every joined graph is copied recursively. Appending operations or joins
        to the result therefore never modifies ``graph``.
        :param graph: graph to copy
        """
        joined_copies = [type(joined).graph_from_graph(joined) for joined in graph.join_list]
        return cls(graph.rows_name, graph.operation_stack[:], joined_copies)

    def _extended(self, operation: ops.Operation, join_graph: tp.Optional['Graph'] = None) -> 'Graph':
        """Return a copy of this graph with ``operation`` appended (and ``join_graph`` registered, if given)."""
        graph = type(self).graph_from_graph(self)
        graph.operation_stack.append(operation)
        if join_graph is not None:
            graph.join_list.append(join_graph)
        return graph

    def map(self, mapper: ops.Mapper) -> 'Graph':
        """Construct new graph extended with map operation with particular mapper
        :param mapper: mapper to use
        """
        return self._extended(ops.Map(mapper))

    def reduce(self, reducer: ops.Reducer, keys: tp.List[str]) -> 'Graph':
        """Construct new graph extended with reduce operation with particular reducer
        :param reducer: reducer to use
        :param keys: keys for grouping
        """
        return self._extended(ops.Reduce(reducer, keys))

    def sort(self, keys: tp.Sequence[str]) -> 'Graph':
        """Construct new graph extended with sort operation
        :param keys: sorting keys (typical is tuple of strings)
        """
        return self._extended(external_sort.ExternalSort(keys))

    def join(self, joiner: ops.Joiner, join_graph: 'Graph', keys: tp.Sequence[str]) -> 'Graph':
        """Construct new graph extended with join operation with another graph
        :param joiner: join strategy to use
        :param join_graph: other graph to join with (stored by reference, not copied)
        :param keys: keys for grouping
        """
        return self._extended(ops.Join(joiner, keys), join_graph)

    @staticmethod
    def _read_json_lines(path: str) -> tp.Iterator[ops.TRow]:
        """Lazily yield one ``json.loads``-parsed value per line of the file at ``path``.

        The generator owns the file handle. The file stays open while rows are
        being pulled and is closed when the generator is exhausted or closed.
        """
        with open(path) as rows_file:
            for line in rows_file:
                yield json.loads(line)

    def run(self, **kwargs: tp.Any) -> tp.List[ops.TRow]:
        """Single method to start execution; data sources passed as kwargs.

        ``kwargs[self.rows_name]`` must be either a callable returning an
        iterable of rows or a ``str`` path to a JSON-lines file. Joined graphs
        run eagerly, with the same kwargs, while the pipeline is assembled.
        Their results are passed as the second argument of the matching
        ``ops.Join``. The graph itself is not modified, so it can be run any
        number of times.
        :raises KeyError: if ``kwargs`` has no entry named ``self.rows_name``
        :raises TypeError: if the data source is neither callable nor a ``str`` path
        :return: list of resulting rows
        """
        source = kwargs[self.rows_name]
        rows: tp.Iterable[ops.TRow]
        if callable(source):
            rows = source()
        elif isinstance(source, str):
            # A generator expression created inside ``with open(...)`` would only
            # be consumed after the file was closed. The helper generator keeps
            # the handle open for exactly as long as rows are being read.
            rows = self._read_json_lines(source)
        else:
            raise TypeError(
                f"unsupported data source for {self.rows_name!r}: expected a callable "
                f"returning rows or a path to a JSON-lines file, got {type(source).__name__}"
            )

        join_index = 0
        for operation in self.operation_stack:
            if isinstance(operation, ops.Join):
                # Joined graphs line up with Join operations in stack order. Index
                # rather than pop so this graph (and the joined ones) stay re-runnable.
                joined_rows = self.join_list[join_index].run(**kwargs)
                join_index += 1
                rows = iter(operation(rows, joined_rows))
            else:
                rows = iter(operation(rows, None))
        return list(rows)
Advertisement
Add Comment
Please, Sign In to add comment