我使用线程管理器并发若干子线程, 个数由队列的大小决定,我想实现只要其中一个子线程计算失败,我就停止所有的运行或不运行的子线程,现在的问题是, 我遍历子线程列表, f.cancel(true); 这一段代码没有成功,后台还是继续输出了 System.out 的输出信息, 不知道是不是哪写错了,求帮忙。。。
public Map<String, Object> dataExtract() throws InterruptedException{
// 装载数据表池
DataTabPool.getInstance().loadTabPool();
final ArrayBlockingQueue<String> ABQ = DataTabPool.getInstance().getABQ(); //获取数据表池队列,加载到并行线程中
final Map<String, Object> returnMap = new HashMap<String, Object>();
// 用于存放每层抽取线程任务列表,便于控制所有的线程任务
final List<Future<Map<String, Object>>> list = new ArrayList<Future<Map<String, Object>>>();
// 控制线程并行运行
final CountDownLatch begin = new CountDownLatch(ABQ.size());
// 线程一并等待,同时开始运行。任何一个验证出现错误的时候,能在任务列表中找到任务并停止
final CyclicBarrier barrierOne = new CyclicBarrier(ABQ.size());
// 根据列队大小定义并发线程数
final ExecutorService exec = Executors.newFixedThreadPool(ABQ.size());
// 并行子线程
for (int index = 0; index < ABQ.size(); index++) {
final int NO = index + 1;
if(!DataExtractMain.this.mark){
break;
}
list.add(exec.submit(new Callable<Map<String, Object>>() {
public Map<String, Object> call() throws Exception {
try {
// 如果当前计数为零,则此方法立即返回。
barrierOne.await();
// 动态从池中取出表名
String tableName = DataTabPool.getInstance().takeTab();
System.out.println("No." + NO + " :" + tableName);
// 开始抽数业务代码
Map<String,Object[]> resultMap = new DataExtractWork().extractDataAndSave(tableName) ;
// 当抽取失败的时候,取消任务列表中所有任务,同时修改标记位,停止整个任务管理器,之后的多有操作也不用执行
if (resultMap == null) {
DataExtractMain.this.mark = false;
// 循环遍历停掉管理器中的子线程
for (Future<Map<String, Object>> f : list) {
f.cancel(true);
}
}
} finally {
begin.countDown();
}
return null;
}
}));
}
System.out.println("Extract Start");
begin.await();
System.out.println("Extract Over");
exec.shutdown();
// 判断标志,来决定是否继续后续操作
if (exec.isTerminated()) {
System.out.println("errorList size : " + errorList.size());
return null;
}else{
return returnMap;
}
}
------解决思路----------------------
你要 去了解线程是怎么终止的。不是你的代码有问题,是你对线程终止的理解有问题。
有几种情况可以让线程立马停止:某些阻塞的时候下被调用cancel,线程中主动检查cancel标识。
怎么理解上面那句话,就是说如果不存在阻塞的情况下,系统认为你的线程很快就要结束了,其不会也不能显式的把这个线程干掉。当阻塞的时候,可以显式的干掉,系统接收到信号以后会强制从阻塞状态中恢复过来。
所以你要做的是,在你线程的执行过程中显式的检查cancel标识,否则线程是不会在没有阻塞标识的情况下自己停止的
------解决思路----------------------
可以用监控者,执行者的模式。