SHARE
TWEET

Untitled

a guest Jul 22nd, 2019 55 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package au.com.simplesteph.kafka.kafka0_11.demo
  2.  
  3. import java.util.Properties
  4.  
  5. import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
  6. import org.apache.kafka.common.KafkaException
  7. import org.apache.kafka.common.errors.AuthorizationException
  8. import org.apache.kafka.common.errors.OutOfOrderSequenceException
  9. import org.apache.kafka.common.errors.ProducerFencedException
  10. import org.apache.kafka.common.serialization.StringSerializer
  11.  
  12. object TransactionalProducer {
  13.   def main(args: Array[String]): Unit = {
  14.     val props = new Properties()
  15.     props.put("bootstrap.servers", "localhost:9092")
  16.     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id") // this has to be set!!! (unique for each producer you're having)
  17.     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // has to be idempotent
  18.  
  19.  
  20.     val producer = new KafkaProducer[String ,String](props, new StringSerializer, new StringSerializer)
  21.     producer.initTransactions()
  22.  
  23.     try {
  24.       producer.beginTransaction()
  25.       for (i <- Range(0, 100)) {
  26.         producer.send(new ProducerRecord[String, String]("my-transactional-topic", Integer.toString(i), Integer.toString(i)))
  27.         producer.send(new ProducerRecord[String, String]("my-other-topic", Integer.toString(i), Integer.toString(i)))
  28.       }
  29.       producer.commitTransaction()
  30.     } catch {
  31.       case e@(_: ProducerFencedException | _: OutOfOrderSequenceException | _: AuthorizationException) =>
  32.         // We can't recover from these exceptions, so our only option is to close the producer and exit.
  33.         producer.close()
  34.       case e: KafkaException =>
  35.         // For all other exceptions, just abort the transaction and try again.
  36.         producer.abortTransaction()
  37.     }
  38.   }
  39. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Not a member of Pastebin yet?
Sign Up, it unlocks many cool features!
 
Top