当前位置: 代码迷 >> 综合 >> Java 集合 (20) -- 阻塞队列之 SynchronousQueue 类
  详细解决方案

Java 集合 (20) -- 阻塞队列之 SynchronousQueue 类

热度:75   发布时间:2023-12-16 13:18:07.0

文章目录

    • 1. 简介
    • 2. 继承体系
    • 3. 字段
    • 4. 内部类
    • 5. 构造器
    • 6. 常用方法

1. 简介

public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {
    }

SynchronousQueue 是 JUC 包下的无缓冲阻塞队列,它用来在两个线程之间直接移交元素,它有两种实现方式,一种是公平(队列)方式,一种是非公平(栈)方式。SynchronousQueue 的内部实现了两个类,一个是 TransferStack 类,使用 LIFO 顺序存储元素,这个类用于非公平模式;还有一个类是TransferQueue,使用 FIFO 顺序存储元素,这个类用于公平模式。

但是它有个很大的问题:如果有多个生产者,但只有一个消费者,如果消费者处理不过来,那么生产者就都会阻塞起来,反过来也是一样,所以,SynchronousQueue 一般用于生产、消费的速度大致相当的情况,这样才不会导致系统中过多的线程处于阻塞状态。

另外,其实 SynchronousQueue 内部是使用栈或者使用队列来存储包含线程和元素值的节点的,如果同一个模式的节点过多的话,它们都会存储进来,且都会阻塞着,所以,严格上来说,SynchronousQueue 并不能算是一个无缓冲队列。

使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。

2. 继承体系

在这里插入图片描述

3. 字段

// CPU的数量
static final int NCPUS = Runtime.getRuntime().availableProcessors();// 有超时的情况自旋多少次,当CPU数量小于2的时候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;// 没有超时的情况自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;// 针对有超时的情况,自旋了多少次后,如果剩余时间大于1000纳秒就使用带时间的LockSupport.parkNanos()这个方法
static final long spinForTimeoutThreshold = 1000L;// 传输器,即两个线程交换元素使用的东西
private transient volatile Transferer<E> transferer;

4. 内部类

// Transferer抽象类,主要定义了一个transfer方法用来传输元素
abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}
// 以栈方式实现的Transferer
static final class TransferStack<E> extends Transferer<E> {
    // 栈中节点的几种类型:// 1. 消费者(请求数据的)static final int REQUEST    = 0;// 2. 生产者(提供数据的)static final int DATA       = 1;// 3. 二者正在撮合中static final int FULFILLING = 2;// 栈中的节点static final class SNode {
    // 下一个节点volatile SNode next;        // next node in stack// 匹配者volatile SNode match;       // the node matched to this// 等待着的线程volatile Thread waiter;     // to control park/unpark// 元素Object item;                // data; or null for REQUESTs// 模式,也就是节点的类型,是消费者,是生产者,还是正在撮合中int mode;}// 栈的头节点volatile SNode head;
}
// 以队列方式实现的Transferer
static final class TransferQueue<E> extends Transferer<E> {
    // 队列中的节点static final class QNode {
    // 下一个节点volatile QNode next;          // next node in queue// 存储的元素volatile Object item;         // CAS'ed to or from null// 等待着的线程volatile Thread waiter;       // to control park/unpark// 是否是数据节点final boolean isData;}// 队列的头节点transient volatile QNode head;// 队列的尾节点transient volatile QNode tail;
}

5. 构造器

  1. 默认非公平模式
    public SynchronousQueue() {
          this(false);
    }
    
  2. 指定模式
    public SynchronousQueue(boolean fair) {
          // 如果是公平模式就使用队列,如果是非公平模式就使用栈transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
    

6. 常用方法

  1. public void put(E e) throws InterruptedException:入队
  2. public E take() throws InterruptedException:出队
  相关解决方案