今天正式開始自己的分布式學習,在第一章介紹多線程工作模式時,作者拋出了一段關於ConcurrentHashMap代碼讓我很是疑惑,代碼如下:
public class TestClass {
private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
public void add(String key){
Integer value = map.get(key);
if(value == null){
map.put(key, 1);
}else{
map.put(key, value + 1);
}
}
}
作者的結論是這樣嬸的:即使使用線程安全的ConcurrentHashMap來統計信息的總數,依然存在線程不安全的情況。
筆者的結論是這樣嬸的:ConcurrentHashMap本來就是線程安全的呀,讀雖然不加鎖,寫是會加鎖的呀,講道理的話上面的代碼應該沒啥問題啊。
既然持懷疑態度,那筆者只有寫個測試程序咯,因為偉大的毛主席曾說過:“實踐是檢驗真理的唯一標准” =_=
/**
* @Title: TestConcurrentHashMap.java
* @Describe:測試ConcurrentHashMap
* @author: Mr.Yanphet
* @Email: mr_yanphet@163.com
* @date: 2016年8月1日 下午4:50:18
* @version: 1.0
*/
public class TestConcurrentHashMap {
private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
public static void main(String[] args) {
DoWork dw = new DoWork(map);
ExecutorService pool = Executors.newFixedThreadPool(8);
try {
for (int i = 0; i < 20; i++) {
pool.execute(new Thread(dw));// 開啟20個線程
}
Thread.sleep(5000);// 主線程睡眠5s 等待子線程完成任務
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();// 關閉線程池
}
System.out.println("統計的數量:" + map.get("count"));
}
static class DoWork implements Runnable {
private ConcurrentHashMap<String, Integer> map = null;
public DoWork(ConcurrentHashMap<String, Integer> map) {
this.map = map;
}
@Override
public void run() {
add("count");
}
public void add(String key) {
Integer value = map.get(key);// 獲取map中的數值
System.out.println("當前數量" + value);
if (null == value) {
map.put(key, 1);// 第一次存放
} else {
map.put(key, value + 1);// 以後次存放
}
}
}
}
debug輸出一下:
當前數量null 當前數量null 當前數量null 當前數量1 當前數量null 當前數量1 當前數量2 當前數量3 當前數量4 當前數量5 當前數量6 當前數量7 當前數量7 當前數量5 當前數量6 當前數量7 當前數量8 當前數量8 當前數量8 當前數量6 統計的數量:7
這結果並不是20呀,瞬間被打臉有木有啊?滿滿的心塞有木有啊?
秉承著打破砂鍋問到底的精神,必須找到原因,既然打了臉,那就別下次還打臉啊.....
翻開JDK1.6的源碼:
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);// segmentFor(hash)用於精確到某個段
}
map的put方法調用了segment(類似hashtable的結構)的put方法,此外該方法涉及的另外兩個方法,筆者一並放在一起分析。
V get(Object key, int hash) {
if (count != 0) { // segment存在值 繼續往下查找
HashEntry<K,V> e = getFirst(hash);// 根據hash值定位 相應的鏈表
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;// 值不為null 立即返回
return readValueUnderLock(e); // 值為null 重新讀取該值
}
e = e.next;// 循環查找鏈表中的下一個值
}
}
return null;// 如果該segment沒有值 直接返回null
}
// 定位鏈表
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}
// 再次讀取為null的值
V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}
這裡需要總結一下了:
1 ConcurrentHashMap讀取數據不加鎖的結論是不正確的,當讀取的值為null時,這時候ConcurrentHashMap是會加鎖再次讀取該值的(上面粗體部分)。至於讀到null就加鎖再讀的原因如下:
ConcurrentHashMap的put方法value是不能為null的(稍後代碼展示),現在get值為null,那麼可能有另外一個線程正在改變該值(比如remove),為了讀取到正確的值,所以采取加鎖再讀的方法。在此對Doug Lee大師的邏輯嚴密性佩服得五體投地啊有木有......
2 讀者大概也知道為啥不是20了吧,雖然put加鎖控制了線程的執行順序,但是get沒有鎖,也就是多個線程可能拿到相同的值,然後相同的值+1,結果就不是預期的20了。
既然知道了原因,那麼修改一下add(String key)這個方法,加鎖控制它get的順序即可。
public void add(String key) {
lock.lock();
try {
Integer value = map.get(key);
System.out.println("當前數量" + value);
if (null == value) {
map.put(key, 1);
} else {
map.put(key, value + 1);
}
} finally {
lock.unlock();
}
}
再次debug輸出一下:
當前數量null 當前數量1 當前數量2 當前數量3 當前數量4 當前數量5 當前數量6 當前數量7 當前數量8 當前數量9 當前數量10 當前數量11 當前數量12 當前數量13 當前數量14 當前數量15 當前數量16 當前數量17 當前數量18 當前數量19 統計的數量:20
得到正確的結果。
附上put方法的源碼:
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();// 值為空拋出NullPointerException異常
int hash = hash(key.hashCode());// 根據key的hashcode 然後獲取hash值
return segmentFor(hash).put(key, hash, value, false); //定位到某個segment
}
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // 該segment總的key-value數量+ 大於threshold閥值
rehash(); // segment擴容
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);// hash值與數組長度-1取&運算
HashEntry<K,V> first = tab[index]; // 定位到某個數組元素(頭節點)
HashEntry<K,V> e = first;// 頭節點
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {// 找到key 替換舊值
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}else {// 未找到key 生成節點
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
程序員最怕對技術似懂非懂,在此與君共勉,慎之戒之!!!