Java J.U.C 包入门

Java J.U.C 并发包入门;

J.U.C 包简介

J.U.C 即 java.util.concurrent,该包参考自 EDU.oswego.cs.dl.util.concurrent,是 JSR 166 标准规范的一个实现;那么 JSR 166 以及 J.U.C 包的作者是谁呢,没错,就是 Doug Lea 大神,膜拜!

J.U.C 框架是 Java 5 中引入的,而我们最熟悉的线程池机制就在这个包,合理使用线程池能够带来 3 个好处:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就立即执行;
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以统一的分配、调优和监控;

J.U.C 框架包含的内容有:

  • Executor 框架(线程池、Callable、Future),任务的执行和调度框架;
  • AbstractQueuedSynchronizer(AQS框架),J.U.C 中实现锁和同步机制的基础;
  • Locks & Condition(锁和条件变量),比 synchronized、wait、notify 更细粒度的锁机制;
  • Synchronizers(同步器),主要用于协助线程同步,有 CountDownLatch、CyclicBarrier、Semaphore、Exchanger;
  • Atomic Variables(原子变量),方便程序员在多线程环境下,无锁的进行原子操作,核心操作是 CAS 原子操作,所谓的 CAS 操作,即 compare and swap,指的是将预期值与当前变量的值比较(compare),如果相等则使用新值替换(swap)当前变量,否则不作操作;
  • BlockingQueue(阻塞队列),阻塞队列提供了可阻塞的入队和出对操作,如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用;
  • Concurrent Collections(并发容器),说到并发容器,不得不提同步容器,在 JDK1.5 之前,为了线程安全,我们一般都是使用同步容器,同步容器主要的缺点是:对所有容器状态的访问都串行化,严重降低了并发性;某些复合操作,仍然需要加锁来保护;迭代期间,若其它线程并发修改该容器,会抛出 ConcurrentModificationException 异常,即快速失败机制;
  • Fork/Join 并行计算框架,这块内容是在 JDK1.7 中引入的,可以方便利用多核平台的计算能力,简化并行程序的编写,开发人员仅需关注如何划分任务和组合中间结果;框架的核心是 ForkJoinPool 类,实现了工作窃取算法(对那些处理完自身任务的线程,会从其它线程窃取任务执行)并且能够执行 ForkJoinTask 任务;
  • TimeUnit 枚举,TimeUnit 是 java.util.concurrent 包下面的一个枚举类,TimeUnit 提供了可读性更好的线程暂停操作,以及方便的时间单位转换方法;

Executor 框架

Executor 框架继承关系图:
线程池继承关系图

Callable 接口

在介绍线程池之前,我们先来看一下 Callable 接口(函数式接口),它只有一个 call() 方法;

在 Java5 之前,我们只有一个 Runnable 接口,用来定义任务,将它交给一个 Thread 对象去执行;
但是 Runnable 接口的 run() 方法是没有返回值的,也不能抛出任何检查性异常,有些时候不方便。

现在我们可以利用 Callable 接口,来定义有返回值并且可抛出检查异常的任务,将它交给 ExecutorService 去执行;
ExecutorService.submit() 方法将返回一个 Future 对象(待完成的任务结果对象),Future.get() 方法可获取结果。

Future 接口

Future 表示异步任务的结果,Future 提供了任务查询、任务取消、获取任务结果等实用方法;

FutureTask 类

FutureTask 表示异步计算任务;它同时实现了 Runnable、Future 接口,因此可直接交给 Thread 执行,并获取结果;

例子:

Executor 接口

最顶层是 Executor 接口,它的定义很简单,只有一个用于执行任务的 execute() 方法:

ExecutorService 接口

ExecutorService 接口继承自 Executor 公共接口,它提供了更丰富的线程池控制方法,比如 shutdown() 用于平滑关闭线程池,submit() 用于提交 Callable 任务(相比 Runnable 任务,它可以有返回值、可以抛出异常):

AbstractExecutorService 抽象类

AbstractExecutorService 抽象类实现了 ExecutorService 接口的 submit、invoke 系列方法;

ThreadPoolExecutor 类

ThreadPoolExecutor 核心类,创建自定义线程池就靠它了,下面是其主要方法:

构造函数详解
前三个构造函数都是调用的第四个构造函数进行初始化操作,各参数的作用:

  • corePoolSize:线程池中的核心线程数,也就是正式员工数量;
  • maximumPoolSize:线程池中能同时拥有的最大线程数(maximumPoolSize - corePoolSize = 临时线程数);
  • keepAliveTime:空闲线程的存活时间(默认针对临时线程);
  • unit:keepAliveTime 单位;
    • TimeUnit.NANOSECONDS纳秒
    • TimeUnit.MICROSECONDS微秒
    • TimeUnit.MILLISECONDS毫秒
    • TimeUnit.SECONDS
    • TimeUnit.MINUTES
    • TimeUnit.HOURS
    • TimeUnit.DAYS
  • workQueue:缓存任务的阻塞队列;
    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列;
    • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列;
    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列;
    • LinkedBlockingDeque:一个由链表结构组成的有界阻塞双端队列;
    • SynchronousQueue:一个不存储元素的无界阻塞(同步)队列;
    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列;
    • DelayQueue:一个支持延时获取元素的无界阻塞队列;
  • threadFactory:创建线程的工厂;
  • handler:当 workQueue 已满,且线程数达 maximumPoolSize 时,拒绝新任务采取的策略;
    • ThreadPoolExecutor.AbortPolicy:(默认)丢弃新任务并抛出 RejectedExecutionException 异常(RT)
    • ThreadPoolExecutor.CallerRunsPolicy:让调用线程执行新任务
    • ThreadPoolExecutor.DiscardPolicy:丢弃新任务,不抛出异常
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃旧任务(最先提交却未得到执行的任务),然后重新尝试执行新任务

一个线程即一个执行流,一个任务即一个 Runnable/Callable 对象;

提交任务之后的流程
当试图通过 execute()、submit() 方法将一个任务添加到线程池中时,将按照如下顺序处理:

  1. 如果线程池中的线程数量少于 corePoolSize,即使线程池中有空闲线程,也会创建一个新线程来执行新添加的任务;
  2. 如果线程池中的线程数量为 corePoolSize,并且缓冲队列 workQueue 未满,则将新添加的任务放到 workQueue 中,等待线程池中的空闲线程按照 FIFO 原则依次从队列中取出任务并执行;
  3. 如果线程池中的线程数量为 corePoolSize,并且缓冲队列 workQueue 已满,则创建新的线程(临时工)来执行新添加的任务,直到线程池中的线程数达到 maximumPoolSize;
  4. 如果线程池中的线程数量达到 maximumPoolSize,则按照饱和策略进行处理,默认为丢弃任务并抛出 RejectedExecutionException RT异常;
  5. 当(临时工)线程在线程池中的空闲时间超过 keepAliveTime 后,该(临时工)线程将被自动结束,移出线程池,直到线程数恢复到 corePoolSize;

线程池并没有标记哪个线程是核心线程,哪个是非核心线程,线程池只关心核心线程的数量;

线程池工作流程简述
通俗解释,如果把线程池比作一个单位的话,corePoolSize 就表示正式工,线程就可以表示一个员工;
当我们向单位委派一项工作时,如果单位发现正式工还没招满,单位就会招个正式工来完成这项工作;

随着我们向这个单位委派的工作增多,即使正式工全部满了,工作还是干不完,那么单位只能按照我们新委派的工作按先后顺序将它们找个地方搁置起来,这个地方就是 workQueue,等正式工完成了手上的工作,就到这里来取新的任务;

如果不巧,年末了,各个部门都向这个单位委派任务,导致 workQueue 已经没有空位置放新的任务,于是单位决定招点临时工吧(临时工:又是我!);
临时工也不是想招多少就招多少,上级部门通过这个单位的 maximumPoolSize 确定了你这个单位的人数的最大值,换句话说最多招 maximumPoolSize - corePoolSize 个临时工;当然,在线程池中,谁是正式工,谁是临时工是没有区别的,完全同工同酬;

如果单位已经招了些临时工,但新任务没有继续增加,所以随着每个员工忙完手头的工作,都来 workQueue 领取新的任务;
随着各个员工齐心协力,任务越来越少,员工数没变,那么就必定有闲着没事干的员工;于是领导想了个办法,设定了 keepAliveTime,当空闲的员工在 keepAliveTime 这段时间还没有找到事情干,就被辞退啦,毕竟地主家也没有余粮啊!当然辞退到 corePoolSize 个员工时就不再辞退了,领导也不想当光杆司令啊;

如果单位招满了临时工,但新任务依然继续增加,线程池从上到下,从里到外真心忙的不可开交,阻塞队列也满了,只好拒绝上级委派下来的任务;怎么拒绝也是一门艺术哈;

预启动线程
在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用 prestartAllCoreThreads() 和 prestartCoreThread() 方法,从方法名字可以看出,是预创建线程的意思,即在没有任务到来之前,就创建 corePoolSize 个线程或 1 个线程;

keepAliveTime 超时
默认情况下,keepAliveTime 只在线程数大于 corePoolSize 时才会生效;但是如果调用了allowCoreThreadTimeOut(true)方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0;

阻塞队列
阻塞队列用来存储等待执行的任务;该参数很重要,会对线程池的运行过程产生很大的影响,一般而言,有以下几种选择:

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列;
  • LinkedBlockingQueue:基于链表结构的有界(默认为 Integer.MAX_VALUE)阻塞队列,吞吐量通常要高于 ArrayBlockingQueue;
  • SynchronousQueue:不存储元素的无界阻塞队列;每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,反之亦然;吞吐量通常要高于 LinkedBlockingQueue;

线程池的状态
线程池具有以下五种状态,当创建一个线程池时初始化状态为 RUNNING:
RUNNING:允许提交并处理任务,线程池新创建时的状态;
SHUTDOWN:不允许提交新的任务,调用 shutdown() 方法的状态;
STOP:不允许提交新的任务并向池中线程发送中断信号,调用 shutdownNow() 方法的状态;
TIDYING:所有任务都已执行完毕,池中工作的线程数为 0,等待执行 terminated() 钩子方法;
TERMINATED:terminated() 钩子方法执行完毕,线程池已完全关闭的状态;

默认线程工厂
在 ThreadPoolExecutor 的构造方法中,如果我们不指定线程工厂对象,那么它会使用 Executors.DefaultThreadFactory:

  • 创建的线程属于同一个线程组
  • 线程优先级均为 Thread.NORM_PRIORITY (5)
  • 线程名规则为 "pool-XXX-thread-xxx"
  • 创建的线程均不是守护线程

当然我们也可以使用自定义的线程工厂,只需实现 java.util.concurrent.ThreadFactory 接口:

例子,虽然 maximumPoolSize 为 Integer.MAX_VALUE,但是由于线程工厂中限制了最大线程数,因此新任务被拒绝:

我们先来看该线程池的相关参数:
1) 核心线程数为 0,临时线程数为 Integer.MAX_VALUE,因此理论可存在任意多个线程;
2) 临时线程超时时间为 15 秒,因为池内所有线程都是临时线程,因此它们都会被应用超时策略;
3) 因为任务队列为 SynchronousQueue(只要来任务就表示队列已满),因此随时都可能启动新线程;
4) 但是由于 MyThreadFactory 限制了最大线程数为 5,因此池中线程数超过该值后将启用拒绝策略。

线程池例子

ScheduledExecutorService 接口

ScheduledExecutorService 是 J.U.C 包提供的计划任务执行器,在这之前,我们通常使用 java.util.Timer 来执行计划任务;

  • Timer:优点是简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务;
  • ScheduledExecutorService:依赖于 JDK1.5 的线程池机制,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduedExecutor 才会真正启动一个线程,其余时间 ScheduledExecutor 都是在轮询任务的状态;

这里提到了 ScheduledFuture 接口,我们来看一下 ScheduledFuture 接口有什么东西:

ScheduledThreadPoolExecutor 类

实现了 ScheduledExecutorService 接口,是 ThreadPoolExecutor 的子类:

周期任务的例子
我们先来试试一次性任务,这个和 Linux 下的 at 很相似,只执行一次;

上面我们演示了 at,接下来我们再看一下 crontab,即周期性任务:

Executors 工具类

因为 ThreadPoolExecutor 的配置参数很多,对于不太熟悉线程池的人来说,要手动配置一个线程池并非易事;
因此 J.U.C 框架提供了 Executors 工具类,Executors 提供了一系列创建常见线程池的工厂方法,方便 Java 程序的编写;

CompletionService 接口

批量任务的执行方式
方式一:首先定义任务集合,然后定义 Future 集合用于存放执行结果,执行任务,最后遍历 Future 集合获取结果;
优点:可以依次得到有序的结果;
缺点:不能及时获取已完成的结果;

方式二:首先定义任务集合,通过 CompletionService 包装 Executor 来执行任务,然后调用其 take() 方法去取 Future 对象;
优点:能及时得到已完成的结果;
缺点:不能依次得到有序的结果;

在方式一中,从集合中遍历的每个 Future 对象并不一定处于完成状态,这时调用 get() 方法就会被阻塞住,所以后面的任务即使已完成也不能得到结果;
而方式二中,CompletionService 的实现是维护一个保存 Future 对象的 BlockingQueue,只有当这个 Future 对象状态是结束的时候,才会加入到这个 Queue 中,所以调用 take() 能从阻塞队列中拿到最新的已完成任务的结果;

方式一的例子:

CompletionService 接口
为了了解第二种方式,我们必须先来看看 CompletionService 接口:

ExecutorCompletionService 类

了解 CompletionService 接口后,我们来看一下它的实现类 - ExecutorCompletionService,用于包装一个 Executor:

这个类也很简单,就是实现了接口定义的方法而已,没有其它的了。我们来看看它的简单使用:

Fork/Join 并行框架

什么是 Fork/Join 框架
Fork/Join 框架是 Java7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

我们再通过 Fork 和 Join 这两个单词来理解下 Fork/Join 框架:Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算 1 + 2 + 3 + … + 10000,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和,最终汇总这 10 个子任务的结果。Fork/Join 的运行流程图如下:
Fork/Join 运行流程图

工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其它队列里窃取任务来执行。工作窃取的运行流程图如下:
工作窃取算法

为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如 A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完,而其它线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其它线程干活,于是它就去其它线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

Fork/Join 框架的介绍
我们已经很清楚 Fork/Join 框架的需求了,那么我们可以思考一下,如果让我们来设计一个 Fork/Join 框架,该如何设计?这个思考有助于你理解 Fork/Join 框架的设计。

  • 第一步 - 分割任务:首先我们需要有一个 fork 类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
  • 第二步 - 执行任务并合并结果:分割的子任务分别放在双端队列里,然后启动几个线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join 框架的两个类
