Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import com.nomadgcs.data.event.EventPriority
- import kotlinx.coroutines.*
- import kotlinx.coroutines.flow.*
- import kotlinx.coroutines.sync.Mutex
- import kotlinx.coroutines.sync.withLock
- import java.util.*
- import java.util.concurrent.atomic.AtomicInteger
- import kotlin.reflect.KClass
/**
 * Small driver for [EventRegistrar]: registers one handler for [Any] events,
 * emits eleven events 100 ms apart, and reports how many times the handler ran.
 *
 * A second coroutine waits for the first handled event. That wait uses a
 * one-shot [CompletableDeferred] rather than a pre-locked `Mutex`: the handler
 * runs once per event, and unlocking a mutex that is no longer locked throws
 * `IllegalStateException`, whereas completing an already-completed deferred is
 * a harmless no-op.
 */
fun main(): Unit = runBlocking {
    // Create the registrar and the invocation counter.
    val bus = EventRegistrar()
    val called = AtomicInteger()

    // Emitter: started lazily by runBlocking, so the registration below is in
    // place before the first delay elapses and the first event goes out.
    launch {
        for (i in 0..10) {
            delay(100)
            bus.emit(Any())
        }
        delay(100)
        println("Called $called times")
        // Stop the handler's collector once the demo is over.
        bus.cancel()
    }

    // Completed by the handler the first time an event is handled.
    val firstEventHandled = CompletableDeferred<Unit>()
    bus.register(Any::class, EventPriority.NORMAL) {
        println("Called")
        called.getAndIncrement()
        // Idempotent: returns false (and does nothing) on every call after the first.
        firstEventHandled.complete(Unit)
    }

    // Waiter: suspends until the first event has been handled.
    launch {
        println("Suspending")
        firstEventHandled.await()
        println("Acquired")
    }
}
/**
 * A minimal event bus keyed by event class and [EventPriority].
 *
 * Each (class, priority) pair is backed by one [MutableSharedFlow]; every
 * registered handler is a collector of that flow, launched in a scope whose
 * [Job] is this registrar itself. Cancelling the registrar therefore cancels
 * all handler collectors.
 *
 * The handler table is a plain mutable map with no synchronization in this
 * class, so [register] and [emit] should not race from different threads.
 *
 * @param job the job this registrar delegates to; defaults to a fresh [Job].
 */
@Suppress("UNCHECKED_CAST")
class EventRegistrar(job: Job = Job()) : Job by job {
    private val scope = CoroutineScope(this)
    private val handlers = mutableMapOf<KClass<*>, EnumMap<EventPriority, MutableSharedFlow<*>>>()

    /**
     * Subscribes [handler] to events whose runtime class is exactly [type],
     * at the given [priority].
     *
     * A failure inside [handler] is logged and does not end the subscription:
     * the handler keeps receiving later events. Cancellation of the collector
     * is still honoured.
     *
     * @return the [Job] of the collecting coroutine; cancel it to unregister.
     */
    fun <T : Any> register(
        type: KClass<T>,
        priority: EventPriority,
        handler: suspend (T) -> Unit,
    ): Job {
        val flow = create(priority, type).onEach { event ->
            try {
                handler(event)
            } catch (e: Throwable) {
                // If the collector itself was cancelled, propagate that instead
                // of treating it as a handler failure.
                currentCoroutineContext().ensureActive()
                // Handle per event so one bad event cannot terminate the flow
                // and silently unregister this handler.
                println("Exception caught while handling event for ${type.simpleName}: $e")
            }
        }
        println("Registering")
        // UNDISPATCHED: run the collector up to its first suspension right away,
        // so the subscription exists before this function returns.
        val job = scope.launch(start = CoroutineStart.UNDISPATCHED) { flow.collect() }
        println("Registered")
        return job
    }

    /**
     * Delivers [event] to the flows registered for its exact runtime class
     * (`event::class`), one priority at a time. Handlers registered for a
     * supertype are not looked up.
     *
     * @return the same [event] instance, after it has been emitted to every
     *   priority's flow (or immediately when nothing is registered for its class).
     */
    suspend fun <T : Any> emit(event: T): T {
        println("Emitting")
        val flowsByPriority = handlers[event::class] ?: return event
        // Since we're using an EnumMap, we know that values will always
        // be iterated in the order of the EventPriority declaration
        println("Emitting here")
        // println("Emitting to ${flowsByPriority.size}") // << Uncommenting will cause the problem to "fix" itself
        flowsByPriority.values.forEach { flow ->
            println("Subscribers")
            println("${flow.subscriptionCount.value}")
            // println("Subscribers ${flow.subscriptionCount.value}") // << Uncommenting will cause the problem to "fix" itself
            (flow as MutableSharedFlow<T>).emit(event)
        }
        println("Done")
        return event
    }

    /**
     * Returns the shared flow for ([type], [priority]), creating the per-type
     * map and the flow itself on first use.
     */
    private fun <T : Any> create(priority: EventPriority, type: KClass<T>) =
        handlers.getOrPut(type) { EnumMap(EventPriority::class.java) }
            .getOrPut(priority) { MutableSharedFlow<T>(/*extraBufferCapacity = 10*/) } as MutableSharedFlow<T>
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement