当前位置: 代码迷 >> 综合 >> Java 并发 (12) -- Exchanger 类
  详细解决方案

Java 并发 (12) -- Exchanger 类

热度:4   发布时间:2023-12-16 13:14:10.0

文章目录

  • 1. 简介
  • 2. 精讲
    • 1. 概念
    • 2. 应用场景
    • 3. 示例
    • 4. 实现
      • 1. exchange(V x)

1. 简介

  1. Exchanger 允许在并发任务之间交换数据。具体来说,Exchanger 类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中;或者这么说,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange() 方法交换数据,如果第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange() 方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方

  2. exchange(V x):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象

  3. Exchanger 算法的核心是一个可交换数据的 slot,以及一个可以带有数据 item 的参与者。

    另外,Exchanger 有单槽位和多槽位之分,单个槽位在同一时刻只能用于两个线程交换数据,这样在竞争比较激烈的时候,会影响到性能,多个槽位就是多个线程可以同时进行两个的数据交换,彼此之间不受影响,这样可以很好的提高吞吐量

  4. Exchange 可以用于移传算法。移传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出 2 个交配结果;它也可以用于校对工作。比如,我们需要将纸质银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用 AB 岗两人进行录入,录入到 Excel 之后,系统需要加载这两个 Excel,并对两个 Excel 数据进行校验,看是否录入一致

2. 精讲

1. 概念

Exchanger(交换者)是一个用于线程间协作的工具类,用于进行线程间的数据交换

在 API 是这么介绍的:是一个可以在队中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange() 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

Exchanger 允许在并发任务之间交换数据。具体来说,Exchanger 类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中;或者这么说,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange() 方法交换数据,如果第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange() 方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方

2. 应用场景

  1. Exchange 可以用于移传算法。移传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出 2 个交配结果;

  2. Exchange 可以用于校对工作。比如,我们需要将纸质银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用 AB 岗两人进行录入,录入到 Excel 之后,系统需要加载这两个 Excel,并对两个 Excel 数据进行校验,看是否录入一致

3. 示例

Exchange 实现较为复杂,我们先看其怎么使用,然后再来分析其源码。现在我们用 Exchange 来模拟生产-消费者问题:

public class ExchangerTest {
    static class Producer implements Runnable{
    //生产者、消费者交换的数据结构private List<String> buffer;//步生产者和消费者的交换对象private Exchanger<List<String>> exchanger;Producer(List<String> buffer,Exchanger<List<String>> exchanger){
    this.buffer = buffer;this.exchanger = exchanger;}@Overridepublic void run() {
    for(int i = 1 ; i < 5 ; i++){
    System.out.println("生产者第" + i + "次提供");for(int j = 1 ; j <= 3 ; j++){
    System.out.println("生产者装入" + i  + "--" + j);buffer.add("buffer:" + i + "--" + j);}System.out.println("生产者装满,等待与消费者交换...");try {
    exchanger.exchange(buffer);} catch (InterruptedException e) {
    e.printStackTrace();}}}}static class Consumer implements Runnable {
    private List<String> buffer;private final Exchanger<List<String>> exchanger;public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
    this.buffer = buffer;this.exchanger = exchanger;}@Overridepublic void run() {
    for (int i = 1; i < 5; i++) {
    //调用exchange()与消费者进行数据交换try {
    buffer = exchanger.exchange(buffer);} catch (InterruptedException e) {
    e.printStackTrace();}System.out.println("消费者第" + i + "次提取");for (int j = 1; j <= 3 ; j++) {
    System.out.println("消费者 : " + buffer.get(0));buffer.remove(0);}}}}public static void main(String[] args){
    List<String> buffer1 = new ArrayList<String>();List<String> buffer2 = new ArrayList<String>();Exchanger<List<String>> exchanger = new Exchanger<List<String>>();Thread producerThread = new Thread(new Producer(buffer1,exchanger));Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));producerThread.start();consumerThread.start();}
}

在这里插入图片描述
进行同步来获取数据,而生产者则通过 for 循环向缓存队列存储数据并使用 exchanger 对象与消费者同步。等到消费者从 exchanger 那里得到数据后,他的缓冲列表中有 3 个数据,而生产者得到的则是一个空的列表。上面的例子充分展示了消费者-生产者是如何利用 Exchanger 来完成数据交换的。 在 Exchanger 中,如果一个线程已经到达了 exchanger 节点时,对于它的伙伴节点的情况有三种:

  1. 如果它的伙伴节点在该线程到达之前已经调用了 exchanger() 方法,则它会唤醒它的伙伴然后进行数据交换,得到各自数据返回。
  2. 如果它的伙伴节点还没有到达交换点,则该线程将会被挂起,等待它的伙伴节点到达被唤醒,完成数据交换。
  3. 如果当前线程被中断了则抛出异常,或者等待超时了,则抛出超时异常。

4. 实现

Exchanger 算法的核心是通过一个可交换数据的 slot,以及一个可以带有数据 item 的参与者。源码中的描述如下:

for (;;) {
    if (slot is empty) {
                           // offerplace item in a Node;if (can CAS slot from empty to node) {
    wait for release;return matching item in node;}}else if (can CAS slot from node to empty) {
     // releaseget the item in node;set matching item in node;release waiting thread;}// else retry on CAS failure
}

Exchanger 中定义了如下几个重要的成员变量:

private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;

participant 的作用是为每个线程保留唯一的一个 Node 节点。 slot 为单个槽,arena 为数组槽。他们都是 Node 类型。

在这里可能会感觉到疑惑,slot 作为 Exchanger 交换数据的场景,应该只需要一个就可以了啊 ?为何还多了一个 Participant 和数组类型的 arena 呢 ?一个 slot 交换场所原则上来说应该是可以的,但实际情况却不是如此,多个参与者使用同一个交换场所时,会存在严重伸缩性问题。既然单个交换场所存在问题,那么我们就安排多个,也就是数组 arena。通过数组 arena 来安排不同的线程使用不同的 slot 来降低竞争问题,并且可以保证最终一定会成对交换数据。但是 Exchanger 不是一来就会生成 arena 数组来降低竞争,只有当产生竞争时才会生成 arena 数组。那么怎么将 Node 与当前线程绑定呢 ?Participant ,Participant 的作用就是为每个线程保留唯一的一个 Node 节点,它继承了 ThreadLocal,同时在 Node 节点中记录在 arena 中的下标 index。 Node 定义如下:

@sun.misc.Contended 
static final class Node {
    int index;              // Arena indexint bound;              // Last recorded value of Exchanger.boundint collides;           // Number of CAS failures at current boundint hash;               // Pseudo-random for spinsObject item;            // This thread's current itemvolatile Object match;  // Item provided by releasing threadvolatile Thread parked; // Set to this thread when parked, else null
}
  • index:arena 的下标;
  • bound:上一次记录的 Exchanger.bound;
  • collides:在当前 bound 下 CAS 失败的次数;
  • hash:伪随机数,用于自旋;
  • item:这个线程的当前项,也就是需要交换的数据;
  • match:做 releasing 操作的线程传递的项;
  • parked:挂起时设置线程值,其他情况下为 null;

在 Node 定义中有两个变量值得思考:bound 以及 collides。前面提到了数组 arena 是为了避免竞争而产生的,如果系统不存在竞争问题,那么完全没有必要开辟一个高效的 arena 来徒增系统的复杂性。首先通过单个 slot 的 exchanger 来交换数据,当探测到竞争时将安排不同的位置的 slot 来保存线程 Node,并且可以确保没有 slot 会在同一个缓存行上。如何来判断会有竞争呢 ?CAS 替换 slot 失败,如果失败,则通过记录冲突次数来扩展 arena 的尺寸,我们在记录冲突的过程中会跟踪 bound 的值,以及在 bound 的值被改变时会重新计算冲突次数。这里阐述可能有点儿模糊,不着急,我们先有这个概念,后面在 arenaExchange 中再次做详细阐述。 我们直接看 exchange() 方法:

1. exchange(V x)

等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。方法定义如下:

public V exchange(V x) throws InterruptedException {
    Object v;Object item = (x == null) ? NULL_ITEM : x; // translate null argsif ((arena != null ||(v = slotExchange(item, false, 0L)) == null) &&((Thread.interrupted() || // disambiguates null return(v = arenaExchange(item, false, 0L)) == null)))throw new InterruptedException();return (v == NULL_ITEM) ? null : (V)v;
}

这个方法比较好理解:arena 为数组槽,如果为 null,则执行 slotExchange() 方法,不为 null 则判断线程是否中断,如果中断值抛出 InterruptedException 异常,没有中断则执行 arenaExchange() 方法。

整套逻辑就是:如果 slotExchange(Object item, boolean timed, long ns) 方法执行失败了就执行arenaExchange(Object item, boolean timed, long ns) 方法,最后返回结果 v。 NULL_ITEM 为一个空节点,其实就是一个 Object 对象而已,slotExchange() 为单个 slot 交换

slotExchange(Object item, boolean timed, long ns):

private final Object slotExchange(Object item, boolean timed, long ns) {
    // 获取当前线程的节点 pNode p = participant.get();// 当前线程Thread t = Thread.currentThread();// 线程中断,直接返回if (t.isInterrupted())return null;// 自旋for (Node q;;) {
    //slot != nullif ((q = slot) != null) {
    //尝试CAS替换if (U.compareAndSwapObject(this, SLOT, q, null)) {
    Object v = q.item;      // 当前线程的项,也就是交换的数据q.match = item;         // 做releasing操作的线程传递的项Thread w = q.parked;    // 挂起时设置线程值// 挂起线程不为null,线程挂起if (w != null)U.unpark(w);return v;}//如果失败了,则创建arena//bound 则是上次Exchanger.boundif (NCPU > 1 && bound == 0 &&U.compareAndSwapInt(this, BOUND, 0, SEQ))arena = new Node[(FULL + 2) << ASHIFT];}//如果arena != null,直接返回,进入arenaExchange逻辑处理else if (arena != null)return null;else {
    p.item = item;if (U.compareAndSwapObject(this, SLOT, null, p))break;p.item = null;}}/** 等待 release* 进入spin+block模式*/int h = p.hash;long end = timed ? System.nanoTime() + ns : 0L;int spins = (NCPU > 1) ? SPINS : 1;Object v;while ((v = p.match) == null) {
    if (spins > 0) {
    h ^= h << 1; h ^= h >>> 3; h ^= h << 10;if (h == 0)h = SPINS | (int)t.getId();else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield();}else if (slot != p)spins = SPINS;else if (!t.isInterrupted() && arena == null &&(!timed || (ns = end - System.nanoTime()) > 0L)) {
    U.putObject(t, BLOCKER, this);p.parked = t;if (slot == p)U.park(false, ns);p.parked = null;U.putObject(t, BLOCKER, null);}else if (U.compareAndSwapObject(this, SLOT, p, null)) {
    v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;break;}}U.putOrderedObject(p, MATCH, null);p.item = null;p.hash = h;return v;
}
  • 程序首先通过 participant 获取当前线程节点 Node。检测是否中断,如果中断 return null,等待后续抛出 InterruptedException 异常。
  • 如果 slot 不为 null,则进行 slot 消除,成功直接返回数据 V,否则失败,则创建 arena 消除数组。
  • 如果 slot 为 null,但 arena 不为 null,则返回 null,进入 arenaExchange 逻辑。
  • 如果 slot 为 null,且 arena 也为 null,则尝试占领该 slot,失败重试,成功则跳出循环进入 spin+block(自旋+阻塞)模式。

在自旋+阻塞模式中,首先取得结束时间和自旋次数。如果 match (做 releasing 操作的线程传递的项) 为 null,其首先尝试 spins+随机次自旋(改自旋使用当前节点中的 hash,并改变之)和退让。当自旋数为 0 后,假如 slot 发生了改变(slot != p)则重置自旋数并重试。否则假如:当前未中断&arena 为 null&(当前不是限时版本或者限时版本+当前时间未结束):阻塞或者限时阻塞。假如:当前中断或者arena不为null或者当前为限时版本+时间已经结束:不限时版本:置v为null;限时版本:如果时间结束以及未中断则TIMED_OUT;否则给出null(原因是探测到 arena 非空或者当前线程中断)。 match 不为空时跳出循环。 整个 slotExchange 清晰明了。

arenaExchange(Object item, boolean timed, long ns):

private final Object arenaExchange(Object item, boolean timed, long ns) {
    Node[] a = arena;Node p = participant.get();for (int i = p.index;;) {
                          // access slot at iint b, m, c; long j;                       // j is raw array offsetNode q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);if (q != null && U.compareAndSwapObject(a, j, q, null)) {
    Object v = q.item;                     // releaseq.match = item;Thread w = q.parked;if (w != null)U.unpark(w);return v;}else if (i <= (m = (b = bound) & MMASK) && q == null) {
    p.item = item;                         // offerif (U.compareAndSwapObject(a, j, null, p)) {
    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;Thread t = Thread.currentThread(); // waitfor (int h = p.hash, spins = SPINS;;) {
    Object v = p.match;if (v != null) {
    U.putOrderedObject(p, MATCH, null);p.item = null;             // clear for next usep.hash = h;return v;}else if (spins > 0) {
    h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshiftif (h == 0)                // initialize hashh = SPINS | (int)t.getId();else if (h < 0 &&          // approx 50% true(--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield();        // two yields per wait}else if (U.getObjectVolatile(a, j) != p)spins = SPINS;       // releaser hasn't set match yetelse if (!t.isInterrupted() && m == 0 &&(!timed ||(ns = end - System.nanoTime()) > 0L)) {
    U.putObject(t, BLOCKER, this); // emulate LockSupportp.parked = t;              // minimize windowif (U.getObjectVolatile(a, j) == p)U.park(false, ns);p.parked = null;U.putObject(t, BLOCKER, null);}else if (U.getObjectVolatile(a, j) == p &&U.compareAndSwapObject(a, j, p, null)) {
    if (m != 0)                // try to shrinkU.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);p.item = null;p.hash = h;i = p.index >>>= 1;        // descendif (Thread.interrupted())return null;if (timed && m == 0 && ns <= 0L)return TIMED_OUT;break;                     // expired; restart}}}elsep.item = null;                     // clear offer}else {
    if (p.bound != b) {
                        // stale; resetp.bound = b;p.collides = 0;i = (i != m || m == 0) ? m : m - 1;}else if ((c = p.collides) < m || m == FULL ||!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
    p.collides = c + 1;i = (i == 0) ? m : i - 1;          // cyclically traverse}elsei = m + 1;                         // growp.index = i;}}
}

首先通过 participant 取得当前节点 Node,然后根据当前节点 Node 的 index 去取 arena 中相对应的节点 node。前面提到过 arena 可以确保不同的 slot 在 arena 中是不会相冲突的,那么是怎么保证的呢?我们先看 arena 的创建:

arena = new Node[(FULL + 2) << ASHIFT];

这个 arena 到底有多大呢?我们先看 FULL 和 ASHIFT 的定义:

static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
private static final int ASHIFT = 7;private static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final int MMASK = 0xff;      // 255

假如我的机器 NCPU = 8 ,则得到的是 768 大小的 arena 数组。然后通过以下代码取得在 arena 中的节点:

 Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);

他仍然是通过右移 ASHIFT 位来取得 Node 的,ABASE 定义如下:

Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

U.arrayBaseOffset() 获取对象头长度,数组元素的大小可以通过 unsafe.arrayIndexScale(T[].class) 方法获取到。这也就是说要访问类型为 T 的第 N 个元素的话,你的偏移量 offset 应该是 arrayOffset+N*arrayScale 。也就是说 BASE = arrayOffset+ 128 。其次我们再看 Node 节点的定义

@sun.misc.Contended 
static final class Node{
    ....
}

在 Java 8 中我们是可以利用 sun.misc.Contended 来规避伪共享的。所以说通过 << ASHIFT 方式加上 sun.misc.Contended,可以使得任意两个可用 Node 不会再同一个缓存行中。 关于伪共享请参考如下博文: 伪共享(False Sharing)Java8中用sun.misc.Contended避免伪共享(false sharing)

我们再次回到 arenaExchange()。取得 arena 中的 node 节点后,如果定位的节点 q 不为空,且 CAS 操作成功,则交换数据,返回交换的数据,唤醒等待的线程。 如果 q 等于 null 且下标在 bound & MMASK 范围之内,则尝试占领该位置,如果成功,则采用自旋 + 阻塞的方式进行等待交换数据。 如果下标不在 bound & MMASK 范围之内获取由于 q 不为 null 但是竞争失败的时候:消除 p。加入 bound 不等于当前节点的 bond(b != p.bound),则更新 p.bound = b,collides = 0 ,i = m 或者 m - 1。如果冲突的次数不到 m 、获取 m 已经为最大值或者修改当前 bound 的值失败,则通过增加一次 collides 以及循环递减下标i的值;否则更新当前 bound 的值成功:我们令 i 为 m+1 即为此时最大的下标。最后更新当前 index 的值。

Exchanger 使用、原理都比较好理解,但是这个源码看起来真心有点儿复杂,是真心难看懂,但是这种交换的思路 Doug Lea 在后续博文中还会提到,例如 SynchronousQueue、LinkedTransferQueue。 最后用一个在网上看到的段子结束此篇博客(http://brokendreams.iteye.com/blog/2253956),博主对其做了一点点修改,以便更加符合在 1.8 环境下的 Exchanger: 其实就是"我"和"你"(可能有多个"我",多个"你")在一个叫 Slot 的地方做交易(一手交钱,一手交货),过程分以下步骤:

  1. 我先到一个叫做 Slot 的交易场所交易,发现你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第 2 步;如果别人抢先一步把你喊走了,那我就进入第 5 步。
  2. 我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM 的不卖了,走了,那我只能再找别人买货了(从头开始)。
  3. 我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上…),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第 4 步;如果被别人抢座了,那我只能在找别的地方儿了,进入第 5 步。
  4. 你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点 Slot),太 TM 浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
  5. 我跑去喊管理员,尼玛,就一个坑交易个毛啊,然后管理在一个更加开阔的地方开辟了好多个单间,然后我就挨个来看每个单间是否有人。如果有人我就问他是否可以交易,如果回应了我,那我就进入第2步。如果我没有人,那我就占着这个单间等其他人来交易,进入第4步。
  6. 如果我尝试了几次都没有成功,我就会认为,是不是我 TM 选的这个单间风水不好 ?不行,得换个地儿继续(从头开始);如果我尝试了多次发现还没有成功,怒了,把管理员喊来:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!
  相关解决方案