Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
  * Handles one batch of records delivered by Kinesis.
  *
  * Every record's payload is parsed as JSON and then decoded into a `T`;
  * each successfully decoded value is sent to `subscriber`. A record whose
  * payload fails either step is dropped without any notification, because
  * `toOption` discards the failure. After the whole batch has been walked,
  * the batch is checkpointed regardless of how many records were dropped.
  *
  * @param processRecordsInput the batch to process, together with the
  *                            checkpointer used to acknowledge it
  */
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
  log.trace("Received record(s) from kinesis")

  // Walk the batch in order; only the send to `subscriber` matters, so
  // nothing is collected.
  processRecordsInput.getRecords.foreach { record =>
    val parsedJson = jawn.parseByteBuffer(record.getData).toOption
    val decodedMsg = parsedJson.flatMap(json => decode[T](json.toString).toOption)
    decodedMsg.foreach(msg => subscriber ! msg)
  }
  //TODO: Handle failure to parse messages properly, See PRO-903

  // Runs even when some (or all) records in the batch were skipped above.
  processRecordsInput.getCheckpointer.checkpoint()
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement