做自己的网站多少钱网站地址ip域名查询
欢迎关注公众号(通过文章导读关注:【11来了】),及时收到 AI 前沿项目工具及新技术的推送!
在我后台回复 「资料」 可领取
编程高频电子书
!
在我后台回复「面试」可领取硬核面试笔记
!文章导读地址:点击查看文章导读!
感谢你的关注!
京东并行框架asyncTool如何针对高并发场景进行优化?
由于最近在整理并发相关的内容,整理了 CompletableFuture、CAS、线程池这些方面的内容,但是通过理论知识我们只是学会了:怎么去用?应该怎么去用?
但是并没有学习别人如何去用,没有实际场景的示范,恰巧看到了 tianyaleixiaowu 作者开源出来的 asyncTool 并行框架 ,并且已经在 京东App后台接受苛刻、高并发、海量用户等复杂场景业务的检验测试
所以这篇文章就以这个并行框架为例,来说一下如何在高并发场景中保证比较好的性能,即如何通过 CompletableFuture、CAS、线程池去提升性能表现!
asyncTool 介绍及使用
首先介绍一下这个框架的作用,主要是用来进行一些并行任务的编排的,以及任务执行时的一些监控和回调
那么你可能会想了,不是有 CompletableFuture 来做任务编排呢?为什么还需要这个框架?
这个作者也说了,CompletableFuture 虽然提供了任务编排的能力,但是尚有不足,比如我们有多个任务,并对他们编排,但是我们想要 了解每个任务在开始执行以及执行结束的情况 ,对这些情况进行监控,那么在这种情况下 CompletableFuture 就无能为力了!
这里我们举一个简单的使用例子,有 3 个任务,需要执行完 task1 之后再执行 task2,执行完 task2 之后再执行 task3,流程如下:
接下来定义任务,需要实现 IWorker、ICallback
两个接口,主要是定义其中的回调以及任务执行方法,这里可以不用具体了解,毕竟我们主要是看它是如何使用线程池的,只需要知道这个 MyTask1
是我们需要执行的任务即可
接下来我们定义测试类,这里使用了 3 个任务,只需要将上边定义的 MyTask1
再复制两份即可
在这个测试类中,创建了 3 个任务实例,并且定义了 3 个 WorkerWrapper
包装类,这个 Wrapper
主要对要执行的任务进行 包装、编排 ,比如我们定义了 workerWrapper1
并通过 next
方法指定下一个执行的任务是 workerWrapper2
,通过 next
进行任务的编排
最后通过 Async.beginWork()
来提交任务即可,接下来核心就看 asyncTool
是如何执行任务的
CompletableFuture 和线程池配合使用
上边说如何使用,主要是为了找到任务开始执行的入口,从 入口 开始,看框架对于任务的处理:
从入口进入,最后走到下边这个 核心方法 中:
在这个方法中,可以看到是定义了一个 CompletableFuture 数组 ,来存储任务的异步执行结果
之后将我们定义的任务都扔到 线程池 中来执行,来将任务进行异步执行,提升任务执行速度,最后通过通过 CompletableFuture.allOf().get()
来阻塞等待所有任务执行完毕,最后返回即可
可以看到,在执行一些耗时操作中,异步化基本上都是必备的操作,也就是通过 CompletableFuture 和 线程池 来搭配使用,将任务的耗时操作异步化出去,尽量不影响主干流程
线程池的定义
上边说到了 asyncTool 中将 CompletableFuture 和 线程池 搭配使用,线程池具体如何定义的呢,这里其实使用了 newCachedThreadPool
线程池,具体的参数定义如下:
可以发现该线程池中并没有设置 核心线程 ,并且 线程的存活时间设置为 60s ,任务队列使用了 SynchronousQueue 任务队列,为什么要使用这个线程池呢?
先从场景来看,这个 asyncTool 框架主要是对提交的并行任务进行编排执行的,但是该框架其实并不知道任务什么时候会去提交,以及任务的数量大小的
所以在 asyncTool 中对于默认线程池的线程数量的设置就没有一个合适的值,如果设置的少了,可能任务提交多了之后, 导致任务堆积 OOM ;如果设置的多了,很多线程就会一直空闲,比较浪费线程资源
因此呢,就不设置核心线程,将最大线程数设置为 Integer.MAX_VALUE ,并且要与任务队列 SynchronousQueue 搭配使用
因为线程池的工作流程其实是先来判断有没有核心线程,没有核心线程的话,会将任务提到任务队列中阻塞等待,而 SynchronousQueue 这个任务队列是没有容量的,只做任务传递的作用,因此任务不会阻塞在队列中,直接会创建非核心线程执行任务(如果使用了 有容量的任务队列 ,就会出现问题了,当任务提交到任务队列之后,此时没有核心线程,任务会一直在任务队列中阻塞,得不到执行)
- 线程池参数这样设置的好处
当没有任务的时候,线程池中不需要再维护核心线程的存活,可以节约线程资源
当有任务提交的时候,线程池会根据任务的数量来创建对应的线程数量执行任务,这样就不会因为线程数设置过多或者过少而出现一些问题了
CAS 的使用
asyncTool 框架的目的就是编排并行任务的执行,那么既然是并行任务肯定要保证多线程环境下任务不会重复执行这些情况出现
在 asyncTool 中并没有使用 synchronized 以及 ReentrantLock 这些比较重量级的锁,而是使用 CAS 来保证任务不会重复执行
这里看一下在这个框架中,任务真正被执行时的代码,如何来保证任务不被重复执行的(为了重点代码清晰,省略了一部分非核心代码):
可以看到,在真正执行任务之前,会先通过 CAS 来判断任务的状态是否是 INIT ,是的话,表示任务还没有被执行;如果不是的话,说明已经被执行过了,或者任务出现了异常,这里直接返回结果就好了
如果 CAS 操作失败,说明任务的状态并不是 INIT ,任务已经开始执行了,所以这里就不要重复执行了
并且在执行完任务之后,再次通过 CAS 操作判断任务状态,如果已经不是 WORKING ,说明其他线程已经执行完该任务了或者其他线程在执行时发现任务出现异常,因此这里就直接返回了,避免在后边重复的进行任务回调操作
为什么说 CAS 操作比较轻量呢?
这个如果了解 synchronized 的锁升级流程的话,应该就知道为什么
在 synchronized 锁升级的过程中,轻量级锁就是通过 CAS 来获取锁的,而重量级锁是通过 线程的阻塞、唤醒 进行线程之间的获取锁操作
那么 CAS 操作性能就高在了它只需要去内存中进行值的比对,发现内存的值和期望值不同,就直接返回操作失败就可以了
如果在 asyncTool 中使用 synchronized 来保证线程之间的同步的话,这还需要进行线程之间的阻塞、唤醒操作,因此会出现线程之间的上下文切换以及用户态和内核态之间的转换,导致性能开销比较大
所以在 asyncTool 不去对线程同步进行控制,而是通过 CAS 来避免一些因为重复操作可能会带来的问题,这样性能就比较高了
asyncTool 开源框架项目地址: https://gitee.com/jd-platform-opensource/asyncTool
如果需要测试代码的话,可以从我 fork 的仓库中拉取:https://gitee.com/qylaile/asyncTool