Advertisement
Guest User

ClientSession

a guest
Sep 4th, 2018
146
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.91 KB | None | 0 0
  1. package net
  2.  
  3. import empyrean.game.world.World
  4. import empyrean.game.world.entity.animate.player.Player
  5. import empyrean.net.SessionState
  6. import empyrean.net.login.LoginResponses
  7. import empyrean.util.Misc
  8. import io.netty.channel.ChannelFutureListener
  9. import net.packet.codec.PacketDecoder
  10. import io.netty.channel.socket.SocketChannel
  11. import net.packet.Packet
  12. import net.packet.PacketBuilder
  13. import net.packet.Packets
  14. import net.packet.codec.PacketEncoder
  15. import java.util.concurrent.ConcurrentLinkedQueue
  16.  
  17. /**
  18.  * TODO: add documentation
  19.  *
  20.  * @author Stan van der Bend
  21.  *
  22.  * @param channel   The channel managing this session's connection.
  23.  */
  24. class ClientSession(val channel: SocketChannel) {
  25.  
  26.     private val priorityQueue = ConcurrentLinkedQueue<Packet>()
  27.     private val regularQueue = ConcurrentLinkedQueue<Packet>()
  28.  
  29.     val player = Player(this)
  30.  
  31.     var state = SessionState.CONNECTED
  32.  
  33.     companion object {
  34.         const val PACKET_PROCESS_LIMIT = 25
  35.     }
  36.  
  37.     /**
  38.      * Attempts finalization of a login attempt.
  39.      *
  40.      * @param loginMessage    The player's login information.
  41.      */
  42.     fun finalizeLogin(loginMessage: DetailedLoginMessage) {
  43.  
  44.         val channel = loginMessage.ctx.channel() as SocketChannel
  45.  
  46.         //Update the player
  47.         player.username = loginMessage.username
  48.         player.password = loginMessage.password
  49.         player.hostAddress = loginMessage.host
  50.         player.longUsername = Misc.stringToLong(loginMessage.username)
  51.  
  52.         //Get the response code
  53.         val response = LoginResponses.evaluate(player, loginMessage)
  54.  
  55.         //Write the response and flush the channel
  56.         val future = channel.writeAndFlush(LoginMessage(response, player.getRights()))
  57.  
  58.         //Close the channel after sending the response if it wasn't a successful login
  59.         if (response != LoginResponses.LOGIN_SUCCESSFUL) {
  60.             future.addListener(ChannelFutureListener.CLOSE)
  61.             return
  62.         }
  63.  
  64.         //Wait...
  65.         future.awaitUninterruptibly()
  66.  
  67.         //Replace decoder/encoder to packets
  68.         channel.pipeline().replace("encoder", "encoder", PacketEncoder(loginMessage.encodingRandom))
  69.         channel.pipeline().replace("decoder", "decoder", PacketDecoder(loginMessage.decodingRandom))
  70.  
  71.         //Queue the login
  72.         if (!World.getLoginQueue().contains(player))
  73.             World.getLoginQueue().add(player)
  74.     }
  75.  
  76.     /**
  77.      * Uses state-machine to handle upstream messages from Netty.
  78.      *
  79.      * @param packet the message to handle.
  80.      */
  81.     fun queueIncoming(packet: Packet) {
  82.  
  83.         val totalQueuesSize = regularQueue.size + priorityQueue.size
  84.  
  85.         if(totalQueuesSize >= PACKET_PROCESS_LIMIT)
  86.             return
  87.  
  88.         // handle immediately
  89.         if(packet.opcode == 41){
  90.             read(packet)
  91.             return
  92.         }
  93.  
  94.         if (packet.prioritize())
  95.             priorityQueue.add(packet)
  96.         else
  97.             regularQueue.add(packet)
  98.     }
  99.  
  100.     /**
  101.      * Polls both incoming [Packet] queues and parses the decoded messages.
  102.      * The first queue being polled is the [priorityQueue] and after that comes the [queueIncoming].
  103.      */
  104.     fun parseIncomingPackets() {
  105.  
  106.         var processedPacketCount = 0
  107.  
  108.         while (processedPacketCount < PACKET_PROCESS_LIMIT) {
  109.             val packet = priorityQueue.poll() ?: break
  110.             read(packet)
  111.             processedPacketCount++
  112.         }
  113.  
  114.         while (processedPacketCount < PACKET_PROCESS_LIMIT) {
  115.             val packet = regularQueue.poll() ?: break
  116.             read(packet)
  117.             processedPacketCount++
  118.         }
  119.     }
  120.  
  121.     /**
  122.      * Reads an incoming [Packet].
  123.      *
  124.      * @param packet the incoming [Packet].
  125.      */
  126.     private fun read(packet: Packet) {
  127.         try {
  128.             val reader = Packets.READERS[packet.opcode]
  129.  
  130.             val start = System.currentTimeMillis()
  131.  
  132.             if(reader.canRead(player))
  133.                 reader.read(player, packet)
  134.  
  135.             World.marker.mark("[Packet][${packet.opcode}][${packet.buffer.readableBytes()}] for player ${player.username}!")
  136.  
  137.             val duration = System.currentTimeMillis() - start
  138.  
  139.             if (duration >= 10)
  140.                 System.err.println(this.javaClass.simpleName + ": took " + duration + "ms - packet: " + packet.opcode + " - " + player.username)
  141.  
  142.         } finally {
  143.             packet.buffer.release()
  144.         }
  145.     }
  146.  
  147.     /**
  148.      * Clears the [regularQueue].
  149.      */
  150.     fun clearIncomingPackets() {
  151.         regularQueue.clear()
  152.     }
  153.  
  154.     /**
  155.      * Write an outgoing [Packet] that is encoded by the [PacketEncoder] and write it to the [channel].
  156.      *
  157.      * @param builder the [PacketBuilder] used for constructing the outgoing [Packet].
  158.      */
  159.     fun write(builder: PacketBuilder) {
  160.  
  161.         try {
  162.  
  163.             val packet = builder.build()
  164.  
  165.             builder.reset()
  166.  
  167.             if (!channel.isOpen)
  168.                 return
  169.  
  170.             if(packet.buffer.readableBytes() > 70)
  171.                System.err.println("You send a fairly large packet[${packet.opcode}][${packet.buffer.readableBytes()}] ")
  172.  
  173.             channel.write(packet)
  174.  
  175.         } catch (ex: Exception) {
  176.             ex.printStackTrace()
  177.         }
  178.     }
  179.  
  180.     /**
  181.      * Writes an outgoing [Packet] that is immediately encoded (by the [PacketEncoder]) and sent to the [channel].
  182.      *
  183.      * @param builder the [PacketBuilder] used for constructing the outgoing [Packet].
  184.      */
  185.     fun writeAndFlush(builder: PacketBuilder) {
  186.  
  187.         try {
  188.  
  189.             channel.writeAndFlush(builder.build())
  190.  
  191.             builder.reset()
  192.  
  193.         } catch (ex: Exception) {
  194.             ex.printStackTrace()
  195.         }
  196.     }
  197.  
  198.  
  199.     /**
  200.      * Flushes the [channel].
  201.      */
  202.     fun flush() {
  203.  
  204.         try {
  205.  
  206.             channel.flush()
  207.  
  208.         } catch (ex: Exception) {
  209.             ex.printStackTrace()
  210.         }
  211.     }
  212.  
  213. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement