Java中初始是使用mutex互斥鎖,因為互斥鎖是會線程等待掛起,而對獲取鎖後的操作時間比較短暫的應用場景來說,這樣的鎖會讓競爭鎖的線程不停的park,unpark 的操作,這樣的系統的調用性能是非常糟糕的,為了提高鎖的性能,java 在6 默認使用了自旋鎖。
在Linux中本身就已經提供了自旋鎖的系統調用,在glibc-2.9中就有它的比較簡單的實現方法
- int pthread_spin_lock (lock) pthread_spinlock_t *lock;
- {
- asm ("\n"
- "1:\t" LOCK_PREFIX "decl %0\n\t"
- "jne 2f\n\t"
- ".subsection 1\n\t"
- ".align 16\n"
- "2:\trep; nop\n\t"
- "cmpl $0, %0\n\t"
- "jg 1b\n\t"
- "jmp 2b\n\t"
- ".previous"
- : "=m" (*lock)
- : "m" (*lock));
- return 0;
- }
通過總線鎖把參數-1保證了減法的原子性,如果減後的值是(0)的代表獲得鎖,其他線程的線程自旋直到參數變成初始值(1),繼續競爭鎖,直到獲得這把鎖。
Java 並沒有使用系統自帶的自旋鎖,自己重寫了自旋鎖的邏輯,並且增加了自旋的次數的控制。詳細見-XX:+UseSpinning 和 -XX:PreBlockSpin=xx
讓我們具體來看是如何實現的,注意這是mutex鎖中所實現的lock,而並不是synchinized 的鎖的spin lock的實現(這個你可以參考synchronizer.cpp裡的方法TrySpin_VaryDuration)
- int Monitor::TrySpin (Thread * const Self) {
- if (TryLock()) return 1 ;
- if (!os::is_MP()) return 0 ;
-
- int Probes = 0 ;
- int Delay = 0 ;
- int Steps = 0 ;
- int SpinMax = NativeMonitorSpinLimit ;
- int flgs = NativeMonitorFlags ;
- for (;;) {
- intptr_t v = _LockWord.FullWord;
- if ((v & _LBIT) == 0) {
- if (CASPTR (&_LockWord, v, v|_LBIT) == v) {
- return 1 ;
- }
- continue ;
- }
-
- if ((flgs & 8) == 0) {
- SpinPause () ;
- }
-
- // Periodically increase Delay -- variable Delay form
- // conceptually: delay *= 1 + 1/Exponent
- ++ Probes;
- if (Probes > SpinMax) return 0 ;
-
- if ((Probes & 0x7) == 0) {
- Delay = ((Delay << 1)|1) & 0x7FF ;
- // CONSIDER: Delay += 1 + (Delay/4); Delay &= 0x7FF ;
- }
-
- if (flgs & 2) continue ;
-
- // Consider checking _owner's schedctl state, if OFFPROC abort spin.
- // If the owner is OFFPROC then it's unlike that the lock will be dropped
- // in a timely fashion, which suggests that spinning would not be fruitful
- // or profitable.
-
- // Stall for "Delay" time units - iterations in the current implementation.
- // Avoid generating coherency traffic while stalled.
- // Possible ways to delay:
- // PAUSE, SLEEP, MEMBAR #sync, MEMBAR #halt,
- // wr %g0,%asi, gethrtime, rdstick, rdtick, rdtsc, etc. ...
- // Note that on Niagara-class systems we want to minimize STs in the
- // spin loop. N1 and brethren write-around the L1$ over the xbar into the L2$.
- // Furthermore, they don't have a W$ like traditional SPARC processors.
- // We currently use a Marsaglia Shift-Xor RNG loop.
- Steps += Delay ;
- if (Self != NULL) {
- jint rv = Self->rng[0] ;
- for (int k = Delay ; --k >= 0; ) {
- rv = MarsagliaXORV (rv) ;
- if ((flgs & 4) == 0 && SafepointSynchronize::do_call_back()) return 0 ;
- }
- Self->rng[0] = rv ;
- } else {
- Stall (Delay) ;
- }
- }
- }
a. os::is_MP() 判斷系統是否是多核的系統,在單核下,自旋鎖是沒有意義的。
b. CASPTR 使用了 Atomic::cmpxchg_ptr 原子語義 cmpxchg 比較替換,如果比較的值相等就替換成需要的值並且返回去比較的值,如果不相同返回被比較的值的內容。
在這裡的語義是比較_LockWord.FullWord 和 _Lockword 的值是否相同,如果相同就把_Lockword 的值置換成v|_LBIT(_LBIT的值是1)。
自旋鎖的邏輯:判斷_LockWord.FullWord bit 0 是否是0,如果是0代表沒有占有鎖,那就嘗試去占有鎖,通過原子替換置bit0 為1,如果置換成功那麼代表擁有鎖,沒有則進入自旋。
SpinPause () 函數
在linux_x86 64位機器上 定義了
.globl SpinPause
.align 16
.type SpinPause,@function
SpinPause:
rep
nop
movq $1, %rax
ret
主要在rep, nop 的指令經過編譯器後的指令是pause,是用於提高cpu性能的,在官方上描述pase指令是為了避免memory order violation ,有種說法就是cpu是流水線的處理指令的,當原子指令store的時候,而如果有線程同時也在load他的值,那麼load 必須等到store 執行成功,這樣cpu就無法進行流水線作業了。但我更覺的這是個加強版的nop 也就是多增加幾個空的機器周期,一來省電,二來本身spin lock就需要cpu空運行,並且不需要訪問內存。
c. SafepointSynchronize::do_call_back()這是一個安全點,提供一個停止自旋鎖的切入點,比如vm thread,在做線程dump, 內存 dump的時候,是需要讓 自旋鎖提前停止的。
d. if (Probes > SpinMax) return 0 ; 當大於自旋的次數的時候,自旋自動退出,也就是前面所說的參數 -XX:PreBlockSpin
最後這裡還有個比較有意思的方法MarsagliaXORV (rv) ; 是算隨機數的,不清楚為什麼java讓cpu自旋的過程中計算隨機數的意義何在,為了不讓cpu空轉?感覺用spinpase 更合理一點。