详解并发编程中的CAS原子类
[toc]
# 引言
CAS全称Compare-And-Swap,是一种无锁编程算法,即比对读取值与内存中的值的差异决定是否进行修改操作的一种(乐观锁机制),该工具类常用于多线程共享变量的修改操作。而其底层实现也是基于硬件平台的汇编指令,JVM只是封装其调用仅此而已。而本文会基于以下大纲展开对CAS的探讨。
我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# 详解CAS原子类
# 原子类基础使用示例
使用封装CAS操作的AtomicInteger操作多线程共享变量无需我们手动加锁,避免了人为上锁粒度把控不足所导致的安全性问题:
AtomicInteger count = new AtomicInteger();
//并行流并发操作原子类
IntStream.rangeClosed(1, 100_0000).parallel()
.forEach(i -> count.incrementAndGet());
Console.log("输出结果:{}", count);//1000000
2
3
4
5
6
# 详解原子类的底层实现
原子类的并发累加本质上采用了一种乐观锁机制,其底层对于利用一个volatile修饰的变量value存储用户当前读取到的变量值,在进行并发操作的自增时,会执行如下步骤:
- 原子类就会利用
unsafe类到内存中读取该变量的最新修改结果并存储到临时变量v中 - 再次读取变量的值并进行比对,两者一致,则基于 v进行累加更新,并写到value中
- 两者不一致,说明volatile读到的值是过期的,重新从步骤1开始执行,直到读取到最新的

对应的我们也给出这段实现的源码,可以看到AtomicInteger就是利用unsafe的getAndAddInt完成并发自增的:
//直接基于内存地址读取变量的工具类 unsafe
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
//并发操作变量原子类对应的value地址偏移量
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
//调用unsafe完成变量value的自增更新
public final int incrementAndGet() {
return U.getAndAddInt(this, VALUE, 1) + 1;
}
2
3
4
5
6
7
8
9
步入unsafe的逻辑实现,可以看到其底层实现就如上文所说:
- 利用
volatile读获取并发变量最新值到v中 - 传入原子类对象和偏移量地址再次读取变量的最新值,两者一致直接自增并写入到内存中
- 若不一致,回到步骤1循环执行,直到成功
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
//利用volatile读获取最新值
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));//基于偏移量再次读取value的最新值,如果一致则更新
return v;
}
2
3
4
5
6
7
8
9
10
# 手写一个原子类
基于上述的源码分析,不难看出原子类本质上就是通过比对本次操作变量的值和最新的值是否一致以判断是否出现并发修改的情况,所以按照这个理念,我们也可以通过unsafe手写一个原子类,大体步骤为:
- 定义一个原子类
- 初始化unsafe
- 编写一个变量count记录原子更新的值
- 编写一个并发自增的方法通过拉取当前读取的count值和最新count进行比对,一致再执行更新的操作
对应的我们也给出本次手写的初始化unsafe和定位字段偏移量的代码段,可以看到笔者通过反射的方式完成unsafe工具类的获取和初始化,完成该操作后直接基于unsafe类定位到count变量的偏移地址,方便后续快速定位和比对变量值:
// 获取Unsafe对象
private static Unsafe unsafe;
// 自增的count的值,volatile保证可见性
private volatile int count = 0;
// count字段的偏移量
private static long countOffSet;
static {
try {
//初始化unsafe
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
} catch (Exception e) {
Console.log("获取unsafe失败,失败原因:[{}]", e.getMessage(), e);
}
//初始化定位count变量的偏移量地址
countOffSet = unsafe.objectFieldOffset(CasCountInc.class.getDeclaredField("count"));
} catch (NoSuchFieldException e) {
Console.log("获取count的偏移量报错,错误原因:[{}]", e.getMessage(), e);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
对应的自增代码如下,可以看到该逻辑即直接通过volatile读获取当前count的值,然后再进行cas操作时传入count偏移量地址让unsafe的compareAndSwapInt比对最新的count值和我们读取的oldCount是否一致以决定是否更新:
public void inc() {
int oldCount = 0;
//基于cas完成自增
do {
//拉取本次的值
oldCount = count;
//通过乐观锁的方式比对旧有的值和偏移量中的新值是否一致,将值进行更新并设置到count中
} while (!unsafe.compareAndSwapInt(this, countOffSet, oldCount, oldCount + 1));
}
2
3
4
5
6
7
8
9
10
完整的代码如下所示:
public class CasCountInc {
// 获取Unsafe对象
private static Unsafe unsafe;
// 自增的count的值,volatile保证可见性
private volatile int count = 0;
// count字段的偏移量
private static long countOffSet;
static {
try {
//初始化unsafe
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
} catch (Exception e) {
Console.log("获取unsafe失败,失败原因:[{}]", e.getMessage(), e);
}
//初始化定位count变量的偏移量地址
countOffSet = unsafe.objectFieldOffset(CasCountInc.class.getDeclaredField("count"));
} catch (NoSuchFieldException e) {
Console.log("获取count的偏移量报错,错误原因:[{}]", e.getMessage(), e);
}
}
public void inc() {
int oldCount = 0;
//基于cas完成自增
do {
//拉取本次的值
oldCount = count;
//通过乐观锁的方式比对旧有的值和偏移量中的新值是否一致,将值进行更新并设置到count中
} while (!unsafe.compareAndSwapInt(this, countOffSet, oldCount, oldCount + 1));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
对应的我们也给出测试单元的代码,感兴趣的读者可以自行运行一下,发现结果确实是100w:
CasCountInc casCountInc = new CasCountInc();
IntStream.range(0, 100_0000).parallel()
.forEach(i -> casCountInc.inc());
Assert.equals(casCountInc.count, 100_0000);
2
3
4
5
# 详解更多原子类
# 原子类更新基本类型
原子类基本类型的格式为Atomic+包装类名,这里笔者列举几个比较常用的:
AtomicBoolean: 原子更新布尔类型。AtomicInteger: 原子更新整型。AtomicLong: 原子更新长整型。
# 原子类更新数组类型
AtomicIntegerArray: 原子更新整型数组里的元素。AtomicLongArray: 原子更新长整型数组里的元素。AtomicReferenceArray: 原子更新引用类型数组里的元素。
对应我们给出AtomicIntegerArray原子操作数组的示例:
public class AtomicIntegerArrayDemo {
public static void main(String[] args) throws InterruptedException {
AtomicIntegerArray array = new AtomicIntegerArray(new int[] { 0, 0 });
System.out.println(array);
// 索引1位置+2
System.out.println(array.getAndAdd(1, 2));
System.out.println(array);
}
}
2
3
4
5
6
7
8
9
10
11
12
# 原子类更新引用类型
AtomicReference: 原子更新引用类型。AtomicStampedReference: 原子更新引用类型, 内部使用Pair来存储元素值及其版本号。AtomicMarkableReferce: 原子更新带有标记位的引用类型。
对应的我们给出原子操作引用类型的代码示例:
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceTest {
public static void main(String[] args){
// 创建两个Person对象,它们的id分别是101和102。
Person p1 = new Person(101);
Person p2 = new Person(102);
// 新建AtomicReference对象,初始化它的值为p1对象
AtomicReference ar = new AtomicReference(p1);
// 通过CAS设置ar。如果ar的值为p1的话,则将其设置为p2。
ar.compareAndSet(p1, p2);
Person p3 = (Person)ar.get();
System.out.println("p3 is "+p3);
System.out.println("p3.equals(p1)="+p3.equals(p1));
System.out.println("p3.equals(p2)="+p3.equals(p2));
}
}
class Person {
volatile long id;
public Person(long id) {
this.id = id;
}
public String toString() {
return "id:"+id;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 原子类更新成员变量
通过原子类型操作成员变量大体有以下几个更新器:
AtomicIntegerFieldUpdater: 原子更新整型的字段的更新器。AtomicLongFieldUpdater: 原子更新长整型字段的更新器。AtomicStampedFieldUpdater: 原子更新带有版本号的引用类型AtomicReferenceFieldUpdater: 上面已经说过此处不在赘述。
如下所示,我们创建一个基础类DataDemo,通过原子类CAS操作字段值进行自增操作。
public class TestAtomicIntegerFieldUpdater {
private static Logger logger = LoggerFactory.getLogger(TestAtomicIntegerFieldUpdater.class);
public static void main(String[] args) {
TestAtomicIntegerFieldUpdater tIA = new TestAtomicIntegerFieldUpdater();
tIA.doIt();
}
/**
* 返回需要更新的整型字段更新器
*
* @param fieldName
* @return
*/
public AtomicIntegerFieldUpdater<DataDemo> updater(String fieldName) {
return AtomicIntegerFieldUpdater.newUpdater(DataDemo.class, fieldName);
}
public void doIt() {
DataDemo data = new DataDemo();
// 修改公共变量,返回更新前的旧值 0
AtomicIntegerFieldUpdater<DataDemo> updater = updater("publicVar");
int oldVal = updater.getAndIncrement(data);
logger.info("publicVar 更新前的值[{}] 更新后的值 [{}]", oldVal, data.publicVar);
// 更新保护级别的变量
AtomicIntegerFieldUpdater<DataDemo> protectedVarUpdater = updater("protectedVar");
int oldProtectedVar = protectedVarUpdater.getAndAdd(data, 2);
logger.info("protectedVar 更新前的值[{}] 更新后的值 [{}]", oldProtectedVar, data.protectedVar);
// logger.info("privateVar = "+updater("privateVar").getAndAdd(data,2)); 私有变量会报错
/*
* 下面报异常:must be integer
* */
// logger.info("integerVar = "+updater("integerVar").getAndIncrement(data));
//logger.info("longVar = "+updater("longVar").getAndIncrement(data));
}
class DataDemo {
// 公共且可见的publicVar
public volatile int publicVar = 0;
// 保护级别的protectedVar
protected volatile int protectedVar = 4;
// 私有变量
private volatile int privateVar = 5;
// final 不可变量
public final int finalVar = 11;
public volatile Integer integerVar = 19;
public volatile Long longVar = 18L;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
通过上述代码我们可以总结出CAS字段必须符合以下要求:
- 变量必须使用volatile保证可见性
- 必须是当前对象可以访问到的类型才可进行操作‘
- 只能是实例变量而不是类变量,即不可以有static修饰符
- 包装类也不行
# 详解CAS的ABA问题
# 什么是ABA问题
CAS更新是一种乐观锁机制,所以在更新前会检查值有没有变化,如果没有变化则认为没人修改过,进而执行更新操作。在这种情况下我们试想这样一个场景,我们现在希望完成并发情况的数字操作:
- 线程0将数值由0改为1,再由1改为0,按照正常的逻辑理解,本次数值发生变化了2次
- 线程1开始执行,发现数值是0,认为没有发生变化,CAS成功,数值直接变化3:

对应的我们给出入下代码示例:
AtomicReference<Integer> atomicReference = new AtomicReference<>(0);
new Thread(() -> {
//1.线程0将0改为1,再还原回0
atomicReference.compareAndSet(0, 1);
atomicReference.compareAndSet(1, 0);
}).start();
new Thread(() -> {
ThreadUtil.sleep(1000);
//2. 线程1尝试将0改为3,发现是0直接修改
atomicReference.compareAndSet(0, 3);
}).start();
Console.log("value:{}", atomicReference.get());
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# AtomicStampedReference如何解决ABA问题
源码如下所示,可以看到AtomicStampedReference解决ABA问题的方式是基于当前修改操作的时间戳和元引用值是否一致,若一直则进行数据更新
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference; //维护对象引用
final int stamp; //用于标志版本
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
....
/**
* expectedReference :更新之前的原始引用值
* newReference : 新值
* expectedStamp : 预期时间戳
* newStamp : 更新后的时间戳
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
// 获取当前的(元素值,版本号)对
Pair<V> current = pair;
return
// 引用没变
expectedReference == current.reference &&
// 版本号没变
expectedStamp == current.stamp &&
//可以看到这个括号里面用了一个短路运算如果当前版本与新值一样就说更新过,就不往下走CAS代码了
((newReference == current.reference &&
newStamp == current.stamp) ||
// 构造新的Pair对象并CAS更新
casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
// 调用Unsafe的compareAndSwapObject()方法CAS更新pair的引用为新引用
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# AtomicStampedReference解决ABA问题示例
代码示例,我们下面就用other代码模拟干扰现场,如果other现场先进行CAS更新再还原操作,那么main线程的版本号就会过时,CAS就会操作失败
/**
* ABA问题代码示例
*/
public class AtomicStampedReferenceTest {
private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(1, 0);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("操作线程" + Thread.currentThread() + ",初始值 a = " + atomicStampedRef.getReference());
int stamp = atomicStampedRef.getStamp(); //获取当前标识别
try {
Thread.sleep(1000); //等待1秒 ,以便让干扰线程执行
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean isCASSuccess = atomicStampedRef.compareAndSet(1, 2, stamp, stamp + 1); //此时expectedReference未发生改变,但是stamp已经被修改了,所以CAS失败
System.out.println("操作线程" + Thread.currentThread() + ",CAS操作结果: " + isCASSuccess);
}, "主操作线程");
Thread other = new Thread(() -> {
Thread.yield(); // 确保thread-main 优先执行
atomicStampedRef.compareAndSet(1, 2, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
System.out.println("操作线程" + Thread.currentThread() + ",【increment】 ,值 = " + atomicStampedRef.getReference());
atomicStampedRef.compareAndSet(2, 1, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
System.out.println("操作线程" + Thread.currentThread() + ",【decrement】 ,值 = " + atomicStampedRef.getReference());
}, "干扰线程");
main.start();
other.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# AtomicMarkableReference解决对象ABA问题
AtomicMarkableReference,它不是维护一个版本号,而是维护一个boolean类型的标记,标记对象是否有修改,从而解决ABA问题。
public boolean weakCompareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark) {
return compareAndSet(expectedReference, newReference,
expectedMark, newMark);
}
2
3
4
5
6
7
8
# 常见面试题
# CAS为什么比synchronized快(重点)
CAS工作原理是基于乐观锁且操作是原子性的,与synchronized的悲观锁(底层需要调用操作系统的mutex锁)相比,效率也会相对高一些。
# CAS是不是操作系统执行的?(重点)
不是,CAS是主要是通过处理器的指令来保证原子性的,在上面的讲解中我们都知道CAS操作底层都是调用Unsafe的native修饰的方法,以AtomicInteger为例对应的底层的实现是Unsafe的compareAndSwapInt:
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
2
3
对应的我们给出这段代码的c语言实现,即位于:https://github.com/openjdk/jdk/blob/jdk8-b01/hotspot/src/share/vm/prims/unsafe.cpp (opens new window)的unsafe.cpp:
可以看到出去前两个形参后续的参数与compareAndSwapInt列表一一对应,这段代码执行CAS操作时,本质上就是调用cmpxchg指令(Compare and Exchange),cmpxchg指令会判断当前服务器是否是多核,如果是则在指令前添加LOCK前缀保证cmpxchg操作的原子性,反之就不加Lock前缀直接执行比对后修改变量值这种乐观锁操作。
对应源码如下,它首先获取字段的偏移地址,然后传入预期值e与原值比较,如果一致,则将新结果x写入原子操作变量内存中:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
//获取字段偏移量地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
//比较如果期望值e和当前字段存储的值一样,则讲值更新为x
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
2
3
4
5
6
7
8
# CAS存在那些问题?
但即便如此CAS仍然存在两个问题:
- 可能存在长时间
CAS:如下代码所示,这就是AtomicInteger底层的UNSAFE类如何进行CAS的具体代码 ,可以看出这个CAS操作需要拿到volatile变量后在进行循环CAS才有可能成功这就很可能存在自旋循环,从而给CPU带来很大的执行开销。
public final int getAndAddInt(Object paramObject, long paramLong, int paramInt)
{
int i;
do
//获取最新结果
i = getIntVolatile(paramObject, paramLong);
//通过cas自旋操作完成自增
while (!compareAndSwapInt(paramObject, paramLong, i, i + paramInt));
return i;
}
2
3
4
5
6
7
8
9
10
CAS只能对一个变量进行原子操作:为了解决这个问题,JDK 1.5之后通过AtomicReference使得变量可以封装成一个对象进行操作ABA问题:总所周知CAS就是比对当前值与旧值是否相等,在进行修改操作,假设我现在有一个变量值为A,我改为B,再还原为A,这样操作变量值是没变的?那么CAS也会成功不就不合理吗?这就好比一个银行储户想查询概念转账记录,如果转账一次记为1,如果按照ABA问题的逻辑,那么这个银行账户转账记录次数有可能会缺少。为了解决这个问题JDK 1.5提供了AtomicStampedReference,通过比对版本号在进行CAS操作,那么上述操作就会变为1A->2B->3A,由于版本追加,那么我们就能捕捉到当前变量的变化了。
# AtomicInteger自增到10000后如何归零
AtomicInteger atomicInteger=new AtomicInteger(10000);
atomicInteger.compareAndSet(10000, 0);
2
# CAS 平时怎么用的,会有什么问题,为什么快,如果我用 for 循环代替 CAS 执行效率是一样的吗?(重点)
问题1: 一些需要并发计数并实时监控的场景可以用到。 问题2: CAS存在问题:CAS是基于乐观锁机制,所以数据同步失败就会原地自旋,在高并发场景下开销很大,所以线程数很大的情况下不建议使用原子类。 问题3:用 for 循环代替 CAS 存在问题: 如果并发量大的话,自旋的线程多了就会导致性能瓶颈。 for 循环代替 CAS执行效率是否一样:大概率是CAS快,原因如下:
- CAS是
native方法更接近底层 - for循环为了保证线程安全可能会用到
sync锁或者Lock无论那种都需要上锁和释放的逻辑,相比CAS乐观锁来说开销很大。
# 小结
我是 sharkchili ,CSDN Java 领域博客专家,mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
# 参考
JUC原子类: CAS, Unsafe和原子类详解:https://www.pdai.tech/md/java/thread/java-thread-x-juc-AtomicInteger.html (opens new window)
深入理解高并发编程:https://book.douban.com/subject/35928998/?icn=index-latestbook-subject (opens new window)
并发编程(三)原子性(2):https://blog.csdn.net/fuyuanduan/article/details/127993662#:~:text=【原子与否】:,cmpxchg指令这个不是原子的。 (opens new window)