由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Future 异步编程

多线程 西门飞冰 1431℃
[隐藏]

1.Future 介绍

Future 是Java5新加的一个接口,它提供了一种异步并行计算的功能

如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,在通过Future获取计算结果。

一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

FutureTask是Future接口的实现类,三合一功能:

  • 作为线程:实现了Runnable接口
  • 异步处理:实现了Future接口
  • 有返回值:构造注入了Callable<T>,提供了Callable功能

2.FutureTask入门代码

通过代码实现一个带有返回值的异步多线程任务

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());
        Thread t1 = new Thread(futureTask, "t1");
        t1.start();
        // 接收返回值
        System.out.println(futureTask.get());
    }
}

class MyThread implements Callable<String>{
    @Override
    public String call() throws Exception {
        System.out.println("-----come in call() ----异步执行");
        return "hello Callable 返回值";
    }
}

//执行结果
//-----come in call() ----异步执行
//hello Callable 返回值

3.FutureTask+线程池使用

future+线程池异步多线程任务配合,能显著提高程序的执行效率。

案例说明:

同步:3个任务,只有一个线程main来处理,耗时1115 毫秒

public class FutureThreadPoolDemo1 {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        // 暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");

        System.out.println(Thread.currentThread().getName()+"\t -----end");
    }
}

开启多个异步任务线程来处理: 耗损和成本是原来的三分之二,耗时807 毫秒

public class FutureThreadPoolDemo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        long startTime = System.currentTimeMillis();

        FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
            try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
            return "task1 over";
        });
        threadPool.submit(futureTask1);

        FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
            try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
            return "task2 over";
        });
        threadPool.submit(futureTask2);

        System.out.println(futureTask1.get());
        System.out.println(futureTask2.get());

        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");


        System.out.println(Thread.currentThread().getName()+"\t -----end");
        threadPool.shutdown();
    }
}

4.FutureTask缺点

Future 对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

4.1.get阻塞:

一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)

示例:get放到最后不阻塞主线程

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---- 副线程come in");
            // 暂停几秒
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        //-----------------------------------------------------------注意顺序
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
        System.out.println(futureTask.get());
        //----------------------------------------------------------注意顺序
    }
}

示例:get放到主线程逻辑中,5秒后才出现下面的结果————-说明一旦调用get()方法直接去找副线程了,阻塞了主线程

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(()->{
            System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
            try {
                TimeUnit.SECONDS.sleep(5);//暂停几秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        //-----------------------------------------------------------注意顺序
        System.out.println(futureTask.get());
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
        //----------------------------------------------------------注意顺序
    }
}

4.2.isDone轮训:

isDone用来判断异步任务是否执行完成,主要目的是为了解决get方法获取异步结果的阻塞问题

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---- 副线程come in");
            // 暂停几秒
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();

        System.out.println(Thread.currentThread().getName() + "\t -----忙其他任务了");

        while (true){
            if (futureTask.isDone()){
                // 异步任务处理完成
                System.out.println(futureTask.get());
                break;
            }else{
                // 异步任务没有处理完成
                System.out.println("正在处理中……");
                // 暂停500毫秒
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }
}

轮训的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果

但是Future如果想要异步获取结果,通常都会以轮训的方式去获取结果,尽量不阻塞

5.CompletableFuture介绍

 Future 阻塞的方式和异步编程的设计理念相违背,而isDone轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture,它最大的作用是提供了一个回调机制,可以在任务完成后,自动回调后续的处理,这样,整个程序可以把“结果等待”完全给移除了。

CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

CompletableFuture实现了FutureCompletion Stage接口,因此,Future接口所有方法的Completable Future都具备,Future不具备的,通过Completable Stage接口进行扩展加强

Completable Stage接口有几十个方法,如下图所示:

image-20221121124441005

5.1.CompletionStage接口介绍

Completion Stage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数

一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.then Apply(x->square(x) ) .then Accept (x->System.out.print(x) ) .then Run() ->System.out.print In() )

一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

6.CompletableFuture创建方式

CompletableFuture 提供了四个静态方法来创建一个异步操作

6.1.runAsync无返回值

runAsync方法不支持返回值.适用于多个接口之间没有任何先后关系

6.1.1.runAsync

静态方法:

public static CompletableFuture<Void> runAsync(Runnable runnable)

代码示例:

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            // 停顿几秒线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(voidCompletableFuture.get());
    }
}

// 执行结果:
// ForkJoinPool.commonPool-worker-1	//默认的线程池
// null	// 没有返回值

6.1.2.runAsync+线程池

静态方法:

public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

代码示例:

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池

        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //停顿几秒线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },executorService);
        System.out.println(voidCompletableFuture.get());
    }
}
// 执行结果
// pool-1-thread-1	//指定的线程池
// null	// 没有返回值

6.2.supplyAsync有返回值

supplyAsync可以支持返回值,我们一般用supplyAsync来创建异步任务

6.2.1.supplyAsync

静态方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 

代码示例:

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyasync";
        });
        System.out.println(objectCompletableFuture.get());
    }
}
// 执行结果
// ForkJoinPool.commonPool-worker-1	// 默认的线程池
// hello supplyasync	// 有返回值

6.2.2.supplyAsync+线程池

静态方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

代码示例:

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyasync";
        },executorService);
        System.out.println(objectCompletableFuture.get());
    }
}
// 执行结果
// pool-1-thread-1		// 自定义线程池
// hello supplyasync	// 有返回值

7.CompletableFuture使用演示

7.1.基本功能

CompletableFuture可以完成Future的功能

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"----副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生一个随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("1秒钟后出结果"+result);
            return result;
        });

        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        System.out.println(objectCompletableFuture.get());
    }
}
// 执行结果
// main线程先去忙其他任务
// ForkJoinPool.commonPool-worker-1----副线程come in
// 1秒钟后出结果9
// 9

7.2.自动回调

CompletableFuture通过whenComplete减少阻塞和轮询实现自动回调

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"--------副线程come in");
          	// 产生随机数
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        // 没有异常,v是值,e是异常    
        }).whenComplete((v,e) -> {
            if(e == null){
                System.out.println("------------------计算完成,更新系统updataValue"+v);
            }
        // 有异常的情况    
        }).exceptionally(e->{
            e.printStackTrace();
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });

        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
// 执行结果
// ForkJoinPool.commonPool-worker-1--------副线程come in
// main线程先去忙其他任务
// ------------------计算完成,更新系统updataValue3

7.3.换用自定义线程池

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
      	// 自定义线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"--------副线程come in");
            //产生随机数
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        // 没有异常,v是值,e是异常    
        },threadPool).whenComplete((v,e) -> {
            if(e == null){
                System.out.println("------------------计算完成,更新系统updataValue"+v);
            }
        // 有异常的情况    
        }).exceptionally(e->{
            e.printStackTrace();
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });

        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

7.4.异常情况的模拟

设置一个异常 int i = 10 / 0 ;

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"--------副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("-----结果---异常判断值---"+result);
            //---------------------异常情况的演示--------------------------------------
            if(result > 2){
                //我们主动的给一个异常情况
                int i  = 10 / 0 ;
            }
            //------------------------------------------------------------------
            return result;
        // 没有异常,v是值,e是异常
        },threadPool).whenComplete((v,e) -> {
            if(e == null){
                System.out.println("------------------计算完成,更新系统updataValue"+v);
            }
        // 有异常的情况
        }).exceptionally(e->{
            e.printStackTrace();
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });

        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果:

pool-1-thread-1--------副线程come in
main线程先去忙其他任务
-----结果---异常判断值---9
异常情况java.lang.ArithmeticException: / by zero	java.lang.ArithmeticException: / by zero

8.CompletableFuture常用方法

8.1.获得结果和触发计算

获取结果:

public T get() 不见不散,容易阻塞

public T get(long timeout,TimeUnit unit) 过时不候,超过设定时间会报异常

public T join() 类似于get(),不同的是,join方法不会抛出异常

public T getNow(T valueIfAbsent)

  • 没有计算完成的情况下,给一个替代结果
  • 立即获取结果不阻塞
    • 计算完,返回计算完成后的结果
    • 没算完,返回设定的valueAbsent(直接返回了备胎值xxx)

主动触发计算:

public boolean complete(T value)是否立即打断get()方法返回括号值

如下示例:执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc

代码示例:

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                //执行需要2秒
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "abc";
        });

        try {
            //等待需要1秒
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //执2-等1 返回xxx
//         System.out.println(uCompletableFuture.getNow("xxx"));
        //执2-等1 返回true+备胎值completeValue
        System.out.println(uCompletableFuture.complete("completeValue")+"\t"+uCompletableFuture.get());
    }
}

8.2.对计算结果进行处理

thenApply计算结果存在在依赖关系,使得线程串行化。由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停

public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }).thenApply(f -> {
            System.out.println("222");
            return f + 1;
        }).thenApply(f -> {
//            int age = 10/0; // 异常情况:那步出错就停在那步。
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });

        System.out.println("-----主线程结束,END");

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

执行结果:

正常情况:
-----主线程结束,END
111
222
333
*****v: 1026
异常情况:
-----主线程结束,END
111
222
*****v: null
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

handle类似于thenApply,但是有异常的话仍然可以往下走一步。

public class CompletableFutureDemo2
{

    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        // 当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
        // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
        CompletableFuture.supplyAsync(() -> {
            // 暂停1秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }).handle((f,e) -> {
            // 异常语句
            int age = 10/0;
            System.out.println("222");
            return f + 1;
        }).handle((f,e) -> {
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });

        System.out.println("-----主线程结束,END");

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

执行结果:

正常情况:
-----主线程结束,END
111
222
333
*****v: 1026
异常情况:
-----主线程结束,END
111
333
*****v: null
java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because "f" is null

8.3.对计算结果进行消费

thenAccept接收任务的处理结果,并消费处理,无返回结果

    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }).thenApply(f -> {
            return f + 2;
        }).thenApply(f -> {
            return f + 3;
        }).thenApply(f -> {
            return f + 4;
        }).thenAccept(r -> System.out.println(r));
    }

对比补充,任务直接的顺序执行关系:

thenRun(Runnable runnable):任务A执行完执行B,并且B不需要A的结果

thenAccept(Consumer action):任务A执行完执行B,B需要A的结果,但是任务B无返回值

thenApply(Function fn):任务A执行完执行B,B需要A的结果,同时任务B有返回值

代码示例:

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
//null 

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
//resultA打印出来的 null因为没有返回值

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
//resultAresultB 返回值

8.4.CompleteFuture线程池

上面的几个方法都有普通版本和后面加Async的版本,以thenRunthenRunAsync为例,有什么区别?

1、没有传入自定义线程池,都用默认线程池ForkJoinPool

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
        System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
    }
}
// 执行结果如下,使用thenRunAsync也一样
// 1号任务	ForkJoinPool.commonPool-worker-1
// 2号任务	ForkJoinPool.commonPool-worker-1
// 3号任务	ForkJoinPool.commonPool-worker-1
// 4号任务	ForkJoinPool.commonPool-worker-1

2、传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池

调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
        System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
    }
}
// 执行结果
// 1号任务	pool-1-thread-1
// 2号任务	pool-1-thread-1
// 3号任务	pool-1-thread-1
// 4号任务	pool-1-thread-1

调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRunAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRunAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRunAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
        System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
    }
}
// 执行结果:
// 1号任务	pool-1-thread-1
// 2号任务	ForkJoinPool.commonPool-worker-1
// 3号任务	ForkJoinPool.commonPool-worker-2
// 4号任务	ForkJoinPool.commonPool-worker-2

3、也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
//            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRun(()->{
            // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            //  try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            //try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}
// 执行结果
// 1号任务	pool-1-thread-1
// 2号任务	main
// 3号任务	main
// 4号任务	main

8.5.对计算速度进行选用

applyToEither方法,谁快用谁

public class CompletableFutureFastDemo {
    public static void main(String[] args) {
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A come in");
            try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
            return "playA";
        });
        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B come in");
            try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}
            return "playB";
        });

        // 对计算速度进行对比选用
        CompletableFuture<String> result = playA.applyToEither(playB, f -> {
            return f + "is winer";
        });
        System.out.println(Thread.currentThread().getName()+"\t"+"-----:" + result.join());
    }
}

执行结果:因为play A暂停了2秒,play B暂停了3秒,所以选择play A

A come in
B come in
main	-----:playAis winer

8.6.对计算结果进行合并

thenCombine 合并,两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理

标准版:先完成的先等着,等待其它分支任务

public class CompletableFutureCombineDemo
{
    public static void main(String[] args)
    {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---启动");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---启动");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 20;
        });

        CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println("-----开始两个结果合并");
            return x + y;
        });

        System.out.println(result.join());

    }
}

执行结果:

ForkJoinPool.commonPool-worker-1	 ---启动
ForkJoinPool.commonPool-worker-2	 ---启动
-----开始两个结果合并
30

合并版本:

public class CompletableFutureCombineDemo
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
            return 20;
        }), (x,y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
            return 30;
        }),(a,b) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
            return a + b;
        });
        System.out.println("-----主线程结束,END");
        System.out.println(thenCombineResult.get());


        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

执行结果:

ForkJoinPool.commonPool-worker-1	---come in 1
ForkJoinPool.commonPool-worker-1	---come in 2
main	---come in 3
ForkJoinPool.commonPool-worker-2	---come in 4
main	---come in 5
-----主线程结束,END
60

转载请注明:西门飞冰的博客 » Future 异步编程

喜欢 (0)or分享 (0)