Guest User

Untitled

a guest
May 24th, 2018
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.94 KB | None | 0 0
  1. import java.util.{Properties}
  2. import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
  3. import scala.sys.process._
  4.  
  // Producer settings: broker list, client id, and String serializers for both
  // record keys and record values.
  val props = new Properties()
  Seq(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "rm01.itversity.com:6667,nn02.itversity.com:6667,nn01.itversity.com:6667",
    ProducerConfig.CLIENT_ID_CONFIG -> "ScalaProducerExample",
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
  ).foreach { case (name, value) => props.put(name, value) }

  // Single producer instance built from the settings above.
  val producer = new KafkaProducer[String, String](props)
  11.  
  /**
   * Publishes one line of text to the "Kafka-Testing" topic.
   *
   * Every record is sent with the same literal key "Key".
   * NOTE(review): with a constant key, key-based partitioning would presumably route
   * all records to a single partition — confirm this is intended.
   *
   * The send is asynchronous; delivery failures are reported on stderr via a callback
   * instead of being silently dropped.
   *
   * @param line     the text to publish as the record value
   * @param producer the producer used to send the record
   */
  def processLine(line: String, producer: KafkaProducer[String, String]): Unit = {
    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

    val record = new ProducerRecord[String, String]("Kafka-Testing", "Key", line)

    // The callback's exception argument is null on success, so wrap it in Option
    // and only report when a failure is actually present.
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        Option(exception).foreach { e =>
          System.err.println(s"Failed to send record to Kafka-Testing: ${e.getMessage}")
        }
    })
  }
  16.  
  // Log file whose appended lines are forwarded to Kafka.
  val file = "/opt/gen_logs/logs/access.log"

  // External command that follows the file and emits each newly appended line.
  val tail = Seq("tail", "-f", file)

  // `tail -f` does not finish on its own, so the loop below normally ends only when
  // the JVM is terminated. Close the producer from a shutdown hook so records that
  // are still buffered get a chance to be sent and client resources are released.
  sys.addShutdownHook {
    producer.close()
  }

  // Blocks while the command runs, publishing every line it prints.
  // NOTE(review): `lineStream` is deprecated in Scala 2.13 in favour of `lazyLines`;
  // kept here because the Scala version in use is not visible from this script.
  tail.lineStream.foreach(processLine(_, producer))
Add Comment
Please, Sign In to add comment