Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import 'dart:async';
- import 'package:rxdart/src/streams/utils.dart';
- typedef Stream<T> RetryWhenStreamFactory<T>(dynamic error, StackTrace s);
- class RetryWhenStream<T> extends Stream<T> {
- final StreamFactory<T> streamFactory;
- final RetryWhenStreamFactory<T> retryWhenFactory;
- StreamController<T> controller;
- StreamSubscription<T> subscription;
- bool _isUsed = false;
- final List<ErrorAndStacktrace> _errors = <ErrorAndStacktrace>[];
- RetryWhenStream(this.streamFactory, this.retryWhenFactory);
- @override
- StreamSubscription<T> listen(
- void onData(T event), {
- Function onError,
- void onDone(),
- bool cancelOnError,
- }) {
- if (_isUsed) throw new StateError('Stream has already been listened to.');
- _isUsed = true;
- controller = new StreamController<T>(
- sync: true,
- onListen: retry,
- onPause: ([Future<dynamic> resumeSignal]) => subscription.pause(resumeSignal),
- onResume: () => subscription.resume(),
- onCancel: () => subscription.cancel());
- return controller.stream.listen(
- onData,
- onError: onError,
- onDone: onDone,
- cancelOnError: cancelOnError,
- );
- }
- void retry() {
- subscription = streamFactory().listen(controller.add, onError: (dynamic e, StackTrace s) {
- subscription.cancel();
- _errors.add(new ErrorAndStacktrace(e, s));
- retryWhenFactory(e, s).listen(
- (dynamic event) => retry(),
- onError: (dynamic e, StackTrace s) {
- controller.addError(new RetryError(e.toString(), _errors..add(new ErrorAndStacktrace(e, s))));
- controller.close();
- },
- );
- }, onDone: controller.close, cancelOnError: false);
- }
- }
- class RetryError extends Error {
- final String message;
- final List<ErrorAndStacktrace> errors;
- RetryError(this.message, this.errors);
- @override
- String toString() => message;
- }
- class ErrorAndStacktrace {
- final dynamic error;
- final StackTrace stacktrace;
- ErrorAndStacktrace(this.error, this.stacktrace);
- @override
- String toString() {
- return 'ErrorAndStacktrace{error: $error, stacktrace: $stacktrace}';
- }
- @override
- bool operator ==(Object other) =>
- identical(this, other) ||
- other is ErrorAndStacktrace && runtimeType == other.runtimeType && error == other.error && stacktrace == other.stacktrace;
- @override
- int get hashCode => error.hashCode ^ stacktrace.hashCode;
- }
Add Comment
Please, Sign In to add comment