mitrakov

AkkaStreams: Logging flow mapping

Sep 4th, 2019
328
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.12 KB | None | 0 0
  1. package com.elama.bidmanager.broker.stream
  2.  
  3. import scala.reflect._
  4. import scala.language.reflectiveCalls
  5. import akka.stream.scaladsl.Flow
  6. import org.slf4j.LoggerFactory
  7.  
  8. object LoggingFlow {
  9.   type Campaignable = { def astCampaign: AstCampaign }
  10.   protected val logger = LoggerFactory.getLogger(getClass)
  11.  
  12.   implicit class LoggingFlow[In: ClassTag, Out <: Campaignable: ClassTag, +Mat](
  13.     flow: Flow[AckableMessage[FlowItem[In]], AckableMessage[FlowItem[Out]], Mat]
  14.   ) {
  15.     val flowName = s"Flow[${classTag[In].runtimeClass.getSimpleName} -> ${classTag[Out].runtimeClass.getSimpleName}]"
  16.     def withLog(
  17.       campaignIds: Set[Long] = Set.empty,
  18.       msgFn: FlowItem[Out] => Any = _.toString
  19.     ): Flow[AckableMessage[FlowItem[In]], AckableMessage[FlowItem[Out]], Mat] = {
  20.       flow map (_.map(
  21.         fi =>
  22.           fi.map { campaignable =>
  23.             val campaignId = campaignable.astCampaign.credentials.campaignId
  24.             if (campaignIds contains campaignId)
  25.               logger.info(s"CONDITIONAL for $campaignId\n$flowName\n${msgFn(fi)}")
  26.             campaignable
  27.         }
  28.       ))
  29.     }
  30.   }
  31. }
Add Comment
Please, Sign In to add comment