1.Future 介绍
Future 是Java5新加的一个接口,它提供了一种。
如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,在通过Future获取计算结果。
一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
FutureTask是Future接口的实现类,三合一功能:
- 作为线程:实现了Runnable接口
- 异步处理:实现了Future接口
- 有返回值:构造注入了Callable<T>,提供了Callable功能
2.
通过代码实现一个带有返回值的异步多线程任务
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureTaskDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<>(new MyThread()); Thread t1 = new Thread(futureTask, "t1"); t1.start(); // 接收返回值 System.out.println(futureTask.get()); } } class MyThread implements Callable<String>{ @Override public String call() throws Exception { System.out.println("-----come in call() ----异步执行"); return "hello Callable 返回值"; } } //执行结果 //-----come in call() ----异步执行 //hello Callable 返回值
3.
future+线程池异步多线程任务配合,能显著提高程序的执行效率。
案例说明:
同步:3个任务,只有一个线程main来处理,耗时1115 毫秒
public class FutureThreadPoolDemo1 { public static void main(String[] args) { long startTime = System.currentTimeMillis(); // 暂停毫秒 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"); System.out.println(Thread.currentThread().getName()+"\t -----end"); } }
开启多个异步任务线程来处理: 耗损和成本是原来的三分之二,耗时807 毫秒
public class FutureThreadPoolDemo1 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3); long startTime = System.currentTimeMillis(); FutureTask<String> futureTask1 = new FutureTask<String>(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "task1 over"; }); threadPool.submit(futureTask1); FutureTask<String> futureTask2 = new FutureTask<String>(() -> { try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } return "task2 over"; }); threadPool.submit(futureTask2); System.out.println(futureTask1.get()); System.out.println(futureTask2.get()); try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"); System.out.println(Thread.currentThread().getName()+"\t -----end"); threadPool.shutdown(); } }
4.
Future 对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
4.1.get阻塞:
一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)
示例:get放到最后不阻塞主线程
public class FutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<String>(() -> { System.out.println(Thread.currentThread().getName() + "\t ---- 副线程come in"); // 暂停几秒 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return "task over"; }); Thread t1 = new Thread(futureTask,"t1"); t1.start(); //-----------------------------------------------------------注意顺序 System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了"); System.out.println(futureTask.get()); //----------------------------------------------------------注意顺序 } }
示例:get放到主线程逻辑中,5秒后才出现下面的结果————-说明一旦调用get()方法直接去找副线程了,阻塞了主线程
public class FutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<String>(()->{ System.out.println(Thread.currentThread().getName()+"\t ------副线程come in"); try { TimeUnit.SECONDS.sleep(5);//暂停几秒 } catch (InterruptedException e) { e.printStackTrace(); } return "task over"; }); Thread t1 = new Thread(futureTask,"t1"); t1.start(); //-----------------------------------------------------------注意顺序 System.out.println(futureTask.get()); System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了"); //----------------------------------------------------------注意顺序 } }
4.2.isDone轮训:
isDone用来判断异步任务是否执行完成,主要目的是为了解决get方法获取异步结果的阻塞问题
public class FutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<String>(() -> { System.out.println(Thread.currentThread().getName() + "\t ---- 副线程come in"); // 暂停几秒 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return "task over"; }); Thread t1 = new Thread(futureTask,"t1"); t1.start(); System.out.println(Thread.currentThread().getName() + "\t -----忙其他任务了"); while (true){ if (futureTask.isDone()){ // 异步任务处理完成 System.out.println(futureTask.get()); break; }else{ // 异步任务没有处理完成 System.out.println("正在处理中……"); // 暂停500毫秒 TimeUnit.MILLISECONDS.sleep(500); } } } }
轮训的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果
但是Future如果想要异步获取结果,通常都会以轮训的方式去获取结果,尽量不阻塞
5.
Future 阻塞的方式和异步编程的设计理念相违背,而isDone轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture,它最大的作用是提供了一个回调机制,可以在任务完成后,自动回调后续的处理,这样,整个程序可以把“结果等待”完全给移除了。
CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
CompletableFuture实现了Future
和Completion Stage
接口,因此,Future接口所有方法的Completable Future都具备,Future不具备的,通过Completable Stage接口进行扩展加强
Completable Stage接口有几十个方法,如下图所示:
5.1.CompletionStage接口介绍
Completion Stage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数
一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.then Apply(x->square(x) ) .then Accept (x->System.out.print(x) ) .then Run() ->System.out.print In() )
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
6.
6.1.
runAsync方法不支持返回值.适用于多个接口之间没有任何先后关系
6.1.1.runAsync
静态方法:
public static CompletableFuture<Void> runAsync(Runnable runnable)
代码示例:
public class CompletableFutureBuildDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); // 停顿几秒线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(voidCompletableFuture.get()); } } // 执行结果: // ForkJoinPool.commonPool-worker-1 //默认的线程池 // null // 没有返回值
6.1.2.runAsync+线程池
静态方法:
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
代码示例:
public class CompletableFutureBuildDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池 CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); //停顿几秒线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } },executorService); System.out.println(voidCompletableFuture.get()); } } // 执行结果 // pool-1-thread-1 //指定的线程池 // null // 没有返回值
6.2.supplyAsync有返回值
supplyAsync可以支持返回值,我们一般用supplyAsync来创建异步任务
6.2.1.supplyAsync
静态方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
代码示例:
public class CompletableFutureBuildDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "hello supplyasync"; }); System.out.println(objectCompletableFuture.get()); } } // 执行结果 // ForkJoinPool.commonPool-worker-1 // 默认的线程池 // hello supplyasync // 有返回值
6.2.2.supplyAsync+线程池
静态方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
代码示例:
public class CompletableFutureBuildDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "hello supplyasync"; },executorService); System.out.println(objectCompletableFuture.get()); } } // 执行结果 // pool-1-thread-1 // 自定义线程池 // hello supplyasync // 有返回值
7.
7.1.基本功能
CompletableFuture
可以完成Future
的功能
public class CompletableFutureUseDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"----副线程come in"); int result = ThreadLocalRandom.current().nextInt(10);//产生一个随机数 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1秒钟后出结果"+result); return result; }); System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务"); System.out.println(objectCompletableFuture.get()); } } // 执行结果 // main线程先去忙其他任务 // ForkJoinPool.commonPool-worker-1----副线程come in // 1秒钟后出结果9 // 9
7.2.自动回调
CompletableFuture
通过whenComplete
来减少阻塞和轮询实现自动回调
public class CompletableFutureUseDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"--------副线程come in"); // 产生随机数 int result = ThreadLocalRandom.current().nextInt(10); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return result; // 没有异常,v是值,e是异常 }).whenComplete((v,e) -> { if(e == null){ System.out.println("------------------计算完成,更新系统updataValue"+v); } // 有异常的情况 }).exceptionally(e->{ e.printStackTrace(); System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage()); return null; }); //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程 System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } // 执行结果 // ForkJoinPool.commonPool-worker-1--------副线程come in // main线程先去忙其他任务 // ------------------计算完成,更新系统updataValue3
7.3.
public class CompletableFutureUseDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { // 自定义线程池 ExecutorService threadPool = Executors.newFixedThreadPool(3); CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"--------副线程come in"); //产生随机数 int result = ThreadLocalRandom.current().nextInt(10); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return result; // 没有异常,v是值,e是异常 },threadPool).whenComplete((v,e) -> { if(e == null){ System.out.println("------------------计算完成,更新系统updataValue"+v); } // 有异常的情况 }).exceptionally(e->{ e.printStackTrace(); System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage()); return null; }); //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程 System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } }
7.4.异常情况的模拟
设置一个异常 int i = 10 / 0 ;
public class CompletableFutureUseDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3); CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"--------副线程come in"); int result = ThreadLocalRandom.current().nextInt(10);//产生随机数 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-----结果---异常判断值---"+result); //---------------------异常情况的演示-------------------------------------- if(result > 2){ //我们主动的给一个异常情况 int i = 10 / 0 ; } //------------------------------------------------------------------ return result; // 没有异常,v是值,e是异常 },threadPool).whenComplete((v,e) -> { if(e == null){ System.out.println("------------------计算完成,更新系统updataValue"+v); } // 有异常的情况 }).exceptionally(e->{ e.printStackTrace(); System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage()); return null; }); //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程 System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果:
pool-1-thread-1--------副线程come in main线程先去忙其他任务 -----结果---异常判断值---9 异常情况java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero
8.CompletableFuture常用方法
8.1.获得结果和触发计算
获取结果:
public T get() 不见不散,容易阻塞
public T get(long timeout,TimeUnit unit) 过时不候,超过设定时间会报异常
public T join() 类似于get(),不同的是,join方法不会抛出异常
public T getNow(T valueIfAbsent)
- 没有计算完成的情况下,给一个替代结果
- 立即获取结果不阻塞
- 计算完,返回计算完成后的结果
- 没算完,返回设定的valueAbsent(直接返回了备胎值xxx)
主动触发计算:
public boolean complete(T value)
是否立即打断get()方法返回括号值
如下示例:执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc
代码示例:
public class CompletableFutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> { try { //执行需要2秒 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "abc"; }); try { //等待需要1秒 TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //执2-等1 返回xxx // System.out.println(uCompletableFuture.getNow("xxx")); //执2-等1 返回true+备胎值completeValue System.out.println(uCompletableFuture.complete("completeValue")+"\t"+uCompletableFuture.get()); } }
8.2.对计算结果进行处理
计算结果存在在依赖关系,使得线程串行化。由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化, CompletableFuture.supplyAsync(() -> { //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("111"); return 1024; }).thenApply(f -> { System.out.println("222"); return f + 1; }).thenApply(f -> { // int age = 10/0; // 异常情况:那步出错就停在那步。 System.out.println("333"); return f + 1; }).whenCompleteAsync((v,e) -> { System.out.println("*****v: "+v); }).exceptionally(e -> { e.printStackTrace(); return null; }); System.out.println("-----主线程结束,END"); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果:
正常情况: -----主线程结束,END 111 222 333 *****v: 1026 异常情况: -----主线程结束,END 111 222 *****v: null java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化, // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理 CompletableFuture.supplyAsync(() -> { // 暂停1秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("111"); return 1024; }).handle((f,e) -> { // 异常语句 int age = 10/0; System.out.println("222"); return f + 1; }).handle((f,e) -> { System.out.println("333"); return f + 1; }).whenCompleteAsync((v,e) -> { System.out.println("*****v: "+v); }).exceptionally(e -> { e.printStackTrace(); return null; }); System.out.println("-----主线程结束,END"); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果:
正常情况: -----主线程结束,END 111 222 333 *****v: 1026 异常情况: -----主线程结束,END 111 333 *****v: null java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because "f" is null
8.3.对计算结果进行消费
接收任务的处理结果,并
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> { return 1; }).thenApply(f -> { return f + 2; }).thenApply(f -> { return f + 3; }).thenApply(f -> { return f + 4; }).thenAccept(r -> System.out.println(r)); }
对比补充,任务直接的顺序执行关系:
thenRun(Runnable runnable):任务A执行完执行B,并且B不需要A的结果
thenAccept(Consumer action):任务A执行完执行B,B需要A的结果,但是任务B无返回值
thenApply(Function fn):任务A执行完执行B,B需要A的结果,同时任务B有返回值
代码示例:
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join()); //null System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join()); //resultA打印出来的 null因为没有返回值 System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join()); //resultAresultB 返回值
8.4.CompleteFuture线程池
上面的几个方法都有普通版本和后面加Async的版本,以thenRun
和thenRunAsync
为例,有什么区别?
1、没有传入自定义线程池,都用默认线程池ForkJoinPool
public class CompletableFutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务"+"\t"+Thread.currentThread().getName()); return "abcd"; }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务"+"\t"+Thread.currentThread().getName()); }); System.out.println(completableFuture.get(2L,TimeUnit.SECONDS)); } } // 执行结果如下,使用thenRunAsync也一样 // 1号任务 ForkJoinPool.commonPool-worker-1 // 2号任务 ForkJoinPool.commonPool-worker-1 // 3号任务 ForkJoinPool.commonPool-worker-1 // 4号任务 ForkJoinPool.commonPool-worker-1
2、传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池
调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池
public class CompletableFutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService threadPool = Executors.newFixedThreadPool(5); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务"+"\t"+Thread.currentThread().getName()); return "abcd"; },threadPool).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务"+"\t"+Thread.currentThread().getName()); }); System.out.println(completableFuture.get(2L,TimeUnit.SECONDS)); } } // 执行结果 // 1号任务 pool-1-thread-1 // 2号任务 pool-1-thread-1 // 3号任务 pool-1-thread-1 // 4号任务 pool-1-thread-1
调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
public class CompletableFutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService threadPool = Executors.newFixedThreadPool(5); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务"+"\t"+Thread.currentThread().getName()); return "abcd"; },threadPool).thenRunAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务"+"\t"+Thread.currentThread().getName()); }).thenRunAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务"+"\t"+Thread.currentThread().getName()); }).thenRunAsync(()->{ try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务"+"\t"+Thread.currentThread().getName()); }); System.out.println(completableFuture.get(2L,TimeUnit.SECONDS)); } } // 执行结果: // 1号任务 pool-1-thread-1 // 2号任务 ForkJoinPool.commonPool-worker-1 // 3号任务 ForkJoinPool.commonPool-worker-2 // 4号任务 ForkJoinPool.commonPool-worker-2
3、也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)
public class CompletableFutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(5); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{ // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务"+"\t"+Thread.currentThread().getName()); return "abcd"; },threadPool).thenRun(()->{ // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务"+"\t"+Thread.currentThread().getName()); }).thenRun(()->{ //try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务"+"\t"+Thread.currentThread().getName()); }); } } // 执行结果 // 1号任务 pool-1-thread-1 // 2号任务 main // 3号任务 main // 4号任务 main
8.5.对计算速度进行选用
applyToEither
方法,谁快用谁
public class CompletableFutureFastDemo { public static void main(String[] args) { CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> { System.out.println("A come in"); try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();} return "playA"; }); CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> { System.out.println("B come in"); try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();} return "playB"; }); // 对计算速度进行对比选用 CompletableFuture<String> result = playA.applyToEither(playB, f -> { return f + "is winer"; }); System.out.println(Thread.currentThread().getName()+"\t"+"-----:" + result.join()); } }
执行结果:因为play A暂停了2秒,play B暂停了3秒,所以选择play A
A come in B come in main -----:playAis winer
8.6.对计算结果进行合并
thenCombine
合并,两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理
public class CompletableFutureCombineDemo { public static void main(String[] args) { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t ---启动"); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 10; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t ---启动"); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 20; }); CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> { System.out.println("-----开始两个结果合并"); return x + y; }); System.out.println(result.join()); } }
执行结果:
ForkJoinPool.commonPool-worker-1 ---启动 ForkJoinPool.commonPool-worker-2 ---启动 -----开始两个结果合并 30
合并版本:
public class CompletableFutureCombineDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1"); return 10; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2"); return 20; }), (x,y) -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3"); return x + y; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4"); return 30; }),(a,b) -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5"); return a + b; }); System.out.println("-----主线程结束,END"); System.out.println(thenCombineResult.get()); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果:
ForkJoinPool.commonPool-worker-1 ---come in 1 ForkJoinPool.commonPool-worker-1 ---come in 2 main ---come in 3 ForkJoinPool.commonPool-worker-2 ---come in 4 main ---come in 5 -----主线程结束,END 60
转载请注明:西门飞冰的博客 » Future 异步编程