Fork/Join 使用两个类来完成以上两件事情:

  • ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
    • RecursiveAction:用于没有返回结果的任务;
    • RecursiveTask:用于有返回结果的任务。
  • ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行,ForkJoinPool 是 Executor 接口的一个实现类,ForkJoinPool 与普通线程池的区别在于”工作窃取算法”;当一个工作线程的队列里暂时没有任务时,它会随机从其它工作线程的队列的尾部获取任务并执行。

Fork/Join 继承类图
Fork/Join 框架继承类图

对于一般的 ForkJoin 任务,我们仅需重写 compute() 方法即可:
RecursiveTaskprotected abstract V compute()
RecursiveActionprotected abstract void compute()

几个需要注意的地方

  • 每个 task 应该执行相对少量的计算,一个非常粗略的经验是:每个 task 执行的基本运算步骤应该在 100 - 10000 之间。但是还是需要依据实际情况而定,不可一概而论。如果单个任务过大,并行性反而不高;如果单个任务过小,会带来较大的内存压力和内部队列的维护开销。
  • ForkJoinTask 最好不要执行 IO 操作以及其它可能阻塞的操作,因为 ForkJoin 框架的初衷是利用多核处理器来提高纯计算任务的执行效率,提高并行性。
  • ForkJoinPool 默认的线程数为当前主机可用的 CPU 核数,启用过少线程则无法充分利用多个 CPU 核心,启用过多线程则会带来额外的线程切换及线程维护开销,反而得不偿失,而启用 CPU 核心数个线程正好合适。

Executors 的相关工厂方法
ExecutorService newWorkStealingPool():新建”工作窃取线程池”,线程数为可用 CPU 核数;
ExecutorService newWorkStealingPool(int parallelism):新建”工作窃取线程池”,使用给定的线程数;

ForkJoin 例子
利用 ForkJoin 给大数组求和(PS:个人测试发现,对于这种”轻量级”计算任务,ForkJoin 的效率反而不高,甚至比单线程还低)

Lock、Condition

在 Java 5 中,专门提供了锁对象,利用锁可以方便的实现资源的封锁,用来控制对竞争资源并发访问的控制,这些内容主要集中在 java.util.concurrent.locks 包下面,里面有三个重要的接口 Lock、ReadWriteLock、Condition;

  • Lock:互斥锁,Lock 提供了比 synchronized 方法和语句块更广泛、更细粒度的锁定操作;
  • ReadWriteLock:读写锁,ReadWriteLock 分为读锁、写锁,它们是一个整体,读锁有任意多把,写锁只有一把,读锁和写锁不能同一时间锁定;
  • Condition:条件变量,Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用;

Lock 可以说是 synchronized 的一个替代品,synchronized 能做的事,lock 基本都可以做,而且能做得更好;它们的一些区别是:

  • Lock 在获取锁的过程可以被中断;
  • Lock 可以尝试获取锁,如果锁被其它线程持有,则返回 false,不会使当前线程阻塞;
  • Lock 在尝试获取锁的时候,传入一个时间参数,如果在这个时间范围内,没有获得锁,那么就终止请求;
  • synchronized 会自动释放锁,Lock 则不会自动释放锁,需要调用 unlock() 进行释放;

这样可以看到,Lock 比起 synchronized 具有更细粒度的控制;但是也不是说 Lock 就完全可以取代 synchronized,因为 Lock 的学习成本,复杂度等方面要比 synchronized 高,对于初级 Java 程序员,使用 synchronized 的风险要比 Lock 低;

Lock 锁相关的接口、类
所在的包:java.util.concurrent.locks
接口:Lock、ReadWriteLock、Condition;
实现:ReentrantLock、ReentrantReadWriteLock、ConditionObject;

Lock 接口

ReadWriteLock 接口

Condition 接口

ReentrantLock 类

ReentrantReadWriteLock 类

可重入锁是什么意思

ReentrantLock 中的 Reentrant 是可重入的意思;也就是说 ReentrantLock 是可重入锁,synchronized 也是可重入锁,读写锁类推;

可重入锁是什么意思
可重入锁,也叫做递归锁,指的是同一线程在外层函数获得锁之后,内层递归函数仍然有获取该锁的代码,但不受影响(可以立即获取,而不需要等待);
在 JAVA 环境下 ReentrantLock、ReentrantReadWriteLock 和 synchronized 都是可重入锁;虽是可重入的,但是 lock() 与 unlock() 必须成对存在。

可重入锁最大的作用是避免死锁

synchronized、Lock 对比

在 JDK1.5 中,synchronized 是性能低效的;因为这是一个重量级操作,它对性能最大的影响是阻塞的实现,挂起线程和恢复线程的操作都需要转入内核态中完成,这些操作给系统的并发性带来了很大的压力;

相比之下使用 Java 提供的 Lock 对象,性能更高一些;Brian Goetz 对这两种锁在 JDK1.5、单核处理器及双 Xeon 处理器环境下做了一组吞吐量对比的实验,发现多线程环境下,synchronized 的吞吐量下降的非常严重,而 ReentrantLock 则能基本保持在同一个比较稳定的水平上;

但与其说 ReetrantLock 性能好,倒不如说 synchronized 还有非常大的优化余地,于是到了 JDK1.6,发生了变化,对 synchronized 加入了很多优化措施,有自适应自旋,锁消除,锁粗化,轻量级锁,偏向锁等等;导致在 JDK1.6 上 synchronize 的性能并不比 Lock 差;

官方也表示,他们也更支持 synchronized,在未来的版本中还有优化余地,所以还是提倡在 synchronized 能实现需求的情况下,优先考虑使用 synchronized 来进行同步;

阻塞同步、非阻塞同步

互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能问题,因而这种同步又称为阻塞同步,它属于一种悲观的并发策略,即线程获得的是独占锁;独占锁意味着其它线程只能依靠阻塞来等待线程释放锁;而在 CPU 转换线程阻塞时会引起线程上下文切换,当有很多线程竞争锁的时候,会引起 CPU 频繁的上下文切换导致效率很低;synchronized 采用的便是这种并发策略;

随着指令集的发展,我们有了另一种选择:基于冲突检测的乐观并发策略,通俗地讲就是先进性操作,如果没有其他线程争用共享数据,那操作就成功了,如果共享数据被争用,产生了冲突,那就再进行其它的补偿措施(最常见的补偿措施就是不断地重试,直到试成功为止),这种乐观的并发策略的许多实现都不需要把线程挂起,因此这种同步被称为非阻塞同步;ReetrantLock 采用的便是这种并发策略;

在乐观的并发策略中,需要操作和冲突检测这两个步骤具备原子性,它靠硬件指令来保证,这里用的是CAS操作(Compare and Swap);JDK1.5 之后,Java 程序才可以使用 CAS 操作;我们可以进一步研究 ReentrantLock 的源代码,会发现其中比较重要的获得锁的一个方法是 compareAndSetState,这里其实就是调用的 CPU 提供的特殊指令;现代的 CPU 提供了指令,可以自动更新共享数据,而且能够检测到其它线程的干扰,而 compareAndSet() 就用这些代替了锁定;这个算法称作非阻塞算法,意思是一个线程的失败或者挂起不应该影响其他线程的失败或挂起;

Java5 中引入了 AutomicInteger、AutomicLong、AutomicReference 等特殊的原子性变量类,它们提供的如:compareAndSet()、incrementAndSet() 和 getAndIncrement() 等方法都使用了 CAS 操作;因此,它们都是由硬件指令来保证的原子方法;

Lock 对象的使用方式

Lock 接口有 3 个实现它的类:ReentrantLock、ReetrantReadWriteLock.ReadLock 和 ReetrantReadWriteLock.WriteLock,即可重入锁、读锁和写锁;

为了保证锁最终一定会被释放(可能会有异常发生),要把互斥区放在 try 语句块内,并在 finally 语句块中释放锁,尤其当有 return 语句时,return 语句必须放在 try 字句中,以确保 unlock()不会过早发生,从而将数据暴露给第二个任务;

因此,采用 Lock 加锁和释放锁的一般形式如下:

synchronized、Lock 用途比较

基本语法上,ReentrantLock 与 synchronized 很相似,它们都具备一样的线程重入特性,只是代码写法上有点区别而已,一个表现为 API 层面的互斥锁(Lock),一个表现为原生语法层面的互斥锁(synchronized);ReentrantLock 相对 synchronized 而言还是增加了一些高级功能,主要有以下三项:

1) 等待可中断:当持有锁的线程长期不释放锁时,正在等待的线程可以选择放弃等待,改为处理其他事情;而在等待由 synchronized 产生的互斥锁时,会一直阻塞,是不能被中断的;
2) 可实现公平锁:多个线程在等待同一个锁时,必须按照申请锁的时间顺序排队等待,而非公平锁则不保证这点,在锁释放时,任何一个等待锁的线程都有机会获得锁;synchronized中的锁是非公平锁ReentrantLock默认情况下也是非公平锁,但可以通过构造方法ReentrantLock(ture)来要求使用公平锁
3) 锁可以绑定多个条件:ReentrantLock 对象可以同时绑定多个 Condition 对象,而在 synchronized 中,锁对象的 wait() 和 notify()/notifyAll() 方法可以实现一个隐含条件,但如果要和多于一个的条件关联的时候,就不得不额外地添加一个锁,而 ReentrantLock 则无需这么做,只需要多次调用 newCondition() 方法即可;而且我们还可以通过绑定 Condition 对象来判断当前线程通知的是哪些线程(即与 Condition 对象绑定在一起的其他线程);

Condition 的 await()、signal()、signalAll() 方法必须在配套的 Lock 的保护下进行,这点和 Object 的 wait()、notify()、notifyAll() 是一样的,否则抛出 IllegalMonitorStateException 异常!

不可中断锁、可中断锁

ReetrantLock 有两种锁:忽略中断锁(不可中断)和响应中断锁(可中断);忽略中断锁与 synchronized 实现的互斥锁一样,不能响应中断,而响应中断锁可以响应中断;Lock 对象获得响应中断锁的方法:lock.lockInterruptibly()

synchronized 不可中断的例子:

ReentrantLock 可中断锁,例子:

Condition 条件变量

Condition 对象需要通过 Lock.newCondition() 方法来获取,Condition 总是和 Lock 一起出现;
一个 Lock 可以有多个不同的 Condition,它们之间互不影响,而 wait()、notify()/notifyAll() 不能;

一个简单的生产者消费者例子:

从这个例子中并不能看出 Condition 相比 Object 的 wait()、notify/notifyAll() 方法有什么优势;
但在处理复杂多线程问题时,条件变量的优势就体现出来了,因此要具体情况具体分析,不能一概而论。

Java 中的各种锁总结

公平锁/非公平锁
公平锁:严格按照先来先得的顺序排队等待去获取锁;
非公平锁:每次获取锁时,先直接尝试获取锁,若获取不到再按照先来先得的顺序排队等待。

对于 ReentrantLock 而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁;非公平锁的优点在于吞吐量比公平锁大;
对于 synchronized 而言,也是一种非公平锁;由于其并不像 ReentrantLock 是通过 AQS 的来实现线程调度,所以并没有任何办法使其变成公平锁。

可重入锁
可重入锁:又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁;

对于 ReentrantLock 而言,从名字就可以看出是一个可重入锁,其名字是 Re entrant Lock;
对于 synchronized 而言,也是一个可重入锁;可重入锁的一个好处是可一定程度避免死锁。

独享锁/共享锁
独享锁:是指该锁同时只能被一个线程持有;
共享锁:是指该锁可同时被多个线程所持有。

对于 ReentrantLock、synchronized 而言,都是独享锁;
对于 ReadWriteLock 而言,其读锁是共享锁,其写锁是独享锁。

互斥锁/读写锁
上面讲的独享锁/共享锁就是一种广义的说法,互斥锁/读写锁就是具体的实现;
互斥锁:在 Java 中的具体实现就是 ReentrantLock、synchronized;
读写锁:在 Java 中的具体实现就是 ReentrantReadWriteLock。

自旋锁
自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗 CPU。

乐观锁/悲观锁
乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度。

  • 悲观锁:假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作
    悲观锁认为对于同一个数据的并发操作,一定是会发生修改的,哪怕没有修改,也会认为修改;因此对于同一个数据的并发操作,悲观锁采取加锁的形式;悲观的认为,不加锁的并发操作一定会出问题;
  • 乐观锁:假定不会发生并发冲突,只在提交操作时检测是否违反数据完整性。(使用版本号或者时间戳等来配合实现)
    乐观锁则认为对于同一个数据的并发操作,是不会发生修改的;在更新数据的时候,会采用尝试更新,不断重新的方式更新数据;乐观的认为,不加锁的并发操作是没有事情的。

从上面的描述我们可以看出,悲观锁适合写操作非常多的场景,乐观锁适合读操作非常多的场景;
悲观锁有:内置锁之重量级锁(互斥量);
乐观锁有:内置锁之偏向锁、内置锁之轻量级锁、J.U.C 的 ReentrantLock、CAS 原子类等(主要基于 CAS)。

偏向锁/轻量级锁/重量级锁
它们是指的 Java 内置锁的不同状态,在 JDK1.6 后,JVM 对 synchronized 进行了大量的优化,引入了分级锁的概念,性能得到很大的提升,因此在 JDK1.6 之后请优先考虑使用 synchronized 内置锁,性能不比 ReentrantLock 差。

锁的信息存放在哪
锁的信息存放在对象头。在 32 位和 64 位下,对象头的三部分内容是:
Java 对象头

Mark Word 结构(32 位 JVM,锁的不同状态)
Mark Word 结构

注意,启用偏向锁和未启用偏向锁时的对象头 Mark Word 初始状态是不一样的:

  • 启用偏向锁:对象创建时 Mark Word 就是上图中的”偏向锁状态”,持有锁的线程 ID 为 0;
  • 禁用偏向锁:对象创建时 Mark Word 就是上图中的”未锁定状态”,存储的是对象的哈希值。

启用偏向锁的情况下,如果 Mark Word 的锁状态为偏向锁,并且线程 ID 为 0,那么我们将其称为可偏向状态

在 JDK1.6 之后(含),偏向锁默认启用,但是默认有 4000 毫秒的启动延迟(用于判断是否存在锁竞争):

  • 启用偏向锁:-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0(消除延迟);
  • 禁用偏向锁:-XX:-UseBiasedLocking(默认启用偏向锁);

当禁用偏向锁时,锁状态根据竞争激烈程度从弱到强分别为:无锁状态 -> 轻量级锁 -> 重量级锁。
当启用偏向锁时,锁状态根据竞争激烈程度从弱到强分别为:可偏向状态 -> 偏向锁 -> 轻量级锁 -> 重量级锁;

注意,锁只能升级,不能降级。因为锁降级的效率较低,如果频繁升降级的话对 JVM 性能会造成影响。

使用分级锁的原因:避免直接使用互斥量(即重量级锁)进行线程同步,减少线程频繁挂起和恢复带来的上下文切换开销

CAS 的概念
CAS,Compare And Swap,即比较并交换。是原子操作的一种,是 CPU 的一个指令。不要与锁的概念混淆了,CAS 不是锁,它可以用来实现锁。不要将它们等价。

还有,后面会经常提到”CAS操作成功”、”CAS操作失败”,什么意思呢?

  • CAS 成功:即我们提供的预期值与内存中的实际值一致,成功将数据替换为了新值;
  • CAS 失败:即我们提供的预期值与内存中的实际值不一致,导致实际数据未被替换。

各种锁的适用情形

  • 偏向锁:只有一个线程进入临界区;
  • 轻量级锁:多个线程交替进入临界区(允许短暂自旋);
  • 重量级锁:多个线程同时进入临界区(持锁时间较长);

偏向锁的获取过程

  1. 先检查是否为可偏向状态,即锁标志位为 01 且偏向锁标志位为 1;
  2. 如果是可偏向状态,则测试线程 ID 是否指向当前线程,如果是则执行步骤 5,否则执行步骤 3;
  3. 如果线程 ID 未指向当前线程,则尝试通过 CAS 置换为当前线程 ID,如果成功则执行步骤 5,否则执行步骤 4;
  4. 如果置换线程 ID 失败,说明此时已发生竞争,当到达全局安全点(safepoint)时获得偏向锁的线程将被挂起,偏向锁升级为轻量级锁,然后被挂起的线程继续执行剩下的同步代码(撤销偏向锁时发生 stop-the-world);
  5. 此时当前线程已持有偏向锁,于是开始执行临界区的代码。

偏向锁的释放过程
当线程持有偏向锁后,并不会主动去释放偏向锁,只有当其它线程尝试竞争锁时才会发生偏向锁撤销。具体的撤销细节在上面的步骤 4 中有说到。因为存在 STW,虽然很短暂,但是如果频繁出现,会对性能产生较大影响。

轻量级锁的获取过程

  1. 如果对象为无锁状态,即锁标志位为 01 且偏向锁标志位为 0;那么将在当前线程栈中创建锁记录(Lock Record)空间,用于存放当前对象头的 Mark Word 拷贝(称为 Displaced Mark Word);
  2. 拷贝当前对象头的 Mark Word 字段至锁记录空间;
  3. 接着尝试使用 CAS 将当前对象头的锁记录指针指向当前线程栈中的锁记录空间,并将当前锁记录的 owner 指针指向对象头的 Mark Word,如果成功则执行步骤 4,否则执行步骤 5;
  4. 如果这个 CAS 置换动作成功了,那么当前线程就拥有了这个对象的锁,并且对象的 Mark Word 的锁标志位设为 00,表示此对象处于轻量级锁定状态;
  5. 如果这个 CAS 置换动作失败了,首先会检查对象的 Mark Word 的锁记录指针是否已指向当前线程栈中的锁记录空间,如果是则说明已获取锁,可以进入同步块;否则说明存在锁竞争,此时当前线程会自旋一段时间,如果获取到了锁则不进行锁升级,否则轻量级锁会膨胀为重量级锁。

轻量级锁的释放过程

  1. 通过 CAS 操作尝试把线程中复制的 Displaced Mark Word 替换对象当前的 Mark Word;
  2. 如果替换成功,整个同步过程就完成了;
  3. 如果替换失败,说明此时已不是轻量级锁定状态,已经膨胀为重量级锁;那就要在释放锁的同时,唤醒被挂起的线程。

轻量级锁什么情况下升级至重量级锁
轻量级锁认为竞争存在,但是竞争的程度很轻,一般多个线程对于同一个锁的操作都会错开,或者说稍微等待一下(自旋),另一个线程就会释放锁。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁就会膨胀为重量级锁,重量级锁使除了拥有锁的线程以外的线程都阻塞,防止 CPU 空转。

偏向锁是为了在只有一个线程执行同步块时提高性能轻量级锁是为了在多个线程交替执行同步块时提高性能

重量级锁
重量级锁是使用底层操作系统提供的 Mutex Lock 来实现的,当线程尝试获取锁失败时,该线程就会被挂起。线程之间的切换需要从用户态转换到内核态,这个成本非常高,状态之间的转换需要相对比较长的时间,这就是为什么 synchronized(JDK1.6 之前)效率低的原因。因此,我们将这种依赖于操作系统互斥量实现的锁称为”重量级锁”。JDK 中对 synchronized 做的种种优化,其核心都是为了减少这种重量级锁的使用。也就是使用我们前面说的”偏向锁”、”轻量级锁”,只有在不得已的情况下才会动用”重量级锁”。

synchronized 获取锁的过程

  • 检测 Mark Word 里面是不是当前线程的 ID,如果是则表示当前线程处于偏向锁状态;
  • 如果不是,则使用 CAS 将它置换为当前线程 ID,如果成功则表示当前线程获得偏向锁;
  • 如果失败,则说明发生竞争,撤销偏向锁,进而升级为轻量级锁;
  • 然后使用 CAS 将 Mark Word 的锁记录指针指向当前线程栈中的锁记录,如果成功,则获得轻量级锁;
  • 如果失败,表示其它线程竞争锁,当前线程便尝试使用自旋来获取锁;
  • 如果自旋成功则依然处于轻量级状态;
  • 如果自旋失败,则升级为重量级锁。

JVM 的其它优化手段
1、适应性自旋(Adaptive Spinning):从轻量级锁获取的流程中我们知道,当线程在获取轻量级锁的过程中执行 CAS 操作失败时,是要通过自旋来获取轻量级锁的。问题在于,自旋是需要消耗 CPU 的,如果一直获取不到锁的话,那该线程就一直处在自旋状态,白白浪费 CPU 资源。因此 JDK 采用了聪明的方式 - 适应性自旋,基本认为一个线程上下文切换的时间是最佳的一个时间,同时 JVM 还会针对当前 CPU 的负荷情况做优化。

2、锁粗化(Lock Coarsening):锁粗化的概念应该比较好理解,就是将多次连接在一起的加锁、解锁操作合并为一次,将多个连续的锁扩展成一个范围更大的锁。举个例子:

因为 StringBuffer.append() 是同步方法,因此调用三次就意味着要连续加锁解锁三次,并且中间没有任何其它代码。如果 JVM 检测到有一系列连串的对同一个对象加锁和解锁操作,就会将其合并成一次范围更大的加锁和解锁操作,即在第一次 append() 方法时进行加锁,最后一次 append() 方法结束后进行解锁。

3、锁消除(Lock Elimination):锁消除即删除不必要的加锁操作。根据代码逃逸技术,如果判断到一段代码中,堆上的数据不会逃逸出当前线程,那么可以认为这段代码是线程安全的,不必要加锁。看下面这段程序:

在运行这段代码时,JVM 可以明显检测到变量 vector 没有逃逸出方法 vectorTest() 之外(意思就是说在该方法外不可能拿到 vector 对象的引用),所以 JVM 可以大胆地将 vector 内部的加锁操作消除。

编码上的优化
我们不能依赖 JVM 的锁优化手段,因为你不能保证 JVM 能理解你的烂代码而去采取优化手段,我们必须显式的帮助 JVM 去优化代码。

