Advertisement
Guest User

Untitled

a guest
Aug 6th, 2023
14
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 4.18 KB | Source Code | 0 0
  1. package de.s3rius.tracker
  2.  
  3. import kotlinx.coroutines.channels.onFailure
  4. import kotlinx.coroutines.channels.onSuccess
  5. import kotlinx.coroutines.channels.produce
  6. import kotlinx.coroutines.delay
  7. import kotlinx.coroutines.flow.Flow
  8. import kotlinx.coroutines.flow.channelFlow
  9. import kotlinx.coroutines.flow.flow
  10. import kotlinx.coroutines.flow.flowOf
  11. import kotlinx.coroutines.flow.toList
  12. import kotlinx.coroutines.selects.onTimeout
  13. import kotlinx.coroutines.selects.select
  14. import kotlinx.coroutines.test.runTest
  15. import org.amshove.kluent.internal.assertFailsWith
  16. import org.amshove.kluent.shouldBeEqualTo
  17. import org.junit.Test
  18. import kotlin.time.Duration
  19. import kotlin.time.Duration.Companion.seconds
  20.  
  /**
   * Collects emissions of the flow into lists of [count] items before emitting them downstream.
   * The original order of items is preserved.
   *
   * If a [timeout] is given, any currently collected items are emitted once [timeout] has elapsed
   * since the last upstream emission, even if [count] wasn't reached yet. The timer restarts after
   * every received item and only runs while at least one item is buffered.
   *
   * Never emits empty lists. When the upstream completes, any remaining buffered items are emitted
   * as a final, possibly shorter, list. A failure of the upstream is rethrown to the collector.
   *
   * This function is intentionally not `suspend`: it only builds a flow, and all work (including
   * argument validation) happens when the returned flow is collected.
   *
   * @param count number of items per emitted list; `0` behaves like `1` (every item is emitted on its own).
   * @param timeout maximum idle time before a partial list is emitted, or `null` to wait for [count] items
   * or upstream completion.
   * @return a flow of non-empty lists containing the upstream items in their original order.
   * @throws IllegalArgumentException on collection, if [count] is negative or [timeout] is not positive.
   */
  fun <T> Flow<T>.chunked(count: Int, timeout: Duration? = null): Flow<List<T>> = channelFlow {
      require(timeout == null || timeout.isPositive()) { "timeout must be larger than 0" }
      require(count >= 0) { "count must not be negative" }

      // Pump the upstream into a channel so that receiving an item can be raced against the timeout.
      val values = produce { collect { value -> send(value) } }

      val accumulator = mutableListOf<T>()
      var finished = false

      // Emits a snapshot of the buffered items downstream; does nothing when the buffer is empty.
      suspend fun flush() {
          if (accumulator.isEmpty()) return
          send(accumulator.toList())
          accumulator.clear()
      }

      while (!finished) {
          select {
              // The timeout clause is only registered while items are buffered, so an idle
              // upstream never produces an empty list.
              if (timeout != null && accumulator.isNotEmpty()) {
                  onTimeout(timeout) {
                      flush()
                  }
              }
              values.onReceiveCatching { value ->
                  value
                      .onSuccess {
                          accumulator.add(it)
                          if (accumulator.size >= count) {
                              flush()
                          }
                      }
                      .onFailure { cause ->
                          // A non-null cause means the upstream failed: propagate it unchanged.
                          if (cause != null) throw cause
                          // Otherwise the upstream completed normally: emit the remainder and stop.
                          flush()
                          finished = true
                      }
              }
          }
      }
  }
  68.  
  /**
   * Tests for the `chunked` flow operator, one behavior per test.
   * All delays run inside [runTest], which is expected to skip them using virtual time.
   */
  class ChunkedFlowTests {
      @Test
      fun `emits full chunks when count is reached`() = runTest {
          flowOf(1, 2, 3, 4)
              .chunked(2, 1.seconds)
              .toList() shouldBeEqualTo listOf(listOf(1, 2), listOf(3, 4))
      }

      @Test
      fun `emits remaining items when upstream completes`() = runTest {
          flowOf(1)
              .chunked(2, 1.seconds)
              .toList() shouldBeEqualTo listOf(listOf(1))
      }

      @Test
      fun `emits partial chunk once timeout elapses`() = runTest {
          flow {
              emit(1)
              delay(2000)
              emit(2)
          }.chunked(2, 1.seconds)
              .toList() shouldBeEqualTo listOf(listOf(1), listOf(2))
      }

      @Test
      fun `does not emit empty list after trailing delay`() = runTest {
          flow {
              emit(1)
              delay(2000)
          }.chunked(2, 1.seconds)
              .toList() shouldBeEqualTo listOf(listOf(1))
      }

      @Test
      fun `emits nothing when upstream only delays`() = runTest {
          flow<Int> {
              delay(2000)
          }.chunked(2, 1.seconds)
              .toList() shouldBeEqualTo listOf()
      }

      @Test
      fun `emits nothing for empty upstream`() = runTest {
          flowOf<Int>()
              .chunked(2, 1.seconds)
              .toList() shouldBeEqualTo listOf()
      }

      @Test
      fun `mixes timeout based and count based chunks`() = runTest {
          flow {
              emit(1)
              delay(2000)
              emit(2)
              delay(2000)
              emit(3)
              emit(4)
          }.chunked(2, 1.seconds)
              .toList() shouldBeEqualTo listOf(listOf(1), listOf(2), listOf(3, 4))
      }

      @Test
      fun `count of zero emits every item on its own`() = runTest {
          flowOf(1, 2, 3)
              .chunked(0, 1.seconds)
              .toList() shouldBeEqualTo listOf(listOf(1), listOf(2), listOf(3))
      }

      @Test
      fun `rejects timeout that is not positive`() = runTest {
          assertFailsWith(IllegalArgumentException::class) {
              flowOf(1).chunked(1, 0.seconds).toList()
          }
      }

      @Test
      fun `rejects negative count`() = runTest {
          assertFailsWith(IllegalArgumentException::class) {
              flowOf(1).chunked(-1, 1.seconds).toList()
          }
      }

      @Test
      fun `rethrows upstream exceptions`() = runTest {
          assertFailsWith(NullPointerException::class) {
              flow<Int> {
                  throw NullPointerException()
              }.chunked(1, 1.seconds).toList()
          }
      }
  }
  137.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement