Advertisement
Guest User

Untitled

a guest
Jun 24th, 2019
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.65 KB | None | 0 0
  1. input.foreachPartition { part => {
  2.  
  3. val disruptor = new Disruptor[util.Map[String, Object]](new MessageEventFactory, ringBufferSize, new MessageThreadFactory, ProducerType.MULTI, new BlockingWaitStrategy)
  4.  
  5. disruptor.handleEventsWith(new MessageEventHandler(batchSize, this))
  6. disruptor.setDefaultExceptionHandler(new MessageExceptionHandler)
  7. val ringBuffer = disruptor.start
  8. val producer = new MessageEventProducer(ringBuffer)
  9.  
  10. part.foreach { row =>
  11. accm.add(1)
  12. producer.onData(row)
  13.  
  14. }
  15. }
  16. }
  17.  
  18.  
  19. class MessageEventTranslator extends EventTranslatorOneArg[util.Map[String, Object], Row] {
  20. private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventTranslator]);
  21.  
  22. override def translateTo(event: util.Map[String, Object], sequence: Long, arg0: Row): Unit = {
  23.  
  24. try {
  25.  
  26. val fields = arg0.schema.fieldNames
  27. for (field <- fields) {
  28. val value = arg0.getAs[Object](field)
  29. event.put(field, value)
  30. }
  31. } catch {
  32. case e:NullPointerException => {
  33. LOGGER.error("MessageEventTranslator.translateTo{}",e.toString())
  34. }
  35. }
  36.  
  37. }
  38. }
  39.  
  40.  
  41.  
  42. class MessageEventHandler(val batchSize: Integer, val output:OutputFlusher) extends EventHandler[util.Map[String, Object]] {
  43.  
  44. private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventHandler]);
  45.  
  46. val queue:util.Queue[util.Map[String,Object]] = new LinkedBlockingQueue[util.Map[String, Object]] ()
  47.  
  48. override def onEvent(event: util.Map[String, Object], sequence: Long, endOfBatch: Boolean): Unit = {
  49. queue.add(event)
  50. if(queue.size() > batchSize || endOfBatch)
  51. {
  52. output.flush(queue)
  53. }
  54. }
  55. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement