SHOW:
|
|
- or go back to the newest paste.
1 | package de.s3rius.tracker | |
2 | ||
3 | import kotlinx.coroutines.channels.onFailure | |
4 | import kotlinx.coroutines.channels.onSuccess | |
5 | import kotlinx.coroutines.channels.produce | |
6 | import kotlinx.coroutines.delay | |
7 | import kotlinx.coroutines.flow.Flow | |
8 | import kotlinx.coroutines.flow.channelFlow | |
9 | import kotlinx.coroutines.flow.flow | |
10 | import kotlinx.coroutines.flow.flowOf | |
11 | import kotlinx.coroutines.flow.toList | |
12 | import kotlinx.coroutines.selects.onTimeout | |
13 | import kotlinx.coroutines.selects.select | |
14 | import kotlinx.coroutines.test.runTest | |
15 | import org.amshove.kluent.internal.assertFailsWith | |
16 | import org.amshove.kluent.shouldBeEqualTo | |
17 | import org.junit.Test | |
18 | import kotlin.time.Duration | |
19 | import kotlin.time.Duration.Companion.seconds | |
20 | ||
21 | /** | |
22 | * Collects emission of the flow into lists of [count] items before emitting them downstream. | |
23 | * The original order of items is preserved. | |
24 | * If a [timeout] is given, any currently collected items will be emitted after [timeout] duratino | |
25 | * after the last emission, even if [count] wasn't reached yet. | |
26 | * Will not emit empty lists ever. | |
27 | */ | |
28 | suspend fun <T> Flow<T>.chunked(count: Int, timeout: Duration? = null): Flow<List<T>> = channelFlow { | |
29 | if (timeout != null) | |
30 | require(timeout.isPositive()) { "timeout must be larger than 0" } | |
31 | require(count >= 0) { "count must not be negative" } | |
32 | ||
33 | val values = produce { collect { value -> send(value) } } | |
34 | ||
35 | val accumulator = mutableListOf<T>() | |
36 | var finished = false | |
37 | ||
38 | suspend fun flush() { | |
39 | if (accumulator.isEmpty()) return | |
40 | send(accumulator.toList()) | |
41 | accumulator.clear() | |
42 | } | |
43 | ||
44 | while (!finished) { | |
45 | select { | |
46 | if (timeout != null && accumulator.isNotEmpty()) { | |
47 | onTimeout(timeout) { | |
48 | flush() | |
49 | } | |
50 | } | |
51 | values.onReceiveCatching { value -> | |
52 | value | |
53 | .onSuccess { | |
54 | accumulator.add(it) | |
55 | if (accumulator.size >= count) { | |
56 | flush() | |
57 | } | |
58 | } | |
59 | .onFailure { | |
60 | it?.let { throw it } | |
61 | flush() | |
62 | finished = true | |
63 | } | |
64 | } | |
65 | } | |
66 | } | |
67 | } | |
68 | ||
69 | class ChunkedFlowTests { | |
70 | @Test | |
71 | fun `all the tests`() = runTest { | |
72 | flowOf(1, 2, 3, 4) | |
73 | .chunked(2, 1.seconds) | |
74 | .toList() shouldBeEqualTo listOf(listOf(1, 2), listOf(3, 4)) | |
75 | ||
76 | flowOf(1) | |
77 | .chunked(2, 1.seconds) | |
78 | .toList() shouldBeEqualTo listOf(listOf(1)) | |
79 | ||
80 | flow { | |
81 | emit(1) | |
82 | delay(2000) | |
83 | emit(2) | |
84 | }.chunked(2, 1.seconds) | |
85 | .toList() shouldBeEqualTo listOf(listOf(1), listOf(2)) | |
86 | ||
87 | flow { | |
88 | emit(1) | |
89 | delay(2000) | |
90 | }.chunked(2, 1.seconds) | |
91 | .toList() shouldBeEqualTo listOf(listOf(1)) | |
92 | ||
93 | flow<Int> { | |
94 | delay(2000) | |
95 | }.chunked(2, 1.seconds) | |
96 | .toList() shouldBeEqualTo listOf() | |
97 | ||
98 | flow { | |
99 | emit(1) | |
100 | }.chunked(2, 1.seconds) | |
101 | .toList() shouldBeEqualTo listOf(listOf(1)) // still emits remaining elements after flow finishes | |
102 | ||
103 | flow { | |
104 | emit(1) | |
105 | delay(2000) | |
106 | emit(2) | |
107 | delay(2000) | |
108 | emit(3) | |
109 | emit(4) | |
110 | }.chunked(2, 1.seconds) | |
111 | .toList() shouldBeEqualTo listOf(listOf(1), listOf(2), listOf(3, 4)) | |
112 | ||
113 | flowOf<Int>() | |
114 | .chunked(2, 1.seconds) | |
115 | .toList() shouldBeEqualTo listOf() | |
116 | ||
117 | flow<Int> { | |
118 | delay(2000) | |
119 | }.chunked(2, 1.seconds) | |
120 | .toList() shouldBeEqualTo listOf() | |
121 | ||
122 | flowOf(1, 2, 3) | |
123 | .chunked(0, 1.seconds) | |
124 | .toList() shouldBeEqualTo listOf(listOf(1), listOf(2), listOf(3)) | |
125 | ||
126 | flowOf(1) | |
127 | .chunked(2, null) | |
128 | .toList() shouldBeEqualTo listOf(listOf(1)) | |
129 | ||
130 | assertFailsWith(IllegalArgumentException::class) { | |
131 | flowOf(1).chunked(1, 0.seconds).toList() | |
132 | } | |
133 | ||
134 | assertFailsWith(IllegalArgumentException::class) { | |
135 | flowOf(1).chunked(-1, 1.seconds).toList() | |
136 | } | |
137 | ||
138 | // Ensure exceptions are rethrown | |
139 | assertFailsWith(NullPointerException::class) { | |
140 | flow<Int> { | |
141 | throw java.lang.NullPointerException() | |
142 | }.chunked(1, 1.seconds).toList() | |
143 | } | |
144 | } | |
145 | } | |
146 |