1、减少锁的持有时间:只在必要的时候使用锁,不要随意放大同步代码块的范围,比如:

2、避免频繁加/解同一把锁:当然大多数情况下我们自己是一清二楚的,是否存在竞争、变量是否逃逸等等,我们不会笨到去频繁加锁解锁。但是有时候这个不是我们能够控制的,在使用 Java 类库的时候,很多线程安全的类都存在隐式的加锁、解锁,比如 StringBuffer.append() 方法。这个只能交给 JVM 去发现了,我们无能为力。

3、锁分离:比如在读多写少的情况下考虑使用 J.U.C 的 ReadWriteLock 读写锁,来提高性能、吞吐量。当然读写锁不只用在表面的”读和写”,只要是操作互不影响,就可以利用读写分离思想。

三种内置锁的对比

锁类型 优点 缺点 适用场景
偏向锁 加锁和解锁过程不需要额外的消耗,和执行非同步代码块仅存在纳秒级别的差距 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 适用于只有一个线程访问同步块的场景
轻量级锁 竞争的线程不会阻塞,而是采取自适应自旋的方式等待,提高了程序的响应速度 如果参与竞争的线程始终无法得到锁,那么自旋会白白浪费 CPU 资源 适用于追求响应时间,但要求同步块的执行速度非常快,避免因自旋导致 CPU 的持续空转
重量级锁 线程竞争不使用自旋,如果竞争不到锁,线程将会被挂起(休眠),并释放 CPU 资源 如果参与锁竞争失败则线程因为线程调度而挂起,但是频繁的上下文切换带来的开销很大且响应时间缓慢 适用于同步块的执行时间较长的情况

Synchronizer 同步器

J.U.C 中的同步器主要用于协助线程同步,有以下四种:
1) 闭锁 CountDownLatch
2) 栅栏 CyclicBarrier
3) 信号量 Semaphore
4) 交换器 Exchanger

CountDownLatch 闭锁

闭锁的作用:允许一个或多个线程等待,直到在其它线程中执行的一组操作完成

CountDownLatch 用给定的计数初始化,线程调用 await() 方法后将被阻塞,直到当前计数由于调用 countDown() 方法而达到零,在此之后所有等待的线程被释放,并且任何后续的调用立即返回。这是一次性现象,即计数不能被重置。

例子,我、爸爸、妈妈,一家三口人,约定今天晚上一起去饭店吃饭:

CyclicBarrier 栅栏

CyclicBarrier 和 CountDownLatch 有点类似,但是又有点不一样:栅栏是多个线程互相等待,直到全部线程都到齐,等待的线程才会继续运行

并且,栅栏是可循环利用的,从它的名字 Cyclic 也看得出,而闭锁只能使用一次。

同时,栅栏还支持一个可选的 Runnable 任务,该任务将会被最后一个到达的线程执行,执行完该任务后所有的线程才会被释放,这个特性对更新共享状态是很有用的。

例子,A、B、C、D、E 五个人,参加 100 米短跑比赛:

Exchanger 交换器

Exchanger:用来给两个线程互换数据的交换器,可以理解为 SynchronousQueue 同步队列的双向形式。

例子,线程 A、B 互换数据:

Semaphore 信号量

计数信号量,从概念上讲,信号量拥有一套许可证。使用 acquire() 方法申请许可证,使用完后调用 release() 方法归还许可证。在 Semaphore 的构造函数中可以指定一个数值,表示可用的许可证数量。

如果许可证数量为 1,则可以作为互斥锁使用。我们把拥有一个许可证的信号量称为二进制信号量,因为它只有两个状态,资源可用,资源不可用。

信号量除了作为互斥锁使用,还常用于实现资源池,如数据库连接池、线程池。

例子,作为互斥锁来使用:

Atomic 原子变量

原子变量主要是方便程序员在多线程环境下,无锁的进行原子操作。原子类是sun.misc.Unsafe类的包装类,其核心操作是 CAS 原子操作。所谓 CAS 原子操作,即 Compare And Swap,指的是将预期值与内存中的实际值进行比较(Compare),如果相等则使用新值替换(Swap)当前的值,否则不作操作。而这个比较并交换的过程是不可被外部因素打断的,这就是其原子性的体现。

原子变量的底层使用了处理器提供的原子指令,但是不同的 CPU 架构可能提供的原子指令不一样,也有可能需要某种形式的内部锁,所以该方法不能绝对保证线程不被阻塞。

原子性:指一个操作是按原子的方式执行的。要么该操作不被执行;要么以原子方式执行,即执行过程中不会被其它线程中断。

  • 除了longdouble外的所有类型(基本类型、引用类型),读取和写入操作都是原子的;
  • 对于声明为volatile的所有变量(包括longdouble),读取和写入操作都是原子的。

longdouble的非原子性协定
对于 64 位的数据,如 long 和 double,Java 内存模型规范允许虚拟机将没有被 volatile 修饰的 64 位数据的读写操作划分为两次 32 位的操作来进行,即允许虚拟机实现选择可以不保证 64 位数据类型的 load、store、read 和 write 这四个操作的原子性;即如果有多个线程共享一个并未声明为 volatile 的 long 或 double 类型的变量,并且同时对它们进行读取和修改操作,那么某些线程可能会读取到一个既非原值,也不是其他线程修改值的代表了“半个变量”的数值。

但由于目前各种平台下的商用虚拟机几乎都选择把 64 位数据的读写操作作为原子操作来对待,因此在编写代码时一般也不需要将用到的 long 和 double 变量专门声明为 volatile

因此我们可以认为所有类型的变量(基本类型、引用类型)的读写操作都是原子的;但类似于i++volatile_var++这种复合操作并不具备原子性

在 Atomic 包中一共有 12 个类(JDK1.8 中又增加了 4 个类,稍后介绍),四种原子更新方式,分别是原子更新基本类型原子更新数组原子更新引用原子更新字段;Atomic 包里的类基本都是使用 Unsafe 实现的包装类;

原子更新基本类型

AtomicBoolean:布尔型;
AtomicInteger:整型;
AtomicLong:长整型;

例子:

AtomicBoolean 类

AtomicInteger 类

AtomicLong 类

原子更新数组

AtomicIntegerArray:整型数组;
AtomicLongArray:长整型数组;
AtomicReferenceArray<E>:引用类型数组(存在 ABA 问题);

AtomicIntegerArray 类

AtomicLongArray 类

AtomicReferenceArray 类

原子更新引用

AtomicReference<V>,存在 ABA 问题;
AtomicStampedReference<V>,使用整型标记避免 ABA 问题;
AtomicMarkableReference<V>,使用布尔标记避免 ABA 问题;

ABA 问题是什么
原子更新数组 中,我也提到了 AtomicReferenceArray 存在 ABA 问题。那么什么是 ABA 问题呢?

ABA 不是什么英文的缩写,应该理解为 A -> B -> A 问题。我还是以 C++ 代码来模拟 A-B-A 问题吧:

假设存在两个线程 T1、T2,线程 T1 先执行语句 (a),将 1 置换为了 2,接着又执行语句 (b),将 2 置换回 1;然后线程 T2 执行语句 (c),发现当前的值为 1,于是又将 1 置换为 3。

从上面的描述中并未发现任何问题,是的,对于基本类型来说的确没有问题,因为我们关心的只是值本身而已。但是如果是引用类型就有问题了,因为 CAS 判断是仅仅是内存地址,如果这个地址被重用了呢,CAS 根本发现不了,地址还是那个地址,但是对象已经完全不同了(地址被重用是有可能发生的,一个内存被释放后,再分配,很有可能还是原来的地址),这就是所谓的”你大爷还是你大爷,你大妈已经不是你大妈了”。

那么有什么办法解决 ABA 问题呢?利用 AtomicStampedReference、AtomicMarkableReference 原子类:

  • AtomicStampedReference,从名字看的出来,每次使用 CAS 更新后,都给对象盖个戳(使用 int 来计数);
  • AtomicMarkableReference,从名字也看的出来,只要使用 CAS 更新过,就给对象打上布尔标记(如 false)。

举个通俗的例子:你倒了一杯水放在了桌子上,然后你因为临时有点事走开了。这时你的同事把你的水给喝了然后又给倒满了。等你回来,发现那杯水还在,于是你拿起来就喝。如果你不关心这杯水是否被别人喝过,只关心水是否还在,那么就不存在 ABA 问题。如果你是一个讲卫生的小伙子,不但关心水是否还在,还关心这杯水是否被人喝过,于是你放了个纸条在水的旁边,别人喝过水后都会在纸条上加个 1,表示被喝过一次,这就是 AtomicStampedReference 的策略;而如果是 AtomicMarkableReference,就是只关心这杯水是否被人喝过,而不关心它被喝过几次。

AtomicReference 类

AtomicStampedReference 类

AtomicMarkableReference 类

原子更新字段

AtomicIntegerFieldUpdater<T>,整型字段更新器;
AtomicLongFieldUpdater<T>,长整型字段更新器;
AtomicReferenceFieldUpdater<T, V>,引用字段更新器(存在 ABA 问题);

被更新的字段须被volatile修饰,并且确保该字段的访问性,最好为public

例子:

AtomicIntegerFieldUpdater 抽象类

AtomicLongFieldUpdater 抽象类

AtomicReferenceFieldUpdater 抽象类

JDK1.8 新增类

在 Java 8 中,Doug Lea 大神又添加了LongAdderLongAccumulatorDoubleAdderDoubleAccumulator四个类。

LongAdder是 JDK1.8 提供的累加器,基于 Striped64 实现。它常用于状态采集、统计等场景。AtomicLong 也可以用于这种场景,但在线程竞争激烈的情况下,LongAdder 要比 AtomicLong 拥有更高的吞吐量,但会耗费更多的内存空间。
LongAccumulatorLongAdder类似,也基于 Striped64 实现。但要比 LongAdder 更加灵活(要传入一个函数接口),LongAdder 相当于 LongAccumulator 的一种特例。

原有的 Atomic 系列类通过 CAS 来保证并发时操作的原子性,但是高并发也就意味着 CAS 的失败次数会增多,失败次数的增多会引起更多线程的重试,最后导致 AtomicLong 的效率降低。新的四个类通过减少并发,将单一 value 的更新压力分担到多个 value 中去,降低单个 value 的“热度”以提高高并发情况下的吞吐量。

例子:

LongAdder 类

LongAccumulator 类

DoubleAdder 类

DoubleAccumulator 类

BlockingQueue 阻塞队列

阻塞队列提供了可阻塞的入队和出队操作:如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用。

如果你想避免使用错综复杂的 wait–notify 语句,BlockingQueue 非常有用;BlockingQueue 的典型用途就是解决生产者-消费者问题。
任何有效的生产者-消费者问题解决方案都是通过控制生产者 put() 方法(生产资源)和消费者 take() 方法(消费资源)的调用来实现的,一旦你实现了对方法的阻塞控制,那么你将解决该问题。

阻塞队列的实现原理也很简单,就是利用 ReentrantLock、Condition,将那些错综复杂的 await-signal 语句隐藏在内部,好让我们专注于实际问题,而不用考虑这些乱七八糟的东西。

BlockingQueueQueue的子接口,同时BlockingQueue还有两个子接口:BlockingDequeTransferQueue;因此,它们三个都是 Java 集合框架的一员。

  • BlockingDeque同时还继承了Deque接口,也就是双端阻塞队列,可以当作 BlockingStack 阻塞栈来使用。
  • TransferQueue被称为传递队列;对于阻塞队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞;而 TransferQueue 则更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事);新添加的 transfer() 方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程 transfer 到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立 Java 内存模型中的 happens-before 关系的方式)。

第一次看到 TransferQueue 时,首先想到了已有的阻塞队列实现类:SynchronousQueue;SynchronousQueue 是利用容量为 0 这个特殊限制来实现元素在线程之间传递的。因为只要有生产者向队列添加元素,队列就是满状态,导致生产者被阻塞直到消费者的到来;只要有消费者从队列移除元素,队列就是空状态,导致消费者被阻塞直到生产者的到来。

BlockingQueue 特点

  • BlockingQueue 不接受 null 元素,否则将抛出 NullPointerException 异常;因为 null 值在内部被用作标记值来指示轮询操作的失败;
  • BlockingQueue 可能是容量有限的(如 ArrayBlockingQueue),而无界阻塞队列总是报告 Integer.MAX_VALUE 的剩余容量;
  • BlockingQueue 主要被用于解决”生产者-消费者问题”,虽然它是集合框架的一员,但是像 remove(e) 一类的方法通常不是非常高效,要尽量少用;
  • BlockingQueue 是线程安全的,然而,批量操作 addAll()、containsAll()、retainAll()、removeAll() 不一定是按照原子方式执行的,除非在实现中另有说明。

阻塞队列的主要实现类有 7 个:

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列(长度不可变);
  • LinkedBlockingQueue:基于链表结构的有界阻塞队列(默认容量 Integer.MAX_VALUE);
  • LinkedTransferQueue:基于链表结构的无界阻塞/传递队列;
  • LinkedBlockingDeque:基于链表结构的有界阻塞双端队列(默认容量 Integer.MAX_VALUE);
  • SynchronousQueue:不存储元素的阻塞/传递队列;
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列;
  • DelayQueue:支持延时获取元素的无界阻塞队列。

阻塞队列方法摘要
阻塞队列方法摘要

BlockingQueue 接口

BlockingDeque 接口

BlockingDeque 方法摘要
BlockingDeque 接口 - 方法预览

BlockingQueue 与 BlockingDeque 方法对比
BlockingDeque vs BlockingQueue

TransferQueue 接口

ArrayBlockingQueue 类

LinkedBlockingQueue 类

SynchronousQueue 类

PriorityBlockingQueue 类

DelayQueue 类

LinkedBlockingDeque 类

LinkedTransferQueue 类

阻塞队列应用例子

同步容器与并发容器

同步容器
在 JDK1.5 之前,Java 提供的主要同步容器有:

  • VectorStackHashtable
  • Collections.synchronizedXXX()

这些同步容器的线程安全是指单个操作是线程安全的,而复合操作不是线程安全的!比如迭代操作就是典型的复合操作,它不是线程安全的。
最主要的是,同步容器一般都是使用 synchronized 进行同步,有的甚至每个公共方法都使用 synchronized 同步,严重降低了并发性。

并发容器
这里主要提这两种并发容器:ConcurrentHashMapCopyOnWriteArrayList

