Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package vectos.kafka.akkaimpl.producer
- import akka.Done
- import akka.actor._
- import akka.pattern.{ask, pipe}
- import akka.util.Timeout
- import vectos.kafka.akkaimpl.KafkaConnection
- import vectos.kafka.types.v0._
- import scala.concurrent.duration._
- import scala.concurrent.{ExecutionContext, Future}
- import scala.util.{Failure, Success, Try}
- object ConnectionCoordinator {
- def props(servers: Seq[BrokerAddress]) = Props(new ConnectionCoordinator(servers))
- }
- final case class Produce(topic: String, partition: Int, records: Seq[KafkaRecord]) {
- def toKafkaRequest(timeout: Int): KafkaRequest.Produce = {
- import vectos.kafka.types.v0._
- val messages = records.map(r => MessageSetEntry(0, Message(0, 0, r.key.toVector, r.value.toVector))).toVector
- val topicRequest = ProduceTopicRequest(Some(topic), Vector(ProduceTopicPartitionRequest(partition, messages)))
- KafkaRequest.Produce(1, timeout, Vector(topicRequest))
- }
- }
- class ConnectionCoordinator(bootstrapServers: Seq[BrokerAddress]) extends Actor with ActorLogging with Stash {
- @SuppressWarnings(Array("org.wartremover.warts.Var"))
- private var brokers = Map.empty[Int, Broker]
- @SuppressWarnings(Array("org.wartremover.warts.Var"))
- private var topics = Map.empty[String, TopicStatus]
- private implicit val ec: ExecutionContext = context.dispatcher
- private implicit val timeout: Timeout = Timeout(30.seconds) //TODO: This should be configureable
- override def preStart(): Unit = {
- val bootstraps = bootstrapServers.map(initializeBrokerConnection)
- context.become(bootstraping(bootstraps))
- }
- private def bootstraping(connections: Seq[ActorRef]): Receive = {
- connections.foreach(_ ! KafkaRequest.Metadata(Vector.empty))
- {
- case _: Produce => stash()
- case Success(metadata: KafkaResponse.Metadata) =>
- updateMetadata(metadata)
- connections.foreach(connection => context.stop(connection)) //TODO: Maybe reuse this connections
- unstashAll()
- context.become(receive)
- }
- }
- @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
- private def forwardToBroker(broker: Broker, produce: Produce): Unit =
- broker.ref
- .ask(produce.toKafkaRequest(30)) //TODO: This should be configureable
- .mapTo[Try[KafkaResponse.Produce]]
- .flatMap {
- case Success(response) =>
- val errors = response.topics.flatMap(_.partitions.map(_.errorCode).filter(_ != KafkaError.NoError))
- if (errors.isEmpty) {
- Future.successful(Done)
- } else {
- Future.failed(new IllegalStateException(s"Errors occurred $errors."))
- }
- case Failure(ex) => Future.failed(ex)
- }
- .pipeTo(sender())
- override def receive: Receive = {
- case p @ Produce(topic, partition, _) if topics.contains(topic) =>
- getBrokerFor(topic, partition) match {
- case Some(broker) =>
- forwardToBroker(broker, p)
- case None =>
- sender() ! Status.Failure(new IllegalStateException(s"Broker does not exist for topic $topic, partition $partition."))
- }
- case Produce(topic, _, _) =>
- stash()
- context.become(fetchingMetadata(topic))
- case Terminated(brokerRef) =>
- markBrokerAsUnavailable(brokerRef)
- }
- private def markBrokerAsUnavailable(brokerRef: ActorRef): Unit = {
- log.error(s"Broker unavailable $brokerRef")
- //TODO: Handle this error
- }
- private def fetchingMetadata(topic: String): Receive = {
- log.info(s"Fetching metadata for $topic.")
- brokers.foreach {
- case (_, broker) =>
- broker.ref ! KafkaRequest.Metadata(Vector(Some(topic)))
- }
- {
- case Success(metadata: KafkaResponse.Metadata) =>
- updateMetadata(metadata)
- unstashAll()
- context.become(receive)
- case Terminated(brokerRef) =>
- markBrokerAsUnavailable(brokerRef)
- case _ => stash()
- }
- }
- @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
- private def updateMetadata(metadata: KafkaResponse.Metadata) = {
- val currentBrokersIds = brokers.keys.toSet
- val newBrokers = metadata.brokers.filterNot(b => currentBrokersIds.contains(b.nodeId))
- brokers = brokers ++ newBrokers.map {
- case m =>
- //TODO: MetadataBrokerResponse`s host should not be an option
- val host = m.host.getOrElse(throw new IllegalStateException("Cannot resolve host"))
- val address = BrokerAddress(host, m.port)
- val connection = initializeBrokerConnection(address)
- context.watch(connection)
- m.nodeId -> Broker(m.nodeId, address, connection)
- }.toMap
- topics = topics ++ metadata.topicMetadata
- .map(t => {
- //TODO: MetadataTopicMetadataResponse`s topic should not be an option
- val name = t.name.getOrElse(throw new IllegalStateException("Cannot resolve topic name"))
- name -> TopicStatus(t.partitionMetaData.map(p => p.id -> p.leader).toMap)
- })
- .filter { case (_, status) => status.partitionToBroker.nonEmpty }
- .toMap
- }
- private def initializeBrokerConnection(address: BrokerAddress) = {
- val props = KafkaConnection.props(KafkaConnection.Settings(address.host, address.port, 100))
- context.actorOf(props)
- }
- private def getBrokerFor(topic: String, partition: Int): Option[Broker] =
- for {
- t <- topics.get(topic)
- p <- t.partitionToBroker.get(partition)
- broker <- brokers.get(p)
- } yield broker
- override def unhandled(message: Any): Unit = {
- super.unhandled(message)
- log.error(s"Unhandled message $message.")
- }
- private case class Broker(id: Int, address: BrokerAddress, ref: ActorRef)
- private case class TopicStatus(partitionToBroker: Map[Int, Int])
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement