Guest User

Untitled

a guest
Nov 20th, 2017
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.11 KB | None | 0 0
  1. {
  2. "status": "NOT_AVAILABLE",
  3. "itemid": "550672332",
  4. "qty": 0,
  5. "lmts": "2017-11-18T10:39:21-08:00",
  6. "timestamp": 1511030361000
  7. }
  8.  
  /**
   * One inventory event as extracted from a Kafka message and written to Cassandra.
   *
   * The field names match the JSON keys read by json4s extraction and the
   * Cassandra columns selected via `SomeColumns("itemid", "status", "qty")`.
   *
   * @param itemid identifier of the item (JSON key `itemid`)
   * @param status availability status (JSON key `status`)
   * @param qty    quantity, held as a string.
   *               NOTE(review): the sample payload carries `qty` as a JSON number —
   *               confirm that `String` is the intended type for the target column.
   */
  case class NliEvents(
    itemid: String,
    status: String,
    qty: String
  )
  10.  
  /**
   * Entry point: reads JSON messages from a Kafka direct stream and saves them to Cassandra.
   *
   * Each record value is parsed with json4s and extracted into [[NliEvents]]; every
   * non-empty micro-batch is then written to the `nli_events` table of the keyspace
   * configured under `<env>.cassandra.keyspace`.
   *
   * @param args command-line arguments; not read in the visible part of this method
   */
  def main(args: Array[String]): Unit = {
    // Setup was elided in the original listing. `ssc`, `topics`, `kafkaParams`, `env`
    // and `configuration` used below are presumably defined in the elided part.
    .....
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Only the message payload is needed downstream; keys and offsets are dropped here.
    val valueStream = stream.map(_.value())

    // NOTE(review): this instance is not used in the visible code. It is kept in case
    // the constructor has side effects (e.g. opening a session) — confirm and remove if not.
    val cassandraCrud = new CassandraOperations

    import com.datastax.spark.connector._
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    val columns = SomeColumns("itemid", "status", "qty")
    val keySpace = configuration.getString(env + ".cassandra.keyspace")

    // Explicitly typed so implicit resolution for `extract` does not depend on inference.
    implicit val formats: Formats = DefaultFormats

    valueStream.foreachRDD { rdd =>
      // Skip empty micro-batches so no write is issued for them.
      if (!rdd.isEmpty()) {
        // A record that is not valid JSON, or cannot be extracted, throws here and fails the task.
        val events = rdd.map(record => parse(record).extract[NliEvents])
        events.saveToCassandra(keySpace, "nli_events", columns)
      }
    }

    // NOTE(review): no `ssc.start()` / `ssc.awaitTermination()` is visible in this method;
    // confirm the streaming context is started somewhere, otherwise nothing will run.
  }
Add Comment
Please, Sign In to add comment