1.Netty介绍
Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
Netty
本质是一个 NIO
框架,适用于服务器通讯相关的多种应用场景。
Netty解决的问题是:以非常轻松的方式解决各种各样的流
Spark、Flink、ElasticSearch、RocketMQ、gRPC底层网络通信都是基于Netty进行开发
使用Netty进行开发的知名项目:https://netty.io/wiki/related-projects.html
2.
2.1.EventLoop模块
(1)EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。1个EventLoop可以服务多个Channel,1个Channel只有一个EventLoop,可以创建多个 EventLoop 来优化资源利用,也就是EventLoopGroup
2)EventLoopGroup 负责分配 EventLoop 到新创建的 Channel,里面包含多个EventLoop
他们的对应关系,如下图所示:
2.1.1.Reactor线程组
服务器端,一般设置两个线程组,监听连接的 parent channel 工作在一个独立的线程组,一般被命名为boss线程组。
连接成功后,负责客户端连接读写的 child channel 工作在另一个线程组,一般被命名为worker线程组。
2.1.2.Channel模块
Channel: 客户端和服务端建立的一个连接通道
ChannelHandler:用来处理 Channel 上的各种事件,分为入站、出站两种。
ChannelPipeline:所有 ChannelHandler 被连成一串,就是 Pipeline
他们是什么关系:
一个Channel包含一个ChannelPipeline,所有ChannelHandler都会顺序加入到ChannelPipeline中
创建Channel时会自动创建一个ChannelPipeline,每个Channel都有一个管理它的pipeline,这关联是永久性的
Channel当状态出现变化,就会触发对应的事件
状态:
(1)channelRegistered: channel注册到一个EventLoop
(2)channelActive: 变为活跃状态(连接到了远程主机),可以接受和发送数据
(3)channelInactive: channel处于非活跃状态,没有连接到远程主机
(4)channelUnregistered: channel已经创建,但是未注册到一个EventLoop里面,也就是没有和Selector绑定
ChannelHandler下主要是两个子接口:
ChannelInboundHandler:(入站)
- 处理输入数据和Channel状态类型改变,
- 适配器 ChannelInboundHandlerAdapter(适配器设计模式)
- 常用的:SimpleChannelInboundHandler
ChannelOutboundHandler:(出站)
- 处理输出数据,适配器 ChannelOutboundHandlerAdapter
2.1.3.Bootstrap模块
Bootstrap
意思是引导,一个 Netty
应用通常由一个 Bootstrap
开始,主要作用是配置整个 Netty
程序,串联各个组件,Netty
中 Bootstrap
类是客户端程序的启动引导类,ServerBootstrap
是服务端启动引导类。
启动一个Bootstrap,大致有8步,如下图:
2.1.4.ChannelFuture模块
ChannelFuture的作用是用来保存Channel异步操作的结果。
我们知道,在Netty中所有的I/O操作都是异步的。这意味着任何的I/O调用都将立即返回,而不保证这些被请求的I/O操作在调用结束的时候已经完成。取而代之地,你会得到一个返回的ChannelFuture实例,这个实例将给你一些关于I/O操作结果或者状态的信息。
3.Netty的Hello World 案例
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.74.Final</version> </dependency>
服务器端启动类:
// 服务器端启动类 public class AppServerHello { private int port; public AppServerHello(int port) { this.port = port; } // 启动流程 public void run() throws InterruptedException { // 配置服务端线程组(Netty的Reactor线程池,初始化了一个NioEventLoop数组,用来处理I/O操作,如接受新的连接和读/写数据) EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { // ServerBootstrap Netty启动引导类,传入boss和group两个线程组启动 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workGroup) // 通过工厂方法设计模式实例化一个channel .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // childHandler 指定处理数据的工序 // ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加我们自定义的Handler ch.pipeline().addLast(new HandlerServerHello()); } }); // 绑定服务器和端口:该实例将提供有关IO操作的结果或状态的信息 ChannelFuture channelFuture = b.bind().sync(); System.out.println("在" + channelFuture.channel().localAddress()+"上开启监听"); // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开 channelFuture.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new AppServerHello(18080).run(); } }
服务器端Handler类:
// 服务器端I/O处理类 @ChannelHandler.Sharable public class HandlerServerHello extends ChannelInboundHandlerAdapter { // 重写数据读取方法 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //处理收到的数据,并反馈消息到到客户端 ByteBuf in = (ByteBuf) msg; System.out.println("收到客户端发过来的消息: " + in.toString(CharsetUtil.UTF_8)); //写入并发送信息到远端(客户端) ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是服务端,我已经收到你发送的消息", CharsetUtil.UTF_8)); } // 重写异常处理的方法 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //出现异常的时候执行的动作(打印并关闭通道) cause.printStackTrace(); ctx.close(); } }
客户端启动类:
public class AppClientHello { private final String host; private final int port; // 设置服务器端端IP和端口 public AppClientHello(String host, int port) { this.host = host; this.port = port; } public void run() throws InterruptedException { // 配置客户端线程组 EventLoopGroup group = new NioEventLoopGroup(); try { // 配置客户端辅助启动类,启动客户端线程组 Bootstrap bs = new Bootstrap(); bs.group(group) // 实例化一个Channel .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host,port)) // 指明处理数据的工序 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加我们自定义的handler socketChannel.pipeline().addLast(new HandlerClientHello()); } }); // 连接到服务端,connect是异步连接,在调用同步等待sync,等待连接成功 ChannelFuture future = bs.connect().sync(); future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8)); // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开 future.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { new AppClientHello("127.0.0.1",18080).run(); } }
客户端Handler类:
// 客户端 I/O处理类 @ChannelHandler.Sharable public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf> { // 重写数据读取的方法 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8)); } // 重写异常处理的方法 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
程序测试:先启动Server端在启动Client端
服务端输出如下:
4.Netty 开发的基本流程
客户端和服务端端开发流程基本一致,都是创建handler,然后创建服务启动类
5.Netty连接及活动状态监测
当Channel状态出现变化,就会触发对应的事件,我们可以根据Channel状态的变化执行对应的操作
所有的状态,只需要在客户端和服务端的Handler中修改即可
代码示例:服务端和客户端启动类代码和上述案例一致,所以只粘贴Server和Client的Handler代码
HandlerServer:
// 服务器端I/O处理类,连接监测 @ChannelHandler.Sharable public class HandlerServerMonitorEvent extends ChannelInboundHandlerAdapter { //通道数组,保存所有注册到EventLoop的通道 public static ChannelGroup channels=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //处理收到的数据,并反馈消息到到客户端 ByteBuf in = (ByteBuf) msg; System.out.println("收到客户端发过来的消息: " + in.toString(CharsetUtil.UTF_8)); //写入并发送信息到远端(客户端) SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate=df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("这是服务器端在Read方法中反馈的消息 "+strDate+"\r\n", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //出现异常的时候执行的动作(打印并关闭通道) cause.printStackTrace(); ctx.close(); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //新建立连接时触发的动作 Channel incoming=ctx.channel(); System.out.println("客户端:"+incoming.remoteAddress()+"已连接上来"); channels.add(incoming); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //连接断开时触发的动作 Channel incoming=ctx.channel(); System.out.println("客户端:"+incoming.remoteAddress()+"已断开"); channels.remove(incoming); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //通道处于活动状态触发的动作,该方法只会在通道建立时调用一次 Channel incoming=ctx.channel(); System.out.println("客户端:"+incoming.remoteAddress()+"在线"); //写入并发送信息到远端(客户端) SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate=df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("这是服务器端在Active方法中反馈的消息 "+strDate+"\r\n", CharsetUtil.UTF_8)); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //通道处于非活动状态触发的动作,该方法只会在通道失效时调用一次 Channel incoming=ctx.channel(); System.out.println("客户端:"+incoming.remoteAddress()+"掉线"); } }
HandlerClient:
// 客户端 I/O处理类 @ChannelHandler.Sharable public class HandlerClientMonitorEvent extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // 处理接收到的消息 System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 处理I/O事件的异常 cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //建立连接后该channelActive()方法只会被调用一次,这里的逻辑:建立连接后,字节序列被发送到服务器,编码格式是utf-8 SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate=df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("这是客户端通过Active方法发送的消息 "+strDate+"\r\n", CharsetUtil.UTF_8)); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //通道处于非活动状态触发的动作,该方法只会在通道失效时调用一次 Channel incoming=ctx.channel(); System.out.println("服务器端掉线"); } }
测试结果:
(1)测试客户端连接和掉线状态
转载请注明:西门飞冰的博客 » Netty 编程的基础内容