Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.elama.bidmanager.broker.stream
- import scala.reflect._
- import scala.language.reflectiveCalls
- import akka.stream.scaladsl.Flow
- import org.slf4j.LoggerFactory
- object LoggingFlow {
- type Campaignable = { def astCampaign: AstCampaign }
- protected val logger = LoggerFactory.getLogger(getClass)
- implicit class LoggingFlow[In: ClassTag, Out <: Campaignable: ClassTag, +Mat](
- flow: Flow[AckableMessage[FlowItem[In]], AckableMessage[FlowItem[Out]], Mat]
- ) {
- val flowName = s"Flow[${classTag[In].runtimeClass.getSimpleName} -> ${classTag[Out].runtimeClass.getSimpleName}]"
- def withLog(
- campaignIds: Set[Long] = Set.empty,
- msgFn: FlowItem[Out] => Any = _.toString
- ): Flow[AckableMessage[FlowItem[In]], AckableMessage[FlowItem[Out]], Mat] = {
- flow map (_.map(
- fi =>
- fi.map { campaignable =>
- val campaignId = campaignable.astCampaign.credentials.campaignId
- if (campaignIds contains campaignId)
- logger.info(s"CONDITIONAL for $campaignId\n$flowName\n${msgFn(fi)}")
- campaignable
- }
- ))
- }
- }
- }
Add Comment
Please, Sign In to add comment