Guest User

Untitled

a guest
Jun 24th, 2018
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.36 KB | None | 0 0
  1. import 'dart:async';
  2.  
  3. import 'package:rxdart/src/streams/utils.dart';
  4.  
  /// Signature of the callback that decides whether a failed stream is retried.
  ///
  /// It receives the [error] raised by the source stream and the stack trace
  /// [s] reported with it, and returns a stream whose events and errors are
  /// used as the retry signal.
  typedef RetryWhenStreamFactory<T> = Stream<T> Function(
      dynamic error, StackTrace s);
  6.  
  /// A single-subscription stream that re-creates its source when it fails.
  ///
  /// On listen, the stream produced by [streamFactory] is subscribed to and its
  /// events are forwarded. When that source reports an error, the source
  /// subscription is cancelled, the error is recorded, and the stream returned
  /// by [retryWhenFactory] is listened to as a retry signal:
  ///
  /// * the first event of the signal stream triggers [retry], which subscribes
  ///   to a fresh source stream;
  /// * an error of the signal stream ends this stream with a [RetryError] that
  ///   carries every error recorded so far.
  ///
  /// Listening more than once throws a [StateError].
  class RetryWhenStream<T> extends Stream<T> {
    /// Creates the source stream; called once per (re)try.
    final StreamFactory<T> streamFactory;

    /// Creates the retry-signal stream for a given source error.
    final RetryWhenStreamFactory<T> retryWhenFactory;

    /// Controller backing the stream handed to the listener.
    StreamController<T> controller;

    /// Subscription to the most recently created source stream.
    StreamSubscription<T> subscription;

    /// Subscription to the retry-signal stream while a retry is pending,
    /// otherwise `null`.
    StreamSubscription<T> _retrySubscription;

    bool _isUsed = false;
    final List<ErrorAndStacktrace> _errors = <ErrorAndStacktrace>[];

    RetryWhenStream(this.streamFactory, this.retryWhenFactory);

    @override
    StreamSubscription<T> listen(
      void onData(T event), {
      Function onError,
      void onDone(),
      bool cancelOnError,
    }) {
      if (_isUsed) throw new StateError('Stream has already been listened to.');
      _isUsed = true;

      controller = new StreamController<T>(
        sync: true,
        onListen: retry,
        onPause: _pauseSubscriptions,
        onResume: _resumeSubscriptions,
        onCancel: _cancelSubscriptions,
      );

      return controller.stream.listen(
        onData,
        onError: onError,
        onDone: onDone,
        cancelOnError: cancelOnError,
      );
    }

    /// Subscribes to a fresh source stream and wires up the retry handling.
    void retry() {
      subscription = streamFactory().listen(
        controller.add,
        onError: (dynamic e, StackTrace s) {
          // The failed source is abandoned; a new one is created on retry.
          subscription.cancel();
          _errors.add(new ErrorAndStacktrace(e, s));

          _retrySubscription = retryWhenFactory(e, s).listen(
            (dynamic event) {
              // One signal means one retry: stop listening first so further
              // signal events cannot start additional source subscriptions.
              _stopWaitingForRetry();
              retry();
            },
            onError: (dynamic e, StackTrace s) {
              _stopWaitingForRetry();
              _errors.add(new ErrorAndStacktrace(e, s));
              // Hand out a snapshot so the emitted error does not alias the
              // internal list.
              controller.addError(new RetryError(
                  e.toString(), new List<ErrorAndStacktrace>.from(_errors)));
              controller.close();
            },
          );
        },
        onDone: controller.close,
        cancelOnError: false,
      );
    }

    /// Cancels and forgets the pending retry-signal subscription, if any.
    void _stopWaitingForRetry() {
      _retrySubscription?.cancel();
      _retrySubscription = null;
    }

    /// Forwards a listener pause to whichever inner subscription exists.
    void _pauseSubscriptions() {
      subscription?.pause();
      _retrySubscription?.pause();
    }

    /// Forwards a listener resume to whichever inner subscription exists.
    void _resumeSubscriptions() {
      subscription?.resume();
      _retrySubscription?.resume();
    }

    /// Cancels the source and any pending retry-signal subscription.
    ///
    /// Returns a future that completes once all cancellations have completed.
    Future<dynamic> _cancelSubscriptions() {
      final List<Future<dynamic>> pending = <Future<dynamic>>[];

      // `cancel()` is allowed to return null on older SDKs, so filter those.
      final Future<dynamic> sourceDone = subscription?.cancel();
      if (sourceDone != null) pending.add(sourceDone);

      final Future<dynamic> retryDone = _retrySubscription?.cancel();
      if (retryDone != null) pending.add(retryDone);
      _retrySubscription = null;

      return Future.wait<dynamic>(pending);
    }
  }
  57.  
  /// An [Error] that bundles a [message] with a list of collected [errors].
  class RetryError extends Error {
    /// Text returned by [toString].
    final String message;

    /// The errors, each paired with its stack trace, attached to this failure.
    final List<ErrorAndStacktrace> errors;

    RetryError(this.message, this.errors);

    /// Returns [message] unchanged.
    @override
    String toString() {
      return message;
    }
  }
  67.  
  /// Value object pairing an [error] with the [stacktrace] reported alongside it.
  ///
  /// Two instances are equal when they have the same runtime type and both
  /// their [error] and [stacktrace] compare equal.
  class ErrorAndStacktrace {
    /// The error object; may be of any type.
    final dynamic error;

    /// The stack trace that accompanied [error].
    final StackTrace stacktrace;

    ErrorAndStacktrace(this.error, this.stacktrace);

    @override
    String toString() =>
        'ErrorAndStacktrace{error: $error, stacktrace: $stacktrace}';

    @override
    bool operator ==(Object other) {
      if (identical(this, other)) {
        return true;
      }
      if (other is ErrorAndStacktrace) {
        return runtimeType == other.runtimeType &&
            error == other.error &&
            stacktrace == other.stacktrace;
      }
      return false;
    }

    @override
    int get hashCode {
      final int errorHash = error.hashCode;
      final int traceHash = stacktrace.hashCode;
      return errorHash ^ traceHash;
    }
  }
Add Comment
Please, Sign In to add comment