Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package net
- import empyrean.game.world.World
- import empyrean.game.world.entity.animate.player.Player
- import empyrean.net.SessionState
- import empyrean.net.login.LoginResponses
- import empyrean.util.Misc
- import io.netty.channel.ChannelFutureListener
- import net.packet.codec.PacketDecoder
- import io.netty.channel.socket.SocketChannel
- import net.packet.Packet
- import net.packet.PacketBuilder
- import net.packet.Packets
- import net.packet.codec.PacketEncoder
- import java.util.concurrent.ConcurrentLinkedQueue
- /**
- * TODO: add documentation
- *
- * @author Stan van der Bend
- *
- * @param channel The channel managing this session's connection.
- */
- class ClientSession(val channel: SocketChannel) {
- private val priorityQueue = ConcurrentLinkedQueue<Packet>()
- private val regularQueue = ConcurrentLinkedQueue<Packet>()
- val player = Player(this)
- var state = SessionState.CONNECTED
- companion object {
- const val PACKET_PROCESS_LIMIT = 25
- }
- /**
- * Attempts finalization of a login attempt.
- *
- * @param loginMessage The player's login information.
- */
- fun finalizeLogin(loginMessage: DetailedLoginMessage) {
- val channel = loginMessage.ctx.channel() as SocketChannel
- //Update the player
- player.username = loginMessage.username
- player.password = loginMessage.password
- player.hostAddress = loginMessage.host
- player.longUsername = Misc.stringToLong(loginMessage.username)
- //Get the response code
- val response = LoginResponses.evaluate(player, loginMessage)
- //Write the response and flush the channel
- val future = channel.writeAndFlush(LoginMessage(response, player.getRights()))
- //Close the channel after sending the response if it wasn't a successful login
- if (response != LoginResponses.LOGIN_SUCCESSFUL) {
- future.addListener(ChannelFutureListener.CLOSE)
- return
- }
- //Wait...
- future.awaitUninterruptibly()
- //Replace decoder/encoder to packets
- channel.pipeline().replace("encoder", "encoder", PacketEncoder(loginMessage.encodingRandom))
- channel.pipeline().replace("decoder", "decoder", PacketDecoder(loginMessage.decodingRandom))
- //Queue the login
- if (!World.getLoginQueue().contains(player))
- World.getLoginQueue().add(player)
- }
- /**
- * Uses state-machine to handle upstream messages from Netty.
- *
- * @param packet the message to handle.
- */
- fun queueIncoming(packet: Packet) {
- val totalQueuesSize = regularQueue.size + priorityQueue.size
- if(totalQueuesSize >= PACKET_PROCESS_LIMIT)
- return
- // handle immediately
- if(packet.opcode == 41){
- read(packet)
- return
- }
- if (packet.prioritize())
- priorityQueue.add(packet)
- else
- regularQueue.add(packet)
- }
- /**
- * Polls both incoming [Packet] queues and parses the decoded messages.
- * The first queue being polled is the [priorityQueue] and after that comes the [queueIncoming].
- */
- fun parseIncomingPackets() {
- var processedPacketCount = 0
- while (processedPacketCount < PACKET_PROCESS_LIMIT) {
- val packet = priorityQueue.poll() ?: break
- read(packet)
- processedPacketCount++
- }
- while (processedPacketCount < PACKET_PROCESS_LIMIT) {
- val packet = regularQueue.poll() ?: break
- read(packet)
- processedPacketCount++
- }
- }
- /**
- * Reads an incoming [Packet].
- *
- * @param packet the incoming [Packet].
- */
- private fun read(packet: Packet) {
- try {
- val reader = Packets.READERS[packet.opcode]
- val start = System.currentTimeMillis()
- if(reader.canRead(player))
- reader.read(player, packet)
- World.marker.mark("[Packet][${packet.opcode}][${packet.buffer.readableBytes()}] for player ${player.username}!")
- val duration = System.currentTimeMillis() - start
- if (duration >= 10)
- System.err.println(this.javaClass.simpleName + ": took " + duration + "ms - packet: " + packet.opcode + " - " + player.username)
- } finally {
- packet.buffer.release()
- }
- }
- /**
- * Clears the [regularQueue].
- */
- fun clearIncomingPackets() {
- regularQueue.clear()
- }
- /**
- * Write an outgoing [Packet] that is encoded by the [PacketEncoder] and write it to the [channel].
- *
- * @param builder the [PacketBuilder] used for constructing the outgoing [Packet].
- */
- fun write(builder: PacketBuilder) {
- try {
- val packet = builder.build()
- builder.reset()
- if (!channel.isOpen)
- return
- if(packet.buffer.readableBytes() > 70)
- System.err.println("You send a fairly large packet[${packet.opcode}][${packet.buffer.readableBytes()}] ")
- channel.write(packet)
- } catch (ex: Exception) {
- ex.printStackTrace()
- }
- }
- /**
- * Writes an outgoing [Packet] that is immediately encoded (by the [PacketEncoder]) and sent to the [channel].
- *
- * @param builder the [PacketBuilder] used for constructing the outgoing [Packet].
- */
- fun writeAndFlush(builder: PacketBuilder) {
- try {
- channel.writeAndFlush(builder.build())
- builder.reset()
- } catch (ex: Exception) {
- ex.printStackTrace()
- }
- }
- /**
- * Flushes the [channel].
- */
- fun flush() {
- try {
- channel.flush()
- } catch (ex: Exception) {
- ex.printStackTrace()
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement