加鎖和解鎖的基本思想是,當某個進程進入臨界區,它將持有一個某種類型的鎖(UNIX裡一般來說是semaphore,Linux裡一般是信號量和原子量或者spinlock)。當其他進程在該進程沒有釋放該鎖時試圖進入臨界區(加鎖),它將會被設置成睡眠狀態,然後被置入等待該鎖的進程隊列(某個優先級的)。當該鎖被釋放時,也就是解鎖事件發生時,內核將從等待該鎖的進程優先級隊列中尋找一個進程並將其置為就緒態,等待調度(schedule)。
在system v中,等待某一事件被稱為sleep(sleep on an event),因此下文將統一使用睡眠(sleep)。等待某事件也可以成為等待某個鎖。(注:本文中的sleep與sleep()系統調用不同)
系統的實現將一組事件映射到一組內核虛擬地址(鎖);而且事件不區別對待到底有多少進程在等待。這就意味著兩個不規則的事情:
一、當某個事件發生時,等待該事件的一組進程均被喚醒(而不是僅僅喚醒一個進程),並且狀態均被設置成就緒(ready-to-run)。這時候由內核選擇(schedule)一個進程來執行,由於system v內核不是可搶占的(Linux內核可搶占),因此其他的進程將一直在就緒狀態等待調度,或者再次進入睡眠(因為該鎖有可能被執行進程持有,而執行進程因為等待其他事件的發生而睡眠),或者等其他進程在用戶態被搶占。
二、多個事件映射到同一個地址(鎖)。假設事件e1和e2都映射到同一個地址(鎖)addr,有一組進程在等待e1,一組進程在等待e2,它們等待的事件不同,但是對應的鎖相同。假如e2發生了,所有等待e2的進程都被喚醒進入就緒狀態,而由於e1沒有發生,鎖addr沒有被釋放,所有被喚醒的進程又回到睡眠狀態。貌似一個事件對應一個地址會提高效率,但實際上由於system v是非搶占式內核,而且這種多對一映射非常少,再加上運行態進程很快就會釋放資源(在其他進程被調度之前),因此這種映射不會導致性能的顯著降低。
下面簡單闡述一下sleep和wakeup的算法。
//偽代碼
sleep(地址(事件),優先級)
返回值:進程能捕獲的信號發生導致的返回則返回1,當進程不能捕獲的信號發生時返回longjmp算法,否則返回0。
{
提高處理器執行等級以禁用所有中斷;//避免競態條件
將進程的狀態設置為睡眠;
根據事件將進程放入睡眠哈希隊列;//一般來說每個事件都有一個等待隊列
將睡眠地址(事件)及輸入的優先級保存到進程表中;
if (該等待是不可中斷的等待)
//一般有兩種睡眠狀態:可中斷的和不可中斷的。不可中斷的睡眠是指進程除了等待的事件外,
//不會被其他任何事件(如信號)中斷睡眠狀態,該情況不太常用。
{
上下文切換;//此處該進程執行上下文被保存起來,內核轉而執行其他進程
//在別處進行了上下文切換,內核選擇該上下文進行執行,此時該進程被喚醒
恢復處理器等級來允許中斷;
返回0;
}
// 被信號中斷的睡眠
if (沒有未遞送的信號)
{
上下文切換;
if (沒有未遞送的信號)
{
恢復處理器等級來允許中斷;
返回0;
}
}
//有未遞送的信號
若進程還在等待哈希隊列中,將其從該隊列移出;
恢復處理器等級來允許中斷;
if(進程捕獲該信號)
返回1;
執行longjmp算法;//這一段我也不明白
}
void wakeup(地址(事件))
{
禁用所有的中斷;
根據地址(事件)查找睡眠進程隊列;
for(每個在該事件上睡眠的進程)
{
將該進程從哈希隊列中移出;
設置狀態為就緒;
將該進程放入調度鏈表中;
清除進程表中的睡眠地址(事件);
if(進程不在內存中)
{
喚醒swapper進程;
}
else if(喚醒的進程更適合運行)
{
設置調度標志;
}
}
恢復中斷;
}
在wakeup調用之後,被喚醒的進程並不是馬上投入運行,而是讓其適合運行,等到下次調度該進程才有機會運行(也可能不會運行)。
代碼示例:
由於沒能找到UNIX的源碼,因此用Linux的源代碼替代。上述偽代碼已經是比較簡單的處理。Linux 0.01則更簡單。
//Linux 0.01的實現:
//不可中斷睡眠
void sleep_on(struct task_struct **p)
{
struct task_struct *tmp;
if (!p)
return;
if (current == &(init_task.task)) //current宏用來獲取當前運行進程的task_struct
panic("task[0] trying to sleep");
tmp = *p; //將已經在睡眠的進程p2保存到tmp
*p = current; //將當前進程p1保存到睡眠進程隊列中(實際上只有一個)
current->state = TASK_UNINTERRUPTIBLE; //p1狀態設置為不可中斷的睡眠
schedule(); //上下文切換,執行其他進程
//p1被喚醒後回到此處
if (tmp)
tmp->state=0; //將p2的狀態設置為運行,等待調度
}
//可中斷睡眠
void interruptible_sleep_on(struct task_struct **p)
{
struct task_struct *tmp;
if (!p)
return;
if (current == &(init_task.task))
panic("task[0] trying to sleep");
tmp=*p; //正在等待的進程p2保存到tmp
*p=current; //將當前進程p1保存到睡眠進程隊列中
repeat: current->state = TASK_INTERRUPTIBLE; //將p1狀態設置為可中斷睡眠
schedule(); //上下文切換
if (*p && *p != current) { //p2睡眠被中斷
(**p).state=0;//p1設置為運行態
goto repeat; //回到repeat,繼續讓p2睡眠
}
*p=NULL;
if (tmp)
tmp->state=0; //將p2的狀態設置為運行態,等待調度
}
這兩個函數比較難以理解,主要是在最後兩條語句。在schedule()之前,切換的是當前進程的上下文,但是,切換回來之後,卻是將原先正在睡眠的進程置為就緒態。在執行schedule()之前,各指針如下圖所示(不好意思,不會粘貼圖片):
---
| p |
---
||
\/
---- Step 3 ---------
| *p |--------->| current |
---- ---------
|
X Step 1
|
\/
---------------- Step 2 -----
| Wait Process |<--------| tmp |
---------------- -----
而在schedule()返回到這段代碼之後,事情就不一樣了。因為在step 3之後,current進程已經進入睡眠,tmp指向的睡眠進程的描述符也被保存下來。從schedule()返回之後,執行的代碼仍然是current,而tmp指向的仍然是wait process,此時將其狀態置為就緒,等待下一次調度。
與前兩個函數相比,wake_up相當簡單:
//被喚醒的進程並不是馬上投入運行,而是讓其適合運行
void wake_up(struct task_struct **p)
{
if (p && *p) {
(**p).state=0; //將要喚醒的進程狀態置為就緒
*p=NULL; //將進程移出等待的進程
}
}
有了sleep_on()和wake_up()之後,就可以對資源加鎖了,如(硬盤緩沖加鎖、等待緩沖可用、喚醒等待進程):
//鎖住bh
static inline void lock_buffer(struct buffer_head * bh)
{
if (bh->b_lock)
printk("hd.c: buffer multiply locked\n");
bh->b_lock=1;
}
static inline void unlock_buffer(struct buffer_head * bh)
{
if (!bh->b_lock)
printk("hd.c: free buffer being unlocked\n");
bh->b_lock=0;
wake_up(&bh->b_wait);
}
static inline void wait_on_buffer(struct buffer_head * bh)
{
cli(); //禁止中斷
while (bh->b_lock)
sleep_on(&bh->b_wait);
sti(); //恢復中斷
}
//Linux 0.99.15的sleep和wake_up的實現(支持等待隊列):
static inline void __sleep_on(struct wait_queue **p, int state)
{
unsigned long flags;
struct wait_queue wait = { current, NULL };
if (!p)
return;
if (current == task[0])
panic("task[0] trying to sleep");
current->state = state;
add_wait_queue(p, &wait); //將當前進程加入等待隊列
save_flags(flags); //保存中斷掩碼
sti(); //屏蔽中斷
schedule(); //上下文切換
remove_wait_queue(p, &wait); //從等待隊列中移除當前進程
restore_flags(flags); //恢復中斷掩碼
}
void wake_up(struct wait_queue **q)
{
struct wait_queue *tmp;
struct task_struct * p;
if (!q || !(tmp = *q))
return;
do {//將等待隊列中喚醒隊首進程
if ((p = tmp->task) != NULL) {
if ((p->state == TASK_UNINTERRUPTIBLE) ||
(p->state == TASK_INTERRUPTIBLE)) {
p->state = TASK_RUNNING;
if (p->counter > current->counter)
need_resched = 1;
}
}
if (!tmp->next) {
printk("wait_queue is bad (eip = %08lx)\n",((unsigned long *) q)[-1]);
printk(" q = %p\n",q);
printk(" *q = %p\n",*q);
printk(" tmp = %p\n",tmp);
break;
}
tmp = tmp->next;
} while (tmp != *q);
}