LocalMap
成员属性
ThreadLocalMap 是 ThreadLocal 的内部类,没有实现 Map 接口,用独立的方式实现了 Map 的功能,其内部 Entry 也是独立实现
// 初始化当前 map 内部散列表数组的初始长度 16
private static final int INITIAL_CAPACITY = 16;
// 存放数据的table,数组长度必须是2的整次幂。
private Entry[] table;
// 数组里面 entrys 的个数,可以用于判断 table 当前使用量是否超过阈值
private int size = 0;
// 进行扩容的阈值,表使用量大于它的时候进行扩容。
private int threshold;
存储结构 Entry:
- Entry 继承 WeakReference,key 是弱引用,目的是将 ThreadLocal 对象的生命周期和线程生命周期解绑
- Entry 限制只能用 ThreadLocal 作为 key,key 为 null (entry.get() == null) 意味着 key 不再被引用,entry 也可以从 table 中清除
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
// this.referent = referent = key;
super(k);
value = v;
}
}
构造方法:延迟初始化的,线程第一次存储 threadLocal - value 时才会创建 threadLocalMap 对象
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 初始化table,创建一个长度为16的Entry数组
table = new Entry[INITIAL_CAPACITY];
// 【寻址算法】计算索引
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 创建 entry 对象,存放到指定位置的 slot 中
table[i] = new Entry(firstKey, firstValue);
// 数据总量是 1
size = 1;
// 将阈值设置为 (当前数组长度 * 2)/ 3。
setThreshold(INITIAL_CAPACITY);
}
成员方法
-
set():添加数据,ThreadLocalMap 使用线性探测法来解决哈希冲突
-
该方法会一直探测下一个地址,直到有空的地址后插入,若插入后 Map 数量超过阈值,数组会扩容为原来的 2 倍
假设当前 table 长度为16,计算出来 key 的 hash 值为 14,如果 table[14] 上已经有值,并且其 key 与当前 key 不一致,那么就发生了 hash 冲突,这个时候将 14 加 1 得到 15,取 table[15] 进行判断,如果还是冲突会回到 0,取 table[0],以此类推,直到可以插入,可以把 Entry[] table 看成一个环形数组
-
线性探测法会出现堆积问题,可以采取平方探测法解决
-
在探测过程中 ThreadLocal 会复用 key 为 null 的脏 Entry 对象,并进行垃圾清理,防止出现内存泄漏
private void set(ThreadLocal<?> key, Object value) { // 获取散列表 ThreadLocal.ThreadLocalMap.Entry[] tab = table; int len = tab.length; // 哈希寻址 int i = key.threadLocalHashCode & (len-1); // 使用线性探测法向后查找元素,碰到 entry 为空时停止探测 for (ThreadLocal.ThreadLocalMap.Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { // 获取当前元素 key ThreadLocal<?> k = e.get(); // ThreadLocal 对应的 key 存在,【直接覆盖之前的值】 if (k == key) { e.value = value; return; } // 【这两个条件谁先成立不一定,所以 replaceStaleEntry 中还需要判断 k == key 的情况】 // key 为 null,但是值不为 null,说明之前的 ThreadLocal 对象已经被回收了,当前是【过期数据】 if (k == null) { // 【碰到一个过期的 slot,当前数据复用该槽位,替换过期数据】 // 这个方法还进行了垃圾清理动作,防止内存泄漏 replaceStaleEntry(key, value, i); return; } } // 逻辑到这说明碰到 slot == null 的位置,则在空元素的位置创建一个新的 Entry tab[i] = new Entry(key, value); // 数量 + 1 int sz = ++size; // 【做一次启发式清理】,如果没有清除任何 entry 并且【当前使用量达到了负载因子所定义,那么进行 rehash if (!cleanSomeSlots(i, sz) && sz >= threshold) // 扩容 rehash(); }
// 获取【环形数组】的下一个索引 private static int nextIndex(int i, int len) { // 索引越界后从 0 开始继续获取 return ((i + 1 < len) ? i + 1 : 0); }
// 在指定位置插入指定的数据 private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { // 获取散列表 Entry[] tab = table; int len = tab.length; Entry e; // 探测式清理的开始下标,默认从当前 staleSlot 开始 int slotToExpunge = staleSlot; // 以当前 staleSlot 开始【向前迭代查找】,找到索引靠前过期数据,找到以后替换 slotToExpunge 值 // 【保证在一个区间段内,从最前面的过期数据开始清理】 for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i; // 以 staleSlot 【向后去查找】,直到碰到 null 为止,还是线性探测 for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { // 获取当前节点的 key ThreadLocal<?> k = e.get(); // 条件成立说明是【替换逻辑】 if (k == key) { e.value = value; // 因为本来要在 staleSlot 索引处插入该数据,现在找到了i索引处的key与数据一致 // 但是 i 位置距离正确的位置更远,因为是向后查找,所以还是要在 staleSlot 位置插入当前 entry // 然后将 table[staleSlot] 这个过期数据放到当前循环到的 table[i] 这个位置, tab[i] = tab[staleSlot]; tab[staleSlot] = e; // 条件成立说明向前查找过期数据并未找到过期的 entry,但 staleSlot 位置已经不是过期数据了,i 位置才是 if (slotToExpunge == staleSlot) slotToExpunge = i; // 【清理过期数据,expungeStaleEntry 探测式清理,cleanSomeSlots 启发式清理】 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } // 条件成立说明当前遍历的 entry 是一个过期数据,并且该位置前面也没有过期数据 if (k == null && slotToExpunge == staleSlot) // 探测式清理过期数据的开始下标修改为当前循环的 index,因为 staleSlot 会放入要添加的数据 slotToExpunge = i; } // 向后查找过程中并未发现 k == key 的 entry,说明当前是一个【取代过期数据逻辑】 // 删除原有的数据引用,防止内存泄露 tab[staleSlot].value = null; // staleSlot 位置添加数据,【上面的所有逻辑都不会更改 staleSlot 的值】 tab[staleSlot] = new Entry(key, value); // 条件成立说明除了 staleSlot 以外,还发现其它的过期 slot,所以要【开启清理数据的逻辑】 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); }
private static int prevIndex(int i, int len) { // 形成一个环绕式的访问,头索引越界后置为尾索引 return ((i - 1 >= 0) ? i - 1 : len - 1); }
-
-
getEntry():ThreadLocal 的 get 方法以当前的 ThreadLocal 为 key,调用 getEntry 获取对应的存储实体 e
private Entry getEntry(ThreadLocal<?> key) { // 哈希寻址 int i = key.threadLocalHashCode & (table.length - 1); // 访问散列表中指定指定位置的 slot Entry e = table[i]; // 条件成立,说明 slot 有值并且 key 就是要寻找的 key,直接返回 if (e != null && e.get() == key) return e; else // 进行线性探测 return getEntryAfterMiss(key, i, e); } // 线性探测寻址 private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { // 获取散列表 Entry[] tab = table; int len = tab.length; // 开始遍历,碰到 slot == null 的情况,搜索结束 while (e != null) { // 获取当前 slot 中 entry 对象的 key ThreadLocal<?> k = e.get(); // 条件成立说明找到了,直接返回 if (k == key) return e; if (k == null) // 过期数据,【探测式过期数据回收】 expungeStaleEntry(i); else // 更新 index 继续向后走 i = nextIndex(i, len); // 获取下一个槽位中的 entry e = tab[i]; } // 说明当前区段没有找到相应数据 // 【因为存放数据是线性的向后寻找槽位,都是紧挨着的,不可能越过一个 空槽位 在后面放】,可以减少遍历的次数 return null; }
-
rehash():触发一次全量清理,如果数组长度大于等于长度的
2/3 * 3/4 = 1/2
,则进行 resizeprivate void rehash() { // 清楚当前散列表内的【所有】过期的数据 expungeStaleEntries(); // threshold = len * 2 / 3,就是 2/3 * (1 - 1/4) if (size >= threshold - threshold / 4) resize(); }
private void expungeStaleEntries() { Entry[] tab = table; int len = tab.length; // 【遍历所有的槽位,清理过期数据】 for (int j = 0; j < len; j++) { Entry e = tab[j]; if (e != null && e.get() == null) expungeStaleEntry(j); } }
Entry 数组为扩容为原来的 2 倍 ,重新计算 key 的散列值,如果遇到 key 为 null 的情况,会将其 value 也置为 null,帮助 GC
private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; // 新数组的长度是老数组的二倍 int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; // 统计新table中的entry数量 int count = 0; // 遍历老表,进行【数据迁移】 for (int j = 0; j < oldLen; ++j) { // 访问老表的指定位置的 entry Entry e = oldTab[j]; // 条件成立说明老表中该位置有数据,可能是过期数据也可能不是 if (e != null) { ThreadLocal<?> k = e.get(); // 过期数据 if (k == null) { e.value = null; // Help the GC } else { // 非过期数据,在新表中进行哈希寻址 int h = k.threadLocalHashCode & (newLen - 1); // 【线程探测】 while (newTab[h] != null) h = nextIndex(h, newLen); // 将数据存放到新表合适的 slot 中 newTab[h] = e; count++; } } } // 设置下一次触发扩容的指标:threshold = len * 2 / 3; setThreshold(newLen); size = count; // 将扩容后的新表赋值给 threadLocalMap 内部散列表数组引用 table = newTab; }
-
remove():删除 Entry
private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; // 哈希寻址 int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { // 找到了对应的 key if (e.get() == key) { // 设置 key 为 null e.clear(); // 探测式清理 expungeStaleEntry(i); return; } } }
清理方法
-
探测式清理:沿着开始位置向后探测清理过期数据,沿途中碰到未过期数据则将此数据 rehash 在 table 数组中的定位,重定位后的元素理论上更接近
i = entry.key & (table.length - 1)
,让数据的排列更紧凑,会优化整个散列表查询性能// table[staleSlot] 是一个过期数据,以这个位置开始继续向后查找过期数据 private int expungeStaleEntry(int staleSlot) { // 获取散列表和数组长度 Entry[] tab = table; int len = tab.length; // help gc,先把当前过期的 entry 置空,在取消对 entry 的引用 tab[staleSlot].value = null; tab[staleSlot] = null; // 数量-1 size--; Entry e; int i; // 从 staleSlot 开始向后遍历,直到碰到 slot == null 结束,【区间内清理过期数据】 for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); // 当前 entry 是过期数据 if (k == null) { // help gc e.value = null; tab[i] = null; size--; } else { // 当前 entry 不是过期数据的逻辑,【rehash】 // 重新计算当前 entry 对应的 index int h = k.threadLocalHashCode & (len - 1); // 条件成立说明当前 entry 存储时发生过 hash 冲突,向后偏移过了 if (h != i) { // 当前位置置空 tab[i] = null; // 以正确位置 h 开始,向后查找第一个可以存放 entry 的位置 while (tab[h] != null) h = nextIndex(h, len); // 将当前元素放入到【距离正确位置更近的位置,有可能就是正确位置】 tab[h] = e; } } } // 返回 slot = null 的槽位索引,图例是 7,这个索引代表【索引前面的区间已经清理完成垃圾了】 return i; }
-
启发式清理:向后循环扫描过期数据,发现过期数据调用探测式清理方法,如果连续几次的循环都没有发现过期数据,就停止扫描
// i 表示启发式清理工作开始位置,一般是空 slot,n 一般传递的是 table.length private boolean cleanSomeSlots(int i, int n) { // 表示启发式清理工作是否清除了过期数据 boolean removed = false; // 获取当前 map 的散列表引用 Entry[] tab = table; int len = tab.length; do { // 获取下一个索引,因为探测式返回的 slot 为 null i = nextIndex(i, len); Entry e = tab[i]; // 条件成立说明是过期的数据,key 被 gc 了 if (e != null && e.get() == null) { // 【发现过期数据重置 n 为数组的长度】 n = len; // 表示清理过过期数据 removed = true; // 以当前过期的 slot 为开始节点 做一次探测式清理工作 i = expungeStaleEntry(i); } // 假设 table 长度为 16 // 16 >>> 1 ==> 8,8 >>> 1 ==> 4,4 >>> 1 ==> 2,2 >>> 1 ==> 1,1 >>> 1 ==> 0 // 连续经过这么多次循环【没有扫描到过期数据】,就停止循环,扫描到空 slot 不算,因为不是过期数据 } while ((n >>>= 1) != 0); // 返回清除标记 return removed; }
参考视频:https://space.bilibili.com/457326371/
内存泄漏
Memory leak:内存泄漏是指程序中动态分配的堆内存由于某种原因未释放或无法释放,造成系统内存的浪费,导致程序运行速度减慢甚至系统崩溃等严重后果,内存泄漏的堆积终将导致内存溢出
-
如果 key 使用强引用:使用完 ThreadLocal ,threadLocal Ref 被回收,但是 threadLocalMap 的 Entry 强引用了 threadLocal,造成 threadLocal 无法被回收,无法完全避免内存泄漏
-
如果 key 使用弱引用:使用完 ThreadLocal ,threadLocal Ref 被回收,ThreadLocalMap 只持有 ThreadLocal 的弱引用,所以threadlocal 也可以被回收,此时 Entry 中的 key = null。但没有手动删除这个 Entry 或者 CurrentThread 依然运行,依然存在强引用链,value 不会被回收,而这块 value 永远不会被访问到,也会导致 value 内存泄漏
-
两个主要原因:
- 没有手动删除这个 Entry
- CurrentThread 依然运行
根本原因:ThreadLocalMap 是 Thread的一个属性,生命周期跟 Thread 一样长,如果没有手动删除对应 Entry 就会导致内存泄漏
解决方法:使用完 ThreadLocal 中存储的内容后将它 remove 掉就可以
ThreadLocal 内部解决方法:在 ThreadLocalMap 中的 set/getEntry 方法中,通过线性探测法对 key 进行判断,如果 key 为 null(ThreadLocal 为 null)会对 Entry 进行垃圾回收。所以使用弱引用比强引用多一层保障,就算不调用 remove,也有机会进行 GC
变量传递
基本使用
父子线程:创建子线程的线程是父线程,比如实例中的 main 线程就是父线程
ThreadLocal 中存储的是线程的局部变量,如果想实现线程间局部变量传递可以使用 InheritableThreadLocal 类
public static void main(String[] args) {
ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
threadLocal.set("父线程设置的值");
new Thread(() -> System.out.println("子线程输出:" + threadLocal.get())).start();
}
// 子线程输出:父线程设置的值
实现原理
InheritableThreadLocal 源码:
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
protected T childValue(T parentValue) {
return parentValue;
}
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
实现父子线程间的局部变量共享需要追溯到 Thread 对象的构造方法:
private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc,
// 该参数默认是 true
boolean inheritThreadLocals) {
// ...
Thread parent = currentThread();
// 判断父线程(创建子线程的线程)的 inheritableThreadLocals 属性不为 null
if (inheritThreadLocals && parent.inheritableThreadLocals != null) {
// 复制父线程的 inheritableThreadLocals 属性,实现父子线程局部变量共享
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
// ..
}
// 【本质上还是创建 ThreadLocalMap,只是把父类中的可继承数据设置进去了】
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
private ThreadLocalMap(ThreadLocalMap parentMap) {
// 获取父线程的哈希表
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
// 【逐个复制父线程 ThreadLocalMap 中的数据】
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
// 调用的是 InheritableThreadLocal#childValue(T parentValue)
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
// 线性探测
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
参考文章:https://blog.csdn.net/feichitianxia/article/details/110495764
线程池
基本概述
线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作
线程池作用:
- 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
- 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
- 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
线程池的核心思想:线程复用,同一个线程可以被重复使用,来处理多个任务
池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销
阻塞队列
基本介绍
有界队列和无界队列:
-
有界队列:有固定大小的队列,比如设定了固定大小的 LinkedBlockingQueue,又或者大小为 0
-
无界队列:没有设置固定大小的队列,这些队列可以直接入队,直到溢出(超过 Integer.MAX_VALUE),所以相当于无界
java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:FIFO 队列
- ArrayBlockQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的无界(默认大小 Integer.MAX_VALUE)的阻塞队列
- PriorityBlockQueue:支持优先级排序的无界阻塞队列
- DelayedWorkQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列,每一个生产线程会阻塞到有一个 put 的线程放入元素为止
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
与普通队列(LinkedList、ArrayList等)的不同点在于阻塞队列中阻塞添加和阻塞删除方法,以及线程安全:
- 阻塞添加 put():当阻塞队列元素已满时,添加队列元素的线程会被阻塞,直到队列元素不满时才重新唤醒线程执行
- 阻塞删除 take():在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般会返回被删除的元素)
核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入(尾) | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除(头) | remove() | poll() | take() | poll(time,unit) |
检查(队首元素) | element() | peek() | 不可用 | 不可用 |
- 抛出异常组:
- 当阻塞队列满时:在往队列中 add 插入元素会抛出 IIIegalStateException: Queue full
- 当阻塞队列空时:再往队列中 remove 移除元素,会抛出 NoSuchException
- 特殊值组:
- 插入方法:成功 true,失败 false
- 移除方法:成功返回出队列元素,队列没有就返回 null
- 阻塞组:
- 当阻塞队列满时,生产者继续往队列里 put 元素,队列会一直阻塞生产线程直到队列有空间 put 数据或响应中断退出
- 当阻塞队列空时,消费者线程试图从队列里 take 元素,队列会一直阻塞消费者线程直到队列中有可用元素
- 超时退出:当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出
链表队列
入队出队
LinkedBlockingQueue 源码:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
/**
* 下列三种情况之一
* - 真正的后继节点
* - 自己, 发生在出队时
* - null, 表示是没有后继节点, 是尾节点了
*/
Node<E> next;
Node(E x) { item = x; }
}
}
入队:尾插法
-
初始化链表
last = head = new Node<E>(null)
,Dummy 节点用来占位,item 为 nullpublic LinkedBlockingQueue(int capacity) { // 默认是 Integer.MAX_VALUE if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
-
当一个节点入队:
private void enqueue(Node<E> node) { // 从右向左计算 last = last.next = node; }
-
再来一个节点入队
last = last.next = node
出队:出队头节点,FIFO
-
出队源码:
private E dequeue() { Node<E> h = head; // 获取临头节点 Node<E> first = h.next; // 自己指向自己,help GC h.next = h; head = first; // 出队的元素 E x = first.item; // 【当前节点置为 Dummy 节点】 first.item = null; return x; }
-
h = head
→first = h.next
-
h.next = h
→head = first
first.item = null
:当前节点置为 Dummy 节点
加锁分析
用了两把锁和 dummy 节点:
- 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
- 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
- 消费者与消费者线程仍然串行
- 生产者与生产者线程仍然串行
线程安全分析:
-
当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全,两把锁保证了入队和出队没有竞争
-
当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
-
当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞
// 用于 put(阻塞) offer(非阻塞) private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); // 阻塞等待不满,说明已经满了 // 用于 take(阻塞) poll(非阻塞) private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 阻塞等待不空,说明已经是空的
入队出队:
-
put 操作:
public void put(E e) throws InterruptedException { // 空指针异常 if (e == null) throw new NullPointerException(); int c = -1; // 把待添加的元素封装为 node 节点 Node<E> node = new Node<E>(e); // 获取全局生产锁 final ReentrantLock putLock = this.putLock; // count 用来维护元素计数 final AtomicInteger count = this.count; // 获取可打断锁,会抛出异常 putLock.lockInterruptibly(); try { // 队列满了等待 while (count.get() == capacity) { // 【等待队列不满时,就可以生产数据】,线程处于 Waiting notFull.await(); } // 有空位, 入队且计数加一,尾插法 enqueue(node); // 返回自增前的数字 c = count.getAndIncrement(); // put 完队列还有空位, 唤醒其他生产 put 线程,唤醒一个减少竞争 if (c + 1 < capacity) notFull.signal(); } finally { // 解锁 putLock.unlock(); } // c自增前是0,说明生产了一个元素,唤醒一个 take 线程 if (c == 0) signalNotEmpty(); }
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 调用 notEmpty.signal(),而不是 notEmpty.signalAll() 是为了减少竞争,因为只剩下一个元素 notEmpty.signal(); } finally { takeLock.unlock(); } }
-
take 操作:
public E take() throws InterruptedException { E x; int c = -1; // 元素个数 final AtomicInteger count = this.count; // 获取全局消费锁 final ReentrantLock takeLock = this.takeLock; // 可打断锁 takeLock.lockInterruptibly(); try { // 没有元素可以出队 while (count.get() == 0) { // 【阻塞等待队列不空,就可以消费数据】,线程处于 Waiting notEmpty.await(); } // 出队,计数减一,FIFO,出队头节点 x = dequeue(); // 返回自减前的数字 c = count.getAndDecrement(); // 队列还有元素 if (c > 1) // 唤醒一个消费take线程 notEmpty.signal(); } finally { takeLock.unlock(); } // c 是消费前的数据,消费前满了,消费一个后还剩一个空位,唤醒生产线程 if (c == capacity) // 调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争 signalNotFull(); return x; }
性能比较
主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较:
- Linked 支持有界,Array 强制有界
- Linked 实现是链表,Array 实现是数组
- Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
- Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
- Linked 两把锁,Array 一把锁
同步队列
成员属性
SynchronousQueue 是一个不存储元素的 BlockingQueue,每一个生产者必须阻塞匹配到一个消费者
成员变量:
-
运行当前程序的平台拥有 CPU 的数量:
static final int NCPUS = Runtime.getRuntime().availableProcessors()
-
指定超时时间后,当前线程最大自旋次数:
// 只有一个 CPU 时自旋次数为 0,所有程序都是串行执行,多核 CPU 时自旋 32 次是一个经验值 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
自旋的原因:线程挂起唤醒需要进行上下文切换,涉及到用户态和内核态的转变,是非常消耗资源的。自旋期间线程会一直检查自己的状态是否被匹配到,如果自旋期间被匹配到,那么直接就返回了,如果自旋次数达到某个指标后,还是会将当前线程挂起
-
未指定超时时间,当前线程最大自旋次数:
static final int maxUntimedSpins = maxTimedSpins * 16; // maxTimedSpins 的 16 倍
-
指定超时限制的阈值,小于该值的线程不会被挂起:
static final long spinForTimeoutThreshold = 1000L; // 纳秒
超时时间设置的小于该值,就会被禁止挂起,阻塞再唤醒的成本太高,不如选择自旋空转
-
转换器:
private transient volatile Transferer<E> transferer; abstract static class Transferer<E> { /** * 参数一:可以为 null,null 时表示这个请求是一个 REQUEST 类型的请求,反之是一个 DATA 类型的请求 * 参数二:如果为 true 表示指定了超时时间,如果为 false 表示不支持超时,会一直阻塞到匹配或者被打断 * 参数三:超时时间限制,单位是纳秒 * 返回值:返回值如果不为 null 表示匹配成功,DATA 类型的请求返回当前线程 put 的数据 * 如果返回 null,表示请求超时或被中断 */ abstract E transfer(E e, boolean timed, long nanos); }
-
构造方法:
public SynchronousQueue(boolean fair) { // fair 默认 false // 非公平模式实现的数据结构是栈,公平模式的数据结构是队列 transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
-
成员方法:
public boolean offer(E e) { if (e == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) != null; } public E poll() { return transferer.transfer(null, true, 0); }
非公实现
TransferStack 是非公平的同步队列,因为所有的请求都被压入栈中,栈顶的元素会最先得到匹配,造成栈底的等待线程饥饿
TransferStack 类成员变量:
-
请求类型:
// 表示 Node 类型为请求类型 static final int REQUEST = 0; // 表示 Node类 型为数据类型 static final int DATA = 1; // 表示 Node 类型为匹配中类型 // 假设栈顶元素为 REQUEST-NODE,当前请求类型为 DATA,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】 // 假设栈顶元素为 DATA-NODE,当前请求类型为 REQUEST,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】 static final int FULFILLING = 2;
-
栈顶元素:
volatile SNode head;
内部类 SNode:
-
成员变量:
static final class SNode { // 指向下一个栈帧 volatile SNode next; // 与当前 node 匹配的节点 volatile SNode match; // 假设当前node对应的线程自旋期间未被匹配成功,那么node对应的线程需要挂起, // 挂起前 waiter 保存对应的线程引用,方便匹配成功后,被唤醒。 volatile Thread waiter; // 数据域,不为空表示当前 Node 对应的请求类型为 DATA 类型,反之则表示 Node 为 REQUEST 类型 Object item; // 表示当前Node的模式 【DATA/REQUEST/FULFILLING】 int mode; }
-
构造方法:
SNode(Object item) { this.item = item; }
-
设置方法:设置 Node 对象的 next 字段,此处对 CAS 进行了优化,提升了 CAS 的效率
boolean casNext(SNode cmp, SNode val) { //【优化:cmp == next】,可以提升一部分性能。 cmp == next 不相等,就没必要走 cas指令。 return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }
-
匹配方法:
boolean tryMatch(SNode s) { // 当前 node 尚未与任何节点发生过匹配,CAS 设置 match 字段为 s 节点,表示当前 node 已经被匹配 if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { // 当前 node 如果自旋结束,会 park 阻塞,阻塞前将 node 对应的 Thread 保留到 waiter 字段 // 获取当前 node 对应的阻塞线程 Thread w = waiter; // 条件成立说明 node 对应的 Thread 正在阻塞 if (w != null) { waiter = null; // 使用 unpark 方式唤醒线程 LockSupport.unpark(w); } return true; } // 匹配成功返回 true return match == s; }
-
取消方法:
// 取消节点的方法 void tryCancel() { // match 字段指向自己,表示这个 node 是取消状态,取消状态的 node,最终会被强制移除出栈 UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } boolean isCancelled() { return match == this; }
TransferStack 类成员方法:
-
snode():填充节点方法
static SNode snode(SNode s, Object e, SNode next, int mode) { // 引用指向空时,snode 方法会创建一个 SNode 对象 if (s == null) s = new SNode(e); // 填充数据 s.mode = mode; s.next = next; return s; }
-
transfer():核心方法,请求匹配出栈,不匹配阻塞
E transfer(E e, boolean timed, long nanos) { // 包装当前线程的 node SNode s = null; // 根据元素判断当前的请求类型 int mode = (e == null) ? REQUEST : DATA; // 自旋 for (;;) { // 获取栈顶指针 SNode h = head; // 【CASE1】:当前栈为空或者栈顶 node 模式与当前请求模式一致无法匹配,做入栈操作 if (h == null || h.mode == mode) { // 当前请求是支持超时的,但是 nanos <= 0 说明这个请求不支持 “阻塞等待” if (timed && nanos <= 0) { // 栈顶元素是取消状态 if (h != null && h.isCancelled()) // 栈顶出栈,设置新的栈顶 casHead(h, h.next); else // 表示【匹配失败】 return null; // 入栈 } else if (casHead(h, s = snode(s, e, h, mode))) { // 等待被匹配的逻辑,正常情况返回匹配的节点;取消情况返回当前节点,就是 s SNode m = awaitFulfill(s, timed, nanos); // 说明当前 node 是【取消状态】 if (m == s) { // 将取消节点出栈 clean(s); return null; } // 执行到这说明【匹配成功】了 // 栈顶有节点并且 匹配节点还未出栈,需要协助出栈 if ((h = head) != null && h.next == s) casHead(h, s.next); // 当前 node 模式为 REQUEST 类型,返回匹配节点的 m.item 数据域 // 当前 node 模式为 DATA 类型:返回 node.item 数据域,当前请求提交的数据 e return (E) ((mode == REQUEST) ? m.item : s.item); } // 【CASE2】:逻辑到这说明请求模式不一致,如果栈顶不是 FULFILLING 说明没被其他节点匹配,【当前可以匹配】 } else if (!isFulfilling(h.mode)) { // 头节点是取消节点,match 指向自己,协助出栈 if (h.isCancelled()) casHead(h, h.next); // 入栈当前请求的节点 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // m 是 s 的匹配的节点 SNode m = s.next; // m 节点在 awaitFulfill 方法中被中断,clean 了自己 if (m == null) { // 清空栈 casHead(s, null); s = null; // 返回到外层自旋中 break; } // 获取匹配节点的下一个节点 SNode mn = m.next; // 尝试匹配,【匹配成功】,则将 fulfilling 和 m 一起出栈,并且唤醒被匹配的节点的线程 if (m.tryMatch(s)) { casHead(s, mn); return (E) ((mode == REQUEST) ? m.item : s.item); } else // 匹配失败,出栈 m s.casNext(m, mn); } } // 【CASE3】:栈顶模式为 FULFILLING 模式,表示【栈顶和栈顶下面的节点正在发生匹配】,当前请求需要做协助工作 } else { // h 表示的是 fulfilling 节点,m 表示 fulfilling 匹配的节点 SNode m = h.next; if (m == null) // 清空栈 casHead(h, null); else { SNode mn = m.next; // m 和 h 匹配,唤醒 m 中的线程 if (m.tryMatch(h)) casHead(h, mn); else h.casNext(m, mn); } } } }
-
awaitFulfill():阻塞当前线程等待被匹配,返回匹配的节点,或者被取消的节点
SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 等待的截止时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 当前线程 Thread w = Thread.currentThread(); // 表示当前请求线程在下面的 for(;;) 自旋检查的次数 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); // 自旋检查逻辑:是否匹配、是否超时、是否被中断 for (;;) { // 当前线程收到中断信号,需要设置 node 状态为取消状态 if (w.isInterrupted()) s.tryCancel(); // 获取与当前 s 匹配的节点 SNode m = s.match; if (m != null) // 可能是正常的匹配的,也可能是取消的 return m; // 执行了超时限制就判断是否超时 if (timed) { nanos = deadline - System.nanoTime(); // 【超时了,取消节点】 if (nanos <= 0L) { s.tryCancel(); continue; } } // 说明当前线程还可以进行自旋检查 if (spins > 0) // 自旋一次 递减 1 spins = shouldSpin(s) ? (spins - 1) : 0; // 说明没有自旋次数了 else if (s.waiter == null) //【把当前 node 对应的 Thread 保存到 node.waiter 字段中,要阻塞了】 s.waiter = w; // 没有超时限制直接阻塞 else if (!timed) LockSupport.park(this); // nanos > 1000 纳秒的情况下,才允许挂起当前线程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
boolean shouldSpin(SNode s) { // 获取栈顶 SNode h = head; // 条件一成立说明当前 s 就是栈顶,允许自旋检查 // 条件二成立说明当前 s 节点自旋检查期间,又来了一个与当前 s 节点匹配的请求,双双出栈后条件会成立 // 条件三成立前提当前 s 不是栈顶元素,并且当前栈顶正在匹配中,这种状态栈顶下面的元素,都允许自旋检查 return (h == s || h == null || isFulfilling(h.mode)); }
-
clear():指定节点出栈
void clean(SNode s) { // 清空数据域和关联线程 s.item = null; s.waiter = null; // 获取取消节点的下一个节点 SNode past = s.next; // 判断后继节点是不是取消节点,是就更新 past if (past != null && past.isCancelled()) past = past.next; SNode p; // 从栈顶开始向下检查,【将栈顶开始向下的 取消状态 的节点全部清理出去】,直到碰到 past 或者不是取消状态为止 while ((p = head) != null && p != past && p.isCancelled()) // 修改的是内存地址对应的值,p 指向该内存地址所以数据一直在变化 casHead(p, p.next); // 说明中间遇到了不是取消状态的节点,继续迭代下去 while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }
公平实现
TransferQueue 是公平的同步队列,采用 FIFO 的队列实现,请求节点与队尾模式不同,需要与队头发生匹配
TransferQueue 类成员变量:
-
指向队列的 dummy 节点:
transient volatile QNode head;
-
指向队列的尾节点:
transient volatile QNode tail;
-
被清理节点的前驱节点:
transient volatile QNode cleanMe;
入队操作是两步完成的,第一步是 t.next = newNode,第二步是 tail = newNode,所以队尾节点出队,是一种非常特殊的情况
TransferQueue 内部类:
-
QNode:
static final class QNode { // 指向当前节点的下一个节点 volatile QNode next; // 数据域,Node 代表的是 DATA 类型 item 表示数据,否则 Node 代表的 REQUEST 类型,item == null volatile Object item; // 假设当前 node 对应的线程自旋期间未被匹配成功,那么 node 对应的线程需要挂起, // 挂起前 waiter 保存对应的线程引用,方便匹配成功后被唤醒。 volatile Thread waiter; // true 当前 Node 是一个 DATA 类型,false 表示当前 Node 是一个 REQUEST 类型 final boolean isData; // 构建方法 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } // 尝试取消当前 node,取消状态的 node 的 item 域指向自己 void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } // 判断当前 node 是否为取消状态 boolean isCancelled() { return item == this; } // 判断当前节点是否 “不在” 队列内,当 next 指向自己时,说明节点已经出队。 boolean isOffList() { return next == this; } }
TransferQueue 类成员方法:
-
设置头尾节点:
void advanceHead(QNode h, QNode nh) { // 设置头指针指向新的节点, if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) // 老的头节点出队 h.next = h; } void advanceTail(QNode t, QNode nt) { if (tail == t) // 更新队尾节点为新的队尾 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); }
-
transfer():核心方法
E transfer(E e, boolean timed, long nanos) { // s 指向当前请求对应的 node QNode s = null; // 是否是 DATA 类型的请求 boolean isData = (e != null); // 自旋 for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) continue; // head 和 tail 同时指向 dummy 节点,说明是空队列 // 队尾节点与当前请求类型是一致的情况,说明阻塞队列中都无法匹配, if (h == t || t.isData == isData) { // 获取队尾 t 的 next 节点 QNode tn = t.next; // 多线程环境中其他线程可能修改尾节点 if (t != tail) continue; // 已经有线程入队了,更新 tail if (tn != null) { advanceTail(t, tn); continue; } // 允许超时,超时时间小于 0,这种方法不支持阻塞等待 if (timed && nanos <= 0) return null; // 创建 node 的逻辑 if (s == null) s = new QNode(e, isData); // 将 node 添加到队尾 if (!t.casNext(null, s)) continue; // 更新队尾指针 advanceTail(t, s); // 当前节点 等待匹配.... Object x = awaitFulfill(s, e, timed, nanos); // 说明【当前 node 状态为 取消状态】,需要做出队逻辑 if (x == s) { clean(t, s); return null; } // 说明当前 node 仍然在队列内,匹配成功,需要做出队逻辑 if (!s.isOffList()) { // t 是当前 s 节点的前驱节点,判断 t 是不是头节点,是就更新 dummy 节点为 s 节点 advanceHead(t, s); // s 节点已经出队,所以需要把它的 item 域设置为它自己,表示它是个取消状态 if (x != null) s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 队尾节点与当前请求节点【互补匹配】 } else { // h.next 节点,【请求节点与队尾模式不同,需要与队头发生匹配】,TransferQueue 是一个【公平模式】 QNode m = h.next; // 并发导致其他线程修改了队尾节点,或者已经把 head.next 匹配走了 if (t != tail || m == null || h != head) continue; // 获取匹配节点的数据域保存到 x Object x = m.item; // 判断是否匹配成功 if (isData == (x != null) || x == m || !m.casItem(x, e)) { advanceHead(h, m); continue; } // 【匹配完成】,将头节点出队,让这个新的头结点成为 dummy 节点 advanceHead(h, m); // 唤醒该匹配节点的线程 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
-
awaitFulfill():阻塞当前线程等待被匹配
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { // 表示等待截止时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 自选检查的次数 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 被打断就取消节点 if (w.isInterrupted()) s.tryCancel(e); // 获取当前 Node 数据域 Object x = s.item; // 当前请求为 DATA 模式时:e 请求带来的数据 // s.item 修改为 this,说明当前 QNode 对应的线程 取消状态 // s.item 修改为 null 表示已经有匹配节点了,并且匹配节点拿走了 item 数据 // 当前请求为 REQUEST 模式时:e == null // s.item 修改为 this,说明当前 QNode 对应的线程 取消状态 // s.item != null 且 item != this 表示当前 REQUEST 类型的 Node 已经匹配到 DATA 了 if (x != e) return x; // 超时检查 if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } // 自旋次数减一 if (spins > 0) --spins; // 没有自旋次数了,把当前线程封装进去 waiter else if (s.waiter == null) s.waiter = w; // 阻塞 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
操作Pool
创建方式
Executor
存放线程的容器:
private final HashSet<Worker> workers = new HashSet<Worker>();
构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数介绍:
-
corePoolSize:核心线程数,定义了最小可以同时运行的线程数量
-
maximumPoolSize:最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的数量变为最大线程数,创建线程并立即执行最新的任务,与核心线程数之间的差值又叫救急线程数
-
keepAliveTime:救急线程最大存活时间,当线程池中的线程数量大于
corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等到keepAliveTime
时间超过销毁 -
unit:
keepAliveTime
参数的时间单位 -
workQueue:阻塞队列,存放被提交但尚未被执行的任务
-
threadFactory:线程工厂,创建新线程时用到,可以为线程创建时起名字
-
handler:拒绝策略,线程到达最大线程数仍有新任务时会执行拒绝策略
RejectedExecutionHandler 下有 4 个实现类:
- AbortPolicy:让调用者抛出 RejectedExecutionException 异常,默认策略
- CallerRunsPolicy:让调用者运行的调节机制,将某些任务回退到调用者,从而降低新任务的流量
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
- DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务
补充:其他框架拒绝策略
- Dubbo:在抛出 RejectedExecutionException 异常前记录日志,并 dump 线程栈信息,方便定位问题
- Netty:创建一个新线程来执行任务
- ActiveMQ:带超时等待(60s)尝试放入队列
- PinPoint:它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
工作原理:
-
创建线程池,这时没有创建线程(懒惰),等待提交过来的任务请求,调用 execute 方法才会创建线程
-
当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列
- 如果这时队列满了且正在运行的线程数量还小于 maximumPoolSize,那么会创建非核心线程立刻运行这个任务,对于阻塞队列中的任务不公平。这是因为创建每个 Worker(线程)对象会绑定一个初始任务,启动 Worker 时会优先执行
- 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
-
当一个线程完成任务时,会从队列中取下一个任务来执行
-
当一个线程空闲超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后最终会收缩到 corePoolSize 大小
图片来源:https://space.bilibili.com/457326371/
Executors
Executors 提供了四种线程池的创建:newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor、newScheduledThreadPool
-
newFixedThreadPool:创建一个拥有 n 个线程的线程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为
Integer.MAX_VALUE
,也就是无界队列,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出) - 适用于任务量已知,相对耗时的长期任务
-
newCachedThreadPool:创建一个可扩容的线程池
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
核心线程数是 0, 最大线程数是 29 个 1,全部都是救急线程(60s 后可以回收),可能会创建大量线程,从而导致 OOM
-
SynchronousQueue 作为阻塞队列,没有容量,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)
-
适合任务数比较密集,但每个任务执行时间较短的情况
-
-
newSingleThreadExecutor:创建一个只有 1 个线程的单线程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
- 保证所有任务按照指定顺序执行,线程数固定为 1,任务数多于 1 时会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放
对比:
-
创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,线程池会新建一个线程,保证池的正常工作
-
Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
原因:父类不能直接调用子类中的方法,需要反射或者创建对象的方式,可以调用子类静态方法
-
Executors.newFixedThreadPool(1) 初始时为 1,可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
开发要求
阿里巴巴 Java 开发手册要求:
-
线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
- 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题
- 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者过度切换的问题
-
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回的线程池对象弊端如下:
- FixedThreadPool 和 SingleThreadPool:请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
- CacheThreadPool 和 ScheduledThreadPool:允许创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致 OOM
创建多大容量的线程池合适?
-
一般来说池中总线程数是核心池线程数量两倍,确保当核心池有线程停止时,核心池外有线程进入核心池
-
过小会导致程序不能充分地利用系统资源、容易导致饥饿
-
过大会导致更多的线程上下文切换,占用更多内存
上下文切换:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态,任务从保存到再加载的过程就是一次上下文切换
核心线程数常用公式:
-
CPU 密集型任务 (N+1): 这种任务消耗的是 CPU 资源,可以将核心线程数设置为 N (CPU 核心数) + 1,比 CPU 核心数多出来的一个线程是为了防止线程发生缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 某个核心就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如在内存中对大量数据进行分析
-
I/O 密集型任务: 这种系统 CPU 处于阻塞状态,用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用,因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N 或 CPU 核数/ (1-阻塞系数),阻塞系数在 0.8~0.9 之间
IO 密集型就是涉及到网络读取,文件读取此类任务 ,特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上
提交方法
ExecutorService 类 API:
方法 | 说明 |
---|---|
void execute(Runnable command) | 执行任务(Executor 类 API) |
Future<?> submit(Runnable task) | 提交任务 task() |
Future submit(Callable |
提交任务 task,用返回值 Future 获得任务执行结果 |
List<Future |
提交 tasks 中所有任务 |
List<Future |
提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常 |
T invokeAny(Collection<? extends Callable |
提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 |
execute 和 submit 都属于线程池的方法,对比:
-
execute 只能执行 Runnable 类型的任务,没有返回值; submit 既能提交 Runnable 类型任务也能提交 Callable 类型任务,底层是封装成 FutureTask,然后调用 execute 执行
-
execute 会直接抛出任务执行时的异常,submit 会吞掉异常,可通过 Future 的 get 方法将任务执行时的异常重新抛出
关闭方法
ExecutorService 类 API:
方法 | 说明 |
---|---|
void shutdown() | 线程池状态变为 SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定ren’wu) |
List |
线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,不会接收新任务,会将队列中的任务返回 |
boolean isShutdown() | 不在 RUNNING 状态的线程池,此执行者已被关闭,方法返回 true |
boolean isTerminated() | 线程池状态是否是 TERMINATED,如果所有任务在关闭后完成,返回 true |
boolean awaitTermination(long timeout, TimeUnit unit) | 调用 shutdown 后,由于调用线程不会等待所有任务运行结束,如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待 |
处理异常
execute 会直接抛出任务执行时的异常,submit 会吞掉异常,有两种处理方法
方法 1:主动捉异常
ExecutorService executorService = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
System.out.println("task1");
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
}
});
方法 2:使用 Future 对象
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<?> future = pool.submit(() -> {
System.out.println("task1");
int i = 1 / 0;
return true;
});
System.out.println(future.get());
工作原理
状态信息
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量。这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值
-
状态表示:
// 高3位:表示当前线程池运行状态,除去高3位之后的低位:表示当前线程池中所拥有的线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 表示在 ctl 中,低 COUNT_BITS 位,是用于存放当前线程数量的位 private static final int COUNT_BITS = Integer.SIZE - 3; // 低 COUNT_BITS 位所能表达的最大数值,000 11111111111111111111 => 5亿多 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
-
四种状态:
// 111 000000000000000000,转换成整数后其实就是一个【负数】 private static final int RUNNING = -1 << COUNT_BITS; // 000 000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 010 000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 011 000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS;
状态 高3位 接收新任务 处理阻塞任务队列 说明 RUNNING 111 Y Y SHUTDOWN 000 N Y 不接收新任务,但处理阻塞队列剩余任务 STOP 001 N N 中断正在执行的任务,并抛弃阻塞队列任务 TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结 TERMINATED 011 - - 终止状态 -
获取当前线程池运行状态:
// ~CAPACITY = ~000 11111111111111111111 = 111 000000000000000000000(取反) // c == ctl = 111 000000000000000000111 // 111 000000000000000000111 // 111 000000000000000000000 // 111 000000000000000000000 获取到了运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; }
-
获取当前线程池线程数量:
// c = 111 000000000000000000111 // CAPACITY = 000 111111111111111111111 // 000 000000000000000000111 => 7 private static int workerCountOf(int c) { return c & CAPACITY; }
-
重置当前线程池状态 ctl:
// rs 表示线程池状态,wc 表示当前线程池中 worker(线程)数量,相与以后就是合并后的状态 private static int ctlOf(int rs, int wc) { return rs | wc; }
-
比较当前线程池 ctl 所表示的状态:
// 比较当前线程池 ctl 所表示的状态,是否小于某个状态 s // 状态对比:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED private static boolean runStateLessThan(int c, int s) { return c < s; } // 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态s private static boolean runStateAtLeast(int c, int s) { return c >= s; } // 小于 SHUTDOWN 的一定是 RUNNING,SHUTDOWN == 0 private static boolean isRunning(int c) { return c < SHUTDOWN; }
-
设置线程池 ctl:
// 使用 CAS 方式 让 ctl 值 +1 ,成功返回 true, 失败返回 false private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } // 使用 CAS 方式 让 ctl 值 -1 ,成功返回 true, 失败返回 false private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } // 将 ctl 值减一,do while 循环会一直重试,直到成功为止 private void decrementWorkerCount() { do {} while (!compareAndDecrementWorkerCount(ctl.get())); }
成员属性
成员变量
-
线程池中存放 Worker 的容器:线程池没有初始化,直接往池中加线程即可
private final HashSet<Worker> workers = new HashSet<Worker>();
-
线程全局锁:
// 增加减少 worker 或者时修改线程池运行状态需要持有 mainLock private final ReentrantLock mainLock = new ReentrantLock();
-
可重入锁的条件变量:
// 当外部线程调用 awaitTermination() 方法时,会等待当前线程池状态为 Termination 为止 private final Condition termination = mainLock.newCondition()
-
线程池相关参数:
private volatile int corePoolSize; // 核心线程数量 private volatile int maximumPoolSize; // 线程池最大线程数量 private volatile long keepAliveTime; // 空闲线程存活时间 private volatile ThreadFactory threadFactory; // 创建线程时使用的线程工厂,默认是 DefaultThreadFactory private final BlockingQueue<Runnable> workQueue;// 【超过核心线程提交任务就放入 阻塞队列】
private volatile RejectedExecutionHandler handler; // 拒绝策略,juc包提供了4中方式 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();// 默认策略
-
记录线程池相关属性的数值:
private int largestPoolSize; // 记录线程池生命周期内线程数最大值 private long completedTaskCount; // 记录线程池所完成任务总数,当某个 worker 退出时将完成的任务累加到该属性
-
控制核心线程数量内的线程是否可以被回收:
// false(默认)代表不可以,为 true 时核心线程空闲超过 keepAliveTime 也会被回收 // allowCoreThreadTimeOut(boolean value) 方法可以设置该值 private volatile boolean allowCoreThreadTimeOut;
内部类:
-
Worker 类:每个 Worker 对象会绑定一个初始任务,启动 Worker 时优先执行,这也是造成线程池不公平的原因。Worker 继承自 AQS,本身具有锁的特性,采用独占锁模式,state = 0 表示未被占用,> 0 表示被占用,< 0 表示初始状态不能被抢锁
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; // worker 内部封装的工作线程 Runnable firstTask; // worker 第一个执行的任务,普通的 Runnable 实现类或者是 FutureTask volatile long completedTasks; // 记录当前 worker 所完成任务数量 // 构造方法 Worker(Runnable firstTask) { // 设置AQS独占模式为初始化中状态,这个状态不能被抢占锁 setState(-1); // firstTask不为空时,当worker启动后,内部线程会优先执行firstTask,执行完后会到queue中去获取下个任务 this.firstTask = firstTask; // 使用线程工厂创建一个线程,并且【将当前worker指定为Runnable】,所以thread启动时会调用 worker.run() this.thread = getThreadFactory().newThread(this); } // 【不可重入锁】 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } }
public Thread newThread(Runnable r) { // 将当前 worker 指定为 thread 的执行方法,线程调用 start 会调用 r.run() Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
-
拒绝策略相关的内部类
成员方法
提交方法
-
AbstractExecutorService#submit():提交任务,把 Runnable 或 Callable 任务封装成 FutureTask 执行,可以通过方法返回的任务对象,调用 get 阻塞获取任务执行的结果或者异常,源码分析在笔记的 Future 部分
public Future<?> submit(Runnable task) { // 空指针异常 if (task == null) throw new NullPointerException(); // 把 Runnable 封装成未来任务对象,执行结果就是 null,也可以通过参数指定 FutureTask#get 返回数据 RunnableFuture<Void> ftask = newTaskFor(task, null); // 执行方法 execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 把 Callable 封装成未来任务对象 RunnableFuture<T> ftask = newTaskFor(task); // 执行方法 execute(ftask); // 返回未来任务对象,用来获取返回值 return ftask; }
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { // Runnable 封装成 FutureTask,【指定返回值】 return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { // Callable 直接封装成 FutureTask return new FutureTask<T>(callable); }
-
execute():执行任务,但是没有返回值,没办法获取任务执行结果,出现异常会直接抛出任务执行时的异常。根据线程池中的线程数,选择添加任务时的处理方式
// command 可以是普通的 Runnable 实现类,也可以是 FutureTask,不能是 Callable public void execute(Runnable command) { // 非空判断 if (command == null) throw new NullPointerException(); // 获取 ctl 最新值赋值给 c,ctl 高 3 位表示线程池状态,低位表示当前线程池线程数量。 int c = ctl.get(); // 【1】当前线程数量小于核心线程数,此次提交任务直接创建一个新的 worker,线程池中多了一个新的线程 if (workerCountOf(c) < corePoolSize) { // addWorker 为创建线程的过程,会创建 worker 对象并且将 command 作为 firstTask,优先执行 if (addWorker(command, true)) return; // 执行到这条语句,说明 addWorker 一定是失败的,存在并发现象或者线程池状态被改变,重新获取状态 // SHUTDOWN 状态下也有可能创建成功,前提 firstTask == null 而且当前 queue 不为空(特殊情况) c = ctl.get(); } // 【2】执行到这说明当前线程数量已经达到核心线程数量 或者 addWorker 失败 // 判断当前线程池是否处于running状态,成立就尝试将 task 放入到 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 条件一成立说明线程池状态被外部线程给修改了,可能是执行了 shutdown() 方法,该状态不能接收新提交的任务 // 所以要把刚提交的任务删除,删除成功说明提交之后线程池中的线程还未消费(处理)该任务 if (!isRunning(recheck) && remove(command)) // 任务出队成功,走拒绝策略 reject(command); // 执行到这说明线程池是 running 状态,获取线程池中的线程数量,判断是否是 0 // 【担保机制】,保证线程池在 running 状态下,最起码得有一个线程在工作 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 【3】offer失败说明queue满了 // 如果线程数量尚未达到 maximumPoolSize,会创建非核心 worker 线程直接执行 command,【这也是不公平的原因】 // 如果当前线程数量达到 maximumPoolSiz,这里 addWorker 也会失败,走拒绝策略 else if (!addWorker(command, false)) reject(command); }
添加线程
-
prestartAllCoreThreads():提前预热,创建所有的核心线程
public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
-
addWorker():添加线程到线程池,返回 true 表示创建 Worker 成功,且线程启动。首先判断线程池是否允许添加线程,允许就让线程数量 + 1,然后去创建 Worker 加入线程池
注意:SHUTDOWN 状态也能添加线程,但是要求新加的 Woker 没有 firstTask,而且当前 queue 不为空,所以创建一个线程来帮助线程池执行队列中的任务
// core == true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize private boolean addWorker(Runnable firstTask, boolean core) { // 自旋【判断当前线程池状态是否允许创建线程】,允许就设置线程数量 + 1 retry: for (;;) { // 获取 ctl 的值 int c = ctl.get(); // 获取当前线程池运行状态 int rs = runStateOf(c); // 判断当前线程池状态【是否允许添加线程】 // 当前线程池是 SHUTDOWN 状态,但是队列里面还有任务尚未处理完,需要处理完 queue 中的任务 // 【不允许再提交新的 task,所以 firstTask 为空,但是可以继续添加 worker】 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (;;) { // 获取线程池中线程数量 int wc = workerCountOf(c); // 条件一一般不成立,CAPACITY是5亿多,根据 core 判断使用哪个大小限制线程数量,超过了返回 false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 记录线程数量已经加 1,类比于申请到了一块令牌,条件失败说明其他线程修改了数量 if (compareAndIncrementWorkerCount(c)) // 申请成功,跳出了 retry 这个 for 自旋 break retry; // CAS 失败,没有成功的申请到令牌 c = ctl.get(); // 判断当前线程池状态是否发生过变化,被其他线程修改了,可能其他线程调用了 shutdown() 方法 if (runStateOf(c) != rs) // 返回外层循环检查是否能创建线程,在 if 语句中返回 false continue retry; } } //【令牌申请成功,开始创建线程】 // 运行标记,表示创建的 worker 是否已经启动,false未启动 true启动 boolean workerStarted = false; // 添加标记,表示创建的 worker 是否添加到池子中了,默认false未添加,true是添加。 boolean workerAdded = false; Worker w = null; try { // 【创建 Worker,底层通过线程工厂 newThread 方法创建执行线程,指定了首先执行的任务】 w = new Worker(firstTask); // 将新创建的 worker 节点中的线程赋值给 t final Thread t = w.thread; // 这里的判断为了防止 程序员自定义的 ThreadFactory 实现类有 bug,创造不出线程 if (t != null) { final ReentrantLock mainLock = this.mainLock; // 加互斥锁,要添加 worker 了 mainLock.lock(); try { // 获取最新线程池运行状态保存到 rs int rs = runStateOf(ctl.get()); // 判断线程池是否为RUNNING状态,不是再【判断当前是否为SHUTDOWN状态且firstTask为空,特殊情况】 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 当线程start后,线程isAlive会返回true,这里还没开始启动线程,如果被启动了就需要报错 if (t.isAlive()) throw new IllegalThreadStateException(); //【将新建的 Worker 添加到线程池中】 workers.add(w); int s = workers.size(); // 当前池中的线程数量是一个新高,更新 largestPoolSize if (s > largestPoolSize) largestPoolSize = s; // 添加标记置为 true workerAdded = true; } } finally { // 解锁啊 mainLock.unlock(); } // 添加成功就【启动线程执行任务】 if (workerAdded) { // Thread 类中持有 Runnable 任务对象,调用的是 Runnable 的 run ,也就是 FutureTask t.start(); // 运行标记置为 true workerStarted = true; } } } finally { // 如果启动线程失败,做清理工作 if (! workerStarted) addWorkerFailed(w); } // 返回新创建的线程是否启动 return workerStarted; }
-
addWorkerFailed():清理任务
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 持有线程池全局锁,因为操作的是线程池相关的东西 mainLock.lock(); try { //条件成立需要将 worker 在 workers 中清理出去。 if (w != null) workers.remove(w); // 将线程池计数 -1,相当于归还令牌。 decrementWorkerCount(); // 尝试停止线程池 tryTerminate(); } finally { //释放线程池全局锁。 mainLock.unlock(); } }
运行方法
-
Worker#run:Worker 实现了 Runnable 接口,当线程启动时,会调用 Worker 的 run() 方法
public void run() { // ThreadPoolExecutor#runWorker() runWorker(this); }
-
runWorker():线程启动就要执行任务,会一直 while 循环获取任务并执行
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 获取 worker 的 firstTask Runnable task = w.firstTask; // 引用置空,【防止复用该线程时重复执行该任务】 w.firstTask = null; // 初始化 worker 时设置 state = -1,表示不允许抢占锁 // 这里需要设置 state = 0 和 exclusiveOwnerThread = null,开始独占模式抢锁 w.unlock(); // true 表示发生异常退出,false 表示正常退出。 boolean completedAbruptly = true; try { // firstTask 不是 null 就直接运行,否则去 queue 中获取任务 // 【getTask 如果是阻塞获取任务,会一直阻塞在take方法,直到获取任务,不会走返回null的逻辑】 while (task != null || (task = getTask()) != null) { // worker 加锁,shutdown 时会判断当前 worker 状态,【根据独占锁状态判断是否空闲】 w.lock(); // 说明线程池状态大于 STOP,目前处于 STOP/TIDYING/TERMINATION,此时给线程一个中断信号 if ((runStateAtLeast(ctl.get(), STOP) || // 说明线程处于 RUNNING 或者 SHUTDOWN 状态,清除打断标记 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 中断线程,设置线程的中断标志位为 true wt.interrupt(); try { // 钩子方法,【任务执行的前置处理】 beforeExecute(wt, task); Throwable thrown = null; try { // 【执行任务】 task.run(); } catch (Exception x) { //..... } finally { // 钩子方法,【任务执行的后置处理】 afterExecute(task, thrown); } } finally { task = null; // 将局部变量task置为null,代表任务执行完成 w.completedTasks++; // 更新worker完成任务数量 w.unlock(); // 解锁 } } // getTask()方法返回null时会走到这里,表示queue为空并且线程空闲超过保活时间,【当前线程执行退出逻辑】 completedAbruptly = false; } finally { // 正常退出 completedAbruptly = false // 异常退出 completedAbruptly = true,【从 task.run() 内部抛出异常】时,跳到这一行 processWorkerExit(w, completedAbruptly); } }
-
unlock():重置锁
public void unlock() { release(1); } // 外部不会直接调用这个方法 这个方法是 AQS 内调用的,外部调用 unlock 时触发此方法 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); // 设置持有者为 null setState(0); // 设置 state = 0 return true; }
-
getTask():获取任务,线程空闲时间超过 keepAliveTime 就会被回收,判断的依据是当前线程阻塞获取任务超过保活时间,方法返回 null 就代表当前线程要被回收了,返回到 runWorker 执行线程退出逻辑。线程池具有担保机制,对于 RUNNING 状态下的超时回收,要保证线程池中最少有一个线程运行,或者任务阻塞队列已经是空
private Runnable getTask() { // 超时标记,表示当前线程获取任务是否超时,true 表示已超时 boolean timedOut = false; for (;;) { int c = ctl.get(); // 获取线程池当前运行状态 int rs = runStateOf(c); // 【tryTerminate】打断线程后执行到这,此时线程池状态为STOP或者线程池状态为SHUTDOWN并且队列已经是空 // 所以下面的 if 条件一定是成立的,可以直接返回 null,线程就应该退出了 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 使用 CAS 自旋的方式让 ctl 值 -1 decrementWorkerCount(); return null; } // 获取线程池中的线程数量 int wc = workerCountOf(c); // 线程没有明确的区分谁是核心或者非核心线程,是根据当前池中的线程数量判断 // timed = false 表示当前这个线程 获取task时不支持超时机制的,当前线程会使用 queue.take() 阻塞获取 // timed = true 表示当前这个线程 获取task时支持超时机制,使用 queue.poll(xxx,xxx) 超时获取 // 条件一代表允许回收核心线程,那就无所谓了,全部线程都执行超时回收 // 条件二成立说明线程数量大于核心线程数,当前线程认为是非核心线程,有保活时间,去超时获取任务 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果线程数量是否超过最大线程数,直接回收 // 如果当前线程【允许超时回收并且已经超时了】,就应该被回收了,由于【担保机制】还要做判断: // wc > 1 说明线程池还用其他线程,当前线程可以直接回收 // workQueue.isEmpty() 前置条件是 wc = 1,【如果当前任务队列也是空了,最后一个线程就可以退出】 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 使用 CAS 机制将 ctl 值 -1 ,减 1 成功的线程,返回 null,代表可以退出 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 根据当前线程是否需要超时回收,【选择从队列获取任务的方法】是超时获取或者阻塞获取 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 获取到任务返回任务,【阻塞获取会阻塞到获取任务为止】,不会返回 null if (r != null) return r; // 获取任务为 null 说明超时了,将超时标记设置为 true,下次自旋时返 null timedOut = true; } catch (InterruptedException retry) { // 阻塞线程被打断后超时标记置为 false,【说明被打断不算超时】,要继续获取,直到超时或者获取到任务 // 如果线程池 SHUTDOWN 状态下的打断,会在循环获取任务前判断,返回 null timedOut = false; } } }
-
processWorkerExit():线程退出线程池,也有担保机制,保证队列中的任务被执行
// 正常退出 completedAbruptly = false,异常退出为 true private void processWorkerExit(Worker w, boolean completedAbruptly) { // 条件成立代表当前 worker 是发生异常退出的,task 任务执行过程中向上抛出异常了 if (completedAbruptly) // 从异常时到这里 ctl 一直没有 -1,需要在这里 -1 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { // 将当前 worker 完成的 task 数量,汇总到线程池的 completedTaskCount completedTaskCount += w.completedTasks; // 将 worker 从线程池中移除 workers.remove(w); } finally { mainLock.unlock(); // 解锁 } // 尝试停止线程池,唤醒下一个线程 tryTerminate(); int c = ctl.get(); // 线程池不是停止状态就应该有线程运行【担保机制】 if (runStateLessThan(c, STOP)) { // 正常退出的逻辑,是对空闲线程回收,不是执行出错 if (!completedAbruptly) { // 根据是否回收核心线程确定【线程池中的线程数量最小值】 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 最小值为 0,但是线程队列不为空,需要一个线程来完成任务担保机制 if (min == 0 && !workQueue.isEmpty()) min = 1; // 线程池中的线程数量大于最小值可以直接返回 if (workerCountOf(c) >= min) return; } // 执行 task 时发生异常,有个线程因为异常终止了,需要添加 // 或者线程池中的数量小于最小值,这里要创建一个新 worker 加进线程池 addWorker(null, false); } }
停止方法
-
shutdown():停止线程池
public void shutdown() { final ReentrantLock mainLock = this.mainLock; // 获取线程池全局锁 mainLock.lock(); try { checkShutdownAccess(); // 设置线程池状态为 SHUTDOWN,如果线程池状态大于 SHUTDOWN,就不会设置直接返回 advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); // 空方法,子类可以扩展 onShutdown(); } finally { // 释放线程池全局锁 mainLock.unlock(); } tryTerminate(); }
-
interruptIdleWorkers():shutdown 方法会中断所有空闲线程,根据是否可以获取 AQS 独占锁判断是否处于工作状态。线程之所以空闲是因为阻塞队列没有任务,不会中断正在运行的线程,所以 shutdown 方法会让所有的任务执行完毕
// onlyOne == true 说明只中断一个线程 ,false 则中断所有线程 private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; / /持有全局锁 mainLock.lock(); try { // 遍历所有 worker for (Worker w : workers) { // 获取当前 worker 的线程 Thread t = w.thread; // 条件一成立:说明当前迭代的这个线程尚未中断 // 条件二成立:说明【当前worker处于空闲状态】,阻塞在poll或者take,因为worker执行task时是要加锁的 // 每个worker有一个独占锁,w.tryLock()尝试加锁,加锁成功返回 true if (!t.isInterrupted() && w.tryLock()) { try { // 中断线程,处于 queue 阻塞的线程会被唤醒,进入下一次自旋,返回 null,执行退出相逻辑 t.interrupt(); } catch (SecurityException ignore) { } finally { // 释放worker的独占锁 w.unlock(); } } // false,代表中断所有的线程 if (onlyOne) break; } } finally { // 释放全局锁 mainLock.unlock(); } }
-
shutdownNow():直接关闭线程池,不会等待任务执行完成
public List<Runnable> shutdownNow() { // 返回值引用 List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; // 获取线程池全局锁 mainLock.lock(); try { checkShutdownAccess(); // 设置线程池状态为STOP advanceRunState(STOP); // 中断线程池中【所有线程】 interruptWorkers(); // 从阻塞队列中导出未处理的task tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); // 返回当前任务队列中 未处理的任务。 return tasks; }
-
tryTerminate():设置为 TERMINATED 状态 if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty)
final void tryTerminate() { for (;;) { // 获取 ctl 的值 int c = ctl.get(); // 线程池正常,或者有其他线程执行了状态转换的方法,当前线程直接返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || // 线程池是 SHUTDOWN 并且任务队列不是空,需要去处理队列中的任务 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 执行到这里说明线程池状态为 STOP 或者线程池状态为 SHUTDOWN 并且队列已经是空 // 判断线程池中线程的数量 if (workerCountOf(c) != 0) { // 【中断一个空闲线程】,在 queue.take() | queue.poll() 阻塞空闲 // 唤醒后的线程会在getTask()方法返回null, // 执行 processWorkerExit 退出逻辑时会再次调用 tryTerminate() 唤醒下一个空闲线程 interruptIdleWorkers(ONLY_ONE); return; } // 池中的线程数量为 0 来到这里 final ReentrantLock mainLock = this.mainLock; // 加全局锁 mainLock.lock(); try { // 设置线程池状态为 TIDYING 状态,线程数量为 0 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 结束线程池 terminated(); } finally { // 设置线程池状态为TERMINATED状态。 ctl.set(ctlOf(TERMINATED, 0)); // 【唤醒所有调用 awaitTermination() 方法的线程】 termination.signalAll(); } return; } } finally { // 释放线程池全局锁 mainLock.unlock(); } } }
Future
线程使用
FutureTask 未来任务对象,继承 Runnable、Future 接口,用于包装 Callable 对象,实现任务的提交
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello World";
}
});
new Thread(task).start(); //启动线程
String msg = task.get(); //获取返回任务数据
System.out.println(msg);
}
构造方法:
public FutureTask(Callable<V> callable){
this.callable = callable; // 属性注入
this.state = NEW; // 任务状态设置为 new
}
public FutureTask(Runnable runnable, V result) {
// 适配器模式
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 使用装饰者模式将 runnable 转换成 callable 接口,外部线程通过 get 获取
// 当前任务执行结果时,结果可能为 null 也可能为传进来的值,【传进来什么返回什么】
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
// 构造方法
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
// 实则调用 Runnable#run 方法
task.run();
// 返回值为构造 FutureTask 对象时传入的返回值或者是 null
return result;
}
}
成员属性
FutureTask 类的成员属性:
-
任务状态:
// 表示当前task状态 private volatile int state; // 当前任务尚未执行 private static final int NEW = 0; // 当前任务正在结束,尚未完全结束,一种临界状态 private static final int COMPLETING = 1; // 当前任务正常结束 private static final int NORMAL = 2; // 当前任务执行过程中发生了异常,内部封装的 callable.run() 向上抛出异常了 private static final int EXCEPTIONAL = 3; // 当前任务被取消 private static final int CANCELLED = 4; // 当前任务中断中 private static final int INTERRUPTING = 5; // 当前任务已中断 private static final int INTERRUPTED = 6;
-
任务对象:
private Callable<V> callable; // Runnable 使用装饰者模式伪装成 Callable
-
存储任务执行的结果,这是 run 方法返回值是 void 也可以获取到执行结果的原因:
// 正常情况下:任务正常执行结束,outcome 保存执行结果,callable 返回值 // 非正常情况:callable 向上抛出异常,outcome 保存异常 private Object outcome;
-
执行当前任务的线程对象:
private volatile Thread runner; // 当前任务被线程执行期间,保存当前执行任务的线程对象引用
-
线程阻塞队列的头节点:
// 会有很多线程去 get 当前任务的结果,这里使用了一种数据结构头插头取(类似栈)的一个队列来保存所有的 get 线程 private volatile WaitNode waiters;
-
内部类:
static final class WaitNode { // 单向链表 volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
成员方法
FutureTask 类的成员方法:
-
FutureTask#run:任务执行入口
public void run() { //条件一:成立说明当前 task 已经被执行过了或者被 cancel 了,非 NEW 状态的任务,线程就不需要处理了 //条件二:线程是 NEW 状态,尝试设置当前任务对象的线程是当前线程,设置失败说明其他线程抢占了该任务,直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 执行到这里,当前 task 一定是 NEW 状态,而且【当前线程也抢占 task 成功】 Callable<V> c = callable; // 判断任务是否为空,防止空指针异常;判断 state 状态,防止外部线程在此期间 cancel 掉当前任务 // 【因为 task 的执行者已经设置为当前线程,所以这里是线程安全的】 if (c != null && state == NEW) { V result; // true 表示 callable.run 代码块执行成功 未抛出异常 // false 表示 callable.run 代码块执行失败 抛出异常 boolean ran; try { // 【调用自定义的方法,执行结果赋值给 result】 result = c.call(); // 没有出现异常 ran = true; } catch (Throwable ex) { // 出现异常,返回值置空,ran 置为 false result = null; ran = false; // 设置返回的异常 setException(ex); } // 代码块执行正常 if (ran) // 设置返回的结果 set(result); } } finally { // 任务执行完成,取消线程的引用,help GC runner = null; int s = state; // 判断任务是不是被中断 if (s >= INTERRUPTING) // 执行中断处理方法 handlePossibleCancellationInterrupt(s); } }
FutureTask#set:设置正常返回值,首先将任务状态设置为 COMPLETING 状态代表完成中,逻辑执行完设置为 NORMAL 状态代表任务正常执行完成,最后唤醒 get() 阻塞线程
protected void set(V v) { // CAS 方式设置当前任务状态为完成中,设置失败说明其他线程取消了该任务 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 【将结果赋值给 outcome】 outcome = v; // 将当前任务状态修改为 NORMAL 正常结束状态。 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); } }
FutureTask#setException:设置异常返回值
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 赋值给返回结果,用来向上层抛出来的异常 outcome = t; // 将当前任务的状态 修改为 EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); finishCompletion(); } }
FutureTask#finishCompletion:唤醒 get() 阻塞线程
private void finishCompletion() { // 遍历所有的等待的节点,q 指向头节点 for (WaitNode q; (q = waiters) != null;) { // 使用cas设置 waiters 为 null,防止外部线程使用cancel取消当前任务,触发finishCompletion方法重复执行 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 自旋 for (;;) { // 获取当前 WaitNode 节点封装的 thread Thread t = q.thread; // 当前线程不为 null,唤醒当前 get() 等待获取数据的线程 if (t != null) { q.thread = null; LockSupport.unpark(t); } // 获取当前节点的下一个节点 WaitNode next = q.next; // 当前节点是最后一个节点了 if (next == null) break; // 断开链表 q.next = null; // help gc q = next; } break; } } done(); callable = null; // help GC }
FutureTask#handlePossibleCancellationInterrupt:任务中断处理
private void handlePossibleCancellationInterrupt(int s) { if (s == INTERRUPTING) // 中断状态中 while (state == INTERRUPTING) // 等待中断完成 Thread.yield(); }
-
FutureTask#get:获取任务执行的返回值,执行 run 和 get 的不是同一个线程,一般有多个线程 get,只有一个线程 run
public V get() throws InterruptedException, ExecutionException { // 获取当前任务状态 int s = state; // 条件成立说明任务还没执行完成 if (s <= COMPLETING) // 返回 task 当前状态,可能当前线程在里面已经睡了一会 s = awaitDone(false, 0L); return report(s); }
FutureTask#awaitDone:get 线程封装成 WaitNode 对象进入阻塞队列阻塞等待
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 0 不带超时 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 引用当前线程,封装成 WaitNode 对象 WaitNode q = null; // 表示当前线程 waitNode 对象,是否进入阻塞队列 boolean queued = false; // 【三次自旋开始休眠】 for (;;) { // 判断当前 get() 线程是否被打断,打断返回 true,清除打断标记 if (Thread.interrupted()) { // 当前线程对应的等待 node 出队, removeWaiter(q); throw new InterruptedException(); } // 获取任务状态 int s = state; // 条件成立说明当前任务执行完成已经有结果了 if (s > COMPLETING) { // 条件成立说明已经为当前线程创建了 WaitNode,置空 help GC if (q != null) q.thread = null; // 返回当前的状态 return s; } // 条件成立说明当前任务接近完成状态,这里让当前线程释放一下 cpu ,等待进行下一次抢占 cpu else if (s == COMPLETING) Thread.yield(); // 【第一次自旋】,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象 else if (q == null) q = new WaitNode(); // 【第二次自旋】,当前线程已经创建 WaitNode 对象了,但是node对象还未入队 else if (!queued) // waiters 指向队首,让当前 WaitNode 成为新的队首,【头插法】,失败说明其他线程修改了新的队首 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 【第三次自旋】,会到这里,或者 else 内 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 阻塞指定的时间 LockSupport.parkNanos(this, nanos); } // 条件成立:说明需要阻塞 else // 【当前 get 操作的线程被 park 阻塞】,除非有其它线程将唤醒或者将当前线程中断 LockSupport.park(this); } }
FutureTask#report:封装运行结果,可以获取 run() 方法中设置的成员变量 outcome,这是 run 方法的返回值是 void 也可以获取到任务执行的结果的原因
private V report(int s) throws ExecutionException { // 获取执行结果,是在一个 futuretask 对象中的属性,可以直接获取 Object x = outcome; // 当前任务状态正常结束 if (s == NORMAL) return (V)x; // 直接返回 callable 的逻辑结果 // 当前任务被取消或者中断 if (s >= CANCELLED) throw new CancellationException(); // 抛出异常 // 执行到这里说明自定义的 callable 中的方法有异常,使用 outcome 上层抛出异常 throw new ExecutionException((Throwable)x); }
-
FutureTask#cancel:任务取消,打断正在执行该任务的线程
public boolean cancel(boolean mayInterruptIfRunning) { // 条件一:表示当前任务处于运行中或者处于线程池任务队列中 // 条件二:表示修改状态,成功可以去执行下面逻辑,否则返回 false 表示 cancel 失败 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // 如果任务已经被执行,是否允许打断 if (mayInterruptIfRunning) { try { // 获取执行当前 FutureTask 的线程 Thread t = runner; if (t != null) // 打断执行的线程 t.interrupt(); } finally { // 设置任务状态为【中断完成】 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 唤醒所有 get() 阻塞的线程 finishCompletion(); } return true; }
任务调度
Timer
Timer 实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
private static void method1() {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println("task 1");
//int i = 1 / 0;//任务一的出错会导致任务二无法执行
Thread.sleep(2000);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
System.out.println("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此任务1的延时,影响了任务2的执行
timer.schedule(task1, 1000);//17:45:56 c.ThreadPool [Timer-0] - task 1
timer.schedule(task2, 1000);//17:45:58 c.ThreadPool [Timer-0] - task 2
}
Scheduled
任务调度线程池 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor:
- 使用内部类 ScheduledFutureTask 封装任务
- 使用内部类 DelayedWorkQueue 作为线程池队列
- 重写 onShutdown 方法去处理 shutdown 后的任务
- 提供 decorateTask 方法作为 ScheduledFutureTask 的修饰方法,以便开发者进行扩展
构造方法:Executors.newScheduledThreadPool(int corePoolSize)
public ScheduledThreadPoolExecutor(int corePoolSize) {
// 最大线程数固定为 Integer.MAX_VALUE,保活时间 keepAliveTime 固定为 0
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
// 阻塞队列是 DelayedWorkQueue
new DelayedWorkQueue());
}
常用 API:
ScheduledFuture<?> schedule(Runnable/Callable<V>, long delay, TimeUnit u)
:延迟执行任务ScheduledFuture<?> scheduleAtFixedRate(Runnable/Callable<V>, long initialDelay, long period, TimeUnit unit)
:定时执行周期任务,不考虑执行的耗时,参数为初始延迟时间、间隔时间、单位ScheduledFuture<?> scheduleWithFixedDelay(Runnable/Callable<V>, long initialDelay, long delay, TimeUnit unit)
:定时执行周期任务,考虑执行的耗时,参数为初始延迟时间、间隔时间、单位
基本使用:
-
延迟任务,但是出现异常并不会在控制台打印,也不会影响其他线程的执行
public static void main(String[] args){ // 线程池大小为1时也是串行执行 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); // 添加两个任务,都在 1s 后同时执行 executor.schedule(() -> { System.out.println("任务1,执行时间:" + new Date()); //int i = 1 / 0; try { Thread.sleep(2000); } catch (InterruptedException e) { } }, 1000, TimeUnit.MILLISECONDS); executor.schedule(() -> { System.out.println("任务2,执行时间:" + new Date()); }, 1000, TimeUnit.MILLISECONDS); }
-
定时任务 scheduleAtFixedRate:一次任务的启动到下一次任务的启动之间只要大于等于间隔时间,抢占到 CPU 就会立即执行
public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); System.out.println("start..." + new Date()); pool.scheduleAtFixedRate(() -> { System.out.println("running..." + new Date()); Thread.sleep(2000); }, 1, 1, TimeUnit.SECONDS); } /*start...Sat Apr 24 18:08:12 CST 2021 running...Sat Apr 24 18:08:13 CST 2021 running...Sat Apr 24 18:08:15 CST 2021 running...Sat Apr 24 18:08:17 CST 2021
-
定时任务 scheduleWithFixedDelay:一次任务的结束到下一次任务的启动之间等于间隔时间,抢占到 CPU 就会立即执行,这个方法才是真正的设置两个任务之间的间隔
public static void main(String[] args){ ScheduledExecutorService pool = Executors.newScheduledThreadPool(3); System.out.println("start..." + new Date()); pool.scheduleWithFixedDelay(() -> { System.out.println("running..." + new Date()); Thread.sleep(2000); }, 1, 1, TimeUnit.SECONDS); } /*start...Sat Apr 24 18:11:41 CST 2021 running...Sat Apr 24 18:11:42 CST 2021 running...Sat Apr 24 18:11:45 CST 2021 running...Sat Apr 24 18:11:48 CST 2021
成员属性
成员变量
-
shutdown 后是否继续执行周期任务:
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
-
shutdown 后是否继续执行延迟任务:
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
-
取消方法是否将该任务从队列中移除:
// 默认 false,不移除,等到线程拿到任务之后抛弃 private volatile boolean removeOnCancel = false;
-
任务的序列号,可以用来比较优先级:
private static final AtomicLong sequencer = new AtomicLong();
延迟任务
ScheduledFutureTask 继承 FutureTask,实现 RunnableScheduledFuture 接口,具有延迟执行的特点,覆盖 FutureTask 的 run 方法来实现对延时执行、周期执行的支持。对于延时任务调用 FutureTask#run,而对于周期性任务则调用 FutureTask#runAndReset 并且在成功之后根据 fixed-delay/fixed-rate 模式来设置下次执行时间并重新将任务塞到工作队列
在调度线程池中无论是 runnable 还是 callable,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask
成员变量:
-
任务序列号:
private final long sequenceNumber;
-
执行时间:
private long time; // 任务可以被执行的时间,交付时间,以纳秒表示 private final long period; // 0 表示非周期任务,正数表示 fixed-rate 模式的周期,负数表示 fixed-delay 模式
fixed-rate:两次开始启动的间隔,fixed-delay:一次执行结束到下一次开始启动
-
实际的任务对象:
RunnableScheduledFuture<V> outerTask = this;
-
任务在队列数组中的索引下标:
// DelayedWorkQueue 底层使用的数据结构是最小堆,记录当前任务在堆中的索引,-1 代表删除 int heapIndex;
成员方法:
-
构造方法:
ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); // 任务的触发时间 this.time = ns; // 任务的周期,多长时间执行一次 this.period = period; // 任务的序号 this.sequenceNumber = sequencer.getAndIncrement(); }
-
compareTo():ScheduledFutureTask 根据执行时间 time 正序排列,如果执行时间相同,在按照序列号 sequenceNumber 正序排列,任务需要放入 DelayedWorkQueue,延迟队列中使用该方法按照从小到大进行排序
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { // 类型强转 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; // 比较者 - 被比较者的执行时间 long diff = time - x.time; // 比较者先执行 if (diff < 0) return -1; // 被比较者先执行 else if (diff > 0) return 1; // 比较者的序列号小 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } // 不是 ScheduledFutureTask 类型时,根据延迟时间排序 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
-
run():执行任务,非周期任务直接完成直接结束,周期任务执行完后会设置下一次的执行时间,重新放入线程池的阻塞队列,如果线程池中的线程数量少于核心线程,就会添加 Worker 开启新线程
public void run() { // 是否周期性,就是判断 period 是否为 0 boolean periodic = isPeriodic(); // 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 非周期任务,直接调用 FutureTask#run 执行 else if (!periodic) ScheduledFutureTask.super.run(); // 周期任务的执行,返回 true 表示执行成功 else if (ScheduledFutureTask.super.runAndReset()) { // 设置周期任务的下一次执行时间 setNextRunTime(); // 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程 reExecutePeriodic(outerTask); } }
周期任务正常完成后任务的状态不会变化,依旧是 NEW,不会设置 outcome 属性。但是如果本次任务执行出现异常,会进入 setException 方法将任务状态置为异常,把异常保存在 outcome 中,方法返回 false,后续的该任务将不会再周期的执行
protected boolean runAndReset() { // 任务不是新建的状态了,或者被别的线程执行了,直接返回 false if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { // 执行方法,没有返回值 c.call(); ran = true; } catch (Throwable ex) { // 出现异常,把任务设置为异常状态,唤醒所有的 get 阻塞线程 setException(ex); } } } finally { // 执行完成把执行线程引用置为 null runner = null; s = state; // 如果线程被中断进行中断处理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 如果正常执行,返回 true,并且任务状态没有被取消 return ran && s == NEW; }
// 任务下一次的触发时间 private void setNextRunTime() { long p = period; if (p > 0) // fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差 time += p; else // fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】 time = triggerTime(-p); }
-
reExecutePeriodic():准备任务的下一次执行,重新放入阻塞任务队列
// ScheduledThreadPoolExecutor#reExecutePeriodic void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { // 【放入任务队列】 super.getQueue().add(task); // 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行, // 如果不能执行且任务还在队列中未被取走,则取消任务 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else // 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】 ensurePrestart(); } }
-
cancel():取消任务
public boolean cancel(boolean mayInterruptIfRunning) { // 调用父类 FutureTask#cancel 来取消任务 boolean cancelled = super.cancel(mayInterruptIfRunning); // removeOnCancel 用于控制任务取消后是否应该从阻塞队列中移除 if (cancelled && removeOnCancel && heapIndex >= 0) // 从等待队列中删除该任务,并调用 tryTerminate() 判断是否需要停止线程池 remove(this); return cancelled; }
延迟队列
DelayedWorkQueue 是支持延时获取元素的阻塞队列,内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素
其他阻塞队列存储节点的数据结构大都是链表,延迟队列是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常
成员变量:
-
容量:
private static final int INITIAL_CAPACITY = 16; // 初始容量 private int size = 0; // 节点数量 private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 存放节点
-
锁:
private final ReentrantLock lock = new ReentrantLock(); // 控制并发 private final Condition available = lock.newCondition();// 条件队列
-
阻塞等待头节点的线程:线程池内的某个线程去 take() 获取任务时,如果延迟队列顶层节点不为 null(队列内有任务),但是节点任务还不到触发时间,线程就去检查队列的 leader字段是否被占用
- 如果未被占用,则当前线程占用该字段,然后当前线程到 available 条件队列指定超时时间
堆顶任务.time - now()
挂起 - 如果被占用,当前线程直接到 available 条件队列不指定超时时间的挂起
// leader 在 available 条件队列内是首元素,它超时之后会醒过来,然后再次将堆顶元素获取走,获取走之后,take()结束之前,会调用是 available.signal() 唤醒下一个条件队列内的等待者,然后释放 lock,下一个等待者被唤醒后去到 AQS 队列,做 acquireQueue(node) 逻辑 private Thread leader = null;
- 如果未被占用,则当前线程占用该字段,然后当前线程到 available 条件队列指定超时时间
成员方法
-
offer():插入节点
public boolean offer(Runnable x) { // 判空 if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; // 队列锁,增加删除数据时都要加锁 final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; // 队列数量大于存放节点的数组长度,需要扩容 if (i >= queue.length) // 扩容为原来长度的 1.5 倍 grow(); size = i + 1; // 当前是第一个要插入的节点 if (i == 0) { queue[0] = e; // 修改 ScheduledFutureTask 的 heapIndex 属性,表示该对象在队列里的下标 setIndex(e, 0); } else { // 向上调整元素的位置,并更新 heapIndex siftUp(i, e); } // 情况1:当前任务是第一个加入到 queue 内的任务,所以在当前任务加入到 queue 之前,take() 线程会直接 // 到 available 队列不设置超时的挂起,并不会去占用 leader 字段,这时需会唤醒一个线程 让它去消费 // 情况2:当前任务【优先级最高】,原堆顶任务可能还未到触发时间,leader 线程设置超时的在 available 挂起 // 原先的 leader 等待的是原先的头节点,所以 leader 已经无效,需要将 leader 线程唤醒, // 唤醒之后它会检查堆顶,如果堆顶任务可以被消费,则直接获取走,否则继续成为 leader 等待新堆顶任务 if (queue[0] == e) { // 将 leader 设置为 null leader = null; // 直接随便唤醒等待头结点的阻塞线程 available.signal(); } } finally { lock.unlock(); } return true; }
// 插入新节点后对堆进行调整,进行节点上移,保持其特性【节点的值小于子节点的值】,小顶堆 private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { // 父节点,就是堆排序 int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; // key 和父节点比,如果大于父节点可以直接返回,否则就继续上浮 if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
-
poll():非阻塞获取头结点,获取执行时间最近并且可以执行的
// 非阻塞获取 public RunnableScheduledFuture<?> poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 获取队头节点,因为是小顶堆 RunnableScheduledFuture<?> first = queue[0]; // 头结点为空或者的延迟时间没到返回 null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else // 头结点达到延迟时间,【尾节点成为替代节点下移调整堆结构】,返回头结点 return finishPoll(first); } finally { lock.unlock(); } }
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { // 获取尾索引 int s = --size; // 获取尾节点 RunnableScheduledFuture<?> x = queue[s]; // 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调 queue[s] = null; // s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情 if (s != 0) // 从索引处 0 开始向下调整 siftDown(0, x); // 出队的元素索引设置为 -1 setIndex(f, -1); return f; }
-
take():阻塞获取头节点,读取当前堆中最小的也就是触发时间最近的任务
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; // 保证线程安全 lock.lockInterruptibly(); try { for (;;) { // 头节点 RunnableScheduledFuture<?> first = queue[0]; if (first == null) // 等待队列不空,直至有任务通过 offer 入队并唤醒 available.await(); else { // 获取头节点的延迟时间是否到时 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部 return finishPoll(first); // 逻辑到这说明头节点的延迟时间还没到 first = null; // 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待 if (leader != null) available.await(); else { // 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 在条件队列 available 使用带超时的挂起(堆顶任务.time - now() 纳秒值..) available.awaitNanos(delay); // 到达阻塞时间时,当前线程会从这里醒来来 } finally { // t堆顶更新,leader 置为 null,offer 方法释放锁后, // 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。 if (leader == thisThread) // leader 置为 null 用以接下来判断是否需要唤醒后继线程 leader = null; } } } } } finally { // 没有 leader 线程,头结点不为 null,唤醒阻塞获取头节点的线程, // 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
-
remove():删除节点,堆移除一个元素的时间复杂度是 O(log n),延迟任务维护了 heapIndex,直接访问的时间复杂度是 O(1),从而可以更快的移除元素,任务在队列中被取消后会进入该逻辑
public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { // 查找对象在队列数组中的下标 int i = indexOf(x); // 节点不存在,返回 false if (i < 0) return false; // 修改元素的 heapIndex,-1 代表删除 setIndex(queue[i], -1); // 尾索引是长度-1 int s = --size; // 尾节点作为替代节点 RunnableScheduledFuture<?> replacement = queue[s]; queue[s] = null; // s == i 说明头节点就是尾节点,队列空了 if (s != i) { // 向下调整 siftDown(i, replacement); // 说明没发生调整 if (queue[i] == replacement) // 上移和下移不可能同时发生,替代节点大于子节点时下移,否则上移 siftUp(i, replacement); } return true; } finally { lock.unlock(); } }
成员方法
提交任务
-
schedule():延迟执行方法,并指定执行的时间,默认是当前时间
public void execute(Runnable command) { // 以零延时任务的形式实现 schedule(command, 0, NANOSECONDS); }
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 判空 if (command == null || unit == null) throw new NullPointerException(); // 没有做任何操作,直接将 task 返回,该方法主要目的是用于子类扩展,并且【根据延迟时间设置任务触发的时间点】 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>( command, null, triggerTime(delay, unit))); // 延迟执行 delayedExecute(t); return t; }
// 返回【当前时间 + 延迟时间】,就是触发当前任务执行的时间 private long triggerTime(long delay, TimeUnit unit) { // 设置触发的时间 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } long triggerTime(long delay) { // 如果 delay < Long.Max_VALUE/2,则下次执行时间为当前时间 +delay // 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
overflowFree 的原因:如果某个任务的 delay 为负数,说明当前可以执行(其实早该执行了)。阻塞队列中维护任务顺序是基于 compareTo 比较的,比较两个任务的顺序会用 time 相减。那么可能出现一个 delay 为正数减去另一个为负数的 delay,结果上溢为负数,则会导致 compareTo 产生错误的结果
private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); // 判断一下队首的delay是不是负数,如果是正数就不用管,怎么减都不会溢出 // 否则拿当前 delay 减去队首的 delay 来比较看,如果不出现上溢,排序不会乱 // 不然就把当前 delay 值给调整为 Long.MAX_VALUE + 队首 delay if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
-
scheduleAtFixedRate():定时执行,一次任务的启动到下一次任务的启动的间隔
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 任务封装,【指定初始的延迟时间和周期时间】 ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 默认返回本身 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 开始执行这个任务 delayedExecute(t); return t; }
-
scheduleWithFixedDelay():定时执行,一次任务的结束到下一次任务的启动的间隔
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // 任务封装,【指定初始的延迟时间和周期时间】,周期时间为 - 表示是 fixed-delay 模式 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
运行任务
-
delayedExecute():校验线程池状态,延迟或周期性任务的主要执行方法
private void delayedExecute(RunnableScheduledFuture<?> task) { // 线程池是 SHUTDOWN 状态,需要执行拒绝策略 if (isShutdown()) reject(task); else { // 把当前任务放入阻塞队列,因为需要【获取执行时间最近的】,当前任务需要比较 super.getQueue().add(task); // 线程池状态为 SHUTDOWN 并且不允许执行任务了,就从队列删除该任务,并设置任务的状态为取消状态 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 可以执行 ensurePrestart(); } }
-
ensurePrestart():开启线程执行任务
// ThreadPoolExecutor#ensurePrestart void ensurePrestart() { int wc = workerCountOf(ctl.get()); // worker数目小于corePoolSize,则添加一个worker。 if (wc < corePoolSize) // 第二个参数 true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize addWorker(null, true); // corePoolSize = 0的情况,至少开启一个线程,【担保机制】 else if (wc == 0) addWorker(null, false); }
-
canRunInCurrentRunState():任务运行时都会被调用以校验当前状态是否可以运行任务
boolean canRunInCurrentRunState(boolean periodic) { // 根据是否是周期任务判断,在线程池 shutdown 后是否继续执行该任务,默认非周期任务是继续执行的 return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
-
onShutdown():删除并取消工作队列中的不需要再执行的任务
void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); // shutdown 后是否仍然执行延时任务 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // shutdown 后是否仍然执行周期任务 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 如果两者皆不可,则对队列中【所有任务】调用 cancel 取消并清空队列 if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; // 不需要执行的任务删除并取消,已经取消的任务也需要从队列中删除 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { if (q.remove(t)) t.cancel(false); } } } } // 因为任务被从队列中清理掉,所以需要调用 tryTerminate 尝试【改变线程池的状态】 tryTerminate(); }
ForkJoin
Fork/Join:线程池的实现,体现是分治思想,适用于能够进行任务拆分的 CPU 密集型运算,用于并行计算
任务拆分:将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列都可以用分治思想进行求解
-
Fork/Join 在分治的基础上加入了多线程,把每个任务的分解和合并交给不同的线程来完成,提升了运算效率
-
ForkJoin 使用 ForkJoinPool 来启动,是一个特殊的线程池,默认会创建与 CPU 核心数大小相同的线程池
-
任务有返回值继承 RecursiveTask,没有返回值继承 RecursiveAction
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new MyTask(5)));
//拆分 5 + MyTask(4) --> 4 + MyTask(3) -->
}
// 1~ n 之间整数的和
class MyTask extends RecursiveTask<Integer> {
private int n;
public MyTask(int n) {
this.n = n;
}
@Override
public String toString() {
return "MyTask{" + "n=" + n + '}';
}
@Override
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
return n;
}
// 将任务进行拆分(fork)
MyTask t1 = new MyTask(n - 1);
t1.fork();
// 合并(join)结果
int result = n + t1.join();
return result;
}
}
继续拆分优化:
class AddTask extends RecursiveTask<Integer> {
int begin;
int end;
public AddTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
public String toString() {
return "{" + begin + "," + end + '}';
}
@Override
protected Integer compute() {
// 5, 5
if (begin == end) {
return begin;
}
// 4, 5 防止多余的拆分 提高效率
if (end - begin == 1) {
return end + begin;
}
// 1 5
int mid = (end + begin) / 2; // 3
AddTask t1 = new AddTask(begin, mid); // 1,3
t1.fork();
AddTask t2 = new AddTask(mid + 1, end); // 4,5
t2.fork();
int result = t1.join() + t2.join();
return result;
}
}
ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率:
- 每个线程都维护了一个双端队列,用来存储需要执行的任务
- 工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行
- 窃取的必须是最晚的任务,避免和队列所属线程发生竞争,但是队列中只有一个任务时还是会发生竞争
享元模式
享元模式(Flyweight pattern): 用于减少创建对象的数量,以减少内存占用和提高性能,这种类型的设计模式属于结构型模式,它提供了减少对象数量从而改善应用所需的对象结构的方式
异步模式:让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务,也可将其归类为分工模式,典型实现就是线程池
工作机制:享元模式尝试重用现有的同类对象,如果未找到匹配的对象,则创建新对象
自定义连接池:
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection con = pool.borrow();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(con);
}).start();
}
}
class Pool {
//连接池的大小
private final int poolSize;
//连接对象的数组
private Connection[] connections;
//连接状态数组 0表示空闲 1表示繁忙
private AtomicIntegerArray states; //int[] -> AtomicIntegerArray
//构造方法
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i + 1));
}
}
//使用连接
public Connection borrow() {
while (true) {
for (int i = 0; i < poolSize; i++) {
if (states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
System.out.println(Thread.currentThread().getName() + " borrow " + connections[i]);
return connections[i];
}
}
}
//如果没有空闲连接,当前线程等待
synchronized (this) {
try {
System.out.println(Thread.currentThread().getName() + " wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//归还连接
public void free(Connection con) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == con) {//判断是否是同一个对象
states.set(i, 0);//不用cas的原因是只会有一个线程使用该连接
synchronized (this) {
System.out.println(Thread.currentThread().getName() + " free " + con);
this.notifyAll();
}
break;
}
}
}
}
class MockConnection implements Connection {
private String name;
//.....
}
同步器
AQS
核心思想
AQS:AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,许多同步类实现都依赖于该同步器
AQS 用状态属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- 独占模式是只有一个线程能够访问资源,如 ReentrantLock
- 共享模式允许多个线程访问资源,如 Semaphore,ReentrantReadWriteLock 是组合式
AQS 核心思想:
-
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置锁定状态
-
请求的共享资源被占用,AQS 用队列实现线程阻塞等待以及被唤醒时锁分配的机制,将暂时获取不到锁的线程加入到队列中
CLH 是一种基于单向链表的高性能、公平的自旋锁,AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配
设计原理
设计原理:
-
获取锁:
while(state 状态不允许获取) { // tryAcquire(arg) if(队列中还没有此线程) { 入队并阻塞 park } } 当前线程出队
-
释放锁:
if(state 状态允许了) { // tryRelease(arg) 恢复阻塞的线程(s) unpark }
AbstractQueuedSynchronizer 中 state 设计:
-
state 使用了 32bit int 来维护同步状态,独占模式 0 表示未加锁状态,大于 0 表示已经加锁状态
private volatile int state;
-
state 使用 volatile 修饰配合 cas 保证其修改时的原子性
-
state 表示线程重入的次数(独占模式)或者剩余许可数(共享模式)
-
state API:
protected final int getState()
:获取 state 状态protected final void setState(int newState)
:设置 state 状态protected final boolean compareAndSetState(int expect,int update)
:CAS 安全设置 state
封装线程的 Node 节点中 waitstate 设计:
-
使用 volatile 修饰配合 CAS 保证其修改时的原子性
-
表示 Node 节点的状态,有以下几种状态:
// 默认为 0 volatile int waitStatus; // 由于超时或中断,此节点被取消,不会再改变状态 static final int CANCELLED = 1; // 此节点后面的节点已(或即将)被阻止(通过park),【当前节点在释放或取消时必须唤醒后面的节点】 static final int SIGNAL = -1; // 此节点当前在条件队列中 static final int CONDITION = -2; // 将releaseShared传播到其他节点 static final int PROPAGATE = -3;
阻塞恢复设计:
- 使用 park & unpark 来实现线程的暂停和恢复,因为命令的先后顺序不影响结果
- park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
- park 线程可以通过 interrupt 打断
队列设计:
-
使用了 FIFO 先入先出队列,并不支持优先级队列,同步队列是双向链表,便于出队入队
// 头结点,指向哑元节点 private transient volatile Node head; // 阻塞队列的尾节点,阻塞队列不包含头结点,从 head.next → tail 认为是阻塞队列 private transient volatile Node tail; static final class Node { // 枚举:共享模式 static final Node SHARED = new Node(); // 枚举:独占模式 static final Node EXCLUSIVE = null; // node 需要构建成 FIFO 队列,prev 指向前继节点 volatile Node prev; // next 指向后继节点 volatile Node next; // 当前 node 封装的线程 volatile Thread thread; // 条件队列是单向链表,只有后继指针,条件队列使用该属性 Node nextWaiter; }
-
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet,条件队列是单向链表
public class ConditionObject implements Condition, java.io.Serializable { // 指向条件队列的第一个 node 节点 private transient Node firstWaiter; // 指向条件队列的最后一个 node 节点 private transient Node lastWaiter; }
模板对象
同步器的设计是基于模板方法模式,该模式是基于继承的,主要是为了在不改变模板结构的前提下在子类中重新定义模板中的内容以实现复用代码
- 使用者继承
AbstractQueuedSynchronizer
并重写指定的方法 - 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,这些模板方法会调用使用者重写的方法
AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:
isHeldExclusively() //该线程是否正在独占资源。只有用到condition才需要去实现它
tryAcquire(int) //独占方式。尝试获取资源,成功则返回true,失败则返回false
tryRelease(int) //独占方式。尝试释放资源,成功则返回true,失败则返回false
tryAcquireShared(int) //共享方式。尝试获取资源。负数表示失败;0表示成功但没有剩余可用资源;正数表示成功且有剩余资源
tryReleaseShared(int) //共享方式。尝试释放资源,成功则返回true,失败则返回false
- 默认情况下,每个方法都抛出
UnsupportedOperationException
- 这些方法的实现必须是内部线程安全的
- AQS 类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用
自定义
自定义一个不可重入锁:
class MyLock implements Lock {
//独占锁 不可重入
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 加上锁 设置 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override //解锁
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);//volatile 修饰的变量放在后面,防止指令重排
return true;
}
@Override //是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
private MySync sync = new MySync();
@Override //加锁(不成功进入等待队列等待)
public void lock() {
sync.acquire(1);
}
@Override //加锁 可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override //尝试加锁,尝试一次
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override //尝试加锁,带超时
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override //解锁
public void unlock() {
sync.release(1);
}
@Override //条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
Re-Lock
锁对比
ReentrantLock 相对于 synchronized 具备如下特点:
- 锁的实现:synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的
- 性能:新版本 Java 对 synchronized 进行了很多优化,synchronized 与 ReentrantLock 大致相同
- 使用:ReentrantLock 需要手动解锁,synchronized 执行完代码块自动解锁
- 可中断:ReentrantLock 可中断,而 synchronized 不行
- 公平锁:公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁
- ReentrantLock 可以设置公平锁,synchronized 中的锁是非公平的
- 不公平锁的含义是阻塞队列内公平,队列外非公平
- 锁超时:尝试获取锁,超时获取不到直接放弃,不进入阻塞队列
- ReentrantLock 可以设置超时时间,synchronized 会一直等待
- 锁绑定多个条件:一个 ReentrantLock 可以同时绑定多个 Condition 对象,更细粒度的唤醒线程
- 两者都是可重入锁
使用锁
构造方法:ReentrantLock lock = new ReentrantLock();
ReentrantLock 类 API:
-
public void lock()
:获得锁-
如果锁没有被另一个线程占用,则将锁定计数设置为 1
-
如果当前线程已经保持锁定,则保持计数增加 1
-
如果锁被另一个线程保持,则当前线程被禁用线程调度,并且在锁定已被获取之前处于休眠状态
-
-
public void unlock()
:尝试释放锁- 如果当前线程是该锁的持有者,则保持计数递减
- 如果保持计数现在为零,则锁定被释放
- 如果当前线程不是该锁的持有者,则抛出异常
基本语法:
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
公平锁
基本使用
构造方法:ReentrantLock lock = new ReentrantLock(true)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock 默认是不公平的:
public ReentrantLock() {
sync = new NonfairSync();
}
说明:公平锁一般没有必要,会降低并发度
非公原理
加锁
NonfairSync 继承自 AQS
public void lock() {
sync.lock();
}
-
没有竞争:ExclusiveOwnerThread 属于 Thread-0,state 设置为 1
// ReentrantLock.NonfairSync#lock final void lock() { // 用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示【获得了独占锁】 if (compareAndSetState(0, 1)) // 设置当前线程为独占线程 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);//失败进入 }
-
第一个竞争出现:Thread-1 执行,CAS 尝试将 state 由 0 改为 1,结果失败(第一次),进入 acquire 逻辑
// AbstractQueuedSynchronizer#acquire public final void acquire(int arg) { // tryAcquire 尝试获取锁失败时, 会调用 addWaiter 将当前线程封装成node入队,acquireQueued 阻塞当前线程, // acquireQueued 返回 true 表示挂起过程中线程被中断唤醒过,false 表示未被中断过 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果线程被中断了逻辑来到这,完成一次真正的打断效果 selfInterrupt(); }
-
进入 tryAcquire 尝试获取锁逻辑,这时 state 已经是1,结果仍然失败(第二次),加锁成功有两种情况:
- 当前 AQS 处于无锁状态
- 加锁线程就是当前线程,说明发生了锁重入
// ReentrantLock.NonfairSync#tryAcquire protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 抢占成功返回 true,抢占失败返回 false final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // state 值 int c = getState(); // 条件成立说明当前处于【无锁状态】 if (c == 0) { //如果还没有获得锁,尝试用cas获得,这里体现非公平性: 不去检查 AQS 队列是否有阻塞线程直接获取锁 if (compareAndSetState(0, acquires)) { // 获取锁成功设置当前线程为独占锁线程。 setExclusiveOwnerThread(current); return true; } } // 如果已经有线程获得了锁, 独占锁线程还是当前线程, 表示【发生了锁重入】 else if (current == getExclusiveOwnerThread()) { // 更新锁重入的值 int nextc = c + acquires; // 越界判断,当重入的深度很深时,会导致 nextc < 0,int值达到最大之后再 + 1 变负数 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新 state 的值,这里不使用 cas 是因为当前线程正在持有锁,所以这里的操作相当于在一个管程内 setState(nextc); return true; } // 获取失败 return false; }
-
接下来进入 addWaiter 逻辑,构造 Node 队列,前置条件是当前线程获取锁失败,说明有线程占用了锁
- 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
- Node 的创建是懒惰的,其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
// AbstractQueuedSynchronizer#addWaiter,返回当前线程的 node 节点 private Node addWaiter(Node mode) { // 将当前线程关联到一个 Node 对象上, 模式为独占模式 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 快速入队,如果 tail 不为 null,说明存在阻塞队列 if (pred != null) { // 将当前节点的前驱节点指向 尾节点 node.prev = pred; // 通过 cas 将 Node 对象加入 AQS 队列,成为尾节点,【尾插法】 if (compareAndSetTail(pred, node)) { pred.next = node;// 双向链表 return node; } } // 初始时队列为空,或者 CAS 失败进入这里 enq(node); return node; }
// AbstractQueuedSynchronizer#enq private Node enq(final Node node) { // 自旋入队,必须入队成功才结束循环 for (;;) { Node t = tail; // 说明当前锁被占用,且当前线程可能是【第一个获取锁失败】的线程,【还没有建立队列】 if (t == null) { // 设置一个【哑元节点】,头尾指针都指向该节点 if (compareAndSetHead(new Node())) tail = head; } else { // 自旋到这,普通入队方式,首先赋值尾节点的前驱节点【尾插法】 node.prev = t; // 【在设置完尾节点后,才更新的原始尾节点的后继节点,所以此时从前往后遍历会丢失尾节点】 if (compareAndSetTail(t, node)) { //【此时 t.next = null,并且这里已经 CAS 结束,线程并不是安全的】 t.next = node; return t; // 返回当前 node 的前驱节点 } } } }
-
线程节点加入阻塞队列成功,进入 AbstractQueuedSynchronizer#acquireQueued 逻辑阻塞线程
-
acquireQueued 会在一个自旋中不断尝试获得锁,失败后进入 park 阻塞
-
如果当前线程是在 head 节点后,会再次 tryAcquire 尝试获取锁,state 仍为 1 则失败(第三次)
final boolean acquireQueued(final Node node, int arg) { // true 表示当前线程抢占锁失败,false 表示成功 boolean failed = true; try { // 中断标记,表示当前线程是否被中断 boolean interrupted = false; for (;;) { // 获得当前线程节点的前驱节点 final Node p = node.predecessor(); // 前驱节点是 head, FIFO 队列的特性表示轮到当前线程可以去获取锁 if (p == head && tryAcquire(arg)) { // 获取成功, 设置当前线程自己的 node 为 head setHead(node); p.next = null; // help GC // 表示抢占锁成功 failed = false; // 返回当前线程是否被中断 return interrupted; } // 判断是否应当 park,返回 false 后需要新一轮的循环,返回 true 进入条件二阻塞线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 条件二返回结果是当前线程是否被打断,没有被打断返回 false 不进入这里的逻辑 // 【就算被打断了,也会继续循环,并不会返回】 interrupted = true; } } finally { // 【可打断模式下才会进入该逻辑】 if (failed) cancelAcquire(node); } }
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node 的 waitStatus 改为 -1,返回 false;waitStatus 为 -1 的节点用来唤醒下一个节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 表示前置节点是个可以唤醒当前节点的节点,返回 true if (ws == Node.SIGNAL) return true; // 前置节点的状态处于取消状态,需要【删除前面所有取消的节点】, 返回到外层循环重试 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 获取到非取消的节点,连接上当前节点 pred.next = node; // 默认情况下 node 的 waitStatus 是 0,进入这里的逻辑 } else { // 【设置上一个节点状态为 Node.SIGNAL】,返回外层循环重试 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回不应该 park,再次尝试一次 return false; }
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,这时 state 仍为 1 获取失败(第四次)
- 当再次进入 shouldParkAfterFailedAcquire 时,这时其前驱 node 的 waitStatus 已经是 -1 了,返回 true
- 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
private final boolean parkAndCheckInterrupt() { // 阻塞当前线程,如果打断标记已经是 true, 则 park 会失效 LockSupport.park(this); // 判断当前线程是否被打断,清除打断标记 return Thread.interrupted(); }
-
-
再有多个线程经历竞争失败后:
解锁
ReentrantLock#unlock:释放锁
public void unlock() {
sync.release(1);
}
Thread-0 释放锁,进入 release 流程
-
进入 tryRelease,设置 exclusiveOwnerThread 为 null,state = 0
-
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor
// AbstractQueuedSynchronizer#release public final boolean release(int arg) { // 尝试释放锁,tryRelease 返回 true 表示当前线程已经【完全释放锁,重入的释放了】 if (tryRelease(arg)) { // 队列头节点 Node h = head; // 头节点什么时候是空?没有发生锁竞争,没有竞争线程创建哑元节点 // 条件成立说明阻塞队列有等待线程,需要唤醒 head 节点后面的线程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
// ReentrantLock.Sync#tryRelease protected final boolean tryRelease(int releases) { // 减去释放的值,可能重入 int c = getState() - releases; // 如果当前线程不是持有锁的线程直接报错 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否已经完全释放锁 boolean free = false; // 支持锁重入, 只有 state 减为 0, 才完全释放锁成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 当前线程就是持有锁线程,所以可以直接更新锁,不需要使用 CAS setState(c); return free; }
-
进入 AbstractQueuedSynchronizer#unparkSuccessor 方法,唤醒当前节点的后继节点
- 找到队列中距离 head 最近的一个没取消的 Node,unpark 恢复其运行,本例中即为 Thread-1
- 回到 Thread-1 的 acquireQueued 流程
private void unparkSuccessor(Node node) { // 当前节点的状态 int ws = node.waitStatus; if (ws < 0) // 【尝试重置状态为 0】,因为当前节点要完成对后续节点的唤醒任务了,不需要 -1 了 compareAndSetWaitStatus(node, ws, 0); // 找到需要 unpark 的节点,当前节点的下一个 Node s = node.next; // 已取消的节点不能唤醒,需要找到距离头节点最近的非取消的节点 if (s == null || s.waitStatus > 0) { s = null; // AQS 队列【从后至前】找需要 unpark 的节点,直到 t == 当前的 node 为止,找不到就不唤醒了 for (Node t = tail; t != null && t != node; t = t.prev) // 说明当前线程状态需要被唤醒 if (t.waitStatus <= 0) // 置换引用 s = t; } // 【找到合适的可以被唤醒的 node,则唤醒线程】 if (s != null) LockSupport.unpark(s.thread); }
从后向前的唤醒的原因:enq 方法中,节点是尾插法,首先赋值的是尾节点的前驱节点,此时前驱节点的 next 并没有指向尾节点,从前遍历会丢失尾节点
-
唤醒的线程会从 park 位置开始执行,如果加锁成功(没有竞争),会设置
- exclusiveOwnerThread 为 Thread-1,state = 1
- head 指向刚刚 Thread-1 所在的 Node,该 Node 会清空 Thread
- 原本的 head 因为从链表断开,而可被垃圾回收(图中有错误,原来的头节点的 waitStatus 被改为 0 了)
-
如果这时有其它线程来竞争**(非公平)**,例如这时有 Thread-4 来了并抢占了锁
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
公平原理
与非公平锁主要区别在于 tryAcquire 方法:先检查 AQS 队列中是否有前驱节点,没有才去 CAS 竞争
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有(false)才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 锁重入
return false;
}
}
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 头尾指向一个节点,链表为空,返回false
return h != t &&
// 头尾之间有节点,判断头节点的下一个是不是空
// 不是空进入最后的判断,第二个节点的线程是否是本线程,不是返回 true,表示当前节点有前驱节点
((s = h.next) == null || s.thread != Thread.currentThread());
}
可重入
可重入是指同一个线程如果首次获得了这把锁,那么它是这把锁的拥有者,因此有权利再次获取这把锁,如果不可重入锁,那么第二次获得锁时,自己也会被锁挡住,直接造成死锁
源码解析参考:nonfairTryAcquire(int acquires))
和 tryRelease(int releases)
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " execute method2");
} finally {
lock.unlock();
}
}
在 Lock 方法加两把锁会是什么情况呢?
- 加锁两次解锁两次:正常执行
- 加锁两次解锁一次:程序直接卡死,线程不能出来,也就说明申请几把锁,最后需要解除几把锁
- 加锁一次解锁两次:运行程序会直接报错
public void getLock() {
lock.lock();
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t get Lock");
} finally {
lock.unlock();
//lock.unlock();
}
}
可打断
基本使用
public void lockInterruptibly()
:获得可打断的锁
- 如果没有竞争此方法就会获取 lock 对象锁
- 如果有竞争就进入阻塞队列,可以被其他线程用 interrupt 打断
注意:如果是不可中断模式,那么即使使用了 interrupt 也不会让等待状态中的线程中断
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
System.out.println("尝试获取锁");
lock.lockInterruptibly();
} catch (InterruptedException e) {
System.out.println("没有获取到锁,被打断,直接返回");
return;
}
try {
System.out.println("获取到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
t1.start();
Thread.sleep(2000);
System.out.println("主线程进行打断锁");
t1.interrupt();
}
实现原理
-
不可打断模式:即使它被打断,仍会驻留在 AQS 阻塞队列中,一直要等到获得锁后才能得知自己被打断了
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//阻塞等待 // 如果acquireQueued返回true,打断状态 interrupted = true selfInterrupt(); } static void selfInterrupt() { // 知道自己被打断了,需要重新产生一次中断完成中断效果 Thread.currentThread().interrupt(); }
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; // 还是需要获得锁后, 才能返回打断状态 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){ // 条件二中判断当前线程是否被打断,被打断返回true,设置中断标记为 true,【获取锁后返回】 interrupted = true; } } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { // 阻塞当前线程,如果打断标记已经是 true, 则 park 会失效 LockSupport.park(this); // 判断当前线程是否被打断,清除打断标记,被打断返回true return Thread.interrupted(); }
-
可打断模式:AbstractQueuedSynchronizer#acquireInterruptibly,被打断后会直接抛出异常
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public final void acquireInterruptibly(int arg) { // 被其他线程打断了直接返回 false if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) // 没获取到锁,进入这里 doAcquireInterruptibly(arg); }
private void doAcquireInterruptibly(int arg) throws InterruptedException { // 返回封装当前线程的节点 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { //... if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 【在 park 过程中如果被 interrupt 会抛出异常】, 而不会再次进入循环获取锁后才完成打断效果 throw new InterruptedException(); } } finally { // 抛出异常前会进入这里 if (failed) // 取消当前线程的节点 cancelAcquire(node); } }
// 取消节点出队的逻辑 private void cancelAcquire(Node node) { // 判空 if (node == null) return; // 把当前节点封装的 Thread 置为空 node.thread = null; // 获取当前取消的 node 的前驱节点 Node pred = node.prev; // 前驱节点也被取消了,循环找到前面最近的没被取消的节点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 获取前驱节点的后继节点,可能是当前 node,也可能是 waitStatus > 0 的节点 Node predNext = pred.next; // 把当前节点的状态设置为 【取消状态 1】 node.waitStatus = Node.CANCELLED; // 条件成立说明当前节点是尾节点,把当前节点的前驱节点设置为尾节点 if (node == tail && compareAndSetTail(node, pred)) { // 把前驱节点的后继节点置空,这里直接把所有的取消节点出队 compareAndSetNext(pred, predNext, null); } else { // 说明当前节点不是 tail 节点 int ws; // 条件一成立说明当前节点不是 head.next 节点 if (pred != head && // 判断前驱节点的状态是不是 -1,不成立说明前驱状态可能是 0 或者刚被其他线程取消排队了 ((ws = pred.waitStatus) == Node.SIGNAL || // 如果状态不是 -1,设置前驱节点的状态为 -1 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 前驱节点的线程不为null pred.thread != null) { Node next = node.next; // 当前节点的后继节点是正常节点 if (next != null && next.waitStatus <= 0) // 把 前驱节点的后继节点 设置为 当前节点的后继节点,【从队列中删除了当前节点】 compareAndSetNext(pred, predNext, next); } else { // 当前节点是 head.next 节点,唤醒当前节点的后继节点 unparkSuccessor(node); } node.next = node; // help GC } }
锁超时
基本使用
public boolean tryLock()
:尝试获取锁,获取到返回 true,获取不到直接放弃,不进入阻塞队列
public boolean tryLock(long timeout, TimeUnit unit)
:在给定时间内获取锁,获取不到就退出
注意:tryLock 期间也可以被打断
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
if (!lock.tryLock(2, TimeUnit.SECONDS)) {
System.out.println("获取不到锁");
return;
}
} catch (InterruptedException e) {
System.out.println("被打断,获取不到锁");
return;
}
try {
log.debug("获取到锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
System.out.println("主线程获取到锁");
t1.start();
Thread.sleep(1000);
try {
System.out.println("主线程释放了锁");
} finally {
lock.unlock();
}
}
实现原理
-
成员变量:指定超时限制的阈值,小于该值的线程不会被挂起
static final long spinForTimeoutThreshold = 1000L;
超时时间设置的小于该值,就会被禁止挂起,因为阻塞在唤醒的成本太高,不如选择自旋空转
-
tryLock()
public boolean tryLock() { // 只尝试一次 return sync.nonfairTryAcquire(1); }
-
tryLock(long timeout, TimeUnit unit)
public final boolean tryAcquireNanos(int arg, long nanosTimeout) { if (Thread.interrupted()) throw new InterruptedException(); // tryAcquire 尝试一次 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
private boolean doAcquireNanos(int arg, long nanosTimeout) { if (nanosTimeout <= 0L) return false; // 获取最后期限的时间戳 final long deadline = System.nanoTime() + nanosTimeout; //... try { for (;;) { //... // 计算还需等待的时间 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) //时间已到 return false; if (shouldParkAfterFailedAcquire(p, node) && // 如果 nanosTimeout 大于该值,才有阻塞的意义,否则直接自旋会好点 nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 【被打断会报异常】 if (Thread.interrupted()) throw new InterruptedException(); } } }
哲学家就餐
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");//...
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public void run() {
while (true) {
// 尝试获得左手筷子
if (left.tryLock()) {
try {
// 尝试获得右手筷子
if (right.tryLock()) {
try {
System.out.println("eating...");
Thread.sleep(1000);
} finally {
right.unlock();
}
}
} finally {
left.unlock();
}
}
}
}
}
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
条件变量
基本使用
synchronized 的条件变量,是当条件不满足时进入 WaitSet 等待;ReentrantLock 的条件变量比 synchronized 强大之处在于支持多个条件变量
ReentrantLock 类获取 Condition 对象:public Condition newCondition()
Condition 类 API:
void await()
:当前线程从运行状态进入等待状态,释放锁void signal()
:唤醒一个等待在 Condition 上的线程,但是必须获得与该 Condition 相关的锁
使用流程:
-
await / signal 前需要获得锁
-
await 执行后,会释放锁进入 ConditionObject 等待
-
await 的线程被唤醒去重新竞争 lock 锁
-
线程在条件队列被打断会抛出中断异常
-
竞争 lock 锁成功后,从 await 后继续执行
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
//创建一个新的条件变量
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
new Thread(() -> {
try {
lock.lock();
System.out.println("进入等待");
//进入休息室等待
condition1.await();
System.out.println("被唤醒了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
Thread.sleep(1000);
//叫醒
new Thread(() -> {
try {
lock.lock();
//唤醒
condition2.signal();
} finally {
lock.unlock();
}
}).start();
}
实现原理
await
总体流程是将 await 线程包装成 node 节点放入 ConditionObject 的条件队列,如果被唤醒就将 node 转移到 AQS 的执行阻塞队列,等待获取锁,每个 Condition 对象都包含一个等待队列
-
开始 Thread-0 持有锁,调用 await,线程进入 ConditionObject 等待,直到被唤醒或打断,调用 await 方法的线程都是持锁状态的,所以说逻辑里不存在并发
public final void await() throws InterruptedException { // 判断当前线程是否是中断状态,是就直接给个中断异常 if (Thread.interrupted()) throw new InterruptedException(); // 将调用 await 的线程包装成 Node,添加到条件队列并返回 Node node = addConditionWaiter(); // 完全释放节点持有的锁,因为其他线程唤醒当前线程的前提是【持有锁】 int savedState = fullyRelease(node); // 设置打断模式为没有被打断,状态码为 0 int interruptMode = 0; // 如果该节点还没有转移至 AQS 阻塞队列, park 阻塞,等待进入阻塞队列 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 如果被打断,退出等待队列,对应的 node 【也会被迁移到阻塞队列】尾部,状态设置为 0 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 逻辑到这说明当前线程退出等待队列,进入【阻塞队列】 // 尝试枪锁,释放了多少锁就【重新获取多少锁】,获取锁成功判断打断模式 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // node 在条件队列时 如果被外部线程中断唤醒,会加入到阻塞队列,但是并未设 nextWaiter = null if (node.nextWaiter != null) // 清理条件队列内所有已取消的 Node unlinkCancelledWaiters(); // 条件成立说明挂起期间发生过中断 if (interruptMode != 0) // 应用打断模式 reportInterruptAfterWait(interruptMode); }
// 打断模式 - 在退出等待时重新设置打断状态 private static final int REINTERRUPT = 1; // 打断模式 - 在退出等待时抛出异常 private static final int THROW_IE = -1;
-
创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
private Node addConditionWaiter() { // 获取当前条件队列的尾节点的引用,保存到局部变量 t 中 Node t = lastWaiter; // 当前队列中不是空,并且节点的状态不是 CONDITION(-2),说明当前节点发生了中断 if (t != null && t.waitStatus != Node.CONDITION) { // 清理条件队列内所有已取消的 Node unlinkCancelledWaiters(); // 清理完成重新获取 尾节点 的引用 t = lastWaiter; } // 创建一个关联当前线程的新 node, 设置状态为 CONDITION(-2),添加至队列尾部 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; // 空队列直接放在队首【不用CAS因为执行线程是持锁线程,并发安全】 else t.nextWaiter = node; // 非空队列队尾追加 lastWaiter = node; // 更新队尾的引用 return node; }
// 清理条件队列内所有已取消(不是CONDITION)的 node,【链表删除的逻辑】 private void unlinkCancelledWaiters() { // 从头节点开始遍历【FIFO】 Node t = firstWaiter; // 指向正常的 CONDITION 节点 Node trail = null; // 等待队列不空 while (t != null) { // 获取当前节点的后继节点 Node next = t.nextWaiter; // 判断 t 节点是不是 CONDITION 节点,条件队列内不是 CONDITION 就不是正常的 if (t.waitStatus != Node.CONDITION) { // 不是正常节点,需要 t 与下一个节点断开 t.nextWaiter = null; // 条件成立说明遍历到的节点还未碰到过正常节点 if (trail == null) // 更新 firstWaiter 指针为下个节点 firstWaiter = next; else // 让上一个正常节点指向 当前取消节点的 下一个节点,【删除非正常的节点】 trail.nextWaiter = next; // t 是尾节点了,更新 lastWaiter 指向最后一个正常节点 if (next == null) lastWaiter = trail; } else { // trail 指向的是正常节点 trail = t; } // 把 t.next 赋值给 t,循环遍历 t = next; } }
-
接下来 Thread-0 进入 AQS 的 fullyRelease 流程,释放同步器上的锁
// 线程可能重入,需要将 state 全部释放 final int fullyRelease(Node node) { // 完全释放锁是否成功,false 代表成功 boolean failed = true; try { // 获取当前线程所持有的 state 值总数 int savedState = getState(); // release -> tryRelease 解锁重入锁 if (release(savedState)) { // 释放成功 failed = false; // 返回解锁的深度 return savedState; } else { // 解锁失败抛出异常 throw new IllegalMonitorStateException(); } } finally { // 没有释放成功,将当前 node 设置为取消状态 if (failed) node.waitStatus = Node.CANCELLED; } }
-
fullyRelease 中会 unpark AQS 队列中的下一个节点竞争锁,假设 Thread-1 竞争成功
-
Thread-0 进入 isOnSyncQueue 逻辑判断节点是否移动到阻塞队列,没有就 park 阻塞 Thread-0
final boolean isOnSyncQueue(Node node) { // node 的状态是 CONDITION,signal 方法是先修改状态再迁移,所以前驱节点为空证明还【没有完成迁移】 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 说明当前节点已经成功入队到阻塞队列,且当前节点后面已经有其它 node,因为条件队列的 next 指针为 null if (node.next != null) return true; // 说明【可能在阻塞队列,但是是尾节点】 // 从阻塞队列的尾节点开始向前【遍历查找 node】,如果查找到返回 true,查找不到返回 false return findNodeFromTail(node); }
-
await 线程 park 后如果被 unpark 或者被打断,都会进入 checkInterruptWhileWaiting 判断线程是否被打断:在条件队列被打断的线程需要抛出异常
private int checkInterruptWhileWaiting(Node node) { // Thread.interrupted() 返回当前线程中断标记位,并且重置当前标记位 为 false // 如果被中断了,根据是否在条件队列被中断的,设置中断状态码 return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
// 这个方法只有在线程是被打断唤醒时才会调用 final boolean transferAfterCancelledWait(Node node) { // 条件成立说明当前node一定是在条件队列内,因为 signal 迁移节点到阻塞队列时,会将节点的状态修改为 0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 把【中断唤醒的 node 加入到阻塞队列中】 enq(node); // 表示是在条件队列内被中断了,设置为 THROW_IE 为 -1 return true; } //执行到这里的情况: //1.当前node已经被外部线程调用 signal 方法将其迁移到 阻塞队列 内了 //2.当前node正在被外部线程调用 signal 方法将其迁移至 阻塞队列 进行中状态 // 如果当前线程还没到阻塞队列,一直释放 CPU while (!isOnSyncQueue(node)) Thread.yield(); // 表示当前节点被中断唤醒时不在条件队列了,设置为 REINTERRUPT 为 1 return false; }
-
最后开始处理中断状态:
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { // 条件成立说明【在条件队列内发生过中断,此时 await 方法抛出中断异常】 if (interruptMode == THROW_IE) throw new InterruptedException(); // 条件成立说明【在条件队列外发生的中断,此时设置当前线程的中断标记位为 true】 else if (interruptMode == REINTERRUPT) // 进行一次自己打断,产生中断的效果 selfInterrupt(); }
signal
-
假设 Thread-1 要来唤醒 Thread-0,进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node,必须持有锁才能唤醒, 因此 doSignal 内线程安全
public final void signal() { // 判断调用 signal 方法的线程是否是独占锁持有线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取条件队列中第一个 Node Node first = firstWaiter; // 不为空就将第该节点【迁移到阻塞队列】 if (first != null) doSignal(first); }
// 唤醒 - 【将没取消的第一个节点转移至 AQS 队列尾部】 private void doSignal(Node first) { do { // 成立说明当前节点的下一个节点是 null,当前节点是尾节点了,队列中只有当前一个节点了 if ((firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 将等待队列中的 Node 转移至 AQS 队列,不成功且还有节点则继续循环 } while (!transferForSignal(first) && (first = firstWaiter) != null); } // signalAll() 会调用这个函数,唤醒所有的节点 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; // 唤醒所有的节点,都放到阻塞队列中 } while (first != null); }
-
执行 transferForSignal,先将节点的 waitStatus 改为 0,然后加入 AQS 阻塞队列尾部,将 Thread-3 的 waitStatus 改为 -1
// 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功 final boolean transferForSignal(Node node) { // CAS 修改当前节点的状态,修改为 0,因为当前节点马上要迁移到阻塞队列了 // 如果状态已经不是 CONDITION, 说明线程被取消(await 释放全部锁失败)或者被中断(可打断 cancelAcquire) if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 返回函数调用处继续寻找下一个节点 return false; // 【先改状态,再进行迁移】 // 将当前 node 入阻塞队列,p 是当前节点在阻塞队列的【前驱节点】 Node p = enq(node); int ws = p.waitStatus; // 如果前驱节点被取消或者不能设置状态为 Node.SIGNAL,就 unpark 取消当前节点线程的阻塞状态, // 让 thread-0 线程竞争锁,重新同步状态 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
-
Thread-1 释放锁,进入 unlock 流程
ReadWrite
读写锁
独占锁:指该锁一次只能被一个线程所持有,对 ReentrantLock 和 Synchronized 而言都是独占锁
共享锁:指该锁可以被多个线程锁持有
ReentrantReadWriteLock 其读锁是共享锁,写锁是独占锁
作用:多个线程同时读一个资源类没有任何问题,为了满足并发量,读取共享资源应该同时进行,但是如果一个线程想去写共享资源,就不应该再有其它线程可以对该资源进行读或写
使用规则:
-
加锁解锁格式:
r.lock(); try { // 临界区 } finally { r.unlock(); }
-
读-读能共存、读-写不能共存、写-写不能共存
-
读锁不支持条件变量
-
重入时升级不支持:持有读锁的情况下去获取写锁会导致获取写锁永久等待,需要先释放读,再去获得写
-
重入时降级支持:持有写锁的情况下去获取读锁,造成只有当前线程会持有读锁,因为写锁会互斥其他的锁
w.lock(); try { r.lock();// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存 try { // ... } finally{ w.unlock();// 要在写锁释放之前获取读锁 } } finally{ r.unlock(); }
构造方法:
public ReentrantReadWriteLock()
:默认构造方法,非公平锁public ReentrantReadWriteLock(boolean fair)
:true 为公平锁
常用API:
public ReentrantReadWriteLock.ReadLock readLock()
:返回读锁public ReentrantReadWriteLock.WriteLock writeLock()
:返回写锁public void lock()
:加锁public void unlock()
:解锁public boolean tryLock()
:尝试获取锁
读读并发:
public static void main(String[] args) {
ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock r = rw.readLock();
ReentrantReadWriteLock.WriteLock w = rw.writeLock();
new Thread(() -> {
r.lock();
try {
Thread.sleep(2000);
System.out.println("Thread 1 running " + new Date());
} finally {
r.unlock();
}
},"t1").start();
new Thread(() -> {
r.lock();
try {
Thread.sleep(2000);
System.out.println("Thread 2 running " + new Date());
} finally {
r.unlock();
}
},"t2").start();
}
评论区