当前位置: 代码迷 >> 综合 >> JAVA多线程与高并发(三)[ReentrantLock,ReadWriteLock,CountdownLatch,CyclicBarrier,Phaser,Semaphore,Exchanger]
  详细解决方案

JAVA多线程与高并发(三)[ReentrantLock,ReadWriteLock,CountdownLatch,CyclicBarrier,Phaser,Semaphore,Exchanger]

热度:8   发布时间:2024-01-27 01:50:13.0

link-JAVA多线程与高并发系列[前言,大纲,目录]

前言

这节只讲这些锁的使用,下节讲底层原理(AQS).
如果用了很多锁后,担心死锁逻辑,可以用jstack检查

  • ReentrantLock和Synchronized相比,最大的特点就是可以根据业务需求灵活控制.

  • ReentrantReadWriteLock用于一块资源读多写少,又要求避免脏数据的情况

  • Semaphore用于限流,控制最多同时执行的线程数.

  • 剩下的主要是面试时候吊打面试官(手动狗头):

  • CountdownLatch和CyclicBarrier类似,需要准备大批量的数据/资源调动,分多个线程去准备,但是等这些数据都准备OK后才能进行下一步动作.(MapReduce?)
    CountdownLatch是数到0就完事了,再往下数啥效果都没有,不能循环使用;
    CyclicBarrier可以循环使用.

  • Exchanger可能写游戏的两人交易时用到

  • Phaser可能在遗传算法时用到

ReentrantLock(重要)

什么是可重入锁?字面意思, 可重入锁就是对同样的一把锁可以再锁一次.当一个线程中执行到加锁代码时,发现当前持有这把锁的线程就是自己,那可以执行该代码.
(ReentrantLock内部使用了CAS,有没有使用系统级别锁呢?回头再探究)

synchronize是可重入的

这个之前JAVA多线程与高并发(一)[线程概念,同步synchronize关键字] 测试过啦

ReentrantLock特点|使用场景

  1. 简单使用,替代synchronized
  2. 可以tryLock(一定时间)
  3. 可以被马上打断lockInterruptibly
  4. 可以使用公平锁

简单使用,替代synchronized

这个可以替代synchronized,只是需要手动上锁(lock)和解锁(unlock),要注意lock操作以及接下来的业务代码用try{}包起来,在finally{}中unlock,无论是否有异常,都会释放锁.
使用synchronized时,当加锁代码执行完或者抛异常时,JVM会自动释放锁.

