Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- input.foreachPartition { part => {
- val disruptor = new Disruptor[util.Map[String, Object]](new MessageEventFactory, ringBufferSize, new MessageThreadFactory, ProducerType.MULTI, new BlockingWaitStrategy)
- disruptor.handleEventsWith(new MessageEventHandler(batchSize, this))
- disruptor.setDefaultExceptionHandler(new MessageExceptionHandler)
- val ringBuffer = disruptor.start
- val producer = new MessageEventProducer(ringBuffer)
- part.foreach { row =>
- accm.add(1)
- producer.onData(row)
- }
- }
- }
- class MessageEventTranslator extends EventTranslatorOneArg[util.Map[String, Object], Row] {
- private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventTranslator]);
- override def translateTo(event: util.Map[String, Object], sequence: Long, arg0: Row): Unit = {
- try {
- val fields = arg0.schema.fieldNames
- for (field <- fields) {
- val value = arg0.getAs[Object](field)
- event.put(field, value)
- }
- } catch {
- case e:NullPointerException => {
- LOGGER.error("MessageEventTranslator.translateTo{}",e.toString())
- }
- }
- }
- }
- class MessageEventHandler(val batchSize: Integer, val output:OutputFlusher) extends EventHandler[util.Map[String, Object]] {
- private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventHandler]);
- val queue:util.Queue[util.Map[String,Object]] = new LinkedBlockingQueue[util.Map[String, Object]] ()
- override def onEvent(event: util.Map[String, Object], sequence: Long, endOfBatch: Boolean): Unit = {
- queue.add(event)
- if(queue.size() > batchSize || endOfBatch)
- {
- output.flush(queue)
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement