SHARE
TWEET

Untitled

a guest Jun 24th, 2019 77 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. input.foreachPartition { part => {
  2.  
  3.       val disruptor = new Disruptor[util.Map[String, Object]](new MessageEventFactory, ringBufferSize, new MessageThreadFactory, ProducerType.MULTI, new BlockingWaitStrategy)
  4.  
  5.       disruptor.handleEventsWith(new MessageEventHandler(batchSize, this))
  6.       disruptor.setDefaultExceptionHandler(new MessageExceptionHandler)
  7.       val ringBuffer = disruptor.start
  8.       val producer = new MessageEventProducer(ringBuffer)
  9.  
  10.       part.foreach { row =>
  11.         accm.add(1)
  12.         producer.onData(row)
  13.  
  14.       }
  15.     }
  16.     }
  17.  
  18.  
  19. class MessageEventTranslator extends EventTranslatorOneArg[util.Map[String, Object], Row] {
  20.   private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventTranslator]);
  21.  
  22.   override def translateTo(event: util.Map[String, Object], sequence: Long, arg0: Row): Unit = {
  23.  
  24.     try {
  25.  
  26.       val fields = arg0.schema.fieldNames
  27.       for (field <- fields) {
  28.         val value =  arg0.getAs[Object](field)
  29.         event.put(field, value)
  30.       }
  31.     } catch {
  32.       case e:NullPointerException => {
  33.         LOGGER.error("MessageEventTranslator.translateTo{}",e.toString())
  34.       }
  35.     }
  36.  
  37.   }
  38. }
  39.  
  40.  
  41.  
  42. class MessageEventHandler(val batchSize: Integer, val output:OutputFlusher) extends EventHandler[util.Map[String, Object]] {
  43.  
  44.   private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventHandler]);
  45.  
  46.   val queue:util.Queue[util.Map[String,Object]] = new LinkedBlockingQueue[util.Map[String, Object]] ()
  47.  
  48.   override def onEvent(event: util.Map[String, Object], sequence: Long, endOfBatch: Boolean): Unit = {
  49.     queue.add(event)
  50.     if(queue.size() > batchSize || endOfBatch)
  51.     {
  52.         output.flush(queue)
  53.     }
  54.   }
  55. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top