Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- {
- "status": "NOT_AVAILABLE",
- "itemid": "550672332",
- "qty": 0,
- "lmts": "2017-11-18T10:39:21-08:00",
- "timestamp": 1511030361000
- }
// Record type that main extracts from each incoming JSON message via json.extract[NliEvents].
// Its field names (itemid, status, qty) match the names passed to SomeColumns in main.
// NOTE(review): the sample payload above shows "qty" as a number (0) while this field is a
// String — confirm the conversion during extraction is intended.
- case class NliEvents(itemid: String, status: String, qty: String)
// Entry point of the streaming job: builds a Kafka direct stream, takes each record's value,
// parses it as JSON into NliEvents, and saves every non-empty batch via saveToCassandra.
// `args` is not read anywhere in the visible code.
- def main(args: Array[String]): Unit = {
- ..... // elided in the paste; ssc, topics, kafkaParams, configuration and env are presumably defined here — TODO confirm
- val stream = KafkaUtils.createDirectStream[String, String]( // stream typed with String keys and String values
- ssc,
- PreferConsistent,
- Subscribe[String, String](topics, kafkaParams)
- )
- val valueStream = stream.map(_.value()) // keep only the value of each record
- val cassandraCrud = new CassandraOperations // NOTE(review): never referenced in the visible code
- import com.datastax.spark.connector._
- val columns = SomeColumns("itemid", "status", "qty") // same names as the NliEvents fields
- val keySpace = configuration.getString(env + ".cassandra.keyspace") // keyspace name looked up under an env-prefixed config key
- val gson = new Gson() // NOTE(review): never referenced in the visible code; parsing below goes through json4s
- import org.json4s._
- import org.json4s.jackson.JsonMethods._
- implicit val formats = DefaultFormats // implicit value in scope for the extract call below (presumably what it resolves — confirm)
- valueStream.foreachRDD((rdd, time) => { // `time` is not used inside this function body
- if (!rdd.isEmpty()) { // nothing is parsed or saved for an empty batch
- val mapped = rdd.map(records => { // despite the plural name, `records` is one element of the RDD
- val json = parse(records)
- val events = json.extract[NliEvents]
- events
- }
- )
- mapped.saveToCassandra(keySpace, "nli_events", columns) // "nli_events" is presumably the target table name — confirm
- }
- })
- }
Add Comment
Please, Sign In to add comment