文章目录
-
- 1. 简介
- 2. 继承体系
- 3. 字段
- 4. 内部类
- 5. 构造器
- 6. 常用方法
1. 简介
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {
}
SynchronousQueue 是 JUC 包下的无缓冲阻塞队列,它用来在两个线程之间直接移交元素,它有两种实现方式,一种是公平(队列)方式,一种是非公平(栈)方式。SynchronousQueue 的内部实现了两个类,一个是 TransferStack 类,使用 LIFO 顺序存储元素,这个类用于非公平模式;还有一个类是TransferQueue,使用 FIFO 顺序存储元素,这个类用于公平模式。
但是它有个很大的问题:如果有多个生产者,但只有一个消费者,如果消费者处理不过来,那么生产者就都会阻塞起来,反过来也是一样,所以,SynchronousQueue 一般用于生产、消费的速度大致相当的情况,这样才不会导致系统中过多的线程处于阻塞状态。
另外,其实 SynchronousQueue 内部是使用栈或者使用队列来存储包含线程和元素值的节点的,如果同一个模式的节点过多的话,它们都会存储进来,且都会阻塞着,所以,严格上来说,SynchronousQueue 并不能算是一个无缓冲队列。
使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。
2. 继承体系
3. 字段
// CPU的数量
static final int NCPUS = Runtime.getRuntime().availableProcessors();// 有超时的情况自旋多少次,当CPU数量小于2的时候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;// 没有超时的情况自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;// 针对有超时的情况,自旋了多少次后,如果剩余时间大于1000纳秒就使用带时间的LockSupport.parkNanos()这个方法
static final long spinForTimeoutThreshold = 1000L;// 传输器,即两个线程交换元素使用的东西
private transient volatile Transferer<E> transferer;
4. 内部类
// Transferer抽象类,主要定义了一个transfer方法用来传输元素
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
// 以栈方式实现的Transferer
static final class TransferStack<E> extends Transferer<E> {
// 栈中节点的几种类型:// 1. 消费者(请求数据的)static final int REQUEST = 0;// 2. 生产者(提供数据的)static final int DATA = 1;// 3. 二者正在撮合中static final int FULFILLING = 2;// 栈中的节点static final class SNode {
// 下一个节点volatile SNode next; // next node in stack// 匹配者volatile SNode match; // the node matched to this// 等待着的线程volatile Thread waiter; // to control park/unpark// 元素Object item; // data; or null for REQUESTs// 模式,也就是节点的类型,是消费者,是生产者,还是正在撮合中int mode;}// 栈的头节点volatile SNode head;
}
// 以队列方式实现的Transferer
static final class TransferQueue<E> extends Transferer<E> {
// 队列中的节点static final class QNode {
// 下一个节点volatile QNode next; // next node in queue// 存储的元素volatile Object item; // CAS'ed to or from null// 等待着的线程volatile Thread waiter; // to control park/unpark// 是否是数据节点final boolean isData;}// 队列的头节点transient volatile QNode head;// 队列的尾节点transient volatile QNode tail;
}
5. 构造器
- 默认非公平模式
public SynchronousQueue() { this(false); }
- 指定模式
public SynchronousQueue(boolean fair) { // 如果是公平模式就使用队列,如果是非公平模式就使用栈transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
6. 常用方法
- public void put(E e) throws InterruptedException:入队
- public E take() throws InterruptedException:出队