Advertisement
Guest User

Untitled

a guest
Mar 21st, 2023
47
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 2.66 KB | None | 0 0
  1. import com.nomadgcs.data.event.EventPriority
  2. import kotlinx.coroutines.*
  3. import kotlinx.coroutines.flow.*
  4. import kotlinx.coroutines.sync.Mutex
  5. import kotlinx.coroutines.sync.withLock
  6. import java.util.*
  7. import java.util.concurrent.atomic.AtomicInteger
  8. import kotlin.reflect.KClass
  9.  
  /**
   * Demo driver for [EventRegistrar].
   *
   * Registers one handler for `Any` events, emits eleven events 100 ms apart from a
   * child coroutine, and prints how many times the handler ran. A second child
   * coroutine waits until the first event has been handled.
   *
   * The "first event handled" signal is a [CompletableDeferred] rather than a
   * pre-locked `Mutex`: the handler runs once per event, and `Mutex.unlock()` throws
   * `IllegalStateException` when the mutex is not locked, so every call after the
   * first one failed. `complete(Unit)` is safe to call repeatedly (later calls
   * simply return `false`).
   */
  fun main(): Unit = runBlocking {
      // Create and register the handler
      val bus = EventRegistrar()
      val called = AtomicInteger()
      launch {
          // Eleven events, matching the former `for (i in 0..10)` loop whose index was unused.
          repeat(11) {
              delay(100)
              bus.emit(Any())
          }

          // Give the last handler invocation time to finish before reporting.
          delay(100)
          println("Called $called times")
      }

      // One-shot gate, completed by the handler on the first event it sees.
      val firstEvent = CompletableDeferred<Unit>()
      bus.register(Any::class, EventPriority.NORMAL) {
          println("Called")
          called.getAndIncrement()
          // Idempotent: only the first call completes the deferred, later calls are no-ops.
          firstEvent.complete(Unit)
      }
      // Call the event
      launch {
          println("Suspending")
          // Suspends until the handler has processed its first event.
          firstEvent.await()
          println("Acquired")
      }
  }
  40.  
  /**
   * A small event bus keyed by event class and [EventPriority].
   *
   * Each (event class, priority) pair is backed by its own [MutableSharedFlow], created
   * lazily with default arguments. Handlers are collectors launched in a scope whose
   * job is this registrar itself (the class implements [Job] by delegating to [job]),
   * so cancelling the registrar cancels every registered handler.
   *
   * NOTE(review): the handler table is a plain mutable map with no synchronization;
   * calling [register] and [emit] concurrently from different threads is not guarded
   * here — confirm callers are confined to one thread or add locking.
   *
   * @param job the job that owns all handler coroutines; defaults to a fresh [Job].
   */
  @Suppress("UNCHECKED_CAST")
  class EventRegistrar(job: Job = Job()) : Job by job {

      private val scope = CoroutineScope(this)
      private val handlers = mutableMapOf<KClass<*>, EnumMap<EventPriority, MutableSharedFlow<*>>>()

      /**
       * Subscribes [handler] to events whose class is exactly [type], at [priority].
       *
       * The collector is started with [CoroutineStart.UNDISPATCHED], so it runs up to
       * its first suspension point before this function returns.
       *
       * A handler that throws does NOT end the subscription: the failure is reported
       * and the collector keeps receiving later events. (The previous
       * `onEach(handler).catch { ... }` chain completed the flow on the first exception,
       * silently unregistering the handler.)
       *
       * @return the [Job] of the collecting coroutine; cancel it to unregister.
       */
      fun <T : Any> register(
          type: KClass<T>,
          priority: EventPriority,
          handler: suspend (T) -> Unit
      ): Job {
          val flow = create(priority, type)
          println("Registering")
          val job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
              flow.collect { event ->
                  try {
                      handler(event)
                  } catch (e: CancellationException) {
                      // Cancellation must propagate so that unregistering / shutting down works.
                      throw e
                  } catch (e: Throwable) {
                      // Same breadth as the former Flow.catch, but handled per event so that
                      // one bad invocation does not terminate the whole subscription.
                      println("Exception caught while handling event for ${type.simpleName}")
                  }
              }
          }
          println("Registered")
          return job
      }

      /**
       * Delivers [event] to every flow registered for its exact runtime class
       * (`event::class`), one priority after another, and returns [event].
       *
       * If nothing was ever registered for that class the event is returned untouched.
       * Each `emit` on the underlying flow may suspend; the flows are created with
       * default arguments (see [create]).
       */
      suspend fun <T : Any> emit(event: T): T {
          println("Emitting")
          val handlers = handlers[event::class] ?: return event
          // Since we're using an EnumMap, we know that values will always
          // be iterated in the order of the EventPriority declaration
          println("Emitting here")
  //        println("Emitting to ${handlers.size}") // << Uncommenting will cause the problem to "fix" itself
          handlers.values.forEach {
              println("Subscribers")
              println("${it.subscriptionCount.value}")
  //            println("Subscribers ${it.subscriptionCount.value}") // << Uncommenting will cause the problem to "fix" itself
              (it as MutableSharedFlow<T>).emit(event)
          }
          println("Done")
          return event
      }

      /**
       * Returns the shared flow for ([type], [priority]), creating the per-class
       * [EnumMap] and the flow itself on first use.
       *
       * The cast is unchecked: the map stores `MutableSharedFlow<*>`, and the flow
       * stored under [type] is only ever created here with element type [T].
       */
      private fun <T : Any> create(priority: EventPriority, type: KClass<T>) =
          handlers.getOrPut(type) { EnumMap(EventPriority::class.java) }
              .getOrPut(priority) { MutableSharedFlow<T>(/*extraBufferCapacity = 10*/) } as MutableSharedFlow<T>
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement