由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Netty 四个粘包拆包解码器

JAVA 西门飞冰 2629℃
[隐藏]

1.为什么会有粘包拆包?

TCP 是个”流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

关于粘包和拆包可以参考下图的几种情况:

如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。

如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包。

image-20221107104113604

UDP: 是没有粘包和拆包的问题,因为UDP有消息保护边界

2.粘包拆包问题案例

案例:写一个Netty程序,客户端连续发送100个fblinux.com字符给服务端,没有添加粘包拆包解码器的情况下,模拟出现粘包和拆包场景。

service 端代码:

public class AppServerEnDecoder {

    private int port;

    public AppServerEnDecoder(int port){
        this.port = port;
    }

    public void run() throws Exception{

        //配置服务端的线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup workGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)

                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });

            System.out.println("Echo 服务器启动");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture =  serverBootstrap.bind(port).sync();
            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();

        }finally {
            //优雅退出,释放线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        int port = 8080;

        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new AppServerEnDecoder(port).run();
    }
}

public class ServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf buf = (ByteBuf)msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);

        String body = new String(bytes,"UTF-8").
                substring(0,bytes.length - System.getProperty("line.separator").length());

        System.out.println("服务端收到消息内容为:" + body + ", 收到消息次数:" + ++counter);

    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码:

public class AppClientEnDecoder {
    private String host;
    private int port;

    public AppClientEnDecoder(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)throws Exception {
                            ch.pipeline().addLast(new ClientHandler());
                        } });
            //连接到服务端,connect是异步链接,再调用同步方法sync,等待连接成功
            ChannelFuture f = bootstrap.connect().sync();
            //阻塞直到客户端通道关闭
            f.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully().sync();
        }
    }


    public static void main(String[] args) throws Exception {
        new AppClientEnDecoder("127.0.0.1", 8080).start();
    }
}

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        ByteBuf mes = null;
        byte [] req = ("fblinux.com"+System.getProperty("line.separator")).getBytes();
        //连续发送
        for(int i=0; i< 100; i++){
            mes = Unpooled.buffer(req.length);
            mes.writeBytes(req);
            ctx.writeAndFlush(mes);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        cause.printStackTrace();
        ctx.close();
    }
}

服务端输出结果:客户端发送100次请求,服务端仅收到了2次,这种情况就是粘包,出现数据被截断的情况就是拆包。

image-20221105202407835

3.粘包拆包解决方案

发送方:可以关闭Nagle算法(Nagle算法:多个小包集合到一起发送),但是会影响性能

接收方: TCP是无界的数据流,并没有处理粘包现象的机制, 且协议本身无法避免粘包,半包读写的发生需要在应用层进行处理

应用层解决半包读写的办法

1)设置定长消息 (10字符),但是消息单一,不够灵活

2)设置消息的边界 ($$ 切割)

3)使用带消息头的协议,消息头存储消息开始标识及消息的长度信息 Header+Body

Netty提供了四种粘包拆包解决方案:

FixedLengthFrameDecoder:定长解码器,按固定长度进行消息的读取

LineBasedFrameDecoder:行解码器,按行(\r\n)进行消息的读取

DelimiterBasedFrameDecoder:分隔符解码器,按照特殊的分隔符作为消息分隔符进行消息的读

LengthFieldBasedFrameDecoder:自定义长度解码器,通过在消息头中定义消息长度字段来标志消息体的长度,然后根据消息的总长度来读取消息

Netty 无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。

4.解码器之定长解码器

定长解码器不灵活,基本没有啥使用场景。

FixedLengthFrameDecoder:消息长度固定,累积读取到长度总和为定长 LEN 的报文后,就认为读取到了一个完整的消息,再将计数器置位,重新读取下一个数据报。

使用示例:

Service启动类配置:

image-20221105230933048

ServiceHandler类:

public class ServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 因为配置了StringDecoder 所以这里需要强转成String类型
        String body = (String) msg;
        System.out.println("服务端收到消息内容为:" + body + ", 收到消息次数:" + ++counter);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandler类:

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        String strMsg = "fblinux.com";
        //连续发送
        for(int i=0; i< 100; i++){
            ctx.writeAndFlush(Unpooled.copiedBuffer(strMsg, CharsetUtil.UTF_8));
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        cause.printStackTrace();
        ctx.close();
    }
}

验证问题解决:

image-20221105231854554

5.解码器之行解码器

行解码器:使用换行符\n或者\r\n作为依据,遇到\n或者\r\n都认为是一条完整的消息。

行解码器提供两个构造函数:

LineBasedFrameDecoder(int maxLength)

LineBasedFrameDecoder(int maxLength,boolean stripDelimiter,boolean failFast)

对应的三个参数代表的意思:

maxLength: 表示一行最大的长度,如果超过这个长度依然没有检测到\n或者\r\n,抛出异常

stripDelimiter: 解码后的消息是否去除\n,\r\n分隔符,true:去除换行符,false:原样输出

failFast: 与maxLength联合使用,表示超过maxLength后,抛出TooLongFrameException的时机,true:超出maxLength后立即抛出异常,结束解码,false:等次消息包收完后,再抛出异常

使用示例:

Service启动类配置:

 image-20221105213241447

ServiceHandler类:

public class ServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 因为配置了StringDecoder 所以这里需要强转成String类型
        String body = (String) msg;
        System.out.println("服务端收到消息内容为:" + body + ", 收到消息次数:" + ++counter);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandler类:

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        // 因为是按行分割,所以这里需要添加换行符
        String strMsg = "fblinux.com"+"\r\n";
        //连续发送
        for(int i=0; i< 100; i++){
            ctx.writeAndFlush(Unpooled.copiedBuffer(strMsg, CharsetUtil.UTF_8));
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        cause.printStackTrace();
        ctx.close();
    }
}

验证问题解决:

image-20221105233053091

6.解码器之分隔符解码器

DelimiterBasedFrameDecoder与LineBasedFrameDecoder类似,只不过更加通用,允许我们指定任意特殊字符作为分隔符。

分隔符解码器可以同时指定多个分隔符,如果指定多个分隔符,则会选择内容最短的一个分隔符作为依据

分隔符解码器有多个构造函数,区别在于以下四个参数的组合:

maxFrameLength: 表示一行最大的长度,如果超过这个长度依然没有检测自定义分隔符,将会抛出TooLongFrameException

stripDelimiter: 解码后的消息是否去除分隔符,true:去除换行符,false:原样输出;

failFast: 与maxFrameLength联合使用,表示超过maxFrameLength后,抛出异常的时机,true:超出maxLength后立即抛出异常,结束解码,false:等消息包收完后,再抛出异常;

delimiters: 分隔符,这个分隔符的类型要以ByteBuf的形式出现。

使用示例:

Service启动类配置:

image-20221105221657109

ClientHandler:

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        String message = "Netty is a NIO client server framework which enables quick&_" +
                "and easy development of network applications&_ " +
                "such as protocol servers and clients.&_" +
                " It greatly simplifies and streamlines&_" +
                "network programming such as TCP and UDP socket server.&_";

        ByteBuf mes = null;
        mes = Unpooled.buffer(message.getBytes().length);
        mes.writeBytes(message.getBytes());
        ctx.writeAndFlush(mes);

    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

测试验证:

image-20221105215419806

7.解码器之自定义长度解码器

自定义长度解码器,通过在消息头中定义消息长度字段来标志消息体的长度,然后根据消息的总长度来读取消息

自定义长度解码器能够适应各种复杂的通信协议格式

该解码器是四个解码器中相对来说稍显复杂一点,不是实现复杂,是因为以下两个方面的原因:

一是参数较多,二是准确计算各参数的值。

自定义长度解码器有多个构造函数,这些构造函数是7个参数的不同组合:

maxFrameLength: 消息的最大长度,超过最大长度抛出异常;

lengthFieldOffset: 长度域偏移量,指的是长度域位于整个数据包字节数组中的下标;

lengthFieldLength: 长度域自己的字节数长度;

lengthAdjustment: 默认值0,长度域的偏移量矫正,长度域的长度值是否要减去其它长度才是内容的值;

initialBytesToStrip: 默认值0,丢弃的起始字节数;

failFast: 默认值true,超过消息最大长度是否立即抛出异常;

ByteOrder: 字节序列模式,默认是大端模式,也可以设置为小端模式。

典型数据通信协议格式:

image-20221107221717592

图解各参数:

image-20221107221842318

使用示例:

ClientHandler:

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 100; i++) {
            //发送自定义协议格式的消息
            short header=0x5A5A;
            byte[] msgtype=new byte[]{(byte)0x01};  // 构造器要求需要传入字节数组,所以这里定义成字节数组
            String strContent="fblinux.com";
            int contentLen=strContent.getBytes().length;

            //写入通道
            ctx.write(Unpooled.copyShort(header));
            ctx.write(Unpooled.copiedBuffer(msgtype));
            ctx.write(Unpooled.copyInt(contentLen));
            ctx.write(Unpooled.copiedBuffer(strContent, CharsetUtil.UTF_8));

            //发送消息
            ctx.flush();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ServiceHandler:

public class ServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //读取消息,并按照自定义的协议格式进行消息的处理
        ByteBuf in=(ByteBuf)msg;

        //打印报文
        String srcInfo= ByteBufUtil.hexDump(in).toUpperCase();
//        System.out.println("收到的原始报文: " + srcInfo);

        //进行报文的解析
        int header1=in.readByte();
        int msgtype1=in.readUnsignedShort();
        int contentLen1=in.readInt();
        ByteBuf bufContent1=in.readBytes(contentLen1);
        String strContent1=bufContent1.toString(CharsetUtil.UTF_8);
        System.out.println("服务端收到消息内容为:" + strContent1 + ", 收到消息次数:" + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端启动类:

image-20221106164703772

然后就可以启动程序,验证TCP粘包和拆包问题是否正常解决

转载请注明:西门飞冰的博客 » Netty 四个粘包拆包解码器

喜欢 (1)or分享 (0)