Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package co.xxx.protocol.handler.codec;
- import co.xxx.protocol.handler.codec.command.CommandFactory;
- import org.jboss.netty.buffer.ChannelBuffer;
- import org.jboss.netty.channel.Channel;
- import org.jboss.netty.channel.ChannelHandlerContext;
- import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
- import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
- /**
- * Created by IntelliJ IDEA.
- * User: Keyston
- * Date: 12/1/11
- * Time: 2:31 PM
- */
- public class ProtocolDecoder extends ReplayingDecoder<ProtocolDecoder.DecoderState>
- {
- private Header header;
- public ProtocolDecoder()
- {
- super(true);
- this.reset();
- }
- private void reset()
- {
- header = new Header();
- checkpoint(DecoderState.READ_PROTOCOL_ID);
- }
- @Override
- protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, DecoderState state) throws Exception
- {
- switch (state) {
- case READ_PROTOCOL_ID:
- byte[] bytes = new byte[4];
- buffer.readBytes(bytes);
- header.protocolId = new String(bytes);
- checkpoint(DecoderState.READ_SIZE);
- case READ_SIZE:
- header.structureSize = buffer.readShort();
- checkpoint(DecoderState.READ_STATUS);
- case READ_STATUS:
- // status, not used
- buffer.readInt();
- checkpoint(DecoderState.READ_COMMAND);
- case READ_COMMAND:
- header.command = Command.fromShort(buffer.readShort());
- checkpoint(DecoderState.READ_MESSAGE_ID);
- case READ_MESSAGE_ID:
- header.messageId = buffer.readLong();
- checkpoint(DecoderState.READ_SESSION_ID);
- case READ_SESSION_ID:
- header.sessionId = buffer.readLong();
- checkpoint(DecoderState.READ_BODY);
- case READ_BODY:
- SimpleChannelUpstreamHandler handler = CommandFactory.get(header.command);
- if (ctx.getPipeline().getNames().indexOf("0") != -1) {
- ctx.getPipeline().addAfter("0", "bodyDecoder", handler);
- } else {
- ctx.getPipeline().addLast("bodyDecoder", handler);
- }
- ctx.getPipeline().remove(this);
- if (handler != null) {
- // TODO dispatch error and close channel
- }
- try {
- if (buffer.readable()) {
- return new Object[]{
- header,
- buffer.readBytes(super.actualReadableBytes())
- };
- } else {
- return header;
- }
- } finally {
- this.reset();
- }
- default:
- throw new Error("Shouldn't reach here");
- }
- }
- public enum DecoderState
- {
- READ_PROTOCOL_ID,
- READ_SIZE,
- READ_STATUS,
- READ_COMMAND,
- READ_MESSAGE_ID,
- READ_SESSION_ID,
- READ_BODY
- }
- }
Add Comment
Please, Sign In to add comment