Advertisement
Guest User

Untitled

a guest
Nov 17th, 2018
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.01 KB | None | 0 0
  1. package lobby.core.streams
  2.  
  3. import com.evolutiongaming.CasinoId
  4. import com.evolutiongaming.game.{CasinoTableId, TableId}
  5. import lobby.core.events.TableConfigEvent
  6. import lobby.core.events.TableConfigEvent._
  7. import lobby.core.streams.TableCommand._
  8. import org.apache.kafka.streams.processor.{Processor, ProcessorContext}
  9. import org.apache.kafka.streams.state.KeyValueStore
  10.  
  /**
    * Kafka Streams processor that consumes `TableConfigEvent`s keyed by `TableId` and
    * forwards `TableCommand`s keyed by `CasinoTableId`.
    *
    * For every `TableId` the state store named `storeName` holds the set of
    * `CasinoTableId`s registered by `ConfigChanged` events; the other event kinds
    * are fanned out to the ids held in that set.
    *
    * @param storeName name of the key-value state store looked up in `init`
    */
  class ConfigEventsProcessor(storeName: String) extends Processor[TableId, TableConfigEvent] {

    // Both fields are assigned in `init` and are null until it has been called.
    private var context: ProcessorContext = _
    private var store: KeyValueStore[TableId, Set[CasinoTableId]] = _

    /** Remembers the processor context and resolves the state store by name. */
    override def init(context: ProcessorContext): Unit = {
      this.context = context
      val resolved = context.getStateStore(storeName)
      store = resolved.asInstanceOf[KeyValueStore[TableId, Set[CasinoTableId]]]
    }

    /**
      * Dispatches on the event type:
      *  - `Deleted`       drops every id stored for the table and forwards `Delete` for each;
      *  - `Published`     forwards `Publish` to every stored id;
      *  - `Unpublished`   forwards `Unpublish` to every stored id;
      *  - `Unassigned`    drops only the ids of the given casino and forwards `Delete` for them;
      *  - `ConfigChanged` adds the event's id to the stored set and forwards an `Update` for it.
      */
    override def process(tableId: TableId, event: TableConfigEvent): Unit =
      event match {
        case _: Deleted =>
          dropIds(tableId, None)

        case _: Published =>
          forwardAll(idsOf(tableId), Publish)

        case _: Unpublished =>
          forwardAll(idsOf(tableId), Unpublish)

        case Unassigned(casinoId, _) =>
          dropIds(tableId, Some(casinoId))

        case changed: ConfigChanged =>
          val id = CasinoTableId(changed.casinoId, changed.tableId, changed.vtId)
          // Store first, then forward, then request a commit.
          store.put(tableId, idsOf(tableId) + id)
          context.forward(id, Update(changed.gameType, changed.config, changed.published))
          context.commit()
      }

    /** No resources to release. */
    override def close(): Unit = {}

    /** Ids currently stored for `tableId`; a missing (null) entry reads as the empty set. */
    private def idsOf(tableId: TableId): Set[CasinoTableId] =
      Option(store.get(tableId)).fold(Set.empty[CasinoTableId])(identity)

    /** Forwards `command` once per id and requests a commit, unless `ids` is empty. */
    private def forwardAll(ids: Set[CasinoTableId], command: TableCommand): Unit =
      if (ids.nonEmpty) {
        for (id <- ids) context.forward(id, command)
        context.commit()
      }

    /**
      * Removes ids stored for `tableId` and forwards `Delete` for each removed id.
      *
      * @param casinoId when defined, only ids of that casino are removed;
      *                 when `None`, every id is removed
      */
    private def dropIds(tableId: TableId, casinoId: Option[CasinoId]): Unit = {
      val (removed, kept) = idsOf(tableId).partition(id => casinoId.forall(_ == id.casinoId))
      // The store entry is deleted outright once no ids remain for the table.
      if (kept.nonEmpty) store.put(tableId, kept) else store.delete(tableId)
      forwardAll(removed, Delete)
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement