目录
CyclicBarrier简介
案例说明
一、主程序
二、矩阵类
三、结果类
四、搜索的工作线程(主要功能)
五、汇总线程
执行结果
其他说明
CyclicBarrier简介
与CountDownLatch类似,CyclicBarrier类构造器需要提供一个整型N参数,这个参数表示在指定点同步的线程个数。每个线程可以调用CyclicBarrier对象的await()方法进入休眠,当N个线程都调用了await()方法后,表示大家都到达了指定地点,此时CyclicBarrier会唤醒所有调用了await()方法而休眠的线程继续执行下去。(CyclicBarrier类还提供了一个重载的构造方法,它可以额外接收一个Runnable类,当所有所有线程抵达同步点时,Runnable会作为一个独立线程对象进行执行,这种功能可以作为分治法的解决方案)
与CountDownLatch不同的是,设计上有其特殊性(提供了reset()方法,和BrokenBarrierException异常。)
案例说明
下面程序设计实现了,利用CyclicBarrier实现N个线程对现有矩阵进行分别搜索,然后通过独立线程进行汇总搜索结果的功能。
一、主程序
本DEMO实现了,由5个线程去搜索10000*1000的矩阵中,值等于6的个数,5个线程分别把结果存入result中。最后由grouper线程对result进行汇总。
package xyz.jangle.thread.test.n3_4.cyclicbarrier;import java.util.concurrent.CyclicBarrier;/*** 3.4、CyclicBarrier DEMO 在指定状态点同步任务* * 本DEMO实现了,由5个线程去搜索10000*1000的矩阵中,值等于6的个数,5个线程分别把结果存入result中。* 最后由grouper线程进行汇总。* * @author jangle* @email jangle@jangle.xyz* @time 2020年8月10日 下午12:27:16* */
public class M {public static void main(String[] args) {final int rows = 10000;final int length = 1000;// 待查询的号码final int searchNumber = 6;// 线程数量final int participants = 5;final int linesParticipant = 2000;// 矩阵MatrixMock mock = new MatrixMock(rows, length, searchNumber);// 结果Results results = new Results(rows);// 总结线程,在所有线程执行完毕后进行启动Grouper grouper = new Grouper(results);CyclicBarrier barrier = new CyclicBarrier(participants, grouper);Searcher[] searcher = new Searcher[participants];// 创建N个搜索线程,分别进行搜索for (int i = 0; i < searcher.length; i++) {searcher[i]= new Searcher(i*linesParticipant, (i+1)*linesParticipant, mock, results, searchNumber, barrier);Thread thread = new Thread(searcher[i]);thread.start();}System.out.println("主程序执行完毕");}}
二、矩阵类
构造一个被搜索的矩阵
package xyz.jangle.thread.test.n3_4.cyclicbarrier;import java.util.Random;/*** 矩阵类,它将生成0~9的随机数字矩阵,用于被搜索线程检索* * @author jangle* @email jangle@jangle.xyz* @time 2020年8月10日 下午12:29:48* */
public class MatrixMock {private final int data[][];public MatrixMock(int size, int length, int number) {int counter = 0;data = new int[size][length];Random random = new Random();for (int i = 0; i < size; i++) {for (int j = 0; j < length; j++) {data[i][j] = random.nextInt(10);if (data[i][j] == number) {counter++;}}}System.out.println("数字" + number + " ,共计" + counter + "个");}/*** 获取指定行* * @param row* @return*/public int[] getRow(int row) {if (row < 0 || row > data.length) {return null;}return data[row];}}
三、结果类
用于存放分治搜索的结果
package xyz.jangle.thread.test.n3_4.cyclicbarrier;
/*** 用于存放每行找到对应number的数量结果(每个搜索线程检索完毕后,将结果写入该对象)* (辅助类)* @author jangle* @email jangle@jangle.xyz* @time 2020年8月10日 下午12:44:37* */
public class Results {private final int data[];public Results(int size) {super();this.data = new int[size];}/*** 设置对应行存在对应number的数量* * @param position 行号* @param value 数量*/public void setData(int position,int value) {data[position] = value;}/*** 返回结果* * @return*/public int[] getData() {return data;}}
四、搜索的工作线程(主要功能)
负责对矩阵进行搜索,然后把搜索结果存入result
package xyz.jangle.thread.test.n3_4.cyclicbarrier;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;/*** 搜索线程(用于从mock矩阵中搜索指定的行区间,将匹配的搜索结果写入results中,然后调用barrier的await方法。)* @author jangle* @email jangle@jangle.xyz* @time 2020年8月10日 下午1:01:41* */
public class Searcher implements Runnable {private final int firstRow;private final int lastRow;private final MatrixMock mock;private final Results results;private final int number;private final CyclicBarrier barrier;public Searcher(int firstRow, int lastRow, MatrixMock mock, Results results, int number, CyclicBarrier barrier) {super();this.firstRow = firstRow;this.lastRow = lastRow;this.mock = mock;this.results = results;this.number = number;this.barrier = barrier;}@Overridepublic void run() {int counter;System.out.println(Thread.currentThread().getName() + "开始搜索第" + firstRow + "行到第" + lastRow + "行");for (int i = firstRow; i < lastRow; i++) {int[] row = mock.getRow(i);counter = 0;for (int j = 0; j < row.length; j++) {if (row[j] == number) {counter++;}}results.setData(i, counter);}System.out.println(Thread.currentThread().getName()+"搜索完毕");try {barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}}
五、汇总线程
对result结果进行汇总,并生成报告。
package xyz.jangle.thread.test.n3_4.cyclicbarrier;/*** 用于汇总的线程(在N个搜索线程搜索完毕之后执行)* * @author jangle* @email jangle@jangle.xyz* @time 2020年8月10日 下午12:53:57* */
public class Grouper implements Runnable {private final Results results;public Grouper(Results results) {super();this.results = results;}@Overridepublic void run() {int finalResult = 0;System.out.println("开始汇总结果");int[] data = results.getData();for (int i : data) {finalResult += i;}System.out.println("汇总结果,总共搜索出"+finalResult+"个");}}
执行结果
数字6 ,共计1000892个
Thread-1开始搜索第2000行到第4000行
主程序执行完毕
Thread-0开始搜索第0行到第2000行
Thread-2开始搜索第4000行到第6000行
Thread-4开始搜索第8000行到第10000行
Thread-3开始搜索第6000行到第8000行
Thread-2搜索完毕
Thread-1搜索完毕
Thread-3搜索完毕
Thread-4搜索完毕
Thread-0搜索完毕
开始汇总结果
汇总结果,总共搜索出1000892个
其他说明
getNumberWaiting()方法获取因调用await()方法而休眠的线程数量
getParties()方法获取CyclicBarrier对象同步发任务数
reset()重置(所有调用了await()方法进入休眠的线程,会因为这个方法的执行而抛出BrokenBarrierException)
isBroken()是否被破坏(若N个线程调用了await()方法进入休眠,其中一个线程因为被中断了,那么该线程会抛出InterruptedException异常,而其他线程会收到BrokenBarrierException异常,此时CyclicBarrier对象进入被破坏的状态,isBroken()返回true。)