1.介绍
Netty不光可以处理IO流任务,还可以处理普通任务和定时任务
Netty通过两种方式提供异步的普通任务和定时任务:
(1)通过Channel的EventLoop实现普通任务和定时任务;
(2)通过EventExecutorGroup实现普通任务和定时任务。
两者区别:
程池的方式,为每一个EventLoop(I/O线程)创建一个线程。
(2)EventLoop每一个连接是一个线程,EventLoopGroup每一个任务是一个线程
注意:一个EventLoop接管一个或多个Handler示例的处理,而EventLoop是单线程,也就是所有的I/O操作都由一个单线程执行,所以至关重要的是不要阻塞这个线程,因为这会对整体的 I/O 处理产生负面的影响。
切记不要用这两种方式来处理长耗时业务,否则会造成严重阻塞!!!
Netty官方建议:处理长耗时业务逻辑的方法是自定义业务线程池
2.EventLoop实现普通和定时任务
需求:使用EventLoop伪代码模拟一个执行5秒的长耗时任务,使用EventLoop实现一个3秒的定时任务模拟心跳。
代码示例:
// 客户端 I/O处理类 @ChannelHandler.Sharable public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { // EventLoop实现普通任务并模拟耗时操作 // 一个子线程执行耗时任务 ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");//设置日期格式 String strDate=df.format(new Date()); System.out.println("[->开始耗时任务]"+strDate); Thread.sleep(5000); strDate=df.format(new Date()); System.out.println("[->->执行完耗时任务]"+strDate); } catch (Exception e){ e.printStackTrace(); } } }); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // EventLoop 实现定时任务 ctx.channel().eventLoop().scheduleWithFixedDelay(new Runnable() { // 因为线程中不能访问外部局部变量 // 这里所以采用在线程中创建属性、属性的赋值方法,然后在创建线程时,通过调用这个自身的方法,实现局部变量的方位。 ChannelHandlerContext ctx; @Override public void run() { SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate=df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("这是通过客户端发送的心跳包 "+strDate+"\r\n", CharsetUtil.UTF_8)); } //对自身属性进行赋值 public Runnable accept(ChannelHandlerContext chct) { this.ctx=chct; return this; } }.accept(ctx),0,3, TimeUnit.SECONDS); } // 重写异常处理的方法 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
3.EventExecutorGroup实现普通和定时任务
需求:和EventLoop一样,不过现在是使用EventExecutorGroup实现
(1)EventExecutorGroup普通任务是通过创建EventLoop线程池的方式,为每一个EventLoop(I/O线程)创建一个线程。所以要先在启动类定义一个线程池,添加到我们的自定义Handler中。
(2)在Handler中使用EventExecutorGroup实现普通任务和定时任务
// 客户端 I/O处理类 @ChannelHandler.Sharable public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf> { // 重写数据读取的方法 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { // EventExecutorGroup实现普通任务并模拟耗时操作 ctx.executor().execute(new Runnable() { @Override public void run() { try { SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");//设置日期格式 String strDate=df.format(new Date()); System.out.println("[->开始耗时任务]"+strDate); Thread.sleep(5000); strDate=df.format(new Date()); System.out.println("[->->执行完耗时任务]"+strDate); } catch (Exception e){ e.printStackTrace(); } } }); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // EventExecutorGroup 实现定时任务 ctx.executor().scheduleWithFixedDelay(new Runnable() { ChannelHandlerContext ctx; @Override public void run() { SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate=df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("这是通过客户端发送的心跳包 "+strDate+"\r\n", CharsetUtil.UTF_8)); } //对自身属性进行赋值 public Runnable accept(ChannelHandlerContext chct) { this.ctx=chct; return this; } }.accept(ctx),0,3,TimeUnit.SECONDS); } // 重写异常处理的方法 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
服务端输出结果:可以看到定时3秒执行一次的客户端心跳包,也是每隔5秒就发送过来一个这就不正常了,验证出Channel的IO操作被长耗时任务阻塞了。
4.自定义业务线程池处理长耗时业务
由于EventLoop和EventExecutorGroup提供的普通任务不能解决处理长耗时业务引起的I/O线程阻塞问题。根据Netty官方建议我们使用自定义业务线程池处理长耗时业务。
Guava是谷歌推出的基于开源的Java库,是谷歌很多项目的核心库,该库是为了增强Java的功能和处理能力,我们利用它来实现我们的业务线程池。
具体实现如下:
(1)引入Guava依赖
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>29.0-jre</version> </dependency>
(2)在Handler中定义业务线程池,并将耗时业务放入线程池。
注意:我们利用Guava创建线程池,这里面包含了线程启动的方式,线程池必要参数的设置,注意这里一定要设置线程为守护线程而不是用户线程,要不然麻烦比较多,守护线程和用户线程的主要区别:主线程结束后用户线程还会继续运行,守护线程则会自动结束。
// 客户端 I/O处理类 @ChannelHandler.Sharable public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf> { //创建自定义业务线程池,用于非阻塞处理长耗时业务 protected static ExecutorService newFixedThreadPool() { final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("netty-business-%d") .setDaemon(true)//默认为false用户线程,这里设置为true守护线程。注意区别:主线程结束后用户线程还会继续运行,守护线程则会自动结束 .build(); return new ThreadPoolExecutor( 20,//核心线程数 200,//线程池中的能拥有的最多线程数 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10000),threadFactory);//10000表示用于缓存任务的阻塞队列,其实理解为最大并发量 } final static ListeningExecutorService service = MoreExecutors.listeningDecorator(newFixedThreadPool()); // 重写数据读取的方法 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { // 耗时任务放入Guava线程池中 service.submit(new Runnable() { @Override public void run() { try { SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");//设置日期格式 String strDate=df.format(new Date()); System.out.println("[->开始耗时任务]"+strDate); Thread.sleep(5000); strDate=df.format(new Date()); System.out.println("[->->执行完耗时任务]"+strDate); } catch (Exception e){ e.printStackTrace(); } } }); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // EventLoop 实现定时任务 ctx.executor().scheduleWithFixedDelay(new Runnable() { ChannelHandlerContext ctx; @Override public void run() { SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式 String strDate=df.format(new Date()); ctx.writeAndFlush(Unpooled.copiedBuffer("这是通过客户端发送的心跳包 "+strDate+"\r\n", CharsetUtil.UTF_8)); } //对自身属性进行赋值 public Runnable accept(ChannelHandlerContext chct) { this.ctx=chct; return this; } }.accept(ctx),0,3,TimeUnit.SECONDS); } // 重写异常处理的方法 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
转载请注明:西门飞冰的博客 » Netty 之普通任务和定时任务