程序設計需要同步(synchronization),原因:
1)復雜的功能要求的需要使用多線程編程,線程之間存在讀寫共享變量。
2)讀寫共享變量(shared mutual variable),JVM的內存模型(Memory model: decide when and how changes made by one thread become visuble to others)受到其它因素干擾。
3)對共享變量的操作非原子性。例如 i++;就不是原子操作,它分為兩部分,(1) 讀i (2) i+1寫入內存。如果i是線程A和線程B共享的變量,線程A在操作(1)之後,線程調度器調度調度線程B執行i++,因此兩個線程在變量i產生了不一致。注意,volatile修飾符是線程操作之前更新數據,但是,上面的問題顯然不是更新數據就能解決的。
4)增加互斥區(mutual exclusion)會降低執行效率,但是這是實現數據安全、功能強大的多線程編程最為重要的部分。
5)線程之間需要配合的場景需要並發控制邏輯。
Java並發編程使用的方法:
1) 為代碼塊和函數添加synchronized,同步的作用有兩點:
(1)a means of mutual exclusion, to prevent an object from being observed in an inconsistent state while it’s being modified by another thread.
(2)guarantee that one thread’s changes will be visible to another
2)配合使用object的wait和notify,實現對象monitor控制權從一個線程調度到另外一個線程。具體實例請參閱http://www.linuxidc.com/Linux/2012-08/67023.htm
3)使用ReentrantLock和Condition控制。ReentrantLock和Condition出現在java.util.concurrent包中。下面有從Hadoop源碼中摘取出來的一部分的內容作為介紹。
Hadoop源碼使用並發控制的實例:
Map階段產生<K,V>會先存儲在內存中,等到io.sort.mb指定的內存達到阈值(percent)時,會啟動spill到本地磁盤的工作。
ReentrantLock與Condition的配合使用,Condition為ReentrantLock鎖的等待和釋放提供控制邏輯。
例如,使用ReentrantLock加鎖之後,可以通過它自身的Condition.await()方法釋放該鎖,線程在此等待Condition.signal()方法,然後繼續執行下去。await方法需要放在while循環中,因此,在不同線程之間實現並發控制,還需要一個volatile的變量,boolean是原子性的變量。因此,一般的並發控制的操作邏輯如下所示:
volatile boolean isProcess = false;
ReentrantLock lock = new ReentrantLock();
Condtion processReady = lock.newCondtion();
thread: run() {
lock.lock();
isProcess = true;
try {
while(!isProcessReady) { //isProcessReady 是另外一個線程的控制變量
processReady.await();//釋放了lock,在此等待signal
}catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
isProcess = false;
}
}
}
}
看Hadoop的一段摘取的源碼:
private class MapOutputBuffer<K extends Object, V extends Object>
implements MapOutputCollector<K, V>, IndexedSortable {
...
boolean spillInProgress;
final ReentrantLock spillLock = new ReentrantLock();
final Condition spillDone = spillLock.newCondition();
final Condition spillReady = spillLock.newCondition();
volatile boolean spillThreadRunning = false;
final SpillThread spillThread = new SpillThread();
...
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
...
spillInProgress = false;
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try {
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
}
protected class SpillThread extends Thread {
@Override
public void run() {
spillLock.lock();
spillThreadRunning = true;
try {
while (true) {
spillDone.signal();
while (!spillInProgress) {
spillReady.await();
}
try {
spillLock.unlock();
sortAndSpill();
} catch (Throwable t) {
sortSpillException = t;
} finally {
spillLock.lock();
if (bufend < bufstart) {
bufvoid = kvbuffer.length;
}
kvstart = kvend;
bufstart = bufend;
spillInProgress = false;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
spillLock.unlock();
spillThreadRunning = false;
}
}
}