曹耘豪的博客

Java并发编程

  1. Java线程池原理分析
    1. 构造函数的参数
      1. corePoolSize
      2. maximumPoolSize
      3. keepAliveTime,unit
      4. workQueue
      5. threadFactory
      6. handler
    2. 内部状态ctl字段
    3. execute方法
    4. getTask方法
    5. runWorker方法
    6. submit方法
      1. FutureTask
  2. 阻塞队列和AQS
    1. offer方法,阻塞添加
    2. poll方法,阻塞获取
    3. await实现原理
    4. AQS(AbstractQueuedSynchronizer,抽象队列同步器)
      1. Unsafe.park(absolute, time)
      2. Unsafe.unpark(thread)
    5. 参考
  3. CompletionService
    1. 背景
      1. 传统写法
      2. 如果使用CompletionService
    2. CompletionService原理
    3. 其他使用场景
      1. 取出第一个完成的结果
      2. 限时内取出第一个非null的结果
  4. ConcurrentHashMap原理
    1. 初始化Table
    2. Get方法
      1. find方法
    3. Put方法
    4. transfer扩容
    5. 参考
  5. CompletableFuture
    1. 构造CompletableFuture
    2. 链式调用(异步顺序执行)
    3. 工具方法
    4. 注意事项
      1. 同步操作的线程问题
    5. 参考
  6. synchronized
    1. synchronized特性
    2. synchronized优化
    3. 锁介绍
      1. 偏向锁
      2. 轻量级锁
      3. 重量级锁
      4. 自旋锁
    4. 参考

Java线程池原理分析

2023-01-29

本文主要介绍ThreadPoolExecutor

构造函数的参数

corePoolSize

核心线程个数

maximumPoolSize

最大线程个数

keepAliveTime,unit

非核心线程个数超时时间及单位

workQueue

任务队列

threadFactory

线程工厂。主要用于自定义线程name

handler

拒绝策略。当线程数已达最大且任务队列满了触发策略。内置4种拒绝策略,也可以自定义

内部状态ctl字段

使用32位int保存线程池状态(3位)和线程数(29位)

线程池状态有5个:

execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 线程数小于corePoolSize,添加核心线程
if (addWorker(command, true))
return;
// 如果添加失败,说明ctl有变更,核心线程可能满了
c = ctl.get();
}

if (isRunning(c) && workQueue.offer(command)) {
// 任务加入队列成功
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 没有运行且从队列中移除成功,触发拒绝策略
// 移除失败说明已经被worker取走了,已经完成或正在执行,不能触发拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
// 没有线程,添加非核心线程,但不分配任务,因为任务已经添加到队列
addWorker(null, false);
}
else if (!addWorker(command, false))
// 队列满了,且添加新线程失败,触发拒绝策略
reject(command);
}

getTask方法

从队列里获取下一个任务

runWorker方法

submit方法

execute的参数是RunnableRunnable并没有返回值,无法直接获取执行的结果,而submit的参数Callable是有返回值的,submit返回Future,包含Callable的返回值

1
2
3
4
5
6
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

比较简单,将Callable<T>包装成Runnable传给execute,而newTaskFor就是新建一个FutureTask

FutureTask

阻塞队列和AQS

以下以LinkedBlockingQueue为例

offer方法,阻塞添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout); // 计算阻塞纳秒数
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;

// 获取put锁
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 当前队列满了
if (nanos <= 0)
// 说明等待超时,意味着添加失败
return false;
// 需等待,并且释放put锁,其他线程调用offer也会停在这里
nanos = notFull.awaitNanos(nanos);
// nanos = nanos - 等待的时间
// 走到这里,重新获取了put锁
}
enqueue(new Node<E>(e));
c = count.getAndIncrement(); // c是添加前的队列大小
if (c + 1 < capacity)
// 添加后队列依然不满,其他offer线程发信号继续执行
// 如果没有下面这句话,其他offer线程会等到超时才会继续执行
notFull.signal();
} finally {
// 释放put锁
putLock.unlock();
}
if (c == 0)
// 添加前队列为空,可能存在其他获取线程阻塞,这里通知他们可以取了
signalNotEmpty();

return true; // 添加成功
}

poll方法,阻塞获取

流程和上述差不多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout); // 计算阻塞纳秒数
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;

// 获取take锁
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
// 超时返回null,说明阻塞队列值不能有null
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement(); // c是取走前的队列大小
if (c > 1)
// 取走后还有剩余,则通知其他获取线程执行
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
// 取之前是满的,则通知其他添加线程执行
signalNotFull();
return x;
}

await实现原理

LinkedBlockingQueue为例,其内部使用ReentrantLock

AQS(AbstractQueuedSynchronizer,抽象队列同步器)

一个AQS包含一个同步队列和多个条件队列。在条件队列等待的线程需等待对应条件的signal信号,然后加入到同步队列

如果当前队列如下

1
2
3
Sync Queue: 1 --> 2 --> 3
Condition A Queue: 4 --> 5 --> 6
Condition B Queue: 7 --> 8 --> 9

触发一次A.signal(),结果如下

1
2
3
Sync Queue: 1 --> 2 --> 3 --> 4
Condition A Queue: 5 --> 6
Condition B Queue: 7 --> 8 --> 9

源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public abstract class AbstractQueuedSynchronizer {
static final class Node { // 节点定义
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}

// 同步队列,链表表头
Node head;
Node tail;

// 条件对象
public class ConditionObject {
private transient Node firstWaiter; // 条件队列,链表表头
private transient Node lastWaiter;

public final void signal() {
// ...其他代码
// 将条件队列的第一个等待者加入同步队列
enq(firstWaiter)
}

public final void await() {
// 添加一个条件Waiter
Node node = addConditionWaiter();
// 释放锁
int savedState = fullyRelease(node);
// 循环判断是否在同步队列,如果不在就继续挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 在同步队列尝试获取锁,内部依然是循环
acquireQueued(node, savedState)
// ...其他代码
}
}

private Node enq(final Node node) {
// 将node添加至同步队列末尾
}
}

Unsafe.park(absolute, time)

挂起当前线程

Unsafe.unpark(thread)

恢复挂起线程

参考

  1. https://blog.csdn.net/a7980718/article/details/83661613

CompletionService

2023-03-14

背景

当并行从有多个数据源同时取数据时,我们可能会写出以下代码,先提交任务再循环get,使用异常进行逻辑判断,很不优雅

传统写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final long TIMEOUT = 500;
final int TASK_NUM = 5;

List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < TASK_NUM; i++) {
Future<String> future = executor.submit(callable);
futures.add(future);
}

long start = System.currentTimeMillis();
for (Future<String> future : futures) {
long time = TIMEOUT - (System.currentTimeMillis() - start);
if (time > 0 || future.isDone()) {
try {
String s = future.get(time, TimeUnit.MILLISECONDS);
res.add(s);
} catch (Exception e) {
e.printStackTrace();
}
} else {
future.cancel(true); // 中断超时线程
}
}

如果使用CompletionService

1
2
3
4
5
6
7
8
9
10
11
12
13
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
for (int i = 0; i < TASK_NUM; i++) {
completionService.submit(callable);
}

long start2 = System.currentTimeMillis();
for (int i = 0; i < 3; i++) {
long time = TIMEOUT - (System.currentTimeMillis() - start2);
if (time <= 0) break;
Future<String> future = completionService.poll(time, TimeUnit.MILLISECONDS);
if (future == null) break; // future为null说明超时
res.add(f.get());
}

CompletionService原理

1
2
3
4
5
6
7
8
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); } // 完成时加入到完成队列
private final Future<V> task;
}

其他使用场景

取出第一个完成的结果

1
2
3
4
5
for (int i = 0; i < TASK_NUM; i++) {
completionService.submit(callable);
}
String result = completionService.take().get();
// use result

限时内取出第一个非null的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
List<Future<String>> futures4 = new ArrayList<>(TASK_NUM); // 用于中断
for (int i = 0; i < TASK_NUM; i++) {
futures4.add(completionService.submit(callable));
}
long start4 = System.currentTimeMillis(); // 计时

String result4 = null;
for (int i = 0; i < TASK_NUM; i++) {
long time = TIMEOUT - (System.currentTimeMillis() - start4);
if (time <= 0) break;
Future<String> future = completionService.poll(time, TimeUnit.MILLISECONDS);
if (future == null) break; // future为null说明超时,直接退出
String data = future.get();
if (data != null) {
result4 = data;
break;
}
}
for (Future<String> future : futures4) {
future.cancel(true);
}

if (result4 != null) {
// use result
}

ConcurrentHashMap原理

2023-04-19
基于Java 8

初始化Table

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

Get方法

流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

find方法

Put方法

流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

transfer扩容

桶的个数n是2次幂的,桶定位方式是i = (n - 1) & h),当扩容为2倍时,如果n & h == 0,则桶号保持不变,如果n & h == n,则桶号为i + n

参考

CompletableFuture

2023-04-06
Java 8引入了函数式编程,大大提高代码的可读性,写起来也更优雅。异步编程自然也不例外,新增的CompletableFuture就是为了使用函数式编程来更优雅地编写异步代码

构造CompletableFuture

一般直接使用工厂方法创建

链式调用(异步顺序执行)

大多数链式调用都有异步版本(Async结尾),会多一个Executor参数,如果不传会使用ForkJoinPool

工具方法

注意事项

同步操作的线程问题

没有阻塞操作时:

1
2
3
4
5
6
Executor executor = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return "data";
}, executor)
.thenRun(() -> System.out.println(Thread.currentThread().getName()));

输出:(同步操作在当前线程执行)

1
2
pool-1-thread-1
main

添加阻塞操作:

1
2
3
4
5
6
7
8
9
10
Executor executor = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try { // 模拟阻塞操作
Thread.sleep(1);
} catch (Exception e) {
}
return "data";
}, executor)
.thenRun(() -> System.out.println(Thread.currentThread().getName()));

输出:(同步操作和异步线程线程一样)

1
2
pool-1-thread-1
pool-1-thread-1

参考

synchronized

2023-04-20

synchronized特性

synchronized优化

锁介绍

偏向锁

在运行过程中,对象的锁偏向某个线程。即在开启偏向锁机制的情况下,某个线程获得锁,当该线程下次再想要获得锁时,不需要再获得锁(即忽略synchronized关键词),直接就可以执行同步代码,比较适合竞争较少的情况

轻量级锁

对于没有多线程竞争的情况,如果存在多线程竞争,则膨胀为重量级锁

重量级锁

即当有其他线程占用锁时,当前线程会进入阻塞状态

自旋锁

在自旋状态下,当一个线程A尝试进入同步代码块,但是当前的锁已经被线程B占有时,线程A不进入阻塞状态,而是不停的空转,等待线程B释放锁。如果锁的线程能在很短时间内释放资源,那么等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞状态,只需自旋,等持有锁的线程释放后即可立即获取锁,避免了用户线程和内核的切换消耗。

自旋等待最大时间:线程自旋会消耗cpu,若自旋太久,则会让cpu做太多无用功,因此要设置自旋等待最大时间。

优点:开启自旋锁后能减少线程的阻塞,在对于锁的竞争不激烈且占用锁时间很短的代码块来说,能提升很大的性能,在这种情况下自旋的消耗小于线程阻塞挂起的消耗

缺点:在线程竞争锁激烈,或持有锁的线程需要长时间执行同步代码块的情况下,使用自旋会使得cpu做的无用功太多

参考

   / 
  ,