public class T02_ReentrantLock2 {Lock lock = new ReentrantLock();void m1() {try {lock.lock(); //synchronized(this)for (int i = 0; i < 10; i++) {TimeUnit.SECONDS.sleep(1);System.out.println(i);// 此处验证一下"可重入"特性if (i == 2) {m2();}}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}void m2() {try {lock.lock();System.out.println("m2 ...");} finally {lock.unlock();}}public static void main(String[] args) {T02_ReentrantLock2 rl = new T02_ReentrantLock2();new Thread(rl::m1).start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}// 必须等锁被上一个线程释放后,才能继续执行new Thread(rl::m2).start();}
}

可以tryLock(一定时间)

public class T03_ReentrantLock3 {Lock lock = new ReentrantLock();void m1() {try {lock.lock();for (int i = 0; i < 10; i++) {TimeUnit.SECONDS.sleep(1);System.out.println(i);}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}/*** 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行* 可以根据tryLock的返回值来判定是否锁定* 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中*/void m2() {// 当然也可以lock.tryLock(),不指定时间/*boolean locked = lock.tryLock();System.out.println("m2 ..." + locked);if(locked) lock.unlock();*/boolean locked = false;try {long start = System.currentTimeMillis();locked = lock.tryLock(5, TimeUnit.SECONDS);long end = System.currentTimeMillis();System.out.println("m2 ..." + locked + "|waiting time:" + (end - start));} catch (InterruptedException e) {e.printStackTrace();} finally {if(locked) {lock.unlock();}}}public static void main(String[] args) {T03_ReentrantLock3 rl = new T03_ReentrantLock3();new Thread(rl::m1).start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}new Thread(rl::m2).start();}
}

可以被马上打断lockInterruptibly

lock()方法的锁,只能在当前线程获取到锁后才能被打断;
而lockInterruptibly()可以马上被打断,不必等获取锁.
证明一下:

public class T04_ReentrantLock4 {public static void main(String[] args) {Lock lock = new ReentrantLock();Thread t1 = new Thread(()->{try {lock.lock();System.out.println("t1 start");TimeUnit.SECONDS.sleep(10);System.out.println("t1 end");} catch (InterruptedException e) {System.out.println("t1 was interrupted!");} finally {lock.unlock();}});t1.start();Thread t2 = new Thread(()->{try {// 如果是直接lock的话,t2就不能被直接打断,只能等t1执行完了,t2获取到lock后才能被打断
// lock.lock();lock.lockInterruptibly(); //可以对interrupt()方法做出响应System.out.println("t2 start");TimeUnit.SECONDS.sleep(5);System.out.println("t2 end");} catch (InterruptedException e) {System.out.println("t2 was interrupted!");} finally {lock.unlock();}});t2.start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}t2.interrupt(); //打断线程2的等待}
}

如果t2调用lock.lockInterruptibly(),控制台会输出:
(那个异常是因为t2unlock的时候,锁还在t1上)

t1 start
t2 was interrupted!
Exception in thread "Thread-1" java.lang.IllegalMonitorStateExceptionat java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)at com.mashibing.juc.c_020.T04_ReentrantLock4.lambda$main$1(T04_ReentrantLock4.java:55)at java.lang.Thread.run(Thread.java:748)

如果t2调用lock.lock(),控制台会输出:

t1 start
t1 end
t2 start
t2 was interrupted!

公平锁

ReentrantLock默认为非公平锁,多个线程在一个队列中去竞争获取锁,随机拿到锁,就跟买彩票一样,不是先买的就先中奖(synchronized也是非公平锁);
公平锁就是像买饭排队一样,比较"公平".

ReentrantLock可以指定是否为公平锁,默认非公平锁;
synchronized只有非公平锁

这个不太好写测试用例,回头再证明吧…
创建ReentrantLock时,传入fair参数为true,创建的就是公平锁

public class T05_ReentrantLock5 extends Thread {//参数为true表示为公平锁,请对比输出结果private static ReentrantLock lock = new ReentrantLock(true);@Overridepublic void run() {for (int i = 0; i < 100; i++) {System.out.println(Thread.currentThread().getName() + " will lock at" + System.currentTimeMillis());lock.lock();try {System.out.println(Thread.currentThread().getName() + " get lock at" + System.currentTimeMillis());} finally {lock.unlock();}}}public static void main(String[] args) throws InterruptedException {T05_ReentrantLock5 rl = new T05_ReentrantLock5();Thread th1 = new Thread(rl);Thread th2 = new Thread(rl);th1.start();th2.start();}
}

ReadWriteLock读写锁(重要)

读写锁是读锁和写锁的合称,

  • 读锁:共享锁,其他线程可以同时读,在读操作之间共享资源;但是其他线程不能写,否则会读到脏数据
  • 写锁:排他锁/互斥锁,其他线程既不能写也不能读,也是为了防止脏数据.

其他的锁一般都是排它锁/互斥锁,当一个线程拿到锁后,别的线程就只能阻塞等待,啥也干不了.

主要用于一些内容,写操作不多,但是读操作很多的地方,比如公司的组织架构

读写锁代码示例:

public class T10_TestReadWriteLock {static Lock lock = new ReentrantLock();private static int value;static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();static Lock readLock = readWriteLock.readLock();static Lock writeLock = readWriteLock.writeLock();public static void read(Lock lock, CountDownLatch countDownLatch) {try {lock.lock();//模拟读取操作Thread.sleep(1000);countDownLatch.countDown();System.out.println("read over!");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void write(Lock lock, int v, CountDownLatch countDownLatch) {try {lock.lock();// 写操作,模拟耗时1秒Thread.sleep(1000);value = v;countDownLatch.countDown();System.out.println("write over!");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) throws InterruptedException {// 使用CountDownLatch方便计时,下面有CountdownLatch说明CountDownLatch countDownLatch = new CountDownLatch(20);// 普通互斥锁
// Runnable readR = ()-> read(lock, countDownLatch);
// Runnable writeR = ()->write(lock, new Random().nextInt(), countDownLatch);// 读写锁Runnable readR = () -> read(readLock, countDownLatch);Runnable writeR = () -> write(writeLock, new Random().nextInt(), countDownLatch);long start = System.currentTimeMillis();for (int i = 0; i < 18; i++) {new Thread(readR).start();}for (int i = 0; i < 2; i++) {new Thread(writeR).start();}countDownLatch.await();long end = System.currentTimeMillis();System.out.println("cust milliseconds:" + (end - start));}
}

Semaphore信号量

有点类似于令牌桶算法.用于控制并发的最大数量,限流.
有时候一个任务的某一步骤特别耗时,很多个线程同时执行的话,每个线程都很慢,可能大家都超时失败,为了防止这一现象,可以用信号量控制最大并发数.
假设希望最大并发数为n,那就创建一个有n个令牌的桶,每次线程执行特定方法时,去申请令牌(acquire),拿不到就阻塞;拿到了才继续执行,执行完再把令牌放回桶里(release).

代码示例:

public class T11_TestSemaphore {public static void main(String[] args) {
// Semaphore s = new Semaphore(3);Semaphore s = new Semaphore(3, true);//允许一个线程同时执行
// Semaphore s = new Semaphore(1);for (int i = 1; i <= 10; i++) {new Thread(() -> {// 可以做其他的事情,不受信号量控制String name = Thread.currentThread().getName();try {s.acquire();System.out.println(name + " start...");Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {s.release();System.out.println(name + " end...");}// 可以做其他的事情,不受信号量控制}).start();}}
}

CountdownLatch

见名知意,倒数计数(而非计时)的门闩,计数结束就开门了,然后可以继续执行指令.
使用场景:用于在一个线程中有些特殊操作,需要在业务逻辑上等待其他几个线程完成特定操作后才能执行.
比线程的join()方法更灵活一些,因为不需要等其他线程全部执行完,而是手动灵活控制.
做个类比:饭店中的传菜员拿来一页订单,客人要求几个菜一起上,传菜员就要等大厨们都做完装盘后才能把才端给客人;而且大厨炒完菜还要刷锅,这时候传菜员是不需要等待的.
看一个例子:

public class T06_TestCountDownLatch {public static void main(String[] args) {// 使用线程的join方法
// usingJoin();// 使用门闩usingCountDownLatch();}private static void usingCountDownLatch() {Thread[] threads = new Thread[100];CountDownLatch latch = new CountDownLatch(threads.length);for(int i=0; i<threads.length; i++) {threads[i] = new Thread(()->{int result = 0;// 业务逻辑a,假设本操作是一些操作的前置条件for(int j=0; j<10000; j++) {result += j;}// 门闩倒数计数(减1),原子操作latch.countDown();// 下面可以有业务逻辑b,c,d无所谓了try {TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + "at" + new Date());} catch (InterruptedException e) {e.printStackTrace();}});}for (int i = 0; i < threads.length; i++) {threads[i].start();}try {// 插上门闩,等倒数计数完了(到0)才能继续执行latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("end latch at" + new Date());}// 这个作为对比private static void usingJoin() {Thread[] threads = new Thread[100];for(int i=0; i<threads.length; i++) {threads[i] = new Thread(()->{int result = 0;for(int j=0; j<10000; j++) {result += j;}});}for (int i = 0; i < threads.length; i++) {threads[i].start();}// 等threads中的线程全都结束后才能继续执行for (int i = 0; i < threads.length; i++) {try {threads[i].join();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("end join");}
}

CyclicBarrier

见名知意,循环栅栏.
就像装鸡蛋的托盘一样,鸡蛋把托盘装满后才去装箱.

首先创建一个CyclicBarrier,它规定当一定数量的线程调用CyclicBarrier的await()方法后,CyclicBarrier会执行一些操作(也可以啥都不做);
其他线程调用CyclicBarrier的await()方法后,线程开始阻塞,等待barrier满了(各个线程就绪)后才继续执行.
CyclicBarrier的barrierAction由该批次进入的最后一个线程执行

下面是个例子,可以看出是先Ready后那些线程才能继续执行.

public class T07_TestCyclicBarrier {public static void main(String[] args) {// 线程满了后啥都不做,这个应该用不到//CyclicBarrier barrier = new CyclicBarrier(20);// lambda表达式,和下面的匿名内部类是等效的CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("Ready,go! @ " + Instant.now()));/*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {@Overridepublic void run() {System.out.println("Ready,go! @ " + Instant.now());}});*/for (int i = 0; i < 100; i++) {new Thread(() -> {try {TimeUnit.SECONDS.sleep(1);barrier.await();System.out.println(Thread.currentThread().getName() + "GO ON @ " + Instant.now());} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();}}
}

Phaser(极少用到)

phase:相位,阶段;phaser就是按照不同的相位阶段去执行不同的命令.

有点像是CountdownLatch和CyclicBarrier的结合:先注册一定的数量parties作为参与的线程数,大家都准备就绪后触发一次onAdvance操作;也可以随时修改这个parties数量.
触发onAdvance时可以获取到当前是第几个相位(第几次触发),和当前有多少个注册的parties

可以这么理解:CyclicBarrier是只有一个栅栏,Phaser是纵向好几个栅栏,每个栅栏触发时可以有不同的操作.

关键方法:
0.自定义一个类继承Phaser,并重写onAdvance方法.每次parties到达相位后,会调用onAdvance方法
1.phaser.bulkRegister(int parties)
批量注册parties,有点类似于CountdownLatch的倒数计数的初始化
2.phaser.arriveAndAwaitAdvance()
Arrives at this phaser and awaits others.
各parties准备就绪后到达相位,等待其他parties后才继续执行,注意下一次相位仍会参与
3.phaser.arriveAndDeregister()
Arrives at this phaser and deregisters from it without waiting for others to arrive.
到达这个相位,并且从中注销,不需等其他parties到达就可继续执行,不再参与Phaser规则了

应用场景:
遗传算法(大概是用计算机去模拟达尔文进化策略),这个我没去详细了解.
以简单的结婚流程为例:
1.首先关键人物(比如一对新人,伴郎伴娘和亲朋好友等客人)都要到场;2.然后走个结婚的流程,新人发表感言,客人吃饭,新人敬酒;3.客人陆续离场;4.新人入洞房,相互拥抱.
这些步骤顺序不能乱,行为的执行者也不能乱.(只有在大家离场后,两位新人才能入洞房,哈哈)

代码示例:

public class T09_TestPhaserWedding {static Random r = new Random();static MarriagePhaser phaser = new MarriagePhaser();static void milliSleep(int milli) {try {TimeUnit.MILLISECONDS.sleep(milli);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {phaser.bulkRegister(7);// 5位客人for(int i=0; i<5; i++) {new Thread(new Person("p" + i)).start();}// 2位新人new Thread(new Person("bridegroom")).start();new Thread(new Person("bride")).start();}static class MarriagePhaser extends Phaser {/**** @param phase 当前是第几个阶段.the current phase number on entry to this method,* before this phaser is advanced* @param registeredParties 目前多少线程参与.the current number of registered parties* @return {@code true} if this phaser should terminate*/@Overrideprotected boolean onAdvance(int phase, int registeredParties) {switch (phase) {case 0:System.out.println("everyone arrived!" + registeredParties);System.out.println();return false;case 1:System.out.println("everyone finished eating!" + registeredParties);System.out.println();return false;case 2:System.out.println("everyone left!" + registeredParties);System.out.println();return false;case 3:System.out.println("The Newlywed Time!" + registeredParties);return true;default:return true;}}}static class Person implements Runnable {String name;public Person(String name) {this.name = name;}public void arrive() {milliSleep(r.nextInt(1000));System.out.printf("%s arrived!\n", name);phaser.arriveAndAwaitAdvance();}public void eat() {milliSleep(r.nextInt(1000));System.out.printf("%s finish eating!\n", name);phaser.arriveAndAwaitAdvance();}public void leave() {milliSleep(r.nextInt(1000));System.out.printf("%s leave!\n", name);phaser.arriveAndAwaitAdvance();// 所有人离去后,保洁人员才打扫座位System.out.printf("everyone was left, clean %s's seat\n", name);}private void hug() {if(name.equals("bridegroom") || name.equals("bride")) {milliSleep(r.nextInt(1000));System.out.printf("%s hug!\n", name);phaser.arriveAndAwaitAdvance();} else {phaser.arriveAndDeregister();// 注销后,不必等本次相位的所有parties到达,可以随意执行System.out.printf("%s Deregister\n", name);}}@Overridepublic void run() {arrive();eat();leave();hug();}}
}

Exchanger交换器

用于两个线程间交换数据.
中间有个类似CyclicBarrier的操作,第一个线程准备好而第二个线程没准备好前,第一个会阻塞;只有两个线程都准备好后才会开始交换,然后继续执行

应用场景:两人做交易(不知道各位有没有玩过零几年很火的游戏<传奇>?)

代码示例:

public class T12_TestExchanger {static Exchanger<String> exchanger = new Exchanger<>();public static void main(String[] args) {new Thread(()->{String source = "T1";String getted = null;try {TimeUnit.SECONDS.sleep(3);System.out.println("T1 ready @" + new Date());getted = exchanger.exchange(source);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " get:" + getted + "@" + new Date());}, "t1").start();new Thread(()->{String source = "T2";String getted = null;try {System.out.println("T2 ready @" + new Date());getted = exchanger.exchange(source);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " get:" + getted + "@" + new Date());}, "t2").start();// 如果再来个T3,程序会一直阻塞下去/*new Thread(()->{String source = "T3";String getted = null;try {System.out.println("T3 ready @" + new Date());getted = exchanger.exchange(source);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " get:" + getted + "@" + new Date());}, "t3").start();*/}
}
  相关解决方案