ConcurrentHashMap
ConcurrentHashMap 与 HashMap、Hashtable 之间的对比:

  • Hashtable、ConcurrentHashMap 线程安全,HashMap 非线程安全;
  • Hashtable 的线程安全仅仅是指单个操作是线程安全的,而复合操作不是(如迭代操作);
  • ConcurrentHashMap 的迭代操作是线程安全的,并且支持在迭代的同时修改映射,但不保证立即可见;
  • HashMap 允许 null 键和 null 值;Hashtable、ConcurrentHashMap 均不允许 null 键和 null 值,因为有些情况下意义不明确;
  • HashMap 如果在迭代期间进行修改操作,则会触发快速失败机制,抛出 ConcurrentModificationException 异常;Hashtable 的迭代器是强一致性的,意味着修改操作对后续迭代操作立即可见;ConcurrentHashMap 的迭代器是弱一致性的,意味着修改操作对后续迭代操作不保证立即可见(尽量保证立即可见)。

迭代器的弱一致性
ConcurrentHashMap 的弱一致性主要是为了提升效率,是一致性与效率之间的一种权衡。要成为强一致性,就得到处使用锁,甚至是全局锁,这就与 Hashtable 和同步的 HashMap 一样了。

在 Java7 中,采用分段锁机制,理论最大并发数与 Segment 个数相等。Java7 中的 ConcurrentHashMap 的底层数据结构仍然是数组和链表。与 HashMap 不同的是,ConcurrentHashMap最外层不是一个大的数组,而是一个 Segment 的数组。每个 Segment 包含一个与 HashMap 数据结构差不多的链表数组。整体数据结构如下图所示:
Java7 ConcurrentHashMap 数据结构

在 Java8 中,为了进一步提高性能,摒弃了分段锁机制,采用更高效的 CAS 操作。底层与同期的 HashMap 一样,都是”数组 + 链表 + 红黑树”。当链表长度超过一定阈值(8)时将链表(寻址时间复杂度为 O(N))转换为红黑树(寻址时间复杂度为 O(log(N)))。整体数据结构如下图所示:
Java8 ConcurrentHashMap 数据结构

CopyOnWriteArrayList
CopyOnWrite写时复制,简称 COW。COW 是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容 Copy 出去形成一个新的内容然后再改,这是一种延时懒惰策略。从 JDK1.5 开始 Java 并发包里提供了两个使用 CopyOnWrite 机制实现的并发容器,它们是 CopyOnWriteArrayList 和 CopyOnWriteArraySet。CopyOnWrite 容器非常有用,可以在非常多的并发场景中使用到。

什么是 CopyOnWrite 容器
CopyOnWrite 容器即写时复制容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后往新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。

CopyOnWrite 容器的应用场景
CopyOnWrite 并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景等等。

ConcurrentMap 接口

ConcurrentMap 是 Map 的子接口,关于 Map 接口的内容请查看 - Java 集合框架 - Map

ConcurrentNavigableMap 接口

ConcurrentNavigableMap 是 ConcurrentMap、NavigableMap 的子接口,支持一系列的导航方法,是一个有序的 Map。

ConcurrentHashMap 类

相关说明:

  • ConcurrentHashMap 不允许 null 键以及 null 值。HashMap 允许。
  • ConcurrentHashMap 的迭代器(iterators 和 spliterators)是”弱一致”的。

构造函数:

其它方法具体请查看 - Javadoc - ConcurrentHashMap

ConcurrentSkipListMap 类

相关说明:

  • 与大多数并发集合一样,该类不允许 null 键或 null 值,因为一些 null 返回值的意义很不清晰。
  • ConcurrentSkipListMap 实现了 ConcurrentNavigableMap 接口,因此这是一个有序的 Map 映射。
  • ConcurrentSkipListMap 的迭代器(Iterators 和 spliterators)是”弱一致”的。

构造函数:

其它方法具体请查看 - Javadoc - ConcurrentSkipListMap

ConcurrentSkipListSet 类

相关说明:

  • ConcurrentSkipListSet 底层使用 ConcurrentSkipListMap 存储元素,同时实现了 NavigableSet 接口。
  • 批量操作(addAll、removeAll、retainAll、containsAll、equals、toArray)不保证以原子方式执行。
  • 与大多数并发集合一样,该类不允许使用 null 元素,因为对于某些 null 返回值的方法很难明确其意义。
  • ConcurrentSkipListSet 的迭代器(Iterators 和 spliterators)是”弱一致”的。

构造函数:

其它方法具体请查看 - Javadoc - ConcurrentSkipListSet

CopyOnWriteArrayList 类

相关说明:

  • ArrayList 的线程安全版本(不同于 Vector),所有的修改操作都是通过创建底层数组的新副本来实现的。
  • 因为是 CopyOnWrite 机制,因此这类集合不适用于写多读少的场景,特别是底层数组很大的时候,特别的慢。
  • 迭代器保证不抛出 ConcurrentModificationException,因为修改的数组和正在被遍历的数组不是同一个数组。
  • 迭代器本身的元素更改操作(remove()、set()、add())不受支持,会抛出 UnsupportedOperationException 异常。
  • 与其它的并发容器不同,CopyOnWriteArrayList 允许存在包括 null 在内的所有元素。

构造函数:

  • public CopyOnWriteArrayList():空 list
  • public CopyOnWriteArrayList(Collection<? extends E> c):拷贝构造
  • public CopyOnWriteArrayList(E[] toCopyIn):拷贝构造

其它方法具体请查看 - Javadoc - CopyOnWriteArrayList

CopyOnWriteArraySet 类

相关说明:

  • CopyOnWriteArraySet 内部使用 CopyOnWriteArrayList 存储元素,因此 CopyOnWriteArrayList 的特性在此依旧适用。
  • 因为是写时复制型容器,因此只适用于读多写少的应用场景,毕竟每次修改操作都会进行一次 Copy,开销是比较昂贵的。

构造函数:

  • public CopyOnWriteArraySet():空 set
  • public CopyOnWriteArraySet(Collection<? extends E> c):拷贝构造

其它方法具体请查看 - Javadoc - CopyOnWriteArraySet

ConcurrentLinkedQueue 类

相关说明:

  • 基于链表结构的大小无限制的线程安全队列(FIFO,先进先出),与其它并发集合一样,不允许 null 元素。
  • 批量操作(addAll、removeAll、retainAll、containsAll、equals、toArray)不保证以原子方式执行。
  • ConcurrentLinkedQueue 的迭代器(Iterators 和 spliterators)是”弱一致”的。

构造函数:

  • public ConcurrentLinkedQueue():空 queue
  • public ConcurrentLinkedQueue(Collection<? extends E> c):拷贝构造

其它方法具体请查看 - Javadoc - ConcurrentLinkedQueue

ConcurrentLinkedDeque 类

相关说明:

  • 基于链表结构的大小无限制的线程安全双端队列(可在两端进行插入和删除),与其它并发集合一样,不允许 null 元素。
  • 批量操作(addAll、removeAll、retainAll、containsAll、equals、toArray)不保证以原子方式执行。
  • ConcurrentLinkedDeque 的迭代器(Iterators 和 spliterators)是”弱一致”的。

构造函数:

  • public ConcurrentLinkedDeque():空 deque
  • public ConcurrentLinkedDeque(Collection<? extends E> c):拷贝构造

其它方法具体请查看 - Javadoc - ConcurrentLinkedDeque

TimeUnit 枚举

TimeUnit 表示给定粒度单位的持续时间,并提供跨设备转换的实用方法,可在这些时间单元中执行计时和延迟操作。

时间单位的转换

  • 1 天 -> 24 小时
  • 1 小时 -> 60 分钟
  • 1 分钟 -> 60 秒钟
  • 1 秒钟 -> 1000 毫秒
  • 1 毫秒 -> 1000 微秒
  • 1 微妙 -> 1000 纳秒