1.为什么会有粘包拆包?
TCP 是个”流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
关于粘包和拆包可以参考下图的几种情况:
如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。
如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包。
,因为UDP有消息保护边界
2.粘包拆包问题案例
案例:写一个Netty程序,客户端连续发送100个fblinux.com字符给服务端,没有添加粘包拆包解码器的情况下,模拟出现粘包和拆包场景。
service 端代码:
public class AppServerEnDecoder { private int port; public AppServerEnDecoder(int port){ this.port = port; } public void run() throws Exception{ //配置服务端的线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandler()); } }); System.out.println("Echo 服务器启动"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); }finally { //优雅退出,释放线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args.length > 0) { port = Integer.parseInt(args[0]); } new AppServerEnDecoder(port).run(); } } public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); String body = new String(bytes,"UTF-8"). substring(0,bytes.length - System.getProperty("line.separator").length()); System.out.println("服务端收到消息内容为:" + body + ", 收到消息次数:" + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端代码:
public class AppClientEnDecoder { private String host; private int port; public AppClientEnDecoder(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch)throws Exception { ch.pipeline().addLast(new ClientHandler()); } }); //连接到服务端,connect是异步链接,再调用同步方法sync,等待连接成功 ChannelFuture f = bootstrap.connect().sync(); //阻塞直到客户端通道关闭 f.channel().closeFuture().sync(); } finally { //优雅退出,释放NIO线程组 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new AppClientEnDecoder("127.0.0.1", 8080).start(); } } public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf mes = null; byte [] req = ("fblinux.com"+System.getProperty("line.separator")).getBytes(); //连续发送 for(int i=0; i< 100; i++){ mes = Unpooled.buffer(req.length); mes.writeBytes(req); ctx.writeAndFlush(mes); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
3.粘包拆包解决方案
发送方:可以关闭Nagle算法(Nagle算法:多个小包集合到一起发送),但是会影响性能
接收方: TCP是无界的数据流,并没有处理粘包现象的机制, 且协议本身无法避免粘包,半包读写的发生需要在应用层进行处理
应用层解决半包读写的办法
1)设置定长消息 (10字符),但是消息单一,不够灵活
2)设置消息的边界 ($$ 切割)
3)使用带消息头的协议,消息头存储消息开始标识及消息的长度信息 Header+Body
Netty提供了四种粘包拆包解决方案:
FixedLengthFrameDecoder:定长解码器,按固定长度进行消息的读取
LineBasedFrameDecoder:行解码器,按行(\r\n)进行消息的读取
DelimiterBasedFrameDecoder:分隔符解码器,按照特殊的分隔符作为消息分隔符进行消息的读
LengthFieldBasedFrameDecoder:自定义长度解码器,通过在消息头中定义消息长度字段来标志消息体的长度,然后根据消息的总长度来读取消息
Netty 无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。
4.解码器之定长解码器
定长解码器不灵活,基本没有啥使用场景。
FixedLengthFrameDecoder:消息长度固定,累积读取到长度总和为定长 LEN 的报文后,就认为读取到了一个完整的消息,再将计数器置位,重新读取下一个数据报。
使用示例:
ServiceHandler类:
public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 因为配置了StringDecoder 所以这里需要强转成String类型 String body = (String) msg; System.out.println("服务端收到消息内容为:" + body + ", 收到消息次数:" + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ClientHandler类:
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String strMsg = "fblinux.com"; //连续发送 for(int i=0; i< 100; i++){ ctx.writeAndFlush(Unpooled.copiedBuffer(strMsg, CharsetUtil.UTF_8)); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
5.解码器之行解码器
行解码器:使用换行符\n或者\r\n作为依据,遇到\n或者\r\n都认为是一条完整的消息。
行解码器提供两个构造函数:
LineBasedFrameDecoder(int maxLength)
LineBasedFrameDecoder(int maxLength,boolean stripDelimiter,boolean failFast)
对应的三个参数代表的意思:
maxLength: 表示一行最大的长度,如果超过这个长度依然没有检测到\n或者\r\n,抛出异常
stripDelimiter: 解码后的消息是否去除\n,\r\n分隔符,true:去除换行符,false:原样输出
failFast: 与maxLength联合使用,表示超过maxLength后,抛出TooLongFrameException的时机,true:超出maxLength后立即抛出异常,结束解码,false:等次消息包收完后,再抛出异常
使用示例:
Service启动类配置:
ServiceHandler类:
public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 因为配置了StringDecoder 所以这里需要强转成String类型 String body = (String) msg; System.out.println("服务端收到消息内容为:" + body + ", 收到消息次数:" + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ClientHandler类:
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 因为是按行分割,所以这里需要添加换行符 String strMsg = "fblinux.com"+"\r\n"; //连续发送 for(int i=0; i< 100; i++){ ctx.writeAndFlush(Unpooled.copiedBuffer(strMsg, CharsetUtil.UTF_8)); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
6.解码器之分隔符解码器
DelimiterBasedFrameDecoder与LineBasedFrameDecoder类似,只不过更加通用,允许我们指定任意特殊字符作为分隔符。
分隔符解码器可以同时指定多个分隔符,如果指定多个分隔符,则会选择内容最短的一个分隔符作为依据
分隔符解码器有多个构造函数,区别在于以下四个参数的组合:
maxFrameLength: 表示一行最大的长度,如果超过这个长度依然没有检测自定义分隔符,将会抛出TooLongFrameException
stripDelimiter: 解码后的消息是否去除分隔符,true:去除换行符,false:原样输出;
failFast: 与maxFrameLength联合使用,表示超过maxFrameLength后,抛出异常的时机,true:超出maxLength后立即抛出异常,结束解码,false:等消息包收完后,再抛出异常;
delimiters: 分隔符,这个分隔符的类型要以ByteBuf的形式出现。
使用示例:
Service启动类配置:
ClientHandler:
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "Netty is a NIO client server framework which enables quick&_" + "and easy development of network applications&_ " + "such as protocol servers and clients.&_" + " It greatly simplifies and streamlines&_" + "network programming such as TCP and UDP socket server.&_"; ByteBuf mes = null; mes = Unpooled.buffer(message.getBytes().length); mes.writeBytes(message.getBytes()); ctx.writeAndFlush(mes); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
7.解码器之自定义长度解码器
自定义长度解码器,通过在消息头中定义消息长度字段来标志消息体的长度,然后根据消息的总长度来读取消息
自定义长度解码器能够适应各种复杂的通信协议格式
该解码器是四个解码器中相对来说稍显复杂一点,不是实现复杂,是因为以下两个方面的原因:
一是参数较多,二是准确计算各参数的值。
自定义长度解码器有多个构造函数,这些构造函数是7个参数的不同组合:
maxFrameLength: 消息的最大长度,超过最大长度抛出异常;
lengthFieldOffset: 长度域偏移量,指的是长度域位于整个数据包字节数组中的下标;
lengthFieldLength: 长度域自己的字节数长度;
lengthAdjustment: 默认值0,长度域的偏移量矫正,长度域的长度值是否要减去其它长度才是内容的值;
initialBytesToStrip: 默认值0,丢弃的起始字节数;
failFast: 默认值true,超过消息最大长度是否立即抛出异常;
ByteOrder: 字节序列模式,默认是大端模式,也可以设置为小端模式。
使用示例:
ClientHandler:
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 100; i++) { //发送自定义协议格式的消息 short header=0x5A5A; byte[] msgtype=new byte[]{(byte)0x01}; // 构造器要求需要传入字节数组,所以这里定义成字节数组 String strContent="fblinux.com"; int contentLen=strContent.getBytes().length; //写入通道 ctx.write(Unpooled.copyShort(header)); ctx.write(Unpooled.copiedBuffer(msgtype)); ctx.write(Unpooled.copyInt(contentLen)); ctx.write(Unpooled.copiedBuffer(strContent, CharsetUtil.UTF_8)); //发送消息 ctx.flush(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
ServiceHandler:
public class ServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取消息,并按照自定义的协议格式进行消息的处理 ByteBuf in=(ByteBuf)msg; //打印报文 String srcInfo= ByteBufUtil.hexDump(in).toUpperCase(); // System.out.println("收到的原始报文: " + srcInfo); //进行报文的解析 int header1=in.readByte(); int msgtype1=in.readUnsignedShort(); int contentLen1=in.readInt(); ByteBuf bufContent1=in.readBytes(contentLen1); String strContent1=bufContent1.toString(CharsetUtil.UTF_8); System.out.println("服务端收到消息内容为:" + strContent1 + ", 收到消息次数:" + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
转载请注明:西门飞冰的博客 » Netty 四个粘包拆包解码器