Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Fans a single backup payload out to every entry in `delegates` and merges their results into one flow.
 *
 * [data] is wrapped in a shared flow that lives in the `channelFlow` scope, and each delegate collects from
 * that shared flow in its own coroutine. A shared flow never completes by itself, so a `null` sentinel is
 * appended when [data] completes and each delegate's view is cut off at that sentinel.
 *
 * NOTE(review): the sharing start policy comes from `sharingStartedAfterSubs`, which is not visible here.
 * Presumably it holds the upstream back until `delegates.size` collectors have subscribed and winds the
 * sharing coroutine down once that many completion signals were received — confirm against the helper.
 *
 * @param info forwarded unchanged to each delegate.
 * @param data payload chunks to hand to every delegate.
 * @return a flow carrying every [CreateBackupResult] produced by any delegate.
 */
override fun createBackup(info: BasicBackupInfo, data: Flow<ByteBuffer>): Flow<CreateBackupResult> =
    channelFlow {
        // Each delegate signals on this flow once it has stopped collecting, successfully or not.
        val finishedDelegates = MutableSharedFlow<Unit>()

        // Append the end-of-stream sentinel; widening to ByteBuffer? is what allows emitting null.
        val terminatedData = data.onCompletion<ByteBuffer?> { emit(null) }
        val startPolicy = sharingStartedAfterSubs(delegates.size, finishedDelegates)

        val fanOut = terminatedData
            .shareIn(this, startPolicy)
            // Stop at the sentinel so downstream collectors actually complete.
            .takeWhile { chunk -> chunk != null }
            .filterNotNull()

        delegates.forEach { delegate ->
            launch {
                try {
                    // Forward every result of this delegate into the merged output channel.
                    delegate.createBackup(info, fanOut).collect { result -> send(result) }
                } finally {
                    finishedDelegates.emit(Unit)
                }
            }
        }
    }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement