当前位置: 代码迷 >> 高性能计算 >> 探索并发编程(四):Java并发工具
  详细解决方案

探索并发编程(四):Java并发工具

热度:825   发布时间:2014-03-10 20:54:33.0

基于线程安全的一些原则来编程当然可以避免并发问题,但不是所有人都能写出高质量的线程安全的代码,并且如果代码里到处都是线程安全的控制也极大地影响了代码可读性和可维护性。因此,Java平台为了解决这个问题,提供了很多线程安全的类和并发工具,通过这些类和工具就能更简便地写线程安全的代码。归纳一下有以下几种:

同步容器类
并发容器类
生产者和消费者模式
阻塞和可中断方法
Synchronizer
这些类和方法的使用都可以从JDK DOC查到,但在具体使用中还是有很多问题需要注意

同步容器类

同步容器类就是一些经过同步处理了的容器类,比如List有Vector,Map有Hashtable,查看其源码发现其保证线程安全的方式就是把每个对外暴露的存取方法用synchronized关键字同步化,这样做我们立马会想到有以下问题:
1)性能有问题

同步化了所有存取方法,就表明所有对这个容器对象的操作将会串行,这样做来得倒是干净,但性能的代价也是很可观的

2)复合操作问题

同步容器类只是同步了单一操作,如果客户端是一组复合操作,它就没法同步了,依然需要客户端做额外同步,比如以下代码:

1public static Object getLast(Vector list) {
2    int lastIndex = list.size() - 1;
3    return list.get(lastIndex);
4}
5public static void deleteLast(Vector list) {
6    int lastIndex = list.size() - 1;
7    list.remove(lastIndex);
8}

getLast和deleteLast都是复合操作,由先前对原子性的分析可以判断,这依然存在线程安全问题,有可能会抛出ArrayIndexOutOfBoundsException的异常,错误产生的逻辑如下所示:

解决办法就是通过对这些复合操作加锁

3)迭代器并发问题

Java Collection进行迭代的标准时使用Iterator,无论是使用老的方式迭代循环,还是Java 5提供for-each新方式,都需要对迭代的整个过程加锁,不然就会有Concurrentmodificationexception异常抛出。

此外有些迭代也是隐含的,比如容器类的toString方法,或containsAll, removeAll, retainAll等方法都会隐含地对容器进行迭代

并发容器类

正是由于同步容器类有以上问题,导致这些类成了鸡肋,于是Java 5推出了并发容器类,Map对应的有ConcurrentHashMap,List对应的有CopyOnWriteArrayList。与同步容器类相比,它有以下特性:

更加细化的锁机制。同步容器直接把容器对象做为锁,这样就把所有操作串行化,其实这是没必要的,过于悲观,而并发容器采用更细粒度的锁机制,保证一些不会发生并发问题的操作进行并行执行
附加了一些原子性的复合操作。比如putIfAbsent方法
迭代器的弱一致性。它在迭代过程中不再抛出Concurrentmodificationexception异常,而是弱一致性。在并发高的情况下,有可能size和isEmpty方法不准确,但真正在并发环境下这些方法也没什么作用。
CopyOnWriteArrayList采用写入时复制的方式避开并发问题。这其实是通过冗余和不可变性来解决并发问题,在性能上会有比较大的代价,但如果写入的操作远远小于迭代和读操作,那么性能就差别不大了
生产者和消费者模式

大学时学习操作系统多会为生产者和消费者模式而头痛,也是每次考试肯定会涉及到的,而Java知道大家很憷这个模式的并发复杂性,于是乎提供了阻塞队列(BlockingQueue)来满足这个模式的需求。阻塞队列说起来很简单,就是当队满的时候写线程会等待,直到队列不满的时候;当队空的时候读线程会等待,直到队不空的时候。实现这种模式的方法很多,其区别也就在于谁的消耗更低和等待的策略更优。以LinkedBlockingQueue的具体实现为例,它的put源码如下:

