首页 > 其他 > 详细

ConcurrentHashMap浅析

时间:2015-08-14 22:47:55      阅读:271      评论:0      收藏:0      [点我收藏+]

概述:

ConcurrentHashMap是HashMap的升级版,我们都知道HashMap是不可靠的,线程不安全的,而Hashtable在同步的时候又会将整张表都锁住,从而在多并发的情况下效率低下。于是ConcurrentHashMap出现了,综合了两者的优点,所以一直是高并发情况下开发者的首选,但是相对的,它也有自身的一些不足,我们来分析一下它的原理。

ConcurrentHashMap结构分析:

ConcurrentHashMap胜于HashMap,却又不同于Hashtable,怎么解释?ConcurrentHashMap将哈希数组分成了好多段,这里的哈希数组可以想象成HashMap中存放链表头结点的地址的数组,但是却又不同,这个哈希数组中每一段中又存放着一个类似于Hashtable的结构,也就可以说ConcurrentHashMap中每一个段就是一个Hashtable。如图:

技术分享
(图片来自:http://www.cnblogs.com/ITtangtang/p/3948786.html)

我们可以看到ConcurrentHashMap是由很多段组成的,而每一个段就是一个Hashtable。
这样处理的原因是,我们同步的时候,不用锁住一张表,我们只需要锁住一个段就可以,其他的段也可以进行相应的操作,这也称为“分段锁”。

有了大致的理解后,我们来看看具体的信息:

成员信息

  final Segment<K,V>[] segments;
这个就是我们上面说的段数组,默认为16个,每一个段数组内部都是一个Hashtable,而且我们注意到,这个是final类型的,也就是分配好之后就不变了,这样做的原因就是如果ConcurrentHashMap需要rehash的时候,就不会影响Segments数组的大小,只需要改变每一个段内table的数组长度即可。如下是每个段内的成员 分布:
             
        transient volatile int count;//table内元素的数量
        //修改次数       
        transient int modCount;
        //临界值       
        transient int threshold;
        //存放链表头结点的数组,相当于Hashtable中的哈希数组    
        transient volatile HashEntry<K,V>[] table;
        //加载因子
        final float loadFactor;
我们注意到这里的table并不是final,这也就说明了我们上面的观点,rehash时只需要调整table的长度即可。

再来看每个链表节点的信息:

static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
     final HashEntry<K,V> next;
        ......
}
我们可以看到每个节点的key,hash,next都是final的,这也就说明我们在插入删除的时候不能从链表的中间插入删除,只能在表头处理。(下面会详细讲解)。
因为像在HashMap在链表的中间插入删除,如果读操作不加锁会导致读取到不一致的数据,想象一个场景:线程A将节点2的数据读取出来,刚好此时线程B将节点2删除了,那么此时读取到的数据便是不一致的,因为原来的节点2的next是指向节点3 的,而现在是指向节点4的。如果我们只允许在表头处理,那么就保证了HashEntry几乎是不可变的(删除的时候会变)。
而value的注意到是volatile修饰,就说明value读取的时候一定是最新的值。

-----------------------------------------------------------------------------------------------------------------------------------------------------

构造函数:

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
  //寻找小于2的n次方

        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;
        this.segments = Segment.newArray(ssize);
  
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;

        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }


创建一个ConcurrentHashMap,如果不指定参数,那么默认的是(16,0.75,16)。
如果指定的段的数量不是2的n次方,那么我们要找到小于给定数最大的2的n次方。segmentShift 和segmentMask 是在hash算法的时候要用到,后面我们会看到。
划分好各个段以后,我们还要对每个段进行初始化。C表示一个段内可以存放多少HashEntry,也就是一个段内hashEntry数组的长度,那么cap就是不大于c2
n次方。然后我们根据cap和加载因子建立Hashentry数组。

至此构造一个新的ConcurrentHashMap完成,我们该往里面放点值了。

Put方法

public V put(K key, V value) {
        if (value == null)//说明不支持值为null的,但是支持键为null的,后面会说到原因
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }
说put方法之前,我们来说说定位,也就是hash。
 private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

 final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
       }

重新计算keyhashcode的哈希值,这里在哈希的目的是减少哈希冲突,是使每个元素都能够均匀的分布在不同的段上,从而提高容器的存取效率。

segmentShift默认28segmentMask默认15,将计算到的哈希值无符号右移28位,即让高四位进行运算。




V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();  //扩容
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);                     //-------1-----------------
                    count = c; // write-volatile                                                //-----------3--------------
                }
                return oldValue;
            } finally {
                unlock();
            }
           }

Put方法一执行,首先就会先加锁,然后接着判断是否需要扩容,然后定位到待插入元素的链表头,接着循环判断是否存在key键相同的,若有存在的便将旧值保存起来,用新值覆盖旧值。如果没找到,将旧值置为null,创建一个新的节点,放到链表的头部。然后更新count的值。


--------------------------------------------------------------------------------------------------------------------------------------------------------------------

get方法

public V get(Object key) {
        int hash = hash(key.hashCode());
        return segmentFor(hash).get(key, hash);
       }
这里的hash定位和上面一样,我们可以看到这里一共要定位两次:首先定位到是在哪一个段,然后在这个段的HashEntry数组中定位到是哪一个索引,即哪一个链表。

V get(Object key, int hash) {
            if (count != 0) { // read-volatile                         //--------------2------------------
                HashEntry<K,V> e = getFirst(hash);                    //--------------4------------------
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)
                            return v;
                        return readValueUnderLock(e); // recheck      
                    }
                    e = e.next;
                }
            }
            return null;
           }

这里的思路也很简单:首先判断count是否为0,如果为0说明不存在,直接返回null。若存在则定位到链表头部位,遍历链表,如果找到键相同的,那么就返回对应的值,没找到返回null。

我们简单的分析完会产生几个问题。

1、为什么判断count?

首先来看count的定义:
 transient volatile int count;
发现是volatile,接着我们在源码中发现,无论是删除还是插入都会修改count,这就说明无论是哪一个线程更改了数据,count对所有线程都是可见的,而且都是最新值。
所以在这里判断一下,确保ConcurrentHashMap中的确没有数据。

2、if (v != null)是什么意思?

一般来说,我们已经在链表中找到我们需要的键了,那么对应的值肯定是存在的,因为ConcurrentHashMap中不允许值为null,那么这句话不是多此一举吗?
no,no,no,我们想象一个场景:线程A在执行到put代码块中的第1行(虚线注释的第1行),此时正在执行HashEntry的构造函数,就在此时线程B执行get代码块中的第2行(虚线注释的第2行),由于线程A的构造函数还没赋值,当然后续的第3行,也就没有给count赋值,此时线程B就读到没有值,就会返回null,可是事实是我们已经put进去了,但是没有读到值。所以为了防止这类情况发生,我们需要在判断一次,如果value等于null,进入readValueUnderLock(e)(图中recheck那一行),代码如下:
 V readValueUnderLock(HashEntry<K,V> e) {
            lock();
            try {
                return e.value;
            } finally {
                unlock();
            }
        }
我们可以明确的看出来,这里是先要获得锁,也就是保证其他人先不能修改数据,然后返回value。

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------

remove方法:

public V remove(Object key) {
	int hash = hash(key.hashCode());
        return segmentFor(hash).remove(key, hash, null);
    }
同样的先定位数据。

V remove(Object key, int hash, Object value) {
            lock();
            try {
                int c = count - 1;
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue = null;
                if (e != null) {
                    V v = e.value;
                    if (value == null || value.equals(v)) {
                        oldValue = v;
                        // All entries following removed node can stay
                        // in list, but all preceding ones need to be
                        // cloned.
                        ++modCount;
                        HashEntry<K,V> newFirst = e.next;
                        for (HashEntry<K,V> p = first; p != e; p = p.next)
                            newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                          newFirst, p.value);
                        tab[index] = newFirst;
                        count = c; // write-volatile
                    }
                }
                return oldValue;
            } finally {
                unlock();
            }
           }

我们定位到链表的头结点以后,遍历找寻key,如果没有对应的键,那么就返回null,如果发现了,我们要把待删节点之前的数据节点,逐个拷贝出来(因为前面说过,next是不可变的,所以只能在表头操作),从前往后拷贝,拷贝第1个节点,放到待删节点的前1个节点,依次类推,可能还有点模糊,我们看下面的图:
技术分享
图片忘记复制的哪里的。

依照图上面的假如我们要删除e3,我们就需要先拷贝e1,作为e3的前驱,让其挂上e3的后继,然后删除e3,依次类推。

------------------------------------------------------------------------------------------------------------------------------------------------------------

