Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Starts a background thread that calls [handle] in a loop.
 *
 * Any failure raised by [handle] is logged and the loop continues, so a single
 * bad batch does not stop message processing. The loop ends once the worker
 * thread has been interrupted, which gives callers that hold a reference to the
 * thread (or a shutdown hook / executor) a way to stop it.
 *
 * Each call starts a new thread; nothing here guards against calling it twice.
 */
fun start() {
    log.debug("start")
    Thread {
        val worker = Thread.currentThread()
        // Stop on interruption instead of looping forever: once the interrupt flag
        // is set, blocking calls tend to fail immediately, which would otherwise
        // turn this loop into a busy spin that floods the error log.
        while (!worker.isInterrupted) {
            try {
                handle()
            } catch (exception: InterruptedException) {
                // Throwing InterruptedException clears the flag; set it again so
                // the loop condition observes the interruption and exits.
                worker.interrupt()
            } catch (exception: Throwable) {
                // Deliberately broad: keep the worker alive on any other failure.
                log.error(exception)
            }
        }
    }.start()
}
/**
 * Collects one batch of messages from `interfaceAggregatorFlowChannel`.
 *
 * Blocks until a first message arrives, then keeps polling with a 10 ms timeout
 * per poll. The batch is closed as soon as a poll yields nothing, or once at
 * least 500 ms have elapsed since the first message was taken. The batch size
 * and the elapsed time are logged at debug level.
 *
 * If the blocking receive yields no message, the method returns without doing
 * anything.
 * NOTE(review): the channel looks like a Spring Integration `PollableChannel`,
 * whose `receive()` can return null (e.g. when the thread is interrupted) —
 * confirm against the channel's declaration.
 */
private fun handle() {
    val maxBatchWindowMillis = 500L

    // Nullable cast: a null result means "no message", not a failed cast.
    @Suppress("UNCHECKED_CAST")
    val firstMessage =
        (interfaceAggregatorFlowChannel.receive() as Message<InterfaceAggregatorMessage>?)?.payload
            ?: return
    val messages = mutableListOf(firstMessage)

    // System.nanoTime() is monotonic, so the batch window is unaffected by
    // wall-clock adjustments (unlike System.currentTimeMillis()).
    val startNanos = System.nanoTime()
    fun elapsedMillis(): Long = (System.nanoTime() - startNanos) / 1_000_000

    do {
        // Short poll (timeout argument 10): an empty result ends the batch early.
        @Suppress("UNCHECKED_CAST")
        val nextMessage =
            (interfaceAggregatorFlowChannel.receive(10) as Message<InterfaceAggregatorMessage>?)?.payload
        if (nextMessage != null) {
            messages.add(nextMessage)
        }
    } while (nextMessage != null && elapsedMillis() < maxBatchWindowMillis)

    log.debug("messages: ${messages.size} in ${elapsedMillis()} ms")

    // Message grouping. Out of question scope
    // val groupedMessages = messages
    //     .groupBy {
    //         when (it::class.java) {
    //             MonitorState::class.java -> MonitorState::class.java
    //             EntityUpdate2::class.java -> EntityUpdate2::class.java
    //             else -> {
    //                 log.error("Unsupported class: ${it::class.java}: $it")
    //                 null
    //             }
    //         }
    //     }
    //
    // @Suppress("UNCHECKED_CAST")
    // onEntityUpdate((groupedMessages[EntityUpdate2::class.java] as List<EntityUpdate2>? ?: emptyList()))
    // @Suppress("UNCHECKED_CAST")
    // (groupedMessages[MonitorState::class.java] as List<MonitorState>? ?: emptyList())
    //     .forEach { onInterfaceRequest(it) }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement