Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala
- index ec6b281058..32a523979b 100644
- --- a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala
- +++ b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala
- @@ -36,14 +36,14 @@ final case class SeqNo(rawValue: Long) extends Ordered[SeqNo] {
- */
- def inc: SeqNo = new SeqNo(this.rawValue + 1L)
- - override def compare(that: SeqNo) = SeqNo.ord.compare(this, that)
- + override def compare(that: SeqNo): Int = SeqNo.ord.compare(this, that)
- - override def toString = String.valueOf(rawValue)
- + override def toString: String = String.valueOf(rawValue)
- }
- object HasSequenceNumber {
- implicit def seqOrdering[T <: HasSequenceNumber]: Ordering[T] = new Ordering[T] {
- - def compare(x: T, y: T) = x.seq.compare(y.seq)
- + def compare(x: T, y: T): Int = x.seq.compare(y.seq)
- }
- }
- @@ -66,7 +66,7 @@ trait HasSequenceNumber {
- * @param nacks Set of sequence numbers between the last delivered one and cumulativeAck that has been not yet received.
- */
- final case class Ack(cumulativeAck: SeqNo, nacks: Set[SeqNo] = Set.empty) {
- - override def toString = s"ACK[$cumulativeAck, ${nacks.mkString("{", ", ", "}")}]"
- + override def toString: String = s"ACK[$cumulativeAck, ${nacks.mkString("{", ", ", "}")}]"
- }
- class ResendBufferCapacityReachedException(c: Int)
- @@ -125,7 +125,7 @@ final case class AckedSendBuffer[T <: HasSequenceNumber](
- this.copy(nonAcked = this.nonAcked :+ msg, maxSeq = msg.seq)
- }
- - override def toString = s"[$maxSeq ${nonAcked.map(_.seq).mkString("{", ", ", "}")}]"
- + override def toString: String = s"[$maxSeq ${nonAcked.map(_.seq).mkString("{", ", ", "}")}]"
- }
- /**
- @@ -201,5 +201,5 @@ final case class AckedReceiveBuffer[T <: HasSequenceNumber](
- buf = (this.buf union that.buf).filter { _.seq > mergedLastDelivered })
- }
- - override def toString = buf.map { _.seq }.mkString("[", ", ", "]")
- + override def toString: String = buf.map { _.seq }.mkString("[", ", ", "]")
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala b/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala
- index a87bcb584c..a42042f59b 100644
- --- a/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala
- +++ b/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala
- @@ -21,7 +21,7 @@ import akka.actor.ExtensionIdProvider
- object AddressUidExtension extends ExtensionId[AddressUidExtension] with ExtensionIdProvider {
- override def get(system: ActorSystem): AddressUidExtension = super.get(system)
- - override def lookup = AddressUidExtension
- + override def lookup: AddressUidExtension.type = AddressUidExtension
- override def createExtension(system: ExtendedActorSystem): AddressUidExtension = new AddressUidExtension(system)
- diff --git a/akka-remote/src/main/scala/akka/remote/BoundAddressesExtension.scala b/akka-remote/src/main/scala/akka/remote/BoundAddressesExtension.scala
- index a572ae8cc6..17d8aa2a27 100644
- --- a/akka-remote/src/main/scala/akka/remote/BoundAddressesExtension.scala
- +++ b/akka-remote/src/main/scala/akka/remote/BoundAddressesExtension.scala
- @@ -17,7 +17,7 @@ import akka.remote.artery.ArteryTransport
- object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with ExtensionIdProvider {
- override def get(system: ActorSystem): BoundAddressesExtension = super.get(system)
- - override def lookup = BoundAddressesExtension
- + override def lookup: BoundAddressesExtension.type = BoundAddressesExtension
- override def createExtension(system: ExtendedActorSystem): BoundAddressesExtension =
- new BoundAddressesExtension(system)
- diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
- index 3c23996ee3..8d41fd615b 100644
- --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala
- +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
- @@ -31,6 +31,8 @@ import java.util.concurrent.locks.LockSupport
- import scala.concurrent.Future
- import akka.util.OptionVal
- import akka.util.OptionVal
- +import akka.dispatch.MessageDispatcher
- +import java.util.LinkedList
- /**
- * INTERNAL API
- @@ -215,7 +217,7 @@ private[remote] class ReliableDeliverySupervisor(
- val autoResendTimer = context.system.scheduler.schedule(
- settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
- - override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
- + override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy(loggingEnabled = false) {
- case e @ (_: AssociationProblem) ⇒ Escalate
- case NonFatal(e) ⇒
- val causedBy = if (e.getCause == null) "" else s"Caused by: [${e.getCause.getMessage}]"
- @@ -460,7 +462,7 @@ private[remote] abstract class EndpointActor(
- def inbound: Boolean
- - val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)
- + val eventPublisher: EventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)
- def publishError(reason: Throwable, logLevel: Logging.LogLevel): Unit =
- tryPublish(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound, logLevel))
- @@ -539,34 +541,34 @@ private[remote] class EndpointWriter(
- private val markLog = Logging.withMarker(this)
- val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem]
- - val remoteMetrics = RemoteMetricsExtension(extendedSystem)
- - val backoffDispatcher = context.system.dispatchers.lookup("akka.remote.backoff-remote-dispatcher")
- + val remoteMetrics: RemoteMetrics = RemoteMetricsExtension(extendedSystem)
- + val backoffDispatcher: MessageDispatcher = context.system.dispatchers.lookup("akka.remote.backoff-remote-dispatcher")
- var reader: Option[ActorRef] = None
- var handle: Option[AkkaProtocolHandle] = handleOrActive
- - val readerId = Iterator from 0
- + val readerId: Iterator[Int] = Iterator from 0
- def newAckDeadline: Deadline = Deadline.now + settings.SysMsgAckTimeout
- var ackDeadline: Deadline = newAckDeadline
- var lastAck: Option[Ack] = None
- - override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
- + override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy(loggingEnabled = false) {
- case NonFatal(e) ⇒ publishAndThrow(e, Logging.ErrorLevel)
- }
- - val provider = RARP(extendedSystem).provider
- - val msgDispatch = new DefaultMessageDispatcher(extendedSystem, provider, markLog)
- + val provider: RemoteActorRefProvider = RARP(extendedSystem).provider
- + val msgDispatch: DefaultMessageDispatcher = new DefaultMessageDispatcher(extendedSystem, provider, markLog)
- - val inbound = handle.isDefined
- + val inbound: Boolean = handle.isDefined
- var stopReason: DisassociateInfo = AssociationHandle.Unknown
- // Use an internal buffer instead of Stash for efficiency
- // stash/unstashAll is slow when many messages are stashed
- // IMPORTANT: sender is not stored, so sender() and forward must not be used in EndpointWriter
- - val buffer = new java.util.LinkedList[AnyRef]
- - val prioBuffer = new java.util.LinkedList[Send]
- - var largeBufferLogTimestamp = System.nanoTime()
- + val buffer: LinkedList[AnyRef] = new java.util.LinkedList[AnyRef]
- + val prioBuffer: LinkedList[Send] = new java.util.LinkedList[Send]
- + var largeBufferLogTimestamp: Long = System.nanoTime()
- private def publishAndThrow(reason: Throwable, logLevel: Logging.LogLevel): Nothing = {
- reason match {
- @@ -576,7 +578,7 @@ private[remote] class EndpointWriter(
- throw reason
- }
- - val ackIdleTimer = {
- + val ackIdleTimer: Cancellable = {
- val interval = settings.SysMsgAckTimeout / 2
- context.system.scheduler.schedule(interval, interval, self, AckIdleCheckTimer)
- }
- @@ -603,7 +605,7 @@ private[remote] class EndpointWriter(
- eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound))
- }
- - def receive = if (handle.isEmpty) initializing else writing
- + def receive: Actor.Receive = if (handle.isEmpty) initializing else writing
- def initializing: Receive = {
- case s: Send ⇒
- @@ -648,7 +650,7 @@ private[remote] class EndpointWriter(
- }
- var writeCount = 0
- - var maxWriteCount = MaxWriteCount
- + var maxWriteCount: Int = MaxWriteCount
- var adaptiveBackoffNanos = 1000000L // 1 ms
- var fullBackoff = false
- @@ -941,8 +943,8 @@ private[remote] class EndpointReader(
- import EndpointWriter.{ OutboundAck, StopReading, StoppedReading }
- - val provider = RARP(context.system).provider
- - var ackedReceiveBuffer = new AckedReceiveBuffer[Message]
- + val provider: RemoteActorRefProvider = RARP(context.system).provider
- + var ackedReceiveBuffer: AckedReceiveBuffer[Message] = new AckedReceiveBuffer[Message]
- override def preStart(): Unit = {
- receiveBuffers.get(Link(localAddress, remoteAddress)) match {
- diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala
- index 0e22eea364..ece1071e9e 100644
- --- a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala
- +++ b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala
- @@ -39,7 +39,7 @@ object FailureDetector {
- // Abstract class to be able to extend it from Java
- abstract class Clock extends (() ⇒ Long)
- - implicit val defaultClock = new Clock {
- - def apply() = NANOSECONDS.toMillis(System.nanoTime)
- + implicit val defaultClock: Clock = new Clock {
- + def apply(): Long = NANOSECONDS.toMillis(System.nanoTime)
- }
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala b/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala
- index 527bf62a03..3b4c282936 100644
- --- a/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala
- +++ b/akka-remote/src/main/scala/akka/remote/FailureDetectorRegistry.scala
- @@ -84,6 +84,6 @@ private[akka] object FailureDetectorLoader {
- * @param config Configuration that will be passed to the implementation
- * @return
- */
- - def apply(fqcn: String, config: Config)(implicit ctx: ActorContext) = load(fqcn, config, ctx.system)
- + def apply(fqcn: String, config: Config)(implicit ctx: ActorContext): FailureDetector = load(fqcn, config, ctx.system)
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
- index 22f28ae9e8..0d72c0ab15 100644
- --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
- +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
- @@ -535,7 +535,7 @@ private[akka] class RemoteActorRef private[akka] (
- /**
- * Determine if a watch/unwatch message must be handled by the remoteWatcher actor, or sent to this remote ref
- */
- - def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef) =
- + def isWatchIntercepted(watchee: ActorRef, watcher: ActorRef): Boolean =
- if (watchee.path.uid == akka.actor.ActorCell.undefinedUid) {
- provider.log.debug("actorFor is deprecated, and watching a remote ActorRef acquired with actorFor is not reliable: [{}]", watchee.path)
- false // Not managed by the remote watcher, so not reliable to communication failure or remote system crash
- diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala
- index f0f6de8ccb..fba1e864f6 100644
- --- a/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala
- +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala
- @@ -26,9 +26,9 @@ private[akka] object RemoteDeploymentWatcher {
- */
- private[akka] class RemoteDeploymentWatcher extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
- import RemoteDeploymentWatcher._
- - var supervisors = Map.empty[ActorRef, InternalActorRef]
- + var supervisors: Map[ActorRef, InternalActorRef] = Map.empty[ActorRef, InternalActorRef]
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case WatchRemote(a, supervisor: InternalActorRef) ⇒
- supervisors += (a → supervisor)
- context.watch(a)
- diff --git a/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala
- index d00eb2f41a..5361359ccb 100644
- --- a/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala
- +++ b/akka-remote/src/main/scala/akka/remote/RemoteMetricsExtension.scala
- @@ -22,7 +22,7 @@ import akka.routing.RouterEnvelope
- private[akka] object RemoteMetricsExtension extends ExtensionId[RemoteMetrics] with ExtensionIdProvider {
- override def get(system: ActorSystem): RemoteMetrics = super.get(system)
- - override def lookup = RemoteMetricsExtension
- + override def lookup: RemoteMetricsExtension.type = RemoteMetricsExtension
- override def createExtension(system: ExtendedActorSystem): RemoteMetrics =
- if (system.settings.config.getString("akka.remote.log-frame-size-exceeding").toLowerCase == "off")
- diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala
- index 701cbeadd2..fb69311ad9 100644
- --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala
- +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala
- @@ -19,7 +19,7 @@ final class RemoteSettings(val config: Config) {
- import config._
- import scala.collection.JavaConverters._
- - val Artery = ArterySettings(getConfig("akka.remote.artery"))
- + val Artery: ArterySettings = ArterySettings(getConfig("akka.remote.artery"))
- val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages")
- diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
- index fe90d93281..5f18ef35a4 100644
- --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
- +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
- @@ -12,6 +12,7 @@ import akka.remote.artery.ArteryMessage
- import scala.collection.mutable
- import scala.concurrent.duration._
- +import scala.collection.mutable.{ HashMap, MultiMap }
- /**
- * INTERNAL API
- @@ -92,27 +93,27 @@ private[akka] class RemoteWatcher(
- import RemoteWatcher._
- import context.dispatcher
- - def scheduler = context.system.scheduler
- + def scheduler: Scheduler = context.system.scheduler
- val remoteProvider: RemoteActorRefProvider = RARP(context.system).provider
- - val artery = remoteProvider.remoteSettings.Artery.Enabled
- + val artery: Boolean = remoteProvider.remoteSettings.Artery.Enabled
- val (heartBeatMsg, selfHeartbeatRspMsg) =
- if (artery) (ArteryHeartbeat, ArteryHeartbeatRsp(AddressUidExtension(context.system).longAddressUid))
- else (Heartbeat, HeartbeatRsp(AddressUidExtension(context.system).addressUid))
- // actors that this node is watching, map of watchee -> Set(watchers)
- - val watching = new mutable.HashMap[InternalActorRef, mutable.Set[InternalActorRef]]() with mutable.MultiMap[InternalActorRef, InternalActorRef]
- + val watching: HashMap[InternalActorRef, mutable.Set[InternalActorRef]] with MultiMap[InternalActorRef, InternalActorRef] = new mutable.HashMap[InternalActorRef, mutable.Set[InternalActorRef]]() with mutable.MultiMap[InternalActorRef, InternalActorRef]
- // nodes that this node is watching, i.e. expecting heartbeats from these nodes. Map of address -> Set(watchee) on this address
- - val watcheeByNodes = new mutable.HashMap[Address, mutable.Set[InternalActorRef]]() with mutable.MultiMap[Address, InternalActorRef]
- - def watchingNodes = watcheeByNodes.keySet
- + val watcheeByNodes: HashMap[Address, mutable.Set[InternalActorRef]] with MultiMap[Address, InternalActorRef] = new mutable.HashMap[Address, mutable.Set[InternalActorRef]]() with mutable.MultiMap[Address, InternalActorRef]
- + def watchingNodes: collection.Set[Address] = watcheeByNodes.keySet
- var unreachable: Set[Address] = Set.empty
- var addressUids: Map[Address, Long] = Map.empty
- - val heartbeatTask = scheduler.schedule(heartbeatInterval, heartbeatInterval, self, HeartbeatTick)
- - val failureDetectorReaperTask = scheduler.schedule(unreachableReaperInterval, unreachableReaperInterval,
- + val heartbeatTask: Cancellable = scheduler.schedule(heartbeatInterval, heartbeatInterval, self, HeartbeatTick)
- + val failureDetectorReaperTask: Cancellable = scheduler.schedule(unreachableReaperInterval, unreachableReaperInterval,
- self, ReapUnreachableTick)
- override def postStop(): Unit = {
- @@ -121,7 +122,7 @@ private[akka] class RemoteWatcher(
- failureDetectorReaperTask.cancel()
- }
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case HeartbeatTick ⇒ sendHeartbeat()
- case Heartbeat | ArteryHeartbeat ⇒ receiveHeartbeat()
- case HeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid.toLong)
- diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala
- index 17d01b6f9b..1b7b9ba5be 100644
- --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala
- +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala
- @@ -46,9 +46,9 @@ private[akka] final case class RARP(provider: RemoteActorRefProvider) extends Ex
- */
- private[akka] object RARP extends ExtensionId[RARP] with ExtensionIdProvider {
- - override def lookup() = RARP
- + override def lookup(): RARP.type = RARP
- - override def createExtension(system: ExtendedActorSystem) = RARP(system.provider.asInstanceOf[RemoteActorRefProvider])
- + override def createExtension(system: ExtendedActorSystem): RARP = RARP(system.provider.asInstanceOf[RemoteActorRefProvider])
- }
- /**
- @@ -104,11 +104,11 @@ private[remote] object Remoting {
- final case class RegisterTransportActor(props: Props, name: String) extends NoSerializationVerificationNeeded
- private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
- - override def supervisorStrategy = OneForOneStrategy() {
- + override def supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
- case NonFatal(e) ⇒ Restart
- }
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case RegisterTransportActor(props, name) ⇒
- sender() ! context.actorOf(
- RARP(context.system).configureDispatcher(props.withDeploy(Deploy.local)),
- @@ -134,14 +134,14 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
- import provider.remoteSettings._
- - val transportSupervisor = system.systemActorOf(
- + val transportSupervisor: ActorRef = system.systemActorOf(
- configureDispatcher(Props[TransportSupervisor]),
- "transports")
- override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote)
- val log: LoggingAdapter = Logging(system.eventStream, getClass.getName)
- - val eventPublisher = new EventPublisher(system, log, RemoteLifecycleEventsLogLevel)
- + val eventPublisher: EventPublisher = new EventPublisher(system, log, RemoteLifecycleEventsLogLevel)
- private def notifyError(msg: String, cause: Throwable): Unit =
- eventPublisher.notifyListeners(RemotingErrorEvent(new RemoteTransportException(msg, cause)))
- @@ -262,11 +262,11 @@ private[remote] object EndpointManager {
- case object ShutdownAndFlush extends RemotingCommand
- final case class Send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef, seqOpt: Option[SeqNo] = None)
- extends RemotingCommand with HasSequenceNumber {
- - override def toString = s"Remote message $senderOption -> $recipient"
- + override def toString: String = s"Remote message $senderOption -> $recipient"
- // This MUST throw an exception to indicate that we attempted to put a nonsequenced message in one of the
- // acknowledged delivery buffers
- - def seq = seqOpt.get
- + def seq: SeqNo = seqOpt.get
- }
- final case class Quarantine(remoteAddress: Address, uid: Option[Int]) extends RemotingCommand
- final case class ManagementCommand(cmd: Any) extends RemotingCommand
- @@ -445,15 +445,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
- import EndpointManager._
- import context.dispatcher
- - val settings = new RemoteSettings(conf)
- - val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
- + val settings: RemoteSettings = new RemoteSettings(conf)
- + val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem]
- val endpointId: Iterator[Int] = Iterator from 0
- - val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)
- + val eventPublisher: EventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)
- // Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections
- // will be not part of this map!
- - val endpoints = new EndpointRegistry
- + val endpoints: EndpointRegistry = new EndpointRegistry
- // Mapping between transports and the local addresses they listen to
- var transportMapping: Map[Address, AkkaProtocolTransport] = Map()
- @@ -462,8 +462,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
- val pruneTimerCancellable: Cancellable =
- context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
- - var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
- - var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()
- + var pendingReadHandoffs: Map[ActorRef, AkkaProtocolHandle] = Map[ActorRef, AkkaProtocolHandle]()
- + var stashedInbound: Map[ActorRef, Vector[InboundAssociation]] = Map[ActorRef, Vector[InboundAssociation]]()
- def handleStashedInbound(endpoint: ActorRef, writerIsIdle: Boolean) {
- val stashed = stashedInbound.getOrElse(endpoint, Vector.empty)
- @@ -479,7 +479,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
- case None ⇒ body
- }
- - override val supervisorStrategy = {
- + override val supervisorStrategy: OneForOneStrategy = {
- def hopeless(e: HopelessAssociation): SupervisorStrategy.Directive = e match {
- case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) ⇒
- log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.",
- @@ -548,9 +548,9 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
- }
- // Structure for saving reliable delivery state across restarts of Endpoints
- - val receiveBuffers = new ConcurrentHashMap[Link, ResendState]()
- + val receiveBuffers: ConcurrentHashMap[Link, ResendState] = new ConcurrentHashMap[Link, ResendState]()
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case Listen(addressesPromise) ⇒
- listens map { ListensResult(addressesPromise, _) } recover {
- case NonFatal(e) ⇒ ListensFailure(addressesPromise, e)
- diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala
- index cbfe7f6ec2..c0e9a80663 100644
- --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala
- +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala
- @@ -85,7 +85,7 @@ final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleE
- object QuarantinedEvent extends AbstractFunction2[Address, Int, QuarantinedEvent] {
- @deprecated("Use long uid apply", "2.4.x")
- - def apply(address: Address, uid: Int) = new QuarantinedEvent(address, uid)
- + def apply(address: Address, uid: Int): QuarantinedEvent = new QuarantinedEvent(address, uid)
- }
- @SerialVersionUID(1L)
- @@ -106,7 +106,7 @@ final case class QuarantinedEvent(address: Address, longUid: Long) extends Remot
- def uid: Int = longUid.toInt
- @deprecated("Use long uid copy method", "2.4.x")
- - def copy(address: Address = address, uid: Int = uid) = new QuarantinedEvent(address, uid)
- + def copy(address: Address = address, uid: Int = uid): QuarantinedEvent = new QuarantinedEvent(address, uid)
- }
- @SerialVersionUID(1L)
- diff --git a/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala b/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala
- index 22164cd87b..0824a5de44 100644
- --- a/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala
- +++ b/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala
- @@ -7,7 +7,7 @@ import akka.actor.Address
- @SerialVersionUID(1L)
- final case class UniqueAddress(address: Address, uid: Long) extends Ordered[UniqueAddress] {
- - override def hashCode = java.lang.Long.hashCode(uid)
- + override def hashCode: Int = java.lang.Long.hashCode(uid)
- def compare(that: UniqueAddress): Int = {
- val result = Address.addressOrdering.compare(this.address, that.address)
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala
- index 3c71a516f2..d75250d41a 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala
- @@ -42,7 +42,7 @@ private[remote] object AeronSink {
- private final class OfferTask(pub: Publication, var buffer: UnsafeBuffer, var msgSize: Int, onOfferSuccess: AsyncCallback[Unit],
- giveUpAfter: Duration, onGiveUp: AsyncCallback[Unit], onPublicationClosed: AsyncCallback[Unit])
- extends (() ⇒ Boolean) {
- - val giveUpAfterNanos = giveUpAfter match {
- + val giveUpAfterNanos: Long = giveUpAfter match {
- case f: FiniteDuration ⇒ f.toNanos
- case _ ⇒ -1L
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala
- index 8011d0f2e2..2b49ceed91 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala
- @@ -45,7 +45,7 @@ private[remote] object AeronSource {
- private[remote] var messageReceived: EnvelopeBuffer = null // private to avoid scalac warning about exposing EnvelopeBuffer
- - val fragmentsHandler = new Fragments(data ⇒ messageReceived = data, pool)
- + val fragmentsHandler: Fragments = new Fragments(data ⇒ messageReceived = data, pool)
- }
- class Fragments(onMessage: EnvelopeBuffer ⇒ Unit, pool: EnvelopeBufferPool) extends FragmentAssembler(new FragmentHandler {
- @@ -84,7 +84,7 @@ private[remote] class AeronSource(
- val out: Outlet[EnvelopeBuffer] = Outlet("AeronSource")
- override val shape: SourceShape[EnvelopeBuffer] = SourceShape(out)
- - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
- + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic with OutHandler with ResourceLifecycle with StageLogging, GraphStageLogic with OutHandler with ResourceLifecycle with StageLogging) = {
- val logic = new GraphStageLogic(shape) with OutHandler with ResourceLifecycle with StageLogging {
- private val sub = aeron.addSubscription(channel, streamId)
- @@ -106,7 +106,7 @@ private[remote] class AeronSource(
- freeSessionBuffers()
- }
- - override protected def logSource = classOf[AeronSource]
- + override protected def logSource: Class[AeronSource] = classOf[AeronSource]
- override def preStart(): Unit = {
- flightRecorder.loFreq(AeronSource_Started, channelMetadata)
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala
- index c8fbf60798..61dee5550e 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala
- @@ -18,6 +18,7 @@ import scala.concurrent.duration._
- import java.net.InetAddress
- import java.nio.file.Path
- import java.util.concurrent.TimeUnit
- +import java.time
- /** INTERNAL API */
- private[akka] final class ArterySettings private (config: Config) {
- @@ -34,7 +35,7 @@ private[akka] final class ArterySettings private (config: Config) {
- val Enabled: Boolean = getBoolean("enabled")
- object Canonical {
- - val config = getConfig("canonical")
- + val config: Config = getConfig("canonical")
- import config._
- val Port: Int = getInt("port").requiring(port ⇒
- @@ -43,7 +44,7 @@ private[akka] final class ArterySettings private (config: Config) {
- }
- object Bind {
- - val config = getConfig("bind")
- + val config: Config = getConfig("bind")
- import config._
- val Port: Int = getString("port") match {
- @@ -55,10 +56,10 @@ private[akka] final class ArterySettings private (config: Config) {
- case other ⇒ other
- }
- - val BindTimeout = getDuration("bind-timeout").requiring(!_.isNegative, "bind-timeout can not be negative")
- + val BindTimeout: time.Duration = getDuration("bind-timeout").requiring(!_.isNegative, "bind-timeout can not be negative")
- }
- - val LargeMessageDestinations =
- + val LargeMessageDestinations: WildcardIndex[NotUsed] =
- config.getStringList("large-message-destinations").asScala.foldLeft(WildcardIndex[NotUsed]()) { (tree, entry) ⇒
- val segments = entry.split('/').tail
- tree.insert(segments, NotUsed)
- @@ -72,33 +73,33 @@ private[akka] final class ArterySettings private (config: Config) {
- val LogAeronCounters: Boolean = config.getBoolean("log-aeron-counters")
- object Advanced {
- - val config = getConfig("advanced")
- + val config: Config = getConfig("advanced")
- import config._
- val TestMode: Boolean = getBoolean("test-mode")
- - val Dispatcher = getString("use-dispatcher")
- - val ControlStreamDispatcher = getString("use-control-stream-dispatcher")
- - val MaterializerSettings = {
- + val Dispatcher: String = getString("use-dispatcher")
- + val ControlStreamDispatcher: String = getString("use-control-stream-dispatcher")
- + val MaterializerSettings: ActorMaterializerSettings = {
- val settings = ActorMaterializerSettings(config.getConfig("materializer"))
- if (Dispatcher.isEmpty) settings
- else settings.withDispatcher(Dispatcher)
- }
- - val ControlStreamMaterializerSettings = {
- + val ControlStreamMaterializerSettings: ActorMaterializerSettings = {
- val settings = ActorMaterializerSettings(config.getConfig("materializer"))
- if (ControlStreamDispatcher.isEmpty) settings
- else settings.withDispatcher(ControlStreamDispatcher)
- }
- - val EmbeddedMediaDriver = getBoolean("embedded-media-driver")
- - val AeronDirectoryName = getString("aeron-dir") requiring (dir ⇒
- + val EmbeddedMediaDriver: Boolean = getBoolean("embedded-media-driver")
- + val AeronDirectoryName: String = getString("aeron-dir") requiring (dir ⇒
- EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver")
- - val DeleteAeronDirectory = getBoolean("delete-aeron-dir")
- + val DeleteAeronDirectory: Boolean = getBoolean("delete-aeron-dir")
- val IdleCpuLevel: Int = getInt("idle-cpu-level").requiring(level ⇒
- 1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10")
- - val OutboundLanes = getInt("outbound-lanes").requiring(n ⇒
- + val OutboundLanes: Int = getInt("outbound-lanes").requiring(n ⇒
- n > 0, "outbound-lanes must be greater than zero")
- - val InboundLanes = getInt("inbound-lanes").requiring(n ⇒
- + val InboundLanes: Int = getInt("inbound-lanes").requiring(n ⇒
- n > 0, "inbound-lanes must be greater than zero")
- val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring(
- _ > 0, "system-message-buffer-size must be more than zero")
- @@ -108,44 +109,44 @@ private[akka] final class ArterySettings private (config: Config) {
- _ > 0, "outbound-control-queue-size must be more than zero")
- val OutboundLargeMessageQueueSize: Int = getInt("outbound-large-message-queue-size").requiring(
- _ > 0, "outbound-large-message-queue-size must be more than zero")
- - val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval ⇒
- + val SystemMessageResendInterval: FiniteDuration = config.getMillisDuration("system-message-resend-interval").requiring(interval ⇒
- interval > Duration.Zero, "system-message-resend-interval must be more than zero")
- - val HandshakeTimeout = config.getMillisDuration("handshake-timeout").requiring(interval ⇒
- + val HandshakeTimeout: FiniteDuration = config.getMillisDuration("handshake-timeout").requiring(interval ⇒
- interval > Duration.Zero, "handshake-timeout must be more than zero")
- - val HandshakeRetryInterval = config.getMillisDuration("handshake-retry-interval").requiring(interval ⇒
- + val HandshakeRetryInterval: FiniteDuration = config.getMillisDuration("handshake-retry-interval").requiring(interval ⇒
- interval > Duration.Zero, "handshake-retry-interval must be more than zero")
- - val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒
- + val InjectHandshakeInterval: FiniteDuration = config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒
- interval > Duration.Zero, "inject-handshake-interval must be more than zero")
- - val GiveUpMessageAfter = config.getMillisDuration("give-up-message-after").requiring(interval ⇒
- + val GiveUpMessageAfter: FiniteDuration = config.getMillisDuration("give-up-message-after").requiring(interval ⇒
- interval > Duration.Zero, "give-up-message-after must be more than zero")
- - val GiveUpSystemMessageAfter = config.getMillisDuration("give-up-system-message-after").requiring(interval ⇒
- + val GiveUpSystemMessageAfter: FiniteDuration = config.getMillisDuration("give-up-system-message-after").requiring(interval ⇒
- interval > Duration.Zero, "give-up-system-message-after must be more than zero")
- - val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout").requiring(interval ⇒
- + val ShutdownFlushTimeout: FiniteDuration = config.getMillisDuration("shutdown-flush-timeout").requiring(interval ⇒
- interval > Duration.Zero, "shutdown-flush-timeout must be more than zero")
- - val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout").requiring(interval ⇒
- + val InboundRestartTimeout: FiniteDuration = config.getMillisDuration("inbound-restart-timeout").requiring(interval ⇒
- interval > Duration.Zero, "inbound-restart-timeout must be more than zero")
- - val InboundMaxRestarts = getInt("inbound-max-restarts")
- - val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒
- + val InboundMaxRestarts: Int = getInt("inbound-max-restarts")
- + val OutboundRestartTimeout: FiniteDuration = config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒
- interval > Duration.Zero, "outbound-restart-timeout must be more than zero")
- - val OutboundMaxRestarts = getInt("outbound-max-restarts")
- - val StopQuarantinedAfterIdle = config.getMillisDuration("stop-quarantined-after-idle").requiring(interval ⇒
- + val OutboundMaxRestarts: Int = getInt("outbound-max-restarts")
- + val StopQuarantinedAfterIdle: FiniteDuration = config.getMillisDuration("stop-quarantined-after-idle").requiring(interval ⇒
- interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero")
- - val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout").requiring(interval ⇒
- + val ClientLivenessTimeout: FiniteDuration = config.getMillisDuration("client-liveness-timeout").requiring(interval ⇒
- interval > Duration.Zero, "client-liveness-timeout must be more than zero")
- - val ImageLivenessTimeout = config.getMillisDuration("image-liveness-timeout").requiring(interval ⇒
- + val ImageLivenessTimeout: FiniteDuration = config.getMillisDuration("image-liveness-timeout").requiring(interval ⇒
- interval > Duration.Zero, "image-liveness-timeout must be more than zero")
- require(ImageLivenessTimeout < HandshakeTimeout, "image-liveness-timeout must be less than handshake-timeout")
- - val DriverTimeout = config.getMillisDuration("driver-timeout").requiring(interval ⇒
- + val DriverTimeout: FiniteDuration = config.getMillisDuration("driver-timeout").requiring(interval ⇒
- interval > Duration.Zero, "driver-timeout must be more than zero")
- val FlightRecorderEnabled: Boolean = getBoolean("flight-recorder.enabled")
- val FlightRecorderDestination: String = getString("flight-recorder.destination")
- - val Compression = new Compression(getConfig("compression"))
- + val Compression: Compression = new Compression(getConfig("compression"))
- final val MaximumFrameSize: Int = math.min(getBytes("maximum-frame-size"), Int.MaxValue).toInt
- .requiring(_ >= 32 * 1024, "maximum-frame-size must be greater than or equal to 32 KiB")
- final val BufferPoolSize: Int = getInt("buffer-pool-size")
- .requiring(_ > 0, "buffer-pool-size must be greater than 0")
- - final val InboundHubBufferSize = BufferPoolSize / 2
- + final val InboundHubBufferSize: Int = BufferPoolSize / 2
- final val MaximumLargeFrameSize: Int = math.min(getBytes("maximum-large-frame-size"), Int.MaxValue).toInt
- .requiring(_ >= 32 * 1024, "maximum-large-frame-size must be greater than or equal to 32 KiB")
- final val LargeBufferPoolSize: Int = getInt("large-buffer-pool-size")
- @@ -155,7 +156,7 @@ private[akka] final class ArterySettings private (config: Config) {
- /** INTERNAL API */
- private[akka] object ArterySettings {
- - def apply(config: Config) = new ArterySettings(config)
- + def apply(config: Config): ArterySettings = new ArterySettings(config)
- /** INTERNAL API */
- private[remote] final class Compression private[ArterySettings] (config: Config) {
- @@ -164,18 +165,18 @@ private[akka] object ArterySettings {
- private[akka] final val Enabled = ActorRefs.Max > 0 || Manifests.Max > 0
- object ActorRefs {
- - val config = getConfig("actor-refs")
- + val config: Config = getConfig("actor-refs")
- import config._
- - val AdvertisementInterval = config.getMillisDuration("advertisement-interval")
- - val Max = getInt("max")
- + val AdvertisementInterval: FiniteDuration = config.getMillisDuration("advertisement-interval")
- + val Max: Int = getInt("max")
- }
- object Manifests {
- - val config = getConfig("manifests")
- + val config: Config = getConfig("manifests")
- import config._
- - val AdvertisementInterval = config.getMillisDuration("advertisement-interval")
- - val Max = getInt("max")
- + val AdvertisementInterval: FiniteDuration = config.getMillisDuration("advertisement-interval")
- + val Max: Int = getInt("max")
- }
- }
- object Compression {
- @@ -183,7 +184,7 @@ private[akka] object ArterySettings {
- final val Debug = false // unlocks additional very verbose debug logging of compression events (to stdout)
- }
- - def getHostname(key: String, config: Config) = config.getString(key) match {
- + def getHostname(key: String, config: Config): String = config.getString(key) match {
- case "<getHostAddress>" ⇒ InetAddress.getLocalHost.getHostAddress
- case "<getHostName>" ⇒ InetAddress.getLocalHost.getHostName
- case other ⇒ other
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
- index b7482c46d6..bdc402c1d9 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
- @@ -235,9 +235,9 @@ private[remote] object FlushOnShutdown {
- private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration,
- inboundContext: InboundContext, associations: Set[Association]) extends Actor {
- - var remaining = Map.empty[UniqueAddress, Int]
- + var remaining: Map[UniqueAddress, Int] = Map.empty[UniqueAddress, Int]
- - val timeoutTask = context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher)
- + val timeoutTask: Cancellable = context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher)
- override def preStart(): Unit = {
- try {
- @@ -265,7 +265,7 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati
- done.trySuccess(Done)
- }
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case ActorSystemTerminatingAck(from) ⇒
- // Just treat unexpected acks as systems from which zero acks are expected
- val acksRemaining = remaining.getOrElse(from, 0)
- @@ -972,7 +972,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
- }
- // InboundContext
- - override def sendControl(to: Address, message: ControlMessage) =
- + override def sendControl(to: Address, message: ControlMessage): Unit =
- try {
- association(to).sendControl(message)
- } catch {
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala
- index 99ded3a261..48d27fc644 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala
- @@ -119,7 +119,7 @@ private[remote] class Association(
- private val log = Logging(transport.system, getClass.getName)
- private val flightRecorder = transport.createFlightRecorderEventSink(synchr = true)
- - override def settings = transport.settings
- + override def settings: ArterySettings = transport.settings
- private def advancedSettings = transport.settings.Advanced
- private val restartCounter = new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout)
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
- index 5957ab684f..e4751d3309 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
- @@ -37,7 +37,7 @@ private[remote] class EnvelopeBufferPool(maximumPayload: Int, maximumBuffers: In
- }
- }
- - def release(buffer: EnvelopeBuffer) =
- + def release(buffer: EnvelopeBuffer): Unit =
- if (buffer.byteBuffer.isDirect && !availableBuffers.offer(buffer)) buffer.tryCleanDirectByteBuffer()
- }
- @@ -45,7 +45,7 @@ private[remote] class EnvelopeBufferPool(maximumPayload: Int, maximumBuffers: In
- /** INTERNAL API */
- private[remote] final class ByteFlag(val mask: Byte) extends AnyVal {
- def isEnabled(byteFlags: Byte): Boolean = (byteFlags.toInt & mask) != 0
- - override def toString = s"ByteFlag(${ByteFlag.binaryLeftPad(mask)})"
- + override def toString: String = s"ByteFlag(${ByteFlag.binaryLeftPad(mask)})"
- }
- /**
- * INTERNAL API
- @@ -67,7 +67,7 @@ private[remote] object EnvelopeBuffer {
- val TagValueMask = 0x0000FFFF
- // Flags (1 byte allocated for them)
- - val MetadataPresentFlag = new ByteFlag(0x1)
- + val MetadataPresentFlag: ByteFlag = new ByteFlag(0x1)
- val VersionOffset = 0 // Byte
- val FlagsOffset = 1 // Byte
- @@ -231,17 +231,17 @@ private[remote] final class HeaderBuilderImpl(
- _remoteInstruments = OptionVal.None
- }
- - override def setVersion(v: Byte) = _version = v
- - override def version = _version
- + override def setVersion(v: Byte): Unit = _version = v
- + override def version: Byte = _version
- - override def setFlags(v: Byte) = _flags = v
- - override def flags = _flags
- + override def setFlags(v: Byte): Unit = _flags = v
- + override def flags: Byte = _flags
- override def flag(byteFlag: ByteFlag): Boolean = (_flags.toInt & byteFlag.mask) != 0
- override def setFlag(byteFlag: ByteFlag, value: Boolean): Unit =
- if (value) _flags = (flags | byteFlag.mask).toByte
- else _flags = (flags & ~byteFlag.mask).toByte
- - override def setUid(uid: Long) = _uid = uid
- + override def setUid(uid: Long): Unit = _uid = uid
- override def uid: Long = _uid
- override def inboundActorRefCompressionTableVersion: Byte = _inboundActorRefCompressionTableVersion
- @@ -333,7 +333,7 @@ private[remote] final class HeaderBuilderImpl(
- _remoteInstruments = OptionVal(instruments)
- }
- - override def toString =
- + override def toString: String =
- "HeaderBuilderImpl(" +
- "version:" + version + ", " +
- "flags:" + ByteFlag.binaryLeftPad(flags) + ", " +
- @@ -353,7 +353,7 @@ private[remote] final class HeaderBuilderImpl(
- */
- private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
- import EnvelopeBuffer._
- - val aeronBuffer = new UnsafeBuffer(byteBuffer)
- + val aeronBuffer: UnsafeBuffer = new UnsafeBuffer(byteBuffer)
- private var literalChars = new Array[Char](64)
- private var literalBytes = new Array[Byte](64)
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
- index e019d20c36..0ff0eaef6e 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
- @@ -85,7 +85,7 @@ private[remote] class Encoder(
- done.success(Done)
- }
- - override protected def logSource = classOf[Encoder]
- + override protected def logSource: Class[Encoder] = classOf[Encoder]
- private var debugLogSendEnabled = false
- @@ -356,7 +356,7 @@ private[remote] class Decoder(
- val logic = new TimerGraphStageLogic(shape) with InboundCompressionAccessImpl with InHandler with OutHandler with StageLogging {
- import Decoder.RetryResolveRemoteDeployedRecipient
- - override val compressions = inboundCompressions
- + override val compressions: InboundCompressions = inboundCompressions
- private val localAddress = inboundContext.localAddress.address
- private val headerBuilder = HeaderBuilder.in(compressions)
- @@ -374,7 +374,7 @@ private[remote] class Decoder(
- private var tickTimestamp = System.nanoTime()
- private var tickMessageCount = 0L
- - override protected def logSource = classOf[Decoder]
- + override protected def logSource: Class[Decoder] = classOf[Decoder]
- override def preStart(): Unit = {
- schedulePeriodically(Tick, 1.seconds)
- @@ -611,7 +611,7 @@ private[remote] class Deserializer(
- private val instruments: RemoteInstruments = RemoteInstruments(system)
- private val serialization = SerializationExtension(system)
- - override protected def logSource = classOf[Deserializer]
- + override protected def logSource: Class[Deserializer] = classOf[Deserializer]
- override def onPush(): Unit = {
- val envelope = grab(in)
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala
- index ead5d99372..b742174ce4 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala
- @@ -91,7 +91,7 @@ private[remote] class InboundControlJunction
- val out: Outlet[InboundEnvelope] = Outlet("InboundControlJunction.out")
- override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
- - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
- + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic with InHandler with OutHandler with ControlMessageSubject, GraphStageLogic with InHandler with OutHandler with ControlMessageSubject) = {
- val stoppedPromise = Promise[Done]()
- val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ControlMessageSubject {
- @@ -161,12 +161,12 @@ private[remote] class OutboundControlJunction(
- val out: Outlet[OutboundEnvelope] = Outlet("OutboundControlJunction.out")
- override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out)
- - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
- + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic with InHandler with OutHandler with StageLogging with OutboundControlIngress, GraphStageLogic with InHandler with OutHandler with StageLogging with OutboundControlIngress) = {
- val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with OutboundControlIngress {
- import OutboundControlJunction._
- - val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage)
- + val sendControlMessageCallback: AsyncCallback[ControlMessage] = getAsyncCallback[ControlMessage](internalSendControlMessage)
- private val maxControlMessageBufferSize: Int = outboundContext.settings.Advanced.OutboundControlQueueSize
- private val buffer = new ArrayDeque[OutboundEnvelope]
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala
- index 823e877ff7..c0461a62d4 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala
- @@ -224,39 +224,39 @@ private[remote] object FlightRecorder {
- FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ)
- }
- - val Alignment = 64 * 1024 // Windows is picky about mapped section alignments
- + val Alignment: Int = 64 * 1024 // Windows is picky about mapped section alignments
- val MagicString = 0x31524641 // "AFR1", little-endian
- - val GlobalSectionSize = BitUtil.align(24, Alignment)
- + val GlobalSectionSize: Int = BitUtil.align(24, Alignment)
- val StartTimeStampOffset = 4
- val LogHeaderSize = 16
- val SnapshotCount = 4
- - val SnapshotMask = SnapshotCount - 1
- + val SnapshotMask: Int = SnapshotCount - 1
- // TODO: Dummy values right now, format is under construction
- val AlertRecordSize = 128
- val LoFreqRecordSize = 128
- val HiFreqBatchSize = 62
- - val HiFreqRecordSize = 16 * (HiFreqBatchSize + 2) // (batched events + header)
- + val HiFreqRecordSize: Int = 16 * (HiFreqBatchSize + 2) // (batched events + header)
- val AlertWindow = 256
- val LoFreqWindow = 256
- val HiFreqWindow = 256 // This is counted in batches !
- - val AlertLogSize = BitUtil.align(LogHeaderSize + (AlertWindow * AlertRecordSize), Alignment)
- - val LoFreqLogSize = BitUtil.align(LogHeaderSize + (LoFreqWindow * LoFreqRecordSize), Alignment)
- - val HiFreqLogSize = BitUtil.align(LogHeaderSize + (HiFreqWindow * HiFreqRecordSize), Alignment)
- + val AlertLogSize: Int = BitUtil.align(LogHeaderSize + (AlertWindow * AlertRecordSize), Alignment)
- + val LoFreqLogSize: Int = BitUtil.align(LogHeaderSize + (LoFreqWindow * LoFreqRecordSize), Alignment)
- + val HiFreqLogSize: Int = BitUtil.align(LogHeaderSize + (HiFreqWindow * HiFreqRecordSize), Alignment)
- - val AlertSectionSize = AlertLogSize * SnapshotCount
- - val LoFreqSectionSize = LoFreqLogSize * SnapshotCount
- - val HiFreqSectionSize = HiFreqLogSize * SnapshotCount
- + val AlertSectionSize: Int = AlertLogSize * SnapshotCount
- + val LoFreqSectionSize: Int = LoFreqLogSize * SnapshotCount
- + val HiFreqSectionSize: Int = HiFreqLogSize * SnapshotCount
- - val AlertSectionOffset = GlobalSectionSize
- - val LoFreqSectionOffset = GlobalSectionSize + AlertSectionSize
- - val HiFreqSectionOffset = GlobalSectionSize + AlertSectionSize + LoFreqSectionSize
- + val AlertSectionOffset: Int = GlobalSectionSize
- + val LoFreqSectionOffset: Int = GlobalSectionSize + AlertSectionSize
- + val HiFreqSectionOffset: Int = GlobalSectionSize + AlertSectionSize + LoFreqSectionSize
- - val TotalSize = GlobalSectionSize + AlertSectionSize + LoFreqSectionSize + HiFreqSectionSize
- + val TotalSize: Int = GlobalSectionSize + AlertSectionSize + LoFreqSectionSize + HiFreqSectionSize
- val HiFreqEntryCountFieldOffset = 16
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala
- index c1d585fd3c..7a6a6d2787 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala
- @@ -7,7 +7,7 @@ private[remote] object FlightRecorderEvents {
- // Note: Remember to update dictionary when adding new events!
- - val NoMetaData = Array.empty[Byte]
- + val NoMetaData: Array[Byte] = Array.empty[Byte]
- // Top level remoting events
- val Transport_MediaDriverStarted = 0
- @@ -52,7 +52,7 @@ private[remote] object FlightRecorderEvents {
- val Compression_Inbound_RunClassManifestAdvertisement = 95
- // Used for presentation of the entries in the flight recorder
- - lazy val eventDictionary = Map(
- + lazy val eventDictionary: Map[Long, String] = Map(
- Transport_MediaDriverStarted → "Transport: Media driver started",
- Transport_AeronStarted → "Transport: Aeron started",
- Transport_AeronErrorLogStarted → "Transport: Aeron error log started",
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala
- index d3efcdeb53..dd7d6cbb16 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala
- @@ -16,7 +16,7 @@ import scala.collection.{ SortedSet, immutable }
- */
- object FlightRecorderDump extends App {
- require(args.size == 1, "Usage: FlightRecorderDump afr-file")
- - val path = FileSystems.getDefault.getPath(args(0))
- + val path: Path = FileSystems.getDefault.getPath(args(0))
- FlightRecorderReader.dumpToStdout(path)
- }
- @@ -50,7 +50,7 @@ private[akka] object FlightRecorderReader {
- """.stripMargin
- }
- - val AlertSectionParameters = SectionParameters(
- + val AlertSectionParameters: SectionParameters = SectionParameters(
- offset = AlertSectionOffset,
- sectionSize = AlertSectionSize,
- logSize = AlertLogSize,
- @@ -58,7 +58,7 @@ private[akka] object FlightRecorderReader {
- recordSize = AlertRecordSize,
- entriesPerRecord = 1)
- - val LoFreqSectionParameters = SectionParameters(
- + val LoFreqSectionParameters: SectionParameters = SectionParameters(
- offset = LoFreqSectionOffset,
- sectionSize = LoFreqSectionSize,
- logSize = LoFreqLogSize,
- @@ -66,7 +66,7 @@ private[akka] object FlightRecorderReader {
- recordSize = LoFreqRecordSize,
- entriesPerRecord = 1)
- - val HiFreqSectionParameters = SectionParameters(
- + val HiFreqSectionParameters: SectionParameters = SectionParameters(
- offset = HiFreqSectionOffset,
- sectionSize = HiFreqSectionSize,
- logSize = HiFreqLogSize,
- @@ -138,8 +138,8 @@ private[akka] final class FlightRecorderReader(fileChannel: FileChannel) {
- def richEntries: Iterator[RichEntry] = {
- new Iterator[RichEntry] {
- - var recordOffset = offset + RollingEventLogSection.RecordsOffset
- - var recordsLeft = math.min(head, sectionParameters.window)
- + var recordOffset: Long = offset + RollingEventLogSection.RecordsOffset
- + var recordsLeft: Long = math.min(head, sectionParameters.window)
- override def hasNext: Boolean = recordsLeft > 0
- @@ -164,9 +164,9 @@ private[akka] final class FlightRecorderReader(fileChannel: FileChannel) {
- def compactEntries: Iterator[CompactEntry] = {
- new Iterator[CompactEntry] {
- - var recordOffset = offset + RollingEventLogSection.RecordsOffset
- - var entryOffset = recordOffset + RollingEventLogSection.CommitEntrySize
- - var recordsLeft = math.min(head, sectionParameters.window)
- + var recordOffset: Long = offset + RollingEventLogSection.RecordsOffset
- + var entryOffset: Long = recordOffset + RollingEventLogSection.CommitEntrySize
- + var recordsLeft: Long = math.min(head, sectionParameters.window)
- var entriesLeft = -1L
- var dirty = false
- var timeStamp: Instant = _
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala
- index 8e67a07dc4..07ef29945b 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala
- @@ -59,7 +59,7 @@ private[remote] trait InboundEnvelope {
- * INTERNAL API
- */
- private[remote] object ReusableInboundEnvelope {
- - def createObjectPool(capacity: Int) = new ObjectPool[ReusableInboundEnvelope](
- + def createObjectPool(capacity: Int): ObjectPool[ReusableInboundEnvelope] = new ObjectPool[ReusableInboundEnvelope](
- capacity,
- create = () ⇒ new ReusableInboundEnvelope, clear = inEnvelope ⇒ inEnvelope.asInstanceOf[ReusableInboundEnvelope].clear())
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala
- index 18501a504b..aae70082fd 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala
- @@ -25,7 +25,7 @@ private[remote] class InboundQuarantineCheck(inboundContext: InboundContext) ext
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
- - override protected def logSource = classOf[InboundQuarantineCheck]
- + override protected def logSource: Class[InboundQuarantineCheck] = classOf[InboundQuarantineCheck]
- // InHandler
- override def onPush(): Unit = {
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala b/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala
- index 06b8c4c272..e099f43f0f 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala
- @@ -130,7 +130,7 @@ private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag]
- private def probeDistanceOf(slot: Int): Int = probeDistanceOf(idealSlot = hashes(slot) & Mask, actualSlot = slot)
- // Protected for exposing it to unit tests
- - protected def probeDistanceOf(idealSlot: Int, actualSlot: Int) = ((actualSlot - idealSlot) + capacity) & Mask
- + protected def probeDistanceOf(idealSlot: Int, actualSlot: Int): Int = ((actualSlot - idealSlot) + capacity) & Mask
- @tailrec private def move(position: Int, k: K, h: Int, value: V, elemEpoch: Int, probeDistance: Int): Unit = {
- if (values(position) eq null) {
- @@ -181,7 +181,7 @@ private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag]
- protected def isCacheable(v: V): Boolean
- - override def toString =
- + override def toString: String =
- s"LruBoundedCache(" +
- s" values = ${values.mkString("[", ",", "]")}," +
- s" hashes = ${hashes.map(_ & Mask).mkString("[", ",", "]")}," +
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala
- index b95b623c83..a97b319827 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/OutboundEnvelope.scala
- @@ -38,7 +38,7 @@ private[remote] trait OutboundEnvelope {
- * INTERNAL API
- */
- private[remote] object ReusableOutboundEnvelope {
- - def createObjectPool(capacity: Int) = new ObjectPool[ReusableOutboundEnvelope](
- + def createObjectPool(capacity: Int): ObjectPool[ReusableOutboundEnvelope] = new ObjectPool[ReusableOutboundEnvelope](
- capacity,
- create = () ⇒ new ReusableOutboundEnvelope, clear = outEnvelope ⇒ outEnvelope.clear())
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala
- index e2097cd78a..8175ed0538 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/RemoteInstrument.scala
- @@ -271,7 +271,7 @@ private[remote] final class RemoteInstruments(
- def isEmpty: Boolean = instruments.isEmpty
- def nonEmpty: Boolean = instruments.nonEmpty
- - def timeSerialization = serializationTimingEnabled
- + def timeSerialization: Boolean = serializationTimingEnabled
- }
- /** INTERNAL API */
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala
- index d87b24db63..d77dd0d649 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala
- @@ -271,9 +271,9 @@ private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends
- new GraphStageLogic(shape) with InHandler with OutHandler {
- // TODO we might need have to prune old unused entries
- - var sequenceNumbers = Map.empty[UniqueAddress, Long]
- + var sequenceNumbers: Map[UniqueAddress, Long] = Map.empty[UniqueAddress, Long]
- - def localAddress = inboundContext.localAddress
- + def localAddress: UniqueAddress = inboundContext.localAddress
- // InHandler
- override def onPush(): Unit = {
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala
- index bff26033c1..9556e26d8f 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala
- @@ -113,7 +113,7 @@ private[remote] class OutboundTestStage(outboundContext: OutboundContext, state:
- val out: Outlet[OutboundEnvelope] = Outlet("OutboundTestStage.out")
- override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out)
- - override def createLogic(inheritedAttributes: Attributes) = {
- + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
- if (enabled) {
- new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
- @@ -154,7 +154,7 @@ private[remote] class InboundTestStage(inboundContext: InboundContext, state: Sh
- val out: Outlet[InboundEnvelope] = Outlet("InboundTestStage.out")
- override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
- - override def createLogic(inheritedAttributes: Attributes) = {
- + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
- if (enabled) {
- new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala
- index 60c7079803..97bb66b385 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala
- @@ -62,5 +62,5 @@ private[remote] object CompressionTable {
- def compareBy2ndValue[T]: Comparator[Tuple2[T, Int]] = CompareBy2ndValue.asInstanceOf[Comparator[(T, Int)]]
- private[this] val _empty = new CompressionTable[Any](0, 0, Map.empty)
- - def empty[T] = _empty.asInstanceOf[CompressionTable[T]]
- + def empty[T]: CompressionTable[T] = _empty.asInstanceOf[CompressionTable[T]]
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala
- index bd671259ab..61f8f389c0 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala
- @@ -24,7 +24,7 @@ private[remote] final case class DecompressionTable[T](originUid: Long, version:
- CompressionTable(originUid, version, Map(table.zipWithIndex: _*))
- /** Writes complete table as String (heavy operation) */
- - override def toString =
- + override def toString: String =
- s"DecompressionTable($originUid, $version, " +
- s"Map(${table.zipWithIndex.map({ case (t, i) ⇒ s"$i -> $t" }).mkString(",")}))"
- }
- @@ -35,6 +35,6 @@ private[remote] object DecompressionTable {
- val DisabledVersion: Byte = -1
- private[this] val _empty = DecompressionTable(0, 0, Array.empty)
- - def empty[T] = _empty.asInstanceOf[DecompressionTable[T]]
- - def disabled[T] = empty[T].copy(version = DisabledVersion)
- + def empty[T]: DecompressionTable[T] = _empty.asInstanceOf[DecompressionTable[T]]
- + def disabled[T]: DecompressionTable[T] = empty[T].copy(version = DisabledVersion)
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala
- index c935435e14..3a6699cbf7 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala
- @@ -207,7 +207,7 @@ private[remote] object InboundCompression {
- final val KeepOldTablesNumber = 3 // TODO could be configurable
- object Tables {
- - def empty[T] = Tables(
- + def empty[T]: Tables[T] = Tables(
- oldTables = List(DecompressionTable.disabled[T]),
- activeTable = DecompressionTable.empty[T],
- nextTable = DecompressionTable.empty[T].copy(version = 1),
- @@ -451,7 +451,7 @@ private[remote] abstract class InboundCompression[T >: Null](
- CompressionTable(originUid, nextTableVersion, mappings)
- }
- - override def toString =
- + override def toString: String =
- s"""${Logging.simpleName(getClass)}(countMinSketch: $cms, heavyHitters: $heavyHitters)"""
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala
- index 5492e81dfd..29d920f933 100644
- --- a/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala
- +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala
- @@ -27,8 +27,8 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
- require((max & (max - 1)) == 0, "Maximum numbers of heavy hitters should be in form of 2^k for any natural k")
- - val capacity = max * 2
- - val mask = capacity - 1
- + val capacity: Int = max * 2
- + val mask: Int = capacity - 1
- import TopHeavyHitters._
- @@ -120,7 +120,7 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
- }
- }
- - def toDebugString =
- + def toDebugString: String =
- s"""TopHeavyHitters(
- | max: $max,
- | lowestHitterIdx: $lowestHitterIndex (weight: $lowestHitterWeight)
- @@ -389,7 +389,7 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
- heap(0)
- }
- - override def toString =
- + override def toString: String =
- s"${getClass.getSimpleName}(max:$max)"
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/security/provider/AkkaProvider.scala b/akka-remote/src/main/scala/akka/remote/security/provider/AkkaProvider.scala
- index 7fd441cb87..cd429b26a4 100644
- --- a/akka-remote/src/main/scala/akka/remote/security/provider/AkkaProvider.scala
- +++ b/akka-remote/src/main/scala/akka/remote/security/provider/AkkaProvider.scala
- @@ -10,7 +10,7 @@ import java.security.{ PrivilegedAction, AccessController, Provider }
- */
- object AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implements a secure AES random number generator") {
- AccessController.doPrivileged(new PrivilegedAction[this.type] {
- - def run = {
- + def run: Null = {
- //SecureRandom
- put("SecureRandom.AES128CounterSecureRNG", classOf[AES128CounterSecureRNG].getName)
- put("SecureRandom.AES256CounterSecureRNG", classOf[AES256CounterSecureRNG].getName)
- diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala
- index 04326bbfb2..891d0338f1 100644
- --- a/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala
- +++ b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala
- @@ -22,7 +22,7 @@ private[akka] object ActorRefResolveThreadLocalCache
- override def get(system: ActorSystem): ActorRefResolveThreadLocalCache = super.get(system)
- - override def lookup = ActorRefResolveThreadLocalCache
- + override def lookup: ActorRefResolveThreadLocalCache.type = ActorRefResolveThreadLocalCache
- override def createExtension(system: ExtendedActorSystem): ActorRefResolveThreadLocalCache =
- new ActorRefResolveThreadLocalCache(system)
- diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala
- index 9fd78211b7..6846a33634 100644
- --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala
- +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala
- @@ -15,6 +15,7 @@ import scala.concurrent.{ ExecutionContext, Promise, Future }
- import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
- import akka.remote.transport.AssociationHandle.DisassociateInfo
- import akka.actor.DeadLetterSuppression
- +import akka.remote.RemoteSettings
- trait TransportAdapterProvider {
- /**
- @@ -24,7 +25,7 @@ trait TransportAdapterProvider {
- }
- class TransportAdapters(system: ExtendedActorSystem) extends Extension {
- - val settings = RARP(system).provider.remoteSettings
- + val settings: RemoteSettings = RARP(system).provider.remoteSettings
- private val adaptersTable: Map[String, TransportAdapterProvider] = for ((name, fqn) ← settings.Adapters) yield {
- name → system.dynamicAccess.createInstanceFor[TransportAdapterProvider](fqn, immutable.Seq.empty).recover({
- @@ -40,7 +41,7 @@ class TransportAdapters(system: ExtendedActorSystem) extends Extension {
- object TransportAdaptersExtension extends ExtensionId[TransportAdapters] with ExtensionIdProvider {
- override def get(system: ActorSystem): TransportAdapters = super.get(system)
- - override def lookup = TransportAdaptersExtension
- + override def lookup: TransportAdaptersExtension.type = TransportAdaptersExtension
- override def createExtension(system: ExtendedActorSystem): TransportAdapters =
- new TransportAdapters(system)
- }
- @@ -131,8 +132,8 @@ abstract class AbstractTransportAdapterHandle(
- wrappedHandle,
- addedSchemeIdentifier)
- - override val localAddress = augmentScheme(originalLocalAddress)
- - override val remoteAddress = augmentScheme(originalRemoteAddress)
- + override val localAddress: Address = augmentScheme(originalLocalAddress)
- + override val remoteAddress: Address = augmentScheme(originalRemoteAddress)
- }
- @@ -147,7 +148,7 @@ object ActorTransportAdapter {
- final case class DisassociateUnderlying(info: DisassociateInfo = AssociationHandle.Unknown)
- extends TransportOperation with DeadLetterSuppression
- - implicit val AskTimeout = Timeout(5.seconds)
- + implicit val AskTimeout: Timeout = Timeout(5.seconds)
- }
- abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem)
- diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala
- index 210dd70855..0951c4d654 100644
- --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala
- +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala
- @@ -42,7 +42,7 @@ private[remote] object AkkaPduCodec {
- senderOption: OptionVal[ActorRef],
- seqOption: Option[SeqNo]) extends HasSequenceNumber {
- - def reliableDeliveryEnabled = seqOption.isDefined
- + def reliableDeliveryEnabled: Boolean = seqOption.isDefined
- override def seq: SeqNo = seqOption.get
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala
- index 6157e4e0be..c167fca550 100644
- --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala
- +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala
- @@ -26,6 +26,7 @@ import scala.concurrent.{ Future, Promise }
- import scala.util.control.NonFatal
- import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
- import akka.event.{ LogMarker, Logging }
- +import java.util.concurrent.atomic.AtomicInteger
- @SerialVersionUID(1L)
- class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace {
- @@ -63,7 +64,7 @@ private[remote] class AkkaProtocolSettings(config: Config) {
- private[remote] object AkkaProtocolTransport { //Couldn't these go into the Remoting Extension/ RemoteSettings instead?
- val AkkaScheme: String = "akka"
- val AkkaOverhead: Int = 0 //Don't know yet
- - val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0)
- + val UniqueId: AtomicInteger = new java.util.concurrent.atomic.AtomicInteger(0)
- final case class AssociateUnderlyingRefuseUid(
- remoteAddress: Address,
- @@ -116,8 +117,8 @@ private[remote] class AkkaProtocolTransport(
- }
- override val maximumOverhead: Int = AkkaProtocolTransport.AkkaOverhead
- - protected def managerName = s"akkaprotocolmanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}"
- - protected def managerProps = {
- + protected def managerName: String = s"akkaprotocolmanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}"
- + protected def managerProps: Props = {
- val wt = wrappedTransport
- val s = settings
- Props(classOf[AkkaProtocolManager], wt, s).withDeploy(Deploy.local)
- @@ -131,7 +132,7 @@ private[transport] class AkkaProtocolManager(
- // The AkkaProtocolTransport does not handle the recovery of associations, this task is implemented in the
- // remoting itself. Hence the strategy Stop.
- - override val supervisorStrategy = OneForOneStrategy() {
- + override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
- case NonFatal(_) ⇒ Stop
- }
- @@ -321,7 +322,7 @@ private[transport] class ProtocolStateActor(
- this(InboundUnassociated(associationListener, wrappedHandle), handshakeInfo, refuseUid = None, settings, codec, failureDetector)
- }
- - val localAddress = localHandshakeInfo.origin
- + val localAddress: Address = localHandshakeInfo.origin
- val handshakeTimerKey = "handshake-timer"
- initialData match {
- diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala
- index 01f2096228..4b5f63532e 100644
- --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala
- +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala
- @@ -44,7 +44,7 @@ private[remote] object FailureInjectorTransportAdapter {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: PassThru.type = this
- }
- @SerialVersionUID(1L)
- final case class Drop(outboundDropP: Double, inboundDropP: Double) extends GremlinMode
- @@ -64,7 +64,7 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor
- private[transport] val addressChaosTable = new ConcurrentHashMap[Address, GremlinMode]()
- @volatile private var allMode: GremlinMode = PassThru
- - override val addedSchemeIdentifier = FailureInjectorSchemeIdentifier
- + override val addedSchemeIdentifier: String = FailureInjectorSchemeIdentifier
- protected def maximumOverhead = 0
- override def managementCommand(cmd: Any): Future[Boolean] = cmd match {
- diff --git a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala
- index 87840a8957..7095647dd7 100644
- --- a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala
- +++ b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala
- @@ -92,21 +92,21 @@ class TestTransport(
- /**
- * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method.
- */
- - val listenBehavior = new SwitchableLoggedBehavior[Unit, (Address, Promise[AssociationEventListener])](
- + val listenBehavior: SwitchableLoggedBehavior[Unit, (Address, Promise[AssociationEventListener])] = new SwitchableLoggedBehavior[Unit, (Address, Promise[AssociationEventListener])](
- (_) ⇒ defaultListen,
- (_) ⇒ registry.logActivity(ListenAttempt(localAddress)))
- /**
- * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the associate() method.
- */
- - val associateBehavior = new SwitchableLoggedBehavior[Address, AssociationHandle](
- + val associateBehavior: SwitchableLoggedBehavior[Address, AssociationHandle] = new SwitchableLoggedBehavior[Address, AssociationHandle](
- defaultAssociate _,
- (remoteAddress) ⇒ registry.logActivity(AssociateAttempt(localAddress, remoteAddress)))
- /**
- * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the shutdown() method.
- */
- - val shutdownBehavior = new SwitchableLoggedBehavior[Unit, Boolean](
- + val shutdownBehavior: SwitchableLoggedBehavior[Unit, Boolean] = new SwitchableLoggedBehavior[Unit, Boolean](
- (_) ⇒ defaultShutdown,
- (_) ⇒ registry.logActivity(ShutdownAttempt(localAddress)))
- @@ -139,7 +139,7 @@ class TestTransport(
- * altering the behavior via pushDelayed will turn write to a blocking operation -- use of pushDelayed therefore
- * is not recommended.
- */
- - val writeBehavior = new SwitchableLoggedBehavior[(TestAssociationHandle, ByteString), Boolean](
- + val writeBehavior: SwitchableLoggedBehavior[(TestAssociationHandle, ByteString), Boolean] = new SwitchableLoggedBehavior[(TestAssociationHandle, ByteString), Boolean](
- defaultBehavior = {
- defaultWrite _
- },
- @@ -152,7 +152,7 @@ class TestTransport(
- * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the disassociate() method on handles. All
- * handle calls pass through this call.
- */
- - val disassociateBehavior = new SwitchableLoggedBehavior[TestAssociationHandle, Unit](
- + val disassociateBehavior: SwitchableLoggedBehavior[TestAssociationHandle, Unit] = new SwitchableLoggedBehavior[TestAssociationHandle, Unit](
- defaultBehavior = {
- defaultDisassociate _
- },
- @@ -467,5 +467,5 @@ final case class TestAssociationHandle(
- * Key used in [[akka.remote.transport.TestTransport.AssociationRegistry]] to identify associations. Contains an
- * ordered pair of addresses, where the first element of the pair is always the initiator of the association.
- */
- - val key = if (!inbound) (localAddress, remoteAddress) else (remoteAddress, localAddress)
- + val key: (Address, Address) = if (!inbound) (localAddress, remoteAddress) else (remoteAddress, localAddress)
- }
- diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
- index 7697a26a50..94c7016587 100644
- --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
- +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
- @@ -26,6 +26,7 @@ import akka.dispatch.sysmsg.{ Unwatch, Watch }
- import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
- import akka.event.LoggingAdapter
- import akka.remote.RARP
- +import java.util.concurrent.atomic.AtomicInteger
- class ThrottlerProvider extends TransportAdapterProvider {
- @@ -36,7 +37,7 @@ class ThrottlerProvider extends TransportAdapterProvider {
- object ThrottlerTransportAdapter {
- val SchemeIdentifier = "trttl"
- - val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0)
- + val UniqueId: AtomicInteger = new java.util.concurrent.atomic.AtomicInteger(0)
- sealed trait Direction {
- def includes(other: Direction): Boolean
- @@ -54,7 +55,7 @@ object ThrottlerTransportAdapter {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: Send.type = this
- }
- @SerialVersionUID(1L)
- @@ -67,7 +68,7 @@ object ThrottlerTransportAdapter {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: Receive.type = this
- }
- @SerialVersionUID(1L)
- @@ -77,7 +78,7 @@ object ThrottlerTransportAdapter {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: Both.type = this
- }
- }
- @@ -89,7 +90,7 @@ object ThrottlerTransportAdapter {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: SetThrottleAck.type = this
- }
- sealed trait ThrottleMode extends NoSerializationVerificationNeeded {
- @@ -131,7 +132,7 @@ object ThrottlerTransportAdapter {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: Unthrottled.type = this
- }
- @@ -143,7 +144,7 @@ object ThrottlerTransportAdapter {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: Blackhole.type = this
- }
- /**
- @@ -163,13 +164,13 @@ object ThrottlerTransportAdapter {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: ForceDisassociateAck.type = this
- }
- }
- class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedActorSystem) extends ActorTransportAdapter(_wrappedTransport, _system) {
- - override protected def addedSchemeIdentifier = SchemeIdentifier
- + override protected def addedSchemeIdentifier: String = SchemeIdentifier
- override protected def maximumOverhead = 0
- protected def managerName: String = s"throttlermanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}"
- protected def managerProps: Props = {
- @@ -373,7 +374,7 @@ private[transport] class ThrottledAssociation(
- import context.dispatcher
- var inboundThrottleMode: ThrottleMode = _
- - var throttledMessages = Queue.empty[ByteString]
- + var throttledMessages: Queue[ByteString] = Queue.empty[ByteString]
- var upstreamListener: HandleEventListener = _
- override def postStop(): Unit = originalHandle.disassociate("the owning ThrottledAssociation stopped", log)
- diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala
- index 3ac8d5ec9b..f0a0a9e2cd 100644
- --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala
- +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala
- @@ -27,20 +27,20 @@ import java.nio.file.Paths
- private[akka] class SSLSettings(config: Config) {
- import config.{ getBoolean, getString, getStringList }
- - val SSLKeyStore = getString("key-store")
- - val SSLTrustStore = getString("trust-store")
- - val SSLKeyStorePassword = getString("key-store-password")
- - val SSLKeyPassword = getString("key-password")
- + val SSLKeyStore: String = getString("key-store")
- + val SSLTrustStore: String = getString("trust-store")
- + val SSLKeyStorePassword: String = getString("key-store-password")
- + val SSLKeyPassword: String = getString("key-password")
- - val SSLTrustStorePassword = getString("trust-store-password")
- + val SSLTrustStorePassword: String = getString("trust-store-password")
- - val SSLEnabledAlgorithms = immutableSeq(getStringList("enabled-algorithms")).to[Set]
- + val SSLEnabledAlgorithms: Set[String] = immutableSeq(getStringList("enabled-algorithms")).to[Set]
- - val SSLProtocol = getString("protocol")
- + val SSLProtocol: String = getString("protocol")
- - val SSLRandomNumberGenerator = getString("random-number-generator")
- + val SSLRandomNumberGenerator: String = getString("random-number-generator")
- - val SSLRequireMutualAuthentication = getBoolean("require-mutual-authentication")
- + val SSLRequireMutualAuthentication: Boolean = getBoolean("require-mutual-authentication")
- private val sslContext = new AtomicReference[SSLContext]()
- @tailrec final def getOrCreateContext(log: MarkerLoggingAdapter): SSLContext =
- diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala
- index bcd25edfe3..b905d0a686 100644
- --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala
- +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala
- @@ -243,7 +243,7 @@ object ActorMaterializerSettings {
- outputBurstLimit: Int,
- fuzzingMode: Boolean,
- autoFusing: Boolean,
- - maxFixedBufferSize: Int) =
- + maxFixedBufferSize: Int): ActorMaterializerSettings =
- new ActorMaterializerSettings(
- initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
- outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize)
- @@ -285,7 +285,7 @@ object ActorMaterializerSettings {
- outputBurstLimit: Int,
- fuzzingMode: Boolean,
- autoFusing: Boolean,
- - maxFixedBufferSize: Int) =
- + maxFixedBufferSize: Int): ActorMaterializerSettings =
- new ActorMaterializerSettings(
- initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
- outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize)
- @@ -538,10 +538,10 @@ object IOSettings {
- new IOSettings(tcpWriteBufferSize)
- /** Java API */
- - def create(config: Config) = apply(config)
- + def create(config: Config): IOSettings = apply(config)
- /** Java API */
- - def create(system: ActorSystem) = apply(system)
- + def create(system: ActorSystem): IOSettings = apply(system)
- /** Java API */
- def create(tcpWriteBufferSize: Int): IOSettings =
- @@ -560,7 +560,7 @@ final class IOSettings private (val tcpWriteBufferSize: Int) {
- case _ ⇒ false
- }
- - override def toString =
- + override def toString: String =
- s"""IoSettings(${tcpWriteBufferSize})"""
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala
- index c9d57c04ae..3135c3fd3f 100644
- --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala
- +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala
- @@ -331,7 +331,7 @@ object Attributes {
- * Passing in null as any of the arguments sets the level to its default value, which is:
- * `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`.
- */
- - def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) =
- + def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel): Attributes =
- logLevels(
- onElement = Option(onElement).getOrElse(Logging.DebugLevel),
- onFinish = Option(onFinish).getOrElse(Logging.DebugLevel),
- @@ -343,7 +343,7 @@ object Attributes {
- *
- * See [[Attributes.createLogLevels]] for Java API
- */
- - def logLevels(onElement: Logging.LogLevel = Logging.DebugLevel, onFinish: Logging.LogLevel = Logging.DebugLevel, onFailure: Logging.LogLevel = Logging.ErrorLevel) =
- + def logLevels(onElement: Logging.LogLevel = Logging.DebugLevel, onFinish: Logging.LogLevel = Logging.DebugLevel, onFailure: Logging.LogLevel = Logging.ErrorLevel): Attributes =
- Attributes(LogLevels(onElement, onFinish, onFailure))
- /**
- @@ -398,7 +398,7 @@ object ActorAttributes {
- * Passing in null as any of the arguments sets the level to its default value, which is:
- * `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`.
- */
- - def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) =
- + def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel): Attributes =
- logLevels(
- onElement = Option(onElement).getOrElse(Logging.DebugLevel),
- onFinish = Option(onFinish).getOrElse(Logging.DebugLevel),
- @@ -410,7 +410,7 @@ object ActorAttributes {
- *
- * See [[Attributes.createLogLevels]] for Java API
- */
- - def logLevels(onElement: Logging.LogLevel = Logging.DebugLevel, onFinish: Logging.LogLevel = Logging.DebugLevel, onFailure: Logging.LogLevel = Logging.ErrorLevel) =
- + def logLevels(onElement: Logging.LogLevel = Logging.DebugLevel, onFinish: Logging.LogLevel = Logging.DebugLevel, onFailure: Logging.LogLevel = Logging.ErrorLevel): Attributes =
- Attributes(LogLevels(onElement, onFinish, onFailure))
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/FlowMonitor.scala b/akka-stream/src/main/scala/akka/stream/FlowMonitor.scala
- index 73dc0943f9..1481d1290b 100644
- --- a/akka-stream/src/main/scala/akka/stream/FlowMonitor.scala
- +++ b/akka-stream/src/main/scala/akka/stream/FlowMonitor.scala
- @@ -59,5 +59,5 @@ object FlowMonitorState {
- /**
- * Java API
- */
- - def finished[U]() = Finished
- + def finished[U](): Finished.type = Finished
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala
- index 1d1f5f32d6..57c45d410d 100644
- --- a/akka-stream/src/main/scala/akka/stream/Graph.scala
- +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala
- @@ -42,7 +42,7 @@ trait Graph[+S <: Shape, +M] {
- *
- * @param dispatcher Run the graph on this dispatcher
- */
- - def async(dispatcher: String) =
- + def async(dispatcher: String): Graph[S, M] =
- addAttributes(
- Attributes.asyncBoundary and ActorAttributes.dispatcher(dispatcher)
- )
- @@ -53,7 +53,7 @@ trait Graph[+S <: Shape, +M] {
- * @param dispatcher Run the graph on this dispatcher
- * @param inputBufferSize Set the input buffer to this size for the graph
- */
- - def async(dispatcher: String, inputBufferSize: Int) =
- + def async(dispatcher: String, inputBufferSize: Int): Graph[S, M] =
- addAttributes(
- Attributes.asyncBoundary and ActorAttributes.dispatcher(dispatcher)
- and Attributes.inputBuffer(inputBufferSize, inputBufferSize)
- diff --git a/akka-stream/src/main/scala/akka/stream/KillSwitch.scala b/akka-stream/src/main/scala/akka/stream/KillSwitch.scala
- index 4006f5a111..bbf1975265 100644
- --- a/akka-stream/src/main/scala/akka/stream/KillSwitch.scala
- +++ b/akka-stream/src/main/scala/akka/stream/KillSwitch.scala
- @@ -69,11 +69,11 @@ object KillSwitches {
- }
- private[stream] object UniqueKillSwitchStage extends GraphStageWithMaterializedValue[FlowShape[Any, Any], UniqueKillSwitch] {
- - override val initialAttributes = Attributes.name("breaker")
- - override val shape = FlowShape(Inlet[Any]("KillSwitch.in"), Outlet[Any]("KillSwitch.out"))
- + override val initialAttributes: Attributes = Attributes.name("breaker")
- + override val shape: FlowShape[Any, Any] = FlowShape(Inlet[Any]("KillSwitch.in"), Outlet[Any]("KillSwitch.out"))
- override def toString: String = "UniqueKillSwitchFlow"
- - override def createLogicAndMaterializedValue(attr: Attributes) = {
- + override def createLogicAndMaterializedValue(attr: Attributes): (KillableGraphStageLogic with InHandler with OutHandler, UniqueKillSwitch) = {
- val promise = Promise[Done]
- val switch = new UniqueKillSwitch(promise)
- @@ -90,13 +90,13 @@ object KillSwitches {
- private[stream] object UniqueBidiKillSwitchStage extends GraphStageWithMaterializedValue[BidiShape[Any, Any, Any, Any], UniqueKillSwitch] {
- - override val initialAttributes = Attributes.name("breaker")
- - override val shape = BidiShape(
- + override val initialAttributes: Attributes = Attributes.name("breaker")
- + override val shape: BidiShape[Any, Any, Any, Any] = BidiShape(
- Inlet[Any]("KillSwitchBidi.in1"), Outlet[Any]("KillSwitchBidi.out1"),
- Inlet[Any]("KillSwitchBidi.in2"), Outlet[Any]("KillSwitchBidi.out2"))
- override def toString: String = "UniqueKillSwitchBidi"
- - override def createLogicAndMaterializedValue(attr: Attributes) = {
- + override def createLogicAndMaterializedValue(attr: Attributes): (KillableGraphStageLogic, UniqueKillSwitch) = {
- val promise = Promise[Done]
- val switch = new UniqueKillSwitch(promise)
- diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala
- index fe2fc8612b..78ce1bb3d3 100644
- --- a/akka-stream/src/main/scala/akka/stream/Shape.scala
- +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala
- @@ -243,7 +243,7 @@ sealed abstract class ClosedShape extends Shape
- object ClosedShape extends ClosedShape {
- override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
- override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq
- - override def deepCopy() = this
- + override def deepCopy(): ClosedShape.type = this
- /**
- * Java API: obtain ClosedShape instance
- @@ -260,7 +260,7 @@ object ClosedShape extends ClosedShape {
- * meaningful type of Shape when the building is finished.
- */
- case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]) extends Shape {
- - override def deepCopy() = AmorphousShape(inlets.map(_.carbonCopy()), outlets.map(_.carbonCopy()))
- + override def deepCopy(): AmorphousShape = AmorphousShape(inlets.map(_.carbonCopy()), outlets.map(_.carbonCopy()))
- }
- /**
- diff --git a/akka-stream/src/main/scala/akka/stream/Supervision.scala b/akka-stream/src/main/scala/akka/stream/Supervision.scala
- index 9d3dd1735b..f1b1e4c20e 100644
- --- a/akka-stream/src/main/scala/akka/stream/Supervision.scala
- +++ b/akka-stream/src/main/scala/akka/stream/Supervision.scala
- @@ -18,7 +18,7 @@ object Supervision {
- * Java API: The stream will be completed with failure if application code for processing an element
- * throws an exception.
- */
- - def stop = Stop
- + def stop: Stop.type = Stop
- /**
- * Scala API: The element is dropped and the stream continues if application code for processing
- @@ -30,7 +30,7 @@ object Supervision {
- * Java API: The element is dropped and the stream continues if application code for processing
- * an element throws an exception.
- */
- - def resume = Resume
- + def resume: Resume.type = Resume
- /**
- * Scala API: The element is dropped and the stream continues after restarting the stage
- @@ -46,7 +46,7 @@ object Supervision {
- * Restarting a stage means that any accumulated state is cleared. This is typically
- * performed by creating a new instance of the stage.
- */
- - def restart = Restart
- + def restart: Restart.type = Restart
- type Decider = Function[Throwable, Directive]
- @@ -55,7 +55,7 @@ object Supervision {
- */
- val stoppingDecider: Decider with japi.Function[Throwable, Directive] =
- new Decider with japi.Function[Throwable, Directive] {
- - override def apply(e: Throwable) = Stop
- + override def apply(e: Throwable): Stop.type = Stop
- }
- /**
- @@ -68,7 +68,7 @@ object Supervision {
- */
- val resumingDecider: Decider with japi.Function[Throwable, Directive] =
- new Decider with japi.Function[Throwable, Directive] {
- - override def apply(e: Throwable) = Resume
- + override def apply(e: Throwable): Resume.type = Resume
- }
- /**
- @@ -81,7 +81,7 @@ object Supervision {
- */
- val restartingDecider: Decider with japi.Function[Throwable, Directive] =
- new Decider with japi.Function[Throwable, Directive] {
- - override def apply(e: Throwable) = Restart
- + override def apply(e: Throwable): Restart.type = Restart
- }
- /**
- diff --git a/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala b/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala
- index f4ff684e95..423d296cea 100644
- --- a/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala
- +++ b/akka-stream/src/main/scala/akka/stream/ThrottleMode.scala
- @@ -23,12 +23,12 @@ object ThrottleMode {
- /**
- * Java API: Tells throttle to make pauses before emitting messages to meet throttle rate
- */
- - def shaping = Shaping
- + def shaping: Shaping.type = Shaping
- /**
- * Java API: Makes throttle fail with exception when upstream is faster than throttle rate
- */
- - def enforcing = Enforcing
- + def enforcing: Enforcing.type = Enforcing
- }
- /**
- diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala
- index 822891cc21..f9ff1cfc3c 100644
- --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala
- +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala
- @@ -68,7 +68,7 @@ object ActorPublisherMessage {
- /**
- * Java API: get the singleton instance of the `Cancel` message
- */
- - def cancelInstance = Cancel
- + def cancelInstance: Cancel.type = Cancel
- /**
- * This message is delivered to the [[ActorPublisher]] actor in order to signal the exceeding of an subscription timeout.
- @@ -79,7 +79,7 @@ object ActorPublisherMessage {
- /**
- * Java API: get the singleton instance of the `SubscriptionTimeoutExceeded` message
- */
- - def subscriptionTimeoutExceededInstance = SubscriptionTimeoutExceeded
- + def subscriptionTimeoutExceededInstance: SubscriptionTimeoutExceeded.type = SubscriptionTimeoutExceeded
- }
- /**
- @@ -415,7 +415,7 @@ private[akka] object ActorPublisherState extends ExtensionId[ActorPublisherState
- override def get(system: ActorSystem): ActorPublisherState = super.get(system)
- - override def lookup() = ActorPublisherState
- + override def lookup(): ActorPublisherState.type = ActorPublisherState
- override def createExtension(system: ExtendedActorSystem): ActorPublisherState =
- new ActorPublisherState
- diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala
- index 8d346cd7b6..4b5f85ba56 100644
- --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala
- +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala
- @@ -34,7 +34,7 @@ object ActorSubscriberMessage {
- /**
- * Java API: get the singleton instance of the `OnComplete` message
- */
- - def onCompleteInstance = OnComplete
- + def onCompleteInstance: OnComplete.type = OnComplete
- }
- /**
- @@ -64,7 +64,7 @@ case object OneByOneRequestStrategy extends RequestStrategy {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: OneByOneRequestStrategy.type = this
- }
- /**
- @@ -77,7 +77,7 @@ case object ZeroRequestStrategy extends RequestStrategy {
- /**
- * Java API: get the singleton instance
- */
- - def getInstance = this
- + def getInstance: ZeroRequestStrategy.type = this
- }
- object WatermarkRequestStrategy {
- @@ -313,7 +313,7 @@ private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Sub
- private[akka] object ActorSubscriberState extends ExtensionId[ActorSubscriberState] with ExtensionIdProvider {
- override def get(system: ActorSystem): ActorSubscriberState = super.get(system)
- - override def lookup() = ActorSubscriberState
- + override def lookup(): ActorSubscriberState.type = ActorSubscriberState
- override def createExtension(system: ExtendedActorSystem): ActorSubscriberState =
- new ActorSubscriberState
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala
- index d351beae6d..f64dcce891 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala
- @@ -91,7 +91,7 @@ import scala.concurrent.{ Await, ExecutionContextExecutor }
- * The default phases are left in-tact since we still respect `.async` and other tags that were marked within a sub-fused graph.
- */
- private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer {
- - val subFusingPhase = new Phase[Any] {
- + val subFusingPhase: Phase[Any] = new Phase[Any] {
- override def apply(settings: ActorMaterializerSettings, attributes: Attributes,
- materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = {
- new GraphStageIsland(settings, attributes, materializer, islandName, OptionVal(registerShell)).asInstanceOf[PhaseIsland[Any]]
- @@ -131,7 +131,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
- */
- @InternalApi private[akka] object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider {
- override def get(system: ActorSystem): FlowNames = super.get(system)
- - override def lookup() = FlowNames
- + override def lookup(): FlowNames.type = FlowNames
- override def createExtension(system: ExtendedActorSystem): FlowNames = new FlowNames
- }
- @@ -139,7 +139,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
- * INTERNAL API
- */
- @InternalApi private[akka] class FlowNames extends Extension {
- - val name = SeqActorName("Flow")
- + val name: SeqActorNameImpl = SeqActorName("Flow")
- }
- /**
- @@ -173,9 +173,9 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
- @InternalApi private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor {
- import akka.stream.impl.StreamSupervisor._
- - override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
- + override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case Materialize(props, name) ⇒
- val impl = context.actorOf(props, name)
- sender() ! impl
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala
- index d8c9cf5bf2..97018dee74 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala
- @@ -106,8 +106,8 @@ import akka.event.Logging
- inputBufferElements = 0
- }
- - override def inputsDepleted = upstreamCompleted && inputBufferElements == 0
- - override def inputsAvailable = inputBufferElements > 0
- + override def inputsDepleted: Boolean = upstreamCompleted && inputBufferElements == 0
- + override def inputsAvailable: Boolean = inputBufferElements > 0
- protected def onComplete(): Unit = {
- upstreamCompleted = true
- @@ -167,13 +167,13 @@ import akka.event.Logging
- protected var subscriber: Subscriber[Any] = _
- protected var downstreamDemand: Long = 0L
- protected var downstreamCompleted = false
- - override def demandAvailable = downstreamDemand > 0
- + override def demandAvailable: Boolean = downstreamDemand > 0
- override def demandCount: Long = downstreamDemand
- - override def subreceive = _subreceive
- + override def subreceive: SubReceive = _subreceive
- private val _subreceive = new SubReceive(waitingExposedPublisher)
- - def isSubscribed = subscriber ne null
- + def isSubscribed: Boolean = subscriber ne null
- def enqueueOutputElement(elem: Any): Unit = {
- ReactiveStreamsCompliance.requireNonNullElement(elem)
- @@ -265,7 +265,7 @@ import akka.event.Logging
- /**
- * Subclass may override [[#activeReceive]]
- */
- - final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) {
- + final override def receive: ExposedPublisherReceive = new ExposedPublisherReceive(activeReceive, unhandled) {
- override def receiveExposedPublisher(ep: ExposedPublisher): Unit = {
- primaryOutputs.subreceive(ep)
- context become activeReceive
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala
- index bd8fd86594..1ed20a8d20 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala
- @@ -21,14 +21,14 @@ import akka.stream.stage._
- onFailureMessage: (Throwable) ⇒ Any)
- extends GraphStage[SinkShape[In]] {
- val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in")
- - override def initialAttributes = DefaultAttributes.actorRefWithAck
- + override def initialAttributes: Attributes = DefaultAttributes.actorRefWithAck
- override val shape: SinkShape[In] = SinkShape(in)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with InHandler {
- implicit def self: ActorRef = stageActor.ref
- - val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
- + val maxBuffer: Int = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
- require(maxBuffer > 0, "Buffer size must be greater than 0")
- val buffer: util.Deque[In] = new util.ArrayDeque[In]()
- @@ -52,7 +52,7 @@ import akka.stream.stage._
- }
- }
- - override def preStart() = {
- + override def preStart(): Unit = {
- setKeepGoing(true)
- getStageActor(receive).watch(ref)
- ref ! onInitMessage
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala
- index 6859d7dd5a..9e1ffe4fc6 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala
- @@ -26,11 +26,11 @@ import akka.annotation.InternalApi
- @InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber {
- import ActorSubscriberMessage._
- - override val requestStrategy = WatermarkRequestStrategy(highWatermark)
- + override val requestStrategy: WatermarkRequestStrategy = WatermarkRequestStrategy(highWatermark)
- context.watch(ref)
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case OnNext(elem) ⇒
- ref.tell(elem, ActorRef.noSender)
- case OnError(cause) ⇒
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala
- index 356ba3647c..2738a128d4 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala
- @@ -15,7 +15,7 @@ import akka.stream.ActorMaterializerSettings
- * INTERNAL API
- */
- @InternalApi private[akka] object ActorRefSourceActor {
- - def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = {
- + def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings): Props = {
- require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
- val maxFixedBufferSize = settings.maxFixedBufferSize
- Props(new ActorRefSourceActor(bufferSize, overflowStrategy, maxFixedBufferSize))
- @@ -30,9 +30,9 @@ import akka.stream.ActorMaterializerSettings
- import akka.stream.actor.ActorPublisherMessage._
- // when bufferSize is 0 there the buffer is not used
- - protected val buffer = if (bufferSize == 0) null else Buffer[Any](bufferSize, maxFixedBufferSize)
- + protected val buffer: Buffer[Any] = if (bufferSize == 0) null else Buffer[Any](bufferSize, maxFixedBufferSize)
- - def receive = ({
- + def receive: PartialFunction[Any, Unit] = ({
- case Cancel ⇒
- context.stop(self)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala
- index f63eef96ff..b3844a1e53 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala
- @@ -65,7 +65,7 @@ private[akka] object Buffer {
- else new ModuloFixedSizeBuffer(size)
- sealed abstract class FixedSizeBuffer[T](val capacity: Int) extends Buffer[T] {
- - override def toString = s"Buffer($capacity, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})"
- + override def toString: String = s"Buffer($capacity, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})"
- private val buffer = new Array[AnyRef](capacity)
- protected var readIdx = 0L
- @@ -164,11 +164,11 @@ private[akka] object Buffer {
- private var head = 0
- private var tail = 0
- - override def capacity = BoundedBuffer.this.capacity
- - override def used = tail - head
- - override def isFull = used == capacity
- - override def isEmpty = tail == head
- - override def nonEmpty = tail != head
- + override def capacity: Int = BoundedBuffer.this.capacity
- + override def used: Int = tail - head
- + override def isFull: Boolean = used == capacity
- + override def isEmpty: Boolean = tail == head
- + override def nonEmpty: Boolean = tail != head
- override def enqueue(elem: T): Unit =
- if (tail - head == FixedQueueSize) {
- @@ -205,10 +205,10 @@ private[akka] object Buffer {
- }
- private final class DynamicQueue(startIdx: Int) extends ju.LinkedList[T] with Buffer[T] {
- - override def capacity = BoundedBuffer.this.capacity
- - override def used = size
- - override def isFull = size == capacity
- - override def nonEmpty = !isEmpty()
- + override def capacity: Int = BoundedBuffer.this.capacity
- + override def used: Int = size
- + override def isFull: Boolean = size == capacity
- + override def nonEmpty: Boolean = !isEmpty()
- override def enqueue(elem: T): Unit = add(elem)
- override def dequeue(): T = remove()
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala
- index 08c1964743..d93861c39b 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala
- @@ -6,6 +6,8 @@ package akka.stream.impl
- import akka.annotation.InternalApi
- import akka.japi.function.{ Function ⇒ JFun, Function2 ⇒ JFun2 }
- import akka.japi.{ Pair ⇒ JPair }
- +import akka.japi
- +import akka.japi.function
- /**
- * INTERNAL API
- @@ -16,7 +18,7 @@ import akka.japi.{ Pair ⇒ JPair }
- @throws(classOf[Exception]) override def apply(param: Any): Any = param
- }
- - val JavaPairFunction = new JFun2[AnyRef, AnyRef, AnyRef JPair AnyRef] {
- + val JavaPairFunction: function.Function2[AnyRef, AnyRef, japi.Pair[AnyRef, AnyRef]] = new JFun2[AnyRef, AnyRef, AnyRef JPair AnyRef] {
- def apply(p1: AnyRef, p2: AnyRef): AnyRef JPair AnyRef = JPair(p1, p2)
- }
- @@ -30,16 +32,16 @@ import akka.japi.{ Pair ⇒ JPair }
- def scalaAnyTwoToNone[A, B, C]: (A, B) ⇒ Option[C] = two2none
- def javaAnyToNone[A, B]: A ⇒ Option[B] = none
- - val conforms = (a: Any) ⇒ a
- + val conforms: Any ⇒ Any = (a: Any) ⇒ a
- - val zeroLong = (_: Any) ⇒ 0L
- + val zeroLong: Any ⇒ Long = (_: Any) ⇒ 0L
- - val oneLong = (_: Any) ⇒ 1L
- + val oneLong: Any ⇒ Long = (_: Any) ⇒ 1L
- - val oneInt = (_: Any) ⇒ 1
- + val oneInt: Any ⇒ Int = (_: Any) ⇒ 1
- - val none = (_: Any) ⇒ None
- + val none: Any ⇒ None.type = (_: Any) ⇒ None
- - val two2none = (_: Any, _: Any) ⇒ None
- + val two2none: (Any, Any) ⇒ None.type = (_: Any, _: Any) ⇒ None
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/EmptySource.scala b/akka-stream/src/main/scala/akka/stream/impl/EmptySource.scala
- index 790dea0b84..14ca712613 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/EmptySource.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/EmptySource.scala
- @@ -12,10 +12,10 @@ import akka.stream.stage._
- * INTERNAL API
- */
- @InternalApi private[akka] final object EmptySource extends GraphStage[SourceShape[Nothing]] {
- - val out = Outlet[Nothing]("EmptySource.out")
- - override val shape = SourceShape(out)
- + val out: Outlet[Nothing] = Outlet[Nothing]("EmptySource.out")
- + override val shape: SourceShape[Nothing] = SourceShape(out)
- - override protected def initialAttributes = DefaultAttributes.lazySource
- + override protected def initialAttributes: Attributes = DefaultAttributes.lazySource
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with OutHandler {
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/FailedSource.scala b/akka-stream/src/main/scala/akka/stream/impl/FailedSource.scala
- index 24c50f16e9..743ea81078 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/FailedSource.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/FailedSource.scala
- @@ -12,8 +12,8 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
- * INTERNAL API
- */
- @InternalApi private[akka] final class FailedSource[T](failure: Throwable) extends GraphStage[SourceShape[T]] {
- - val out = Outlet[T]("FailedSource.out")
- - override val shape = SourceShape(out)
- + val out: Outlet[T] = Outlet[T]("FailedSource.out")
- + override val shape: SourceShape[T] = SourceShape(out)
- override protected def initialAttributes: Attributes = DefaultAttributes.failedSource
- @@ -27,5 +27,5 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
- setHandler(out, this)
- }
- - override def toString = s"FailedSource(${failure.getClass.getName})"
- + override def toString: String = s"FailedSource(${failure.getClass.getName})"
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala
- index 7ed42fa9cf..c796d0dc5e 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala
- @@ -94,7 +94,7 @@ import org.reactivestreams.{ Subscriber, Subscription }
- private var preferredId = 0
- private var _lastDequeuedId = 0
- - def lastDequeuedId = _lastDequeuedId
- + def lastDequeuedId: Int = _lastDequeuedId
- def cancel(): Unit =
- if (!allCancelled) {
- @@ -106,7 +106,7 @@ import org.reactivestreams.{ Subscriber, Subscription }
- }
- }
- - def cancel(input: Int) =
- + def cancel(input: Int): Unit =
- if (!cancelled(input)) {
- inputs(input).cancel()
- cancelled(input, on = true)
- @@ -204,22 +204,22 @@ import org.reactivestreams.{ Subscriber, Subscription }
- dequeue(id)
- }
- - val AllOfMarkedInputs = new TransferState {
- + val AllOfMarkedInputs: TransferState = new TransferState {
- override def isCompleted: Boolean = markedDepleted > 0
- override def isReady: Boolean = markedPending == markCount
- }
- - val AnyOfMarkedInputs = new TransferState {
- + val AnyOfMarkedInputs: TransferState = new TransferState {
- override def isCompleted: Boolean = markedDepleted == markCount && markedPending == 0
- override def isReady: Boolean = markedPending > 0
- }
- - def inputsAvailableFor(id: Int) = new TransferState {
- + def inputsAvailableFor(id: Int): TransferState = new TransferState {
- override def isCompleted: Boolean = depleted(id) || cancelled(id) || (!pending(id) && completed(id))
- override def isReady: Boolean = pending(id)
- }
- - def inputsOrCompleteAvailableFor(id: Int) = new TransferState {
- + def inputsOrCompleteAvailableFor(id: Int): TransferState = new TransferState {
- override def isCompleted: Boolean = false
- override def isReady: Boolean = pending(id) || depleted(id)
- }
- @@ -257,7 +257,7 @@ import org.reactivestreams.{ Subscriber, Subscription }
- import FanIn._
- protected val primaryOutputs: Outputs = new SimpleOutputs(self, this)
- - protected val inputBunch = new InputBunch(inputCount, settings.maxInputBufferSize, this) {
- + protected val inputBunch: InputBunch = new InputBunch(inputCount, settings.maxInputBufferSize, this) {
- override def onError(input: Int, e: Throwable): Unit = fail(e)
- override def onCompleteWhenNoInput(): Unit = pumpFinished()
- }
- @@ -288,7 +288,7 @@ import org.reactivestreams.{ Subscriber, Subscription }
- throw new IllegalStateException("This actor cannot be restarted")
- }
- - def receive = inputBunch.subreceive.orElse[Any, Unit](primaryOutputs.subreceive)
- + def receive: PartialFunction[Any, Unit] = inputBunch.subreceive.orElse[Any, Unit](primaryOutputs.subreceive)
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala
- index f5b413f101..5ea32a45aa 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala
- @@ -22,7 +22,7 @@ import org.reactivestreams.Subscription
- class SubstreamSubscription(val parent: ActorRef, val id: Int) extends Subscription {
- override def request(elements: Long): Unit = parent ! SubstreamRequestMore(id, elements)
- override def cancel(): Unit = parent ! SubstreamCancel(id)
- - override def toString = "SubstreamSubscription" + System.identityHashCode(this)
- + override def toString: String = "SubstreamSubscription" + System.identityHashCode(this)
- }
- class FanoutOutputs(val id: Int, _impl: ActorRef, _pump: Pump) extends SimpleOutputs(_impl, _pump) {
- @@ -76,7 +76,7 @@ import org.reactivestreams.Subscription
- }
- }
- - def complete(output: Int) =
- + def complete(output: Int): Unit =
- if (!completed(output) && !errored(output) && !cancelled(output)) {
- outputs(output).complete()
- completed(output) = true
- @@ -183,12 +183,12 @@ import org.reactivestreams.Subscription
- def onCancel(output: Int): Unit = ()
- - def demandAvailableFor(id: Int) = new TransferState {
- + def demandAvailableFor(id: Int): TransferState = new TransferState {
- override def isCompleted: Boolean = cancelled(id) || completed(id) || errored(id)
- override def isReady: Boolean = pending(id)
- }
- - def demandOrCancelAvailableFor(id: Int) = new TransferState {
- + def demandOrCancelAvailableFor(id: Int): TransferState = new TransferState {
- override def isCompleted: Boolean = false
- override def isReady: Boolean = pending(id) || cancelled(id)
- }
- @@ -198,7 +198,7 @@ import org.reactivestreams.Subscription
- * have demand, and will complete as soon as any of the marked
- * outputs have canceled.
- */
- - val AllOfMarkedOutputs = new TransferState {
- + val AllOfMarkedOutputs: TransferState = new TransferState {
- override def isCompleted: Boolean = markedCancelled > 0 || markedCount == 0
- override def isReady: Boolean = markedPending == markedCount
- }
- @@ -208,7 +208,7 @@ import org.reactivestreams.Subscription
- * have demand, and will complete when all of the marked
- * outputs have canceled.
- */
- - val AnyOfMarkedOutputs = new TransferState {
- + val AnyOfMarkedOutputs: TransferState = new TransferState {
- override def isCompleted: Boolean = markedCancelled == markedCount
- override def isReady: Boolean = markedPending > 0
- }
- @@ -251,7 +251,7 @@ import org.reactivestreams.Subscription
- @DoNotInherit private[akka] abstract class FanOut(val settings: ActorMaterializerSettings, val outputCount: Int) extends Actor with ActorLogging with Pump {
- import FanOut._
- - protected val outputBunch = new OutputBunch(outputCount, self, this)
- + protected val outputBunch: OutputBunch = new OutputBunch(outputCount, self, this)
- protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.maxInputBufferSize, this) {
- override def onError(e: Throwable): Unit = fail(e)
- }
- @@ -282,7 +282,7 @@ import org.reactivestreams.Subscription
- throw new IllegalStateException("This actor cannot be restarted")
- }
- - def receive = primaryInputs.subreceive.orElse[Any, Unit](outputBunch.subreceive)
- + def receive: PartialFunction[Any, Unit] = primaryInputs.subreceive.orElse[Any, Unit](outputBunch.subreceive)
- }
- /**
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala
- index 27f1296a74..870b2ba513 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala
- @@ -1,136 +1,136 @@
- package akka.stream.impl
- -
- -import akka.actor.{ Actor, ActorRef, Deploy, Props }
- -import akka.annotation.{ DoNotInherit, InternalApi }
- -import akka.stream.{ ActorMaterializerSettings, Attributes }
- +
- +import akka.actor.{ Actor, ActorRef, Deploy, Props }
- +import akka.annotation.{ DoNotInherit, InternalApi }
- +import akka.stream.{ ActorMaterializerSettings, Attributes }
- import org.reactivestreams.Subscriber
- -
- +
- /**
- * INTERNAL API
- */
- -@DoNotInherit private[akka] abstract class FanoutOutputs(
- - val maxBufferSize: Int,
- - val initialBufferSize: Int,
- - self: ActorRef,
- - val pump: Pump)
- +@DoNotInherit private[akka] abstract class FanoutOutputs(
- + val maxBufferSize: Int,
- + val initialBufferSize: Int,
- + self: ActorRef,
- + val pump: Pump)
- extends DefaultOutputTransferStates
- - with SubscriberManagement[Any] {
- -
- - override type S = ActorSubscriptionWithCursor[_ >: Any]
- - override def createSubscription(subscriber: Subscriber[_ >: Any]): S =
- - new ActorSubscriptionWithCursor(self, subscriber)
- -
- - protected var exposedPublisher: ActorPublisher[Any] = _
- -
- + with SubscriberManagement[Any] {
- +
- + override type S = ActorSubscriptionWithCursor[_ >: Any]
- + override def createSubscription(subscriber: Subscriber[_ >: Any]): S =
- + new ActorSubscriptionWithCursor(self, subscriber)
- +
- + protected var exposedPublisher: ActorPublisher[Any] = _
- +
- private var downstreamBufferSpace: Long = 0L
- - private var downstreamCompleted = false
- - override def demandAvailable = downstreamBufferSpace > 0
- + private var downstreamCompleted = false
- + override def demandAvailable: Boolean = downstreamBufferSpace > 0
- override def demandCount: Long = downstreamBufferSpace
- -
- - override val subreceive = new SubReceive(waitingExposedPublisher)
- -
- - def enqueueOutputElement(elem: Any): Unit = {
- - ReactiveStreamsCompliance.requireNonNullElement(elem)
- +
- + override val subreceive: SubReceive = new SubReceive(waitingExposedPublisher)
- +
- + def enqueueOutputElement(elem: Any): Unit = {
- + ReactiveStreamsCompliance.requireNonNullElement(elem)
- downstreamBufferSpace -= 1
- - pushToDownstream(elem)
- - }
- -
- - override def complete(): Unit =
- - if (!downstreamCompleted) {
- - downstreamCompleted = true
- - completeDownstream()
- - }
- -
- - override def cancel(): Unit = complete()
- -
- - override def error(e: Throwable): Unit = {
- - if (!downstreamCompleted) {
- - downstreamCompleted = true
- - abortDownstream(e)
- - if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
- - }
- - }
- -
- + pushToDownstream(elem)
- + }
- +
- + override def complete(): Unit =
- + if (!downstreamCompleted) {
- + downstreamCompleted = true
- + completeDownstream()
- + }
- +
- + override def cancel(): Unit = complete()
- +
- + override def error(e: Throwable): Unit = {
- + if (!downstreamCompleted) {
- + downstreamCompleted = true
- + abortDownstream(e)
- + if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
- + }
- + }
- +
- def isClosed: Boolean = downstreamCompleted
- -
- +
- def afterShutdown(): Unit
- -
- +
- override protected def requestFromUpstream(elements: Long): Unit = downstreamBufferSpace += elements
- -
- - private def subscribePending(): Unit =
- +
- + private def subscribePending(): Unit =
- exposedPublisher.takePendingSubscribers() foreach registerSubscriber
- -
- - override protected def shutdown(completed: Boolean): Unit = {
- - if (exposedPublisher ne null) {
- - if (completed) exposedPublisher.shutdown(None)
- - else exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
- - }
- - afterShutdown()
- - }
- -
- - override protected def cancelUpstream(): Unit = {
- - downstreamCompleted = true
- - }
- -
- - protected def waitingExposedPublisher: Actor.Receive = {
- +
- + override protected def shutdown(completed: Boolean): Unit = {
- + if (exposedPublisher ne null) {
- + if (completed) exposedPublisher.shutdown(None)
- + else exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
- + }
- + afterShutdown()
- + }
- +
- + override protected def cancelUpstream(): Unit = {
- + downstreamCompleted = true
- + }
- +
- + protected def waitingExposedPublisher: Actor.Receive = {
- case ExposedPublisher(publisher) ⇒
- exposedPublisher = publisher
- - subreceive.become(downstreamRunning)
- + subreceive.become(downstreamRunning)
- case other ⇒
- - throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]")
- - }
- -
- - protected def downstreamRunning: Actor.Receive = {
- + throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]")
- + }
- +
- + protected def downstreamRunning: Actor.Receive = {
- case SubscribePending ⇒
- - subscribePending()
- + subscribePending()
- case RequestMore(subscription, elements) ⇒
- - moreRequested(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]], elements)
- - pump.pump()
- + moreRequested(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]], elements)
- + pump.pump()
- case Cancel(subscription) ⇒
- - unregisterSubscription(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]])
- - pump.pump()
- - }
- -
- -}
- -
- + unregisterSubscription(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]])
- + pump.pump()
- + }
- +
- +}
- +
- /**
- * INTERNAL API
- */
- -@InternalApi private[akka] object FanoutProcessorImpl {
- - def props(attributes: Attributes, actorMaterializerSettings: ActorMaterializerSettings): Props =
- - Props(new FanoutProcessorImpl(attributes, actorMaterializerSettings)).withDeploy(Deploy.local)
- -}
- +@InternalApi private[akka] object FanoutProcessorImpl {
- + def props(attributes: Attributes, actorMaterializerSettings: ActorMaterializerSettings): Props =
- + Props(new FanoutProcessorImpl(attributes, actorMaterializerSettings)).withDeploy(Deploy.local)
- +}
- /**
- * INTERNAL API
- */
- -@InternalApi private[akka] class FanoutProcessorImpl(attributes: Attributes, _settings: ActorMaterializerSettings)
- - extends ActorProcessorImpl(attributes, _settings) {
- -
- - override val primaryOutputs: FanoutOutputs = {
- - val inputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer]
- - new FanoutOutputs(inputBuffer.max, inputBuffer.initial, self, this) {
- - override def afterShutdown(): Unit = afterFlush()
- - }
- - }
- -
- +@InternalApi private[akka] class FanoutProcessorImpl(attributes: Attributes, _settings: ActorMaterializerSettings)
- + extends ActorProcessorImpl(attributes, _settings) {
- +
- + override val primaryOutputs: FanoutOutputs = {
- + val inputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer]
- + new FanoutOutputs(inputBuffer.max, inputBuffer.initial, self, this) {
- + override def afterShutdown(): Unit = afterFlush()
- + }
- + }
- +
- val running: TransferPhase = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒
- - primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement())
- - }
- -
- - override def fail(e: Throwable): Unit = {
- - if (settings.debugLogging)
- - log.debug("fail due to: {}", e.getMessage)
- - primaryInputs.cancel()
- - primaryOutputs.error(e)
- + primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement())
- + }
- +
- + override def fail(e: Throwable): Unit = {
- + if (settings.debugLogging)
- + log.debug("fail due to: {}", e.getMessage)
- + primaryInputs.cancel()
- + primaryOutputs.error(e)
- // Stopping will happen after flush
- - }
- -
- - override def pumpFinished(): Unit = {
- - primaryInputs.cancel()
- - primaryOutputs.complete()
- - }
- -
- - def afterFlush(): Unit = context.stop(self)
- -
- - initialPhase(1, running)
- -}
- + }
- +
- + override def pumpFinished(): Unit = {
- + primaryInputs.cancel()
- + primaryOutputs.complete()
- + }
- +
- + def afterFlush(): Unit = context.stop(self)
- +
- + initialPhase(1, running)
- +}
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala
- index f9d6727126..5c28aeeef2 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala
- @@ -14,20 +14,20 @@ import scala.annotation.switch
- */
- @InternalApi private[akka] object JsonObjectParser {
- - final val SquareBraceStart = '['.toByte
- - final val SquareBraceEnd = ']'.toByte
- - final val CurlyBraceStart = '{'.toByte
- - final val CurlyBraceEnd = '}'.toByte
- - final val DoubleQuote = '"'.toByte
- - final val Backslash = '\\'.toByte
- - final val Comma = ','.toByte
- -
- - final val LineBreak = '\n'.toByte
- - final val LineBreak2 = '\r'.toByte
- - final val Tab = '\t'.toByte
- - final val Space = ' '.toByte
- -
- - final val Whitespace = Set(LineBreak, LineBreak2, Tab, Space)
- + final val SquareBraceStart: Byte = '['.toByte
- + final val SquareBraceEnd: Byte = ']'.toByte
- + final val CurlyBraceStart: Byte = '{'.toByte
- + final val CurlyBraceEnd: Byte = '}'.toByte
- + final val DoubleQuote: Byte = '"'.toByte
- + final val Backslash: Byte = '\\'.toByte
- + final val Comma: Byte = ','.toByte
- +
- + final val LineBreak: Byte = '\n'.toByte
- + final val LineBreak2: Byte = '\r'.toByte
- + final val Tab: Byte = '\t'.toByte
- + final val Space: Byte = ' '.toByte
- +
- + final val Whitespace: Set[Byte] = Set(LineBreak, LineBreak2, Tab, Space)
- def isWhitespace(input: Byte): Boolean =
- Whitespace.contains(input)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala
- index 745e936d3c..d7b603a607 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala
- @@ -16,17 +16,17 @@ import scala.util.control.NonFatal
- * INTERNAL API
- */
- @InternalApi private[akka] object LazySource {
- - def apply[T, M](sourceFactory: () ⇒ Source[T, M]) = new LazySource[T, M](sourceFactory)
- + def apply[T, M](sourceFactory: () ⇒ Source[T, M]): LazySource[T, M] = new LazySource[T, M](sourceFactory)
- }
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
- - val out = Outlet[T]("LazySource.out")
- - override val shape = SourceShape(out)
- + val out: Outlet[T] = Outlet[T]("LazySource.out")
- + override val shape: SourceShape[T] = SourceShape(out)
- - override protected def initialAttributes = DefaultAttributes.lazySource
- + override protected def initialAttributes: Attributes = DefaultAttributes.lazySource
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
- val matPromise = Promise[M]()
- @@ -72,7 +72,7 @@ import scala.util.control.NonFatal
- setHandler(out, this)
- - override def postStop() = {
- + override def postStop(): Unit = {
- matPromise.tryFailure(new RuntimeException("LazySource stopped without completing the materialized future"))
- }
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala b/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala
- index 6f075a8bab..8f85177bb5 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala
- @@ -17,10 +17,10 @@ import scala.util.Try
- * INTERNAL API
- */
- @InternalApi private[akka] object MaybeSource extends GraphStageWithMaterializedValue[SourceShape[AnyRef], Promise[Option[AnyRef]]] {
- - val out = Outlet[AnyRef]("MaybeSource.out")
- - override val shape = SourceShape(out)
- + val out: Outlet[AnyRef] = Outlet[AnyRef]("MaybeSource.out")
- + override val shape: SourceShape[AnyRef] = SourceShape(out)
- - override protected def initialAttributes = DefaultAttributes.maybeSource
- + override protected def initialAttributes: Attributes = DefaultAttributes.maybeSource
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Promise[Option[AnyRef]]) = {
- import scala.util.{ Success ⇒ ScalaSuccess, Failure ⇒ ScalaFailure }
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala
- index a590ea0a61..187eed69ef 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala
- @@ -72,7 +72,7 @@ import akka.util.OptionVal
- override protected def label: String = s"PublisherSource($p)"
- - override def create(context: MaterializationContext) = (p, NotUsed)
- + override def create(context: MaterializationContext): (Publisher[Out], NotUsed.type) = (p, NotUsed)
- override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attributes, shape)
- override def withAttributes(attr: Attributes): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attr, amendShape(attr))
- @@ -85,7 +85,7 @@ import akka.util.OptionVal
- */
- @InternalApi private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) {
- - override def create(context: MaterializationContext) = {
- + override def create(context: MaterializationContext): (Publisher[Out], ActorRef) = {
- val publisherRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props)
- (akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef)
- }
- @@ -104,7 +104,7 @@ import akka.util.OptionVal
- override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)"
- - override def create(context: MaterializationContext) = {
- + override def create(context: MaterializationContext): (Publisher[Out], ActorRef) = {
- val mat = ActorMaterializerHelper.downcast(context.materializer)
- val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings))
- (akka.stream.actor.ActorPublisher[Out](ref), ref)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala
- index 588cfb787a..3fcac88bfa 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala
- @@ -27,6 +27,7 @@ import scala.concurrent.duration.FiniteDuration
- import scala.concurrent.ExecutionContextExecutor
- import scala.annotation.tailrec
- import akka.util.OptionVal
- +import akka.stream.impl.FanOut.SubstreamSubscribePending
- /**
- * INTERNAL API
- @@ -382,7 +383,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
- * When these attributes are needed later in the materialization process it is important that the
- * they are gotten through the attributes and not through the [[ActorMaterializerSettings]]
- */
- - val defaultAttributes = {
- + val defaultAttributes: Attributes = {
- Attributes(
- Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) ::
- ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::
- @@ -615,7 +616,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
- private var outConnections: List[Connection] = Nil
- private var fullIslandName: OptionVal[String] = OptionVal.None
- - val shell = new GraphInterpreterShell(
- + val shell: GraphInterpreterShell = new GraphInterpreterShell(
- connections = null,
- logics = null,
- settings,
- @@ -887,7 +888,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
- TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing).withDispatcher(dispatcher)
- tlsActor = materializer.actorOf(props, islandName)
- def factory(id: Int) = new ActorPublisher[Any](tlsActor) {
- - override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
- + override val wakeUpMsg: SubstreamSubscribePending = FanOut.SubstreamSubscribePending(id)
- }
- publishers = Vector.tabulate(2)(factory)
- tlsActor ! FanOut.ExposedPublishers(publishers)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala
- index a53cd9a97c..b9c652d8c1 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala
- @@ -34,10 +34,10 @@ import scala.util.control.NonFatal
- @InternalApi private[akka] final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] {
- import QueueSource._
- - val out = Outlet[T]("queueSource.out")
- + val out: Outlet[T] = Outlet[T]("queueSource.out")
- override val shape: SourceShape[T] = SourceShape.of(out)
- - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
- + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic with OutHandler with SourceQueueWithComplete[T], GraphStageLogic with OutHandler with SourceQueueWithComplete[T]) = {
- val completion = Promise[Done]
- val stageLogic = new GraphStageLogic(shape) with OutHandler with SourceQueueWithComplete[T] {
- @@ -167,7 +167,7 @@ import scala.util.control.NonFatal
- }
- // SourceQueueWithComplete impl
- - override def watchCompletion() = completion.future
- + override def watchCompletion(): Future[Done] = completion.future
- override def offer(element: T): Future[QueueOfferResult] = {
- val p = Promise[QueueOfferResult]
- callback.invokeWithFeedback(Offer(element, p))
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala b/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala
- index eb8f061e66..49f97d3b4f 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala
- @@ -24,7 +24,7 @@ import akka.annotation.{ DoNotInherit, InternalApi }
- * INTERNAL API
- */
- @InternalApi private[akka] object SeqActorName {
- - def apply(prefix: String) = new SeqActorNameImpl(prefix, new AtomicLong(0))
- + def apply(prefix: String): SeqActorNameImpl = new SeqActorNameImpl(prefix, new AtomicLong(0))
- }
- /**
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala
- index b0a9cca2e9..0d11cfc84d 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala
- @@ -41,6 +41,7 @@ import akka.event.Logging
- import akka.util.OptionVal
- import scala.collection.generic.CanBuildFrom
- +import scala.collection.mutable.Builder
- /**
- * INTERNAL API
- @@ -133,7 +134,7 @@ import scala.collection.generic.CanBuildFrom
- */
- @InternalApi private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
- - override def create(context: MaterializationContext) = (subscriber, NotUsed)
- + override def create(context: MaterializationContext): (Subscriber[In], NotUsed.type) = (subscriber, NotUsed)
- override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = new SubscriberSink[In](subscriber, attributes, shape)
- override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] = new SubscriberSink[In](subscriber, attr, amendShape(attr))
- @@ -156,7 +157,7 @@ import scala.collection.generic.CanBuildFrom
- */
- @InternalApi private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {
- - override def create(context: MaterializationContext) = {
- + override def create(context: MaterializationContext): (Subscriber[In], ActorRef) = {
- val subscriberRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props)
- (akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef)
- }
- @@ -172,7 +173,7 @@ import scala.collection.generic.CanBuildFrom
- val attributes: Attributes,
- shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
- - override def create(context: MaterializationContext) = {
- + override def create(context: MaterializationContext): (Subscriber[In], NotUsed.type) = {
- val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer)
- val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max
- val subscriberRef = actorMaterializer.actorOf(
- @@ -196,7 +197,7 @@ import scala.collection.generic.CanBuildFrom
- override val shape: SinkShape[T] = SinkShape.of(in)
- - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
- + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic with InHandler, Future[Option[T]]) = {
- val p: Promise[Option[T]] = Promise()
- (new GraphStageLogic(shape) with InHandler {
- private[this] var prev: T = null.asInstanceOf[T]
- @@ -237,7 +238,7 @@ import scala.collection.generic.CanBuildFrom
- override val shape: SinkShape[T] = SinkShape.of(in)
- - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
- + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic with InHandler, Future[Option[T]]) = {
- val p: Promise[Option[T]] = Promise()
- (new GraphStageLogic(shape) with InHandler {
- override def preStart(): Unit = pull(in)
- @@ -272,7 +273,7 @@ import scala.collection.generic.CanBuildFrom
- * INTERNAL API
- */
- @InternalApi private[akka] final class SeqStage[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {
- - val in = Inlet[T]("seq.in")
- + val in: Inlet[T] = Inlet[T]("seq.in")
- override def toString: String = "SeqStage"
- @@ -280,10 +281,10 @@ import scala.collection.generic.CanBuildFrom
- override protected def initialAttributes: Attributes = DefaultAttributes.seqSink
- - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
- + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic with InHandler, Future[That]) = {
- val p: Promise[That] = Promise()
- val logic = new GraphStageLogic(shape) with InHandler {
- - val buf = cbf()
- + val buf: Builder[T, That with (immutable.Traversable[_$1] forSome { type _$1 })] = cbf()
- override def preStart(): Unit = pull(in)
- @@ -329,17 +330,17 @@ import scala.collection.generic.CanBuildFrom
- @InternalApi private[akka] final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] {
- type Requested[E] = Promise[Option[E]]
- - val in = Inlet[T]("queueSink.in")
- - override def initialAttributes = DefaultAttributes.queueSink
- + val in: Inlet[T] = Inlet[T]("queueSink.in")
- + override def initialAttributes: Attributes = DefaultAttributes.queueSink
- override val shape: SinkShape[T] = SinkShape.of(in)
- override def toString: String = "QueueSink"
- - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
- + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic with InHandler with SinkQueueWithCancel[T], GraphStageLogic with InHandler with SinkQueueWithCancel[T]) = {
- val stageLogic = new GraphStageLogic(shape) with InHandler with SinkQueueWithCancel[T] {
- type Received[E] = Try[Option[E]]
- - val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
- + val maxBuffer: Int = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
- require(maxBuffer > 0, "Buffer size must be greater than 0")
- var buffer: Buffer[Received[T]] = _
- @@ -429,7 +430,7 @@ import scala.collection.generic.CanBuildFrom
- * Helper class to be able to express collection as a fold using mutable data
- */
- @InternalApi private[akka] final class CollectorState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
- - lazy val accumulated = collector.supplier().get()
- + lazy val accumulated: Any = collector.supplier().get()
- private lazy val accumulator = collector.accumulator()
- def update(elem: T): CollectorState[T, R] = {
- @@ -462,13 +463,13 @@ import scala.collection.generic.CanBuildFrom
- * INTERNAL API
- */
- @InternalApi final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], zeroMat: () ⇒ M) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
- - val in = Inlet[T]("lazySink.in")
- - override def initialAttributes = DefaultAttributes.lazySink
- + val in: Inlet[T] = Inlet[T]("lazySink.in")
- + override def initialAttributes: Attributes = DefaultAttributes.lazySink
- override val shape: SinkShape[T] = SinkShape.of(in)
- override def toString: String = "LazySink"
- - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
- + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic with InHandler, Future[M]) = {
- lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- var completed = false
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
- index f3ce424bd2..573c9e4ff8 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
- @@ -6,6 +6,7 @@ package akka.stream.impl
- import akka.annotation.InternalApi
- import akka.stream.Attributes._
- import akka.stream._
- +import akka.stream.ActorAttributes.Dispatcher
- /**
- * INTERNAL API
- @@ -14,126 +15,126 @@ import akka.stream._
- object DefaultAttributes {
- // reusable common attributes
- - val IODispatcher = ActorAttributes.IODispatcher
- - val inputBufferOne = inputBuffer(initial = 1, max = 1)
- + val IODispatcher: Dispatcher = ActorAttributes.IODispatcher
- + val inputBufferOne: Attributes = inputBuffer(initial = 1, max = 1)
- // stage specific default attributes
- - val fused = name("fused")
- - val materializedValueSource = name("matValueSource")
- - val map = name("map")
- - val log = name("log")
- - val filter = name("filter")
- - val filterNot = name("filterNot")
- - val collect = name("collect")
- - val recover = name("recover")
- - val mapAsync = name("mapAsync")
- - val mapAsyncUnordered = name("mapAsyncUnordered")
- - val grouped = name("grouped")
- - val groupedWithin = name("groupedWithin")
- - val groupedWeightedWithin = name("groupedWeightedWithin")
- - val limit = name("limit")
- - val limitWeighted = name("limitWeighted")
- - val sliding = name("sliding")
- - val take = name("take")
- - val drop = name("drop")
- - val takeWhile = name("takeWhile")
- - val dropWhile = name("dropWhile")
- - val scan = name("scan")
- - val scanAsync = name("scanAsync")
- - val fold = name("fold")
- - val foldAsync = name("foldAsync")
- - val reduce = name("reduce")
- - val intersperse = name("intersperse")
- - val buffer = name("buffer")
- - val conflate = name("conflate")
- - val batch = name("batch")
- - val batchWeighted = name("batchWeighted")
- - val expand = name("expand")
- - val statefulMapConcat = name("statefulMapConcat")
- - val detacher = name("detacher")
- - val groupBy = name("groupBy")
- - val prefixAndTail = name("prefixAndTail")
- - val split = name("split")
- - val concatAll = name("concatAll")
- - val processor = name("processor")
- - val processorWithKey = name("processorWithKey")
- - val identityOp = name("identityOp")
- - val delimiterFraming = name("delimiterFraming")
- + val fused: Attributes = name("fused")
- + val materializedValueSource: Attributes = name("matValueSource")
- + val map: Attributes = name("map")
- + val log: Attributes = name("log")
- + val filter: Attributes = name("filter")
- + val filterNot: Attributes = name("filterNot")
- + val collect: Attributes = name("collect")
- + val recover: Attributes = name("recover")
- + val mapAsync: Attributes = name("mapAsync")
- + val mapAsyncUnordered: Attributes = name("mapAsyncUnordered")
- + val grouped: Attributes = name("grouped")
- + val groupedWithin: Attributes = name("groupedWithin")
- + val groupedWeightedWithin: Attributes = name("groupedWeightedWithin")
- + val limit: Attributes = name("limit")
- + val limitWeighted: Attributes = name("limitWeighted")
- + val sliding: Attributes = name("sliding")
- + val take: Attributes = name("take")
- + val drop: Attributes = name("drop")
- + val takeWhile: Attributes = name("takeWhile")
- + val dropWhile: Attributes = name("dropWhile")
- + val scan: Attributes = name("scan")
- + val scanAsync: Attributes = name("scanAsync")
- + val fold: Attributes = name("fold")
- + val foldAsync: Attributes = name("foldAsync")
- + val reduce: Attributes = name("reduce")
- + val intersperse: Attributes = name("intersperse")
- + val buffer: Attributes = name("buffer")
- + val conflate: Attributes = name("conflate")
- + val batch: Attributes = name("batch")
- + val batchWeighted: Attributes = name("batchWeighted")
- + val expand: Attributes = name("expand")
- + val statefulMapConcat: Attributes = name("statefulMapConcat")
- + val detacher: Attributes = name("detacher")
- + val groupBy: Attributes = name("groupBy")
- + val prefixAndTail: Attributes = name("prefixAndTail")
- + val split: Attributes = name("split")
- + val concatAll: Attributes = name("concatAll")
- + val processor: Attributes = name("processor")
- + val processorWithKey: Attributes = name("processorWithKey")
- + val identityOp: Attributes = name("identityOp")
- + val delimiterFraming: Attributes = name("delimiterFraming")
- - val initial = name("initial")
- - val completion = name("completion")
- - val idle = name("idle")
- - val idleTimeoutBidi = name("idleTimeoutBidi")
- - val delayInitial = name("delayInitial")
- - val idleInject = name("idleInject")
- - val backpressureTimeout = name("backpressureTimeout")
- + val initial: Attributes = name("initial")
- + val completion: Attributes = name("completion")
- + val idle: Attributes = name("idle")
- + val idleTimeoutBidi: Attributes = name("idleTimeoutBidi")
- + val delayInitial: Attributes = name("delayInitial")
- + val idleInject: Attributes = name("idleInject")
- + val backpressureTimeout: Attributes = name("backpressureTimeout")
- - val merge = name("merge")
- - val mergePreferred = name("mergePreferred")
- - val mergePrioritized = name("mergePrioritized")
- - val flattenMerge = name("flattenMerge")
- - val recoverWith = name("recoverWith")
- - val broadcast = name("broadcast")
- - val balance = name("balance")
- - val zip = name("zip")
- - val zipN = name("zipN")
- - val zipWithN = name("zipWithN")
- - val zipWithIndex = name("zipWithIndex")
- - val unzip = name("unzip")
- - val concat = name("concat")
- - val orElse = name("orElse")
- - val repeat = name("repeat")
- - val unfold = name("unfold")
- - val unfoldAsync = name("unfoldAsync")
- - val delay = name("delay")
- + val merge: Attributes = name("merge")
- + val mergePreferred: Attributes = name("mergePreferred")
- + val mergePrioritized: Attributes = name("mergePrioritized")
- + val flattenMerge: Attributes = name("flattenMerge")
- + val recoverWith: Attributes = name("recoverWith")
- + val broadcast: Attributes = name("broadcast")
- + val balance: Attributes = name("balance")
- + val zip: Attributes = name("zip")
- + val zipN: Attributes = name("zipN")
- + val zipWithN: Attributes = name("zipWithN")
- + val zipWithIndex: Attributes = name("zipWithIndex")
- + val unzip: Attributes = name("unzip")
- + val concat: Attributes = name("concat")
- + val orElse: Attributes = name("orElse")
- + val repeat: Attributes = name("repeat")
- + val unfold: Attributes = name("unfold")
- + val unfoldAsync: Attributes = name("unfoldAsync")
- + val delay: Attributes = name("delay")
- - val terminationWatcher = name("terminationWatcher")
- + val terminationWatcher: Attributes = name("terminationWatcher")
- - val publisherSource = name("publisherSource")
- - val iterableSource = name("iterableSource")
- - val cycledSource = name("cycledSource")
- - val futureSource = name("futureSource")
- - val futureFlattenSource = name("futureFlattenSource")
- - val tickSource = name("tickSource")
- - val singleSource = name("singleSource")
- - val emptySource = name("emptySource")
- - val maybeSource = name("MaybeSource")
- - val failedSource = name("failedSource")
- - val concatSource = name("concatSource")
- - val concatMatSource = name("concatMatSource")
- - val subscriberSource = name("subscriberSource")
- - val actorPublisherSource = name("actorPublisherSource")
- - val actorRefSource = name("actorRefSource")
- - val queueSource = name("queueSource")
- - val inputStreamSource = name("inputStreamSource") and IODispatcher
- - val outputStreamSource = name("outputStreamSource") and IODispatcher
- - val fileSource = name("fileSource") and IODispatcher
- - val unfoldResourceSource = name("unfoldResourceSource") and IODispatcher
- - val unfoldResourceSourceAsync = name("unfoldResourceSourceAsync") and IODispatcher
- - val asJavaStream = name("asJavaStream") and IODispatcher
- - val javaCollectorParallelUnordered = name("javaCollectorParallelUnordered")
- - val javaCollector = name("javaCollector")
- + val publisherSource: Attributes = name("publisherSource")
- + val iterableSource: Attributes = name("iterableSource")
- + val cycledSource: Attributes = name("cycledSource")
- + val futureSource: Attributes = name("futureSource")
- + val futureFlattenSource: Attributes = name("futureFlattenSource")
- + val tickSource: Attributes = name("tickSource")
- + val singleSource: Attributes = name("singleSource")
- + val emptySource: Attributes = name("emptySource")
- + val maybeSource: Attributes = name("MaybeSource")
- + val failedSource: Attributes = name("failedSource")
- + val concatSource: Attributes = name("concatSource")
- + val concatMatSource: Attributes = name("concatMatSource")
- + val subscriberSource: Attributes = name("subscriberSource")
- + val actorPublisherSource: Attributes = name("actorPublisherSource")
- + val actorRefSource: Attributes = name("actorRefSource")
- + val queueSource: Attributes = name("queueSource")
- + val inputStreamSource: Attributes = name("inputStreamSource") and IODispatcher
- + val outputStreamSource: Attributes = name("outputStreamSource") and IODispatcher
- + val fileSource: Attributes = name("fileSource") and IODispatcher
- + val unfoldResourceSource: Attributes = name("unfoldResourceSource") and IODispatcher
- + val unfoldResourceSourceAsync: Attributes = name("unfoldResourceSourceAsync") and IODispatcher
- + val asJavaStream: Attributes = name("asJavaStream") and IODispatcher
- + val javaCollectorParallelUnordered: Attributes = name("javaCollectorParallelUnordered")
- + val javaCollector: Attributes = name("javaCollector")
- - val subscriberSink = name("subscriberSink")
- - val cancelledSink = name("cancelledSink")
- - val headSink = name("headSink") and inputBufferOne
- - val headOptionSink = name("headOptionSink") and inputBufferOne
- - val lastSink = name("lastSink")
- - val lastOptionSink = name("lastOptionSink")
- - val seqSink = name("seqSink")
- - val publisherSink = name("publisherSink")
- - val fanoutPublisherSink = name("fanoutPublisherSink")
- - val ignoreSink = name("ignoreSink")
- - val actorRefSink = name("actorRefSink")
- - val actorRefWithAck = name("actorRefWithAckSink")
- - val actorSubscriberSink = name("actorSubscriberSink")
- - val queueSink = name("queueSink")
- - val lazySink = name("lazySink")
- - val lazySource = name("lazySource")
- - val outputStreamSink = name("outputStreamSink") and IODispatcher
- - val inputStreamSink = name("inputStreamSink") and IODispatcher
- - val fileSink = name("fileSink") and IODispatcher
- - val fromJavaStream = name("fromJavaStream")
- + val subscriberSink: Attributes = name("subscriberSink")
- + val cancelledSink: Attributes = name("cancelledSink")
- + val headSink: Attributes = name("headSink") and inputBufferOne
- + val headOptionSink: Attributes = name("headOptionSink") and inputBufferOne
- + val lastSink: Attributes = name("lastSink")
- + val lastOptionSink: Attributes = name("lastOptionSink")
- + val seqSink: Attributes = name("seqSink")
- + val publisherSink: Attributes = name("publisherSink")
- + val fanoutPublisherSink: Attributes = name("fanoutPublisherSink")
- + val ignoreSink: Attributes = name("ignoreSink")
- + val actorRefSink: Attributes = name("actorRefSink")
- + val actorRefWithAck: Attributes = name("actorRefWithAckSink")
- + val actorSubscriberSink: Attributes = name("actorSubscriberSink")
- + val queueSink: Attributes = name("queueSink")
- + val lazySink: Attributes = name("lazySink")
- + val lazySource: Attributes = name("lazySource")
- + val outputStreamSink: Attributes = name("outputStreamSink") and IODispatcher
- + val inputStreamSink: Attributes = name("inputStreamSink") and IODispatcher
- + val fileSink: Attributes = name("fileSink") and IODispatcher
- + val fromJavaStream: Attributes = name("fromJavaStream")
- }
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
- index 5783f58ced..05d1a08d6e 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
- @@ -34,11 +34,11 @@ import scala.util.control.NonFatal
- */
- @InternalApi private[stream] object VirtualProcessor {
- case object Inert {
- - val subscriber = new CancellingSubscriber[Any]
- + val subscriber: CancellingSubscriber[Any] = new CancellingSubscriber[Any]
- }
- case class Both(subscriber: Subscriber[Any])
- object Both {
- - def create(s: Subscriber[_]) = Both(s.asInstanceOf[Subscriber[Any]])
- + def create(s: Subscriber[_]): Both = Both(s.asInstanceOf[Subscriber[Any]])
- }
- }
- @@ -250,7 +250,7 @@ import scala.util.control.NonFatal
- case object PassThrough extends SubscriptionState { override def demand: Long = 0 }
- case class Buffering(demand: Long) extends SubscriptionState
- - val NoBufferedDemand = Buffering(0)
- + val NoBufferedDemand: WrappedSubscription.Buffering = Buffering(0)
- }
- // Extdending AtomicReference to make the hot memory location share the same cache line with the Subscription
- @@ -361,11 +361,11 @@ import scala.util.control.NonFatal
- @InternalApi private[akka] final case class ProcessorModule[In, Out, Mat](
- val createProcessor: () ⇒ (Processor[In, Out], Mat),
- attributes: Attributes = DefaultAttributes.processor) extends StreamLayout.AtomicModule[FlowShape[In, Out], Mat] {
- - val inPort = Inlet[In]("ProcessorModule.in")
- - val outPort = Outlet[Out]("ProcessorModule.out")
- - override val shape = new FlowShape(inPort, outPort)
- + val inPort: Inlet[In] = Inlet[In]("ProcessorModule.in")
- + val outPort: Outlet[Out] = Outlet[Out]("ProcessorModule.out")
- + override val shape: FlowShape[In, Out] = new FlowShape(inPort, outPort)
- - override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
- + override def withAttributes(attributes: Attributes): ProcessorModule[In, Out, Mat] = copy(attributes = attributes)
- override def toString: String = f"ProcessorModule [${System.identityHashCode(this)}%08x]"
- override private[stream] def traversalBuilder =
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala
- index 3b673a3062..28250ac0ae 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala
- @@ -30,7 +30,7 @@ private[akka] object SubscriberManagement {
- def apply[T](subscriber: Subscriber[T]): Unit = tryOnError(subscriber, cause)
- }
- - val ShutDown = new ErrorCompleted(ActorPublisher.NormalShutdownReason)
- + val ShutDown: ErrorCompleted = new ErrorCompleted(ActorPublisher.NormalShutdownReason)
- }
- /**
- @@ -96,7 +96,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
- // if non-null, holds the end-of-stream state
- private[this] var endOfStream: EndOfStream = NotReached
- - def cursors = subscriptions
- + def cursors: SubscriberManagement.this.Subscriptions = subscriptions
- /**
- * more demand was signaled from a given subscriber
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala
- index d909f16043..82e597616b 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala
- @@ -38,7 +38,7 @@ import scala.concurrent.duration.{ FiniteDuration, _ }
- var willStop = false
- var currentElement: T = _
- - val enforcing = mode match {
- + val enforcing: Boolean = mode match {
- case Enforcing ⇒ true
- case Shaping ⇒ false
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala
- index 53b74ee9e6..1bcbc70cde 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala
- @@ -33,7 +33,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
- }
- final class Initial[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
- - override def initialAttributes = DefaultAttributes.initial
- + override def initialAttributes: Attributes = DefaultAttributes.initial
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new TimerGraphStageLogic(shape) with InHandler with OutHandler {
- @@ -60,7 +60,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
- }
- final class Completion[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
- - override def initialAttributes = DefaultAttributes.completion
- + override def initialAttributes: Attributes = DefaultAttributes.completion
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new TimerGraphStageLogic(shape) with InHandler with OutHandler {
- @@ -81,7 +81,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
- }
- final class Idle[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
- - override def initialAttributes = DefaultAttributes.idle
- + override def initialAttributes: Attributes = DefaultAttributes.idle
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new TimerGraphStageLogic(shape) with InHandler with OutHandler {
- @@ -108,7 +108,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
- }
- final class BackpressureTimeout[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
- - override def initialAttributes = DefaultAttributes.backpressureTimeout
- + override def initialAttributes: Attributes = DefaultAttributes.backpressureTimeout
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new TimerGraphStageLogic(shape) with InHandler with OutHandler {
- @@ -140,13 +140,13 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
- }
- final class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] {
- - val in1 = Inlet[I]("in1")
- - val in2 = Inlet[O]("in2")
- - val out1 = Outlet[I]("out1")
- - val out2 = Outlet[O]("out2")
- - val shape = BidiShape(in1, out1, in2, out2)
- + val in1: Inlet[I] = Inlet[I]("in1")
- + val in2: Inlet[O] = Inlet[O]("in2")
- + val out1: Outlet[I] = Outlet[I]("out1")
- + val out2: Outlet[O] = Outlet[O]("out2")
- + val shape: BidiShape[I, I, O, O] = BidiShape(in1, out1, in2, out2)
- - override def initialAttributes = DefaultAttributes.idleTimeoutBidi
- + override def initialAttributes: Attributes = DefaultAttributes.idleTimeoutBidi
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
- private var nextDeadline: Long = System.nanoTime + timeout.toNanos
- @@ -178,7 +178,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
- }
- final class DelayInitial[T](val delay: FiniteDuration) extends SimpleLinearGraphStage[T] {
- - override def initialAttributes = DefaultAttributes.delayInitial
- + override def initialAttributes: Attributes = DefaultAttributes.delayInitial
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new TimerGraphStageLogic(shape) with InHandler with OutHandler {
- @@ -209,7 +209,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
- val in: Inlet[I] = Inlet("IdleInject.in")
- val out: Outlet[O] = Outlet("IdleInject.out")
- - override def initialAttributes = DefaultAttributes.idleInject
- + override def initialAttributes: Attributes = DefaultAttributes.idleInject
- override val shape: FlowShape[I, O] = FlowShape(in, out)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala
- index d950d58e38..b0cf33d074 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala
- @@ -45,11 +45,11 @@ import akka.annotation.InternalApi
- */
- @InternalApi private[akka] trait DefaultInputTransferStates extends Inputs {
- override val NeedsInput: TransferState = new TransferState {
- - def isReady = inputsAvailable
- - def isCompleted = inputsDepleted
- + def isReady: Boolean = inputsAvailable
- + def isCompleted: Boolean = inputsDepleted
- }
- override val NeedsInputOrComplete: TransferState = new TransferState {
- - def isReady = inputsAvailable || inputsDepleted
- + def isReady: Boolean = inputsAvailable || inputsDepleted
- def isCompleted = false
- }
- }
- @@ -81,11 +81,11 @@ import akka.annotation.InternalApi
- */
- @InternalApi private[akka] trait DefaultOutputTransferStates extends Outputs {
- override val NeedsDemand: TransferState = new TransferState {
- - def isReady = demandAvailable
- - def isCompleted = isClosed
- + def isReady: Boolean = demandAvailable
- + def isCompleted: Boolean = isClosed
- }
- override def NeedsDemandOrCancel: TransferState = new TransferState {
- - def isReady = demandAvailable || isClosed
- + def isReady: Boolean = demandAvailable || isClosed
- def isCompleted = false
- }
- }
- @@ -97,7 +97,7 @@ import akka.annotation.InternalApi
- @InternalApi private[akka] trait TransferState {
- def isReady: Boolean
- def isCompleted: Boolean
- - def isExecutable = isReady && !isCompleted
- + def isExecutable: Boolean = isReady && !isCompleted
- def ||(other: TransferState): TransferState = new TransferState {
- def isReady: Boolean = TransferState.this.isReady || other.isReady
- @@ -189,7 +189,7 @@ import akka.annotation.InternalApi
- final def isPumpFinished: Boolean = transferState.isCompleted
- - protected final val completedPhase = TransferPhase(Completed) {
- + protected final val completedPhase: TransferPhase = TransferPhase(Completed) {
- () ⇒ throw new IllegalStateException("The action of completed phase must be never executed")
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala
- index a9f99c9f6c..ce0fcd789d 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala
- @@ -1076,7 +1076,7 @@ import scala.collection.immutable.Map.Map1
- * See comments in akka.stream.impl.package for more details.
- */
- @InternalApi private[akka] final class BuilderKey extends TraversalBuildStep {
- - override def toString = s"K:$hashCode"
- + override def toString: String = s"K:$hashCode"
- }
- /**
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala
- index 1ccf76d98a..4c4414b36a 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala
- @@ -46,7 +46,7 @@ import scala.util.{ Failure, Success, Try }
- private[this] var state = s
- private[this] var asyncHandler: Function1[Try[Option[(S, E)]], Unit] = _
- - override def preStart() = {
- + override def preStart(): Unit = {
- val ac = getAsyncCallback[Try[Option[(S, E)]]] {
- case Failure(ex) ⇒ fail(out, ex)
- case Success(None) ⇒ complete(out)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala
- index 296e5a3fa2..17d331247d 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala
- @@ -11,6 +11,7 @@ import akka.stream.stage._
- import scala.annotation.tailrec
- import scala.util.control.NonFatal
- +import akka.stream.Supervision.Decider
- /**
- * INTERNAL API
- @@ -19,12 +20,12 @@ import scala.util.control.NonFatal
- create: () ⇒ S,
- readData: (S) ⇒ Option[T],
- close: (S) ⇒ Unit) extends GraphStage[SourceShape[T]] {
- - val out = Outlet[T]("UnfoldResourceSource.out")
- - override val shape = SourceShape(out)
- + val out: Outlet[T] = Outlet[T]("UnfoldResourceSource.out")
- + override val shape: SourceShape[T] = SourceShape(out)
- override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource
- - def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
- - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + def createLogic(inheritedAttributes: Attributes): GraphStageLogic with OutHandler = new GraphStageLogic(shape) with OutHandler {
- + lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- var open = false
- var blockingStream: S = _
- setHandler(out, this)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala
- index dfc8f0e03a..03aab78cc9 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala
- @@ -13,6 +13,7 @@ import akka.stream.stage._
- import scala.concurrent.Future
- import scala.util.{ Failure, Success, Try }
- import scala.util.control.NonFatal
- +import akka.stream.Supervision.Decider
- /**
- * INTERNAL API
- @@ -21,12 +22,12 @@ import scala.util.control.NonFatal
- create: () ⇒ Future[S],
- readData: (S) ⇒ Future[Option[T]],
- close: (S) ⇒ Future[Done]) extends GraphStage[SourceShape[T]] {
- - val out = Outlet[T]("UnfoldResourceSourceAsync.out")
- - override val shape = SourceShape(out)
- + val out: Outlet[T] = Outlet[T]("UnfoldResourceSourceAsync.out")
- + override val shape: SourceShape[T] = SourceShape(out)
- override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync
- - def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
- - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + def createLogic(inheritedAttributes: Attributes): GraphStageLogic with OutHandler = new GraphStageLogic(shape) with OutHandler {
- + lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- private implicit def ec = ActorMaterializerHelper.downcast(materializer).system.dispatcher
- private var state: Option[S] = None
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala
- index f845ab9371..53b66ed68c 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala
- @@ -23,6 +23,7 @@ import scala.annotation.tailrec
- import scala.collection.immutable
- import scala.concurrent.Promise
- import scala.util.control.NonFatal
- +import akka.event.LoggingAdapter
- /**
- * INTERNAL API
- @@ -331,7 +332,7 @@ import scala.util.control.NonFatal
- val in: Inlet[Any] = Inlet[Any]("UpstreamBoundary:" + internalPortName)
- in.id = 0
- - val publisher = new OutputBoundaryPublisher(this, internalPortName)
- + val publisher: OutputBoundaryPublisher = new OutputBoundaryPublisher(this, internalPortName)
- @volatile private var actor: ActorRef = null
- def setActor(actor: ActorRef): Unit = this.actor = actor
- @@ -401,7 +402,7 @@ import scala.util.control.NonFatal
- val subscription = new Subscription {
- override def request(elements: Long): Unit = actor ! RequestMore(ActorOutputBoundary.this, elements)
- override def cancel(): Unit = actor ! Cancel(ActorOutputBoundary.this)
- - override def toString = s"BoundarySubscription[$actor, $internalPortName]"
- + override def toString: String = s"BoundarySubscription[$actor, $internalPortName]"
- }
- tryOnSubscribe(subscriber, subscription)
- @@ -448,7 +449,7 @@ import scala.util.control.NonFatal
- import ActorGraphInterpreter._
- private var self: ActorRef = _
- - lazy val log = Logging(mat.system.eventStream, self)
- + lazy val log: LoggingAdapter = Logging(mat.system.eventStream, self)
- /**
- * @param promise Will be completed upon processing the event, or failed if processing the event throws
- @@ -525,7 +526,7 @@ import scala.util.control.NonFatal
- * because no data can enter “fast enough” from the outside
- */
- // TODO: Fix event limit heuristic
- - val shellEventLimit = attributes.mandatoryAttribute[Attributes.InputBuffer].max * 16
- + val shellEventLimit: Int = attributes.mandatoryAttribute[Attributes.InputBuffer].max * 16
- // Limits the number of events processed by the interpreter on an abort event.
- // TODO: Better heuristic here
- private val abortLimit = shellEventLimit * 2
- @@ -571,7 +572,7 @@ import scala.util.control.NonFatal
- private var waitingForShutdown: Boolean = false
- - val resume = ResumeShell(this)
- + val resume: GraphInterpreterShell.this.ResumeShell = ResumeShell(this)
- def sendResume(sendResume: Boolean): Unit = {
- resumeScheduled = true
- @@ -667,9 +668,9 @@ import scala.util.control.NonFatal
- @InternalApi private[akka] final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging {
- import ActorGraphInterpreter._
- - var activeInterpreters = Set.empty[GraphInterpreterShell]
- + var activeInterpreters: Set[GraphInterpreterShell] = Set.empty[GraphInterpreterShell]
- var newShells: List[GraphInterpreterShell] = Nil
- - val subFusingMaterializerImpl = new SubFusingActorMaterializerImpl(_initial.mat, registerShell)
- + val subFusingMaterializerImpl: SubFusingActorMaterializerImpl = new SubFusingActorMaterializerImpl(_initial.mat, registerShell)
- def tryInit(shell: GraphInterpreterShell): Boolean =
- try {
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala
- index d5fbae163f..b033695680 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala
- @@ -84,7 +84,7 @@ import scala.util.control.NonFatal
- var portState: Int = InReady
- var slot: Any = Empty
- - override def toString =
- + override def toString: String =
- if (GraphInterpreter.Debug) s"Connection($id, $inOwner, $outOwner, $inHandler, $outHandler, $portState, $slot)"
- else s"Connection($id, $portState, $slot, $inHandler, $outHandler)"
- }
- @@ -95,7 +95,7 @@ import scala.util.control.NonFatal
- * when this accidentally leaks onto threads that are not stopped when this
- * class should be unloaded.
- */
- - override def initialValue = new Array(1)
- + override def initialValue: Array[AnyRef] = new Array(1)
- }
- /**
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala
- index 92b5197a61..4d060574d7 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala
- @@ -51,13 +51,13 @@ import scala.util.control.NonFatal
- * INTERNAL API
- */
- @InternalApi private[akka] abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] {
- - val in = Inlet[T](Logging.simpleName(this) + ".in")
- - val out = Outlet[T](Logging.simpleName(this) + ".out")
- - override val shape = FlowShape(in, out)
- + val in: Inlet[T] = Inlet[T](Logging.simpleName(this) + ".in")
- + val out: Outlet[T] = Outlet[T](Logging.simpleName(this) + ".out")
- + override val shape: FlowShape[T, T] = FlowShape(in, out)
- }
- private object Identity extends SimpleLinearGraphStage[Any] {
- - override def initialAttributes = DefaultAttributes.identityOp
- + override def initialAttributes: Attributes = DefaultAttributes.identityOp
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
- def onPush(): Unit = push(out, grab(in))
- @@ -70,13 +70,13 @@ import scala.util.control.NonFatal
- override def toString = "Identity"
- }
- - def identity[T] = Identity.asInstanceOf[SimpleLinearGraphStage[T]]
- + def identity[T]: SimpleLinearGraphStage[T] = Identity.asInstanceOf[SimpleLinearGraphStage[T]]
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] final class Detacher[T] extends SimpleLinearGraphStage[T] {
- - override def initialAttributes = DefaultAttributes.detacher
- + override def initialAttributes: Attributes = DefaultAttributes.detacher
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
- @@ -111,9 +111,9 @@ import scala.util.control.NonFatal
- def detacher[T]: GraphStage[FlowShape[T, T]] = _detacher.asInstanceOf[GraphStage[FlowShape[T, T]]]
- private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] {
- - val in = Inlet[Any]("terminationWatcher.in")
- - val out = Outlet[Any]("terminationWatcher.out")
- - override val shape = FlowShape(in, out)
- + val in: Inlet[Any] = Inlet[Any]("terminationWatcher.in")
- + val out: Outlet[Any] = Outlet[Any]("terminationWatcher.out")
- + override val shape: FlowShape[Any, Any] = FlowShape(in, out)
- override def initialAttributes: Attributes = DefaultAttributes.terminationWatcher
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
- @@ -154,16 +154,16 @@ import scala.util.control.NonFatal
- TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]]
- private class FlowMonitorImpl[T] extends AtomicReference[Any](Initialized) with FlowMonitor[T] {
- - override def state = get match {
- + override def state: StreamState[T] = get match {
- case s: StreamState[_] ⇒ s.asInstanceOf[StreamState[T]]
- case msg ⇒ Received(msg.asInstanceOf[T])
- }
- }
- private class MonitorFlow[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], FlowMonitor[T]] {
- - val in = Inlet[T]("FlowMonitor.in")
- - val out = Outlet[T]("FlowMonitor.out")
- - val shape = FlowShape.of(in, out)
- + val in: Inlet[T] = Inlet[T]("FlowMonitor.in")
- + val out: Outlet[T] = Outlet[T]("FlowMonitor.out")
- + val shape: FlowShape[T, T] = FlowShape.of(in, out)
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, FlowMonitor[T]) = {
- val monitor: FlowMonitorImpl[T] = new FlowMonitorImpl[T]
- @@ -232,16 +232,16 @@ import scala.util.control.NonFatal
- final class TickSource[T](val initialDelay: FiniteDuration, val interval: FiniteDuration, val tick: T)
- extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {
- - override val shape = SourceShape(Outlet[T]("TickSource.out"))
- - val out = shape.out
- + override val shape: SourceShape[T] = SourceShape(Outlet[T]("TickSource.out"))
- + val out: Outlet[T] = shape.out
- override def initialAttributes: Attributes = DefaultAttributes.tickSource
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Cancellable) = {
- val logic = new TimerGraphStageLogic(shape) with Cancellable {
- - val cancelled = new AtomicBoolean(false)
- + val cancelled: AtomicBoolean = new AtomicBoolean(false)
- val cancelCallback: AtomicReference[Option[AsyncCallback[Unit]]] = new AtomicReference(None)
- - override def preStart() = {
- + override def preStart(): Unit = {
- cancelCallback.set(Some(getAsyncCallback[Unit](_ ⇒ completeStage())))
- if (cancelled.get)
- completeStage()
- @@ -251,16 +251,16 @@ import scala.util.control.NonFatal
- setHandler(out, eagerTerminateOutput)
- - override protected def onTimer(timerKey: Any) =
- + override protected def onTimer(timerKey: Any): Unit =
- if (isAvailable(out) && !isCancelled) push(out, tick)
- - override def cancel() = {
- + override def cancel(): Boolean = {
- val success = !cancelled.getAndSet(true)
- if (success) cancelCallback.get.foreach(_.invoke(()))
- success
- }
- - override def isCancelled = cancelled.get
- + override def isCancelled: Boolean = cancelled.get
- override def toString: String = "TickSourceLogic"
- }
- @@ -274,9 +274,9 @@ import scala.util.control.NonFatal
- final class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] {
- override def initialAttributes: Attributes = DefaultAttributes.singleSource
- ReactiveStreamsCompliance.requireNonNullElement(elem)
- - val out = Outlet[T]("single.out")
- - val shape = SourceShape(out)
- - def createLogic(attr: Attributes) =
- + val out: Outlet[T] = Outlet[T]("single.out")
- + val shape: SourceShape[T] = SourceShape(out)
- + def createLogic(attr: Attributes): GraphStageLogic with OutHandler =
- new GraphStageLogic(shape) with OutHandler {
- def onPull(): Unit = {
- push(out, elem)
- @@ -294,9 +294,9 @@ import scala.util.control.NonFatal
- ReactiveStreamsCompliance.requireNonNullElement(futureSource)
- val out: Outlet[T] = Outlet("FutureFlattenSource.out")
- - override val shape = SourceShape(out)
- + override val shape: SourceShape[T] = SourceShape(out)
- - override def initialAttributes = DefaultAttributes.futureFlattenSource
- + override def initialAttributes: Attributes = DefaultAttributes.futureFlattenSource
- override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[M]) = {
- val materialized = Promise[M]()
- @@ -373,10 +373,10 @@ import scala.util.control.NonFatal
- final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] {
- ReactiveStreamsCompliance.requireNonNullElement(future)
- - val shape = SourceShape(Outlet[T]("FutureSource.out"))
- - val out = shape.out
- + val shape: SourceShape[T] = SourceShape(Outlet[T]("FutureSource.out"))
- + val out: Outlet[T] = shape.out
- override def initialAttributes: Attributes = DefaultAttributes.futureSource
- - override def createLogic(attr: Attributes) =
- + override def createLogic(attr: Attributes): GraphStageLogic with OutHandler =
- new GraphStageLogic(shape) with OutHandler {
- def onPull(): Unit = {
- if (future.isCompleted) {
- @@ -408,10 +408,10 @@ import scala.util.control.NonFatal
- */
- @InternalApi private[akka] object IgnoreSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] {
- - val in = Inlet[Any]("Ignore.in")
- - val shape = SinkShape(in)
- + val in: Inlet[Any] = Inlet[Any]("Ignore.in")
- + val shape: SinkShape[Any] = SinkShape(in)
- - override def initialAttributes = DefaultAttributes.ignoreSink
- + override def initialAttributes: Attributes = DefaultAttributes.ignoreSink
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
- val promise = Promise[Done]()
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
- index d11a2c4812..59717aca47 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
- @@ -27,14 +27,15 @@ import akka.stream.ActorAttributes.SupervisionStrategy
- import scala.concurrent.duration.{ FiniteDuration, _ }
- import akka.stream.impl.Stages.DefaultAttributes
- import akka.util.OptionVal
- +import akka.stream.Supervision.Decider
- /**
- * INTERNAL API
- */
- @InternalApi private[akka] final case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Out]] {
- - val in = Inlet[In]("Map.in")
- - val out = Outlet[Out]("Map.out")
- - override val shape = FlowShape(in, out)
- + val in: Inlet[In] = Inlet[In]("Map.in")
- + val out: Outlet[Out] = Outlet[Out]("Map.out")
- + override val shape: FlowShape[In, Out] = FlowShape(in, out)
- override def initialAttributes: Attributes = DefaultAttributes.map
- @@ -70,7 +71,7 @@ import akka.util.OptionVal
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with OutHandler with InHandler {
- - def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + def decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- override def onPush(): Unit = {
- try {
- @@ -106,7 +107,7 @@ import akka.util.OptionVal
- new GraphStageLogic(shape) with OutHandler with InHandler {
- override def toString = "TakeWhileLogic"
- - def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + def decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- override def onPush(): Unit = {
- try {
- @@ -137,7 +138,7 @@ import akka.util.OptionVal
- @InternalApi private[akka] final case class DropWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
- override def initialAttributes: Attributes = DefaultAttributes.dropWhile
- - def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
- + def createLogic(inheritedAttributes: Attributes): SupervisedGraphStageLogic with InHandler with OutHandler = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
- override def onPush(): Unit = {
- val elem = grab(in)
- withSupervision(() ⇒ p(elem)) match {
- @@ -149,8 +150,8 @@ import akka.util.OptionVal
- }
- }
- - def rest = new InHandler {
- - def onPush() = push(out, grab(in))
- + def rest: InHandler = new InHandler {
- + def onPush(): Unit = push(out, grab(in))
- }
- override def onResume(t: Throwable): Unit = if (!hasBeenPulled(in)) pull(in)
- @@ -200,17 +201,17 @@ private[stream] object Collect {
- * INTERNAL API
- */
- @InternalApi private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] {
- - val in = Inlet[In]("Collect.in")
- - val out = Outlet[Out]("Collect.out")
- - override val shape = FlowShape(in, out)
- + val in: Inlet[In] = Inlet[In]("Collect.in")
- + val out: Outlet[Out] = Outlet[Out]("Collect.out")
- + override val shape: FlowShape[In, Out] = FlowShape(in, out)
- override def initialAttributes: Attributes = DefaultAttributes.collect
- - def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
- + def createLogic(inheritedAttributes: Attributes): SupervisedGraphStageLogic with InHandler with OutHandler = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
- import Collect.NotApplied
- - val wrappedPf = () ⇒ pf.applyOrElse(grab(in), NotApplied)
- + val wrappedPf: () ⇒ Any = () ⇒ pf.applyOrElse(grab(in), NotApplied)
- override def onPush(): Unit = withSupervision(wrappedPf) match {
- case Some(result) ⇒ result match {
- @@ -282,7 +283,7 @@ private[stream] object Collect {
- * would log the `t2` error.
- */
- @InternalApi private[akka] final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] {
- - override def createLogic(attr: Attributes) =
- + override def createLogic(attr: Attributes): GraphStageLogic with InHandler with OutHandler =
- new GraphStageLogic(shape) with InHandler with OutHandler {
- override def onPush(): Unit = push(out, grab(in))
- @@ -352,7 +353,7 @@ private[stream] object Collect {
- * INTERNAL API
- */
- @InternalApi private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] {
- - override val shape = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out"))
- + override val shape: FlowShape[In, Out] = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out"))
- override def initialAttributes: Attributes = DefaultAttributes.scan
- @@ -413,8 +414,8 @@ private[stream] object Collect {
- import akka.dispatch.ExecutionContexts
- - val in = Inlet[In]("ScanAsync.in")
- - val out = Outlet[Out]("ScanAsync.out")
- + val in: Inlet[In] = Inlet[In]("ScanAsync.in")
- + val out: Outlet[Out] = Outlet[Out]("ScanAsync.out")
- override val shape: FlowShape[In, Out] = FlowShape[In, Out](in, out)
- override val initialAttributes: Attributes = Attributes.name("scanAsync")
- @@ -529,13 +530,13 @@ private[stream] object Collect {
- */
- @InternalApi private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] {
- - val in = Inlet[In]("Fold.in")
- - val out = Outlet[Out]("Fold.out")
- + val in: Inlet[In] = Inlet[In]("Fold.in")
- + val out: Outlet[Out] = Outlet[Out]("Fold.out")
- override val shape: FlowShape[In, Out] = FlowShape(in, out)
- override def toString: String = "Fold"
- - override val initialAttributes = DefaultAttributes.fold
- + override val initialAttributes: Attributes = DefaultAttributes.fold
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with InHandler with OutHandler {
- @@ -586,17 +587,17 @@ private[stream] object Collect {
- import akka.dispatch.ExecutionContexts
- - val in = Inlet[In]("FoldAsync.in")
- - val out = Outlet[Out]("FoldAsync.out")
- - val shape = FlowShape.of(in, out)
- + val in: Inlet[In] = Inlet[In]("FoldAsync.in")
- + val out: Outlet[Out] = Outlet[Out]("FoldAsync.out")
- + val shape: FlowShape[In, Out] = FlowShape.of(in, out)
- override def toString: String = "FoldAsync"
- - override val initialAttributes = DefaultAttributes.foldAsync
- + override val initialAttributes: Attributes = DefaultAttributes.foldAsync
- def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with InHandler with OutHandler {
- - val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- private var aggregator: Out = zero
- private var aggregating: Future[Out] = Future.successful(aggregator)
- @@ -669,7 +670,7 @@ private[stream] object Collect {
- setHandlers(in, out, this)
- - override def toString =
- + override def toString: String =
- s"FoldAsync.Logic(completed=${aggregating.isCompleted})"
- }
- }
- @@ -683,7 +684,7 @@ private[stream] object Collect {
- if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get)
- override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
- - val startInHandler = new InHandler {
- + val startInHandler: InHandler = new InHandler {
- override def onPush(): Unit = {
- // if else (to avoid using Iterator[T].flatten in hot code)
- if (start.isDefined) emitMultiple(out, Iterator(start.get, grab(in)))
- @@ -697,7 +698,7 @@ private[stream] object Collect {
- }
- }
- - val restInHandler = new InHandler {
- + val restInHandler: InHandler = new InHandler {
- override def onPush(): Unit = emitMultiple(out, Iterator(inject, grab(in)))
- override def onUpstreamFinish(): Unit = {
- @@ -719,8 +720,8 @@ private[stream] object Collect {
- @InternalApi private[akka] final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
- require(n > 0, "n must be greater than 0")
- - val in = Inlet[T]("Grouped.in")
- - val out = Outlet[immutable.Seq[T]]("Grouped.out")
- + val in: Inlet[T] = Inlet[T]("Grouped.in")
- + val out: Outlet[immutable.Seq[T]] = Outlet[immutable.Seq[T]]("Grouped.out")
- override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
- override protected val initialAttributes: Attributes = DefaultAttributes.grouped
- @@ -731,7 +732,7 @@ private[stream] object Collect {
- b.sizeHint(n)
- b
- }
- - var left = n
- + var left: Int = n
- override def onPush(): Unit = {
- buf += grab(in)
- @@ -773,7 +774,7 @@ private[stream] object Collect {
- @InternalApi private[akka] final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends SimpleLinearGraphStage[T] {
- override def initialAttributes: Attributes = DefaultAttributes.limitWeighted
- - def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
- + def createLogic(inheritedAttributes: Attributes): SupervisedGraphStageLogic with InHandler with OutHandler = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
- private var left = n
- override def onPush(): Unit = {
- @@ -808,8 +809,8 @@ private[stream] object Collect {
- require(n > 0, "n must be greater than 0")
- require(step > 0, "step must be greater than 0")
- - val in = Inlet[T]("Sliding.in")
- - val out = Outlet[immutable.Seq[T]]("Sliding.out")
- + val in: Inlet[T] = Inlet[T]("Sliding.in")
- + val out: Outlet[immutable.Seq[T]] = Outlet[immutable.Seq[T]]("Sliding.out")
- override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
- override protected val initialAttributes: Attributes = DefaultAttributes.sliding
- @@ -938,14 +939,14 @@ private[stream] object Collect {
- @InternalApi private[akka] final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed: In ⇒ Out, val aggregate: (Out, In) ⇒ Out)
- extends GraphStage[FlowShape[In, Out]] {
- - val in = Inlet[In]("Batch.in")
- - val out = Outlet[Out]("Batch.out")
- + val in: Inlet[In] = Inlet[In]("Batch.in")
- + val out: Outlet[Out] = Outlet[Out]("Batch.out")
- override val shape: FlowShape[In, Out] = FlowShape.of(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
- - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- private var agg: Out = null.asInstanceOf[Out]
- private var left: Long = max
- @@ -974,7 +975,7 @@ private[stream] object Collect {
- }
- }
- - override def preStart() = pull(in)
- + override def preStart(): Unit = pull(in)
- def onPush(): Unit = {
- val elem = grab(in)
- @@ -1061,11 +1062,11 @@ private[stream] object Collect {
- private val in = Inlet[In]("expand.in")
- private val out = Outlet[Out]("expand.out")
- - override def initialAttributes = DefaultAttributes.expand
- + override def initialAttributes: Attributes = DefaultAttributes.expand
- - override val shape = FlowShape(in, out)
- + override val shape: FlowShape[In, Out] = FlowShape(in, out)
- - override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
- + override def createLogic(attr: Attributes): GraphStageLogic with InHandler with OutHandler = new GraphStageLogic(shape) with InHandler with OutHandler {
- private var iterator: Iterator[Out] = Iterator.empty
- private var expanded = false
- @@ -1145,7 +1146,7 @@ private[stream] object Collect {
- }
- }
- - val NotYetThere = Failure(new Exception with NoStackTrace)
- + val NotYetThere: Failure[Nothing] = Failure(new Exception with NoStackTrace)
- }
- /**
- @@ -1159,14 +1160,14 @@ private[stream] object Collect {
- private val in = Inlet[In]("MapAsync.in")
- private val out = Outlet[Out]("MapAsync.out")
- - override def initialAttributes = DefaultAttributes.mapAsync
- + override def initialAttributes: Attributes = DefaultAttributes.mapAsync
- - override val shape = FlowShape(in, out)
- + override val shape: FlowShape[In, Out] = FlowShape(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with InHandler with OutHandler {
- - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- var buffer: BufferImpl[Holder[Out]] = _
- private val futureCB = getAsyncCallback[Holder[Out]](holder ⇒
- @@ -1255,15 +1256,15 @@ private[stream] object Collect {
- private val in = Inlet[In]("MapAsyncUnordered.in")
- private val out = Outlet[Out]("MapAsyncUnordered.out")
- - override def initialAttributes = DefaultAttributes.mapAsyncUnordered
- + override def initialAttributes: Attributes = DefaultAttributes.mapAsyncUnordered
- - override val shape = FlowShape(in, out)
- + override val shape: FlowShape[In, Out] = FlowShape(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with InHandler with OutHandler {
- - override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
- + override def toString: String = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
- - val decider =
- + val decider: Decider =
- inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- private var inFlight = 0
- @@ -1343,7 +1344,7 @@ private[stream] object Collect {
- private var logLevels: LogLevels = _
- private var log: LoggingAdapter = _
- - def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + def decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- override def preStart(): Unit = {
- logLevels = inheritedAttributes.get[LogLevels](DefaultLogLevels)
- @@ -1417,7 +1418,7 @@ private[stream] object Collect {
- * Must be located here to be visible for implicit resolution, when [[Materializer]] is passed to [[Logging]]
- * More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source.
- */
- - final val fromMaterializer = new LogSource[Materializer] {
- + final val fromMaterializer: LogSource[Materializer] = new LogSource[Materializer] {
- // do not expose private context classes (of OneBoundedInterpreter)
- override def getClazz(t: Materializer): Class[_] = classOf[Materializer]
- @@ -1459,12 +1460,12 @@ private[stream] object Collect {
- require(maxWeight > 0, "maxWeight must be greater than 0")
- require(interval > Duration.Zero)
- - val in = Inlet[T]("in")
- - val out = Outlet[immutable.Seq[T]]("out")
- + val in: Inlet[T] = Inlet[T]("in")
- + val out: Outlet[immutable.Seq[T]] = Outlet[immutable.Seq[T]]("out")
- - override def initialAttributes = DefaultAttributes.groupedWeightedWithin
- + override def initialAttributes: Attributes = DefaultAttributes.groupedWeightedWithin
- - val shape = FlowShape(in, out)
- + val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
- @@ -1487,7 +1488,7 @@ private[stream] object Collect {
- private var totalWeight = 0L
- private var hasElements = false
- - override def preStart() = {
- + override def preStart(): Unit = {
- schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval)
- pull(in)
- }
- @@ -1575,7 +1576,7 @@ private[stream] object Collect {
- else tryCloseGroup()
- }
- - override protected def onTimer(timerKey: Any) = if (hasElements) {
- + override protected def onTimer(timerKey: Any): Unit = if (hasElements) {
- if (isAvailable(out)) emitGroup()
- else pushEagerly = true
- }
- @@ -1594,9 +1595,9 @@ private[stream] object Collect {
- override def initialAttributes: Attributes = DefaultAttributes.delay
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
- - val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max
- + val size: Int = inheritedAttributes.mandatoryAttribute[InputBuffer].max
- - val delayMillis = d.toMillis
- + val delayMillis: Long = d.toMillis
- var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element
- @@ -1741,7 +1742,7 @@ private[stream] object Collect {
- push(out, grab(in))
- // change the in handler to avoid System.nanoTime call after timeout
- setHandler(in, new InHandler {
- - def onPush() = push(out, grab(in))
- + def onPush(): Unit = push(out, grab(in))
- })
- }
- }
- @@ -1763,7 +1764,7 @@ private[stream] object Collect {
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
- self ⇒
- - override def toString = s"Reduce.Logic(aggregator=$aggregator)"
- + override def toString: String = s"Reduce.Logic(aggregator=$aggregator)"
- var aggregator: T = _
- @@ -1823,22 +1824,22 @@ private[stream] object Collect {
- @InternalApi private[akka] final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
- - override def initialAttributes = DefaultAttributes.recoverWith
- + override def initialAttributes: Attributes = DefaultAttributes.recoverWith
- - override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
- + override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
- var attempt = 0
- setHandler(in, new InHandler {
- override def onPush(): Unit = push(out, grab(in))
- - override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
- + override def onUpstreamFailure(ex: Throwable): Unit = onFailure(ex)
- })
- setHandler(out, new OutHandler {
- override def onPull(): Unit = pull(in)
- })
- - def onFailure(ex: Throwable) =
- + def onFailure(ex: Throwable): Unit =
- if ((maximumRetries < 0 || attempt < maximumRetries) && pf.isDefinedAt(ex)) {
- switchTo(pf(ex))
- attempt += 1
- @@ -1853,7 +1854,7 @@ private[stream] object Collect {
- override def onUpstreamFinish(): Unit = completeStage()
- - override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
- + override def onUpstreamFailure(ex: Throwable): Unit = onFailure(ex)
- })
- val outHandler = new OutHandler {
- @@ -1875,18 +1876,18 @@ private[stream] object Collect {
- * INTERNAL API
- */
- @InternalApi private[akka] final class StatefulMapConcat[In, Out](val f: () ⇒ In ⇒ immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] {
- - val in = Inlet[In]("StatefulMapConcat.in")
- - val out = Outlet[Out]("StatefulMapConcat.out")
- - override val shape = FlowShape(in, out)
- + val in: Inlet[In] = Inlet[In]("StatefulMapConcat.in")
- + val out: Outlet[Out] = Outlet[Out]("StatefulMapConcat.out")
- + override val shape: FlowShape[In, Out] = FlowShape(in, out)
- override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat
- - def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
- - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + def createLogic(inheritedAttributes: Attributes): GraphStageLogic with InHandler with OutHandler = new GraphStageLogic(shape) with InHandler with OutHandler {
- + lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- var currentIterator: Iterator[Out] = _
- - var plainFun = f()
- + var plainFun: In ⇒ immutable.Iterable[Out] = f()
- - def hasNext = if (currentIterator != null) currentIterator.hasNext else false
- + def hasNext: Boolean = if (currentIterator != null) currentIterator.hasNext else false
- setHandlers(in, out, this)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
- index 23e7a5d47e..8caf640e72 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
- @@ -24,6 +24,7 @@ import akka.stream.impl.CancellingSubscriber
- import akka.stream.impl.{ Buffer ⇒ BufferImpl }
- import scala.collection.JavaConverters._
- +import akka.stream.Supervision.Decider
- /**
- * INTERNAL API
- @@ -32,12 +33,12 @@ import scala.collection.JavaConverters._
- private val in = Inlet[Graph[SourceShape[T], M]]("flatten.in")
- private val out = Outlet[T]("flatten.out")
- - override def initialAttributes = DefaultAttributes.flattenMerge
- - override val shape = FlowShape(in, out)
- + override def initialAttributes: Attributes = DefaultAttributes.flattenMerge
- + override val shape: FlowShape[Graph[SourceShape[T], M], T] = FlowShape(in, out)
- - override def createLogic(enclosingAttributes: Attributes) = new GraphStageLogic(shape) {
- + override def createLogic(enclosingAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
- var sources = Set.empty[SubSinkInlet[T]]
- - def activeSources = sources.size
- + def activeSources: Int = sources.size
- var q: BufferImpl[SubSinkInlet[T]] = _
- @@ -66,7 +67,7 @@ import scala.collection.JavaConverters._
- }
- })
- - val outHandler = new OutHandler {
- + val outHandler: OutHandler = new OutHandler {
- // could be unavailable due to async input having been executed before this notification
- override def onPull(): Unit = if (q.nonEmpty && isAvailable(out)) pushOut()
- }
- @@ -112,7 +113,7 @@ import scala.collection.JavaConverters._
- val out: Outlet[(immutable.Seq[T], Source[T, NotUsed])] = Outlet("PrefixAndTail.out")
- override val shape: FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])] = FlowShape(in, out)
- - override def initialAttributes = DefaultAttributes.prefixAndTail
- + override def initialAttributes: Attributes = DefaultAttributes.prefixAndTail
- private final class PrefixAndTailLogic(_shape: Shape) extends TimerGraphStageLogic(_shape) with OutHandler with InHandler {
- @@ -219,11 +220,11 @@ import scala.collection.JavaConverters._
- val in: Inlet[T] = Inlet("GroupBy.in")
- val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out")
- override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
- - override def initialAttributes = DefaultAttributes.groupBy
- + override def initialAttributes: Attributes = DefaultAttributes.groupBy
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler with InHandler {
- parent ⇒
- - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- + lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
- private val closedSubstreams = new java.util.HashSet[Any]()
- private var timeout: FiniteDuration = _
- @@ -341,7 +342,7 @@ import scala.collection.JavaConverters._
- private class SubstreamSource(name: String, val key: K, var firstElement: T) extends SubSourceOutlet[T](name) with OutHandler {
- def firstPush(): Boolean = firstElement != null
- - def hasNextForSubSource = hasNextElement && nextElementKey == key
- + def hasNextForSubSource: Boolean = hasNextElement && nextElementKey == key
- private def completeSubStream(): Unit = {
- complete()
- activeSubstreamsMap.remove(key)
- @@ -445,7 +446,7 @@ import scala.collection.JavaConverters._
- }
- })
- - val initInHandler = new InHandler {
- + val initInHandler: InHandler = new InHandler {
- override def onPush(): Unit = {
- val handler = new SubstreamHandler
- val elem = grab(in)
- @@ -608,8 +609,8 @@ import scala.collection.JavaConverters._
- private val in = Inlet[T](s"SubSink($name).in")
- - override def initialAttributes = Attributes.name(s"SubSink($name)")
- - override val shape = SinkShape(in)
- + override def initialAttributes: Attributes = Attributes.name(s"SubSink($name)")
- + override val shape: SinkShape[T] = SinkShape(in)
- private val status = new AtomicReference[ /* State */ AnyRef](Uninitialized)
- @@ -633,7 +634,7 @@ import scala.collection.JavaConverters._
- throw new IllegalStateException(s"${newState.command} on subsink is illegal when ${cmd.command} is still pending")
- }
- - override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler {
- + override def createLogic(attr: Attributes): GraphStageLogic with InHandler = new GraphStageLogic(shape) with InHandler {
- setHandler(in, this)
- override def onPush(): Unit = externalCallback(ActorSubscriberMessage.OnNext(grab(in)))
- @@ -677,7 +678,7 @@ import scala.collection.JavaConverters._
- import SubSink._
- val out: Outlet[T] = Outlet(s"SubSource($name).out")
- - override def initialAttributes = Attributes.name(s"SubSource($name)")
- + override def initialAttributes: Attributes = Attributes.name(s"SubSource($name)")
- override val shape: SourceShape[T] = SourceShape(out)
- private val status = new AtomicReference[AnyRef]
- @@ -705,7 +706,7 @@ import scala.collection.JavaConverters._
- def timeout(d: FiniteDuration): Boolean =
- status.compareAndSet(null, ActorSubscriberMessage.OnError(new SubscriptionTimeoutException(s"Substream Source has not been materialized in $d")))
- - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
- + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic with OutHandler = new GraphStageLogic(shape) with OutHandler {
- setHandler(out, this)
- @tailrec private def setCB(cb: AsyncCallback[ActorSubscriberMessage]): Unit = {
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala
- index d8c843ec99..edc461013e 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala
- @@ -20,8 +20,8 @@ import scala.util.control.{ NoStackTrace, NonFatal }
- private val bytesIn = Inlet[ByteString]("bytesIn")
- private val objOut = Outlet[T]("objOut")
- - override def initialAttributes = Attributes.name("ByteStringParser")
- - final override val shape = FlowShape(bytesIn, objOut)
- + override def initialAttributes: Attributes = Attributes.name("ByteStringParser")
- + final override val shape: FlowShape[ByteString, T] = FlowShape(bytesIn, objOut)
- class ParsingLogic extends GraphStageLogic(shape) with InHandler with OutHandler {
- private var buffer = ByteString.empty
- @@ -174,13 +174,13 @@ import scala.util.control.{ NoStackTrace, NonFatal }
- }
- object FinishedParser extends ParseStep[Nothing] {
- - override def parse(reader: ByteReader) =
- + override def parse(reader: ByteReader): Nothing =
- throw new IllegalStateException("no initial parser installed: you must use startWith(...)")
- }
- class ParsingException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)
- - val NeedMoreData = new Exception with NoStackTrace
- + val NeedMoreData: Exception with NoStackTrace = new Exception with NoStackTrace
- class ByteReader(input: ByteString) {
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala
- index 2ef4f34dd2..a89e546e6e 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala
- @@ -19,7 +19,7 @@ import scala.util.{ Failure, Success }
- /** INTERNAL API */
- @InternalApi private[akka] object FileSubscriber {
- - def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, startPosition: Long, openOptions: Set[OpenOption]) = {
- + def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, startPosition: Long, openOptions: Set[OpenOption]): Props = {
- require(bufSize > 0, "buffer size must be > 0")
- require(startPosition >= 0, s"startPosition must be >= 0 (was $startPosition)")
- Props(classOf[FileSubscriber], f, completionPromise, bufSize, startPosition, openOptions).withDeploy(Deploy.local)
- @@ -31,7 +31,7 @@ import scala.util.{ Failure, Success }
- extends akka.stream.actor.ActorSubscriber
- with ActorLogging {
- - override protected val requestStrategy = WatermarkRequestStrategy(highWatermark = bufSize)
- + override protected val requestStrategy: WatermarkRequestStrategy = WatermarkRequestStrategy(highWatermark = bufSize)
- private var chan: FileChannel = _
- @@ -50,7 +50,7 @@ import scala.util.{ Failure, Success }
- cancel()
- }
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case ActorSubscriberMessage.OnNext(bytes: ByteString) ⇒
- try {
- bytesWritten += chan.write(bytes.asByteBuffer)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala
- index 4eb5d366fe..d54e565277 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala
- @@ -15,6 +15,7 @@ import akka.util.ByteString
- import scala.collection.immutable
- import scala.concurrent.{ Future, Promise }
- +import org.reactivestreams.Subscriber
- /**
- * INTERNAL API
- @@ -26,7 +27,7 @@ import scala.concurrent.{ Future, Promise }
- override protected def label: String = s"FileSink($f, $options)"
- - override def create(context: MaterializationContext) = {
- + override def create(context: MaterializationContext): (Subscriber[ByteString], Future[IOResult]) = {
- val materializer = ActorMaterializerHelper.downcast(context.materializer)
- val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max
- @@ -53,7 +54,7 @@ import scala.concurrent.{ Future, Promise }
- @InternalApi private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: Attributes, shape: SinkShape[ByteString], autoFlush: Boolean)
- extends SinkModule[ByteString, Future[IOResult]](shape) {
- - override def create(context: MaterializationContext) = {
- + override def create(context: MaterializationContext): (Subscriber[ByteString], Future[IOResult]) = {
- val materializer = ActorMaterializerHelper.downcast(context.materializer)
- val ioResultPromise = Promise[IOResult]()
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala
- index 90958ae5d2..ed9994b024 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala
- @@ -29,7 +29,7 @@ import scala.util.{ Failure, Success, Try }
- private[akka] object FileSource {
- - val completionHandler = new CompletionHandler[Integer, Try[Int] ⇒ Unit] {
- + val completionHandler: Object with CompletionHandler[Integer, Try[Int] ⇒ Unit] = new CompletionHandler[Integer, Try[Int] ⇒ Unit] {
- override def completed(result: Integer, attachment: Try[Int] ⇒ Unit): Unit = {
- attachment(Success(result))
- @@ -48,19 +48,19 @@ private[akka] object FileSource {
- private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition: Long)
- extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] {
- require(chunkSize > 0, "chunkSize must be greater than 0")
- - val out = Outlet[ByteString]("FileSource.out")
- + val out: Outlet[ByteString] = Outlet[ByteString]("FileSource.out")
- - override val shape = SourceShape(out)
- + override val shape: SourceShape[ByteString] = SourceShape(out)
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
- val ioResultPromise = Promise[IOResult]()
- val logic = new GraphStageLogic(shape) with OutHandler {
- handler ⇒
- - val buffer = ByteBuffer.allocate(chunkSize)
- - val maxReadAhead = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
- + val buffer: ByteBuffer = ByteBuffer.allocate(chunkSize)
- + val maxReadAhead: Int = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
- var channel: FileChannel = _
- - var position = startPosition
- + var position: Long = startPosition
- var chunkCallback: Try[Int] ⇒ Unit = _
- var eofEncountered = false
- var availableChunks: Vector[ByteString] = Vector.empty[ByteString]
- @@ -137,7 +137,7 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition:
- (logic, ioResultPromise.future)
- }
- - override def toString = s"FileSource($path, $chunkSize)"
- + override def toString: String = s"FileSource($path, $chunkSize)"
- }
- /**
- @@ -146,7 +146,7 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition:
- */
- @InternalApi private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStream, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
- extends SourceModule[ByteString, Future[IOResult]](shape) {
- - override def create(context: MaterializationContext) = {
- + override def create(context: MaterializationContext): (Publisher[ByteString], Future[IOResult]) = {
- val materializer = ActorMaterializerHelper.downcast(context.materializer)
- val ioResultPromise = Promise[IOResult]()
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala
- index bf4e55da97..18da203600 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala
- @@ -36,10 +36,10 @@ import scala.util.{ Failure, Success }
- import InputStreamPublisher._
- - val arr = new Array[Byte](chunkSize)
- + val arr: Array[Byte] = new Array[Byte](chunkSize)
- var readBytesTotal = 0L
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case ActorPublisherMessage.Request(elements) ⇒ readAndSignal()
- case Continue ⇒ readAndSignal()
- case ActorPublisherMessage.Cancel ⇒ context.stop(self)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala
- index c4c637e42c..3cea90d3e3 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala
- @@ -39,7 +39,7 @@ private[stream] object InputStreamSinkStage {
- */
- @InternalApi final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SinkShape[ByteString], InputStream] {
- - val in = Inlet[ByteString]("InputStreamSink.in")
- + val in: Inlet[ByteString] = Inlet[ByteString]("InputStreamSink.in")
- override def initialAttributes: Attributes = DefaultAttributes.inputStreamSink
- override val shape: SinkShape[ByteString] = SinkShape.of(in)
- @@ -65,7 +65,7 @@ private[stream] object InputStreamSinkStage {
- if (dataQueue.remainingCapacity() > 1 && !hasBeenPulled(in))
- pull(in)
- - override def preStart() = {
- + override def preStart(): Unit = {
- dataQueue.add(Initialized)
- pull(in)
- }
- @@ -114,7 +114,7 @@ private[stream] object InputStreamSinkStage {
- var isInitialized = false
- var isActive = true
- var isStageAlive = true
- - def subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible")
- + def subscriberClosedException: IOException = new IOException("Reactive stream is terminated, no reads are possible")
- var detachedChunk: Option[ByteString] = None
- @scala.throws(classOf[IOException])
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala
- index 65b71c2a4b..d53ec459a0 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala
- @@ -35,8 +35,8 @@ private[stream] object OutputStreamSourceStage {
- }
- final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SourceShape[ByteString], OutputStream] {
- - val out = Outlet[ByteString]("OutputStreamSource.out")
- - override def initialAttributes = DefaultAttributes.outputStreamSource
- + val out: Outlet[ByteString] = Outlet[ByteString]("OutputStreamSource.out")
- + override def initialAttributes: Attributes = DefaultAttributes.outputStreamSource
- override val shape: SourceShape[ByteString] = SourceShape.of(out)
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutputStream) = {
- @@ -159,7 +159,7 @@ private[akka] class OutputStreamAdapter(
- var isActive = true
- var isPublisherAlive = true
- - def publisherClosedException = new IOException("Reactive stream is terminated, no writes are possible")
- + def publisherClosedException: IOException = new IOException("Reactive stream is terminated, no writes are possible")
- @scala.throws(classOf[IOException])
- private[this] def send(sendAction: () ⇒ Unit): Unit = {
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala
- index 3943aa963d..4a8d5cb79b 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala
- @@ -17,7 +17,7 @@ import scala.util.{ Failure, Success }
- /** INTERNAL API */
- @InternalApi private[akka] object OutputStreamSubscriber {
- - def props(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean) = {
- + def props(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean): Props = {
- require(bufSize > 0, "buffer size must be > 0")
- Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize, autoFlush).withDeploy(Deploy.local)
- }
- @@ -29,11 +29,11 @@ import scala.util.{ Failure, Success }
- extends akka.stream.actor.ActorSubscriber
- with ActorLogging {
- - override protected val requestStrategy = WatermarkRequestStrategy(highWatermark = bufSize)
- + override protected val requestStrategy: WatermarkRequestStrategy = WatermarkRequestStrategy(highWatermark = bufSize)
- private var bytesWritten: Long = 0
- - def receive = {
- + def receive: PartialFunction[Any, Unit] = {
- case ActorSubscriberMessage.OnNext(bytes: ByteString) ⇒
- try {
- // blocking write
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala
- index 4ceac8c983..53c8060477 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala
- @@ -56,10 +56,10 @@ import scala.util.{ Failure, Success, Try }
- import TLSActor._
- - protected val outputBunch = new OutputBunch(outputCount = 2, self, this)
- + protected val outputBunch: OutputBunch = new OutputBunch(outputCount = 2, self, this)
- outputBunch.markAllOutputs()
- - protected val inputBunch = new InputBunch(inputCount = 2, maxInputBufferSize, this) {
- + protected val inputBunch: InputBunch = new InputBunch(inputCount = 2, maxInputBufferSize, this) {
- override def onError(input: Int, e: Throwable): Unit = fail(e)
- }
- @@ -152,13 +152,13 @@ import scala.util.{ Failure, Success, Try }
- // The engine could also be instantiated in ActorMaterializerImpl but if creation fails
- // during materialization it would be worse than failing later on.
- - val engine =
- + val engine: SSLEngine =
- try createSSLEngine(context.system) catch { case NonFatal(ex) ⇒ fail(ex, closeTransport = true); throw ex }
- engine.beginHandshake()
- lastHandshakeStatus = engine.getHandshakeStatus
- - var currentSession = engine.getSession
- + var currentSession: SSLSession = engine.getSession
- def setNewSessionParameters(params: NegotiateNewSession): Unit = {
- if (tracing) log.debug(s"applying $params")
- @@ -193,35 +193,35 @@ import scala.util.{ Failure, Success, Try }
- * representing the Engine.
- */
- - val engineNeedsWrap = new TransferState {
- - def isReady = lastHandshakeStatus == NEED_WRAP
- - def isCompleted = engine.isOutboundDone
- + val engineNeedsWrap: TransferState = new TransferState {
- + def isReady: Boolean = lastHandshakeStatus == NEED_WRAP
- + def isCompleted: Boolean = engine.isOutboundDone
- }
- - val engineInboundOpen = new TransferState {
- + val engineInboundOpen: TransferState = new TransferState {
- def isReady = true
- - def isCompleted = engine.isInboundDone
- + def isCompleted: Boolean = engine.isInboundDone
- }
- - val userHasData = new TransferState {
- - def isReady = !corkUser && userInChoppingBlock.isReady && lastHandshakeStatus != NEED_UNWRAP
- - def isCompleted = inputBunch.isCancelled(UserIn) || inputBunch.isDepleted(UserIn)
- + val userHasData: TransferState = new TransferState {
- + def isReady: Boolean = !corkUser && userInChoppingBlock.isReady && lastHandshakeStatus != NEED_UNWRAP
- + def isCompleted: Boolean = inputBunch.isCancelled(UserIn) || inputBunch.isDepleted(UserIn)
- }
- - val userOutCancelled = new TransferState {
- - def isReady = outputBunch.isCancelled(UserOut)
- - def isCompleted = engine.isInboundDone || outputBunch.isErrored(UserOut)
- + val userOutCancelled: TransferState = new TransferState {
- + def isReady: Boolean = outputBunch.isCancelled(UserOut)
- + def isCompleted: Boolean = engine.isInboundDone || outputBunch.isErrored(UserOut)
- }
- // bidirectional case
- - val outbound = (userHasData || engineNeedsWrap) && outputBunch.demandAvailableFor(TransportOut)
- - val inbound = (transportInChoppingBlock && outputBunch.demandAvailableFor(UserOut)) || userOutCancelled
- + val outbound: TransferState = (userHasData || engineNeedsWrap) && outputBunch.demandAvailableFor(TransportOut)
- + val inbound: TransferState = (transportInChoppingBlock && outputBunch.demandAvailableFor(UserOut)) || userOutCancelled
- // half-closed
- - val outboundHalfClosed = engineNeedsWrap && outputBunch.demandAvailableFor(TransportOut)
- - val inboundHalfClosed = transportInChoppingBlock && engineInboundOpen
- + val outboundHalfClosed: TransferState = engineNeedsWrap && outputBunch.demandAvailableFor(TransportOut)
- + val inboundHalfClosed: TransferState = transportInChoppingBlock && engineInboundOpen
- - val bidirectional = TransferPhase(outbound || inbound) { () ⇒
- + val bidirectional: TransferPhase = TransferPhase(outbound || inbound) { () ⇒
- if (tracing) log.debug("bidirectional")
- val continue = doInbound(isOutboundClosed = false, inbound)
- if (continue) {
- @@ -230,20 +230,20 @@ import scala.util.{ Failure, Success, Try }
- }
- }
- - val flushingOutbound = TransferPhase(outboundHalfClosed) { () ⇒
- + val flushingOutbound: TransferPhase = TransferPhase(outboundHalfClosed) { () ⇒
- if (tracing) log.debug("flushingOutbound")
- try doWrap()
- catch { case ex: SSLException ⇒ nextPhase(completedPhase) }
- }
- - val awaitingClose = TransferPhase(inputBunch.inputsAvailableFor(TransportIn) && engineInboundOpen) { () ⇒
- + val awaitingClose: TransferPhase = TransferPhase(inputBunch.inputsAvailableFor(TransportIn) && engineInboundOpen) { () ⇒
- if (tracing) log.debug("awaitingClose")
- transportInChoppingBlock.chopInto(transportInBuffer)
- try doUnwrap(ignoreOutput = true)
- catch { case ex: SSLException ⇒ nextPhase(completedPhase) }
- }
- - val outboundClosed = TransferPhase(outboundHalfClosed || inbound) { () ⇒
- + val outboundClosed: TransferPhase = TransferPhase(outboundHalfClosed || inbound) { () ⇒
- if (tracing) log.debug("outboundClosed")
- val continue = doInbound(isOutboundClosed = true, inbound)
- if (continue && outboundHalfClosed.isReady) {
- @@ -253,7 +253,7 @@ import scala.util.{ Failure, Success, Try }
- }
- }
- - val inboundClosed = TransferPhase(outbound || inboundHalfClosed) { () ⇒
- + val inboundClosed: TransferPhase = TransferPhase(outbound || inboundHalfClosed) { () ⇒
- if (tracing) log.debug("inboundClosed")
- val continue = doInbound(isOutboundClosed = false, inboundHalfClosed)
- if (continue) {
- @@ -422,7 +422,7 @@ import scala.util.{ Failure, Success, Try }
- }
- }
- - override def receive = inputBunch.subreceive.orElse[Any, Unit](outputBunch.subreceive)
- + override def receive: PartialFunction[Any, Unit] = inputBunch.subreceive.orElse[Any, Unit](outputBunch.subreceive)
- initialPhase(2, bidirectional)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala
- index 3822736478..ed1107bab4 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala
- @@ -42,7 +42,7 @@ import scala.concurrent.{ Future, Promise }
- import ConnectionSourceStage._
- val out: Outlet[StreamTcp.IncomingConnection] = Outlet("IncomingConnections.out")
- - override def initialAttributes = Attributes.name("ConnectionSource")
- + override def initialAttributes: Attributes = Attributes.name("ConnectionSource")
- val shape: SourceShape[StreamTcp.IncomingConnection] = SourceShape(out)
- // TODO: Timeout on bind
- @@ -52,9 +52,9 @@ import scala.concurrent.{ Future, Promise }
- val logic = new TimerGraphStageLogic(shape) {
- implicit def self: ActorRef = stageActor.ref
- - val connectionFlowsAwaitingInitialization = new AtomicLong()
- + val connectionFlowsAwaitingInitialization: AtomicLong = new AtomicLong()
- var listener: ActorRef = _
- - val unbindPromise = Promise[Unit]()
- + val unbindPromise: Promise[Unit] = Promise[Unit]()
- var unbindStarted = false
- override def preStart(): Unit = {
- @@ -305,7 +305,7 @@ private[stream] object ConnectionSourceStage {
- } else completeStage()
- }
- - val readHandler = new OutHandler {
- + val readHandler: OutHandler = new OutHandler {
- override def onPull(): Unit = {
- connection ! ResumeReading
- }
- @@ -373,7 +373,7 @@ private[stream] object ConnectionSourceStage {
- val bytesIn: Inlet[ByteString] = Inlet("IncomingTCP.in")
- val bytesOut: Outlet[ByteString] = Outlet("IncomingTCP.out")
- - override def initialAttributes = Attributes.name("IncomingConnection")
- + override def initialAttributes: Attributes = Attributes.name("IncomingConnection")
- val shape: FlowShape[ByteString, ByteString] = FlowShape(bytesIn, bytesOut)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
- @@ -383,7 +383,7 @@ private[stream] object ConnectionSourceStage {
- new TcpStreamLogic(shape, Inbound(connection, halfClose, ioSettings), remoteAddress)
- }
- - override def toString = s"TCP-from($remoteAddress)"
- + override def toString: String = s"TCP-from($remoteAddress)"
- }
- /**
- @@ -403,7 +403,7 @@ private[stream] object ConnectionSourceStage {
- val bytesIn: Inlet[ByteString] = Inlet("IncomingTCP.in")
- val bytesOut: Outlet[ByteString] = Outlet("IncomingTCP.out")
- - override def initialAttributes = Attributes.name("OutgoingConnection")
- + override def initialAttributes: Attributes = Attributes.name("OutgoingConnection")
- val shape: FlowShape[ByteString, ByteString] = FlowShape(bytesIn, bytesOut)
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[StreamTcp.OutgoingConnection]) = {
- @@ -425,7 +425,7 @@ private[stream] object ConnectionSourceStage {
- (logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContexts.sameThreadExecutionContext))
- }
- - override def toString = s"TCP-to($remoteAddress)"
- + override def toString: String = s"TCP-to($remoteAddress)"
- }
- /** INTERNAL API */
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala
- index b992d8c499..1dd3f1a994 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala
- @@ -20,7 +20,7 @@ import akka.util.ByteString
- Flow.fromGraph {
- new SimpleLinearGraphStage[ByteString] {
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
- - val compressor = newCompressor()
- + val compressor: Compressor = newCompressor()
- override def onPush(): Unit = {
- val data = compressor.compressAndFlush(grab(in))
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala
- index cdd03265d1..d23432e002 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala
- @@ -14,7 +14,7 @@ import scala.annotation.tailrec
- @InternalApi private[akka] class DeflateCompressor(level: Int = Deflater.BEST_COMPRESSION, nowrap: Boolean = false) extends Compressor {
- import DeflateCompressor._
- - protected lazy val deflater = new Deflater(level, nowrap)
- + protected lazy val deflater: Deflater = new Deflater(level, nowrap)
- override final def compressAndFlush(input: ByteString): ByteString = {
- val buffer = newTempBuffer(input.size)
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala
- index 4404e2e577..5d28901bf2 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala
- @@ -12,7 +12,7 @@ import akka.stream.Attributes
- @InternalApi private[akka] class DeflateDecompressor(maxBytesPerChunk: Int)
- extends DeflateDecompressorBase(maxBytesPerChunk) {
- - override def createLogic(attr: Attributes) = new DecompressorParsingLogic {
- + override def createLogic(attr: Attributes): DeflateDecompressor.this.DecompressorParsingLogic = new DecompressorParsingLogic {
- override val inflater: Inflater = new Inflater()
- override case object inflating extends Inflate(noPostProcessing = true) {
- diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala
- index 9b968a8c48..bac849ac29 100644
- --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala
- +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala
- @@ -10,7 +10,7 @@ import akka.util.ByteString
- /** INTERNAL API */
- @InternalApi private[akka] class GzipCompressor(compressionLevel: Int = Deflater.BEST_COMPRESSION) extends DeflateCompressor(compressionLevel, true) {
- - override protected lazy val deflater = new Deflater(compressionLevel, true)
- + override protected lazy val deflater: Deflater = new Deflater(compressionLevel, true)
- private val checkSum = new CRC32 // CRC32 of uncompressed data
- private var headerSent = false
- private var bytesRead = 0L
- diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala
- index cff4f69c47..f4d38cf058 100644
- --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala
- +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala
- @@ -8,6 +8,7 @@ import akka.japi.function
- import akka.stream._
- import scala.concurrent.duration.FiniteDuration
- +import akka.stream.impl.TraversalBuilder
- object BidiFlow {
- @@ -96,8 +97,8 @@ object BidiFlow {
- }
- final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends Graph[BidiShape[I1, O1, I2, O2], Mat] {
- - override def traversalBuilder = delegate.traversalBuilder
- - override def shape = delegate.shape
- + override def traversalBuilder: TraversalBuilder = delegate.traversalBuilder
- + override def shape: BidiShape[I1, O1, I2, O2] = delegate.shape
- def asScala: scaladsl.BidiFlow[I1, O1, I2, O2, Mat] = delegate
- diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
- index 680acfbced..84554a67e8 100644
- --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
- +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
- @@ -17,6 +17,7 @@ import java.util.Comparator
- import java.util.concurrent.CompletionStage
- import scala.compat.java8.FutureConverters._
- +import akka.stream.impl.{ LinearTraversalBuilder, TraversalBuilder }
- object Flow {
- @@ -206,7 +207,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
- import scala.collection.JavaConverters._
- override def shape: FlowShape[In, Out] = delegate.shape
- - override def traversalBuilder = delegate.traversalBuilder
- + override def traversalBuilder: LinearTraversalBuilder = delegate.traversalBuilder
- override def toString: String = delegate.toString
- @@ -2355,8 +2356,8 @@ object RunnableGraph {
- /** INTERNAL API */
- private final class RunnableGraphAdapter[Mat](runnable: scaladsl.RunnableGraph[Mat]) extends RunnableGraph[Mat] {
- - override def shape = ClosedShape
- - override def traversalBuilder = runnable.traversalBuilder
- + override def shape: ClosedShape.type = ClosedShape
- + override def traversalBuilder: TraversalBuilder = runnable.traversalBuilder
- override def toString: String = runnable.toString
- diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Keep.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Keep.scala
- index 7eb2d8188b..cab463b92b 100644
- --- a/akka-stream/src/main/scala/akka/stream/javadsl/Keep.scala
- +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Keep.scala
- @@ -8,10 +8,10 @@ import akka.japi.function
- import akka.japi.Pair
- object Keep {
- - private val _left = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l }
- - private val _right = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r }
- - private val _both = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = new akka.japi.Pair(l, r) }
- - private val _none = new function.Function2[Any, Any, NotUsed] with ((Any, Any) ⇒ NotUsed) { def apply(l: Any, r: Any) = NotUsed }
- + private val _left = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any): Any = l }
- + private val _right = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any): Any = r }
- + private val _both = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any): Pair[Any, Any] = new akka.japi.Pair(l, r) }
- + private val _none = new function.Function2[Any, Any, NotUsed] with ((Any, Any) ⇒ NotUsed) { def apply(l: Any, r: Any): NotUsed.type = NotUsed }
- def left[L, R]: function.Function2[L, R, L] = _left.asInstanceOf[function.Function2[L, R, L]]
- def right[L, R]: function.Function2[L, R, R] = _right.asInstanceOf[function.Function2[L, R, R]]
- diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala
- index 46bf553dc3..96d9fab556 100644
- --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala
- +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala
- @@ -103,7 +103,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
- override def get(system: ActorSystem): Tcp = super.get(system)
- - def lookup() = Tcp
- + def lookup(): Tcp.type = Tcp
- def createExtension(system: ExtendedActorSystem): Tcp = new Tcp(system)
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala
- index 7d2435ad48..add8049673 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala
- @@ -10,7 +10,7 @@ import akka.stream.impl.io.compression._
- import akka.util.ByteString
- object Compression {
- - final val MaxBytesPerChunkDefault = 64 * 1024
- + final val MaxBytesPerChunkDefault: Int = 64 * 1024
- /**
- * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala
- index dc8aeabc96..03c093a689 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala
- @@ -75,7 +75,7 @@ private[stream] class CoupledTerminationBidi[I, O] extends GraphStage[BidiShape[
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
- - val handler1 = new InHandler with OutHandler {
- + val handler1: InHandler with OutHandler = new InHandler with OutHandler {
- override def onPush(): Unit = push(out1, grab(in1))
- override def onPull(): Unit = pull(in1)
- @@ -84,7 +84,7 @@ private[stream] class CoupledTerminationBidi[I, O] extends GraphStage[BidiShape[
- override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
- }
- - val handler2 = new InHandler with OutHandler {
- + val handler2: InHandler with OutHandler = new InHandler with OutHandler {
- override def onPush(): Unit = push(out2, grab(in2))
- override def onPull(): Unit = pull(in2)
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
- index 47b749bd68..ec28cd4d86 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
- @@ -510,7 +510,7 @@ object RunnableGraph {
- * Flow with attached input and output, can be executed.
- */
- final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] {
- - override def shape = ClosedShape
- + override def shape: ClosedShape.type = ClosedShape
- /**
- * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala
- index 3e7141eb80..9063b4cd35 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala
- @@ -162,7 +162,7 @@ object Framing {
- }
- private class SimpleFramingProtocolEncoder(maximumMessageLength: Long) extends SimpleLinearGraphStage[ByteString] {
- - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
- + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic with InHandler with OutHandler = new GraphStageLogic(shape) with InHandler with OutHandler {
- setHandlers(in, out, this)
- override def onPush(): Unit = {
- @@ -184,8 +184,8 @@ object Framing {
- private class DelimiterFramingStage(val separatorBytes: ByteString, val maximumLineBytes: Int, val allowTruncation: Boolean)
- extends GraphStage[FlowShape[ByteString, ByteString]] {
- - val in = Inlet[ByteString]("DelimiterFramingStage.in")
- - val out = Outlet[ByteString]("DelimiterFramingStage.out")
- + val in: Inlet[ByteString] = Inlet[ByteString]("DelimiterFramingStage.in")
- + val out: Outlet[ByteString] = Outlet[ByteString]("DelimiterFramingStage.out")
- override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)
- override def initialAttributes: Attributes = DefaultAttributes.delimiterFraming
- @@ -284,8 +284,8 @@ object Framing {
- case ByteOrder.LITTLE_ENDIAN ⇒ littleEndianDecoder
- }
- - val in = Inlet[ByteString]("LengthFieldFramingStage.in")
- - val out = Outlet[ByteString]("LengthFieldFramingStage.out")
- + val in: Inlet[ByteString] = Inlet[ByteString]("LengthFieldFramingStage.in")
- + val out: Outlet[ByteString] = Outlet[ByteString]("LengthFieldFramingStage.out")
- override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
- @@ -343,7 +343,7 @@ object Framing {
- tryPushFrame()
- }
- - override def onPull() = tryPushFrame()
- + override def onPull(): Unit = tryPushFrame()
- override def onUpstreamFinish(): Unit = {
- if (buffer.isEmpty) {
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
- index 82ffa83e3a..0e6374a276 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
- @@ -80,7 +80,7 @@ final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends Gr
- val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Merge.in" + i))
- val out: Outlet[T] = Outlet[T]("Merge.out")
- - override def initialAttributes = DefaultAttributes.merge
- + override def initialAttributes: Attributes = DefaultAttributes.merge
- override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
- @@ -131,7 +131,7 @@ final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends Gr
- } else pendingQueue.enqueue(i)
- }
- - override def onUpstreamFinish() =
- + override def onUpstreamFinish(): Unit =
- if (eagerComplete) {
- var ix2 = 0
- while (ix2 < in.size) {
- @@ -165,7 +165,7 @@ object MergePreferred {
- override protected def construct(init: Init[T]): FanInShape[T] = new MergePreferredShape(secondaryPorts, init)
- override def deepCopy(): MergePreferredShape[T] = super.deepCopy().asInstanceOf[MergePreferredShape[T]]
- - val preferred = newInlet[T]("preferred")
- + val preferred: Inlet[T] = newInlet[T]("preferred")
- }
- /**
- @@ -195,7 +195,7 @@ object MergePreferred {
- final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
- require(secondaryPorts >= 1, "A MergePreferred must have 1 or more secondary input ports")
- - override def initialAttributes = DefaultAttributes.mergePreferred
- + override def initialAttributes: Attributes = DefaultAttributes.mergePreferred
- override val shape: MergePreferred.MergePreferredShape[T] =
- new MergePreferred.MergePreferredShape(secondaryPorts, "MergePreferred")
- @@ -204,7 +204,7 @@ final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolea
- def preferred: Inlet[T] = shape.preferred
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
- - var openInputs = secondaryPorts + 1
- + var openInputs: Int = secondaryPorts + 1
- def onComplete(): Unit = {
- openInputs -= 1
- if (eagerComplete || openInputs == 0) completeStage()
- @@ -217,7 +217,7 @@ final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolea
- setHandler(out, eagerTerminateOutput)
- - val pullMe = Array.tabulate(secondaryPorts)(i ⇒ {
- + val pullMe: Array[() ⇒ Unit] = Array.tabulate(secondaryPorts)(i ⇒ {
- val port = in(i)
- () ⇒ tryPull(port)
- })
- @@ -243,7 +243,7 @@ final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolea
- tryPull(preferred)
- }
- - val emitted = () ⇒ {
- + val emitted: () ⇒ Unit = () ⇒ {
- preferredEmitting -= 1
- if (isAvailable(preferred)) emitPreferred()
- else if (preferredEmitting == 0) emitSecondary()
- @@ -499,9 +499,9 @@ final class MergeSorted[T: Ordering] extends GraphStage[FanInShape2[T, T, T]] {
- private val right = Inlet[T]("right")
- private val out = Outlet[T]("out")
- - override val shape = new FanInShape2(left, right, out)
- + override val shape: FanInShape2[T, T, T] = new FanInShape2(left, right, out)
- - override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
- + override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
- import Ordering.Implicits._
- setHandler(left, ignoreTerminateInput)
- setHandler(right, ignoreTerminateInput)
- @@ -514,12 +514,12 @@ final class MergeSorted[T: Ordering] extends GraphStage[FanInShape2[T, T, T]] {
- if (l < r) { other = r; emit(out, l, readL) }
- else { other = l; emit(out, r, readR) }
- - val dispatchR = dispatch(other, _: T)
- - val dispatchL = dispatch(_: T, other)
- - val passR = () ⇒ emit(out, other, () ⇒ { nullOut(); passAlong(right, out, doPull = true) })
- - val passL = () ⇒ emit(out, other, () ⇒ { nullOut(); passAlong(left, out, doPull = true) })
- - val readR = () ⇒ read(right)(dispatchR, passL)
- - val readL = () ⇒ read(left)(dispatchL, passR)
- + val dispatchR: T ⇒ Unit = dispatch(other, _: T)
- + val dispatchL: T ⇒ Unit = dispatch(_: T, other)
- + val passR: () ⇒ Unit = () ⇒ emit(out, other, () ⇒ { nullOut(); passAlong(right, out, doPull = true) })
- + val passL: () ⇒ Unit = () ⇒ emit(out, other, () ⇒ { nullOut(); passAlong(left, out, doPull = true) })
- + val readR: () ⇒ Unit = () ⇒ read(right)(dispatchR, passL)
- + val readL: () ⇒ Unit = () ⇒ read(left)(dispatchL, passR)
- override def preStart(): Unit = {
- // all fan-in stages need to eagerly pull all inputs to get cycles started
- @@ -562,7 +562,7 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends
- require(outputPorts >= 1, "A Broadcast must have one or more output ports")
- val in: Inlet[T] = Inlet[T]("Broadcast.in")
- val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Broadcast.out" + i))
- - override def initialAttributes = DefaultAttributes.broadcast
- + override def initialAttributes: Attributes = DefaultAttributes.broadcast
- override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler {
- @@ -604,7 +604,7 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends
- tryPull()
- }
- - override def onDownstreamFinish() = {
- + override def onDownstreamFinish(): Unit = {
- if (eagerCancel) completeStage()
- else {
- downstreamsRunning -= 1
- @@ -665,7 +665,7 @@ final class Partition[T](val outputPorts: Int, val partitioner: T ⇒ Int) exten
- private var outPendingIdx: Int = _
- private var downstreamRunning = outputPorts
- - def onPush() = {
- + def onPush(): Unit = {
- val elem = grab(in)
- val idx = partitioner(elem)
- if (idx < 0 || idx >= outputPorts) {
- @@ -693,7 +693,7 @@ final class Partition[T](val outputPorts: Int, val partitioner: T ⇒ Int) exten
- out.zipWithIndex.foreach {
- case (o, idx) ⇒
- setHandler(o, new OutHandler {
- - override def onPull() = {
- + override def onPull(): Unit = {
- if (outPendingElem != null) {
- val elem = outPendingElem.asInstanceOf[T]
- @@ -727,7 +727,7 @@ final class Partition[T](val outputPorts: Int, val partitioner: T ⇒ Int) exten
- }
- }
- - override def toString = s"Partition($outputPorts)"
- + override def toString: String = s"Partition($outputPorts)"
- }
- @@ -764,7 +764,7 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean)
- require(outputPorts >= 1, "A Balance must have one or more output ports")
- val in: Inlet[T] = Inlet[T]("Balance.in")
- val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Balance.out" + i))
- - override def initialAttributes = DefaultAttributes.balance
- + override def initialAttributes: Attributes = DefaultAttributes.balance
- override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler {
- @@ -815,7 +815,7 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean)
- } else pendingQueue.enqueue(o)
- }
- - override def onDownstreamFinish() = {
- + override def onDownstreamFinish(): Unit = {
- downstreamsRunning -= 1
- if (downstreamsRunning == 0) completeStage()
- else if (!hasPulled && needDownstreamPulls > 0) {
- @@ -921,7 +921,7 @@ object ZipN {
- /**
- * Create a new `ZipN`.
- */
- - def apply[A](n: Int) = new ZipN[A](n)
- + def apply[A](n: Int): ZipN[A] = new ZipN[A](n)
- }
- /**
- @@ -938,7 +938,7 @@ object ZipN {
- * '''Cancels when''' downstream cancels
- */
- final class ZipN[A](n: Int) extends ZipWithN[A, immutable.Seq[A]](ConstantFun.scalaIdentityFunction)(n) {
- - override def initialAttributes = DefaultAttributes.zipN
- + override def initialAttributes: Attributes = DefaultAttributes.zipN
- override def toString = "ZipN"
- }
- @@ -946,7 +946,7 @@ object ZipWithN {
- /**
- * Create a new `ZipWithN`.
- */
- - def apply[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) = new ZipWithN[A, O](zipper)(n)
- + def apply[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int): ZipWithN[A, O] = new ZipWithN[A, O](zipper)(n)
- }
- /**
- @@ -963,8 +963,8 @@ object ZipWithN {
- * '''Cancels when''' downstream cancels
- */
- class ZipWithN[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) extends GraphStage[UniformFanInShape[A, O]] {
- - override def initialAttributes = DefaultAttributes.zipWithN
- - override val shape = new UniformFanInShape[A, O](n)
- + override def initialAttributes: Attributes = DefaultAttributes.zipWithN
- + override val shape: UniformFanInShape[A, O] = new UniformFanInShape[A, O](n)
- def out: Outlet[O] = shape.out
- @deprecated("use `shape.inlets` or `shape.in(id)` instead", "2.5.5")
- @@ -975,8 +975,8 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) extends GraphStage[
- // Without this field the completion signalling would take one extra pull
- var willShutDown = false
- - val grabInlet = grab[A] _
- - val pullInlet = pull[A] _
- + val grabInlet: Inlet[A] ⇒ A = grab[A] _
- + val pullInlet: Inlet[A] ⇒ Unit = pull[A] _
- private def pushAll(): Unit = {
- push(out, zipper(shape.inlets.map(grabInlet)))
- @@ -1041,10 +1041,10 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
- require(inputPorts > 1, "A Concat must have more than 1 input ports")
- val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Concat.in" + i))
- val out: Outlet[T] = Outlet[T]("Concat.out")
- - override def initialAttributes = DefaultAttributes.concat
- + override def initialAttributes: Attributes = DefaultAttributes.concat
- override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
- - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
- + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic with OutHandler = new GraphStageLogic(shape) with OutHandler {
- var activeStream: Int = 0
- {
- @@ -1054,11 +1054,11 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
- val i = in(idxx)
- val idx = idxx // close over val
- setHandler(i, new InHandler {
- - override def onPush() = {
- + override def onPush(): Unit = {
- push(out, grab(i))
- }
- - override def onUpstreamFinish() = {
- + override def onUpstreamFinish(): Unit = {
- if (idx == activeStream) {
- activeStream += 1
- // Skip closed inputs
- @@ -1072,7 +1072,7 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
- }
- }
- - def onPull() = pull(in(activeStream))
- + def onPull(): Unit = pull(in(activeStream))
- setHandler(out, this)
- }
- @@ -1082,7 +1082,7 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
- object OrElse {
- private val singleton = new OrElse[Nothing]
- - def apply[T]() = singleton.asInstanceOf[OrElse[T]]
- + def apply[T](): OrElse[T] = singleton.asInstanceOf[OrElse[T]]
- }
- /**
- @@ -1104,9 +1104,9 @@ object OrElse {
- * '''Cancels when''' downstream cancels
- */
- private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T]] {
- - val primary = Inlet[T]("OrElse.primary")
- - val secondary = Inlet[T]("OrElse.secondary")
- - val out = Outlet[T]("OrElse.out")
- + val primary: Inlet[T] = Inlet[T]("OrElse.primary")
- + val secondary: Inlet[T] = Inlet[T]("OrElse.secondary")
- + val out: Outlet[T] = Outlet[T]("OrElse.out")
- override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, primary, secondary)
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala
- index 24e778b10e..ab00ad439e 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala
- @@ -375,7 +375,7 @@ private[akka] class BroadcastHub[T](bufferSize: Int) extends GraphStageWithMater
- private[this] val callbackPromise: Promise[AsyncCallback[HubEvent]] = Promise()
- private[this] val noRegistrationsState = Open(callbackPromise.future, Nil)
- - val state = new AtomicReference[HubState](noRegistrationsState)
- + val state: AtomicReference[BroadcastHub.this.HubState] = new AtomicReference[HubState](noRegistrationsState)
- // Start from values that will almost immediately overflow. This has no effect on performance, any starting
- // number will do, however, this protects from regressions as these values *almost surely* overflow and fail
- @@ -874,7 +874,7 @@ object PartitionHub {
- }
- object ConsumerQueue {
- - val empty = ConsumerQueue(Queue.empty, 0)
- + val empty: ConsumerQueue = ConsumerQueue(Queue.empty, 0)
- }
- final case class ConsumerQueue(queue: Queue[Any], size: Int) {
- @@ -1012,7 +1012,7 @@ object PartitionHub {
- private val callbackPromise: Promise[AsyncCallback[HubEvent]] = Promise()
- private val noRegistrationsState = Open(callbackPromise.future, Nil)
- - val state = new AtomicReference[HubState](noRegistrationsState)
- + val state: AtomicReference[HubState] = new AtomicReference[HubState](noRegistrationsState)
- private var initialized = false
- private val queue = createQueue()
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala
- index 018053e5e5..d32a10818b 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala
- @@ -44,7 +44,7 @@ object JsonFraming {
- override protected def initialAttributes: Attributes = Attributes.name("JsonFraming.objectScanner")
- - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
- + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic with InHandler with OutHandler = new GraphStageLogic(shape) with InHandler with OutHandler {
- private val buffer = new JsonObjectParser(maximumObjectLength)
- setHandlers(in, out, this)
- @@ -64,7 +64,7 @@ object JsonFraming {
- }
- }
- - def tryPopBuffer() = {
- + def tryPopBuffer(): Unit = {
- try buffer.poll() match {
- case Some(json) ⇒ push(out, json)
- case _ ⇒ if (isClosed(in)) completeStage() else pull(in)
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala
- index 6727de91a1..d5e42a85f9 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala
- @@ -9,6 +9,7 @@ import akka.stream._
- import akka.stream.stage.{ GraphStage, InHandler, OutHandler, TimerGraphStageLogicWithLogging }
- import scala.concurrent.duration.FiniteDuration
- +import scala.concurrent.duration.Deadline
- /**
- * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
- @@ -130,15 +131,15 @@ private final class RestartWithBackoffSource[T](
- onlyOnFailures: Boolean,
- maxRestarts: Int) extends GraphStage[SourceShape[T]] { self ⇒
- - val out = Outlet[T]("RestartWithBackoffSource.out")
- + val out: Outlet[T] = Outlet[T]("RestartWithBackoffSource.out")
- - override def shape = SourceShape(out)
- - override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
- + override def shape: SourceShape[T] = SourceShape(out)
- + override def createLogic(inheritedAttributes: Attributes): RestartWithBackoffLogic[SourceShape[T]] = new RestartWithBackoffLogic(
- "Source", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) {
- override protected def logSource = self.getClass
- - override protected def startGraph() = {
- + override protected def startGraph(): Unit = {
- val sinkIn = createSubInlet(out)
- sourceFactory().runWith(sinkIn.sink)(subFusingMaterializer)
- if (isAvailable(out)) {
- @@ -146,7 +147,7 @@ private final class RestartWithBackoffSource[T](
- }
- }
- - override protected def backoff() = {
- + override protected def backoff(): Unit = {
- setHandler(out, new OutHandler {
- override def onPull() = ()
- })
- @@ -231,19 +232,19 @@ private final class RestartWithBackoffSink[T](
- randomFactor: Double,
- maxRestarts: Int) extends GraphStage[SinkShape[T]] { self ⇒
- - val in = Inlet[T]("RestartWithBackoffSink.in")
- + val in: Inlet[T] = Inlet[T]("RestartWithBackoffSink.in")
- - override def shape = SinkShape(in)
- - override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
- + override def shape: SinkShape[T] = SinkShape(in)
- + override def createLogic(inheritedAttributes: Attributes): RestartWithBackoffLogic[SinkShape[T]] = new RestartWithBackoffLogic(
- "Sink", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) {
- override protected def logSource = self.getClass
- - override protected def startGraph() = {
- + override protected def startGraph(): Unit = {
- val sourceOut = createSubOutlet(in)
- Source.fromGraph(sourceOut.source).runWith(sinkFactory())(subFusingMaterializer)
- }
- - override protected def backoff() = {
- + override protected def backoff(): Unit = {
- setHandler(in, new InHandler {
- override def onPush() = ()
- })
- @@ -326,18 +327,18 @@ private final class RestartWithBackoffFlow[In, Out](
- randomFactor: Double,
- maxRestarts: Int) extends GraphStage[FlowShape[In, Out]] { self ⇒
- - val in = Inlet[In]("RestartWithBackoffFlow.in")
- - val out = Outlet[Out]("RestartWithBackoffFlow.out")
- + val in: Inlet[In] = Inlet[In]("RestartWithBackoffFlow.in")
- + val out: Outlet[Out] = Outlet[Out]("RestartWithBackoffFlow.out")
- - override def shape = FlowShape(in, out)
- - override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
- + override def shape: FlowShape[In, Out] = FlowShape(in, out)
- + override def createLogic(inheritedAttributes: Attributes): RestartWithBackoffLogic[FlowShape[In, Out]] = new RestartWithBackoffLogic(
- "Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) {
- var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None
- override protected def logSource = self.getClass
- - override protected def startGraph() = {
- + override protected def startGraph(): Unit = {
- val sourceOut = createSubOutlet(in)
- val sinkIn = createSubInlet(out)
- Source.fromGraph(sourceOut.source).via(flowFactory()).runWith(sinkIn.sink)(subFusingMaterializer)
- @@ -347,7 +348,7 @@ private final class RestartWithBackoffFlow[In, Out](
- activeOutIn = Some((sourceOut, sinkIn))
- }
- - override protected def backoff() = {
- + override protected def backoff(): Unit = {
- setHandler(in, new InHandler {
- override def onPush() = ()
- })
- @@ -385,7 +386,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
- onlyOnFailures: Boolean,
- maxRestarts: Int) extends TimerGraphStageLogicWithLogging(shape) {
- var restartCount = 0
- - var resetDeadline = minBackoff.fromNow
- + var resetDeadline: Deadline = minBackoff.fromNow
- // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we
- // don't want to restart the sub inlet when it finishes, we just finish normally.
- var finishing = false
- @@ -397,8 +398,8 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
- val sinkIn = new SubSinkInlet[T](s"RestartWithBackoff$name.subIn")
- sinkIn.setHandler(new InHandler {
- - override def onPush() = push(out, sinkIn.grab())
- - override def onUpstreamFinish() = {
- + override def onPush(): Unit = push(out, sinkIn.grab())
- + override def onUpstreamFinish(): Unit = {
- if (finishing || maxRestartsReached() || onlyOnFailures) {
- complete(out)
- } else {
- @@ -406,7 +407,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
- scheduleRestartTimer()
- }
- }
- - override def onUpstreamFailure(ex: Throwable) = {
- + override def onUpstreamFailure(ex: Throwable): Unit = {
- if (finishing || maxRestartsReached()) {
- fail(out, ex)
- } else {
- @@ -417,8 +418,8 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
- })
- setHandler(out, new OutHandler {
- - override def onPull() = sinkIn.pull()
- - override def onDownstreamFinish() = {
- + override def onPull(): Unit = sinkIn.pull()
- + override def onDownstreamFinish(): Unit = {
- finishing = true
- sinkIn.cancel()
- }
- @@ -431,14 +432,14 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
- val sourceOut = new SubSourceOutlet[T](s"RestartWithBackoff$name.subOut")
- sourceOut.setHandler(new OutHandler {
- - override def onPull() = if (isAvailable(in)) {
- + override def onPull(): Unit = if (isAvailable(in)) {
- sourceOut.push(grab(in))
- } else {
- if (!hasBeenPulled(in)) {
- pull(in)
- }
- }
- - override def onDownstreamFinish() = {
- + override def onDownstreamFinish(): Unit = {
- if (finishing || maxRestartsReached()) {
- cancel(in)
- } else {
- @@ -449,14 +450,14 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
- })
- setHandler(in, new InHandler {
- - override def onPush() = if (sourceOut.isAvailable) {
- + override def onPush(): Unit = if (sourceOut.isAvailable) {
- sourceOut.push(grab(in))
- }
- - override def onUpstreamFinish() = {
- + override def onUpstreamFinish(): Unit = {
- finishing = true
- sourceOut.complete()
- }
- - override def onUpstreamFailure(ex: Throwable) = {
- + override def onUpstreamFailure(ex: Throwable): Unit = {
- finishing = true
- sourceOut.fail(ex)
- }
- @@ -465,7 +466,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
- sourceOut
- }
- - protected final def maxRestartsReached() = {
- + protected final def maxRestartsReached(): Boolean = {
- // Check if the last start attempt was more than the minimum backoff
- if (resetDeadline.isOverdue()) {
- log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff)
- @@ -475,7 +476,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
- }
- // Set a timer to restart after the calculated delay
- - protected final def scheduleRestartTimer() = {
- + protected final def scheduleRestartTimer(): Unit = {
- val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
- log.debug("Restarting graph in {}", restartDelay)
- scheduleOnce("RestartTimer", restartDelay)
- @@ -485,11 +486,11 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
- }
- // Invoked when the backoff timer ticks
- - override protected def onTimer(timerKey: Any) = {
- + override protected def onTimer(timerKey: Any): Unit = {
- startGraph()
- resetDeadline = minBackoff.fromNow
- }
- // When the stage starts, start the source
- - override def preStart() = startGraph()
- + override def preStart(): Unit = startGraph()
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
- index de29f07f95..0de25a9e6e 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
- @@ -331,9 +331,9 @@ object Sink {
- def newOnCompleteStage(): GraphStage[FlowShape[T, NotUsed]] = {
- new GraphStage[FlowShape[T, NotUsed]] {
- - val in = Inlet[T]("in")
- - val out = Outlet[NotUsed]("out")
- - override val shape = FlowShape.of(in, out)
- + val in: Inlet[T] = Inlet[T]("in")
- + val out: Outlet[NotUsed] = Outlet[NotUsed]("out")
- + override val shape: FlowShape[T, NotUsed] = FlowShape.of(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with InHandler with OutHandler {
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala
- index 407a894deb..eac9662482 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala
- @@ -181,7 +181,7 @@ object StreamConverters {
- nextElementFuture = queue.pull()
- next
- }
- - }, 0), false).onClose(new Runnable { def run = queue.cancel() }))
- + }, 0), false).onClose(new Runnable { def run: Unit = queue.cancel() }))
- .withAttributes(DefaultAttributes.asJavaStream)
- }
- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala
- index 47f72e2d13..c6f55987c1 100644
- --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala
- +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala
- @@ -69,7 +69,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
- override def get(system: ActorSystem): Tcp = super.get(system)
- - def lookup() = Tcp
- + def lookup(): Tcp.type = Tcp
- def createExtension(system: ExtendedActorSystem): Tcp = new Tcp(system)
- @@ -89,7 +89,7 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
- private val settings = ActorMaterializerSettings(system)
- // TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead?
- - val bindShutdownTimeout = settings.subscriptionTimeoutSettings.timeout
- + val bindShutdownTimeout: FiniteDuration = settings.subscriptionTimeoutSettings.timeout
- /**
- * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
- diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
- index 3e0c26c78f..f6fbbcd04a 100644
- --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
- +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
- @@ -1296,7 +1296,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
- _sink.cancelSubstream()
- }
- - override def toString = s"SubSinkInlet($name)"
- + override def toString: String = s"SubSinkInlet($name)"
- }
- /**
- @@ -1389,7 +1389,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
- _source.failSubstream(ex)
- }
- - override def toString = s"SubSourceOutlet($name)"
- + override def toString: String = s"SubSourceOutlet($name)"
- }
- }
- @@ -1484,7 +1484,7 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap
- cancelTimer(timerKey)
- val id = timerIdGen.next()
- val task = interpreter.materializer.schedulePeriodically(initialDelay, interval, new Runnable {
- - def run() = getTimerAsyncCallback.invoke(Scheduled(timerKey, id, repeating = true))
- + def run(): Unit = getTimerAsyncCallback.invoke(Scheduled(timerKey, id, repeating = true))
- })
- keyToTimers(timerKey) = Timer(id, task)
- }
- @@ -1498,7 +1498,7 @@ abstract class TimerGraphStageLogic(_shape: Shape) extends GraphStageLogic(_shap
- cancelTimer(timerKey)
- val id = timerIdGen.next()
- val task = interpreter.materializer.scheduleOnce(delay, new Runnable {
- - def run() = getTimerAsyncCallback.invoke(Scheduled(timerKey, id, repeating = false))
- + def run(): Unit = getTimerAsyncCallback.invoke(Scheduled(timerKey, id, repeating = false))
- })
- keyToTimers(timerKey) = Timer(id, task)
- }
- diff --git a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala
- index 3ad728a1a9..7a9b9bcbee 100644
- --- a/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala
- +++ b/akka-stream/src/main/scala/com/typesafe/sslconfig/akka/AkkaSSLConfig.scala
- @@ -23,7 +23,7 @@ object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider
- override def get(system: ActorSystem): AkkaSSLConfig = super.get(system)
- def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system)
- - override def lookup() = AkkaSSLConfig
- + override def lookup(): AkkaSSLConfig.type = AkkaSSLConfig
- override def createExtension(system: ExtendedActorSystem): AkkaSSLConfig =
- new AkkaSSLConfig(system, defaultSSLConfigSettings(system))
- @@ -66,9 +66,9 @@ final class AkkaSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSett
- def convertSettings(f: java.util.function.Function[SSLConfigSettings, SSLConfigSettings]): AkkaSSLConfig =
- new AkkaSSLConfig(system, f.apply(config))
- - val hostnameVerifier = buildHostnameVerifier(config)
- + val hostnameVerifier: HostnameVerifier = buildHostnameVerifier(config)
- - val sslEngineConfigurator = {
- + val sslEngineConfigurator: DefaultSSLEngineConfigurator = {
- val sslContext = if (config.default) {
- log.info("ssl-config.default is true, using the JDK's default SSLContext")
- validateDefaultTrustManager(config)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement