Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import scala.concurrent.duration._
/**
 * Runnable demo that builds a set of Kafka producer properties (generic
 * timeouts plus optional SASL/PLAIN credentials) on top of an immutable
 * [[Hi.ProducerSettings]] value and prints the resulting property map.
 */
object Hi extends App {

  /** Simple wrapper around a single value of type `K`; used as the payload of the serializer slots below. */
  case class A[K](k: K)

  /**
   * Immutable settings holder for a producer.
   *
   * No companion factory is defined in this file: instances are created with
   * `new` and then evolved through the `with*` methods, each of which returns
   * a fresh instance and leaves the receiver untouched.
   *
   * @param properties         the raw properties of the kafka-clients driver, see constants in
   *                           `org.apache.kafka.clients.producer.ProducerConfig`
   * @param keySerializerOpt   optional key serializer placeholder
   * @param valueSerializerOpt optional value serializer placeholder
   * @param closeTimeout       close timeout carried by the settings
   * @param parallelism        parallelism value carried by the settings
   * @param dispatcher         dispatcher name carried by the settings
   */
  final class ProducerSettings[K, V](
      val properties: Map[String, String],
      val keySerializerOpt: Option[A[K]],
      val valueSerializerOpt: Option[A[V]],
      val closeTimeout: FiniteDuration,
      val parallelism: Int,
      val dispatcher: String
  ) {

    /** Returns a copy with `key` set to `value` in `properties`, replacing any existing entry for `key`. */
    def withProperty(key: String, value: String): ProducerSettings[K, V] =
      copy(properties = properties.updated(key, value))

    /** Returns a copy with the given close timeout. */
    def withCloseTimeout(closeTimeout: FiniteDuration): ProducerSettings[K, V] =
      copy(closeTimeout = closeTimeout)

    /** Returns a copy with the given parallelism. */
    def withParallelism(parallelism: Int): ProducerSettings[K, V] =
      copy(parallelism = parallelism)

    /** Returns a copy with the given dispatcher name. */
    def withDispatcher(dispatcher: String): ProducerSettings[K, V] =
      copy(dispatcher = dispatcher)

    // Hand-written `copy` (this is not a case class); every argument defaults to the current value.
    private def copy(
        properties: Map[String, String] = properties,
        keySerializer: Option[A[K]] = keySerializerOpt,
        valueSerializer: Option[A[V]] = valueSerializerOpt,
        closeTimeout: FiniteDuration = closeTimeout,
        parallelism: Int = parallelism,
        dispatcher: String = dispatcher
    ): ProducerSettings[K, V] =
      new ProducerSettings[K, V](properties, keySerializer, valueSerializer, closeTimeout, parallelism, dispatcher)
  }

  /**
   * Builds the JAAS configuration line for `PlainLoginModule`.
   *
   * The credentials are interpolated verbatim between double quotes; no
   * escaping is applied, so a `"` inside either value ends up unescaped in
   * the result.
   */
  def getConnectionString(username: String, password: String): String =
    raw"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$username" password="$password";"""

  /** Properties applied to every connection: request timeout and retry back-off, both in milliseconds. */
  def getGenericSettings(): Map[String, String] =
    Map("request.timeout.ms" -> "20000", "retry.backoff.ms" -> "500")

  /**
   * Properties for a SASL_SSL connection authenticated with the PLAIN mechanism.
   *
   * NOTE(review): the listener is stored under the key `listeners`; confirm
   * this is the key the consuming client actually reads.
   *
   * @param listener value stored under `listeners`
   * @param username user name placed in the JAAS configuration
   * @param password password placed in the JAAS configuration
   */
  def getConfluentSettings(listener: String,
                           username: String,
                           password: String): Map[String, String] =
    Map(
      "listeners" -> listener,
      "security.protocol" -> "SASL_SSL",
      "sasl.mechanism" -> "PLAIN",
      "sasl.jaas.config" -> getConnectionString(username, password)
    )

  /**
   * Returns `settings` with connection properties applied.
   *
   * Precedence, lowest to highest: the SASL settings from
   * [[getConfluentSettings]], then [[getGenericSettings]], then `other`.
   * The SASL settings (including `listener`) are added only when BOTH
   * `username` and `password` are defined; if either is missing they are
   * skipped entirely and `listener` is unused.
   *
   * @param settings base settings to extend
   * @param listener listener address, used only together with credentials
   * @param username optional SASL user name
   * @param password optional SASL password
   * @param other    optional extra properties that override everything else
   */
  def connect[K, V](settings: ProducerSettings[K, V],
                    listener: String,
                    username: Option[String],
                    password: Option[String],
                    other: Option[Map[String, String]]): ProducerSettings[K, V] = {
    val overrides = getGenericSettings() ++ other.getOrElse(Map.empty[String, String])
    val sasl = (for {
      u <- username
      p <- password
    } yield getConfluentSettings(listener, u, p)).getOrElse(Map.empty[String, String])

    // Right-hand operand of ++ wins on key clashes, so `overrides` beats `sasl`.
    (sasl ++ overrides).foldLeft(settings) { case (acc, (key, value)) =>
      acc.withProperty(key, value)
    }
  }

  // `5.seconds` rather than postfix `5 seconds`: the postfix form needs
  // `scala.language.postfixOps` in scope, which this file does not import.
  val producer = new ProducerSettings[String, String](Map.empty, None, None, 5.seconds, 2, "hi")
  print(connect(producer, "hi", Some("kappa"), Some("keepo"), None).properties)
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement