当前位置: 代码迷 >> 综合 >> CONCURRENT—JDK工具篇上
  详细解决方案

CONCURRENT—JDK工具篇上

热度:108   发布时间:2023-10-09 23:36:15.0

CONCURRENT—JDK工具篇上

第??章 线程池原理

为什么要使?线程池

  • 创建/销毁线程消耗系统资源,线程池可以复?已创建的线程

  • 控制并发的数量。并发过多,资源消耗过多,可能造成服务器崩溃(主要原因)

  • 可以对线程做统?管理

线程池的原理

Java中的线程池顶层接口是Executor接口,ThreadPoolExecutor是这个接口的实现类。

ThreadPoolExecutor提供的构造方法

四个构造方法:

// 五个参数的构造函数 (下面的构造参数前五个和此构造参数一致)
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)
// 六个参数的构造函数
public ThreadPoolExecutor(,,,,,ThreadFactory threadFactory)// 六个参数的构造函数
public ThreadPoolExecutor(,,,,,RejectedExecutionHandler handler)// 七个参数的构造函数
public ThreadPoolExecutor(,,,,,ThreadFactory threadFactory,RejectedExecutionHandler handler)
  • int corePoolSize:该线程池中核心线程数最大值

    核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)

  • int maximumPoolSize:该线程池中线程总数最大值

    该值等于核心线程数量 + 非核心线程数量

  • long keepAliveTime非核心线程闲置超时时长

    非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程

  • TimeUnit unit:keepAliveTime的单位。

    TimeUnit是一个枚举类型 ,包括以下属性:NANOSECONDS、MICROSECONDS 、MILLISECONDS、SECONDS 、MINUTES 、HOURS 、DAYS

  • BlockingQueue workQueue:阻塞队列,维护着等待执行的Runnable任务对象

    1. LinkedBlockingQueue

      链式阻塞队列,底层数据结构是链表,默认大小是Integer.MAX_VALUE,也可以指定大小。

    2. ArrayBlockingQueue

      数组阻塞队列,底层数据结构是数组,需要指定队列的大小。

    3. SynchronousQueue

      同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。

    4. DelayQueue

      延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。

  • ThreadFactory threadFactory

    创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。

static class DefaultThreadFactory implements ThreadFactory {
    DefaultThreadFactory() {
       // 构造函数SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}
}
  • RejectedExecutionHandler handler 拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略
    1. ThreadPoolExecutor.AbortPolicy:默认策略,丢弃任务并抛出RejectedExecutionException异常
    2. ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常
    3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部的任务,重新尝试执行程序(失败再重试)
    4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

ThreadPoolExecutor的状态

线程池有一个调度线程,有自己的状态。

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  • 线程池创建后处于RUNNING状态。

  • 调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。

  • 调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0, 阻塞队列的size也为0。

  • 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。

    ThreadPoolExecutor中有一个控制状态的属性叫ctl,它是一个AtomicInteger类型的变量。线程池状态就是通过AtomicInteger类型的成员变量ctl来获取的。

    获取的ctl值传入runStateOf方法,与~CAPACITY位与运算(CAPACITY是低29位全1的int变量)。

    ~CAPACITY在这里相当于掩码,用来获取ctl的高3位,表示线程池状态;而另外的低29位用于表示工作线程数

  • 线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。

线程池主要的任务处理流程

核心方法execute

// JDK 1.8 
public void execute(Runnable command) {
    if (command == null)throw new NullPointerException();   int c = ctl.get();// 1.当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))return;c = ctl.get();}// 2.如果不小于corePoolSize,则将任务添加到workQueue队列。if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();// 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。if (! isRunning(recheck) && remove(command))reject(command);// 2.2 线程池处于running状态,但是没有线程,则创建线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3.如果放入workQueue失败,则创建非核心线程执行任务,// 如果这时创建非核心线程失败(当前线程总数不小于maximumPoolSize时),就会执行拒绝策略。else if (!addWorker(command, false))reject(command);
}

ctl.get()是获取线程池状态,用int类型表示。第二步中,入队前进行了一次isRunning判断,入队之后,又进行了一次isRunning判断, why?

多线程的环境下,线程池的状态时刻发生着变化。判断是否将command加入workqueue是线程池之前的状态。万一线程池处于非RUNNING状态(在多线程环境下很有可能发生),那么command永远不会执行。

总结一下处理流程

  1. 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。注意,这一步需要获得全局锁。
  2. 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。
  3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
  4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。

ThreadPoolExecutor如何做到线程复用的?(简)

ThreadPoolExecutor在创建线程时,会将线程封装成工作线程worker,并放入工作线程组中,然后这个worker反复从阻塞队列中拿任务去执行。addWorker方法是在上面提到的execute方法里面调用的。

**addWroker(Runnable ) -> new Worker(Runnable) -> run() -> runWorker() -> **

(while getTask() -> workQueue.take())->task.run().

while循环中,worker会不断地调用getTask方法从阻塞队列中获取任务然后调用task.run()执行任务,从而达到复用线程的目的。只要getTask方法不返回null,此线程就不会退出。

四种常见的线程池

Executors类中提供的几个静态方法来创建线程池

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

CacheThreadPool运行流程如下:

  1. 提交任务进线程池。
  2. 因为corePoolSize为0的关系,不创建核心线程,线程池最大为Integer.MAX_VALUE。
  3. 尝试将任务添加到SynchronousQueue队列。
  4. 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
  5. 如果SynchronousQueue已有任务在等待,入列操作将会阻塞。

当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

核心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创建核心线程,不能创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。

与CachedThreadPool的区别

  • 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。 而CachedThreadPool因为corePoolSize=0,所以只会创建非核心线程。
  • 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。 CachedThreadPool会在60s后收回。
  • 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool占用资源更多
  • 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;CachedThreadPool是因为线程池很大(最大为Integer最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

有且仅有一个核心线程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());
}

第十三章 阻塞队列

阻塞队列的由来

生产者-消费者模式中阻塞队列(BlockingQueue),**只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题,**BlockingQueue就是存放元素的容器。

BlockingQueue是Java util.concurrent包下重要的数据结构,区别于普通的队列,BlockingQueue提供了线程安全的队列访问方式,并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

BlockingQueue的操作方法

阻塞队列提供了四组不同的方法用于插入、移除、检查元素:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() - -
  • 抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:如果试图的操作无法立即执行,返回一个特殊值,通常是true / false。
  • 一直阻塞:如果试图的操作无法立即执行,则一直阻塞或者响应中断。
  • 超时退出:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。

注意之处

  • 不能往阻塞队列中插入null,会抛出空指针异常。
  • 调用remove(o)可以将队列之中的特定对象移除,但不高效,尽量避免使用。

BlockingQueue的实现类

ArrayBlockingQueue

数组结构组成的有界阻塞队列。内部结构是数组,故具有数组的特性。

public ArrayBlockingQueue(int capacity, boolean fair){
    } 
// 可以初始化队列大小,且不能改变。fair表示控制对象的内部锁是否采用公平锁,默认是非公平锁

LinkedBlockingQueue

链表结构组成的有界阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是Integer.MAX_VALUE,也可以指定大小。此队列按照先进先出的原则对元素进行排序。

DelayQueue

该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。

DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

PriorityBlockingQueue

基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),内部控制线程同步的锁采用的是非公平锁。

public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
    this.lock = new ReentrantLock(); //默认构造方法-非公平锁}

SynchronousQueue

这个队列比较特殊,没有任何内部容量,甚至连一个队列的容量都没有。并且每个 put 必须等待一个 take,反之亦然。需要区别容量为1的ArrayBlockingQueue、LinkedBlockingQueue。

注意

PriorityBlockingQueue不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。对于使用默认大小的LinkedBlockingQueue也是一样的。

阻塞队列的原理(简)

阻塞队列的原理很简单,利用了Lock锁的多条件(Condition)阻塞控制。

第十四章 锁接口和类

Java原生的锁——基于对象的锁,它一般是配合synchronized关键字来使用的。

synchronized的不足之处

  • 如果临界区是只读操作,可以多线程一起执行,synchronized同一时间只能有一个线程执行
  • synchronized无法知道线程有没有成功获取到锁
  • 使用synchronized,如果临界区因为IO或者sleep方法等原因阻塞了,而当前线程又没有释放锁,就会导致所有线程等待

锁的几种分类

可重入锁和非可重入锁

所谓重入锁,顾名思义。就是支持重新进入的锁,也就是说这个锁支持一个线程对资源重复加锁ReentrantLock的中文意思就是可重入锁。

synchronized关键字就是使用的重入锁, 如果我们自己在继承AQS实现同步器的时候,没有考虑到占有锁的线程再次获取锁的场景,可能就会导致线程阻塞,那这个就是一个“非可重入锁”。

公平锁与非公平锁

这里的“公平”,其实通俗意义来说就是“先来后到”,也就是FIFO。一般情况下,非公平锁能提升一定的效率。但是非公平锁可能会发生线程饥饿(有一些线程长时间得不到锁)的情况。所以要根据实际的需求来选择非公平锁和公平锁。ReentrantLock支持非公平锁和公平锁两种。

读写锁和排它锁

我们前面讲到的synchronized用的锁和ReentrantLock,其实都是“排它锁”。也就是说,这些锁在同一时刻只允许一个线程进行访问。而读写锁可以在同一时刻允许多个读线程访问。Java提供了ReentrantReadWriteLock类作为读写锁的默认实现,内部维护了两个锁:一个读锁,一个写锁。通过分离读锁和写锁,使得在“读多写少”的环境下,大大地提高了性能。

注意,即使用读写锁,在写线程访问时,所有的读线程和其它写线程均被阻塞。

JDK中有关锁的一些接口和类

抽象类AQS/AQLS/AOS

AQS(AbstractQueuedSynchronizer),提供了一个“队列同步器”的基本功能实现。而AQS里面的“资源”是用一个int类型的数据来表示的。

AQLS(AbstractQueuedLongSynchronizer),只是把资源的类型变成了long类型。

AOS(AbstractOwnableSynchronizer)是AQS和AQLS都继承的一个类。表示锁与持有者之间的关系

接口Condition/Lock/ReadWriteLock

juc.locks包下共有三个接口:ConditionLockReadWriteLock。其中,Lock和ReadWriteLock从名字就可以看得出来,分别是锁和读写锁的意思。Lock接口里面有一些获取锁和释放锁的方法声明,而ReadWriteLock里面只有两个方法,分别返回“读锁”和“写锁”:

public interface ReadWriteLock {
    Lock readLock();Lock writeLock();
}

Lock接口中有一个方法是可以获得一个Condition:

Condition newCondition();

每个对象都可以用继承自Objectwait/notify方法来实现等待/通知机制。而Condition接口也提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式。

那为什么既然有Object的监视器方法了,还要用Condition呢?这里有一个二者简单的对比:

对比项 Object监视器 Condition
前置条件 获取对象的锁 调用Lock.lock获取锁,调用Lock.newCondition获取Condition对象
调用方式 直接调用,比如object.notify() 直接调用,比如condition.await()
等待队列的个数 一个 多个
当前线程释放锁进入等待状态 支持 支持
当前线程释放锁进入等待状态,在等待状态中不中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态直到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

Condition和Object的wait/notify基本相似。其中,Condition的await方法对应的是Object的wait方法,而Condition的signal/signalAll方法则对应Object的notify/notifyAll()。但Condition类似于Object的等待/通知机制的加强版。我们来看看主要的方法:

方法名称 描述
await() 当前线程进入等待状态直到被通知(signal)或者中断;当前线程进入运行状态并从await()方法返回的场景包括:(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒;(2)其他线程调用interrupt方法中断当前线程;
awaitUninterruptibly() 当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程
awaitNanos(long) 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了
awaitUntil(Date) 当前线程进入等待状态,直到被通知、中断或者超时。如果没到指定时间被通知,则返回true,否则返回false
signal() 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁
signalAll() 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁

ReentrantLock

ReentrantLock是一个非抽象类,它是Lock接口的JDK默认实现,实现了锁的基本功能。从名字上看,它是一个”可重入“锁,从源码上看,它内部有一个抽象类Sync,是继承了AQS,自己实现的一个同步器。同时,ReentrantLock内部有两个非抽象类NonfairSyncFairSync,它们都继承了Sync。从名字上看得出,分别是”非公平同步器“和”公平同步器“的意思。这意味着ReentrantLock可以支持”公平锁“和”非公平锁“。

通过看这两个同步器的源码可以发现,它们的实现都是”独占“的。都调用了AOS的setExclusiveOwnerThread方法,所以ReentrantLock的锁是”独占“的,也就是说,它的锁都是”排他锁“,不能共享。

在ReentrantLock的构造方法里,可以传入一个boolean类型的参数,来指定它是否是一个公平锁,默认情况下是非公平的。这个参数一旦实例化后就不能修改,只能通过isFair()方法来查看。

ReentrantReadWriteLock

这个类也是一个非抽象类,它是ReadWriteLock接口的JDK默认实现。它与ReentrantLock的功能类似,同样是可重入的,支持非公平锁和公平锁。不同的是,它还支持”读写锁“。

ReentrantReadWriteLock内部的结构大概是这样:

// 内部结构
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
    // 具体实现
}
static final class NonfairSync extends Sync {
    // 具体实现
}
static final class FairSync extends Sync {
    // 具体实现
}
public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;}// 具体实现
}
public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;protected WriteLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;}// 具体实现
}// 构造方法,初始化两个锁
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);
}// 获取读锁和写锁的方法
public ReentrantReadWriteLock.WriteLock writeLock() {
     return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  {
     return readerLock; }

可以看到,它同样是内部维护了两个同步器。且维护了两个Lock的实现类ReadLock和WriteLock。从源码可以发现,这两个内部类用的是外部类的同步器。

ReentrantReadWriteLock实现了读写锁,但它有一个小弊端,就是在“写”操作的时候,其它线程不能写也不能读。我们称这种现象为“写饥饿”,将在后文的StampedLock类继续讨论这个问题。

StampedLock

StampedLock类是在Java 8 才发布,有人号称它为锁的性能之王。它没有实现Lock接口和ReadWriteLock接口,但它其实是实现了“读写锁”的功能,并且性能比ReentrantReadWriteLock更高。StampedLock还把读锁分为了“乐观读锁”和“悲观读锁”两种。

前面提到了ReentrantReadWriteLock会发生“写饥饿”的现象,但StampedLock不会。它是怎么做到的呢?它的核心思想在于,在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。这种模式也就是典型的无锁编程思想,和CAS自旋的思想一样。这种操作方式决定了StampedLock在读线程非常多而写线程非常少的场景下非常适用,同时还避免了写饥饿情况的发生。

class Point {
    private double x, y;private final StampedLock sl = new StampedLock();// 写锁的使用void move(double deltaX, double deltaY) {
    long stamp = sl.writeLock(); // 获取写锁try {
    x += deltaX;y += deltaY;} finally {
    sl.unlockWrite(stamp); // 释放写锁}}// 乐观读锁的使用double distanceFromOrigin() {
    long stamp = sl.tryOptimisticRead(); // 获取乐观读锁double currentX = x, currentY = y;if (!sl.validate(stamp)) {
     // //检查乐观读锁后是否有其他写锁发生,有则返回falsestamp = sl.readLock(); // 获取一个悲观读锁try {
    currentX = x;currentY = y;} finally {
    sl.unlockRead(stamp); // 释放悲观读锁}}return Math.sqrt(currentX * currentX + currentY * currentY);}// 悲观读锁以及读锁升级写锁的使用void moveIfAtOrigin(double newX, double newY) {
    long stamp = sl.readLock(); // 悲观读锁try {
    while (x == 0.0 && y == 0.0) {
    // 读锁尝试转换为写锁:转换成功后相当于获取了写锁,转换失败相当于有写锁被占用long ws = sl.tryConvertToWriteLock(stamp); if (ws != 0L) {
     // 如果转换成功stamp = ws; // 读锁的票据更新为写锁的x = newX;y = newY;break;}else {
     // 如果转换失败sl.unlockRead(stamp); // 释放读锁stamp = sl.writeLock(); // 强制获取写锁}}} finally {
    sl.unlock(stamp); // 释放所有锁}}
}

乐观读锁的意思就是先假定在这个锁获取期间,共享变量不会被改变,既然假定不会被改变,那就不需要上锁。在获取乐观读锁之后进行了一些操作,然后又调用了validate方法,这个方法就是用来验证tryOptimisticRead之后,是否有写操作执行过,如果有,则获取一个悲观读锁,这里的悲观读锁和ReentrantReadWriteLock中的读锁类似,也是个共享锁。

可以看到,StampedLock获取锁会返回一个long类型的变量,释放锁的时候再把这个变量传进去。简单看看源码:

// 用于操作state后获取stamp的值
private static final int LG_READERS = 7;
private static final long RUNIT = 1L;               //0000 0000 0001
private static final long WBIT  = 1L << LG_READERS; //0000 1000 0000
private static final long RBITS = WBIT - 1L;        //0000 0111 1111
private static final long RFULL = RBITS - 1L;       //0000 0111 1110
private static final long ABITS = RBITS | WBIT;     //0000 1111 1111
private static final long SBITS = ~RBITS;           //1111 1000 0000// 初始化时state的值
private static final long ORIGIN = WBIT << 1;       //0001 0000 0000// 锁共享变量state
private transient volatile long state;
// 读锁溢出时用来存储多出的读锁
private transient int readerOverflow;

StampedLock用这个long类型的变量的前7位(LG_READERS)来表示读锁,每获取一个悲观读锁,就加1(RUNIT),每释放一个悲观读锁,就减1。而悲观读锁最多只能装128个(7位限制),很容易溢出,所以用一个int类型的变量来存储溢出的悲观读锁。

写锁用state变量剩下的位来表示,每次获取一个写锁,就加0000 1000 0000(WBIT)。需要注意的是,写锁在释放的时候,并不是减WBIT,而是再加WBIT。这是为了让每次写锁都留下痕迹,解决CAS中的ABA问题,也为乐观锁检查变化validate方法提供基础。

乐观读锁就比较简单了,并没有真正改变state的值,而是在获取锁的时候记录state的写状态,在操作完成后去检查state的写状态部分是否发生变化,上文提到了,每次写锁都会留下痕迹,也是为了这里乐观锁检查变化提供方便。

总的来说,StampedLock的性能是非常优异的,基本上可以取代ReentrantReadWriteLock的作用。

第十五章 并发容器集合

同步容器与并发容器

java.util包下提供了一些容器类,而Vector和Hashtable是线程安全的容器类,但是这些容器实现同步的方式是通过对方法加锁(sychronized)方式实现的,这样读写均需要锁操作,导致性能低下。而即使是Vector这样线程安全的类,在面对多线程下的复合操作的时候也是需要通过客户端加锁的方式保证原子性。

并发容器类介绍

  • Queue

    • BlockingQueue
      • ArrayBlockingQueue
      • PriorityBlockingQueue
      • SynchronousQueue
      • DelayQueue
      • LinkedBlockingQueue
      • BlockingDeque
        • LinkedBlockingDeque
      • TransferQueue
        • LinkedTransferQueue
    • ConcurrentLinkedDeque
    • ConcurrentLinkedQueue
  • ConcurrentMap

    • ConcurrentHashMap
    • ConcurrentNavigableMap
      • ConcurrentSkipListMap
  • CopyOnWrite容器

    • CopyOnWriteArraySet
    • CopyOnWriteArrayList

并发Map

ConcurrentMap接口继承了Map接口,在Map接口的基础上又定义了四个方法:

public interface ConcurrentMap<K, V> extends Map<K, V> {
    //插入元素V putIfAbsent(K key, V value);//移除元素boolean remove(Object key, Object value);//替换元素boolean replace(K key, V oldValue, V newValue);//替换元素V replace(K key, V value);
}

**putIfAbsent:**putIfAbsent方法中如果插入的key相同,则不替换原有的value值;

**remove:**要删除的key-value不能与Map中原有的key-value对应上,则不会删除该元素;

**replace(K,V,V):**key-oldValue能与Map中原有的key-value对应上,才进行替换操作;

**replace(K,V):**此replace不会对Map中原有的key-value进行比较,如果key存在则直接替换;

ConcurrentHashMap类

ConcurrentHashMap同HashMap一样也是基于散列表的map,但是它提供了一种与Hashtable完全不同的加锁策略,提供更高效的并发性和伸缩性。JDK 1.8中优化:

  • 同HashMap一样,链表也会在长度达到8的时候转化为红黑树,这样可以提升大量冲突时候的查询效率
  • 以某个位置的头结点(链表的头结点或红黑树的root结点)为锁,配合自旋+CAS避免不必要的锁开销

ConcurrentNavigableMap接口与ConcurrentSkipListMap类

ConcurrentNavigableMap接口继承了NavigableMap接口,这个接口提供了针对给定搜索目标返回最接近匹配项的导航方法。ConcurrentNavigableMap接口的主要实现类是ConcurrentSkipListMap类。从名字上来看,它的底层使用的是跳表(SkipList)的数据结构,它是一种”空间换时间“的数据结构,可以使用CAS来保证并发安全性。

并发Queue

JDK并没有提供线程安全的List类,因为对List来说,很难去开发一个通用并且没有并发瓶颈的线程安全的List。因为即使简单的读操作,拿contains() 这样一个操作来说,很难想到搜索的时候如何避免锁住整个list。所以退一步,JDK提供了对队列和双端队列的线程安全的类:ConcurrentLinkedQueue和ConcurrentLinkedDeque。因为队列相对于List来说,有更多的限制。这两个类是使用CAS来实现线程安全的。

并发Set

JDK提供了ConcurrentSkipListSet,是线程安全的有序的集合。底层是使用ConcurrentSkipListMap实现。谷歌的guava框架实现了一个线程安全的ConcurrentHashSet:

Set<String> s = Sets.newConcurrentHashSet();

第十六章 CopyOnWrite容器

什么是CopyOnWrite容器

CopyOnWrite容器即写时复制的容器(写入时复制思想实现读写分离),当我们往一个容器中添加元素的时候,将当前容器进行copy,复制出来一个新的容器,然后向新容器中添加新的元素,最后将原容器的引用指向新容器。

CopyOnWriteArrayList

优点

  • 无需同步,读性能高,用于“读多写少”的并发场景且该场景下进行遍历时不抛异常

缺点

  • 写操作触发拷贝,内存压力大,可能多次引起FullGC
  • 读写分离在新老容器,读出数据可能是旧数据,数据一致性问题

CopyOnWrite的业务中实现

参考CopyOnWriteArrayList实现的CopyOnWriteMap:

public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
    private volatile Map<K, V> internalMap;public CopyOnWriteMap() {
    internalMap = new HashMap<K, V>();}public V put(K key, V value) {
    synchronized (this) {
    Map<K, V> newMap = new HashMap<K, V>(internalMap);V val = newMap.put(key, value);internalMap = newMap;return val;}}public V get(Object key) {
    return internalMap.get(key);}public void putAll(Map<? extends K, ? extends V> newData) {
    synchronized (this) {
    Map<K, V> newMap = new HashMap<K, V>(internalMap);newMap.putAll(newData);internalMap = newMap;}}
}