引言
CAS(Compare And Swap)比较并交换是 JUC 并发编程中最为重要的一个工具。它在处理并发问题时提供了一个非阻塞的解决方案,引入了一种全新的并发编程思维——乐观锁。这种思想预设所有线程在执行过程中都不会发生冲突,每一个线程都会乐观地认为自己能够成功执行,从而大大降低了线程之间的等待和阻塞,极大地提高了系统的并发性能。
CAS 操作的引入,使得我们能够实现一系列复杂的并发数据结构,如 Atomic 类、并发容器、线程池等,并且它在实现无锁数据结构和算法时也是不可或缺的工具。CAS 通过使用硬件级别的原子性操作,保证了多个线程并发修改共享变量时,不会出现数据不一致的问题,为开发者构建线程安全的并发程序提供了强大的保证。
自旋锁是 CAS 的一个经典应用,其核心思想就是当一个线程试图获取锁时,如果锁已经被其他线程占用,那么该线程将在一个循环中不断地尝试获取锁,直到成功为止。这种方式避免了线程挂起和唤醒带来的高昂代价,是一种低延迟的锁策略。在并发量不高,锁持有时间较短的场景下,自旋锁可以带来很好的性能提升。
在本期文章中,我将以 JDK17u 源码为例从JDK底层的角度来(C/C++)深入探讨 CAS 的原理以及通过 CAS 实现的自旋锁功能。
CAS操作
CAS 操作通常被认为是一种乐观锁,它不像悲观锁那样预先组织其他线程的访问,而是先进行尝试性操作,如果没有其他线程修改过数据就算操作成功,否则操作失败。更具体的说,CAS 操作包括以下三个步骤:
- 获取当前值
- 计算新的值
- 比较当前值是否未被修改,如果是则将新值写入并返回 true,否则返回 false
溯源分析
在 JUC 的很多并发工具中都使用到了 CAS,如:ReentrantLock、AbstractQueuedSynchronizer、CountDownLatch 等。在 AQS 中是通过 compareAndSetState
方法来实现 CAS 功能的:
private static final Unsafe U = Unsafe.getUnsafe();
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
不难看出 compareAndSetState
方法间接地调用了来自 Unsafe
类中的 compareAndSetInt
方法:
@IntrinsicCandidate
public final native boolean compareAndSetInt(Object var1, long var2, int var4, int var5);
compareAndSetInt
方法是一种 JNI 技术,是通过调用 C/C++ 编译生成的 dll/so 库来实现的。那么其底层实现原理就是存在于原始的 C/C++ 文件中。如果读者阅读过 Unsafe 类的源码,会发现其中大部分方法都是通过 native
实现的,它们统一在 JDK 源码的 src\hotspot\share\prims\unsafe.cpp
文件中被实,聚焦到 compareAndSetInt
方法上:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
oop p = JNIHandles::resolve(obj);
if (p == NULL) {
volatile jint* addr = (volatile jint*)index_oop_from_field_offset_long(p, offset);
return RawAccess<>::atomic_cmpxchg(addr, e, x) == e;
} else {
assert_field_offset_sane(p, offset);
return HeapAccess<>::atomic_cmpxchg_at(p, (ptrdiff_t)offset, e, x) == e;
}
} UNSAFE_END
将 UNSAFE_ENTRY
和 UNSAFE_END
进行宏展开可以使得源码更加清晰(源码中的注释详细地阐述了 CAS 发生的完整过程):
// Unsafe类中compareAndSetInt方法的本地实现,CAS是一种轻量级锁
extern "C" {
jboolean JNICALL Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x) {
// 从JNI环境中获取Java线程
JavaThread* thread = JavaThread::thread_from_jni_environment(env);
// 在ARM上启用W^X(写时不执行)内存保护策略,确保一块内存区域在任意时刻要么是可写的(Writable),要么是可执行的(eXecutable),不允许同时具备这两种属性
MACOS_AARCH64_ONLY(ThreadWXEnable __wx(WXWrite, thread));
// 将线程状态从"在本地代码中"(JNI)更改为"在VM中"
ThreadInVMfromNative __tiv(thread);
// 检查和跟踪VM中的本地方法调用
debug_only(VMNativeEntryWrapper __vew;)
// 清理在执行本地方法期间创建的JNI句柄,避免内存泄漏
HandleMarkCleaner __hm(thread);
// 用于异常宏的异常处理
JavaThread* THREAD = thread;
// 验证当前线程的堆栈对齐是否正确,确保堆栈布局保持一致
os::verify_stack_alignment();
{
// 函数解析传入的Java对象obj(AQS对象)
oop p = JNIHandles::resolve(obj);
if (p == NULL) {
// p对象为空根据offset计算被操作数的内存地址
volatile jint* addr = (volatile jint*)index_oop_from_field_offset_long(p, offset);
// 使用cmpxchg指令执行CAS操作,将addr中的期望值e设置为新值x
return RawAccess<>::atomic_cmpxchg(addr, e, x) == e;
} else {
// p对象不为空先检查字段的偏移量是否合理
assert_field_offset_sane(p, offset);
// 在堆上操作对象p,将p偏移offset地址中的期望值e设置为新值x
return HeapAccess<>::atomic_cmpxchg_at(p, (ptrdiff_t)offset, e, x) == e;
}
}
}
}
在这个被宏展开的 CAS 方法的尽头,是 RawAccess<>::atomic_cmpxchg
和 HeapAccess<>::atomic_cmpxchg_at
函数,虽然它们是解决不同的情况下的CAS但它们的本质都是执行了CPU层面的 cmpxchg
指令
atomic_cmpxchg
这段 cpp 代码的溯源分析中省略了大部分的操作系统和计算机环境的判断过程,这里只以 x86 架构下的 cmpxchg 指令的调用过为例进行分析
- 如果继续对
RawAccess<>::atomic_cmpxchg
函数深入探索就会来到src\hotspot\share\oops\access.hpp
中的atomic_cmpxchg
函数 - access.hpp 中的
atomic_cmpxchg
函数又调用了src\hotspot\share\oops\accessBackend.hpp
中的atomic_cmpxchg
函数 - accessBackend.hpp 中的
atomic_cmpxchg
函数返回了同文件下的atomic_cmpxchg_reduce_types
函数 - accessBackend.hpp 中的
atomic_cmpxchg_reduce_types
函数通过一系列条件和环境判断会调用到src\hotspot\share\oops\accessBackend.inline.hpp
中的atomic_cmpxchg_internal
函数 - accessBackend.inline.hpp 中的
atomic_cmpxchg_internal
函数最终会调用到来自src\hotspot\share\runtime\atomic.hpp
中的cmpxchg
函数 - atomic.hpp 中的
cmpxchg
函数返回了同文件下的PlatformCmpxchg
结构体中的operator
函数 - atomic.hpp 中的
operator
函数最终在src\hotspot\os_cpu\bsd_x86\atomic_bsd_x86.hpp
中被函数operator
实现 - atomic_bsd_x86.hpp 中的函数
operator
最终使用内联汇编块来调用了CPU层面的cmpxchg
指令
template<>
template<typename T>
inline T Atomic::PlatformCmpxchg<4>::operator()(T volatile* dest,
T compare_value,
T exchange_value,
atomic_memory_order /* order */) const {
STATIC_ASSERT(4 == sizeof(T));
__asm__ volatile ( "lock cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest)
: "cc", "memory");
return exchange_value;
}
cmpxchg
上述的 cmpxchgl
内联汇编实现了一个在多核或多线程环境下使用的原子操作:比较和交换(Compare and Swap, CAS)。这是一条 X86 架构下的 CAS 指令,lock cmpxchgl %1, (%3)
,它的作用是如果指定内存位置(%3)的值等于 eax 寄存器(也就是比较值compare_value
),那么将这个内存位置的值设为 %1(也就是 exchange_value
)。如果内存位置的值不等于 eax,那么就不做任何修改。无论是否修改,最后 eax 都会被设置为内存位置原本的值。
"lock cmpxchgl %1, (%3)"
:lock
前缀是一个指示符,表示后面的指令应当被原子地执行,即在执行完整个指令前,不会被其他指令打断或重排序。cmpxchgl
是一个 X86 指令,表示比较并交换(Compare and Swap)。%1
和(%3)
分别表示输入的第一个和第三个参数,即exchange_value
和内存位置dest
。: "=a" (exchange_value)
:是输出部分,"=a"
表示将结果(eax 寄存器的值)赋给exchange_value
。: "r" (exchange_value), "a" (compare_value), "r" (dest)
:是输入部分,"r"
表示一个通用寄存器,"a"
表示 eax 寄存器。它们分别对应于exchange_value
,compare_value
和dest
。: "cc", "memory"
:是 clobber 部分,它告诉编译器这段汇编代码可能会改变哪些资源。"cc"
表示这段代码可能会修改条件代码寄存器,"memory"
表示可能会修改内存。
综上所述,这段内联汇编实现了一个原子的CAS操作,其语义是:如果内存位置 dest
的值等于 compare_value
,那么将其设为 exchange_value
,并将操作前 dest
位置的值赋给 exchange_value
。
自旋锁
自旋锁是一种锁定机制,当一个线程尝试获取一个已经被其他线程持有的锁时,该线程将循环(即”自旋”)并且不断地检查锁是否已经被释放。自旋锁是一种忙等(busy waiting)锁,它不会让出 CPU,而是一直等待直到能够获取锁。
自旋锁中的循环是通过 CAS 来实现的,这也是 CAS 最常见的使用场景之一。自旋锁会持续地尝试获取锁,直到成功为止。在这个过程中,它会使用 CAS 操作来检查锁的状态:如果锁当前未被占用(即锁的状态为“未锁定”),那么就尝试使用 CAS 操作来锁定它。如果CAS操作成功,那么就说明成功获得了锁;否则,就说明锁已经被其他线程获取,自旋锁就会继续自旋,再次尝试获取锁。这个过程会一直持续,直到成功获取到锁。
自旋锁虽然是一种轻量级锁,但是大量的自旋锁在不断忙等的过程中会消耗大量的 CPU 时间。通常情况下,自旋锁会设置一个自旋限制,当自旋次数超过一定限制后,锁将会让出 CPU,进入阻塞状态,这样可以避免无尽的自旋浪费 CPU 资源。通过AtomicReference
原子类提供的 CAS 方法实现一个简单的具备自旋限制的自旋锁:
public class SpinLock {
private AtomicReference<Thread> owner = new AtomicReference<>();
private static final int SPIN_LIMIT = 1000; // 最大尝试自旋的次数
public void lock() {
Thread currentThread = Thread.currentThread();
int spinCount = 0; // 初始自旋次数
while (!owner.compareAndSet(null, currentThread)) {
// 检查自旋限制
if (++spinCount > SPIN_LIMIT) {
// 自旋次数超过限制,让出CPU,进入阻塞状态
try {
Thread.sleep(1); // 暂停1毫秒
} catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted while trying to get lock", e);
}
spinCount = 0; // 重置自旋次数
}
}
}
public void unlock() {
Thread currentThread = Thread.currentThread();
if (!owner.compareAndSet(currentThread, null)) {
throw new IllegalMonitorStateException("Attempted to unlock a lock not owned by the current thread");
}
}
}
在这个例子中,自旋锁在尝试获取锁时,会检查自己的自旋次数。如果自旋次数超过了设定的限制,那么当前线程会让出 CPU(通过调用Thread.sleep(1)),然后在唤醒后再次尝试获取锁。这种方法可以在一定程度上降低大量线程自旋等待带来的 CPU 资源浪费。
回顾
综上所述,我对 CAS 的理解是,它是比较并交换 Compare-And-Swap 的意思,是一种原子操作的乐观锁,用于在多线程环境中实现无锁同步。
CAS 操作通常需要传入一个内存地址、期望值和新值。当内存地址的当前值与期望值相同时,CAS 操作将内存地址的值更新为新值,并返回 true。若更新失败,则返回 false。在 Java 中,CAS 操作主要通过 Unsafe 类提供的 native 本地方法实现,JDK 底层通过 C++ 函数“Unsafe_CompareAndSetInt”来实现这些方法。
在操作系统层面,CAS 操作通常由硬件指令“x86 架构下的 cmpxchg”实现,保证了原子性。在实际应用中,如果 CAS 操作失败,通常会进行自旋(spin)重试,即反复尝试 CAS 操作直至成功。这种自旋重试策略在 Java 并发工具类:ReentrantLock、AQS、 CountDownLatch 等中被广泛应用。
虽然 CAS 操作可以减少锁的开销,但在线程竞争较为激烈的场景下,自旋重试可能导致性能下降。
参考文献
[1] Stone, J. M. (1990). A simple and correct shared-queue algorithm using compare-and-swap. In Proceedings of the 1990 ACM/IEEE conference on Supercomputing (Supercomputing ‘90) (pp. 495–504). Washington, DC, USA: IEEE Computer Society Press.
[2] Valois, J. D. (1995). Lock-free linked lists using compare-and-swap. In Proceedings of the fourteenth annual ACM symposium on Principles of distributed computing (PODC ‘95) (pp. 214–222). New York, NY, USA: Association for Computing Machinery. https://doi.org/10.1145/224964.224988
[3] old程序员. (2020). 并发编程的灵魂:CAS机制详解. Retrieved May 18, 2023, from https://zhuanlan.zhihu.com/p/101430930