1.编解码技术介绍
基于Netty的NIO网络开发,我们关注的重点之一是网络传输。当进行远程跨进程服务调用时,需要把被传输的Java对象编码为字节数组或者ByteBuffer对象。而当远程服务读取到ByteBuffer对象或者字节数组时,需要将其编码为发送时的Java对象。这被称为Java对象编解码技术。
Netty 自身提供了一些编解码器如StringEncoder和StringDecoder对字符串进行编解码,ObjectEncoder和ObjectDecoder对Java对象进行编解码,但是底层使用的是Java提供的序列化技术,由于Java自带序列化缺陷较多,衍生出了多种编解码技术和框架。业界和Netty结合使用的主流编解码框架有如下四种:
- MessagePack
- Protobuf
- Thrift
- JBoss Marshalling
本篇以Netty利用MessagePack为案例进行说明:
2.
MessagePack是一个高效的二进制序列号框架,它像JSON一样支持不同语言间的数据交换。
MessagePack优点:
- 支持跨语言,提供市面上绝大多数语言的支持;
- 使用非常的简单;
- 编解码高效;(相对于JDK原生的,比Protobuf差点意思)
- 序列化之后的码流小;
MessagePack解决的是实体对象的编解码问题,如果要解决粘包/拆包问题,我们需要使用专门的粘包拆包解码器。
3.配置思路
创建客户端、服务器端Handler初始化配置类思路:
MessagePack已经把消息转换成字节序列,我们使用自定义长度解码器LengthFieldBasedFrameDecoder 进行解码,我们需要在消息字节序列前面加上一个字节长度,这个可以直接利用Netty的LengthFieldPrepender编码器来处理,该编码器作用是自动获取字节序列总长度的值,然后在字节序列前面加上长度域,长度域中填写字节序列值。
所以Hander初始化配置类中的Handler的顺序如下:
4.代码实现
实现步骤:
1、pom.xml中增加MessagePack的依赖;
<dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>0.6.12</version> </dependency>
@Message
// 传输的实体对象 @Message public class Information { //消息头 private short header; //消息类型 private byte msgtype; //数据内容 private String content; public short getHeader() { return header; } public void setHeader(short header) { this.header = header; } public byte getMsgtype() { return msgtype; } public void setMsgtype(byte msgtype) { this.msgtype = msgtype; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
ByteBuf,重写decode方法,
// 通过扩展MessageToMessageDecoder来自定义解码器,基于MessagePack的解码器,把byte数组反序列化为消息实体对象 public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { MessagePack messagePack=new MessagePack(); int intLength=byteBuf.readableBytes(); byte[] raw=new byte[intLength]; byteBuf.getBytes(byteBuf.readerIndex(),raw,0,intLength); list.add(messagePack.read(raw,Information.class)); } }
4、创建编码器,该解码器继承的是MessageToByteEncoder,类型为实体,如Information;重写encode方法;
// 通过扩展MessageToByteEncoder来自定义编码器 public class MessagePackEncoder extends MessageToByteEncoder<Information> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Information information, ByteBuf byteBuf) throws Exception { MessagePack messagePack=new MessagePack(); //进行序列化(编码) byte[] raw=messagePack.write(information); byteBuf.writeBytes(raw); } }
5、创建客户端、服务器端handler,注意这里读取到的消息就直接是实体了;
客户端handler代码:
public class ClientHandler extends SimpleChannelInboundHandler<Information> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Information info) throws Exception { System.out.println("收到了一条信息:"+info.getContent()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 处理I/O事件的异常 cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //发送自定义协议格式的消息 Information info=new Information(); info.setHeader((short)0x5A5A); info.setMsgtype((byte)0x01); info.setContent("你好,天王盖地虎!"); System.out.println("发送了一条消息:"+info.getContent()); ctx.writeAndFlush(info); } }
服务器端handler代码:
public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取消息,并按照自定义的协议格式进行消息的处理 Information in=(Information)msg; System.out.println("收到了一条消息:"+in.getContent()); Information returnInfo=new Information(); returnInfo.setHeader((short)0x3C3C); returnInfo.setMsgtype((byte)0x01); returnInfo.setContent("你好,宝塔镇河妖!"); //发送消息 ctx.writeAndFlush(returnInfo); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
6、创建客户端、服务器启动类代码,并添加相应pipeline
客户端启动类代码:
public class NettyClient { private String host; private int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch)throws Exception { //自定义长度解码器LengthFieldBasedFrameDecoder ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,2,0,2)); ch.pipeline().addLast(new MessagePackDecoder()); ch.pipeline().addLast(new LengthFieldPrepender(2)); ch.pipeline().addLast(new MessagePackEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); //连接到服务端,connect是异步链接,再调用同步方法sync,等待连接成功 ChannelFuture f = bootstrap.connect().sync(); //阻塞直到客户端通道关闭 f.channel().closeFuture().sync(); } finally { //优雅退出,释放NIO线程组 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient("127.0.0.1", 8080).start(); } }
服务器端启动类代码:
public class NettyServer { private int port; public EchoServer(int port){ this.port = port; } public void run() throws Exception{ //配置服务端的线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,2,0,2)); ch.pipeline().addLast(new MessagePackDecoder()); ch.pipeline().addLast(new LengthFieldPrepender(2)); ch.pipeline().addLast(new MessagePackEncoder()); ch.pipeline().addLast(new ServerHandler()); } }); System.out.println("Echo 服务器启动"); //绑定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //等待服务端监听端口关闭 channelFuture.channel().closeFuture().sync(); }finally { //优雅退出,释放线程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args.length > 0) { port = Integer.parseInt(args[0]); } new EchoServer(port).run(); } }
转载请注明:西门飞冰的博客 » Netty 利用MessagePack传输实体对象