01public void put(E e) throws InterruptedException {
02    if (e == null) throw new NullPointerException();
03    int c = -1;
04    final ReentrantLock putLock = this.putLock;
05    final AtomicInteger count = this.count;
06    putLock.lockInterruptibly();
07    try {
08        try {
09            while (count.get() == capacity)
10                notFull.await();
11        } catch (InterruptedException ie) {
12            notFull.signal(); // propagate to a non-interrupted thread
13            throw ie;
14        }
15        insert(e);
16        c = count.getAndIncrement();
17        if (c + 1 < capacity)
18            notFull.signal();
19    } finally {
20        putLock.unlock();
21    }
22    if (c == 0)
23        signalNotEmpty();
24}

撇开其锁的具体实现,其流程就是我们在操作系统课上学习到的标准生产者模式,看来那些枯燥的理论还是有用武之地的。其中,最核心的还是Java的锁实现,有兴趣的朋友可以再进一步深究一下

阻塞和可中断方法

由LinkedBlockingQueue的put方法可知,它是通过线程的阻塞和中断阻塞来实现等待的。当调用一个会抛出InterruptedException的方法时,就成为了一个阻塞的方法,要为响应中断做好准备。处理中断可有以下方法:

传递InterruptedException。把捕获的InterruptedException再往上抛,使其调用者感知到,当然在抛之前需要完成你自己应该做的清理工作,LinkedBlockingQueue的put方法就是采取这种方式
中断其线程。在不能抛出异常的情况下,可以直接调用Thread.interrupt()将其中断。
Synchronizer

Synchronizer不是一个类,而是一种满足一个种规则的类的统称。它有以下特性:

它是一个对象
封装状态,而这些状态决定着线程执行到某一点是通过还是被迫等待
提供操作状态的方法
其实BlockingQueue就是一种Synchronizer。Java还提供了其他几种Synchronizer

1)CountDownLatch

CountDownLatch是一种闭锁,它通过内部一个计数器count来标示状态,当count>0时,所有调用其await方法的线程都需等待,当通过其countDown方法将count降为0时所有等待的线程将会被唤起。使用实例如下所示:

01public class TestHarness {
02    public long timeTasks(int nThreads, final Runnable task)
03            throws InterruptedException {
04        final CountDownLatch startGate = new CountDownLatch(1);
05        final CountDownLatch endGate = new CountDownLatch(nThreads);
06        for (int i = 0; i < nThreads; i++) {
07            Thread t = new Thread() {
08                public void run() {
09                    try {
10                        startGate.await();
11                        try {
12                            task.run();
13                        } finally {
14                            endGate.countDown();
15                        }
16                    } catch (InterruptedException ignored) { }
17                }
18            };
19            t.start();
20        }
21        long start = System.nanoTime();
22        startGate.countDown();
23        endGate.await();
24        long end = System.nanoTime();
25        return end-start;
26    }
27}

2)Semaphore

Semaphore类实际上就是操作系统中谈到的信号量的一种实现,其原理就不再累述,可见探索并发编程——操作系统篇

具体使用就是通过其acquire和release方法来完成,如以下示例:

01public class BoundedHashSet<T> {
02    private final Set<T> set;
03    private final Semaphore sem;
04    public BoundedHashSet(int bound) {
05        this.set = Collections.synchronizedSet(new HashSet<T>());
06        sem = new Semaphore(bound);
07    }
08    public boolean add(T o) throws InterruptedException {
09        sem.acquire();
10        boolean wasAdded = false;
11        try {
12            wasAdded = set.add(o);
13            return wasAdded;
14        }
15        finally {
16            if (!wasAdded)
17                sem.release();
18        }
19    }
20    public boolean remove(Object o) {
21        boolean wasRemoved = set.remove(o);
22        if (wasRemoved)
23            sem.release();
24        return wasRemoved;
25    }
26}

3)关卡

关卡和闭锁类似,也是阻塞一组线程,直到某件事情发生,而不同在于关卡是等到符合某种条件的所有线程都达到关卡点。具体使用上可以用CyclicBarrier来应用关卡

以上是Java提供的一些并发工具,既然是工具就有它所适用的场景,因此需要知道它的特性,这样才能在具体场景下选择最合适的工具。

  相关解决方案