轻量级锁加锁&解锁过程

轻量级锁是对 synchronized 的一种优化机制,它是一种乐观锁,适用于多线程竞争比较弱的情况。在这种情况下,相对于传统的重量级互斥锁(使用操作系统互斥量加锁),轻量级锁的性能更好。

轻量级锁通过操作加锁对象的对象头中的 Mark Word 进行加锁。在 HotSpot 虚拟机中,对象头分为两部分:一部分用于存储对象自身的运行时数据(例如 hashcode、GC Age 等),它的位数和虚拟机的位数对应,这部分数据叫做「Mark Word」。另一部分用于存储指向方法区对象类型数据的指针。除此之外,对于数组对象,还会存储数组的长度。

Mark Word 的不同状态

如下是 32 位 HotSpot 虚拟机中 Mark Word 在不同状态下存储的信息:

锁状态25 bit4bit1bit2bit
23bit2bit是否是偏向锁锁标志位
无锁对象的hashCode对象分代年龄001
偏向锁线程IDEpoch对象分代年龄101
轻量级锁指向栈中锁记录的指针00
重量级锁指向互斥量(重量级锁)的指针10
GC标记11

synchronized 有 4 种锁状态,从低到高分别为:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,锁可以升级但不能降级。本文主要讨论的是轻量级锁的加锁和解锁过程。

轻量级锁加锁

轻量级锁加锁的前提是锁对象不能带有偏向特征。加锁的过程可分为两种情况来讨论:一种是无锁状态(锁标志位为 01,偏向标志位为 0),可直接尝试加锁。另一种是有锁状态,需检查是否为当前线程持有的轻量级锁。

1
assert(!mark->has_bias_pattern(), "should not see bias pattern here");

无锁状态

无锁状态下,锁标志位为 01,偏向标志位为 0,可直接尝试加锁

Mark Word 初始状态

代码进入同步块时,同步对象处于无锁状态,锁标志位为“01”,偏向标志位为“0”

image-20200323001907541

建立锁记录

第一步:在加锁前,虚拟机需要在当前线程的栈帧中建立锁记录(Lock Record)的空间。Lock Record 中包含一个 _displaced_header 属性,用于存储锁对象的 Mark Word 的拷贝。

image-20200323002018621

复制锁对象的 Mark Word

第二步:将锁对象的 Mark Word 复制到锁记录中,这个复制过来的记录叫做 Displaced Mark Word。具体来讲,是将 mark word 放到锁记录的 _displaced_header 属性中。

image-20200323002049050
1
2
// 将Mark Word保存在锁记录中
lock->set_displaced_header(mark);
1
2
3
4
5
6
7
8
class BasicLock VALUE_OBJ_CLASS_SPEC {
friend class VMStructs;
private:
volatile markOop _displaced_header;
public:
void set_displaced_header(markOop header) { _displaced_header = header; }
......
};

CAS 更新锁对象的 Mark Word

第三步:虚拟机使用 CAS 操作尝试将锁对象的 Mark Word 更新为指向锁记录的指针。如果更新成功,这个线程就获得了该对象的锁

image-20200323002116692
1
2
3
4
5
6
7
// lock: 指向Lock Record的指针
// obj()->mark_addr(): 锁对象的Mark Word地址
// mark: 锁对象的Mark Word
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}

有锁状态

有锁状态下,如果是当前线程持有的轻量级锁,则说明是重入,不需要争抢锁。否则,说明有多个线程竞争,轻量级锁需要升级为重量级锁。

当前线程持有锁

对应无锁状态的第二步:锁对象处于加锁状态,并且锁对象的 Mark Word 指向当前线程的栈帧范围内,说明当前线程已经持有该轻量级锁,再次获取到该锁,也就是锁重入。

image-20200323002141347

此时不需要争抢锁,可执行同步代码

1
2
3
4
5
6
7
8
// Mark Word 处于加锁状态,当前线程持有的锁(Mark Word 指向的是当前线程的栈帧地址范围)
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
// 锁重入,将 Displaced Mark Word 设置为 null
lock->set_displaced_header(NULL);
return;
}

每次获取轻量级锁时都会创建一个 Lock Record,锁重入时会创建多个指向同一个 Object 的 Lock Record,除第一次设置 Displaced Mark Word ,后面均设置为 null

不是当前线程持有的锁

上一种条件不满足,说明存在多个线程竞争锁,轻量级锁要膨胀了。对!就是 inflate 膨胀了! :laughing: 膨胀成重量级锁后再加锁

1
2
3
4
5
6
// The object header will never be displaced to this lock,
// so it does not matter what the value is, except that it
// must be non-zero to avoid looking like a re-entrant lock,
// and must not look locked either.
lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);

轻量级锁解锁

解锁的思路是使用 CAS 操作把当前线程的栈帧中的 Displaced Mark Word 替换回锁对象中去,如果替换成功,则解锁成功。

CAS 替换回 Mark Word

image-20200323002205988
1
2
3
4
5
6
7
8
9
10
11
12
13
...
mark = object->mark() ;

// If the object is stack-locked by the current thread, try to
// swing the displaced header from the box back to the mark.
if (mark == (markOop) lock) {
assert (dhw->is_neutral(), "invariant") ;
// 将 Displaced Mark Word 替换回去
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}

替换失败

替换失败,轻量级锁膨胀成重量级锁后再解锁

1
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;

锁重入

如上文所讲:加锁时,如果是锁重入,会将 Displaced Mark Word 设置为 null。相对应地,解锁时,如果判断 Displaced Mark Word 为 null 则说明是锁重入,不做替换操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
markOop dhw = lock->displaced_header();
markOop mark ;
if (dhw == NULL) {
// Recursive stack-lock.
// Diagnostics -- Could be: stack-locked, inflating, inflated.
mark = object->mark() ;
assert (!mark->is_neutral(), "invariant") ;
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
}
if (mark->has_monitor()) {
ObjectMonitor * m = mark->monitor() ;
assert(((oop)(m->object()))->mark() == mark, "invariant") ;
assert(m->is_entered(THREAD), "invariant") ;
}
return ;
}

参考代码

bytecodeInterpreter.cpp

代码位置:hotspot/src/share/vm/interpreter/bytecodeInterpreter.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* monitorenter and monitorexit for locking/unlocking an object */
CASE(_monitorenter): {
...

// traditional lightweight locking
if (!success) {
markOop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
bool call_vm = UseHeavyMonitors;
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// Is it simple recursive case?
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
entry->lock()->set_displaced_header(NULL);
} else {
// 轻量级锁加锁
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
...
}
...
}

interpreterRuntime.cpp

代码位置: hotspot/src/share/vm/interpreter/interpreterRuntime.cpp

1
2
3
4
5
6
7
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
// 轻量级锁加锁
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}

如果使用偏向锁,则进入 fast_enter 代码逻辑,否则进入 slow_enter 轻量级锁加锁逻辑

参考

周志明. 深入理解Java虚拟机
死磕 Synchronized 底层实现:轻量级锁
(六) synchronized的源码分析