Advertisement
Guest User

Untitled

a guest
Jul 13th, 2017
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.90 KB | None | 0 0
  1. import scala.concurrent.duration._
  2.  
  3. object Hi extends App {
  4.  
  5. case class A[K](k: K)
  6.  
  7. /**
  8. * Settings for producers. See `akka.kafka.producer` section in
  9. * reference.conf. Note that the [[ProducerSettings companion]] object provides
  10. * `apply` and `create` functions for convenient construction of the settings, together with
  11. * the `with` methods.
  12. */
  13. final class ProducerSettings[K, V](
  14. val properties: Map[String, String],
  15. val keySerializerOpt: Option[A[K]],
  16. val valueSerializerOpt: Option[A[V]],
  17. val closeTimeout: FiniteDuration,
  18. val parallelism: Int,
  19. val dispatcher: String
  20. ) {
  21.  
  22. /**
  23. * The raw properties of the kafka-clients driver, see constants in
  24. * `org.apache.kafka.clients.producer.ProducerConfig`.
  25. */
  26. def withProperty(key: String, value: String): ProducerSettings[K, V] =
  27. copy(properties = properties.updated(key, value))
  28.  
  29. def withCloseTimeout(closeTimeout: FiniteDuration): ProducerSettings[K, V] =
  30. copy(closeTimeout = closeTimeout)
  31.  
  32. def withParallelism(parallelism: Int): ProducerSettings[K, V] =
  33. copy(parallelism = parallelism)
  34.  
  35. def withDispatcher(dispatcher: String): ProducerSettings[K, V] =
  36. copy(dispatcher = dispatcher)
  37.  
  38. private def copy(
  39. properties: Map[String, String] = properties,
  40. keySerializer: Option[A[K]] = keySerializerOpt,
  41. valueSerializer: Option[A[V]] = valueSerializerOpt,
  42. closeTimeout: FiniteDuration = closeTimeout,
  43. parallelism: Int = parallelism,
  44. dispatcher: String = dispatcher
  45. ): ProducerSettings[K, V] =
  46. new ProducerSettings[K, V](properties, keySerializer, valueSerializer, closeTimeout, parallelism, dispatcher)
  47.  
  48. }
  49.  
  50. def getConnectionString(username: String, password: String): String =
  51. raw"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$username" password="$password";"""
  52.  
  53. def getGenericSettings(): Map[String, String] =
  54. Map("request.timeout.ms" -> "20000", "retry.backoff.ms" -> "500")
  55.  
  56. def getConfluentSettings(listener: String,
  57. username: String,
  58. password: String): Map[String, String] =
  59. Map(
  60. "listeners" -> listener,
  61. "security.protocol" -> "SASL_SSL",
  62. "sasl.mechanism" -> "PLAIN",
  63. "sasl.jaas.config" -> getConnectionString(username, password)
  64. )
  65.  
  66. def connect[K, V](settings: ProducerSettings[K, V],
  67. listener: String,
  68. username: Option[String],
  69. password: Option[String],
  70. other: Option[Map[String, String]]): ProducerSettings[K, V] = {
  71. val m1 = getGenericSettings() ++ other.getOrElse(Map())
  72. val result = (for {
  73. u <- username
  74. p <- password
  75. } yield getConfluentSettings(listener, u, p)).getOrElse(Map()) ++ m1
  76. result.foldLeft(settings)((a, b) => a.withProperty(b._1, b._2))
  77. }
  78.  
  79. val producer = new ProducerSettings[String, String](Map(), None, None, 5 seconds, 2, "hi")
  80.  
  81. print(connect(producer, "hi", Some("kappa"), Some("keepo"), None).properties)
  82. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement