侧边栏壁纸
博主头像
Haenu的Blog 博主等级

坚持学习,慢慢进步!

  • 累计撰写 35 篇文章
  • 累计创建 10 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

并行框架的应用场景和需求

Haenu
2024-12-06 / 0 评论 / 0 点赞 / 46 阅读 / 0 字

并行框架的应用场景和需求

譬如用户请求“我的订单”,后台在收到请求后,就需要去调用用户详情rpc、商品详情rpc、库存rpc、优惠券rpc等等很多个服务。有些服务是可以并行去请求的,但有些服务是依赖于某个服务的返回值的(如查库存、优惠券,就依赖于商品详情回复到达后才能去请求)。

  1. 譬如在数据清洗领域,经常会有这样的需求,从多个数据源分别拉取数据,做第一步清洗,之后等某一步完成、或某几步都完成、或至少某几步完成,进行下一步任务。整个流程有明显的依赖顺序,以及任意可能存在的阻塞、异常、超时等情况。

image-17f7.png

如何将整个流程进行编排并让其按照设定顺序执行,并能合理处理异常情况,是一个并行框架所要有的功能。

当然,如果你对jdk1.8里的completeableFuture非常熟悉,通过一大堆的组合包装,也能做到上图的编排,但是它可能称不上一个框架,只能应用于自己的业务。

我希望多个并发任务,每一步都可以监控

CompleteableFuture很多人用过,里面有supply、then、allOf等等方法,都可以用来接收一个任务,最终将多个任务汇总成一个结果。

不知道大家意识到没有,你supply一个任务后,这个任务就黑盒了,如果你编排了很多个任务,那到底具体每一个任务的执行情况,执行到哪一步了,每一步的执行结果情况,我们是不知道的。只能等它最终执行完毕后,最后的汇总结果。

所以,一个并行框架,它最好是带每一步的监控,就是每一步的执行结果,无论成功与失败,它应该有个回调,才算完整。

此时,可能大家会意识到,异步带回调的有哪些?Future?Future并不带回调功能,它只能让你不停地去轮询get方法。

那么,到底哪个异步带回调呢,其实有个大名鼎鼎的netty**。netty就是一个全程带回调的异步网络框架,它扩充了jdk的future,增加了addListener等方法,可以让你避免去轮询get一个普通的future,而是等待回调。这多么美妙是吧。**

我希望各个任务的执行顺序,有强依赖的弱依赖

如上图的3,A和B并发执行,最后是C。

有些场景下,我们希望A和B都执行完毕后,才能执行C,CompletableFuture里有个allOf(futures...).then()方法可以做到。

有些场景下,我们希望A或者B任何一个执行完毕,就执行C,CompletableFuture里有个anyOf(futures...).then()方法可以做到。

我的框架同样提供了类似的功能,通过设定wrapper里的addDepend依赖时,可以指定依赖的任务是否must执行完毕。如果依赖的是must要执行的,那么就一定会等待所有的must依赖项全执行完毕,才执行自己。

如果依赖的都不是must,那么就可以任意一个依赖项执行完毕,就可以执行自己了。

我希望某个任务可以依赖上游的执行结果作为入参

CompletableFuture.thenCompose()

譬如A-B-C三个执行单元,A的入参是String,出参是int,B呢它需要用A的结果作为自己的入参。也就是说A、B并不是独立的,而是有结果依赖关系的。

在A执行完毕之前,B是取不到结果的,只是知道A的结果类型。

那么,我的框架也支持这样的场景。可以在编排时,就取A的结果包装类,作为B的入参。虽然此时尚未执行,必然是空,但可以保证A执行完毕后,B的入参会被赋值。

希望全组任务可以设置超时时间

一组任务,虽然内部的各个执行单元的时间不可控,但是我可以控制全组的执行时间不超过某个值。通过设置timeOut,来控制全组的执行阈值。

希望框架能够高性能、低线程数

该框架全程无锁,没有一个加锁的地方。

创建线程量少。如这样的,A会运行在B、C执行更慢的那个单元的线程上,而不会额外创建线程。

这个框架和CompleteableFuture的优点?

并发场景可能存在的需求-每个执行结果的回调

但有一个问题,你supply一个任务后,这个任务就黑盒了。如果你编排了很多个任务,每一个任务的执行情况,执行到哪一步了,每一步的执行结果情况,我们是不知道的。只能等它最终执行完毕后,最后汇总结果。

一个并行框架,它最好是对每一步的执行都能监控。每一步的执行结果,无论成功与失败,它应该有个回调,才算完整。拥有回调的任务,可以监控任务的执行状况,如果执行失败、超时,可以记录异常信息或者处理个性化的默认值。

CompleteableFuture中也有一些回调方法,例如:thenAccept(),whenComplete(),handle(),exceptionally()等,这些方法也能支持任务的回调,但是前提是任务执行了,才能完成回调。在某些场景中,有些任务单元是可能被SKIP跳过不执行的,不执行的任务也应该有回调。

任务的顺序编排

全串行

这种是最简单的,依次串行即可。

假如有3个任务,譬如每个任务由一个worker来完成,共计3个worker,这3个worker有明显的先后顺序。要描述这种依赖关系和前后顺序,我们对任务的包装类应该至少有两个属性,nextWrappers和dependWrappers。分别代表我这个任务的后面的任务和依赖的任务。

以B为例,nextWrapper就是C,dependWrapper就是A。

可用的实现方式为:CompletableFuture.thenApply()、thenAccept() 和 thenRun() 都可以实现。

全并行

这种也很简单,也比较常见。全并行的实现方式很多,譬如可以将所有的worker放到list里,写个for循环依次start;也可以全部提交到线程池里;也可以用CompleteableFuture.supply()去接收多个worker任务。

可用的实现方式为:CompletableFuture.allOf()

先串后并

可用的实现方式为:CompletableFuture.runAsync(),阻塞get获取结果,再调用CompletableFuture.allOf()

这种稍微有点难度,首先它们的数据结构是这样的,A的nextWrappers是B和C,B、C的dependWrappers都是A。

执行A不必多说,就直接在主线程里执行它,或者新开线程执行它都可以,主要是A执行完毕后,当发现自己的nextWrappers有多个时,该怎么办。

还好,CompleteableFuture提供了allOf这个方法,它可以让你传入多个future,并且能够等待这多个future都完成时再统一返回。见下图代码。

先并后串

首先BC肯定是在两个线程上。A将来是要运行在B的线程或者C的线程的,视具体规则而定。

1.BC都完成才能执行A

可用的实现方式为:CompletableFuture.allOf(futures…).then();或者runAfterBoth

BC都完成才能执行A,这个比较简单,因为B的nextWrapper是A,C的nextWrapper也是A,B或者C执行完后就会去调用A。那么在A这里就可以做判断了,判断自己的dependWrappers的状态,遍历一遍,如果所有的dependWrappers的状态都是已完成,那么自然是所有的依赖都完成了,就可以执行自己了。 如果有任何一个的状态没完成,那就说明依赖项还未完全解除,就什么也不干,等待下一次被调用,再次判断即可。

2.BC任意一个完成都可以执行A

CompletableFuture里有个anyOf(futures…).then();或者runAfterEither

BC任意一个完成就能执行A,这种假如B先完成,那么B执行完就会走nextWrapper,那就是A。如果此时C也完事了,也会进入A,那么A就要做判断了,如果状态已经不是初始化时的状态,说明已经被执行了,就什么也不干,就是C完毕后调用自己时,自己什么也不干。

3.明确指定B完成、或者C完成才能执行A,也就是当有多个dependWrapper时,可以指定必须哪些个dependWrapper执行完,才能执行自己。

CompletableFuture不支持。

我们就需要在dependWrapper里加个属性了,代表是不是必须执行完毕。当执行到A时,A就要去遍历所有的dependWrappers,看看那些必须要完毕的wrapper的状况,是不是都全部已经执行完毕了。倘若还有至少1个强依赖项没执行完,那么自己什么也不干,直接return即可。

难点:异步回调

譬如我的主线程执行过程中,要执行一个非常耗时的逻辑,自然我们会想到用异步的形式去完成这个耗时逻辑,新建个线程,让它去一边执行就好了,只要不阻塞我的主线程。但问题来了,异步执行没毛病,执行成功、失败后出结果了,该怎么通知主线程?

Jdk的Future不便之处

Java的Future某种程度上来说是用来解决异步问题的,它让你在发起一个异步任务时,迅速能在主线程得到一个Future对象,通过该对象,就可以去获取到异步任务的执行结果。

但是有一个比较尴尬的问题,就是当你想获取异步执行结果时,要通过future.get()方法,这一步还是阻塞的!而且我们无法确定到底异步任务何时执行完毕,提前get了,就还是阻塞,get晚了,可能会漏掉执行结果,写个死循环,不停去轮询是否执行完毕,又浪费资源。所以,这个Future并不好用。

如何自己实现一个简单带回调的异步任务

首先我们来拆分一下需求,我有N个耗时任务,可能是一次网络请求,可能是一个耗时文件IO,可能是一堆复杂的逻辑,我在主线程里发起这个任务的调用,但不希望它阻塞主线程,而期望它执行完毕(成功\失败)后,来发起一次回调,最好还有超时、异常的回调控制。

据此,我们拆分出几个角色,master主线程,调度器(发起异步调用),worker(异步工作线程)。然后就是将他们组合起来,完成各种异步回调,以及每个worker的正常、异常、超时等的回调。

public interface Worker {
    String action(Object object);
}

一个worker,它需要有个方法,来代表这个worker将来做什么,action就可以理解为一个耗时任务。action可以接收一个参数。

再看一下回调器的定义:

public interface Listener {
    void result(Object result);
}

这个listener用来做为回调,将worker的执行结果,放到result的参数里。

此外,我们还需要一个包装器Wrapper,来将worker和回调器包装一下。

难点:相互依赖模型的建立

image-1pkv.png

如图1,B必须在A完成后才能执行,C必须在B完成后才能执行。也就是B depends A,他们之间有个depends的关系,而且是强依赖。

如图4,A依赖于B、C,这里就分不同情况了。第一种情况,A强依赖于B、C,必须B and C都完成后才能执行A;第二种情况,A强依赖于B、弱依赖于C,如果仅仅是C完成了,B还未完成,那么A不会执行,必须B完成了,A才会执行(此时就不在乎C是否完成了);第三种情况,A弱依赖于B、C,无论B或C中的哪个先执行完,A都会开始执行,而不去等待其他的依赖项。

猛的看起来,要实现这有点难,其实等会经过我仔细分析后,就会发现并不难。

从上一篇我们学到了要完成回调,需要个wrapper,那么这一篇我们要学会,要完成依赖关系,需要depends属性。

每个wrapper都有的属性有nextWrappers和dependWrappers。

nextWrappers就是紧跟在自己后面要执行的,如果只有一个,那就是图1那种串行,如果有多个,那就是图3那种,自己执行完后,并行执行后面的所有nextWrappers。

dependWrappers就是自己依赖的,为什么我又定义了一个DependWrapper对象,而不是用WorkerWrapper呢。上面其实已经说了,强依赖和弱依赖,所以DependWrapper对象里有个must属性,用来标记该依赖任务是否必须执行完毕才能执行自己。

来剖析一下几种场景:

1 依赖单个wrapper,这个好说,实现就是在执行自己时,判断dependWrapper的数量为1,那就直接执行自己就好了。

2 依赖多个wrapper,这就需要判断了。首先遍历所有的dependWrapper,判断是否有must的,如果没有,那该场景就是全部弱依赖,那么就可以直接执行自己了。如果有must,并且该次调用来自于非must的wrapper,那就直接return,什么都不干。如果该次调用来自于must的wrapper,那就需要判断其他的must的所有wrapper是否已经全部执行完毕了,如果还有没执行完的,那么就return什么也不干,如果其他的must全部执行完了,那么就执行自己。

这个处理依赖是个难点和重点,建议多多思考,仔细看看代码。

超时异常处理

递归获取所有的 WorkerWrapper,通过一个 Set将所有 WorkerWrapper>通过递归的方式统计起来。

//1.递归获取所有的WorkerWrapper
private static void totalWorkers(List<WorkerWrapper> workerWrappers, Set<WorkerWrapper> set) {
    set.addAll(workerWrappers);
    for (WorkerWrapper wrapper : workerWrappers) {
        if (wrapper.getNextWrappers() == null) {
            continue;
        }
        List<WorkerWrapper> wrappers = wrapper.getNextWrappers();
      	//递归
        totalWorkers(wrappers, set);
    }

}

根据3.1中统计的任务集合。循环停止所有尚未执行、正在执行的任务。注意已经执行完毕的任务是不处理的(包括异常的)。

/**
 * 总控制台超时,停止所有任务
 */
public void stopNow() {
    if (getState() == INIT || getState() == WORKING) {
        fastFail(getState(), null);
    }
}

fastFail() 首页通过CAS方式,将WorkerWrapper的state属性设置为ERROR状态;并且将workResult设置为默认值,然后调用 callback.result()回调方法处理。

/**
 * 快速失败
 */
private boolean fastFail(int expect, Exception e) {
    //试图将它从expect状态,改成Error
    if (!compareAndSetState(expect, ERROR)) {
        return false;
    }

    //只要任务没有完成就处理;完成的就不管了,包括超时的和异常的任务
    if (checkIsNullResult()) {
        if (e == null) {
            workResult = defaultResult();
        } else {
            workResult = defaultExResult(e);
        }
    }
		//回调result
    callback.result(false, param, workResult);
    return true;
}

流程

  1. 先写一个任务类 继承我们的Itask接口和Icallback接口 他俩负责一个作为执行动作的接口 一个回调接口
  2. 然后我们可以编排任务 用next 指向下一个 depend指向上一个
  3. 编排好任务后 我们beginTask 参数是 过期时间 和 最开始的任务
  4. 将任务封装成CompletableFuture数组 执行我们的task方法
  5. 然后利用CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
  6. 我们catch 如果超时 **递归获取所有的 WorkerWrapper,通过一个 Set将所有 WorkerWrapper通过递归的方式统计起来。 然后采用 fastFail() 首页通过CAS方式,将WorkerWrapper的state属性设置为ERROR状态;并且将workResult设置为默认值,然后调用 callback.result()>回调方法处理。
  7. 如果没超时 我们就正常进入每一个任务的task方法
  8. task 方法有几个参数

executorService:自定义线程池,不自定义的话,就走默认的COMMON_POOL。默认的线程池是不定长线程池。
* fromWrapper:本次task是由哪个上游TaskWrapper发起的。
* remainTime:剩余的时间,用来监控任务超时的。随着一组任务的执行,这个值从全局设置的timeout时间逐渐减少,当remainTime<=0时,任务就超时了。
* forParamUseWrappers:缓存一组任务所有的TaskWrapper。key:id,value:TaskWrapper引用。

task流程是这样的:

  1. 收集所有的wrapper,key是id,以便用于在Task工作单元中,获取任意Task的执行结果。

  2. 任务超时处理 给当前的任务进行fastFail() 然后beginNext 下一个next链任务

  3. 如果自己已经执行过了,继续处理下一个任务 他的原因可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了 就是通过判断任务的一个属性state 是否等于finish 或者error 如果等于我们beginnext执行下一个任务

  4. **如果在执行前需要校验nextWrapper的状态,仅在nextWrappers <= 1时有效,如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了,SKIP跳过任务,不执行。(可选) **

    譬如A-B和C-D并行,B和D执行完毕后,再执行F,譬如CD这条线执行的比较快,已经先到达了F,并且把F给执行完了。此时A刚执行完,准备执行B了,但是B后面的F都已经完毕了,那么B是否还要继续执行。needCheckNextWrapperResult就是校验该 任务后续的任务是否已经执行完毕了,自己还要不要执行

  5. 如果没有任何依赖,说明自己就是第一批要执行的 就去执行fire方法就是执行当前方法 和beginnext后续方法

  6. 如果有前方依赖,存在两种情况

** 一种是前面只有一个wrapper。即 A -> B**

** 一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。**

** 所以需要B来做判断,必须A、C、D都完成,自己才能执行**

  1. 如果前置依赖只有一个 则取判断前置依赖 是否超时,如果超时,则自己也超时。
    任务是否异常,如果异常,则自己也异常。
    依赖任务正常完成了,则自己正常执行。fire方法
    然后执行beginnext方法

  2. 有多个依赖时 分为3种情况:
    依赖任务全部执行完才能执行自己。
    指定某个依赖任务完成,就可以执行自己。
    依赖任务都不是must属性,也就是说不是强依赖,此时当前任务会在运行最快的那个依赖任务的线程上执行。

    执行流程如下:

首先synchronized 加锁 防止多个依赖同时完成都执行到当前节点 导致幂等问题

然后进行check幂等 如果不是初始化状态就返回 因为已经开始执行了
1.判断是否有must强依赖,如果没有强依赖,当前任务就可以正常执行了。fire执行本身 然后beginnext执行下一个

** 2.如果有强依赖,判断fromWrapper是否是must?如果不是must的就return了。**

** 3.有强依赖,需要看强依赖任务中,是否有超时或者异常的任务,如果有,当前任务也超时、异常,fastFail。**

** 4.有强依赖,判断依赖任务是否全部完成?如果完成了,可以执行当前任务fire 然后beginnext下个任务;如果没有完成,return什么也不做。**

我在讲一下执行当前方法也就是fire方法

主要有7个步骤:

  1. Check 重复执行,避免任务重复执行。

就是检查当前的结果状态是否为默认 为默认就是第一次执行

  1. CAS设置任务状态,state运行状态由 init - > working 工作中
  2. 回调 callback.begin()
  3. 执行耗时操作action 他是一个接口里的方法 我们继承这个ITask接口 来进行业务操作
  4. CAS设置任务状态,state运行状态由 tasking - > finsh
  5. 回调 callback.result()

然后我在讲一下beginnext 执行后续方法

  1. 判断当前任务是否有next后续任务,如果没有任务了,就是最后一个任务,就return

  2. **next后续只有1个任务:判断next任务数量,如果数量只有1个,使用当前任务的线程执行next任务(调用task() **

  3. **next后续有多个任务:判断next任务数量,如果有多个,有几个任务就新起几个线程执行 用CompletableFuture数组封装 **

  4. 阻塞get获取结果。CompletableFuture.allOf(futures).get();

    注意点:

** 1.beginNext() 中后续任务的处理,也是通过 task() 来处理逻辑的。注意超时时间的处理,**

** 使用 remainTime 剩余时间 - costTime花费时间,这个值在整组任务的执行过程中,是逐渐减小的。例如A、B、C串行执行,**

** 整组任务的超时时间是1000ms,A执行消耗了200ms,到B执行时,B的可用时间 = 1000-200 = 800ms,这个时间是逐渐减小的。**

** 如果这个值小于0了,说明已经超过了整组任务设定的超时时间,任务就 FastFail() 了。**


** 2.beginNext() 中第4点针对后续有多个任务的处理,这里并没有使用带有超时的get方法。单个任务是没有超时监控的,**

** 如果要监控每个任务的超时,就需要一个额外的线程,有几个任务就需要几个线程,高并发场景下,会造成线程 ”爆炸“。全组任务超时,是在Async执行器中控制的。**

WorkerWrapper处理任务

核心流程:task()

<span class="ne-text">WorkerWrapper.work()</span> 方法是WorkerWrapper任务单元执行的入口。每次处理一个任务。

<span class="ne-text">work()</span>方法的定义:

**work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) **

  • <span class="ne-text">executorService</span>:自定义线程池,不自定义的话,就走默认的COMMON_POOL。默认的线程池是不定长线程池**。关于不定长线程池,在《第5步AsyncTool执行器工作过程》中已经分析了。**
  • <span class="ne-text">fromWrapper</span>:本次work是由哪个上游WorkerWrapper发起的。
  • <span class="ne-text">remainTime</span>:剩余的时间,用来监控任务超时的。随着一组任务的执行,这个值从全局设置的timeout时间逐渐减少,当remainTime<=0时,任务就超时了。
  • <span class="ne-text">forParamUseWrappers</span>:缓存一组任务所有的WorkerWrapper。key:id,value:WorkerWrapper引用。
<span class="ne-text">work()</span> 具体执行逻辑。主要分为6步:
  1. 缓存所有WorkerWrapper
  2. 任务超时处理
  3. Check是否执行过了,避免重复处理
  4. Check 后继next是否已经开始执行了,避免多余的处理
  5. 没有依赖Wrapper情况处理,则当前任务就是起始节点。
  6. 有依赖Wrapper情况处理,又区分只有1个依赖任务,或者有多个依赖任务的处理。
/**
* 开始工作
* fromWrapper代表这次work是由哪个上游wrapper发起的
*/
private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
  //引用指向
  this.forParamUseWrappers = forParamUseWrappers;
  
  //1.收集所有的wrapper,key是id,以便用于在Worker工作单元中,获取任意Worker的执行结果。
  forParamUseWrappers.put(id, this);
  
  //时钟类获取当前时间
  long now = SystemClock.now();

  //2.总的已经超时了,就快速失败,进行下一个
  if (remainTime <= 0) {
    fastFail(INIT, null);
    beginNext(executorService, now, remainTime);
    return;
  }

  //3.如果自己已经执行过了,继续处理下一个任务
  //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
  if (getState() == FINISH || getState() == ERROR) {
    beginNext(executorService, now, remainTime);
    return;
  }

  //4.如果在执行前需要校验nextWrapper的状态,仅在nextWrappers <= 1时有效
  if (needCheckNextWrapperResult) {
    //如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了,SKIP跳过任务,不执行。
    if (!checkNextWrapperResult()) {
      //FastFail SKIP,new SkippedException()
      fastFail(INIT, new SkippedException());
      beginNext(executorService, now, remainTime);
      return;
    }
  }

  //5.如果没有任何依赖,说明自己就是第一批要执行的
  if (dependWrappers == null || dependWrappers.size() == 0) {
    //5.1 执行当前任务
    fire();
    //5.2 开始后继任务
    beginNext(executorService, now, remainTime);
    return;
  }

  /*如果有前方依赖,存在两种情况
         一种是前面只有一个wrapper。即 A  ->  B
        一种是前面有多个wrapper。A C D ->   B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。
        所以需要B来做判断,必须A、C、D都完成,自己才能执行 */

  //6.处理前置有依赖的情况
  //6.1只有一个依赖
  if (dependWrappers.size() == 1) {
    //6-1.1:依赖任务正常结束了,就执行自己
    doDependsOneJob(fromWrapper);
    //6-1.2:开始后继任务
    beginNext(executorService, now, remainTime);
  }
  //6.2有多个依赖时
  else {
    //6-2.1:多个依赖任务的判断处理
    doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
  }

}

开炮:fire()

fire() 方法是具体执行Worker任务的**

  1. Check 重复执行,避免任务重复执行。
  2. CAS设置任务状态,state运行状态由 init - > working
  3. 回调 callback.begin()
  4. 执行耗时操作action
  5. CAS设置任务状态, s tate运行状态由 working- > finsh
  6. 回调 callback.result()
  7. **异常处理 fastFail()CAS设置任务状态,state运行状态由 working - > finsh;设置默认值、异常信息;
/**
* 具体的单个worker执行任务
*/
private WorkResult<V> workerDoJob() {

    //1.Check重复执行
    if (!checkIsNullResult()) {
      return workResult;
    }
  
    try {
      //2.设置Wrapper状态为Working
      //如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行
      if (!compareAndSetState(INIT, WORKING)) {
        return workResult;
      }

      //3.回调begin
      callback.begin();

      //4.执行耗时操作action
      V resultValue = worker.action(param, forParamUseWrappers);

      //5.设置Wrapper状态为FINISH
      //如果状态不是在working,说明别的地方已经修改了
      if (!compareAndSetState(WORKING, FINISH)) {
        return workResult;
      }

      workResult.setResultState(ResultState.SUCCESS);
      workResult.setResult(resultValue);
      //6.回调result
      callback.result(true, param, workResult);

      return workResult;
    } catch (Exception e) {
      //7.异常处理:设置状态ERROR\EXCEPTION,结果设置为默认值
      fastFail(WORKING, e);
      return workResult;
    }
}

6.3、beginNext()

当前WorkerWrapper的 fire() 执行完毕后,就调用 beginNext() 方法执行后续节点。

beginNext()的执行步骤主要有4步:

  1. 判断当前任务是否有next后续任务,如果没有任务了,就是最后一个任务,就结束了。
  2. next后续只有1个任务:判断next任务数量,如果数量只有1个,使用当前任务的线程执行next任务(调用 work()方法)
  3. next后续有多个任务:判断next任务数量,如果有多个,有几个任务就新起几个线程执行(调用 work()方法)
  4. 阻塞get获取结果。(针对处理next任务有多个的场景)
/**
 * 进行下一个任务
 */
private void beginNext(ExecutorService executorService, long now, long remainTime) {
    //花费的时间
    long costTime = SystemClock.now() - now;

    //1.后续没有任务了
    if (nextWrappers == null) {
        return;
    }

    //2.后续只有1个任务,使用当前任务的线程执行next任务
    if (nextWrappers.size() == 1) {
        nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
        return;
    }

    //3.后续有多个任务,使用CompletableFuture[]包装,有几个任务就起几个线程执行
    CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
    for (int i = 0; i < nextWrappers.size(); i++) {
        int finalI = i;
        futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
                .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
    }

    //4.阻塞获取Future结果,注意这里没有超时时间,超时时间由全局统一控制。
    try {
        CompletableFuture.allOf(futures).get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

6.4、单依赖 doDependsOneJob()

1、doDependsOneJob()方法是 work()方法中处理单依赖的场景w ork()中的第6步。6-1.1先判断依赖任务是否完成,如果完成了就执行自己,然后6-1.2再开始执行next后继任务。

  1. 判断依赖任务是否超时,如果超时,则自己也超时。
  2. 判断依赖任务是否异常,如果异常,则自己也异常。
  3. 依赖任务正常完成了,则自己正常执行。

6.5、多依赖 doDependsJobs()

处理多个依赖任务,需要考虑到依赖任务的配置。分为3种情况:

  1. 依赖任务全部执行完才能执行自己。
  2. 指定某个依赖任务完成,就可以执行自己。
  3. 依赖任务都不是must属性,也就是说不是强依赖,此时当前任务会在运行最快的那个依赖任务的线程上执行。

3、执行流程如下:

  1. 判断是否有must强依赖,如果没有强依赖,当前任务就可以正常执行了。
  2. 如果有强依赖,判断依赖任务是否是must?如果不是must的就return了。
  3. 有强依赖,需要看依赖任务中,是否有超时或者异常的任务,如果有,当前任务也超时、异常,fastFail。
  4. 有强依赖,判断依赖任务是否全部完成?如果完成了,可以执行当前任务;如果没有完成,return什么也不做。
private synchronized void doDependsJobs(ExecutorService executorService, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
  	//如果当前依赖是非必须的,跳过不处理
  	boolean nowDependIsMust = false;
    //Set统计必须完成的上游wrapper集合
    Set<DependWrapper> mustWrapper = new HashSet<>();
    for (DependWrapper dependWrapper : dependWrappers) {
        if (dependWrapper.isMust()) {
            mustWrapper.add(dependWrapper);
        }
      	if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
           nowDependIsMust = dependWrapper.isMust();
        }
    }

    //1.如果全部是不必须的条件,那么只要到了这里,就执行自己。
    if (mustWrapper.size() == 0) {
        //超时处理
        if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
            fastFail(INIT, null);
        } 
        //正常执行情况
        else {
            fire();
        }
        beginNext(executorService, now, remainTime);
        return;
    }
  
  	//2.如果当前依赖是非必须的,跳过不处理(非must情况)
    if (!nowDependIsMust) {
        return;
    }


    //如果fromWrapper是必须的
    boolean existNoFinish = false;
    boolean hasError = false;
    //先判断前面必须要执行的依赖任务的执行结果,如果有任何一个失败,那就不用走action了,直接给自己设置为失败,进行下一步就是了
    for (DependWrapper dependWrapper : mustWrapper) {
        WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
        WorkResult tempWorkResult = workerWrapper.getWorkResult();
        //为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完
        if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {
            existNoFinish = true;
            break;
        }
        if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
            workResult = defaultResult();
            hasError = true;
            break;
        }
        if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
            workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
            hasError = true;
            break;
        }

    }
    //3.只要有失败、异常的
    if (hasError) {
        fastFail(INIT, null);
        beginNext(executorService, now, remainTime);
        return;
    }

    //如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working
    //4.依赖任务都完成了,可以执行自己了。
    if (!existNoFinish) {
        fire();
        beginNext(executorService, now, remainTime);
        return;
    }
}
0

评论区