Advertisement
Guest User

Untitled

a guest
Oct 13th, 2014
236
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.93 KB | None | 0 0
  1.  
  2.  
import java.util.Properties

import com.playtech.systema.messaging.AbstractListenerActor
import kafka.consumer._
import kafka.serializer._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}
  13.  
  14. /**
  15.  * Kafka listener actor.
  16.  * @param connection  instance of  KafkaConnection
  17.  * @param queue  queue name
  18.  * @param props  consumer properties
  19.  * @param callBack function, which will be called, when message is arrive
  20.  * @param autoAck is  auto ack enabled . By default for kafka false, because commit each message will lead to performance degradation,
  21.  *                so in case of true will be used auto  commit by timeout see auto.commit.enable and auto.commit.interval.ms
  22.  *                consumer configuration properties, otherwise (false) commit will be on each message and
  23.  *                auto.commit.enable prop will be set to false
  24.  */
  25. class KafkaListenerActor(connection: KafkaConnection,
  26.                          queue: String,
  27.                          props: Properties,
  28.                          callBack: (Array[Byte]) => Any,
  29.                          autoAck: Boolean = false) extends AbstractListenerActor {
  30.  
  31.  
  32.   if (!autoAck) {
  33.     //disable autocommit by interval and use "manual"
  34.     props.put("auto.commit.enable", "false")
  35.  
  36.   }
  37.  
  38.   val consumerConfig: ConsumerConfig = new ConsumerConfig(props)
  39.   var connector: ConsumerConnector = Consumer.create(consumerConfig)
  40.   val filterSpec = new Whitelist(queue)
  41.   val stringDecoder = new StringDecoder()
  42.   val defaultDecoder = new DefaultDecoder()
  43.  
  44.  
  45.   def onBind() {
  46.  
  47.     val stream: KafkaStream[String, Array[Byte]] = connector.createMessageStreamsByFilter(
  48.       filterSpec,
  49.       1,
  50.       stringDecoder,
  51.       defaultDecoder)(0)
  52.  
  53.     logger.info(s"Consumer is binded with to $filterSpec ack flag is $autoAck")
  54.  
  55.     for (messageAndTopic <- stream) {
  56.  
  57.       logger.debug("Consumer on message [" + new String (messageAndTopic.message()) + "]")
  58.  
  59.       val future : Future[_] = onMessage(messageAndTopic.message(), messageAndTopic.key(), None, messageAndTopic )
  60.  
  61.       future.onComplete {
  62.         case Success(s) =>
  63.           if (!autoAck) {
  64.             connector.commitOffsets
  65.             logger.debug("Commit offsets")
  66.           }
  67.         case Failure(t) => logger.error("Cannot process message", t)
  68.       }
  69.  
  70.       Await.result(future, 5.seconds)
  71.  
  72.  
  73.     }
  74.  
  75.   }
  76.  
  77.  
  78.   /**
  79.    * Method to handle message delivery.
  80.    * @param consumerTag - label of consumer.
  81.    * @param body - message
  82.    */
  83.   def onMessage(body: Array[Byte], consumerTag: Any, envelop: Any, properties: Any): Future[Unit] = Future {
  84.     callBack(body)
  85.   }
  86.  
  87.   /**
  88.    * Close wrapped producer.
  89.    */
  90.   def onClose() = {
  91.  
  92.     if (connector != null) {
  93.       logger.debug("Close message listener")
  94.       connector.shutdown()
  95.       connector = null
  96.     }
  97.  
  98.   }
  99.  
  100.  
  101. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement