Guest User

Untitled

a guest
Nov 14th, 2018
128
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.37 KB | None | 0 0
  1. import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
  2. import org.apache.kafka.common.serialization.Serializer
  3.  
  4. import java.util.HashMap
  5.  
  6. import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
  7. import io.confluent.kafka.serializers.KafkaAvroSerializer
  8.  
  /**
   * Kafka [Serializer] that forwards every call to a wrapped [KafkaAvroSerializer].
   *
   * The only behavior added on top of the delegate is in [configure]: the supplied
   * configuration is copied and the
   * [KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] entry is forced to `true`
   * before the copy is handed to the delegate.
   *
   * @param T type of the records passed to [serialize]
   */
  class StringAvroSerializer<T> private constructor(
      private val inner: KafkaAvroSerializer
  ) : Serializer<T> {

      /** Creates a serializer backed by a default-constructed [KafkaAvroSerializer]. */
      constructor() : this(KafkaAvroSerializer())

      /**
       * For testing purposes only.
       *
       * @param client schema registry client handed to the wrapped [KafkaAvroSerializer]
       */
      internal constructor(client: SchemaRegistryClient) : this(KafkaAvroSerializer(client))

      /**
       * Configures the wrapped serializer with a copy of [serializerConfig] that has the
       * specific-Avro flag switched on (see [withSpecificAvroEnabled]).
       *
       * @param serializerConfig configuration entries; the map itself is not modified
       * @param isSerializerForRecordKeys forwarded unchanged to the wrapped serializer
       */
      override fun configure(
          serializerConfig: Map<String, *>,
          isSerializerForRecordKeys: Boolean
      ) {
          val effectiveConfig = withSpecificAvroEnabled(serializerConfig)
          inner.configure(effectiveConfig, isSerializerForRecordKeys)
      }

      /**
       * Serializes [record] for [topic] by delegating to the wrapped serializer.
       *
       * The return type is nullable on purpose: the delegate is a Java method whose result
       * carries no nullability information, and with a non-null return type Kotlin would
       * assert the result is non-null and throw here instead of passing a null result
       * (e.g. for a null record) on to the caller.
       *
       * @param topic topic the record is being written to
       * @param record value to serialize
       * @return whatever the wrapped serializer returns, which may be `null`
       */
      override fun serialize(topic: String, record: T): ByteArray? =
          inner.serialize(topic, record)

      /** Closes the wrapped serializer. */
      override fun close() {
          inner.close()
      }

      companion object {

          /**
           * Returns a new map holding every entry of [config] plus
           * [KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] set to `true`.
           *
           * An entry already present under that key is overwritten. [config] is never
           * mutated.
           *
           * @param config source configuration, or `null` to start from an empty map
           * @return a fresh map with the specific-Avro flag enabled
           */
          fun withSpecificAvroEnabled(config: Map<String, *>?): Map<String, Any> {
              // Always copy so the caller's map stays untouched.
              val enabledConfig = if (config == null) HashMap() else HashMap<String, Any>(config)
              enabledConfig[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
              return enabledConfig
          }
      }
  }
Add Comment
Please, Sign In to add comment