View difference between Paste ID: 1reJivVL and N4uBDQ7y
SHOW: | | - or go back to the newest paste.
1
package de.s3rius.tracker
2
3
import kotlinx.coroutines.channels.onFailure
4
import kotlinx.coroutines.channels.onSuccess
5
import kotlinx.coroutines.channels.produce
6
import kotlinx.coroutines.delay
7
import kotlinx.coroutines.flow.Flow
8
import kotlinx.coroutines.flow.channelFlow
9
import kotlinx.coroutines.flow.flow
10
import kotlinx.coroutines.flow.flowOf
11
import kotlinx.coroutines.flow.toList
12
import kotlinx.coroutines.selects.onTimeout
13
import kotlinx.coroutines.selects.select
14
import kotlinx.coroutines.test.runTest
15
import org.amshove.kluent.internal.assertFailsWith
16
import org.amshove.kluent.shouldBeEqualTo
17
import org.junit.Test
18
import kotlin.time.Duration
19
import kotlin.time.Duration.Companion.seconds
20
21
/**
22
 * Collects emission of the flow into lists of [count] items before emitting them downstream.
23
 * The original order of items is preserved.
24
 * If a [timeout] is given, any currently collected items will be emitted after [timeout] duratino
25
 * after the last emission, even if [count] wasn't reached yet.
26
 * Will not emit empty lists ever.
27
 */
28
suspend fun <T> Flow<T>.chunked(count: Int, timeout: Duration? = null): Flow<List<T>> = channelFlow {
29
    if (timeout != null)
30
        require(timeout.isPositive()) { "timeout must be larger than 0" }
31
    require(count >= 0) { "count must not be negative" }
32
33
    val values = produce { collect { value -> send(value) } }
34
35
    val accumulator = mutableListOf<T>()
36
    var finished = false
37
38
    suspend fun flush() {
39
        if (accumulator.isEmpty()) return
40
        send(accumulator.toList())
41
        accumulator.clear()
42
    }
43
44
    while (!finished) {
45
        select {
46
            if (timeout != null && accumulator.isNotEmpty()) {
47
                onTimeout(timeout) {
48
                    flush()
49
                }
50
            }
51
            values.onReceiveCatching { value ->
52
                value
53
                    .onSuccess {
54
                        accumulator.add(it)
55
                        if (accumulator.size >= count) {
56
                            flush()
57
                        }
58
                    }
59
                    .onFailure {
60
                        it?.let { throw it }
61
                        flush()
62
                        finished = true
63
                    }
64
            }
65
        }
66
    }
67
}
68
69
class ChunkedFlowTests {
70
    @Test
71
    fun `all the tests`() = runTest {
72
        flowOf(1, 2, 3, 4)
73
            .chunked(2, 1.seconds)
74
            .toList() shouldBeEqualTo listOf(listOf(1, 2), listOf(3, 4))
75
76
        flowOf(1)
77
            .chunked(2, 1.seconds)
78
            .toList() shouldBeEqualTo listOf(listOf(1))
79
80
        flow {
81
            emit(1)
82
            delay(2000)
83
            emit(2)
84
        }.chunked(2, 1.seconds)
85
            .toList() shouldBeEqualTo listOf(listOf(1), listOf(2))
86
87
        flow {
88
            emit(1)
89
            delay(2000)
90
        }.chunked(2, 1.seconds)
91
            .toList() shouldBeEqualTo listOf(listOf(1))
92
93
        flow<Int> {
94
            delay(2000)
95
        }.chunked(2, 1.seconds)
96
            .toList() shouldBeEqualTo listOf()
97
98
        flow {
99
            emit(1)
100
        }.chunked(2, 1.seconds)
101
            .toList() shouldBeEqualTo listOf(listOf(1)) // still emits remaining elements after flow finishes
102
103
        flow {
104
            emit(1)
105
            delay(2000)
106
            emit(2)
107
            delay(2000)
108
            emit(3)
109
            emit(4)
110
        }.chunked(2, 1.seconds)
111
            .toList() shouldBeEqualTo listOf(listOf(1), listOf(2), listOf(3, 4))
112
113
        flowOf<Int>()
114
            .chunked(2, 1.seconds)
115
            .toList() shouldBeEqualTo listOf()
116
117
        flow<Int> {
118
            delay(2000)
119
        }.chunked(2, 1.seconds)
120
            .toList() shouldBeEqualTo listOf()
121
122
        flowOf(1, 2, 3)
123
            .chunked(0, 1.seconds)
124
            .toList() shouldBeEqualTo listOf(listOf(1), listOf(2), listOf(3))
125
126
        flowOf(1)
127
            .chunked(2, null)
128
            .toList() shouldBeEqualTo listOf(listOf(1))
129
130
        assertFailsWith(IllegalArgumentException::class) {
131
            flowOf(1).chunked(1, 0.seconds).toList()
132
        }
133
134
        assertFailsWith(IllegalArgumentException::class) {
135
            flowOf(1).chunked(-1, 1.seconds).toList()
136
        }
137
138
        // Ensure exceptions are rethrown
139
        assertFailsWith(NullPointerException::class) {
140
            flow<Int> {
141
                throw java.lang.NullPointerException()
142
            }.chunked(1, 1.seconds).toList()
143
        }
144
    }
145
}
146