Advertisement
Guest User

Untitled

a guest
Aug 29th, 2016
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.43 KB | None | 0 0
  1. override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
  2. log.trace("Received record(s) from kinesis")
  3. for {
  4. record <- processRecordsInput.getRecords
  5. json <- jawn.parseByteBuffer(record.getData).toOption
  6. msg <- decode[T](json.toString).toOption
  7. } yield subscriber ! msg
  8. //TODO: Handle failure to parse messages properly, See PRO-903
  9. processRecordsInput.getCheckpointer.checkpoint()
  10. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement