1.介绍
Netty作为一个基础的NIO通信框架,被广泛应用于大数据处理、互联网消息中间件等场景。这些应用场景都是分布式场景,总结就是把一个服务的不同角色在分散在不同的服务器上。各个角色在运行过程中都需要通过Netty进行数据或者参数的传递,这个过程一定离不开网络。可以把网络理解成联通每一个城市(服务器、仓储计算节点)的高速公路,通过这条高速公路可以看到城市间的协调联通,也可以看到它们之间都有哪些协作的工作,从而更深的理解城市间的关系。
例如网卡,交换机故障等,这时底层的TCP连接会断开
Netty在网络可靠性这个层面常用的机制有超时检测、心跳机制、断线重连等手段,来应对网络通信故障。可以让开发人员检测到具体的状态后,进行相应的处理。
生产环境中,靠Netty的断线重连机制,基本可以搞定95%的网络通信问题。要是断线重连一直不行,就需要根据实际业务情况考虑处理的方式了。
2.超时检测
超时分三种:读超时、写超时、读写超时;
Netty中实现超时检测相当方便,只需做两步工作就能优雅的实现超时检测(客户端、服务器端实现步骤一样):
1)Handler中重写userEventTriggered方法
2)在Pipeline中增加IdleStateHandler实例,该实例的构造函数有四个:
- readerIdleTime:读超时,设置为0是,表示不启用
- writeIdleTime:写超时,设置为0是,表示不启用
- allIdleTime:读写超时,设置为0是,表示不启用
- TimeUnit:时间单位,该参数可不设置,默认是秒
代码配置示例:
3.心跳机制
为什么要有心跳机制?
(1)发现长期不用的连接就关闭掉,减轻服务器的连接压力;
(2)检测异常连接,在很多异常场合,表面上看连接还存在,其实已经挂掉,所以一般发送一个空数据包来测试一下链路是否有效。
如何建立心跳机制?
很简单,利用超时检测,检测到超时后就向对方发送一个心跳包。
超时和心跳实现既可以在客户端,也可以在服务器端,根据实际场景来定。
严格来说这是一种心律不齐的心跳机制,后面课程会讲到如何利用定时任务来实现真正的心跳机制。
代码示例:在Handler类重写的userEventTriggered方法中,添加心跳检测代码,因为要根据超时来发送心跳
验证:
服务端心跳消息接收:
4.断线重连
说明:这个动作只能由客户端发起
为断线重连需要考虑到两种情况?
(1)客户端启动时,服务器端还未启动或已崩掉,需要客户端自动不断发起连接,而不是退出程序;
(2)服务器端正常,但客户端断开连接了,这时也需要自动发起重新连接请求。
断线重连实现步骤:
(1)客户端的连接由同步阻塞改为异步阻塞(去掉连接的sync())
5.三个内容整体代码
服务端启动类:
public class AppServerReconnHeart { private int port; public AppServerReconnHeart(int port) { this.port = port; } public void run() throws Exception { // Netty的Reactor线程池 EventLoopGroup group = new NioEventLoopGroup(); try { //启动NIO服务 ServerBootstrap b = new ServerBootstrap(); b.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { //ChannelInitializer是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel,用于把许多自定义的处理类增加到pipline上来 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new HandlerServerReconnHeart()); } }); //绑定服务器 ChannelFuture channelFuture= b.bind().sync(); System.out.println("在" + channelFuture.channel().localAddress()+"上开启监听"); //阻塞操作 channelFuture.channel().closeFuture().sync(); } finally { // 优雅关闭 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new AppServerReconnHeart(18080).run(); } }
服务端Handler类:
@ChannelHandler.Sharable public class HandlerServerReconnHeart extends ChannelInboundHandlerAdapter { //通道数组,保存所有注册到EventLoop的通道 public static ChannelGroup channels=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //处理收到的数据,并反馈消息到到客户端 ByteBuf in = (ByteBuf) msg; System.out.println("收到客户端发过来的消息: " + in.toString(CharsetUtil.UTF_8)); //写入并发送信息到远端(客户端) SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate=df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("这是服务器端在Read方法中反馈的消息 "+strDate+"\r\n", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //出现异常的时候执行的动作(打印并关闭通道) cause.printStackTrace(); ctx.close(); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //新建立连接时触发的动作 Channel incoming=ctx.channel(); System.out.println("客户端:"+incoming.remoteAddress()+"已连接上来"); channels.add(incoming); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //连接断开时触发的动作 Channel incoming=ctx.channel(); System.out.println("客户端:"+incoming.remoteAddress()+"已断开"); channels.remove(incoming); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //通道处于活动状态触发的动作,该方法只会在通道建立时调用一次 Channel incoming=ctx.channel(); System.out.println("客户端:"+incoming.remoteAddress()+"在线"); //写入并发送信息到远端(客户端) SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate=df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("这是服务器端在Active方法中反馈的消息 "+strDate+"\r\n", CharsetUtil.UTF_8)); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //通道处于非活动状态触发的动作,该方法只会在通道失效时调用一次 Channel incoming=ctx.channel(); System.out.println("客户端:"+incoming.remoteAddress()+"掉线"); } }
客户端启动类:
public class AppClientReconnHeart { private final String host; private final int port; public AppClientReconnHeart(String host, int port) { this.host = host; this.port = port; } public void run() throws Exception { // I/O线程池 EventLoopGroup group = new NioEventLoopGroup(); try { // 客户端辅助启动类 Bootstrap bs = new Bootstrap(); bs.group(group) // 实例化一个Channel .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>()//进行通道初始化配置 { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 超时检测handler socketChannel.pipeline().addLast(new IdleStateHandler(5,0,0, TimeUnit.SECONDS)); // 我们自定义的Handler socketChannel.pipeline().addLast(new HandlerClientReconnHeart()); } }); //连接到远程节点;等待连接完成 ChannelFuture future=bs.connect(); future.addListener(new ListenerClientReconnHeart()); //每隔6秒,自动向服务器发送一条消息 while (true) { Thread.sleep(6000); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式 String strDate=df.format(new Date()); future.channel().writeAndFlush(Unpooled.copiedBuffer(strDate, CharsetUtil.UTF_8)); } } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new AppClientReconnHeart("127.0.0.1",18080).run(); } }
客户端Handler类:
@ChannelHandler.Sharable public class HandlerClientReconnHeart extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // 处理接收到的消息 System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 处理I/O事件的异常 cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //建立连接后该channelActive()方法只会被调用一次,这里的逻辑:建立连接后,字节序列被发送到服务器,编码格式是utf-8 SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate=df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("这是客户端通过Active方法发送的消息 "+strDate+"\r\n", CharsetUtil.UTF_8)); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //通道处于非活动状态触发的动作,该方法只会在通道失效时调用一次 Channel incoming=ctx.channel(); System.out.println("掉线"); //检测到掉线后,重新开始连接 new AppClientReconnHeart("127.0.0.1",18080).run(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //超时检测 if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { System.out.println("客户端读消息包超时"); //检测到读超时,就向服务器端发送一个消息包 SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate = df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("我是心跳消息 " + strDate + "\r\n", CharsetUtil.UTF_8)); } } } }
监听类:
// 通过接口ChannelFutureListener来实现客户端的自动重连 public class ListenerClientReconnHeart implements ChannelFutureListener { private AppClientReconnHeart appClientReconnHeart=new AppClientReconnHeart("127.0.0.1",18080); @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(!channelFuture.isSuccess()) { EventLoop loop=channelFuture.channel().eventLoop(); ScheduledFuture<?> schedule=loop.schedule(new Runnable() { @Override public void run() { try { System.out.println(">>>自动启动客户端,开始连接服务器..."); appClientReconnHeart.run(); } catch (Exception e) { e.printStackTrace(); } } },5, TimeUnit.SECONDS); } else { System.out.println(">>>服务器连接成功..."); } } }
转载请注明:西门飞冰的博客 » Netty 的网络可靠性保障