曹耘豪的博客

Java并发之阻塞队列

  1. offer方法,阻塞添加
  2. poll方法,阻塞获取
  • await实现原理
    1. AQS(AbstractQueuedSynchronizer,抽象队列同步器)
      1. Unsafe.park(absolute, time)
      2. Unsafe.unpark(thread)
  • 参考
  • 以下以LinkedBlockingQueue为例

    offer方法,阻塞添加

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout); // 计算阻塞纳秒数
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;

    // 获取put锁
    putLock.lockInterruptibly();
    try {
    while (count.get() == capacity) {
    // 当前队列满了
    if (nanos <= 0)
    // 说明等待超时,意味着添加失败
    return false;
    // 需等待,并且释放put锁,其他线程调用offer也会停在这里
    nanos = notFull.awaitNanos(nanos);
    // nanos = nanos - 等待的时间
    // 走到这里,重新获取了put锁
    }
    enqueue(new Node<E>(e));
    c = count.getAndIncrement(); // c是添加前的队列大小
    if (c + 1 < capacity)
    // 添加后队列依然不满,其他offer线程发信号继续执行
    // 如果没有下面这句话,其他offer线程会等到超时才会继续执行
    notFull.signal();
    } finally {
    // 释放put锁
    putLock.unlock();
    }
    if (c == 0)
    // 添加前队列为空,可能存在其他获取线程阻塞,这里通知他们可以取了
    signalNotEmpty();

    return true; // 添加成功
    }

    poll方法,阻塞获取

    流程和上述差不多

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout); // 计算阻塞纳秒数
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;

    // 获取take锁
    takeLock.lockInterruptibly();
    try {
    while (count.get() == 0) {
    if (nanos <= 0)
    // 超时返回null,说明阻塞队列值不能有null
    return null;
    nanos = notEmpty.awaitNanos(nanos);
    }
    x = dequeue();
    c = count.getAndDecrement(); // c是取走前的队列大小
    if (c > 1)
    // 取走后还有剩余,则通知其他获取线程执行
    notEmpty.signal();
    } finally {
    takeLock.unlock();
    }
    if (c == capacity)
    // 取之前是满的,则通知其他添加线程执行
    signalNotFull();
    return x;
    }

    await实现原理

    LinkedBlockingQueue为例,其内部使用ReentrantLock

    AQS(AbstractQueuedSynchronizer,抽象队列同步器)

    一个AQS包含一个同步队列和多个条件队列。在条件队列等待的线程需等待对应条件的signal信号,然后加入到同步队列

    如果当前队列如下

    1
    2
    3
    Sync Queue: 1 --> 2 --> 3
    Condition A Queue: 4 --> 5 --> 6
    Condition B Queue: 7 --> 8 --> 9

    触发一次A.signal(),结果如下

    1
    2
    3
    Sync Queue: 1 --> 2 --> 3 --> 4
    Condition A Queue: 5 --> 6
    Condition B Queue: 7 --> 8 --> 9

    源码解析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    public abstract class AbstractQueuedSynchronizer {
    static final class Node { // 节点定义
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    }

    // 同步队列,链表表头
    Node head;
    Node tail;

    // 条件对象
    public class ConditionObject {
    private transient Node firstWaiter; // 条件队列,链表表头
    private transient Node lastWaiter;

    public final void signal() {
    // ...其他代码
    // 将条件队列的第一个等待者加入同步队列
    enq(firstWaiter)
    }

    public final void await() {
    // 添加一个条件Waiter
    Node node = addConditionWaiter();
    // 释放锁
    int savedState = fullyRelease(node);
    // 循环判断是否在同步队列,如果不在就继续挂起
    while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
    }
    // 在同步队列尝试获取锁,内部依然是循环
    acquireQueued(node, savedState)
    // ...其他代码
    }
    }

    private Node enq(final Node node) {
    // 将node添加至同步队列末尾
    }
    }
    Unsafe.park(absolute, time)

    挂起当前线程

    Unsafe.unpark(thread)

    恢复挂起线程

    参考

    1. https://blog.csdn.net/a7980718/article/details/83661613
       / 
      ,