Advertisement
Guest User

Untitled

a guest
Aug 6th, 2019
218
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 1.96 KB | None | 0 0
  1.     fun start() {
  2.         log.debug("start")
  3.         Thread {
  4.             while (true) try {
  5.                 handle()
  6.             } catch (exception: Throwable) {
  7.                 log.error(exception)
  8.             }
  9.         }.start()
  10.     }
  11.  
  12.     private fun handle() {
  13.         val messages = mutableListOf<InterfaceAggregatorMessage>()
  14.         @Suppress("UNCHECKED_CAST")
  15.         messages.add((interfaceAggregatorFlowChannel.receive() as Message<InterfaceAggregatorMessage>).payload)
  16.         val startTime = System.currentTimeMillis()
  17.         do {
  18.             @Suppress("UNCHECKED_CAST")
  19.             val message = (interfaceAggregatorFlowChannel.receive(10) as Message<InterfaceAggregatorMessage>?)?.payload
  20.             if (message != null) {
  21.                 messages.add(message)
  22.             }
  23.         } while (message != null && System.currentTimeMillis() - startTime < 500)
  24.         log.debug("messages: ${messages.size} in ${System.currentTimeMillis() - startTime} ms")
  25.  
  26.         // Message grouping. Out of question scope
  27.         //        val groupedMessages = messages
  28.         //            .groupBy {
  29.         //                when (it::class.java) {
  30.         //                    MonitorState::class.java -> MonitorState::class.java
  31.         //                    EntityUpdate2::class.java -> EntityUpdate2::class.java
  32.         //                    else -> {
  33.         //                        log.error("Unsupported class: ${it::class.java}: $it")
  34.         //                        null
  35.         //                    }
  36.         //                }
  37.         //            }
  38.         //
  39.         //        @Suppress("UNCHECKED_CAST")
  40.         //        onEntityUpdate((groupedMessages[EntityUpdate2::class.java] as List<EntityUpdate2>? ?: emptyList()))
  41.         //        @Suppress("UNCHECKED_CAST")
  42.         //        (groupedMessages[MonitorState::class.java] as List<MonitorState>? ?: emptyList())
  43.         //            .forEach { onInterfaceRequest(it) }
  44.  
  45.     }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement