Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.Properties
- import com.playtech.systema.messaging.AbstractListenerActor
- import kafka.consumer._
- import kafka.serializer._
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.concurrent.duration._
- import scala.concurrent.{Await, Future}
- import scala.util.{Failure, Success}
- /**
- * Kafka listener actor.
- * @param connection instance of KafkaConnection
- * @param queue queue name
- * @param props consumer properties
- * @param callBack function, which will be called, when message is arrive
- * @param autoAck is auto ack enabled . By default for kafka false, because commit each message will lead to performance degradation,
- * so in case of true will be used auto commit by timeout see auto.commit.enable and auto.commit.interval.ms
- * consumer configuration properties, otherwise (false) commit will be on each message and
- * auto.commit.enable prop will be set to false
- */
- class KafkaListenerActor(connection: KafkaConnection,
- queue: String,
- props: Properties,
- callBack: (Array[Byte]) => Any,
- autoAck: Boolean = false) extends AbstractListenerActor {
- if (!autoAck) {
- //disable autocommit by interval and use "manual"
- props.put("auto.commit.enable", "false")
- }
- val consumerConfig: ConsumerConfig = new ConsumerConfig(props)
- var connector: ConsumerConnector = Consumer.create(consumerConfig)
- val filterSpec = new Whitelist(queue)
- val stringDecoder = new StringDecoder()
- val defaultDecoder = new DefaultDecoder()
- def onBind() {
- val stream: KafkaStream[String, Array[Byte]] = connector.createMessageStreamsByFilter(
- filterSpec,
- 1,
- stringDecoder,
- defaultDecoder)(0)
- logger.info(s"Consumer is binded with to $filterSpec ack flag is $autoAck")
- for (messageAndTopic <- stream) {
- logger.debug("Consumer on message [" + new String (messageAndTopic.message()) + "]")
- val future : Future[_] = onMessage(messageAndTopic.message(), messageAndTopic.key(), None, messageAndTopic )
- future.onComplete {
- case Success(s) =>
- if (!autoAck) {
- connector.commitOffsets
- logger.debug("Commit offsets")
- }
- case Failure(t) => logger.error("Cannot process message", t)
- }
- Await.result(future, 5.seconds)
- }
- }
- /**
- * Method to handle message delivery.
- * @param consumerTag - label of consumer.
- * @param body - message
- */
- def onMessage(body: Array[Byte], consumerTag: Any, envelop: Any, properties: Any): Future[Unit] = Future {
- callBack(body)
- }
- /**
- * Close wrapped producer.
- */
- def onClose() = {
- if (connector != null) {
- logger.debug("Close message listener")
- connector.shutdown()
- connector = null
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement