Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
- import org.apache.kafka.common.serialization.Serializer
- import java.util.HashMap
- import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
- import io.confluent.kafka.serializers.KafkaAvroSerializer
/**
 * Kafka [Serializer] that delegates all work to a Confluent [KafkaAvroSerializer].
 *
 * Before the delegate is configured, the supplied configuration is copied and
 * [KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] is forced to `true`
 * (see [withSpecificAvroEnabled]).
 *
 * @param T type of the records handed to [serialize].
 */
class StringAvroSerializer<T> private constructor(
    private val inner: KafkaAvroSerializer
) : Serializer<T> {

    /** Creates a serializer backed by a default-constructed [KafkaAvroSerializer]. */
    constructor() : this(KafkaAvroSerializer())

    /**
     * For testing purposes only.
     *
     * @param client schema registry client passed to the underlying [KafkaAvroSerializer].
     */
    internal constructor(client: SchemaRegistryClient) : this(KafkaAvroSerializer(client))

    /**
     * Configures the underlying serializer with a copy of [serializerConfig] in which
     * the specific-Avro flag has been switched on.
     *
     * @param serializerConfig configuration supplied by the caller; it is not modified.
     * @param isSerializerForRecordKeys forwarded unchanged to the underlying serializer.
     */
    override fun configure(
        serializerConfig: Map<String, *>,
        isSerializerForRecordKeys: Boolean
    ) {
        inner.configure(
            withSpecificAvroEnabled(serializerConfig),
            isSerializerForRecordKeys
        )
    }

    /**
     * Serializes [record] for [topic] by delegating to the underlying serializer.
     *
     * The return type is nullable because the delegate is a Java method whose result
     * carries no nullability information. With a non-null return type Kotlin would insert
     * a runtime check and throw whenever the delegate hands back `null`
     * (NOTE(review): expected for `null`/tombstone records -- confirm against the
     * Confluent serializer version in use).
     *
     * @return whatever the underlying serializer returns, passed through unchanged.
     */
    override fun serialize(topic: String, record: T): ByteArray? =
        inner.serialize(topic, record)

    /** Closes the underlying serializer. */
    override fun close() {
        inner.close()
    }

    companion object {
        /**
         * Returns a new mutable map holding every entry of [config] (or no entries when
         * [config] is `null`) plus
         * [KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] set to `true`.
         * The map passed in is left untouched.
         */
        fun withSpecificAvroEnabled(config: Map<String, *>?): Map<String, Any> {
            val specificAvroEnabledConfig: MutableMap<String, Any> =
                if (config == null) HashMap() else HashMap<String, Any>(config)
            specificAvroEnabledConfig[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
            return specificAvroEnabledConfig
        }
    }
}
Add Comment
Please, Sign In to add comment