Size方法

我们来看看会锁定全段的方法。


 public int size() {
        final Segment<K,V>[] segments = this.segments;
        long sum = 0;
        long check = 0;
        int[] mc = new int[segments.length];
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
            check = 0;
            sum = 0;
            int mcsum = 0;
            for (int i = 0; i < segments.length; ++i) {
                sum += segments[i].count;
                mcsum += mc[i] = segments[i].modCount;
            }
            if (mcsum != 0) {
                for (int i = 0; i < segments.length; ++i) {
                    check += segments[i].count;
                    if (mc[i] != segments[i].modCount) {
                        check = -1; // force retry
                        break;
                    }
                }
            }
            if (check == sum)
                break;
        }
        if (check != sum) { // Resort to locking all segments
            sum = 0;
            for (int i = 0; i < segments.length; ++i)
                segments[i].lock();
            for (int i = 0; i < segments.length; ++i)
                sum += segments[i].count;
            for (int i = 0; i < segments.length; ++i)
                segments[i].unlock();
        }
        if (sum > Integer.MAX_VALUE)
            return Integer.MAX_VALUE;
        else
            return (int)sum;
    }

size方法首先不是锁定全段,首先进行遍历,对段上的数据进行遍历,期间用到了modCount,并且将每个段的修改次数都写进了一个新数组,然后累加所有的和,接下来在第二次遍历时,判断第一次得到的修改次数和第二次的是否一样,如果完全一样而且累加和都相等,说明在求取size的过程中没有其他任何线程对其进行修改;相反如果在求取size的过程中,另一个线程修改了数据,就会造成第一个遍历的和第二次遍历得到的修改次数不一样,那么此时会在这样判断一次,也就是整体的在循环一次,因为RETRIES_BEFORE_LOCK默认是2。完成后如果还不能无法避免其他线程在修改,那么此时锁定所有的段,,然后对其进行求取size,最后释放所有段的锁。


containsVlaue的方法和上面类似。

------------------------------------------------------------------------------------------------------------------------------------------------------------------

迭代器的那些事:

说到迭代器肯定是弱一致性的,其实ConcurrentHashMap中的get、clear方法等都是弱一致性的。那么什么是弱一致性?
拿get来说,刚才我们看到如果我们网ConcurrentHashMap中put了一条数据,我们希望立即读到的时候,有可能会读取不到的,但是ConcurrentHashMap中的确存在这条数据。这就是弱一致性。
换成clear,也就是当我清空了一部分的节点空间,然后其余线程又再put数据,因为clear不是同步的,所以其他线程有可能刚好put到我们清空的空间中,这样clear返回的时候,ConcurrentHashMap中就可能还有数据产生。
迭代器也是一样,当迭代遍历的时候,其他线程修改了遍历过的数据,那我们遍历得到的结果就和ConcurrentHashMap中实际存在的数据就会不一致。

所以相对于的Hashtable便是强一致的,当在遍历时,只要某个线程修改了数据,那就会抛出ConcurrentModificationException异常。


ConcurrentHashMap的弱一致性是为了提高效率,是一致性和效率之间的权衡。如果要保持绝对的一致性,可以选择Hashtable或者同步后的HashMap。

小结

ConcurrentHashMap是HashMap的线程安全升级版,是Hashtable的改进版。但是不能完全替代Hashtable,因为在某些必须保证一致性的前提下,我们会选择Hashtable。


ConcurrentHashMap的键可以为null,值却不能为null,这和以前我们见到的HashMap都不一样(HashMap是键值都可为null,Hashtable是键值都不可为null),那么究竟是为什么呢?我们刚才在上面的get代码快中发现,如果get的key存在,但是value却为null,则需要重新加锁后重读,所以值为null是有特殊用处的。


ConcurrentHashMap和Hashtable:Hashtable利用synchronized锁住整张表,当Hashtable的数量增大到一定程度时,迭代时就会锁住整张表,就造成了性能和效率的下降,而ConcurrentHashMap则使用分段锁,每次只用锁住一个段,不影响其他的段进行操作。


ConcurrentHashMap在读取的时候不用加锁,所以也造成了弱一致性。而Hashtable无论任何情况都会加锁,所以也成就了强一致性。


版权声明:本文为博主原创文章,转载请注明出处。

ConcurrentHashMap浅析

原文:http://blog.csdn.net/u014307117/article/details/47666857

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!