Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package lobby.core.streams
- import com.evolutiongaming.CasinoId
- import com.evolutiongaming.game.{CasinoTableId, TableId}
- import lobby.core.events.TableConfigEvent
- import lobby.core.events.TableConfigEvent._
- import lobby.core.streams.TableCommand._
- import org.apache.kafka.streams.processor.{Processor, ProcessorContext}
- import org.apache.kafka.streams.state.KeyValueStore
- class ConfigEventsProcessor(storeName: String) extends Processor[TableId, TableConfigEvent] {
- private var context: ProcessorContext = _
- private var store: KeyValueStore[TableId, Set[CasinoTableId]] = _
- private def getIds(tableId: TableId): Set[CasinoTableId] = Option(store.get(tableId)).getOrElse(Set.empty)
- override def init(context: ProcessorContext): Unit = {
- this.context = context
- store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[TableId, Set[CasinoTableId]]]
- }
- override def process(tableId: TableId, event: TableConfigEvent): Unit = event match {
- case _: Deleted => delete(tableId)
- case _: Published => send(getIds(tableId), Publish)
- case _: Unpublished => send(getIds(tableId), Unpublish)
- case Unassigned(casinoId, _) => delete(tableId, Some(casinoId))
- case event: ConfigChanged =>
- val id = CasinoTableId(event.casinoId, event.tableId, event.vtId)
- store.put(tableId, getIds(tableId) + id)
- context.forward(id, Update(event.gameType, event.config, event.published))
- context.commit()
- }
- private def send(ids: Set[CasinoTableId], command: TableCommand): Unit = {
- ids.foreach(context.forward(_, command))
- if (ids.nonEmpty) context.commit()
- }
- private def delete(tableId: TableId, casinoId: Option[CasinoId] = None): Unit = {
- val ids = getIds(tableId)
- val remove = ids.filter(id => casinoId.forall(_ == id.casinoId))
- val left = ids -- remove
- if (left.isEmpty) store.delete(tableId) else store.put(tableId, left)
- send(remove, Delete)
- }
- override def close(): Unit = ()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement