为什么 CompletableFuture 的 thenApplyAsync 没有新起一个线程? - V2EX
amiwrong123
V2EX    Java

为什么 CompletableFuture 的 thenApplyAsync 没有新起一个线程?

  •  
  •   amiwrong123 Aug 22, 2020 2974 views
    This topic created in 2089 days ago, the information mentioned may be changed or developed.
     public static void test() { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { String supplyAsyncResult = " "+Thread.currentThread().getName()+" Hello world! "; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(supplyAsyncResult); return supplyAsyncResult; }).thenApplyAsync(r -> { //添加后续任务 String thenApplyResult = Thread.curentThread().getName()+r + " thenApply! "; System.out.println(thenApplyResult); return thenApplyResult; }); try { System.out.println(completableFuture.get() + " finish!"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } 

    打印:

     ForkJoinPool.commonPool-worker-9 Hello world! ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world! thenApply! ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world! thenApply! finish! 
     public void run() { CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; //只是为了防止内存泄漏,方便 GC if (d.result == null) { try { d.completeValue(f.get()); //执行 task } catch (Throwable ex) { //执行 task 期间抛出了异常 d.completeThrowable(ex); } } d.postComplete(); } } 

    从源码上来看,supplyAsync 新起了一个线程,等到线程执行完 task,开始执行 d.postComplete(),即开始执行后续 task,然后 postComplete 会执行后续 task 的 completion 对象的 tryFire 方法。

     static final class UniApply<T,V> extends UniCompletion<T,V> { Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this))//这里会发现前一个 stage 执行完毕,但提供了线程池 return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } } 
     final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c) { Object r; Throwable x; if (a == null || (r = a.result) == null || f == null) return false; tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } try { if (c != null && !c.claim())//会执行到这里,然后发现 claim 返回 false return false; @SuppressWarnings("unchecked") S s = (S) r; completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); } } return true; } 
     final boolean claim() { Executor e = executor; if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { if (e == null) return true; executor = null; // disable e.execute(this); //会执行到这里,然后把 this completion 对象提交给线程池执行,当前线程即将返回 } return false; } 

    我的问题在于,当 worker-9 线程执行完第一个 task 之后,它把第二个 task 提交给了 executor (e.execute(this)),然后线程就返回了(从 claim 函数一层一层返回,直到返回 postComplete )。那为什么第二个 task 从打印结果来看,还是同一个 worker-9 线程来执行的?

    还是说,只是因为我的例子比较简单,所以 executor 没有分配一个新的线程出来,其他情况下,thenApplyAsync 里面在执行e.execute(this)时,还是有可能新起一个线程的吗?

    8 replies    2020-08-24 19:15:47 +08:00
    passerbytiny
        1
    passerbytiny  
       Aug 22, 2020 via Android
    supplyAsync 和 thenApplyAsync 虽然都是异步调用,但它们两个之间是串行的,为什么就不能在一个线程(执行器)中被执行。
    amiwrong123
        2
    amiwrong123  
    OP
       Aug 22, 2020
    @passerbytiny
    没说不可以,它们之间肯定是串行的,但不一定是同一个线程吧。从源码上可见,supplyAsync 的线程并不是直接执行下一个 task 的,因为它 e.execute(this)之后就马上返回了。
    zyoo
        3
    zyoo  
       Aug 22, 2020
    async 的语义是不一定同一个线程,所以这个只能说是巧合了,你可以多试几把?
    amiwong123
        4
    amiwrong123  
    OP
       Aug 22, 2020
    @zyoo
    多试几次也一样。我怀疑这跟 ForkJoinPool.commonPool()的线程调度有关系,但我现在还没来得及看它的原理呢。。
    passerbytiny
        5
    passerbytiny  
       Aug 22, 2020 via Android
    @amiwrong123 异步任务都是将任务提交给执行器去执行的,而不是从线程池取出一个线程用来执行任务。选择哪个线程是由执行器自行决定的,任务的提交者很难也不该对线程选择产生影响。

    从高层次上看,thenApplyAsync 是在 applyAsync 完成之后执行的,所以最优选择就是两者使用同一个线程(线程唤醒也是有成本的)。从源码看,你要主要看的应该是 Executor 的源码。
    amiwrong123
        6
    amiwrong123  
    OP
       Aug 22, 2020
    @passerbytiny
    好吧,大概理解了。主要之前我以为我这个例子,applyAsync 和 thenApplyAsync 的执行线程肯定是同一个线程,但从源码上看发现 前一个线程只是提交任务给 Executor 而已。

    所以,applyAsync 和 thenApplyAsync 的执行线程不一定是同一个呗。只是这个例子里,线程池是这样调度的。
    XuHuan1025
        7
    XuHuan1025  
       Aug 22, 2020
    木宝厉害哦
    RedBeanIce
        8
    RedBeanIce  
       Aug 24, 2020
    @amiwrong123 #5
    @passerbytiny #6

    JDK8
    求问一下,我也是看到这里将任务提交到 Executor
    e.execute(new AsyncRun(d, f));
    那么下一步应该看 forkjoinpool ?因为默认是他。
    About     Help     Advertise     Blog     API     FAQ     Solana     3366 Online   Highest 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 38ms UTC 13:45 PVG 21:45 LAX 06:45 JFK 09:45
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86