并行框架的应用场景和需求
譬如用户请求“我的订单”,后台在收到请求后,就需要去调用用户详情rpc、商品详情rpc、库存rpc、优惠券rpc等等很多个服务。有些服务是可以并行去请求的,但有些服务是依赖于某个服务的返回值的(如查库存、优惠券,就依赖于商品详情回复到达后才能去请求)。
- 譬如在数据清洗领域,经常会有这样的需求,从多个数据源分别拉取数据,做第一步清洗,之后等某一步完成、或某几步都完成、或至少某几步完成,进行下一步任务。整个流程有明显的依赖顺序,以及任意可能存在的阻塞、异常、超时等情况。
如何将整个流程进行编排并让其按照设定顺序执行,并能合理处理异常情况,是一个并行框架所要有的功能。
当然,如果你对jdk1.8里的completeableFuture非常熟悉,通过一大堆的组合包装,也能做到上图的编排,但是它可能称不上一个框架,只能应用于自己的业务。
我希望多个并发任务,每一步都可以监控
CompleteableFuture很多人用过,里面有supply、then、allOf等等方法,都可以用来接收一个任务,最终将多个任务汇总成一个结果。
不知道大家意识到没有,你supply一个任务后,这个任务就黑盒了,如果你编排了很多个任务,那到底具体每一个任务的执行情况,执行到哪一步了,每一步的执行结果情况,我们是不知道的。只能等它最终执行完毕后,最后的汇总结果。
所以,一个并行框架,它最好是带每一步的监控,就是每一步的执行结果,无论成功与失败,它应该有个回调,才算完整。
此时,可能大家会意识到,异步带回调的有哪些?Future?Future并不带回调功能,它只能让你不停地去轮询get方法。
那么,到底哪个异步带回调呢,其实有个大名鼎鼎的netty**。netty就是一个全程带回调的异步网络框架,它扩充了jdk的future,增加了addListener等方法,可以让你避免去轮询get一个普通的future,而是等待回调。这多么美妙是吧。**
我希望各个任务的执行顺序,有强依赖的弱依赖
如上图的3,A和B并发执行,最后是C。
有些场景下,我们希望A和B都执行完毕后,才能执行C,CompletableFuture里有个allOf(futures...).then()方法可以做到。
有些场景下,我们希望A或者B任何一个执行完毕,就执行C,CompletableFuture里有个anyOf(futures...).then()方法可以做到。
我的框架同样提供了类似的功能,通过设定wrapper里的addDepend依赖时,可以指定依赖的任务是否must执行完毕。如果依赖的是must要执行的,那么就一定会等待所有的must依赖项全执行完毕,才执行自己。
如果依赖的都不是must,那么就可以任意一个依赖项执行完毕,就可以执行自己了。
我希望某个任务可以依赖上游的执行结果作为入参
CompletableFuture.thenCompose()
譬如A-B-C三个执行单元,A的入参是String,出参是int,B呢它需要用A的结果作为自己的入参。也就是说A、B并不是独立的,而是有结果依赖关系的。
在A执行完毕之前,B是取不到结果的,只是知道A的结果类型。
那么,我的框架也支持这样的场景。可以在编排时,就取A的结果包装类,作为B的入参。虽然此时尚未执行,必然是空,但可以保证A执行完毕后,B的入参会被赋值。
希望全组任务可以设置超时时间
一组任务,虽然内部的各个执行单元的时间不可控,但是我可以控制全组的执行时间不超过某个值。通过设置timeOut,来控制全组的执行阈值。
希望框架能够高性能、低线程数
该框架全程无锁,没有一个加锁的地方。
创建线程量少。如这样的,A会运行在B、C执行更慢的那个单元的线程上,而不会额外创建线程。
这个框架和CompleteableFuture的优点?
并发场景可能存在的需求-每个执行结果的回调
但有一个问题,你supply一个任务后,这个任务就黑盒了。如果你编排了很多个任务,每一个任务的执行情况,执行到哪一步了,每一步的执行结果情况,我们是不知道的。只能等它最终执行完毕后,最后汇总结果。
一个并行框架,它最好是对每一步的执行都能监控。每一步的执行结果,无论成功与失败,它应该有个回调,才算完整。拥有回调的任务,可以监控任务的执行状况,如果执行失败、超时,可以记录异常信息或者处理个性化的默认值。
CompleteableFuture中也有一些回调方法,例如:thenAccept(),whenComplete(),handle(),exceptionally()等,这些方法也能支持任务的回调,但是前提是任务执行了,才能完成回调。在某些场景中,有些任务单元是可能被SKIP跳过不执行的,不执行的任务也应该有回调。
任务的顺序编排
全串行
这种是最简单的,依次串行即可。
假如有3个任务,譬如每个任务由一个worker来完成,共计3个worker,这3个worker有明显的先后顺序。要描述这种依赖关系和前后顺序,我们对任务的包装类应该至少有两个属性,nextWrappers和dependWrappers。分别代表我这个任务的后面的任务和依赖的任务。
以B为例,nextWrapper就是C,dependWrapper就是A。
可用的实现方式为:CompletableFuture.thenApply()、thenAccept() 和 thenRun() 都可以实现。
全并行
这种也很简单,也比较常见。全并行的实现方式很多,譬如可以将所有的worker放到list里,写个for循环依次start;也可以全部提交到线程池里;也可以用CompleteableFuture.supply()去接收多个worker任务。
可用的实现方式为:CompletableFuture.allOf()
先串后并
可用的实现方式为:CompletableFuture.runAsync(),阻塞get获取结果,再调用CompletableFuture.allOf()
这种稍微有点难度,首先它们的数据结构是这样的,A的nextWrappers是B和C,B、C的dependWrappers都是A。
执行A不必多说,就直接在主线程里执行它,或者新开线程执行它都可以,主要是A执行完毕后,当发现自己的nextWrappers有多个时,该怎么办。
还好,CompleteableFuture提供了allOf这个方法,它可以让你传入多个future,并且能够等待这多个future都完成时再统一返回。见下图代码。
先并后串
首先BC肯定是在两个线程上。A将来是要运行在B的线程或者C的线程的,视具体规则而定。
1.BC都完成才能执行A
可用的实现方式为:CompletableFuture.allOf(futures…).then();或者runAfterBoth
BC都完成才能执行A,这个比较简单,因为B的nextWrapper是A,C的nextWrapper也是A,B或者C执行完后就会去调用A。那么在A这里就可以做判断了,判断自己的dependWrappers的状态,遍历一遍,如果所有的dependWrappers的状态都是已完成,那么自然是所有的依赖都完成了,就可以执行自己了。 如果有任何一个的状态没完成,那就说明依赖项还未完全解除,就什么也不干,等待下一次被调用,再次判断即可。
2.BC任意一个完成都可以执行A
CompletableFuture里有个anyOf(futures…).then();或者runAfterEither
BC任意一个完成就能执行A,这种假如B先完成,那么B执行完就会走nextWrapper,那就是A。如果此时C也完事了,也会进入A,那么A就要做判断了,如果状态已经不是初始化时的状态,说明已经被执行了,就什么也不干,就是C完毕后调用自己时,自己什么也不干。
3.明确指定B完成、或者C完成才能执行A,也就是当有多个dependWrapper时,可以指定必须哪些个dependWrapper执行完,才能执行自己。
CompletableFuture不支持。
我们就需要在dependWrapper里加个属性了,代表是不是必须执行完毕。当执行到A时,A就要去遍历所有的dependWrappers,看看那些必须要完毕的wrapper的状况,是不是都全部已经执行完毕了。倘若还有至少1个强依赖项没执行完,那么自己什么也不干,直接return即可。
难点:异步回调
譬如我的主线程执行过程中,要执行一个非常耗时的逻辑,自然我们会想到用异步的形式去完成这个耗时逻辑,新建个线程,让它去一边执行就好了,只要不阻塞我的主线程。但问题来了,异步执行没毛病,执行成功、失败后出结果了,该怎么通知主线程?
Jdk的Future不便之处
Java的Future某种程度上来说是用来解决异步问题的,它让你在发起一个异步任务时,迅速能在主线程得到一个Future对象,通过该对象,就可以去获取到异步任务的执行结果。
但是有一个比较尴尬的问题,就是当你想获取异步执行结果时,要通过future.get()方法,这一步还是阻塞的!而且我们无法确定到底异步任务何时执行完毕,提前get了,就还是阻塞,get晚了,可能会漏掉执行结果,写个死循环,不停去轮询是否执行完毕,又浪费资源。所以,这个Future并不好用。
如何自己实现一个简单带回调的异步任务
首先我们来拆分一下需求,我有N个耗时任务,可能是一次网络请求,可能是一个耗时文件IO,可能是一堆复杂的逻辑,我在主线程里发起这个任务的调用,但不希望它阻塞主线程,而期望它执行完毕(成功\失败)后,来发起一次回调,最好还有超时、异常的回调控制。
据此,我们拆分出几个角色,master主线程,调度器(发起异步调用),worker(异步工作线程)。然后就是将他们组合起来,完成各种异步回调,以及每个worker的正常、异常、超时等的回调。
public interface Worker {
String action(Object object);
}
一个worker,它需要有个方法,来代表这个worker将来做什么,action就可以理解为一个耗时任务。action可以接收一个参数。
再看一下回调器的定义:
public interface Listener {
void result(Object result);
}
这个listener用来做为回调,将worker的执行结果,放到result的参数里。
此外,我们还需要一个包装器Wrapper,来将worker和回调器包装一下。
难点:相互依赖模型的建立
如图1,B必须在A完成后才能执行,C必须在B完成后才能执行。也就是B depends A,他们之间有个depends的关系,而且是强依赖。
如图4,A依赖于B、C,这里就分不同情况了。第一种情况,A强依赖于B、C,必须B and C都完成后才能执行A;第二种情况,A强依赖于B、弱依赖于C,如果仅仅是C完成了,B还未完成,那么A不会执行,必须B完成了,A才会执行(此时就不在乎C是否完成了);第三种情况,A弱依赖于B、C,无论B或C中的哪个先执行完,A都会开始执行,而不去等待其他的依赖项。
猛的看起来,要实现这有点难,其实等会经过我仔细分析后,就会发现并不难。
从上一篇我们学到了要完成回调,需要个wrapper,那么这一篇我们要学会,要完成依赖关系,需要depends属性。
每个wrapper都有的属性有nextWrappers和dependWrappers。
nextWrappers就是紧跟在自己后面要执行的,如果只有一个,那就是图1那种串行,如果有多个,那就是图3那种,自己执行完后,并行执行后面的所有nextWrappers。
dependWrappers就是自己依赖的,为什么我又定义了一个DependWrapper对象,而不是用WorkerWrapper呢。上面其实已经说了,强依赖和弱依赖,所以DependWrapper对象里有个must属性,用来标记该依赖任务是否必须执行完毕才能执行自己。
来剖析一下几种场景:
1 依赖单个wrapper,这个好说,实现就是在执行自己时,判断dependWrapper的数量为1,那就直接执行自己就好了。
2 依赖多个wrapper,这就需要判断了。首先遍历所有的dependWrapper,判断是否有must的,如果没有,那该场景就是全部弱依赖,那么就可以直接执行自己了。如果有must,并且该次调用来自于非must的wrapper,那就直接return,什么都不干。如果该次调用来自于must的wrapper,那就需要判断其他的must的所有wrapper是否已经全部执行完毕了,如果还有没执行完的,那么就return什么也不干,如果其他的must全部执行完了,那么就执行自己。
这个处理依赖是个难点和重点,建议多多思考,仔细看看代码。
超时异常处理
递归获取所有的 WorkerWrapper
,通过一个 Set
将所有 WorkerWrapper>
通过递归的方式统计起来。
//1.递归获取所有的WorkerWrapper
private static void totalWorkers(List<WorkerWrapper> workerWrappers, Set<WorkerWrapper> set) {
set.addAll(workerWrappers);
for (WorkerWrapper wrapper : workerWrappers) {
if (wrapper.getNextWrappers() == null) {
continue;
}
List<WorkerWrapper> wrappers = wrapper.getNextWrappers();
//递归
totalWorkers(wrappers, set);
}
}
根据3.1中统计的任务集合。循环停止所有尚未执行、正在执行的任务。注意已经执行完毕的任务是不处理的(包括异常的)。
/**
* 总控制台超时,停止所有任务
*/
public void stopNow() {
if (getState() == INIT || getState() == WORKING) {
fastFail(getState(), null);
}
}
fastFail()
首页通过CAS方式,将WorkerWrapper的state属性设置为ERROR状态;并且将workResult设置为默认值,然后调用 callback.result()
回调方法处理。
/**
* 快速失败
*/
private boolean fastFail(int expect, Exception e) {
//试图将它从expect状态,改成Error
if (!compareAndSetState(expect, ERROR)) {
return false;
}
//只要任务没有完成就处理;完成的就不管了,包括超时的和异常的任务
if (checkIsNullResult()) {
if (e == null) {
workResult = defaultResult();
} else {
workResult = defaultExResult(e);
}
}
//回调result
callback.result(false, param, workResult);
return true;
}
流程
- 先写一个任务类 继承我们的Itask接口和Icallback接口 他俩负责一个作为执行动作的接口 一个回调接口
- 然后我们可以编排任务 用next 指向下一个 depend指向上一个
- 编排好任务后 我们beginTask 参数是 过期时间 和 最开始的任务
- 将任务封装成CompletableFuture数组 执行我们的task方法
- 然后利用CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
- 我们catch 如果超时 **递归获取所有的
WorkerWrapper
,通过一个Set
将所有WorkerWrapper
通过递归的方式统计起来。 然后采用fastFail()
首页通过CAS方式,将WorkerWrapper的state属性设置为ERROR状态;并且将workResult设置为默认值,然后调用callback.result()>
回调方法处理。 - 如果没超时 我们就正常进入每一个任务的task方法
- task 方法有几个参数
executorService:自定义线程池,不自定义的话,就走默认的COMMON_POOL。默认的线程池是不定长线程池。
* fromWrapper:本次task是由哪个上游TaskWrapper发起的。
* remainTime:剩余的时间,用来监控任务超时的。随着一组任务的执行,这个值从全局设置的timeout时间逐渐减少,当remainTime<=0时,任务就超时了。
* forParamUseWrappers:缓存一组任务所有的TaskWrapper。key:id,value:TaskWrapper引用。
task流程是这样的:
-
收集所有的wrapper,key是id,以便用于在Task工作单元中,获取任意Task的执行结果。
-
任务超时处理 给当前的任务进行fastFail() 然后beginNext 下一个next链任务
-
如果自己已经执行过了,继续处理下一个任务 他的原因可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了 就是通过判断任务的一个属性state 是否等于finish 或者error 如果等于我们beginnext执行下一个任务
-
**如果在执行前需要校验nextWrapper的状态,仅在nextWrappers <= 1时有效,如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了,SKIP跳过任务,不执行。(可选) **
譬如A-B和C-D并行,B和D执行完毕后,再执行F,譬如CD这条线执行的比较快,已经先到达了F,并且把F给执行完了。此时A刚执行完,准备执行B了,但是B后面的F都已经完毕了,那么B是否还要继续执行。needCheckNextWrapperResult就是校验该 任务后续的任务是否已经执行完毕了,自己还要不要执行
-
如果没有任何依赖,说明自己就是第一批要执行的 就去执行fire方法就是执行当前方法 和beginnext后续方法
-
如果有前方依赖,存在两种情况
** 一种是前面只有一个wrapper。即 A -> B**
** 一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。**
** 所以需要B来做判断,必须A、C、D都完成,自己才能执行**
-
如果前置依赖只有一个 则取判断前置依赖 是否超时,如果超时,则自己也超时。
任务是否异常,如果异常,则自己也异常。
依赖任务正常完成了,则自己正常执行。fire方法
然后执行beginnext方法 -
有多个依赖时 分为3种情况:
依赖任务全部执行完才能执行自己。
指定某个依赖任务完成,就可以执行自己。
依赖任务都不是must属性,也就是说不是强依赖,此时当前任务会在运行最快的那个依赖任务的线程上执行。执行流程如下:
首先synchronized 加锁 防止多个依赖同时完成都执行到当前节点 导致幂等问题
然后进行check幂等 如果不是初始化状态就返回 因为已经开始执行了
1.判断是否有must强依赖,如果没有强依赖,当前任务就可以正常执行了。fire执行本身 然后beginnext执行下一个
** 2.如果有强依赖,判断fromWrapper是否是must?如果不是must的就return了。**
** 3.有强依赖,需要看强依赖任务中,是否有超时或者异常的任务,如果有,当前任务也超时、异常,fastFail。**
** 4.有强依赖,判断依赖任务是否全部完成?如果完成了,可以执行当前任务fire 然后beginnext下个任务;如果没有完成,return什么也不做。**
我在讲一下执行当前方法也就是fire方法
主要有7个步骤:
- Check 重复执行,避免任务重复执行。
就是检查当前的结果状态是否为默认 为默认就是第一次执行
- CAS设置任务状态,state运行状态由 init - > working 工作中
- 回调 callback.begin()
- 执行耗时操作action 他是一个接口里的方法 我们继承这个ITask接口 来进行业务操作
- CAS设置任务状态,state运行状态由 tasking - > finsh
- 回调 callback.result()
然后我在讲一下beginnext 执行后续方法
-
判断当前任务是否有next后续任务,如果没有任务了,就是最后一个任务,就return
-
**next后续只有1个任务:判断next任务数量,如果数量只有1个,使用当前任务的线程执行next任务(调用task() **
-
**next后续有多个任务:判断next任务数量,如果有多个,有几个任务就新起几个线程执行 用CompletableFuture数组封装 **
-
阻塞get获取结果。CompletableFuture.allOf(futures).get();
注意点:
** 1.beginNext() 中后续任务的处理,也是通过 task() 来处理逻辑的。注意超时时间的处理,**
** 使用 remainTime 剩余时间 - costTime花费时间,这个值在整组任务的执行过程中,是逐渐减小的。例如A、B、C串行执行,**
** 整组任务的超时时间是1000ms,A执行消耗了200ms,到B执行时,B的可用时间 = 1000-200 = 800ms,这个时间是逐渐减小的。**
** 如果这个值小于0了,说明已经超过了整组任务设定的超时时间,任务就 FastFail() 了。**
** 2.beginNext() 中第4点针对后续有多个任务的处理,这里并没有使用带有超时的get方法。单个任务是没有超时监控的,**
** 如果要监控每个任务的超时,就需要一个额外的线程,有几个任务就需要几个线程,高并发场景下,会造成线程 ”爆炸“。全组任务超时,是在Async执行器中控制的。**
WorkerWrapper处理任务
核心流程:task()
<span class="ne-text">WorkerWrapper.work()</span>
方法是WorkerWrapper任务单元执行的入口。每次处理一个任务。
<span class="ne-text">work()</span>
方法的定义:
**work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) **
<span class="ne-text">executorService</span>
:自定义线程池,不自定义的话,就走默认的COMMON_POOL。默认的线程池是不定长线程池**。关于不定长线程池,在《第5步AsyncTool执行器工作过程》中已经分析了。**<span class="ne-text">fromWrapper</span>
:本次work是由哪个上游WorkerWrapper发起的。<span class="ne-text">remainTime</span>
:剩余的时间,用来监控任务超时的。随着一组任务的执行,这个值从全局设置的timeout时间逐渐减少,当remainTime<=0时,任务就超时了。<span class="ne-text">forParamUseWrappers</span>
:缓存一组任务所有的WorkerWrapper。key:id,value:WorkerWrapper引用。
<span class="ne-text">work()</span>
具体执行逻辑。主要分为6步:
- 缓存所有WorkerWrapper
- 任务超时处理
- Check是否执行过了,避免重复处理
- Check 后继next是否已经开始执行了,避免多余的处理
- 没有依赖Wrapper情况处理,则当前任务就是起始节点。
- 有依赖Wrapper情况处理,又区分只有1个依赖任务,或者有多个依赖任务的处理。
/**
* 开始工作
* fromWrapper代表这次work是由哪个上游wrapper发起的
*/
private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
//引用指向
this.forParamUseWrappers = forParamUseWrappers;
//1.收集所有的wrapper,key是id,以便用于在Worker工作单元中,获取任意Worker的执行结果。
forParamUseWrappers.put(id, this);
//时钟类获取当前时间
long now = SystemClock.now();
//2.总的已经超时了,就快速失败,进行下一个
if (remainTime <= 0) {
fastFail(INIT, null);
beginNext(executorService, now, remainTime);
return;
}
//3.如果自己已经执行过了,继续处理下一个任务
//可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
if (getState() == FINISH || getState() == ERROR) {
beginNext(executorService, now, remainTime);
return;
}
//4.如果在执行前需要校验nextWrapper的状态,仅在nextWrappers <= 1时有效
if (needCheckNextWrapperResult) {
//如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了,SKIP跳过任务,不执行。
if (!checkNextWrapperResult()) {
//FastFail SKIP,new SkippedException()
fastFail(INIT, new SkippedException());
beginNext(executorService, now, remainTime);
return;
}
}
//5.如果没有任何依赖,说明自己就是第一批要执行的
if (dependWrappers == null || dependWrappers.size() == 0) {
//5.1 执行当前任务
fire();
//5.2 开始后继任务
beginNext(executorService, now, remainTime);
return;
}
/*如果有前方依赖,存在两种情况
一种是前面只有一个wrapper。即 A -> B
一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。
所以需要B来做判断,必须A、C、D都完成,自己才能执行 */
//6.处理前置有依赖的情况
//6.1只有一个依赖
if (dependWrappers.size() == 1) {
//6-1.1:依赖任务正常结束了,就执行自己
doDependsOneJob(fromWrapper);
//6-1.2:开始后继任务
beginNext(executorService, now, remainTime);
}
//6.2有多个依赖时
else {
//6-2.1:多个依赖任务的判断处理
doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
}
}
开炮:fire()
fire()
方法是具体执行Worker任务的**
- Check 重复执行,避免任务重复执行。
CAS
设置任务状态,state
运行状态由init
- >working
- 回调
callback.begin()
- 执行耗时操作action
CAS
设置任务状态, state
运行状态由working
- >finsh
- 回调
callback.result()
- **异常处理
fastFail()
。CAS
设置任务状态,state
运行状态由working
- >finsh
;设置默认值、异常信息;
/**
* 具体的单个worker执行任务
*/
private WorkResult<V> workerDoJob() {
//1.Check重复执行
if (!checkIsNullResult()) {
return workResult;
}
try {
//2.设置Wrapper状态为Working
//如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行
if (!compareAndSetState(INIT, WORKING)) {
return workResult;
}
//3.回调begin
callback.begin();
//4.执行耗时操作action
V resultValue = worker.action(param, forParamUseWrappers);
//5.设置Wrapper状态为FINISH
//如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) {
return workResult;
}
workResult.setResultState(ResultState.SUCCESS);
workResult.setResult(resultValue);
//6.回调result
callback.result(true, param, workResult);
return workResult;
} catch (Exception e) {
//7.异常处理:设置状态ERROR\EXCEPTION,结果设置为默认值
fastFail(WORKING, e);
return workResult;
}
}
6.3、beginNext()
当前WorkerWrapper的 fire()
执行完毕后,就调用 beginNext()
方法执行后续节点。
beginNext()
的执行步骤主要有4步:
- 判断当前任务是否有next后续任务,如果没有任务了,就是最后一个任务,就结束了。
- next后续只有1个任务:判断next任务数量,如果数量只有1个,使用当前任务的线程执行next任务(调用
work()
方法) - next后续有多个任务:判断next任务数量,如果有多个,有几个任务就新起几个线程执行(调用
work()
方法) - 阻塞get获取结果。(针对处理next任务有多个的场景)
/**
* 进行下一个任务
*/
private void beginNext(ExecutorService executorService, long now, long remainTime) {
//花费的时间
long costTime = SystemClock.now() - now;
//1.后续没有任务了
if (nextWrappers == null) {
return;
}
//2.后续只有1个任务,使用当前任务的线程执行next任务
if (nextWrappers.size() == 1) {
nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
return;
}
//3.后续有多个任务,使用CompletableFuture[]包装,有几个任务就起几个线程执行
CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
for (int i = 0; i < nextWrappers.size(); i++) {
int finalI = i;
futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
.work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
}
//4.阻塞获取Future结果,注意这里没有超时时间,超时时间由全局统一控制。
try {
CompletableFuture.allOf(futures).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
6.4、单依赖 doDependsOneJob()
1、doDependsOneJob()
方法是 work()
方法中处理单依赖的场景w ork()
中的第6步。6-1.1先判断依赖任务是否完成,如果完成了就执行自己,然后6-1.2再开始执行next后继任务。
- 判断依赖任务是否超时,如果超时,则自己也超时。
- 判断依赖任务是否异常,如果异常,则自己也异常。
- 依赖任务正常完成了,则自己正常执行。
6.5、多依赖 doDependsJobs()
处理多个依赖任务,需要考虑到依赖任务的配置。分为3种情况:
- 依赖任务全部执行完才能执行自己。
- 指定某个依赖任务完成,就可以执行自己。
- 依赖任务都不是must属性,也就是说不是强依赖,此时当前任务会在运行最快的那个依赖任务的线程上执行。
3、执行流程如下:
- 判断是否有must强依赖,如果没有强依赖,当前任务就可以正常执行了。
- 如果有强依赖,判断依赖任务是否是must?如果不是must的就return了。
- 有强依赖,需要看依赖任务中,是否有超时或者异常的任务,如果有,当前任务也超时、异常,fastFail。
- 有强依赖,判断依赖任务是否全部完成?如果完成了,可以执行当前任务;如果没有完成,return什么也不做。
private synchronized void doDependsJobs(ExecutorService executorService, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
//如果当前依赖是非必须的,跳过不处理
boolean nowDependIsMust = false;
//Set统计必须完成的上游wrapper集合
Set<DependWrapper> mustWrapper = new HashSet<>();
for (DependWrapper dependWrapper : dependWrappers) {
if (dependWrapper.isMust()) {
mustWrapper.add(dependWrapper);
}
if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
nowDependIsMust = dependWrapper.isMust();
}
}
//1.如果全部是不必须的条件,那么只要到了这里,就执行自己。
if (mustWrapper.size() == 0) {
//超时处理
if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
fastFail(INIT, null);
}
//正常执行情况
else {
fire();
}
beginNext(executorService, now, remainTime);
return;
}
//2.如果当前依赖是非必须的,跳过不处理(非must情况)
if (!nowDependIsMust) {
return;
}
//如果fromWrapper是必须的
boolean existNoFinish = false;
boolean hasError = false;
//先判断前面必须要执行的依赖任务的执行结果,如果有任何一个失败,那就不用走action了,直接给自己设置为失败,进行下一步就是了
for (DependWrapper dependWrapper : mustWrapper) {
WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
WorkResult tempWorkResult = workerWrapper.getWorkResult();
//为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完
if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {
existNoFinish = true;
break;
}
if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
workResult = defaultResult();
hasError = true;
break;
}
if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
hasError = true;
break;
}
}
//3.只要有失败、异常的
if (hasError) {
fastFail(INIT, null);
beginNext(executorService, now, remainTime);
return;
}
//如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working
//4.依赖任务都完成了,可以执行自己了。
if (!existNoFinish) {
fire();
beginNext(executorService, now, remainTime);
return;
}
}
评论区