Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
from collections import defaultdict
from itertools import islice
from typing import TypeVar, Generic, Iterable, Union, Callable, Sequence, Optional, Iterator, Tuple

from pyspark.rdd import RDD
# Element type of the data wrapped by a DataTypeclass.
A = TypeVar("A")
# Secondary type variable: output element type of transformations,
# accumulator type of `aggregate`, or key type of `group_by`.
B = TypeVar("B")
def new_instance(elements: Union[RDD[A], Iterable[A], Callable[[], Iterable[A]]],
                 ) -> 'DataTypeclass[A]':
    """Construct the appropriate :class:`DataTypeclass` implementation for ``elements``.

    :param elements: A Spark ``RDD``, an iterable of elements, or a
        zero-argument callable producing an iterable of elements on demand.
    :return: An :class:`Rdd` wrapper when ``elements`` is an ``RDD``;
        otherwise a :class:`Local` wrapper around the iterable / callable.
    :raises TypeError: Propagated from :class:`Local` when ``elements`` is
        neither iterable nor callable.
    """
    # The concrete implementations defined in this module are `Rdd` and
    # `Local`; the previously referenced `*DataEvidence` names do not exist.
    if isinstance(elements, RDD):
        return Rdd(elements)
    return Local(elements)
class DataTypeclass(Generic[A]):
    """A one-size-fits-all API for data transformations.

    This base class only defines the interface: its transformation and action
    methods raise :class:`NotImplementedError` and are meant to be overridden
    by concrete implementations (see :func:`new_instance`). ``__iter__`` and
    ``__len__`` are implemented here in terms of :meth:`iter` and :meth:`size`.
    """

    def map(self, apply: Callable[[A], B]) -> 'DataTypeclass[B]':
        """Apply ``apply`` to every element in the data."""
        raise NotImplementedError

    def map_partition(self, apply: Callable[[Iterator[A]], Iterator[B]]) -> 'DataTypeclass[B]':
        """Apply ``apply`` to each partition of the internal data.

        ``apply`` receives an iterator over one partition's elements and
        returns an iterator over the transformed elements.
        """
        raise NotImplementedError

    def flat_map(self, apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'DataTypeclass[B]':
        """Apply ``apply`` to every element, flattening results as appropriate.

        Per the signature, ``apply`` may return ``None``, a single value, or an
        iterable of values.
        """
        raise NotImplementedError

    def aggregate(self,
                  zero: B,
                  seq_op: Callable[[B, A], B],
                  comb_op: Callable[[B, B], B]) -> B:
        """Fold all elements into a single accumulated value.

        :param zero: Initial accumulator value.
        :param seq_op: Folds one element into an accumulator.
        :param comb_op: Merges two accumulators (e.g. ones built independently
            over different partitions).
        :return: The final accumulator.
        """
        raise NotImplementedError

    def reduce(self, comb_op: Callable[[A, A], A]) -> A:
        """Combine elements together, yielding a single value.

        NOTE: The combination operator, `comb_op`, must be a commutative and
        associative operation.
        """
        raise NotImplementedError

    def collect(self) -> Sequence[A]:
        """Obtain all data elements into a single indexable sequence.

        NOTE: There are no ordering guarantees on the resulting `Sequence`.
        """
        raise NotImplementedError

    def take(self, n: int) -> Sequence[A]:
        """Obtain at most `n` elements from the data; which ones is unspecified.

        NOTE: Will obtain fewer than `n` elements iff `n` > :func:`size()`.
        """
        raise NotImplementedError

    def group_by(self, key: Callable[[A], B]) -> 'DataTypeclass[Tuple[B, Sequence[A]]]':
        """Group data elements that have the same value using the `key` function.

        Each resulting element is a ``(key_value, elements_with_that_key)`` pair.
        NOTE: The groups have arbitrary ordering.
        """
        raise NotImplementedError

    def iter(self) -> Iterator[A]:
        """Iterate through all data elements, in an arbitrary order."""
        raise NotImplementedError

    def __iter__(self) -> Iterator[A]:
        """Alias for :func:`iter`."""
        return self.iter()

    def size(self) -> int:
        """The number of data elements (duplicates are counted)."""
        # `self` was previously missing, so `__len__` could never work.
        raise NotImplementedError

    def __len__(self) -> int:
        """Alias for :func:`size`."""
        return self.size()
class Local(DataTypeclass[A]):
    """In-memory implementation of :class:`DataTypeclass`.

    The data is held either as a tuple (when constructed from an iterable,
    which is eagerly copied) or as a zero-argument callable that is invoked
    each time the elements are needed. If such a callable returns a
    :class:`Sequence`, that result replaces the callable and is reused.
    """

    def __init__(self, source: Union[Iterable[A], Callable[[], Iterable[A]]]) -> None:
        """Wrap ``source``.

        :param source: An iterable (copied into a tuple immediately) or a
            zero-argument callable returning an iterable (evaluated lazily).
        :raises TypeError: If ``source`` is neither iterable nor callable.
        """
        if isinstance(source, Iterable):
            self._source = tuple(source)
            self._size = len(self._source)
        elif callable(source):
            self._source = source
            # -1 marks "size not yet known"; computed on demand by size().
            self._size = -1
        else:
            raise TypeError(f"Unexpected source type: not a Sequence nor a () -> Iterable: "
                            f"{type(source)}")

    def _elements(self) -> Iterable[A]:
        """Return the data items, evaluating the source callable if there is one."""
        if not callable(self._source):
            return self._source
        elements = self._source()
        if isinstance(elements, Sequence):
            # The callable produced a re-iterable Sequence: cache it (and its
            # length) so the callable does not need to be invoked again.
            self._source = elements
            self._size = len(elements)
        return elements

    def iter(self) -> Iterator[A]:
        """Iterate over the elements in source order."""
        return iter(self._elements())

    def flat_map(self,
                 apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'Local[B]':
        """Eagerly apply ``apply`` to every element and flatten the results.

        A ``None`` result contributes nothing; an iterable result contributes
        each of its items (note: a ``str`` is iterable and is split into
        characters); any other result is kept as a single element.
        """
        new_elements = []
        for e in self._elements():
            res = apply(e)
            if res is None:
                continue
            if isinstance(res, Iterable):
                new_elements.extend(res)
            else:
                # A scalar result is emitted as-is (previously the *input*
                # element was appended by mistake).
                new_elements.append(res)
        return Local(tuple(new_elements))

    def map_partition(self, apply: Callable[[Iterator[A]], Iterator[B]]) -> 'Local[B]':
        """Lazily apply ``apply`` to all elements, treated as one partition.

        ``apply`` receives an iterator, as the :class:`DataTypeclass` contract
        states. It is invoked whenever the result's elements are needed, unless
        it returns a Sequence, which is then cached.
        """
        return Local(lambda: apply(iter(self._elements())))

    def map(self, apply: Callable[[A], B]) -> 'Local[B]':
        """Apply ``apply`` to every element.

        Eager when the data is already a materialized Sequence; otherwise the
        mapping is deferred until the result's elements are needed.
        """
        if isinstance(self._source, Sequence):
            return Local(tuple(map(apply, self._source)))

        def delay_map_application() -> Iterator[B]:
            for x in self._elements():
                yield apply(x)

        return Local(delay_map_application)

    def aggregate(self,
                  zero: B,
                  seq_op: Callable[[B, A], B],
                  comb_op: Optional[Callable[[B, B], B]] = None) -> B:
        """Fold every element into ``zero`` with ``seq_op``, in source order.

        ``comb_op`` is accepted for interface compatibility but never called:
        all elements are folded sequentially into a single accumulator.
        """
        acc = zero
        for x in self._elements():
            acc = seq_op(acc, x)
        return acc

    def reduce(self, comb_op: Callable[[A, A], A]) -> A:
        """Combine all elements left-to-right with ``comb_op``.

        A single element is returned unchanged.

        :raises ValueError: If there are no elements.
        """
        it = iter(self._elements())
        try:
            acc = next(it)
        except StopIteration:
            # Don't leak StopIteration (it becomes RuntimeError inside generators).
            raise ValueError("Can not reduce() empty data") from None
        for x in it:
            acc = comb_op(acc, x)
        return acc

    def collect(self) -> Sequence[A]:
        """Return all elements as a new list, in source order."""
        return list(self._elements())

    def take(self, n: int) -> Sequence[A]:
        """Return the first ``min(n, size)`` elements; ``[]`` when ``n <= 0``."""
        if n <= 0:
            # Guard: a negative slice bound would count from the end.
            return []
        if isinstance(self._source, Sequence):
            return list(self._source[:n])
        # islice stops without pulling an (n+1)-th element from a lazy source.
        return list(islice(self._elements(), n))

    def group_by(self, key: Callable[[A], B]) -> 'Local[Tuple[B, Sequence[A]]]':
        """Eagerly group elements by ``key(element)`` into ``(key, list)`` pairs."""
        groups = defaultdict(list)
        for e in self._elements():
            groups[key(e)].append(e)
        return Local(tuple(groups.items()))

    def __str__(self) -> str:
        """Displays the underlying collection's `__str__` representation and this class's name."""
        return f"{type(self).__name__}({str(self._source)})"

    def size(self) -> int:
        """Return the number of elements (duplicates counted), caching the result."""
        if self._size == -1:
            elements = self._elements()
            if isinstance(elements, Sequence):
                self._size = len(elements)
            else:
                # One-shot iterable: count by exhausting it.
                self._size = sum(1 for _ in elements)
        return self._size
class Rdd(DataTypeclass[A]):
    """Spark-backed implementation of :class:`DataTypeclass` wrapping an ``RDD``.

    Actions delegate to the corresponding ``RDD`` method. Transformations
    return a new :class:`Rdd` wrapping the transformed ``RDD``, so results can
    be chained through the :class:`DataTypeclass` API.
    """

    def __init__(self, rdd: RDD[A]) -> None:
        self._rdd = rdd

    def iter(self) -> Iterator[A]:
        """Stream elements to the driver via ``RDD.toLocalIterator``."""
        return self._rdd.toLocalIterator()

    def aggregate(self,
                  zero: B,
                  seq_op: Callable[[B, A], B],
                  comb_op: Callable[[B, B], B]) -> B:
        """Delegate to ``RDD.aggregate``."""
        return self._rdd.aggregate(zero, seq_op, comb_op)

    def reduce(self, comb_op: Callable[[A, A], A]) -> A:
        """Delegate to ``RDD.reduce``."""
        return self._rdd.reduce(comb_op)

    def collect(self) -> Sequence[A]:
        """Delegate to ``RDD.collect``."""
        return self._rdd.collect()

    def take(self, n: int) -> Sequence[A]:
        """Delegate to ``RDD.take``."""
        return self._rdd.take(n)

    def map(self, apply: Callable[[A], B]) -> 'Rdd[B]':
        """Wrap ``RDD.map`` so the result stays a :class:`DataTypeclass`."""
        return Rdd(self._rdd.map(apply))

    def map_partition(self,
                      apply: Callable[[Iterator[A]], Iterator[B]]) -> 'Rdd[B]':
        """Wrap ``RDD.mapPartitions`` (previously misspelled ``mapPartiions``)."""
        return Rdd(self._rdd.mapPartitions(apply))

    def flat_map(self,
                 apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'Rdd[B]':
        """Wrap ``RDD.flatMap``, honouring the ``None`` / scalar / iterable contract.

        ``RDD.flatMap`` requires the function to return an iterable, so ``None``
        is mapped to no output and a non-iterable result to a one-item tuple.
        """
        def _expand(element):
            res = apply(element)
            if res is None:
                return ()
            if isinstance(res, Iterable):
                return res
            return (res,)

        return Rdd(self._rdd.flatMap(_expand))

    def group_by(self, key: Callable[[A], B]) -> 'Rdd[Tuple[B, Sequence[A]]]':
        """Wrap ``RDD.groupBy``, materialising each group as a ``list``."""
        # groupBy yields (key, ResultIterable); convert to list so each group
        # is an indexable Sequence as the DataTypeclass signature promises.
        return Rdd(self._rdd.groupBy(key).mapValues(list))

    def size(self) -> int:
        """Delegate to ``RDD.count``."""
        return self._rdd.count()

    def __str__(self) -> str:
        """Display the underlying RDD's :func:`__str__` representation & this class name."""
        return f"{type(self).__name__}({str(self._rdd)})"
Add Comment
Please, Sign In to add comment