Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Describes how a value of type `V` is turned into a keyed, timestamped record
 * bound to a topic. Instances are looked up implicitly by the producer helpers
 * in this file.
 *
 * @tparam K type of the key derived from a value
 * @tparam V type of the value being published
 */
trait Record[K, V] {

  /** Name of the topic that values of type `V` belong to. */
  def topic: String

  /** Derives the record key from `value`. */
  def key(value: V): K

  /**
   * Derives the record timestamp from `value`.
   * NOTE(review): presumably epoch milliseconds, since it is handed to Kafka's
   * `ProducerRecord` as-is -- confirm against the implementations.
   */
  def timestamp(value: V): Long
}
/**
 * Factory for `KafkaProducer` instances that takes its serializers from
 * implicit scope.
 */
object Producer {

  /**
   * First step of a two-step construction: fixes the value type `V` and returns
   * a builder whose own `apply` takes the configuration.
   *
   * @tparam V type of the values the producer will publish
   */
  def apply[V]: ProducerBuilder[V] = new ProducerBuilder[V]

  /**
   * Second step of the construction; carries the already chosen value type `V`.
   *
   * @tparam V type of the values the producer will publish
   */
  class ProducerBuilder[V] {

    /**
     * Creates the producer from `config` and the implicitly available serializers.
     *
     * @param config          properties passed unchanged to the `KafkaProducer` constructor
     * @param record          not used in the body; it appears to exist so that `K` is
     *                        determined by the `Record[K, V]` instance in implicit scope
     * @param keySerializer   serializer handed to the producer for keys
     * @param valueSerializer serializer handed to the producer for values
     * @tparam K type of the record keys
     * @return a new `KafkaProducer[K, V]`
     */
    def apply[K](config: Properties)(
        implicit record: Record[K, V],
        keySerializer: Serializer[K],
        valueSerializer: Serializer[V]
    ): KafkaProducer[K, V] =
      new KafkaProducer[K, V](config, keySerializer, valueSerializer)
  }
}
/**
 * Adds a value-oriented `send` to `KafkaProducer`, deriving topic, key and
 * timestamp from the implicit [[Record]] instance for the value type.
 *
 * @param kafkaProducer the producer being extended
 * @tparam K type of the record keys
 * @tparam V type of the record values
 */
implicit class KafkaProducerOps[K, V](kafkaProducer: KafkaProducer[K, V]) {

  /**
   * Publishes `value` and completes with the metadata returned by the producer.
   *
   * The record is built and sent on the implicit `ExecutionContext` that must be
   * available in the enclosing scope. Any exception thrown while building or
   * sending the record (including the one raised by `get()`) fails the returned
   * future instead of being thrown to the caller.
   *
   * @param value  the value to publish
   * @param record supplies the topic, key and timestamp for `value`
   * @return a future holding the `RecordMetadata` of the acknowledged record
   */
  def send(value: V)(implicit record: Record[K, V]): Future[RecordMetadata] = Future {
    val producerRecord = new ProducerRecord[K, V](
      record.topic,
      null, // no explicit partition is requested
      record.timestamp(value),
      record.key(value),
      value
    )
    // `get()` parks the current thread until the send result is available. Marking
    // the section as blocking lets a BlockContext-aware pool (such as the global
    // one) compensate with extra threads instead of being starved by many
    // concurrent sends.
    scala.concurrent.blocking {
      kafkaProducer.send(producerRecord).get()
    }
  }
}
Add Comment
Please, Sign In to add comment