Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package de.s3rius.tracker
- import kotlinx.coroutines.channels.onFailure
- import kotlinx.coroutines.channels.onSuccess
- import kotlinx.coroutines.channels.produce
- import kotlinx.coroutines.delay
- import kotlinx.coroutines.flow.Flow
- import kotlinx.coroutines.flow.channelFlow
- import kotlinx.coroutines.flow.flow
- import kotlinx.coroutines.flow.flowOf
- import kotlinx.coroutines.flow.toList
- import kotlinx.coroutines.selects.onTimeout
- import kotlinx.coroutines.selects.select
- import kotlinx.coroutines.test.runTest
- import org.amshove.kluent.internal.assertFailsWith
- import org.amshove.kluent.shouldBeEqualTo
- import org.junit.Test
- import kotlin.time.Duration
- import kotlin.time.Duration.Companion.seconds
/**
 * Groups the emissions of this flow into lists of up to [count] items and emits those lists downstream.
 *
 * The original order of items is preserved. A chunk is emitted as soon as it holds [count] items.
 * If a [timeout] is given, a partially filled chunk is also emitted once [timeout] has elapsed
 * without a new upstream item arriving (the timer restarts with every received item).
 * When the upstream completes, any remaining items are emitted as a final, possibly shorter, chunk.
 * Empty lists are never emitted.
 *
 * If the upstream fails, the failure is rethrown to the collector.
 *
 * The returned flow is cold: nothing runs, and arguments are not validated, until it is collected.
 * For that reason this function does not need to be `suspend`.
 *
 * @param count number of items per chunk; `0` behaves like `1` (every item is emitted in its own list).
 * @param timeout maximum idle time before a partial chunk is flushed, or `null` to flush only when
 * [count] is reached or the upstream completes.
 * @return a flow of non-empty lists containing the upstream items in their original order.
 * @throws IllegalArgumentException during collection if [count] is negative or [timeout] is not positive.
 */
fun <T> Flow<T>.chunked(count: Int, timeout: Duration? = null): Flow<List<T>> = channelFlow {
    if (timeout != null) {
        require(timeout.isPositive()) { "timeout must be larger than 0" }
    }
    require(count >= 0) { "count must not be negative" }

    // Bridge the upstream flow into a channel so receiving an item can compete with the timeout in `select`.
    val upstream = this@chunked
    val values = produce { upstream.collect { value -> send(value) } }
    val accumulator = mutableListOf<T>()
    var finished = false

    // Emits a snapshot of the buffered items downstream and resets the buffer; no-op when empty.
    suspend fun flush() {
        if (accumulator.isEmpty()) return
        send(accumulator.toList())
        accumulator.clear()
    }

    while (!finished) {
        select {
            // The timeout clause is only registered while there is something to flush,
            // so an idle upstream never produces empty chunks.
            if (timeout != null && accumulator.isNotEmpty()) {
                onTimeout(timeout) {
                    flush()
                }
            }
            values.onReceiveCatching { result ->
                result
                    .onSuccess { item ->
                        accumulator.add(item)
                        if (accumulator.size >= count) {
                            flush()
                        }
                    }
                    .onFailure { cause ->
                        // A non-null cause means the channel was closed with an error: propagate it.
                        cause?.let { throw it }
                        // Normal completion: emit whatever is left and leave the loop.
                        flush()
                        finished = true
                    }
            }
        }
    }
}
/**
 * Tests for [chunked], one scenario per test method so a failure points at the behavior that broke.
 *
 * All tests run inside `runTest`, so the `delay` calls below do not slow the suite down.
 */
class ChunkedFlowTests {
    @Test
    fun `emits full chunks when enough items are available`() = runTest {
        flowOf(1, 2, 3, 4)
            .chunked(2, 1.seconds)
            .toList() shouldBeEqualTo listOf(listOf(1, 2), listOf(3, 4))
    }

    @Test
    fun `emits remaining items when the upstream completes`() = runTest {
        flowOf(1)
            .chunked(2, 1.seconds)
            .toList() shouldBeEqualTo listOf(listOf(1))

        flow {
            emit(1)
        }.chunked(2, 1.seconds)
            .toList() shouldBeEqualTo listOf(listOf(1)) // still emits remaining elements after flow finishes
    }

    @Test
    fun `emits remaining items on completion when no timeout is given`() = runTest {
        flowOf(1)
            .chunked(2, null)
            .toList() shouldBeEqualTo listOf(listOf(1))
    }

    @Test
    fun `flushes a partial chunk after the timeout elapses`() = runTest {
        flow {
            emit(1)
            delay(2000)
            emit(2)
        }.chunked(2, 1.seconds)
            .toList() shouldBeEqualTo listOf(listOf(1), listOf(2))

        flow {
            emit(1)
            delay(2000)
        }.chunked(2, 1.seconds)
            .toList() shouldBeEqualTo listOf(listOf(1))
    }

    @Test
    fun `mixes timeout flushes and full chunks`() = runTest {
        flow {
            emit(1)
            delay(2000)
            emit(2)
            delay(2000)
            emit(3)
            emit(4)
        }.chunked(2, 1.seconds)
            .toList() shouldBeEqualTo listOf(listOf(1), listOf(2), listOf(3, 4))
    }

    @Test
    fun `never emits empty lists`() = runTest {
        // Upstream that completes immediately without items.
        flowOf<Int>()
            .chunked(2, 1.seconds)
            .toList() shouldBeEqualTo listOf()

        // Upstream that stays idle longer than the timeout and then completes without items.
        flow<Int> {
            delay(2000)
        }.chunked(2, 1.seconds)
            .toList() shouldBeEqualTo listOf()
    }

    @Test
    fun `count of zero emits every item in its own list`() = runTest {
        flowOf(1, 2, 3)
            .chunked(0, 1.seconds)
            .toList() shouldBeEqualTo listOf(listOf(1), listOf(2), listOf(3))
    }

    @Test
    fun `rejects a non-positive timeout`() = runTest {
        assertFailsWith(IllegalArgumentException::class) {
            flowOf(1).chunked(1, 0.seconds).toList()
        }
    }

    @Test
    fun `rejects a negative count`() = runTest {
        assertFailsWith(IllegalArgumentException::class) {
            flowOf(1).chunked(-1, 1.seconds).toList()
        }
    }

    @Test
    fun `rethrows exceptions from the upstream flow`() = runTest {
        assertFailsWith(NullPointerException::class) {
            flow<Int> {
                throw java.lang.NullPointerException()
            }.chunked(1, 1.seconds).toList()
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement