今天正式開始自己的分布式學習,在第一章介紹多線程工作模式時,作者拋出了一段關於ConcurrentHashMap代碼讓我很是疑惑,代碼如下:
public class TestClass { private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>(); public void add(String key){ Integer value = map.get(key); if(value == null){ map.put(key, 1); }else{ map.put(key, value + 1); } } }
作者的結論是這樣嬸的:即使使用線程安全的ConcurrentHashMap來統計信息的總數,依然存在線程不安全的情況。
筆者的結論是這樣嬸的:ConcurrentHashMap本來就是線程安全的呀,讀雖然不加鎖,寫是會加鎖的呀,講道理的話上面的代碼應該沒啥問題啊。
既然持懷疑態度,那筆者只有寫個測試程序咯,因為偉大的毛主席曾說過:“實踐是檢驗真理的唯一標准” =_=
/** * @Title: TestConcurrentHashMap.java * @Describe:測試ConcurrentHashMap * @author: Mr.Yanphet * @Email: [email protected] * @date: 2016年8月1日 下午4:50:18 * @version: 1.0 */ public class TestConcurrentHashMap { private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>(); public static void main(String[] args) { DoWork dw = new DoWork(map); ExecutorService pool = Executors.newFixedThreadPool(8); try { for (int i = 0; i < 20; i++) { pool.execute(new Thread(dw));// 開啟20個線程 } Thread.sleep(5000);// 主線程睡眠5s 等待子線程完成任務 } catch (Exception e) { e.printStackTrace(); } finally { pool.shutdown();// 關閉線程池 } System.out.println("統計的數量:" + map.get("count")); } static class DoWork implements Runnable { private ConcurrentHashMap<String, Integer> map = null; public DoWork(ConcurrentHashMap<String, Integer> map) { this.map = map; } @Override public void run() { add("count"); } public void add(String key) { Integer value = map.get(key);// 獲取map中的數值 System.out.println("當前數量" + value); if (null == value) { map.put(key, 1);// 第一次存放 } else { map.put(key, value + 1);// 以後次存放 } } } }
debug輸出一下:
當前數量null 當前數量null 當前數量null 當前數量1 當前數量null 當前數量1 當前數量2 當前數量3 當前數量4 當前數量5 當前數量6 當前數量7 當前數量7 當前數量5 當前數量6 當前數量7 當前數量8 當前數量8 當前數量8 當前數量6 統計的數量:7
這結果並不是20呀,瞬間被打臉有木有啊?滿滿的心塞有木有啊?
秉承著打破砂鍋問到底的精神,必須找到原因,既然打了臉,那就別下次還打臉啊.....
翻開JDK1.6的源碼:
public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash);// segmentFor(hash)用於精確到某個段 }
map的put方法調用了segment(類似hashtable的結構)的put方法,此外該方法涉及的另外兩個方法,筆者一並放在一起分析。
V get(Object key, int hash) { if (count != 0) { // segment存在值 繼續往下查找 HashEntry<K,V> e = getFirst(hash);// 根據hash值定位 相應的鏈表 while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v;// 值不為null 立即返回 return readValueUnderLock(e); // 值為null 重新讀取該值 } e = e.next;// 循環查找鏈表中的下一個值 } } return null;// 如果該segment沒有值 直接返回null } // 定位鏈表 HashEntry<K,V> getFirst(int hash) { HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1)]; } // 再次讀取為null的值 V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }
這裡需要總結一下了:
1 ConcurrentHashMap讀取數據不加鎖的結論是不正確的,當讀取的值為null時,這時候ConcurrentHashMap是會加鎖再次讀取該值的(上面粗體部分)。至於讀到null就加鎖再讀的原因如下:
ConcurrentHashMap的put方法value是不能為null的(稍後代碼展示),現在get值為null,那麼可能有另外一個線程正在改變該值(比如remove),為了讀取到正確的值,所以采取加鎖再讀的方法。在此對Doug Lee大師的邏輯嚴密性佩服得五體投地啊有木有......
2 讀者大概也知道為啥不是20了吧,雖然put加鎖控制了線程的執行順序,但是get沒有鎖,也就是多個線程可能拿到相同的值,然後相同的值+1,結果就不是預期的20了。
既然知道了原因,那麼修改一下add(String key)這個方法,加鎖控制它get的順序即可。
public void add(String key) { lock.lock(); try { Integer value = map.get(key); System.out.println("當前數量" + value); if (null == value) { map.put(key, 1); } else { map.put(key, value + 1); } } finally { lock.unlock(); } }
再次debug輸出一下:
當前數量null 當前數量1 當前數量2 當前數量3 當前數量4 當前數量5 當前數量6 當前數量7 當前數量8 當前數量9 當前數量10 當前數量11 當前數量12 當前數量13 當前數量14 當前數量15 當前數量16 當前數量17 當前數量18 當前數量19 統計的數量:20
得到正確的結果。
附上put方法的源碼:
public V put(K key, V value) { if (value == null) throw new NullPointerException();// 值為空拋出NullPointerException異常 int hash = hash(key.hashCode());// 根據key的hashcode 然後獲取hash值 return segmentFor(hash).put(key, hash, value, false); //定位到某個segment } V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; if (c++ > threshold) // 該segment總的key-value數量+ 大於threshold閥值 rehash(); // segment擴容 HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1);// hash值與數組長度-1取&運算 HashEntry<K,V> first = tab[index]; // 定位到某個數組元素(頭節點) HashEntry<K,V> e = first;// 頭節點 while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) {// 找到key 替換舊值 oldValue = e.value; if (!onlyIfAbsent) e.value = value; }else {// 未找到key 生成節點 oldValue = null; ++modCount; tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // write-volatile } return oldValue; } finally { unlock(); } }
程序員最怕對技術似懂非懂,在此與君共勉,慎之戒之!!!