Java并发 JDK工具篇
线程池原理
为什么要使用线程池
使用线程池主要有以下三个原因:
- 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
- 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
- 可以对线程做统一管理。
线程池的原理
Java中的线程池顶层接口是Executor
接口,ThreadPoolExecutor
是这个接口的实现类。
我们先看看ThreadPoolExecutor
类。
ThreadPoolExecutor提供的构造方法
一共有四个构造方法:
1 | // 五个参数的构造函数 |
涉及到5~7个参数,我们先看看必须的5个参数是什么意思:
int corePoolSize:该线程池中核心线程数最大值
核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。
int maximumPoolSize:该线程池中线程总数最大值 。
该值等于核心线程数量 + 非核心线程数量。
long keepAliveTime:非核心线程闲置超时时长。
非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。
TimeUnit unit:keepAliveTime的单位。
TimeUnit是一个枚举类型 ,包括以下属性:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小时 DAYS : 天
BlockingQueue workQueue:阻塞队列,维护着等待执行的Runnable任务对象。
常用的几个阻塞队列:
LinkedBlockingQueue
链式阻塞队列,底层数据结构是链表,默认大小是
Integer.MAX_VALUE
,也可以指定大小。ArrayBlockingQueue
数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
SynchronousQueue
同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。
DelayQueue
延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。
我们将在下一章中重点介绍各种阻塞队列
好了,介绍完5个必须的参数之后,还有两个非必须的参数。
ThreadFactory threadFactory
创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。
1 | static class DefaultThreadFactory implements ThreadFactory { |
RejectedExecutionHandler handler
拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :
- ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
ThreadPoolExecutor的策略
线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。
故线程池也有自己的状态。ThreadPoolExecutor
类中使用了一些final int
常量变量来表示线程池的状态 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。
1 | // runState is stored in the high-order bits |
线程池创建后处于RUNNING状态。
调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。
调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。
ThreadPoolExecutor中有一个控制状态的属性叫
ctl
,它是一个AtomicInteger类型的变量。线程池状态就是通过AtomicInteger类型的成员变量ctl
来获取的。获取的
ctl
值传入runStateOf
方法,与~CAPACITY
位与运算(CAPACITY
是低29位全1的int变量)。~CAPACITY
在这里相当于掩码,用来获取ctl的高3位,表示线程池状态;而另外的低29位用于表示工作线程数线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。
线程池主要的任务处理流程
处理任务的核心方法是execute
,我们看看 JDK 1.8 源码中ThreadPoolExecutor
是如何处理线程任务的:
1 | // JDK 1.8 |
ctl.get()
是获取线程池状态,用int
类型表示。第二步中,入队前进行了一次isRunning
判断,入队之后,又进行了一次isRunning
判断。
为什么要二次检查线程池的状态?
在多线程的环境下,线程池的状态是时刻发生变化的。很有可能刚获取线程池状态后线程池状态就改变了。判断是否将command
加入workqueue
是线程池之前的状态。倘若没有二次检查,万一线程池处于非RUNNING状态(在多线程环境下很有可能发生),那么command
永远不会执行。
总结一下处理流程
- 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。注意,这一步需要获得全局锁。
- 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。
- 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
- 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。
整个过程如图所示:
ThreadPoolExecutor如何做到线程复用的?
我们知道,一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,即一个线程执行完线程任务后不销毁,继续执行另外的线程任务。那么,线程池如何做到线程复用呢?
原来,ThreadPoolExecutor在创建线程时,会将线程封装成工作线程worker,并放入工作线程组中,然后这个worker反复从阻塞队列中拿任务去执行。话不多说,我们继续看看源码(一定要仔细看,前后有联系)
这里的addWorker
方法是在上面提到的execute
方法里面调用的,先看看上半部分:
1 | // ThreadPoolExecutor.addWorker方法源码上半部分 |
上半部分主要是判断线程数量是否超出阈值,超过了就返回false。我们继续看下半部分:
1 | // ThreadPoolExecutor.addWorker方法源码下半部分 |
创建worker
对象,并初始化一个Thread
对象,然后启动这个线程对象。
我们接着看看Worker
类,仅展示部分源码:
1 | // Worker类部分源码 |
Worker
类实现了Runnable
接口,所以Worker
也是一个线程任务。在构造方法中,创建了一个线程,线程的任务就是自己。故addWorker
方法调用addWorker方法源码下半部分中的第4步t.start
,会触发Worker
类的run
方法被JVM调用。
我们再看看runWorker
的逻辑:
1 | // Worker.runWorker方法源代码 |
首先去执行创建这个worker时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while
循环中,worker会不断地调用getTask
方法从阻塞队列中获取任务然后调用task.run()
执行任务,从而达到复用线程的目的。只要getTask
方法不返回null
,此线程就不会退出。
当然,核心线程池中创建的线程想要拿到阻塞队列中的任务,先要判断线程池的状态,如果STOP或者TERMINATED,返回null
。
最后看看getTask
方法的实现:
1 | // Worker.getTask方法源码 |
核心线程的会一直卡在workQueue.take
方法,被阻塞并挂起,不会占用CPU资源,直到拿到Runnable
然后返回(当然如果allowCoreThreadTimeOut设置为true
,那么核心线程就会去调用poll
方法,因为poll
可能会返回null
,所以这时候核心线程满足超时条件也会被销毁)。
非核心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null
,Worker对象的run()
方法循环体的判断为null
,任务结束,然后线程被系统回收 。
源码解析完毕,你理解的源码是否和图中的处理流程一致?如果不一致,那么就多看两遍吧,加油。
四种常见的线程池
Executors
类中提供的几个静态方法来创建线程池。大家到了这一步,如果看懂了前面讲的ThreadPoolExecutor
构造方法中各种参数的意义,那么一看到Executors
类中提供的线程池的源码就应该知道这个线程池是干嘛的。
newCachedThreadPool
1 | public static ExecutorService newCachedThreadPool() { |
CacheThreadPool
的运行流程如下:
- 提交任务进线程池。
- 因为corePoolSize为0的关系,不创建核心线程,线程池最大为Integer.MAX_VALUE。
- 尝试将任务添加到SynchronousQueue队列。
- 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
- 如果SynchronousQueue已有任务在等待,入列操作将会阻塞。
当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源。
newFixedThreadPool
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
核心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创建核心线程,不能创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。
与CachedThreadPool的区别:
- 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。 而CachedThreadPool因为corePoolSize=0,所以只会创建非核心线程。
- 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。 CachedThreadPool会在60s后收回。
- 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool占用资源更多。
- 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;CachedThreadPool是因为线程池很大(最大为Integer最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。
newSingleThreadExecutor
1 | public static ExecutorService newSingleThreadExecutor() { |
有且仅有一个核心线程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
1 | public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { |
四种常见的线程池基本够我们使用了,但是《阿里巴巴开发手册》不建议我们直接使用Executors类中的线程池,而是通过ThreadPoolExecutor
的方式,这样的处理方式让写的同学需要更加明确线程池的运行规则,规避资源耗尽的风险。
但如果你及团队本身对线程池非常熟悉,又确定业务规模不会大到资源耗尽的程度(比如线程数量或任务队列长度可能达到Integer.MAX_VALUE)时,其实是可以使用JDK提供的这几个接口的,它能让我们的代码具有更强的可读性。
阻塞队列
阻塞队列的由来
我们假设一种场景,生产者一直生产资源,消费者一直消费资源,资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的生产者-消费者模式。
该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。
我们自己coding实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费和死锁,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个等待-唤醒逻辑都需要自己实现。(这块不明白的同学,可以看最下方结语部分的链接)
这么容易出错的事情,JDK当然帮我们做啦,这就是阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。
BlockingQueue是Java util.concurrent包下重要的数据结构,区别于普通的队列,BlockingQueue提供了线程安全的队列访问方式,并发包下很多高级同步类的实现都是基于BlockingQueue实现的。
BlockingQueue一般用于生产者-消费者模式,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。BlockingQueue就是存放元素的容器。
BlockingQueue的操作方法
阻塞队列提供了四组不同的方法用于插入、移除、检查元素:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | - | - |
- 抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
- 返回特殊值:如果试图的操作无法立即执行,返回一个特殊值,通常是true / false。
- 一直阻塞:如果试图的操作无法立即执行,则一直阻塞或者响应中断。
- 超时退出:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。
注意之处
- 不能往阻塞队列中插入null,会抛出空指针异常。
- 可以访问阻塞队列中的任意元素,调用remove(o)可以将队列之中的特定对象移除,但并不高效,尽量避免使用。
BlockingQueue的实现类
ArrayBlockingQueue
由数组结构组成的有界阻塞队列。内部结构是数组,故具有数组的特性。
1 | public ArrayBlockingQueue(int capacity, boolean fair){ |
可以初始化队列大小, 且一旦初始化不能改变。构造方法中的fair表示控制对象的内部锁是否采用公平锁,默认是非公平锁。
LinkedBlockingQueue
由链表结构组成的有界阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是Integer.MAX_VALUE
,也可以指定大小。此队列按照先进先出的原则对元素进行排序。
DelayQueue
1 | 该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。 |
PriorityBlockingQueue
1 | 基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),内部控制线程同步的锁采用的是非公平锁。 |
网上大部分博客上PriorityBlockingQueue为公平锁,其实是不对的,查阅源码(感谢github:ambition0802同学的指出):
1 | public PriorityBlockingQueue(int initialCapacity, |
SynchronousQueue
这个队列比较特殊,没有任何内部容量,甚至连一个队列的容量都没有。并且每个 put 必须等待一个 take,反之亦然。
需要区别容量为1的ArrayBlockingQueue、LinkedBlockingQueue。
以下方法的返回值,可以帮助理解这个队列:
- iterator() 永远返回空,因为里面没有东西
- peek() 永远返回null
- put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
- offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。
- take() 取出并且remove掉queue里的element,取不到东西他会一直等。
- poll() 取出并且remove掉queue里的element,只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。
- isEmpty() 永远返回true
- remove()&removeAll() 永远返回false
注意
PriorityBlockingQueue不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。对于使用默认大小的LinkedBlockingQueue也是一样的。
阻塞队列的原理
阻塞队列的原理很简单,利用了Lock锁的多条件(Condition)阻塞控制。接下来我们分析ArrayBlockingQueue JDK 1.8 的源码。
首先是构造器,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器,分别是notEmpty和notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是put操作时,给他加上监视器notFull,标记这个线程是一个生产者;当线程是take操作时,给他加上监视器notEmpty,标记这个线程是消费者。
1 | //数据元素数组 |
put操作的源码
1 | public void put(E e) throws InterruptedException { |
总结put的流程:
- 所有执行put操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
- 判断阻塞队列是否满了,如果满了,则调用await方法阻塞这个线程,并标记为notFull(生产者)线程,同时释放lock锁,等待被消费者线程唤醒。
- 如果没有满,则调用enqueue方法将元素put进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
- 唤醒一个标记为notEmpty(消费者)的线程。
take操作的源码
1 | public E take() throws InterruptedException { |
take操作和put操作的流程是类似的,总结一下take操作的流程:
- 所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
- 判断阻塞队列是否为空,如果是空,则调用await方法阻塞这个线程,并标记为notEmpty(消费者)线程,同时释放lock锁,等待被生产者线程唤醒。
- 如果没有空,则调用dequeue方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
- 唤醒一个标记为notFull(生产者)的线程。
注意
- put和take操作都需要先获取锁,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。
- 就算拿到锁了之后,也不一定会顺利进行put/take操作,需要判断队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁。
- 在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续往下执行,否则,自旋拿锁,拿到锁了再while判断队列是否可用(这也是为什么不用if判断,而使用while判断的原因)。
示例和使用场景
生产者-消费者模型
1 | public class Test { |
下面是这个例子的输出片段:
1 | 从队列取走一个元素,队列剩余0个元素 |
注意,这个例子中的输出结果看起来可能有问题,比如有几行在插入一个元素之后,队列的剩余空间不变。这是由于System.out.println语句没有锁。考虑到这样的情况:线程1在执行完put/take操作后立即失去CPU时间片,然后切换到线程2执行put/take操作,执行完毕后回到线程1的System.out.println语句并输出,发现这个时候阻塞队列的size已经被线程2改变了,所以这个时候输出的size并不是当时线程1执行完put/take操作之后阻塞队列的size,但可以确保的是size不会超过10个。实际上使用阻塞队列是没有问题的。
线程池中使用阻塞队列
1 | public ThreadPoolExecutor(int corePoolSize, |
Java中的线程池就是使用阻塞队列实现的,我们在了解阻塞队列之后,无论是使用Executors类中已经提供的线程池,还是自己通过ThreadPoolExecutor实现线程池,都会更加得心应手,想要了解线程池的同学,可以看:线程池原理。
注:上面提到了生产者-消费者模式,大家可以参考生产者-消费者模型,可以更好的理解阻塞队列。
锁接口和类
前面我们介绍了Java原生的锁——基于对象的锁,它一般是配合synchronized关键字来使用的。实际上,Java在java.util.concurrent.locks
包下,还为我们提供了几个关于锁的类和接口。它们有更强大的功能或更高的性能。
synchronized的不足之处
我们先来看看synchronized
有什么不足之处。
- 如果临界区是只读操作,其实可以多线程一起执行,但使用synchronized的话,同一时间只能有一个线程执行。
- synchronized无法知道线程有没有成功获取到锁
- 使用synchronized,如果临界区因为IO或者sleep方法等原因阻塞了,而当前线程又没有释放锁,就会导致所有线程等待。
而这些都是locks包下的锁可以解决的。
锁的几种分类
锁可以根据以下几种方式来进行分类,下面我们逐一介绍。
可重入锁和非可重入锁
所谓重入锁,顾名思义。就是支持重新进入的锁,也就是说这个锁支持一个线程对资源重复加锁。
synchronized关键字就是使用的重入锁。比如说,你在一个synchronized实例方法里面调用另一个本实例的synchronized实例方法,它可以重新进入这个锁,不会出现任何异常。
如果我们自己在继承AQS实现同步器的时候,没有考虑到占有锁的线程再次获取锁的场景,可能就会导致线程阻塞,那这个就是一个“非可重入锁”。
ReentrantLock
的中文意思就是可重入锁。也是本文后续要介绍的重点类。
公平锁与非公平锁
这里的“公平”,其实通俗意义来说就是“先来后到”,也就是FIFO。如果对一个锁来说,先对锁获取请求的线程一定会先被满足,后对锁获取请求的线程后被满足,那这个锁就是公平的。反之,那就是不公平的。
一般情况下,非公平锁能提升一定的效率。但是非公平锁可能会发生线程饥饿(有一些线程长时间得不到锁)的情况。所以要根据实际的需求来选择非公平锁和公平锁。
ReentrantLock支持非公平锁和公平锁两种。
读写锁和排它锁
我们前面讲到的synchronized用的锁和ReentrantLock,其实都是“排它锁”。也就是说,这些锁在同一时刻只允许一个线程进行访问。
而读写锁可以在同一时刻允许多个读线程访问。Java提供了ReentrantReadWriteLock类作为读写锁的默认实现,内部维护了两个锁:一个读锁,一个写锁。通过分离读锁和写锁,使得在“读多写少”的环境下,大大地提高了性能。
注意,即使用读写锁,在写线程访问时,所有的读线程和其它写线程均被阻塞。
可见,只是synchronized是远远不能满足多样化的业务对锁的要求的。接下来我们介绍一下JDK中有关锁的一些接口和类。
JDK中有关锁的一些接口和类
众所周知,JDK中关于并发的类大多都在java.util.concurrent
(以下简称juc)包下。而juc.locks包看名字就知道,是提供了一些并发锁的工具类的。前面我们介绍的AQS(AbstractQueuedSynchronizer)就是在这个包下。下面分别介绍一下这个包下的类和接口以及它们之间的关系。
抽象类AQS/AQLS/AOS
这三个抽象类有一定的关系,所以这里放到一起讲。
首先我们看AQS(AbstractQueuedSynchronizer),之前专门有章节介绍这个类,它是在JDK 1.5 发布的,提供了一个“队列同步器”的基本功能实现。而AQS里面的“资源”是用一个int
类型的数据来表示的,有时候我们的业务需求资源的数量超出了int
的范围,所以在JDK 1.6 中,多了一个AQLS(AbstractQueuedLongSynchronizer)。它的代码跟AQS几乎一样,只是把资源的类型变成了long
类型。
AQS和AQLS都继承了一个类叫AOS(AbstractOwnableSynchronizer)。这个类也是在JDK 1.6 中出现的。这个类只有几行简单的代码。从源码类上的注释可以知道,它是用于表示锁与持有者之间的关系(独占模式)。可以看一下它的主要方法:
1 | // 独占模式,锁的持有者 |
接口Condition/Lock/ReadWriteLock
juc.locks包下共有三个接口:Condition
、Lock
、ReadWriteLock
。其中,Lock和ReadWriteLock从名字就可以看得出来,分别是锁和读写锁的意思。Lock接口里面有一些获取锁和释放锁的方法声明,而ReadWriteLock里面只有两个方法,分别返回“读锁”和“写锁”:
1 | public interface ReadWriteLock { |
Lock接口中有一个方法是可以获得一个Condition
:
1 | Condition newCondition(); |
之前我们提到了每个对象都可以用继承自Object
的wait/notify方法来实现等待/通知机制。而Condition接口也提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式。
那为什么既然有Object的监视器方法了,还要用Condition呢?这里有一个二者简单的对比:
对比项 | Object监视器 | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock获取锁,调用Lock.newCondition获取Condition对象 |
调用方式 | 直接调用,比如object.notify() | 直接调用,比如condition.await() |
等待队列的个数 | 一个 | 多个 |
当前线程释放锁进入等待状态 | 支持 | 支持 |
当前线程释放锁进入等待状态,在等待状态中不中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态直到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
Condition和Object的wait/notify基本相似。其中,Condition的await方法对应的是Object的wait方法,而Condition的signal/signalAll方法则对应Object的notify/notifyAll()。但Condition类似于Object的等待/通知机制的加强版。我们来看看主要的方法:
方法名称 | 描述 |
---|---|
await() | 当前线程进入等待状态直到被通知(signal)或者中断;当前线程进入运行状态并从await()方法返回的场景包括:(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒;(2)其他线程调用interrupt方法中断当前线程; |
awaitUninterruptibly() | 当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程 |
awaitNanos(long) | 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了 |
awaitUntil(Date) | 当前线程进入等待状态,直到被通知、中断或者超时。如果没到指定时间被通知,则返回true,否则返回false |
signal() | 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁 |
signalAll() | 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁 |
ReentrantLock
ReentrantLock是一个非抽象类,它是Lock接口的JDK默认实现,实现了锁的基本功能。从名字上看,它是一个”可重入“锁,从源码上看,它内部有一个抽象类Sync
,是继承了AQS,自己实现的一个同步器。同时,ReentrantLock内部有两个非抽象类NonfairSync
和FairSync
,它们都继承了Sync。从名字上看得出,分别是”非公平同步器“和”公平同步器“的意思。这意味着ReentrantLock可以支持”公平锁“和”非公平锁“。
通过看这两个同步器的源码可以发现,它们的实现都是”独占“的。都调用了AOS的setExclusiveOwnerThread
方法,所以ReentrantLock的锁是”独占“的,也就是说,它的锁都是”排他锁“,不能共享。
在ReentrantLock的构造方法里,可以传入一个boolean
类型的参数,来指定它是否是一个公平锁,默认情况下是非公平的。这个参数一旦实例化后就不能修改,只能通过isFair()
方法来查看。
ReentrantReadWriteLock
这个类也是一个非抽象类,它是ReadWriteLock接口的JDK默认实现。它与ReentrantLock的功能类似,同样是可重入的,支持非公平锁和公平锁。不同的是,它还支持”读写锁“。
ReentrantReadWriteLock内部的结构大概是这样:
1 | // 内部结构 |
可以看到,它同样是内部维护了两个同步器。且维护了两个Lock的实现类ReadLock和WriteLock。从源码可以发现,这两个内部类用的是外部类的同步器。
ReentrantReadWriteLock实现了读写锁,但它有一个小弊端,就是在“写”操作的时候,其它线程不能写也不能读。我们称这种现象为“写饥饿”,将在后文的StampedLock类继续讨论这个问题。
StampedLock
StampedLock
类是在Java 8 才发布的,也是Doug Lea大神所写,有人号称它为锁的性能之王。它没有实现Lock接口和ReadWriteLock接口,但它其实是实现了“读写锁”的功能,并且性能比ReentrantReadWriteLock更高。StampedLock还把读锁分为了“乐观读锁”和“悲观读锁”两种。
前面提到了ReentrantReadWriteLock会发生“写饥饿”的现象,但StampedLock不会。它是怎么做到的呢?它的核心思想在于,在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。这种模式也就是典型的无锁编程思想,和CAS自旋的思想一样。这种操作方式决定了StampedLock在读线程非常多而写线程非常少的场景下非常适用,同时还避免了写饥饿情况的发生。
这里篇幅有限,就不介绍StampedLock的源码了,只是分析一下官方提供的用法(在JDK源码类声明的上方或Javadoc里可以找到)。
1 | class Point { |
乐观读锁的意思就是先假定在这个锁获取期间,共享变量不会被改变,既然假定不会被改变,那就不需要上锁。在获取乐观读锁之后进行了一些操作,然后又调用了validate方法,这个方法就是用来验证tryOptimisticRead之后,是否有写操作执行过,如果有,则获取一个悲观读锁,这里的悲观读锁和ReentrantReadWriteLock中的读锁类似,也是个共享锁。
可以看到,StampedLock获取锁会返回一个long
类型的变量,释放锁的时候再把这个变量传进去。简单看看源码:
1 | // 用于操作state后获取stamp的值 |
StampedLock用这个long类型的变量的前7位(LG_READERS)来表示读锁,每获取一个悲观读锁,就加1(RUNIT),每释放一个悲观读锁,就减1。而悲观读锁最多只能装128个(7位限制),很容易溢出,所以用一个int类型的变量来存储溢出的悲观读锁。
写锁用state变量剩下的位来表示,每次获取一个写锁,就加0000 1000 0000(WBIT)。需要注意的是,写锁在释放的时候,并不是减WBIT,而是再加WBIT。这是为了让每次写锁都留下痕迹,解决CAS中的ABA问题,也为乐观锁检查变化validate方法提供基础。
乐观读锁就比较简单了,并没有真正改变state的值,而是在获取锁的时候记录state的写状态,在操作完成后去检查state的写状态部分是否发生变化,上文提到了,每次写锁都会留下痕迹,也是为了这里乐观锁检查变化提供方便。
总的来说,StampedLock的性能是非常优异的,基本上可以取代ReentrantReadWriteLock的作用。
并发容器集合
同步容器与并发容器
我们知道在java.util包下提供了一些容器类,而Vector和Hashtable是线程安全的容器类,但是这些容器实现同步的方式是通过对方法加锁(sychronized)方式实现的,这样读写均需要锁操作,导致性能低下。
而即使是Vector这样线程安全的类,在面对多线程下的复合操作的时候也是需要通过客户端加锁的方式保证原子性。如下面例子说明:
1 | public class TestVector { |
如果方法一和方法二为一个组合的话。那么当方法一获取到了vector
的size之后,方法二已经执行完毕,这样就导致程序的错误。
如果方法三与方法四组合的话。通过锁机制保证了在vector
上的操作的原子性。
并发容器是Java 5 提供的在多线程编程下用于代替同步容器,针对不同的应用场景进行设计,提高容器的并发访问性,同时定义了线程安全的复合操作。
并发容器类介绍
整体架构(列举常用的容器类)
其中,阻塞队列(BlockingQueue)在有介绍,CopyOnWrite容器(CopyOnWritexxx)在有介绍,这里不做过多介绍。
下面分别介绍一些常用的并发容器类和接口,因篇幅原因,这里只介绍这些类的用途和基本的原理,不做过多的源码解析。
并发Map
ConcurrentMap接口
ConcurrentMap接口继承了Map接口,在Map接口的基础上又定义了四个方法:
1 | public interface ConcurrentMap<K, V> extends Map<K, V> { |
putIfAbsent:与原有put方法不同的是,putIfAbsent方法中如果插入的key相同,则不替换原有的value值;
remove:与原有remove方法不同的是,新remove方法中增加了对value的判断,如果要删除的key-value不能与Map中原有的key-value对应上,则不会删除该元素;
replace(K,V,V):增加了对value值的判断,如果key-oldValue能与Map中原有的key-value对应上,才进行替换操作;
replace(K,V):与上面的replace不同的是,此replace不会对Map中原有的key-value进行比较,如果key存在则直接替换;
ConcurrentHashMap类
ConcurrentHashMap同HashMap一样也是基于散列表的map,但是它提供了一种与Hashtable完全不同的加锁策略,提供更高效的并发性和伸缩性。
ConcurrentHashMap在JDK 1.7 和JDK 1.8中有一些区别。这里我们分开介绍一下。
JDK 1.7
ConcurrentHashMap在JDK 1.7中,提供了一种粒度更细的加锁机制来实现在多线程下更高的性能,这种机制叫分段锁(Lock Striping)。
提供的优点是:在并发环境下将实现更高的吞吐量,而在单线程环境下只损失非常小的性能。
可以这样理解分段锁,就是将数据分段,对每一段数据分配一把锁。当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
有些方法需要跨段,比如size()、isEmpty()、containsValue(),它们可能需要锁定整个表而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。如下图:
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,HashEntry则用于存储键值对数据。
一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
JDK 1.8
而在JDK 1.8中,ConcurrentHashMap主要做了两个优化:
- 同HashMap一样,链表也会在长度达到8的时候转化为红黑树,这样可以提升大量冲突时候的查询效率;
- 以某个位置的头结点(链表的头结点或红黑树的root结点)为锁,配合自旋+CAS避免不必要的锁开销,进一步提升并发性能。
ConcurrentNavigableMap接口与ConcurrentSkipListMap类
ConcurrentNavigableMap接口继承了NavigableMap接口,这个接口提供了针对给定搜索目标返回最接近匹配项的导航方法。
ConcurrentNavigableMap接口的主要实现类是ConcurrentSkipListMap类。从名字上来看,它的底层使用的是跳表(SkipList)的数据结构。关于跳表的数据结构这里不做太多介绍,它是一种”空间换时间“的数据结构,可以使用CAS来保证并发安全性。
并发Queue
JDK并没有提供线程安全的List类,因为对List来说,很难去开发一个通用并且没有并发瓶颈的线程安全的List。因为即使简单的读操作,拿contains() 这样一个操作来说,很难想到搜索的时候如何避免锁住整个list。
所以退一步,JDK提供了对队列和双端队列的线程安全的类:ConcurrentLinkedQueue和ConcurrentLinkedDeque。因为队列相对于List来说,有更多的限制。这两个类是使用CAS来实现线程安全的。
并发Set
JDK提供了ConcurrentSkipListSet,是线程安全的有序的集合。底层是使用ConcurrentSkipListMap实现。
谷歌的guava框架实现了一个线程安全的ConcurrentHashSet:
1 | Set<String> s = Sets.newConcurrentHashSet(); |
CopyOnWrite容器
什么是CopyOnWrite容器
在说到CopyOnWrite容器之前我们先来谈谈什么是CopyOnWrite机制,CopyOnWrite是计算机设计领域中的一种优化策略,也是一种在并发场景下常用的设计思想——写入时复制思想。
那什么是写入时复制思想呢?就是当有多个调用者同时去请求一个资源数据的时候,有一个调用者出于某些原因需要对当前的数据源进行修改,这个时候系统将会复制一个当前数据源的副本给调用者修改。
CopyOnWrite容器即写时复制的容器,当我们往一个容器中添加元素的时候,不直接往容器中添加,而是将当前容器进行copy,复制出来一个新的容器,然后向新容器中添加我们需要的元素,最后将原容器的引用指向新容器。
这样做的好处在于,我们可以在并发的场景下对容器进行”读操作”而不需要”加锁”,从而达到读写分离的目的。从JDK 1.5 开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器 ,分别是CopyOnWriteArrayList和CopyOnWriteArraySet 。我们着重给大家介绍一下CopyOnWriteArrayList。
CopyOnWriteArrayList
优点: CopyOnWriteArrayList经常被用于“读多写少”的并发场景,是因为CopyOnWriteArrayList无需任何同步措施,大大增强了读的性能。在Java中遍历线程非安全的List(如:ArrayList和 LinkedList)的时候,若中途有别的线程对List容器进行修改,那么会抛出ConcurrentModificationException异常。CopyOnWriteArrayList由于其”读写分离”,遍历和修改操作分别作用在不同的List容器,所以在使用迭代器遍历的时候,则不会抛出异常。
缺点: 第一个缺点是CopyOnWriteArrayList每次执行写操作都会将原容器进行拷贝一份,数据量大的时候,内存会存在较大的压力,可能会引起频繁Full GC(ZGC因为没有使用Full GC)。比如这些对象占用的内存200M左右,那么再写入100M数据进去,内存就会多占用300M。
第二个缺点是CopyOnWriteArrayList由于实现的原因,写和读分别作用在不同新老容器上,在写操作执行过程中,读不会阻塞,但读取到的却是老容器的数据。
现在我们来看一下CopyOnWriteArrayList的add操作源码,它的逻辑很清晰,就是先把原容器进行copy,然后在新的副本上进行“写操作”,最后再切换引用,在此过程中是加了锁的。
1 | public boolean add(E e) { |
我们再来看一下remove操作的源码,remove的逻辑是将要remove元素之外的其他元素拷贝到新的副本中,然后再将原容器的引用指向新的副本中,因为remove操作也是“写操作”所以也是要加锁的。
1 | public E remove(int index) { |
我们再来看看CopyOnWriteArrayList效率最高的读操作的源码
1 | public E get(int index) { |
由上可见“读操作”是没有加锁,直接读取。
CopyOnWrite的业务中实现
接下来,我们结合具体业务场景来实现一个CopyOnWriteMap的并发容器并且使用它。
1 | import java.util.Collection; |
上面就是参考CopyOnWriteArrayList实现的CopyOnWriteMap,我们可以用这个容器来做什么呢?结合我们之前说的CopyOnWrite的复制思想,它最适用于“读多写少”的并发场景。
场景:假如我们有一个搜索的网站需要屏蔽一些“关键字”,“黑名单”每晚定时更新,每当用户搜索的时候,“黑名单”中的关键字不会出现在搜索结果当中,并且提示用户敏感字。
1 | // 黑名单服务 |
这里需要各位小伙伴特别特别注意一个问题,此处的场景是每晚凌晨“黑名单”定时更新,原因是CopyOnWrite容器有数据一致性的问题,它只能保证最终数据一致性。
所以如果我们希望写入的数据马上能准确地读取,请不要使用CopyOnWrite容器。
通信工具类
JDK中提供了一些工具类以供开发者使用。这样的话我们在遇到一些常见的应用场景时就可以使用这些工具类,而不用自己再重复造轮子了。
它们都在java.util.concurrent包下。先总体概括一下都有哪些工具类,它们有什么作用,然后再分别介绍它们的主要使用方法和原理。
类 | 作用 |
---|---|
Semaphore | 限制线程的数量 |
Exchanger | 两个线程交换数据 |
CountDownLatch | 线程等待直到计数器减为0时开始工作 |
CyclicBarrier | 作用跟CountDownLatch类似,但是可以重复使用 |
Phaser | 增强的CyclicBarrier |
下面分别介绍这几个类。
Semaphore
Semaphore介绍
Semaphore翻译过来是信号的意思。顾名思义,这个工具类提供的功能就是多个线程彼此“打信号”。而这个“信号”是一个int
类型的数据,也可以看成是一种“资源”。
可以在构造函数中传入初始资源总数,以及是否使用“公平”的同步器。默认情况下,是非公平的。
1 | // 默认情况下使用非公平 |
最主要的方法是acquire方法和release方法。acquire()方法会申请一个permit,而release方法会释放一个permit。当然,你也可以申请多个acquire(int permits)或者释放多个release(int permits)。
每次acquire,permits就会减少一个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。
Semaphore案例
Semaphore往往用于资源有限的场景中,去限制线程的数量。举个例子,我想限制同时只能有3个线程在工作:
1 | public class SemaphoreDemo { |
输出:
当前线程是1, 还剩2个资源,还有0个线程在等待
当前线程是0, 还剩1个资源,还有0个线程在等待
当前线程是6, 还剩0个资源,还有0个线程在等待
线程6释放了资源
当前线程是2, 还剩0个资源,还有6个线程在等待
线程2释放了资源
当前线程是4, 还剩0个资源,还有5个线程在等待
线程0释放了资源
当前线程是7, 还剩0个资源,还有4个线程在等待
线程1释放了资源
当前线程是8, 还剩0个资源,还有3个线程在等待
线程7释放了资源
当前线程是5, 还剩0个资源,还有2个线程在等待
线程4释放了资源
当前线程是3, 还剩0个资源,还有1个线程在等待
线程8释放了资源
当前线程是9, 还剩0个资源,还有0个线程在等待
线程9释放了资源
线程5释放了资源
线程3释放了资源
可以看到,在这次运行中,最开始是1, 0, 6这三个线程获得了资源,而其它线程进入了等待队列。然后当某个线程释放资源后,就会有等待队列中的线程获得资源。
当然,Semaphore默认的acquire方法是会让线程进入等待队列,且会抛出中断异常。但它还有一些方法可以忽略中断或不进入阻塞队列:
1 | // 忽略中断 |
Semaphore原理
Semaphore内部有一个继承了AQS的同步器Sync,重写了tryAcquireShared
方法。在这个方法里,会去尝试获取资源。
如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入AQS的等待队列。
Exchanger
Exchanger类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据。先来一个案例看看如何使用,比如两个线程之间想要传送字符串:
1 | public class ExchangerDemo { |
输出:
这个时候线程A是阻塞的,在等待线程B的数据
这是线程B,得到了另一个线程的数据:这是来自线程A的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据
可以看到,当一个线程调用exchange方法后,它是处于阻塞状态的,只有当另一个线程也调用了exchange方法,它才会继续向下执行。看源码可以发现它是使用park/unpark来实现等待状态的切换的,但是在使用park/unpark方法之前,使用了CAS检查,估计是为了提高性能。
Exchanger一般用于两个线程之间更方便地在内存中交换数据,因为其支持泛型,所以我们可以传输任何的数据,比如IO流或者IO缓存。根据JDK里面的注释的说法,可以总结为一下特性:
- 此类提供对外的操作是同步的;
- 用于成对出现的线程之间交换数据;
- 可以视作双向的同步队列;
- 可应用于基因算法、流水线设计等场景。
Exchanger类还有一个有超时参数的方法,如果在指定时间内没有另一个线程调用exchange,就会抛出一个超时异常。
1 | public V exchange(V x, long timeout, TimeUnit unit) |
那么问题来了,Exchanger只能是两个线程交换数据吗?那三个调用同一个实例的exchange方法会发生什么呢?答案是只有前两个线程会交换数据,第三个线程会进入阻塞状态。
需要注意的是,exchange是可以重复使用的。也就是说。两个线程可以使用Exchanger在内存中不断地再交换数据。
CountDownLatch
CountDownLatch介绍
先来解读一下CountDownLatch这个类名字的意义。CountDown代表计数递减,Latch是“门闩”的意思。也有人把它称为“屏障”。而CountDownLatch这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。
CountDownLatch的方法也很简单,如下:
1 | // 构造方法: |
CountDownLatch案例
我们知道,玩游戏的时候,在游戏真正开始之前,一般会等待一些前置任务完成,比如“加载地图数据”,“加载人物模型”,“加载背景音乐”等等。只有当所有的东西都加载完成后,玩家才能真正进入游戏。下面我们就来模拟一下这个demo。
1 | public class CountDownLatchDemo { |
输出:
等待数据加载…
还有3个前置任务
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!
CountDownLatch原理
其实CountDownLatch类的原理挺简单的,内部同样是一个继承了AQS的实现类Sync,且实现起来还很简单,可能是JDK里面AQS的子类中最简单的实现了,有兴趣的读者可以去看看这个内部类的源码。
需要注意的是构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。
CyclicBarrier
CyclicBarrier介绍
CyclicBarrirer从名字上来理解是“循环的屏障”的意思。前面提到了CountDownLatch一旦计数值count
被降为0后,就不能再重新设置了,它只能起一次“屏障”的作用。而CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()
方法重置屏障。
CyclicBarrier Barrier被破坏
如果参与者(线程)在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException。可以用isBroken()
方法检测Barrier是否被破坏。
- 如果有线程已经处于等待状态,调用reset方法会导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会导致始终无法等待。
- 如果在等待的过程中,线程被中断,会抛出InterruptedException异常,并且这个异常会传播到其他所有的线程。
- 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出BrokenBarrierException,屏障被损坏。
- 如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出BrokenBarrierException异常。
CyclicBarrier案例
我们同样用玩游戏的例子。如果玩一个游戏有多个“关卡”,那使用CountDownLatch显然不太合适,那需要为每个关卡都创建一个实例。那我们可以使用CyclicBarrier来实现每个关卡的数据加载等待功能。
1 | public class CyclicBarrierDemo { |
输出:
关卡1的任务加载地图数据完成
关卡1的任务加载背景音乐完成
关卡1的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡2的任务加载地图数据完成
关卡2的任务加载背景音乐完成
关卡2的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡3的任务加载人物模型完成
关卡3的任务加载地图数据完成
关卡3的任务加载背景音乐完成
本关卡所有前置任务完成,开始游戏…
注意这里跟CountDownLatch的代码有一些不同。CyclicBarrier没有分为await()
和countDown()
,而是只有单独的一个await()
方法。
一旦调用await()方法的线程数量等于构造方法中传入的任务总量(这里是3),就代表达到屏障了。CyclicBarrier允许我们在达到屏障的时候可以执行一个任务,可以在构造方法传入一个Runnable类型的对象。上述案例就是在达到屏障时,输出“本关卡所有前置任务完成,开始游戏…”。
1 | // 构造方法 |
CyclicBarrier原理
CyclicBarrier虽说功能与CountDownLatch类似,但是实现原理却完全不同,CyclicBarrier内部使用的是Lock + Condition实现的等待/通知模式。详情可以查看这个方法的源码:
1 | private int dowait(boolean timed, long nanos) |
Phaser
Phaser介绍
Phaser这个单词是“移相器,相位器”的意思(好吧,笔者并不懂这是什么玩意,下方资料来自百度百科)。这个类是从JDK 1.7 中出现的。
移相器(Phaser)能够对波的相位进行调整的一种装置。任何传输介质对在其中传导的波动都会引入相移,这是早期模拟移相器的原理;现代电子技术发展后利用A/D、D/A转换实现了数字移相,顾名思义,它是一种不连续的移相技术,但特点是移相精度高。 移相器在雷达、导弹姿态控制、加速器、通信、仪器仪表甚至于音乐等领域都有着广泛的应用
Phaser类有点复杂,这里只介绍一些基本的用法和知识点。详情可以查看JDK文档,文档里有这个类非常详尽的介绍。
前面我们介绍了CyclicBarrier,可以发现它在构造方法里传入“任务总量”parties
之后,就不能修改这个值了,并且每次调用await()
方法也只能消耗一个parties
计数。但Phaser可以动态地调整任务总量!
名词解释:
- party:对应一个线程,数量可以通过register或者构造参数传入;
- arrive:对应一个party的状态,初始时是unarrived,当调用
arriveAndAwaitAdvance()
或者arriveAndDeregister()
进入arrive状态,可以通过getUnarrivedParties()
获取当前未到达的数量; - register:注册一个party,每一阶段必须所有注册的party都到达才能进入下一阶段;
- deRegister:减少一个party。
- phase:阶段,当所有注册的party都arrive之后,将会调用Phaser的
onAdvance()
方法来判断是否要进入下一阶段。
Phaser终止的两种途径,Phaser维护的线程执行完毕或者onAdvance()
返回true
此外Phaser还能维护一个树状的层级关系,构造的时候new Phaser(parentPhaser),对于Task执行时间短的场景(竞争激烈),也就是说有大量的party, 那可以把每个Phaser的任务量设置较小,多个Phaser共同继承一个父Phaser。
Phasers with large numbers of parties that would otherwise experience heavy synchronization contention costs may instead be set up so that groups of sub-phasers share a common parent. This may greatly increase throughput even though it incurs greater per-operation overhead.
翻译:如果有大量的party,那许多线程可能同步的竞争成本比较高。所以可以拆分成多个子Phaser共享一个共同的父Phaser。这可能会大大增加吞吐量,即使它会带来更多的每次操作开销。
Phaser案例
还是游戏的案例。假设我们游戏有三个关卡,但只有第一个关卡有新手教程,需要加载新手教程模块。但后面的第二个关卡和第三个关卡都不需要。我们可以用Phaser来做这个需求。
代码:
1 | public class PhaserDemo { |
输出:
关卡1,需要加载4个模块,当前模块【加载背景音乐】
关卡1,需要加载4个模块,当前模块【加载新手教程】
下次关卡移除加载【新手教程】模块
关卡1,需要加载3个模块,当前模块【加载地图数据】
关卡1,需要加载3个模块,当前模块【加载人物模型】
第1次关卡准备完成
关卡2,需要加载3个模块,当前模块【加载地图数据】
关卡2,需要加载3个模块,当前模块【加载背景音乐】
关卡2,需要加载3个模块,当前模块【加载人物模型】
第2次关卡准备完成
关卡3,需要加载3个模块,当前模块【加载人物模型】
关卡3,需要加载3个模块,当前模块【加载地图数据】
关卡3,需要加载3个模块,当前模块【加载背景音乐】
第3次关卡准备完成
这里要注意关卡1的输出,在“加载新手教程”线程中调用了arriveAndDeregister()
减少一个party之后,后面的线程使用getRegisteredParties()
得到的是已经被修改后的parties了。但是当前这个阶段(phase),仍然是需要4个parties都arrive才触发屏障的。从下一个阶段开始,才需要3个parties都arrive就触发屏障。
另外Phaser类用来控制某个阶段的线程数量很有用,但它并在意这个阶段具体有哪些线程arrive,只要达到它当前阶段的parties值,就触发屏障。所以我这里的案例虽然制定了特定的线程(加载新手教程)来更直观地表述Phaser的功能,但是其实Phaser是没有分辨具体是哪个线程的功能的,它在意的只是数量,这一点需要读者注意。
Phaser原理
Phaser类的原理相比起来要复杂得多。它内部使用了两个基于Fork-Join框架的原子类辅助:
1 | private final AtomicReference<QNode> evenQ; |
有兴趣的读者可以去看看JDK源代码,这里不做过多叙述。
总的来说,CountDownLatch,CyclicBarrier,Phaser是一个比一个强大,但也一个比一个复杂。根据自己的业务需求合理选择即可。
Fork/Join框架
什么是Fork/Join
Fork/Join框架是一个实现了ExecutorService接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。
与其他ExecutorService相关的实现相同的是,Fork/Join框架会将任务分配给线程池中的线程。而与之不同的是,Fork/Join框架在执行任务时使用了工作窃取算法。
fork在英文里有分叉的意思,join在英文里连接、结合的意思。顾名思义,fork就是要使一个大任务分解成若干个小任务,而join就是最后将各个小任务的结果结合起来得到大任务的结果。
Fork/Join的运行流程大致如下所示:
需要注意的是,图里的次级子任务可以一直分下去,一直分到子任务足够小为止。用伪代码来表示如下:
1 | solve(任务): |
通过上面伪代码可以看出,我们通过递归嵌套的计算得到最终结果,这里有体现分而治之(divide and conquer) 的算法思想。
工作窃取算法
工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。
工作窃取流程如下图所示:
值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。
另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。
Fork/Join的具体实现
前面我们说Fork/Join框架简单来讲就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务。在Fork/Join框架里提供了抽象类ForkJoinTask
来实现任务。
ForkJoinTask
ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。
fork()方法:使用线程池中的空闲线程异步提交任务
1 | // 本文所有代码都引自Java 8 |
其实fork()只做了一件事,那就是把任务推入当前工作线程的工作队列里。
join()方法:等待处理任务的线程处理完毕,获得返回值。
来看下join()的源码:
1 | public final V join() { |
我们在之前介绍过说Thread.join()会使线程阻塞,而ForkJoinPool.join()会使线程免于阻塞,下面是ForkJoinPool.join()的流程图:
RecursiveAction和RecursiveTask
通常情况下,在创建任务的时候我们一般不直接继承ForkJoinTask,而是继承它的子类RecursiveAction和RecursiveTask。
两个都是ForkJoinTask的子类,RecursiveAction可以看做是无返回值的ForkJoinTask,RecursiveTask是有返回值的ForkJoinTask。
此外,两个子类都有执行主要计算的方法compute(),当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。
ForkJoinPool
ForkJoinPool是用于执行ForkJoinTask任务的执行(线程)池。
ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。
我们来大致看下ForkJoinPool的源码:
1 | .misc.Contended |
WorkQueue
双端队列,ForkJoinTask存放在这里。
当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。
ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。
runState
ForkJoinPool的运行状态。SHUTDOWN状态用负数表示,其他用2的幂次表示。
Fork/Join的使用
上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来替代ForkJoinTask。
下面我们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的使用:
斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:
1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······
如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。
1 | public class FibonacciTest { |
上面例子在本机的输出:
1 | CPU核数:4 |
需要注意的是,上述计算时间复杂度为O(2^n)
,随着n的增长计算效率会越来越低,这也是上面的例子中n不敢取太大的原因。
此外,也并不是所有的任务都适合Fork/Join框架,比如上面的例子任务划分过于细小反而体现不出效率,下面我们试试用普通的递归来求f(n)的值,看看是不是要比使用Fork/Join快:
1 | // 普通递归,复杂度为O(2^n) |
普通递归的例子输出:
1 | 计算结果:102334155 |
通过输出可以很明显的看出来,使用普通递归的效率都要比使用Fork/Join框架要高很多。
这里我们再用另一种思路来计算:
1 | // 通过循环来计算,复杂度为O(n) |
上面例子在笔者所用电脑的输出为:
1 | 计算结果:102334155 |
这里耗时为0不代表没有耗时,是表明这里计算的耗时几乎可以忽略不计,大家可以在自己的电脑试试,即使是n取大很多量级的数据(注意int溢出的问题)耗时也是很短的,或者可以用System.nanoTime()统计纳秒的时间。
为什么在这里普通的递归或循环效率更快呢?因为Fork/Join是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。
如果要计算的任务比较简单(比如我们案例中的斐波那契数列),那当然是直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下,就可以充分利用多核CPU来提高计算速度。
另外,Java 8 Stream的并行操作底层就是用到了Fork/Join框架,下一章我们将从源码及案例两方面介绍Java 8 Stream的并行操作。
Java 8 Stream并行计算原理
Java 8 Stream简介
从Java 8 开始,我们可以使用Stream
接口以及lambda表达式进行“流式计算”。它可以让我们对集合的操作更加简洁、更加可读、更加高效。
Stream接口有非常多用于集合计算的方法,比如判空操作empty、过滤操作filter、求最max值、查找操作findFirst和findAny等等。
Stream单线程串行计算
Stream接口默认是使用串行的方式,也就是说在一个线程里执行。下面举一个例子:
1 | public class StreamDemo { |
我们来理解一下这个方法。首先我们用整数1~9创建了一个Stream
。这里的Stream.of(T… values)方法是Stream接口的一个静态方法,其底层调用的是Arrays.stream(T[] array)方法。
然后我们使用了reduce
方法来计算这个集合的累加和。reduce
方法这里做的是:从前两个元素开始,进行某种操作(我这里进行的是加法操作)后,返回一个结果,然后再拿这个结果跟第三个元素执行同样的操作,以此类推,直到最后的一个元素。
我们来打印一下当前这个reduce操作的线程以及它们被操作的元素和返回的结果以及最后所有reduce方法的结果,也就代表的是数字1到9的累加和。
main: 1 + 2 = 3
main: 3 + 3 = 6
main: 6 + 4 = 10
main: 10 + 5 = 15
main: 15 + 6 = 21
main: 21 + 7 = 28
main: 28 + 8 = 36
main: 36 + 9 = 45
45
可以看到,默认情况下,它是在一个单线程运行的,也就是main线程。然后每次reduce操作都是串行起来的,首先计算前两个数字的和,然后再往后依次计算。
Stream多线程并行计算
我们思考上面一个例子,是不是一定要在单线程里进行串行地计算呢?假如我的计算机是一个多核计算机,我们在理论上能否利用多核来进行并行计算,提高计算效率呢?
当然可以,比如我们在计算前两个元素1 + 2 = 3的时候,其实我们也可以同时在另一个核计算 3 + 4 = 7。然后等它们都计算完成之后,再计算 3 + 7 = 10的操作。
是不是很熟悉这样的操作手法?没错,它就是ForkJoin框架的思想。下面小小地修改一下上面的代码,增加一行代码,使Stream使用多线程来并行计算:
1 | public class StreamParallelDemo { |
可以看到,与上一个案例的代码只有一点点区别,就是在reduce方法被调用之前,调用了parallel()方法。下面来看看这个方法的输出:
ForkJoinPool.commonPool-worker-1: 3 + 4 = 7
ForkJoinPool.commonPool-worker-4: 8 + 9 = 17
ForkJoinPool.commonPool-worker-2: 5 + 6 = 11
ForkJoinPool.commonPool-worker-3: 1 + 2 = 3
ForkJoinPool.commonPool-worker-4: 7 + 17 = 24
ForkJoinPool.commonPool-worker-4: 11 + 24 = 35
ForkJoinPool.commonPool-worker-3: 3 + 7 = 10
ForkJoinPool.commonPool-worker-3: 10 + 35 = 45
45
可以很明显地看到,它使用的线程是ForkJoinPool
里面的commonPool
里面的worker线程。并且它们是并行计算的,并不是串行计算的。但由于Fork/Join框架的作用,它最终能很好的协调计算结果,使得计算结果完全正确。
如果我们用Fork/Join代码去实现这样一个功能,那无疑是非常复杂的。但Java8提供了并行式的流式计算,大大简化了我们的代码量,使得我们只需要写很少很简单的代码就可以利用计算机底层的多核资源。
从源码看Stream并行计算原理
上面我们通过在控制台输出线程的名字,看到了Stream的并行计算底层其实是使用的Fork/Join框架。那它到底是在哪使用Fork/Join的呢?我们从源码上来解析一下上述案例。
Stream.of
方法就不说了,它只是生成一个简单的Stream。先来看看parallel()
方法的源码。这里由于我的数据是int
类型的,所以它其实是使用的BaseStream
接口的parallel()
方法。而BaseStream
接口的JDK唯一实现类是一个叫AbstractPipeline
的类。下面我们来看看这个类的parallel()
方法的代码:
1 | public final S parallel() { |
这个方法很简单,就是把一个标识sourceStage.parallel
设置为true
。然后返回实例本身。
接着我们再来看reduce
这个方法的内部实现。
Stream.reduce()方法的具体实现是交给了ReferencePipeline
这个抽象类,它是继承了AbstractPipeline
这个类的:
1 | // ReferencePipeline抽象类的reduce方法 |
从它的源码可以知道,reduce方法调用了evaluate方法,而evaluate方法会先去检查当前的flag,是否使用并行模式,如果是则会调用evaluateParallel
方法执行并行计算,否则,会调用evaluateSequential
方法执行串行计算。
这里我们再看看TerminalOp
(注意这里是字母l O,而不是数字1 0)接口的evaluateParallel
方法。TerminalOp
接口的实现类有这样几个内部类:
- java.util.stream.FindOps.FindOp
- java.util.stream.ForEachOps.ForEachOp
- java.util.stream.MatchOps.MatchOp
- java.util.stream.ReduceOps.ReduceOp
可以看到,对应的是Stream的几种主要的计算操作。我们这里的示例代码使用的是reduce计算,那我们就看看ReduceOp类的这个方法的源码:
1 | // java.util.stream.ReduceOps.ReduceOp.evaluateParallel |
evaluateParallel方法创建了一个新的ReduceTask实例,并且调用了invoke()方法后再调用get()方法,然后返回这个结果。那这个ReduceTask是什么呢?它的invoke方法内部又是什么呢?
追溯源码我们可以发现,ReduceTask类是ReduceOps类的一个内部类,它继承了AbstractTask类,而AbstractTask类又继承了CountedCompleter类,而CountedCompleter类又继承了ForkJoinTask类!
它们的继承关系如下:
ReduceTask -> AbstractTask -> CountedCompleter -> ForkJoinTask
这里的ReduceTask的invoke方法,其实是调用的ForkJoinTask的invoke方法,中间三层继承并没有覆盖这个方法的实现。
所以这就从源码层面解释了Stream并行的底层原理是使用了Fork/Join框架。
需要注意的是,一个Java进程的Stream并行计算任务默认共享同一个线程池,如果随意的使用并行特性可能会导致方法的吞吐量下降。我们可以通过下面这种方式来让你的某个并行Stream使用自定义的ForkJoin线程池:
1 | ForkJoinPool customThreadPool = new ForkJoinPool(4); |
Stream并行计算的性能提升
我们可以在本地测试一下如果在多核情况下,Stream并行计算会给我们的程序带来多大的效率上的提升。用以下示例代码来计算一千万个随机数的和:
1 | public class StreamParallelDemo { |
输出:
本计算机的核数:8
495156156
单线程计算耗时:223
495156156
多线程计算耗时:95
所以在多核的情况下,使用Stream的并行计算确实比串行计算能带来很大效率上的提升,并且也能保证结果计算完全准确。
本文一直在强调的“多核”的情况。其实可以看到,我的本地电脑有8核,但并行计算耗时并不是单线程计算耗时除以8,因为线程的创建、销毁以及维护线程上下文的切换等等都有一定的开销。所以如果你的服务器并不是多核服务器,那也没必要用Stream的并行计算。因为在单核的情况下,往往Stream的串行计算比并行计算更快,因为它不需要线程切换的开销。
计划任务
自JDK 1.5 开始,JDK提供了ScheduledThreadPoolExecutor
类用于计划任务(又称定时任务),这个类有两个用途:
- 在给定的延迟之后运行任务
- 周期性重复执行任务
在这之前,是使用Timer
类来完成定时任务的,但是Timer
有缺陷:
- Timer是单线程模式;
- 如果在执行任务期间某个TimerTask耗时较久,那么就会影响其它任务的调度;
- Timer的任务调度是基于绝对时间的,对系统时间敏感;
- Timer不会捕获执行TimerTask时所抛出的异常,由于Timer是单线程,所以一旦出现异常,则线程就会终止,其他任务也得不到执行。
所以JDK 1.5之后,大家就摒弃Timer
,使用ScheduledThreadPoolExecutor
吧。
使用案例
假设我有一个需求,指定时间给大家发送消息。那么我们会将消息(包含发送时间)存储在数据库中,然后想用一个定时任务,每隔1秒检查数据库在当前时间有没有需要发送的消息,那这个计划任务怎么写?下面是一个Demo:
1 | public class ThreadPool { |
下面截取前面的输出(这个demo会一直运行下去):
1 | 2019-01-23 16:16:48 |
这就是ScheduledThreadPoolExecutor
的一个简单运用,想要知道奥秘,接下来的东西需要仔细的看哦。
类结构
1 | public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor |
ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor
,实现了ScheduledExecutorService
。 线程池在之前的章节介绍过了,我们先看看ScheduledExecutorService
。
1 | public interface ScheduledExecutorService extends ExecutorService { |
ScheduledExecutorService
实现了ExecutorService
,并增加若干定时相关的接口。 前两个方法用于单次调度执行任务,区别是有没有返回值。
重点理解一下后面两个方法:
scheduleAtFixedRate
该方法在
initialDelay
时长后第一次执行任务,以后每隔period
时长,再次执行任务。注意,period是从任务开始执行算起的。开始执行任务后,定时器每隔period时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才再次启动任务。scheduleWithFixDelay
该方法在
initialDelay
时长后第一次执行任务,以后每当任务执行完成后,等待delay
时长,再次执行任务。
主要方法介绍
schedule
1 | // delay时长后执行任务command,该任务只执行一次 |
我们先看看里面涉及到的几个类和接口ScheduledFuture
、 RunnableScheduledFuture
、 ScheduledFutureTask
的关系:
我们先看看这几个接口和类:
Delayed接口
1 | // 继承Comparable接口,表示该类对象支持排序 |
Delayed
接口很简单,继承了Comparable
接口,表示对象是可以比较排序的。
ScheduledFuture接口
1 | // 仅仅继承了Delayed和Future接口,自己没有任何代码 |
没有添加其他方法。
RunnableScheduledFuture接口
1 | public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> { |
ScheduledFutureTask类
回到schecule
方法中,它创建了一个ScheduledFutureTask
的对象,由上面的关系图可知,ScheduledFutureTask
直接或者间接实现了很多接口,一起看看ScheduledFutureTask
里面的实现方法吧。
构造方法
1 | ScheduledFutureTask(Runnable r, V result, long ns, long period) { |
Delayed接口的实现
1 | // 实现Delayed接口的getDelay方法,返回任务开始执行的剩余时间 |
Comparable接口的实现
1 | // Comparable接口的compareTo方法,比较两个任务的”大小”。 |
setNextRunTime
1 | // 任务执行完后,设置下次执行的时间 |
Runnable接口实现
1 | public void run() { |
总结一下run方法的执行过程:
- 如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行步骤2;
- 如果不是周期性任务,调用FutureTask中的run方法执行,会设置执行结果,然后直接返回,否则执行步骤3;
- 如果是周期性任务,调用FutureTask中的runAndReset方法执行,不会设置执行结果,然后直接返回,否则执行步骤4和步骤5;
- 计算下次执行该任务的具体时间;
- 重复执行任务。
runAndReset
方法是为任务多次执行而设计的。runAndReset
方法执行完任务后不会设置任务的执行结果,也不会去更新任务的状态,维持任务的状态为初始状态(NEW状态),这也是该方法和FutureTask
的run
方法的区别。
scheduledAtFixedRate
我们看一下代码:
1 | // 注意,固定速率和固定时延,传入的参数都是Runnable,也就是说这种定时任务是没有返回值的 |
scheduleAtFixedRate
这个方法和schedule
类似,不同点是scheduleAtFixedRate
方法内部创建的是ScheduledFutureTask
,带有初始延时和固定周期的任务 。
scheduledAtFixedDelay
FixedDelay
也是通过ScheduledFutureTask
体现的,唯一不同的地方在于创建的ScheduledFutureTask
不同 。这里不再展示源码。
delayedExecute
前面讲到的schedule
、scheduleAtFixedRate
和scheduleAtFixedDelay
最后都调用了delayedExecute
方法,该方法是定时任务执行的主要方法。 一起来看看源码:
1 | private void delayedExecute(RunnableScheduledFuture<?> task) { |
delayedExecute
方法的逻辑也很简单,主要就是将任务添加到等待队列,然后调用ensurePrestart
方法。
1 | void ensurePrestart() { |
ensurePrestart
方法主要是调用了addWorker
,线程池中的工作线程是通过该方法来启动并执行任务的。 具体可以查看前面讲的线程池章节。
对于ScheduledThreadPoolExecutor
,worker
添加到线程池后会在等待队列上等待获取任务,这点是和ThreadPoolExecutor
一致的。但是worker是怎么从等待队列取定时任务的?
因为ScheduledThreadPoolExecutor
使用了DelayedWorkQueue
保存等待的任务,该等待队列队首应该保存的是最近将要执行的任务,如果队首任务的开始执行时间还未到,worker
也应该继续等待。
DelayedWorkQueue
ScheduledThreadPoolExecutor
使用了DelayedWorkQueue
保存等待的任务。
该等待队列队首应该保存的是最近将要执行的任务,所以worker
只关心队首任务即可,如果队首任务的开始执行时间还未到,worker也应该继续等待。
DelayedWorkQueue是一个无界优先队列,使用数组存储,底层是使用堆结构来实现优先队列的功能。我们先看看DelayedWorkQueue的声明和成员变量:
1 | static class DelayedWorkQueue extends AbstractQueue<Runnable> |
当一个线程成为leader,它只要等待队首任务的delay时间即可,其他线程会无条件等待。leader取到任务返回前要通知其他线程,直到有线程成为新的leader。每当队首的定时任务被其他更早需要执行的任务替换时,leader设置为null,其他等待的线程(被当前leader通知)和当前的leader重新竞争成为leader。
同时,定义了锁lock和监视器available用于线程竞争成为leader。
当一个新的任务成为队首,或者需要有新的线程成为leader时,available监视器上的线程将会被通知,然后竞争成为leader线程。 有些类似于生产者-消费者模式。
接下来看看DelayedWorkQueue
中几个比较重要的方法
take
1 | public RunnableScheduledFuture take() throws InterruptedException { |
take
方法是什么时候调用的呢?在线程池的章节中,介绍了getTask
方法,工作线程会循环地从workQueue
中取任务。但计划任务却不同,因为如果一旦getTask
方法取出了任务就开始执行了,而这时可能还没有到执行的时间,所以在take
方法中,要保证只有在到指定的执行时间的时候任务才可以被取走。
总结一下流程:
- 如果堆顶元素为空,在available条件上等待。
- 如果堆顶任务的执行时间已到,将堆顶元素替换为堆的最后一个元素并调整堆使其满足最小堆特性,同时设置任务在堆中索引为-1,返回该任务。
- 如果leader不为空,说明已经有线程成为leader了,其他线程都要在available监视器上等待。
- 如果leader为空,当前线程成为新的leader,并等待直到堆顶任务执行时间到达。
- take方法返回之前,将leader设置为空,并通知其他线程。
再来说一下leader的作用,这里的leader是为了减少不必要的定时等待,当一个线程成为leader时,它只等待下一个节点的时间间隔,但其它线程无限期等待。 leader线程必须在从take()
或poll()
返回之前signal其它线程,除非其他线程成为了leader。
举例来说,如果没有leader,那么在执行take时,都要执行available.awaitNanos(delay)
,假设当前线程执行了该段代码,这时还没有signal,第二个线程也执行了该段代码,则第二个线程也要被阻塞。但只有一个线程返回队首任务,其他的线程在awaitNanos(delay)
之后,继续执行for循环,因为队首任务已经被返回了,所以这个时候的for循环拿到的队首任务是新的,又需要重新判断时间,又要继续阻塞。
所以,为了不让多个线程频繁的做无用的定时等待,这里增加了leader,如果leader不为空,则说明队列中第一个节点已经在等待出队,这时其它的线程会一直阻塞,减少了无用的阻塞(注意,在finally
中调用了signal()
来唤醒一个线程,而不是signalAll()
)。
offer
该方法往队列插入一个值,返回是否成功插入 。
1 | public boolean offer(Runnable x) { |
在堆中插入了一个节点,这个时候堆有可能不满足最小堆的定义,siftUp
用于将堆调整为最小堆,这属于数据结构的基本内容,本文不做介绍。
总结
内部使用优化的DelayQueue来实现,由于使用队列来实现定时器,有出入队调整堆等操作,所以定时并不是非常非常精确。