在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore
CountdownLatch
CountDownLatch位于java.util.concurrent包下;允许一个或多个线程等待直到在其它线程中执行的一组操作结束
/*** Constructs a {@code CountDownLatch} initialized with the given count.** @param count the number of times {@link #countDown} must be invoked* before threads can pass through {@link #await}* @throws IllegalArgumentException if {@code count} is negative*/public CountDownLatch(int count) {}
CountdownLatch只提供了一个构造器
// 将count值减1public void countDown() {sync.releaseShared(1);}//调用后会将当前线程挂起,直到count值为0或者线程中断抛出中断异常public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}//和await()类似,只不过等待一定的时间后count值还没变成0的话就会继续执行public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}
其中countDown是将count值减少,await()/await(timeout,unit)是线程挂起,直到count值为0,countDown和await结合起来处理跨线程间协作
- 示例
CountDownLatch countDownLatch = new CountDownLatch(2);new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));countDownLatch.countDown();}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));countDownLatch.countDown();}}).start();System.out.println("等待两个子线程执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("继续执行主线程"+new SimpleDateFormat("HH:mm:ss").format(new Date()));//运行结果
子线程Thread-1开始执行17:20:32
子线程Thread-0开始执行17:20:32
等待两个子线程执行完毕17:20:32
子线程Thread-1执行完毕17:20:34
子线程Thread-0执行完毕17:20:34
继续执行主线程17:20:34
主线程await等待直到子线程执行countDown()让count等于0后继续执行
CountDownLatch countDownLatch = new CountDownLatch(2);new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));countDownLatch.countDown();}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));countDownLatch.countDown();}}).start();System.out.println("等待两个子线程执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {countDownLatch.await(1, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();System.out.println(e.getMessage());}System.out.println("继续执行主线程"+new SimpleDateFormat("HH:mm:ss").format(new Date()));//运行结果
等待两个子线程执行完毕17:25:21
子线程Thread-0开始执行17:25:21
子线程Thread-1开始执行17:25:21
继续执行主线程17:25:22
子线程Thread-0执行完毕17:25:24
子线程Thread-1执行完毕17:25:24
当await(timeout,unit)等待时间超过1秒后,count值还不是0时,继续执行
CyclicBarrier
/*** A synchronization aid that allows a set of threads to all wait for* each other to reach a common barrier point. CyclicBarriers are* useful in programs involving a fixed sized party of threads that* must occasionally wait for each other. The barrier is called* <em>cyclic</em> because it can be re-used after the waiting threads* are released.** <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command* that is run once per barrier point, after the last thread in the party* arrives, but before any threads are released.* This <em>barrier action</em> is useful* for updating shared-state before any of the parties continue.*/
官方介绍:允许一组线程全部等待直到每一个都到达共同的障碍点,为什么有循环呢,因为它在等待线程释放后可以重新使用;另外可以支持一个线程运行,运行的时间节点是这些线程都达到barrier状态时会执行的内容
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();}public CyclicBarrier(int parties) {this(parties, null);}
构造函数中parties指让多少个线程或任务等待至barrier状态,barrierAction为所有线程都到达屏障点后执行的任务
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));}
- await()比较常用,方法用来挂起线程,等所有线程都到达barrier点后执行后续操作
- await(timeout,unit)方法是让这些线程等待一段时间,如果在时间范围内没有到达barrier状态,则线程继续执行
示例
CyclicBarrier cyclicBarrier= new CyclicBarrier(3);new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));}}).start();try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("主线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));//运行结果:
子线程Thread-0开始执行10:46:27
子线程Thread-1开始执行10:46:27
子线程Thread-1执行完毕10:46:30
主线程main执行完毕10:46:30
子线程Thread-0执行完毕10:46:30
三个线程包括主线程都需要等待直至所有线程到达barrier状态后才开始后续操作
CyclicBarrier cyclicBarrier= new CyclicBarrier(3, new Runnable() {@Overridepublic void run() {System.out.println("CyclicBarrier任务"+Thread.currentThread().getName()+"执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));}});new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));}}).start();try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("主线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));//运行结果
子线程Thread-1开始执行10:55:30
子线程Thread-0开始执行10:55:30
CyclicBarrier任务Thread-1执行10:55:33
子线程Thread-1执行完毕10:55:33
子线程Thread-0执行完毕10:55:33
主线程main执行完毕10:55:33
当三个线程都达到barrier状态后,选择一个线程执行Runnable
static void testBarrier(CyclicBarrier cyclicBarrier){new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));}}).start();new Thread(new Runnable() {@Overridepublic void run() {System.out.println("子线程"+Thread.currentThread().getName()+"开始执行"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));}}).start();try {cyclicBarrier.await();} catch (BrokenBarrierException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("主线程"+Thread.currentThread().getName()+"执行完毕"+new SimpleDateFormat("HH:mm:ss").format(new Date()));}//测试代码public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(3);testBarrier(cyclicBarrier);testBarrier(cyclicBarrier);}//测试结果
子线程Thread-0开始执行11:01:13
子线程Thread-1开始执行11:01:13
子线程Thread-1执行完毕11:01:16
子线程Thread-0执行完毕11:01:16
主线程main执行完毕11:01:16
子线程Thread-2开始执行11:01:16
子线程Thread-3开始执行11:01:16
子线程Thread-3执行完毕11:01:19
主线程main执行完毕11:01:19
子线程Thread-2执行完毕11:01:19
- cyclicBarrier在三个线程都到达barrier状态后,有可以循环使用;但是CountDownLatch不能重复使用
semaphore
/*** A counting semaphore. Conceptually, a semaphore maintains a set of* permits. Each {@link #acquire} blocks if necessary until a permit is* available, and then takes it. Each {@link #release} adds a permit,* potentially releasing a blocking acquirer.* However, no actual permit objects are used; the {@code Semaphore} just* keeps a count of the number available and acts accordingly.** <p>Semaphores are often used to restrict the number of threads than can* access some (physical or logical) resource.
*/
官方介绍:一个计数信号灯,它维护着一组许可证,Semaphore可以控制同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可
public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
- permits表示许可数目,即同时有多少个线程数执行
- fair表示是否是公平的,如果是公平的,谁等待时间长谁先执行
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;}public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public void release() {sync.releaseShared(1);}public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}
- acquire()获取许可证,acquire(permits)获取几个许可证, tryAcquire()和tryAcquire(timeout,unit)尝试获取许可证,获取不到或者获取规定时间内获取不到则返回false
- release释放许可证; 注意,释放许可证之前必须获取许可证
Semaphore示例
static class Worker implements Runnable{Semaphore semaphore;int num;public Worker(Semaphore semaphore, int num) {this.semaphore = semaphore;this.num = num;}@Overridepublic void run() {try {semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"的工人"+num+"占用机器生产"+new SimpleDateFormat("HH:mm:ss").format(new Date()));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程"+Thread.currentThread().getName()+"的工人"+num+"释放机器"+new SimpleDateFormat("HH:mm:ss").format(new Date()));semaphore.release();}}//测试代码
Semaphore semaphore = new Semaphore(3);for (int i=0;i<5;i++){new Thread(new Worker(semaphore,i)).start();}//运行结果
子线程Thread-1的工人1占用机器生产13:07:26
子线程Thread-2的工人2占用机器生产13:07:26
子线程Thread-0的工人0占用机器生产13:07:26
子线程Thread-1的工人1释放机器13:07:28
子线程Thread-2的工人2释放机器13:07:28
子线程Thread-0的工人0释放机器13:07:28
子线程Thread-3的工人3占用机器生产13:07:28
子线程Thread-4的工人4占用机器生产13:07:28
子线程Thread-4的工人4释放机器13:07:30
子线程Thread-3的工人3释放机器13:07:30