目錄
CountDownLatch
作用
示例
CyclicBarrier
示例
CyclicBarrier 和 CountDownLatch 在用法上的不同
Semaphore
示例:Semaphore 控制資源訪問
最後一句話總結
這次說一下 JUC 中的同步器三個主要的成員:CountDownLatch
、CyclicBarrier
和 Semaphore
(不知道有沒有初學者覺得這三個的名字不太好記)。這三個是 JUC 中較為常用的同步器,通過它們可以方便地實現很多線程之間協作的功能。(下面的代碼出自 JDK 文檔)
直譯過來就是倒計數(CountDown)門闩(Latch)。倒計數不用說,門闩的意思顧名思義就是阻止前進。在這裡就是指 CountDownLatch.await()
方法在倒計數為0之前會阻塞當前線程。
CountDownLatch
的作用和 Thread.join()
方法類似,可用於一組線程和另外一組線程的協作。例如,主線程在做一項工作之前需要一系列的准備工作,只有這些准備工作都完成,主線程才能繼續它的工作。這些准備工作彼此獨立,所以可以並發執行以提高速度。在這個場景下就可以使用 CountDownLatch
協調線程之間的調度了。在直接創建線程的年代(Java 5.0 之前),我們可以使用 Thread.join()
。在 JUC 出現後,因為線程池中的線程不能直接被引用,所以就必須使用 CountDownLatch
了。
下面的這個例子可以理解為 F1 賽車的維修過程,只有 startSignal (可以表示停車,可能名字不太貼合)命令下達之後,維修工才開始干活,只有等所有工人完成工作之後,賽車才能繼續。
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
當 startSignal.await()
會阻塞線程,當 startSignal.countDown()
被調用之後,所有 Worker
線程開始執行 doWork()
方法,所以 Worker。doWork()
是幾乎同時開始執行的。當 Worker.doWork()
執行完畢後,調用 doneSignal.countDown()
,在所有 Worker
線程執行完畢之後,主線程繼續執行。
CyclicBarrier
翻譯過來叫循環柵欄、循環障礙什麼的(還是有點別扭的。所以還是別翻譯了,只可意會不可言傳啊)。它主要的方法就是一個:await()
。await()
方法沒被調用一次,計數便會減少1,並阻塞住當前線程。當計數減至0時,阻塞解除,所有在此 CyclicBarrier
上面阻塞的線程開始運行。在這之後,如果再次調用 await()
方法,計數就又會變成 N-1,新一輪重新開始,這便是 Cyclic 的含義所在。
CyclicBarrier
的使用並不難,但需要主要它所相關的異常。除了常見的異常,CyclicBarrier.await()
方法會拋出一個獨有的 BrokenBarrierException
。這個異常發生在當某個線程在等待本 CyclicBarrier
時被中斷或超時或被重置時,其它同樣在這個 CyclicBarrier
上等待的線程便會受到 BrokenBarrierException
。意思就是說,同志們,別等了,有個小伙伴已經掛了,咱們如果繼續等有可能會一直等下去,所有各回各家吧。
CyclicBarrier.await()
方法帶有返回值,用來表示當前線程是第幾個到達這個 Barrier 的線程。
和 CountDownLatch
一樣,CyclicBarrier
同樣可以可以在構造函數中設定總計數值。與 CountDownLatch
不同的是,CyclicBarrier
的構造函數還可以接受一個 Runnable
,會在 CyclicBarrier
被釋放時執行。
NOTE: CyclicBarrier 的功能也可以由 CountDownLatch 來實現
CyclicBarrier 的應用(當然,這個例子換成 CountDownLatch 也是可以實現的,很簡單,就不說怎麼寫了)
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N, new Runnable() {
public void run() {
mergeRows(...);
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
CyclicBarrier
和 CountDownLatch
在用法上的不同CountDownLatch
適用於一組線程和另一個主線程之間的工作協作。一個主線程等待一組工作線程的任務完畢才繼續它的執行是使用 CountDownLatch
的主要場景;CyclicBarrier
用於一組或幾組線程,比如一組線程需要在一個時間點上達成一致,例如同時開始一個工作。另外,CyclicBarrier
的循環特性和構造函數所接受的 Runnable
參數也是 CountDownLatch
所不具備的。
Semaphore
直譯是信號量,可能稱它是許可量更容易理解。當然,因為在計算機科學中這個名字由來已久,所以不能亂改。它的功能比較好理解,就是通過構造函數設定一個數量的許可,然後通過 acquire
方法獲得許可,release
方法釋放許可。它還有 tryAcquire
和 acquireUninterruptibly
方法,可以根據自己的需要選擇
Semaphore
控制資源訪問class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
上面這個示例中 Semaphore 的用法沒什麼可多講的。需要留言的是這裡面有兩個同步方法,不過對吞吐應該沒什麼影響,因為主要是對一個 boolean 數組做一下 O(n) 的操作,而且每個循環裡面的操作很簡單,所以速度很快。不過不知道 JUC 裡面線程池的控制是怎麼做的,本人不才,還沒看過那塊源代碼,得空看看,有知道的也可以說說。
CountDownLatch
是能使一組線程等另一組線程都跑完了再繼續跑;CyclicBarrier
能夠使一組線程在一個時間點上達到同步,可以是一起開始執行全部任務或者一部分任務。同時,它是可以循環使用的;Semaphore
是只允許一定數量的線程同時執行一段任務。