Advertisement
Guest User

Untitled

a guest
Jul 26th, 2016
53
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.59 KB | None | 0 0
  1. package vectos.kafka.akkaimpl.producer
  2.  
  3. import akka.Done
  4. import akka.actor._
  5. import akka.pattern.{ask, pipe}
  6. import akka.util.Timeout
  7. import vectos.kafka.akkaimpl.KafkaConnection
  8. import vectos.kafka.types.v0._
  9.  
  10. import scala.concurrent.duration._
  11. import scala.concurrent.{ExecutionContext, Future}
  12. import scala.util.{Failure, Success, Try}
  13.  
  14. object ConnectionCoordinator {
  15. def props(servers: Seq[BrokerAddress]) = Props(new ConnectionCoordinator(servers))
  16. }
  17.  
  18. final case class Produce(topic: String, partition: Int, records: Seq[KafkaRecord]) {
  19.  
  20. def toKafkaRequest(timeout: Int): KafkaRequest.Produce = {
  21. import vectos.kafka.types.v0._
  22. val messages = records.map(r => MessageSetEntry(0, Message(0, 0, r.key.toVector, r.value.toVector))).toVector
  23. val topicRequest = ProduceTopicRequest(Some(topic), Vector(ProduceTopicPartitionRequest(partition, messages)))
  24.  
  25. KafkaRequest.Produce(1, timeout, Vector(topicRequest))
  26. }
  27.  
  28. }
  29.  
  30. class ConnectionCoordinator(bootstrapServers: Seq[BrokerAddress]) extends Actor with ActorLogging with Stash {
  31. @SuppressWarnings(Array("org.wartremover.warts.Var"))
  32. private var brokers = Map.empty[Int, Broker]
  33. @SuppressWarnings(Array("org.wartremover.warts.Var"))
  34. private var topics = Map.empty[String, TopicStatus]
  35. private implicit val ec: ExecutionContext = context.dispatcher
  36. private implicit val timeout: Timeout = Timeout(30.seconds) //TODO: This should be configureable
  37.  
  38. override def preStart(): Unit = {
  39. val bootstraps = bootstrapServers.map(initializeBrokerConnection)
  40. context.become(bootstraping(bootstraps))
  41. }
  42.  
  43. private def bootstraping(connections: Seq[ActorRef]): Receive = {
  44. connections.foreach(_ ! KafkaRequest.Metadata(Vector.empty))
  45.  
  46. {
  47. case _: Produce => stash()
  48. case Success(metadata: KafkaResponse.Metadata) =>
  49. updateMetadata(metadata)
  50. connections.foreach(connection => context.stop(connection)) //TODO: Maybe reuse this connections
  51. unstashAll()
  52. context.become(receive)
  53. }
  54. }
  55.  
  56. @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
  57. private def forwardToBroker(broker: Broker, produce: Produce): Unit =
  58. broker.ref
  59. .ask(produce.toKafkaRequest(30)) //TODO: This should be configureable
  60. .mapTo[Try[KafkaResponse.Produce]]
  61. .flatMap {
  62. case Success(response) =>
  63. val errors = response.topics.flatMap(_.partitions.map(_.errorCode).filter(_ != KafkaError.NoError))
  64. if (errors.isEmpty) {
  65. Future.successful(Done)
  66. } else {
  67. Future.failed(new IllegalStateException(s"Errors occurred $errors."))
  68. }
  69. case Failure(ex) => Future.failed(ex)
  70. }
  71. .pipeTo(sender())
  72.  
  73. override def receive: Receive = {
  74. case p @ Produce(topic, partition, _) if topics.contains(topic) =>
  75. getBrokerFor(topic, partition) match {
  76. case Some(broker) =>
  77. forwardToBroker(broker, p)
  78. case None =>
  79. sender() ! Status.Failure(new IllegalStateException(s"Broker does not exist for topic $topic, partition $partition."))
  80. }
  81.  
  82. case Produce(topic, _, _) =>
  83. stash()
  84. context.become(fetchingMetadata(topic))
  85.  
  86. case Terminated(brokerRef) =>
  87. markBrokerAsUnavailable(brokerRef)
  88. }
  89.  
  90. private def markBrokerAsUnavailable(brokerRef: ActorRef): Unit = {
  91. log.error(s"Broker unavailable $brokerRef")
  92. //TODO: Handle this error
  93. }
  94.  
  95. private def fetchingMetadata(topic: String): Receive = {
  96. log.info(s"Fetching metadata for $topic.")
  97. brokers.foreach {
  98. case (_, broker) =>
  99. broker.ref ! KafkaRequest.Metadata(Vector(Some(topic)))
  100. }
  101.  
  102. {
  103. case Success(metadata: KafkaResponse.Metadata) =>
  104. updateMetadata(metadata)
  105. unstashAll()
  106. context.become(receive)
  107.  
  108. case Terminated(brokerRef) =>
  109. markBrokerAsUnavailable(brokerRef)
  110.  
  111. case _ => stash()
  112. }
  113. }
  114.  
  115. @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
  116. private def updateMetadata(metadata: KafkaResponse.Metadata) = {
  117. val currentBrokersIds = brokers.keys.toSet
  118. val newBrokers = metadata.brokers.filterNot(b => currentBrokersIds.contains(b.nodeId))
  119.  
  120. brokers = brokers ++ newBrokers.map {
  121. case m =>
  122. //TODO: MetadataBrokerResponse`s host should not be an option
  123. val host = m.host.getOrElse(throw new IllegalStateException("Cannot resolve host"))
  124. val address = BrokerAddress(host, m.port)
  125. val connection = initializeBrokerConnection(address)
  126.  
  127. context.watch(connection)
  128.  
  129. m.nodeId -> Broker(m.nodeId, address, connection)
  130. }.toMap
  131.  
  132. topics = topics ++ metadata.topicMetadata
  133. .map(t => {
  134. //TODO: MetadataTopicMetadataResponse`s topic should not be an option
  135. val name = t.name.getOrElse(throw new IllegalStateException("Cannot resolve topic name"))
  136. name -> TopicStatus(t.partitionMetaData.map(p => p.id -> p.leader).toMap)
  137. })
  138. .filter { case (_, status) => status.partitionToBroker.nonEmpty }
  139. .toMap
  140.  
  141. }
  142.  
  143. private def initializeBrokerConnection(address: BrokerAddress) = {
  144. val props = KafkaConnection.props(KafkaConnection.Settings(address.host, address.port, 100))
  145. context.actorOf(props)
  146. }
  147.  
  148. private def getBrokerFor(topic: String, partition: Int): Option[Broker] =
  149. for {
  150. t <- topics.get(topic)
  151. p <- t.partitionToBroker.get(partition)
  152. broker <- brokers.get(p)
  153. } yield broker
  154.  
  155. override def unhandled(message: Any): Unit = {
  156. super.unhandled(message)
  157. log.error(s"Unhandled message $message.")
  158. }
  159.  
  160. private case class Broker(id: Int, address: BrokerAddress, ref: ActorRef)
  161.  
  162. private case class TopicStatus(partitionToBroker: Map[Int, Int])
  163.  
  164. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement