Guest User

Untitled

a guest
Dec 14th, 2018
114
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.67 KB | None | 0 0
from collections import defaultdict
from itertools import islice
from typing import TypeVar, Generic, Iterable, Union, Callable, Sequence, Optional, Iterator, Tuple

from pyspark.rdd import RDD
  5.  
  6.  
  7. A = TypeVar('A')
  8. B = TypeVar('B')
  9.  
  10.  
  11. def new_instance(elements: Union[RDD[A], Iterable[A], Callable[[], Iterable[A]]],
  12. ) -> 'DataTypeclass[A]':
  13. """Constructs an appropriate implementation given the input data's type.
  14. """
  15. if isinstance(elements, RDD):
  16. return RddDataEvidence(elements)
  17. else:
  18. return LocalDataEvidence(elements)
  19.  
  20.  
  21. class DataTypeclass(Generic[A]):
  22. """A one-size-fits-all API for data transformations.
  23. """
  24.  
  25. def map(self, apply: Callable[[A], B]) -> 'DataTypeclass[B]':
  26. """Apply the function to every element in the data.
  27. """
  28. raise NotImplementedError
  29.  
  30. def map_partition(self, apply: Callable[[Iterator[A]], Iterator[B]]) -> 'DataTypeclass[B]':
  31. """Apply the function to each partition of the internal data.
  32. """
  33. raise NotImplementedError
  34.  
  35. def flat_map(self, apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'DataTypeclass[B]':
  36. """Apply the function to every element, flattening results as appropriate.
  37. """
  38. raise NotImplementedError
  39.  
  40. def aggregate(self,
  41. zero: B,
  42. seq_op: Callable[[B, A], B],
  43. comb_op: Callable[[B, B], B]) -> B:
  44. raise NotImplementedError
  45.  
  46. def reduce(self, comb_op: Callable[[A, A], A]) -> A:
  47. """Combine elements together, yielding a single value.
  48.  
  49. NOTE: The combination operator, `comb_ob`, must be a commutative and associative operation.
  50. """
  51. raise NotImplementedError
  52.  
  53. def collect(self) -> Sequence[A]:
  54. """Obtain all data elements into a single indexable sequence.
  55.  
  56. NOTE: There are no ordering guarentees on the resulting `Sequence`.
  57. """
  58. raise NotImplementedError
  59.  
  60. def take(self, n: int) -> Sequence[A]:
  61. """Obtain at most `n` arbitrary distinct elements from the data.
  62.  
  63. NOTE: Will obtain less than `n` elements iff `n` > :func:`size()`.
  64. """
  65. raise NotImplementedError
  66.  
  67. def group_by(self, key: Callable[[A], B]) -> 'DataTypeclass[Tuple[B, Sequence[A]]]':
  68. """Group data elements that have the same value using the `key` function.
  69.  
  70. NOTE: The groups have arbitrary ordering.
  71. """
  72. raise NotImplementedError
  73.  
  74. def iter(self) -> Iterator[A]:
  75. """Iterate through all data elements, in an arbitrary order.
  76. """
  77. raise NotImplementedError
  78.  
  79. def __iter__(self) -> Iterator[A]:
  80. """Alias for :func:`iter`.
  81. """
  82. return self.iter()
  83.  
  84. def size() -> int:
  85. """The number of distinct data elements.
  86. """
  87. raise NotImplementedError
  88.  
  89. def __len__(self) -> int:
  90. """Alias for :func:`size`.
  91. """
  92. return self.size()
  93.  
  94.  
  95. class Local(DataTypeclass[A]):
  96.  
  97. def __init__(self, source: Union[Iterable[A], Callable[[], Iterable[A]]]) -> None:
  98. if isinstance(source, Iterable):
  99. self._source = tuple(source)
  100. self._size = len(self._source)
  101. elif callable(source):
  102. self._source = source
  103. self._size = -1
  104. else:
  105. raise TypeError(f"Unexpected source type: not a Sequence nor a () -> Iterable: "
  106. f"{type(source)}")
  107.  
  108. def _elements(self) -> Iterable[A]:
  109. """The sequence of data items or an evaluation of the Iterable-producing source function.
  110. """
  111. if callable(self._source):
  112. elements = self._source()
  113. if isinstance(elements, Sequence):
  114. # if we had a lazy-evaluated thing before, i.e. a () -> Iterable[A],
  115. # since we just called it, we check if that thing returned a Sequence[A]
  116. # if it does, then we can cache the function with its returned sequence
  117. self._source = elements
  118. else:
  119. elements = self._source
  120. return elements
  121.  
  122. def iter(self) -> Iterator[A]:
  123. return iter(self._elements())
  124.  
  125. def flat_map(self,
  126. apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'LocalDataEvidence[B]':
  127. new_elements = []
  128. for e in self._elements():
  129. res = apply(e)
  130. if res is not None:
  131. if isinstance(res, Iterable):
  132. for expanded_e in res:
  133. new_elements.append(expanded_e)
  134. else:
  135. new_elements.append(e)
  136. return LocalDataEvidence(tuple(new_elements))
  137.  
  138. def map_partition(self, apply: Callable[[Iterable[A]], Iterable[B]]) -> 'LocalDataEvidence[B]':
  139. return LocalDataEvidence(lambda: apply(self._elements()))
  140.  
  141. def map(self, apply: Callable[[A], B]) -> 'LocalDataEvidence[B]':
  142. if isinstance(self._source, Sequence):
  143. return LocalDataEvidence(tuple(map(apply, self._source)))
  144. else:
  145. def delay_map_application():
  146. for x in self._elements():
  147. yield apply(x)
  148.  
  149. return LocalDataEvidence[B](delay_map_application)
  150.  
  151. def aggregate(self,
  152. zero: B,
  153. seq_op: Callable[[B, A], B],
  154. comb_op: Optional[Callable[[B, B], B]] = None) -> B:
  155. zero = zero
  156. for x in self._elements():
  157. zero = seq_op(zero, x)
  158. return zero
  159.  
  160. def reduce(self, comb_op: Callable[[A, A], A]) -> A:
  161. if isinstance(self._source, Sequence):
  162. first, second, rest = self._source[0], self._source[1], self._source[2:]
  163. else:
  164. elmns = iter(self._elements())
  165. first = next(elmns)
  166. second = next(elmns)
  167. rest = elmns
  168.  
  169. zero = comb_op(first, second)
  170. for x in rest:
  171. zero = comb_op(zero, x)
  172. return zero
  173.  
  174. def collect(self) -> Sequence[A]:
  175. if isinstance(self._source, Sequence):
  176. return list(self._source)
  177. else:
  178. return list(self._elements())
  179.  
  180. def take(self, n: int) -> Sequence[A]:
  181. if isinstance(self._source, Sequence):
  182. return list(self._source[0:n])
  183. else:
  184. elements = []
  185. i = 0
  186. for e in self._elements():
  187. if i >= n:
  188. break
  189. elements.append(e)
  190. i += 1
  191. return elements
  192.  
  193. def group_by(self, key: Callable[[A], B]) -> 'LocalDataEvidence[Tuple[B, Sequence[A]]]':
  194. d = defaultdict(list)
  195. for e in self._elements():
  196. d[key(e)].append(e)
  197. return LocalDataEvidence(tuple(d.items()))
  198.  
  199. def __str__(self) -> str:
  200. """Displays the underlying collection's `__str__` representation and this class's name.
  201. """
  202. return f"{type(self).__name__}({str(self._source)})"
  203.  
  204. def size() -> int:
  205. if self._size == -1:
  206. # need to calculate size by iterating through elements
  207. self._size = 0
  208. for _ in self._elements():
  209. self._size += 1
  210.  
  211. return self._size
  212.  
  213.  
  214. class Rdd(DataTypeclass[A]):
  215.  
  216. def __init__(self, rdd: RDD[A]) -> None:
  217. self._rdd = rdd
  218.  
  219. def iter(self) -> Iterator[A]:
  220. return self._rdd.toLocalIterator()
  221.  
  222. def aggregate(self,
  223. zero: B,
  224. seq_op: Callable[[B, A], B],
  225. comb_op: Callable[[B, B], B]) -> B:
  226. return self._rdd.aggregate(zero, seq_op, comb_op)
  227.  
  228. def reduce(self, comb_op: Callable[[A, A], A]) -> A:
  229. return self._rdd.reduce(comb_op)
  230.  
  231. def collect(self) -> Sequence[A]:
  232. return self._rdd.collect()
  233.  
  234. def take(self, n: int) -> Sequence[A]:
  235. return self._rdd.take(n)
  236.  
  237. def map(self, apply: Callable[[A], B]) -> 'RddDataEvidence[B]':
  238. return self._rdd.map(apply)
  239.  
  240. def map_partition(self,
  241. apply: Callable[[Iterator[A]], Iterator[B]]) -> 'RddDataEvidence[B]':
  242. return self._rdd.mapPartiions(apply)
  243.  
  244. def flat_map(self,
  245. apply: Callable[[A], Union[Optional[B], Iterable[B]]]) -> 'RddDataEvidence[B]':
  246. return self._rdd.flatMap(apply)
  247.  
  248. def group_by(self, key: Callable[[A], B]) -> 'RddDataEvidence[Tuple[B, Sequence[A]]]':
  249. return self._rdd.groupBy(key)
  250.  
  251. def size() -> int:
  252. return self._rdd.count()
  253.  
  254. def __str__(self) -> str:
  255. """Display the underlying RDD's :func:`__str__` representation & this class name.
  256. """
  257. return f"{type(self).__name__}({str(self._rdd)})"
Add Comment
Please, Sign In to add comment