Guest User

Untitled

a guest
May 24th, 2018
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.75 KB | None | 0 0
  /** Describes how values of type `V` are published as keyed, timestamped records.
    *
    * @tparam K type of the key derived from a value
    * @tparam V type of the value being published
    */
  trait Record[K, V] {

    /** Name of the topic associated with values of type `V`. */
    def topic: String

    /** Derives the record key from `value`. */
    def key(value: V): K

    /** Derives the record timestamp from `value`. */
    def timestamp(value: V): Long
  }
  6.  
  /** Factory for [[KafkaProducer]] instances, used in two steps: `Producer[V]` fixes the
    * value type and returns a [[Producer.ProducerBuilder]], whose `apply` then takes the
    * configuration and the implicit serializers.
    */
  object Producer {

    /** Second step of the factory: builds a producer for a fixed value type `V`.
      *
      * @tparam V type of the values the resulting producer sends
      */
    class ProducerBuilder[V] {

      /** Creates a [[KafkaProducer]] from `config` and the implicit serializers.
        *
        * @param config          producer configuration handed to the `KafkaProducer` constructor
        * @param record          not used in the body; NOTE(review): presumably required so that
        *                        `K` is determined by the `Record[K, V]` in implicit scope — confirm
        * @param keySerializer   serializer for keys of type `K`
        * @param valueSerializer serializer for values of type `V`
        * @tparam K type of the record keys
        * @return a new producer built from the given configuration and serializers
        */
      def apply[K](config: Properties)(
          implicit record: Record[K, V],
          keySerializer: Serializer[K],
          valueSerializer: Serializer[V]
      ): KafkaProducer[K, V] =
        new KafkaProducer[K, V](config, keySerializer, valueSerializer)
    }

    /** First step of the factory: fixes the value type `V` and returns its builder. */
    def apply[V]: ProducerBuilder[V] = new ProducerBuilder[V]
  }
  16.  
  /** Adds a [[Record]]-driven `send` to [[KafkaProducer]].
    *
    * @param kafkaProducer the producer that performs the actual send
    * @tparam K type of the record keys
    * @tparam V type of the record values
    */
  implicit class KafkaProducerOps[K, V](kafkaProducer: KafkaProducer[K, V]) {

    /** Publishes `value` to the topic given by the implicit [[Record]], using the key and
      * timestamp that `record` derives from the value.
      *
      * The work runs inside a Scala `Future`, so an implicit `ExecutionContext` must be in
      * scope where this class is defined. Anything thrown while building the record, by the
      * producer's `send`, or by waiting on its result fails the returned future.
      *
      * @param value  the value to publish
      * @param record supplies the topic, key and timestamp for `value`
      * @return a future holding the [[RecordMetadata]] obtained from the producer's send
      */
    def send(value: V)(implicit record: Record[K, V]): Future[RecordMetadata] = Future {
      // A null partition is passed through unchanged; per the Kafka API this leaves the
      // choice of partition to the producer.
      val unassignedPartition: Integer = null

      val producerRecord = new ProducerRecord[K, V](
        record.topic,
        unassignedPartition,
        record.timestamp(value),
        record.key(value),
        value
      )

      // `get()` parks the current thread until the send completes. Marking the section as
      // blocking lets the execution context compensate instead of silently losing a worker.
      scala.concurrent.blocking {
        kafkaProducer.send(producerRecord).get()
      }
    }
  }
Add Comment
Please, Sign In to add comment