spinlock剖析与改进
—-by 淘宝子嘉
1, spinlock介绍
spinlock又称自旋锁,线程通过busy-wait-loop的方式来获取锁,任何时刻时刻只有一个线程能够获得锁,其他线程忙等待直到获得锁。spinlock在多处理器多线程环境的场景中有很广泛的使用,一般要求使用spinlock的临界区尽量简短,这样获取的锁可以尽快释放,以满足其他忙等的线程。Spinlock和mutex不同,spinlock不会导致线程的状态切换(用户态->内核态),但是spinlock使用不当(如临界区执行时间过长)会导致cpu busy飙高。
2, spinlock与mutex对比
2.1,优缺点比较
spinlock不会使线程状态发生切换,mutex在获取不到锁的时候会选择sleep
mutex获取锁分为两阶段,第一阶段在用户态采用spinlock锁总线的方式获取一次锁,如果成功立即返回;否则进入第二阶段,调用系统的futex锁去sleep,当锁可用后被唤醒,继续竞争锁。
Spinlock优点:没有昂贵的系统调用,一直处于用户态,执行速度快
Spinlock缺点:一直占用cpu,而且在执行过程中还会锁bus总线,锁总线时其他处理器不能使用总线
Mutex优点:不会忙等,得不到锁会sleep
Mutex缺点:sleep时会陷入到内核态,需要昂贵的系统调用
2.2,使用准则
Spinlock使用准则:临界区尽量简短,控制在100行代码以内,不要有显式或者隐式的系统调用,调用的函数也尽量简短。例如,不要在临界区中调用read,write,open等会产生系统调用的函数,也不要去sleep;strcpy,memcpy等函数慎用,依赖于数据的大小。
3, spinlock系统实现
spinlock的实现方式有多种,但是思想都是差不多的,现罗列一下:
3.1,glibc-2.9中的实现方法:
</pre> int pthread_spin_lock (lock) pthread_spinlock_t *lock; { asm ("\n" "1:\t" LOCK_PREFIX "decl %0\n\t" "jne 2f\n\t" ".subsection 1\n\t" ".align 16\n" "2:\trep; nop\n\t" "cmpl $0, %0\n\t" "jg 1b\n\t" "jmp 2b\n\t" ".previous" : "=m" (*lock) : "m" (*lock)); return 0; } <pre>
执行过程:
1,lock_prefix 即 lock。lock decl %0,锁总线将%0(即lock变量)减一。Lock可以保证接下来一条指令的原子性
2, 如果lock=1,decl的执行结果为lock=0,ZF标志位为1,直接跳到return 0;否则跳到标签2。也许要问,为啥能直接跳到return 0呢?因为subsection和previous之间的代码被编译到别的段中,因此jne之后紧接着的代码就是 return 0 (leaveq;retq)。Rep nop在经过编译器编译之后被编译成 pause。
3, 如果跳到标签2,说明获取锁不成功,循环等待lock重新变成1,如果lock为1跳到标签1重新竞争锁。
该实现采用的是AT&T的汇编语法,更详细的执行流程解释可以参考“五竹”大牛的文档
3.2,系统自带(glibc-2.3.4)spinlock反汇编代码:
系统环境:
2.6.9-89.ELsmp #1 SMP x86_64 x86_64 x86_64 GNU/Linux
(gdb) disas pthread_spin_lock
Dump of assembler code for function pthread_spin_lock:
//eax寄存器清零,做返回值
0x0000003056a092f0 <pthread_spin_lock+0>: xor %eax,%eax
//rdi存的是lock锁地址,原子减一
0x0000003056a092f2 <pthread_spin_lock+2>: lock decl (%rdi)
//杯了个催的,加锁不成功,跳转,开始busy wait
0x0000003056a092f5 <pthread_spin_lock+5>: jne 0x3056a09300 <pthread_spin_lock+16>
//终于夹上了…加锁成功,返回
0x0000003056a092f7 <pthread_spin_lock+7>: retq
……………………………………….省略若干nop……………………………………….
0x0000003056a092ff <pthread_spin_lock+15>: nop
//pause指令降低CPU功耗
0x0000003056a09300 <pthread_spin_lock+16>: pause
//检查锁是否可用
0x0000003056a09302 <pthread_spin_lock+18>: cmpl $0×0,(%rdi)
//回跳,重新锁总线获取锁
0x0000003056a09305 <pthread_spin_lock+21>: jg 0x3056a092f2 <pthread_spin_lock+2>
//长夜漫漫,爱上一个不回家的人,继续等~
0x0000003056a09307 <pthread_spin_lock+23>: jmp 0x3056a09300 <pthread_spin_lock+16>
0x0000003056a09309 <pthread_spin_lock+25>: nop
……………………………………….省略若干nop……………………………………….
End of assembler dump.
Glibc的汇编代码还是很简洁的,没有多余的代码。
4, Pause指令解释(from intel):
Description
Improves the performance of spin-wait loops. When executing a “spin-wait loop,” a Pentium 4 or Intel Xeon processor suffers a severe performance penalty when exiting the loop because it detects a possible memory order violation. The PAUSE instruction provides a hint to the processor that the code sequence is a spin-wait loop. The processor uses this hint to avoid the memory order violation in most situations, which greatly improves processor performance. For this reason, it is recommended that a PAUSE instruction be placed in all spin-wait loops.
提升spin-wait-loop的性能,当执行spin-wait循环的时候,笨死和小强处理器会因为在退出循环的时候检测到memory order violation而导致严重的性能损失,pause指令就相当于提示处理器哥目前处于spin-wait中。在绝大多数情况下,处理器根据这个提示来避免violation,藉此大幅提高性能,由于这个原因,我们建议在spin-wait中加上一个pause指令。
名词解释(以下为本人猜想):memory order violation,直译为-内存访问顺序冲突,当处理器在(out of order)乱序执行的流水线上去内存load某个内存地址的值(此处是lock)的时候,发现这个值正在被store,而且store本身就在load之前,对于处理器来说,这就是一个hazard,流水流不起来。
在本文中,具体是指当一个获得锁的工作线程W从临界区退出,在调用unlock释放锁的时候,有若干个等待线程S都在自旋检测锁是否可用,此时W线程会产生一个store指令,若干个S线程会产生很多load指令,在store之后的load指令要等待store在流水线上执行完毕才能执行,由于处理器是乱序执行,在没有store指令之前,处理器对多个没有依赖的load是可以随机乱序执行的,当有了store指令之后,需要reorder重新排序执行,此时会严重影响处理器性能,按照intel的说法,会带来25倍的性能损失。Pause指令的作用就是减少并行load的数量,从而减少reorder时所耗时间。
An additional function of the PAUSE instruction is to reduce the power consumed by a Pentium 4 processor while executing a spin loop. The Pentium 4 processor can execute a spin-wait loop extremely quickly, causing the processor to consume a lot of power while it waits for the resource it is spinning on to become available. Inserting a pause instruction in a spin-wait loop greatly reduces the processor’s power consumption.
Pause指令还有一个附加所用是减少笨死处理器在执行spin loop时的耗电量。当笨死探测锁是否可用时,笨死以飞快的速度执行spin-wait loop,导致消耗大量电量。在spin-wait loop中插入一条pause指令会显著减少处理器耗电量。
This instruction was introduced in the Pentium 4 processors, but is backward compatible with all IA-32 processors. In earlier IA-32 processors, the PAUSE instruction operates like a NOP instruction. The Pentium 4 and Intel Xeon processors implement the PAUSE instruction as a pre-defined delay. The delay is finite and can be zero for some processors. This instruction does not change the architectural state of the processor (that is, it performs essentially a delaying no-op operation).
该指令在笨死中被引入,但是向后兼容所有IA-32处理器,在早期的IA-32处理器中,pause指令以nop的方式来运行,笨死和小强以一个预定义delay的方式来实现pause,该延迟是有限的,在某些处理器上可能是0,pause指令并不改变处理器的架构(位?)状态(也就是说,它是一个延迟执行的nop指令)。至于Pause指令的延迟有多大,intel并没有给出具体数值,但是据某篇文章给出的测试结果,大概有38~40个clock左右(官方数字:nop延迟是1个clock),一下子延迟40个clock,intel也够狠的。
This instruction’s operation is the same in non-64-bit modes and 64-bit mode.
该指令在64位与非64位模式下的表现是一致的。
5, spinlock改进
根据上一小节的分析,pause指令最主要的作用是减低功耗和延迟执行下一条指令。所以我们可以有这样的猜想:如果在spin-wait的过程中,记录下加锁失败的次数,对失败的加锁行为进行惩罚(failure penalty),让等待时间和失败次数成正比,即失败次数越多等待时间越长、执行的pause指令越多。
- 这样的好处有:
A, 在锁竞争很激烈的情况下,通过增加延迟来降低锁总线的次数,当锁总线次数降低时,系统其它程序对内存的竞争减少,提高系统整体吞吐量
B, 在锁竞争很激烈的情况下,减少计算指令的执行次数,降低功耗,低碳低碳!!!
C, 当锁竞争不激烈时,还能获得和原来一样的性能
- 但是带来的问题有:
A, 如果竞争锁失败次数越多,pause次数越多的话,会导致有些线程产生starvation
B, 在某些特殊的场景下,锁竞争很小的时候,failure penalty可能会导致程序执行时间变长,进而导致总的加锁次数不一定减少
- 解决方案:
当失败次数超过一个阈值的时候,将失败次数清零,使线程以洁白之躯重新参与竞争;对于少数failure penalty导致执行时间延长的情况,可以先忽略。
- 基于failure penalty的改进实现:
为了便于区分,起了很山寨的名字叫newlock,对比对象是sim_spin_lock,sim_spin_lock与pthread_spin_lock算法一致,实现细节基本一致,之所以加入这种对比是为了更加精确地衡量新算法的效果。ecx寄存器记录下本次加锁过程的失败次数
</pre> int newlock(pthread_spinlock_t *lock){ __asm__( //eax清零,记录当前lock次数 "xor %%eax,%%eax\n\t" //ecx清零,记录总的lock次数 "xor %%ecx,%%ecx\n\t" //记录加锁次数 "1:incl %%ecx\n\t" //记录当前加锁次数 "incl %%eax\n\t" //锁总线,开始加锁,rdi寄存器存储的是lock变量的地址 "lock decl(%%rdi)\n\t" //加锁不成功 "jne 2f\n\t" //加锁成功,将锁总线次数作为返回值返回 "movl %%ecx,%%eax\n\t" "leave\n\t" "retq\n\t" "nop\n\t" "nop\n\t" //pause跳转标签 "5:pause\n\t" "4:pause\n\t" "3:pause\n\t" "2:pause\n\t" //探测锁是否可用 "cmpl $0x0,(%%rdi)\n\t" //锁可用,重新加锁 "jg 1b\n\t" //加锁次数与4取模 "and $0x3,%%eax\n\t" //根据结果进行跳转 "cmpl $0x0,%%eax\n\t" "je 2b\n\t" "cmpl $0x1,%%eax\n\t" "je 3b\n\t" "cmpl $0x2,%%eax\n\t" "je 4b\n\t" "je 5b\n\t" "nop\n\t" "nop\n\t" : :"D"(lock) :"%eax","%ecx","%edx"); }
- 与pthread_spin_lock算法相同的sim_spin_lock:
</pre> int sim_spin_lock(pthread_spinlock_t *lock){ __asm__( //eax清零,记录当前lock次数 "xor %%eax,%%eax\n\t" //ecx清零,记录总的lock次数 "xor %%ecx,%%ecx\n\t" //记录加锁次数 "1:incl %%ecx\n\t" //记录当前加锁次数 "incl %%eax\n\t" //锁总线,开始加锁,rdi寄存器存储的是lock变量的地址 "lock decl(%%rdi)\n\t" //加锁不成功 "jne 2f\n\t" //加锁成功,将锁总线次数作为返回值返回 "movl %%ecx,%%eax\n\t" "leave\n\t" "retq\n\t" "nop\n\t" "nop\n\t" //pause跳转标签 "2:pause\n\t" //探测锁是否可用 "cmpl $0x0,(%%rdi)\n\t" //锁可用,重新加锁 "jg 1b\n\t" "jmp 2b\n\t" "nop\n\t" "nop\n\t" : :"D"(lock) :"%eax","%ecx","%edx"); }
6, 性能对比
- 测试环境:
处理器:8Core Intel(R) Xeon(R) CPU E5410 @ 2.33GHz
Glibc版本:glibc-2.3.4-2.43
GCC版本:3.4.6
- 测试程序:
</pre> #include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <sys/time.h> #include "liblock.h" #define MAX_LOOP 1000000 #define CODE_LEN 10 pthread_spinlock_t mylock; long g_count = 0; int thread_count = 20; int _nlock = 0; int lock_count = 0; int loop_count = 1000000; int code_len = 1; void *func(void *arg){ int j, k, m; for(int i=0; i< loop_count; i++){ if(_nlock == 0){ lock_count += sim_spin_lock(&mylock); //pthread_spin_lock(&mylock); }else{ lock_count += newlock(&mylock); //newlock(&mylock); } g_count++; //下面这几句代码之间有很强的依赖性,流水hazard比较多 //每句代码都要依赖上一句的执行结果,而且都是store操作 //用于模仿实际情况中的临界区代码 for(int i=0; i< code_len; i++){ j = k; m = j; k = m; m = j+1; k = m+2; j = m+k; } pthread_spin_unlock(&mylock); } return NULL; } int get_process_time(struct timeval *ptvStart) { struct timeval tvEnd; gettimeofday(&tvEnd,NULL); return ((tvEnd.tv_sec - ptvStart->tv_sec)*1000+(tvEnd.tv_usec - ptvStart->tv_usec)/1000); } int main(int argc, char *argv[]){ if(argc < 3){ return 0; } int ms_time = 0; struct timeval tvStart; _nlock = atoi(argv[1]); thread_count = atoi(argv[2]); loop_count = atoi(argv[3]); code_len = atoi(argv[4]); pthread_t *tid = (pthread_t*)malloc(sizeof(pthread_t)*thread_count); pthread_spin_init(&mylock, 0); gettimeofday(&tvStart,NULL); for(int i=0; i<thread_count; i++){ pthread_create(&tid[i], NULL, func, NULL); } void *tret; for(int i=0; i<thread_count; i++){ int ret = pthread_join(tid[i], &tret); if(ret != 0){ printf("cannot join thread1"); } } ms_time = get_process_time(&tvStart); fprintf(stderr, "g_count:%ld\tlock_count:%ld\tloop_count:%d\tcode_len:%d\ttime:%.2f\n", \ g_count, lock_count, loop_count, code_len, ms_time/1000.0f); return 0; }
- Benchmark设置:
>输入参数:
线程数量:1,2,3,5,7,8,10,12,14,16,18,20,25,30,35,40
每个线程加锁(循环)次数:5000000
临界区长度:1,1+1*6,1+3*6,1+5*6,1+10*6,1+20*6,1+50*6,1+100*6
case执行次数:每个case执行10次,尽量减少误差
> 输出参数:
执行时间:平均执行时间
锁总线次数:平均锁总线次数
- 执行时间对比:
- 锁总线次数对比:
还有若干图,囿于篇幅和时间,亲们我就不贴了
7, 实验结果分析
- 定义:
竞争因子:竞争因子=线程数量/处理器个数。以本文为例,假设有16个线程,处理器为8核,那么竞争因子为2
- 竞争因子与执行时间的关系:
当竞争因子为1/8,也就是只有一个线程的时候,很明显两种算法的执行时间是一致的,因为此时没有其他线程竞争锁;当竞争因子在1/8~4/8之间的时候,在某些情况下(依赖于临界区长度)新算法性能低于老算法;当竞争因子大于4/8的时候,新算法性能优于老算法。
- 竞争因子与锁总线次数的关系:
当竞争因子为1/8的时候,新老算法一致(原理同上);当竞争因子在1/8~4/8之间的时候,大部分情况下老算法性能优于新算法;当竞争因子大于4/8的时候,新算法性能优于老算法,由于锁总线次数基数比较大,在图上可能比较难看出来。
- 拐点:
细心的同学可能会发现有一个拐点:就是当临界区的长度为 1+50*6 的时候,也就是临界区有300行代码的时候,新算法锁总线次数要比老算法多,而且执行时间也长一些,这就牵扯到新算法的一个缺点:在某些情况下,当锁可用,需要去竞争锁的时候,由于线程还在pause中,只有等pause结束才能去竞争,而pause结束时,锁很可能不可用(被其他线程获取),根据新算法,加锁失败线程又要多pause一次,导致整体的锁总线次数和执行时间增加。这个拐点依赖于竞争因子、具体的处理器、具体的临界区代码、pause执行周期。
- 不足:
由于时间的关系,试验做的不是很足;指令回避那块可以做优化(目前只是简单的cmpl和jmp);到底最大pause次数是多少需要试验来支撑,目前是4次;个人感觉pause的处理器实现粒度还是比较粗的,应该是intel的一个经验值,接下来的试验可以用nop来代替pause,这样得出来的数据应该会更为平滑一些,控制也更为细腻。
- 总结:
当竞争比较激烈的时候,新算法在绝大部分情况下优于老算法(除了拐点),大概有2-5%的提升;当处理器竞争比较小的时候,尤其是竞争因子为3/8的时候,新算法不如老算法;如果用nop替代pause应该会有更好的表现。