Advertisement
Guest User

Untitled

a guest
Jul 22nd, 2019
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.73 KB | None | 0 0
  1. package au.com.simplesteph.kafka.kafka0_11.demo
  2.  
  3. import java.util.Properties
  4.  
  5. import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
  6. import org.apache.kafka.common.KafkaException
  7. import org.apache.kafka.common.errors.AuthorizationException
  8. import org.apache.kafka.common.errors.OutOfOrderSequenceException
  9. import org.apache.kafka.common.errors.ProducerFencedException
  10. import org.apache.kafka.common.serialization.StringSerializer
  11.  
  12. object TransactionalProducer {
  13. def main(args: Array[String]): Unit = {
  14. val props = new Properties()
  15. props.put("bootstrap.servers", "localhost:9092")
  16. props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id") // this has to be set!!! (unique for each producer you're having)
  17. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // has to be idempotent
  18.  
  19.  
  20. val producer = new KafkaProducer[String ,String](props, new StringSerializer, new StringSerializer)
  21. producer.initTransactions()
  22.  
  23. try {
  24. producer.beginTransaction()
  25. for (i <- Range(0, 100)) {
  26. producer.send(new ProducerRecord[String, String]("my-transactional-topic", Integer.toString(i), Integer.toString(i)))
  27. producer.send(new ProducerRecord[String, String]("my-other-topic", Integer.toString(i), Integer.toString(i)))
  28. }
  29. producer.commitTransaction()
  30. } catch {
  31. case e@(_: ProducerFencedException | _: OutOfOrderSequenceException | _: AuthorizationException) =>
  32. // We can't recover from these exceptions, so our only option is to close the producer and exit.
  33. producer.close()
  34. case e: KafkaException =>
  35. // For all other exceptions, just abort the transaction and try again.
  36. producer.abortTransaction()
  37. }
  38. }
  39. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement