由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Netty 之普通任务和定时任务

JAVA 西门飞冰 6985℃
[隐藏]

1.介绍

Netty不光可以处理IO流任务,还可以处理普通任务和定时任务

Netty通过两种方式提供异步的普通任务和定时任务:

(1)通过Channel的EventLoop实现普通任务和定时任务;

(2)通过EventExecutorGroup实现普通任务和定时任务。

两者区别:

(1)EventLoop普通任务是为了任何线程与Channel的直接交互服务的,EventExecutorGroup普通任务是通过创建EventLoop线程池的方式,为每一个EventLoop(I/O线程)创建一个线程。

(2)EventLoop每一个连接是一个线程,EventLoopGroup每一个任务是一个线程

注意:一个EventLoop接管一个或多个Handler示例的处理,而EventLoop是单线程,也就是所有的I/O操作都由一个单线程执行,所以至关重要的是不要阻塞这个线程,因为这会对整体的 I/O 处理产生负面的影响。

切记不要用这两种方式来处理长耗时业务,否则会造成严重阻塞!!!

Netty官方建议:处理长耗时业务逻辑的方法是自定义业务线程池

2.EventLoop实现普通和定时任务

需求:使用EventLoop伪代码模拟一个执行5秒的长耗时任务,使用EventLoop实现一个3秒的定时任务模拟心跳。

代码示例:

// 客户端 I/O处理类
@ChannelHandler.Sharable
public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf>
{
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception
    {
        // EventLoop实现普通任务并模拟耗时操作
        // 一个子线程执行耗时任务
        ctx.channel().eventLoop().execute(new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");//设置日期格式
                    String strDate=df.format(new Date());
                    System.out.println("[->开始耗时任务]"+strDate);
                    Thread.sleep(5000);
                    strDate=df.format(new Date());
                    System.out.println("[->->执行完耗时任务]"+strDate);
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
      	// EventLoop 实现定时任务
        ctx.channel().eventLoop().scheduleWithFixedDelay(new Runnable()
        {
            // 因为线程中不能访问外部局部变量
            // 这里所以采用在线程中创建属性、属性的赋值方法,然后在创建线程时,通过调用这个自身的方法,实现局部变量的方位。
            ChannelHandlerContext ctx;

            @Override
            public void run()
            {
                SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
                String strDate=df.format(new Date());
                ctx.writeAndFlush(Unpooled.copiedBuffer("这是通过客户端发送的心跳包 "+strDate+"\r\n", CharsetUtil.UTF_8));
            }

            //对自身属性进行赋值
            public Runnable accept(ChannelHandlerContext chct)
            {
                this.ctx=chct;
                return this;
            }
        }.accept(ctx),0,3, TimeUnit.SECONDS);
    }


    // 重写异常处理的方法
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端输出结果:可以看到普通任务,每隔五秒执行了一次,说明普通任务执行正常

image-20221111105653079

服务端输出结果:可以看到定时3秒执行一次的客户端心跳包,也是每隔5秒就发送过来一个这就不正常了,可以验证出Channel的IO操作被长耗时任务阻塞了。

image-20221111105710944

3.EventExecutorGroup实现普通和定时任务

需求:和EventLoop一样,不过现在是使用EventExecutorGroup实现

(1)EventExecutorGroup普通任务是通过创建EventLoop线程池的方式,为每一个EventLoop(I/O线程)创建一个线程。所以要先在启动类定义一个线程池,添加到我们的自定义Handler中。

image-20221111111759247

(2)在Handler中使用EventExecutorGroup实现普通任务和定时任务

// 客户端 I/O处理类
@ChannelHandler.Sharable
public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf>
{
    // 重写数据读取的方法
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception
    {
				// EventExecutorGroup实现普通任务并模拟耗时操作
        ctx.executor().execute(new Runnable() {
            @Override
            public void run() {
                try
                {
                    SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");//设置日期格式
                    String strDate=df.format(new Date());
                    System.out.println("[->开始耗时任务]"+strDate);
                    Thread.sleep(5000);
                    strDate=df.format(new Date());
                    System.out.println("[->->执行完耗时任务]"+strDate);
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        // EventExecutorGroup 实现定时任务
        ctx.executor().scheduleWithFixedDelay(new Runnable() {

            ChannelHandlerContext ctx;

            @Override
            public void run() {
                SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
                String strDate=df.format(new Date());
                ctx.writeAndFlush(Unpooled.copiedBuffer("这是通过客户端发送的心跳包 "+strDate+"\r\n", CharsetUtil.UTF_8));
            }

            //对自身属性进行赋值
            public Runnable accept(ChannelHandlerContext chct)
            {
                this.ctx=chct;
                return this;
            }
        }.accept(ctx),0,3,TimeUnit.SECONDS);
    }


    // 重写异常处理的方法
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端输出结果:可以看到普通任务,每隔五秒执行了一次,说明普通任务执行正常

image-20221111111844212

服务端输出结果:可以看到定时3秒执行一次的客户端心跳包,也是每隔5秒就发送过来一个这就不正常了,验证出Channel的IO操作被长耗时任务阻塞了。

image-20221111111844212

4.自定义业务线程池处理长耗时业务

由于EventLoop和EventExecutorGroup提供的普通任务不能解决处理长耗时业务引起的I/O线程阻塞问题。根据Netty官方建议我们使用自定义业务线程池处理长耗时业务。

Guava是谷歌推出的基于开源的Java库,是谷歌很多项目的核心库,该库是为了增强Java的功能和处理能力,我们利用它来实现我们的业务线程池。

具体实现如下:

(1)引入Guava依赖

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>29.0-jre</version>
</dependency>

(2)在Handler中定义业务线程池,并将耗时业务放入线程池。

注意:我们利用Guava创建线程池,这里面包含了线程启动的方式,线程池必要参数的设置,注意这里一定要设置线程为守护线程而不是用户线程,要不然麻烦比较多,守护线程和用户线程的主要区别:主线程结束后用户线程还会继续运行,守护线程则会自动结束。

// 客户端 I/O处理类
@ChannelHandler.Sharable
public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf>
{
    //创建自定义业务线程池,用于非阻塞处理长耗时业务
    protected static ExecutorService newFixedThreadPool()
    {
        final ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("netty-business-%d")
                .setDaemon(true)//默认为false用户线程,这里设置为true守护线程。注意区别:主线程结束后用户线程还会继续运行,守护线程则会自动结束
                .build();
        return new ThreadPoolExecutor(
                20,//核心线程数
                200,//线程池中的能拥有的最多线程数
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10000),threadFactory);//10000表示用于缓存任务的阻塞队列,其实理解为最大并发量
    }
    final static ListeningExecutorService service = MoreExecutors.listeningDecorator(newFixedThreadPool());

    // 重写数据读取的方法
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception
    {
      	// 耗时任务放入Guava线程池中
        service.submit(new Runnable() {
            @Override
            public void run() {
                try
                {
                    SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");//设置日期格式
                    String strDate=df.format(new Date());
                    System.out.println("[->开始耗时任务]"+strDate);
                    Thread.sleep(5000);
                    strDate=df.format(new Date());
                    System.out.println("[->->执行完耗时任务]"+strDate);
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        // EventLoop 实现定时任务
        ctx.executor().scheduleWithFixedDelay(new Runnable() {

            ChannelHandlerContext ctx;

            @Override
            public void run() {
                SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
                String strDate=df.format(new Date());
                ctx.writeAndFlush(Unpooled.copiedBuffer("这是通过客户端发送的心跳包 "+strDate+"\r\n", CharsetUtil.UTF_8));
            }

            //对自身属性进行赋值
            public Runnable accept(ChannelHandlerContext chct)
            {
                this.ctx=chct;
                return this;
            }
        }.accept(ctx),0,3,TimeUnit.SECONDS);
    }


    // 重写异常处理的方法
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端输出结果:可以看到普通任务,每隔五秒执行了一次,说明普通任务执行正常

image-20221111113104884

服务端输出结果:可以看到心跳包每隔3秒发送一次,说明心跳包发送也正常。可以得出结论,使用自定义业务线程池处理长耗时业务,不会阻塞netty的IO操作。

image-20221111113131624

转载请注明:西门飞冰的博客 » Netty 之普通任务和定时任务

喜欢 (0)or分享 (0)