@since 1.5
jdk1.7以前的ConcurrentHashMap使用的是分段锁实现的并发
jdk1.8之后的版本使用的数组+链表+红黑树数据结构再加上CAS原子操作实现的
Hashtable因为在put等操作的时候使用的synchronized加锁的方式来实现并发安全,但是这样访问map就效率太过于低下了,所以就有了分段锁的方式。大概的意思就是,在开始访问Hashtable的时候因为加了锁的原因,每次只有一个线程可以对map进行修改,但是map太常用且违背map设计的初衷,所以Hashtable就渐渐的被遗弃了。
1.7以前的ConcurrentHashMap使用分段锁解决了这个并发效率低下的问题。分段锁意思大概就是对map的bucket采用分段的方式分成一个个段,再对每个段分别加锁,这样,在有线程访问某一段的时候就要获取某一段的锁,但是其他的段因为是使用的不同的锁,从而其他段是可以被其他线程访问的,这样就大大解决了map并发访问的问题。需要注意的是,ConcurrentHashMap既不允许key不为null,值也不允许为null;
ConcurrentHashMap是由Segment[] 与HashEntry[] 构成的,大概就是一个CurrentHashMap由多个segment构成,segment继承了ReentrantLock,而一个个segment又是由多个HashEntry构成的,HashEntry里保存着KV值。这样相当于一个Seg管理多个Entry。
从小到大看:
成员变量:
/**
* 默认初始容量
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* 默认加载因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* 默认并发度,该参数会影响segments数组的长度
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 最大容量,构造ConcurrentHashMap时指定的大小超过该值则会使用该值替换,
* ConcurrentHashMap的大小必须是2的幂,且小于等于1<<30,以确保可使用int索引条目
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 每个segment中table数组的最小长度,必须是2的幂,至少为2,以免延迟构造后立即调整大小
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
/**
* 允许的最大segment数量,用于限定构造函数参数concurrencyLevel的边界,必须是2的幂
*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
/**
* 非锁定情况下调用size和containsValue方法的重试次数,避免由于table连续被修改导致无限重试
*/
static final int RETRIES_BEFORE_LOCK = 2;
/**
* 与当前实例相关联的,用于key哈希码的随机值,以减少哈希冲突
*/
private transient final int hashSeed = randomHashSeed(this);
private static int randomHashSeed(ConcurrentHashMap instance) {
if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
return sun.misc.Hashing.randomHashSeed(instance);
}
return 0;
}
/**
* 用于索引segment的掩码值,key哈希码的高位用于选择segment
*/
final int segmentMask;
/**
* 用于索引segment偏移值
*/
final int segmentShift;
/**
* segments数组
*/
final Segment<K,V>[] segments;
首先看构造函数(有很多构造函数,但是最终都调用的这个):
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 并发等级如果大于了最大的seg,就要将seg扩充到并发等级、
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 寻找与给定参数concurrencyLevel匹配的最佳Segment数组ssize,必须是2的幂
// 如果concurrencyLevel是2的幂,那么最后选定的ssize就是concurrencyLevel
// 否则concurrencyLevel,ssize为大于concurrencyLevel最小2的幂值
// concurrencyLevel为7,则ssize为2的3次幂,为8
// 计算ssize的时候进行了多少次的移位运算
int sshift = 0;
//ssize = segment size
int ssize = 1;
//就是找到最符合并发等级的segment size
while (ssize < concurrencyLevel) {
++sshift;
//左移是乘2 ssize = 1; 会变成2的幂次
ssize <<= 1;
}
//用于定位段
this.segmentShift = 32 - sshift;
// 因为ssize是2的幂次方,所以segmentMask是一个全1的一个二进制
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//计算每个Segment中,table数组的初始大小 总的桶数/总的段数=每个段包含的bucket的数量
int c = initialCapacity / ssize;
//确保每个桶都被分到了seg中 一个seg中包含多个table 一个table又包含一个hashentry[]
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 创建segments和第一个segment
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); //原子按顺序写入segments[0]
this.segments = ss;
}
/**
* map转化为ConcurrentHashMap
*
* @param m the map
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
hash算法
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
定位key所在的seg
private Segment<K,V> segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}
HashEntry:
static final class HashEntry<K,V> {
//数组结构的一个个Entry
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V>[] newArray(int i) {
return new HashEntry[i];
}
}
利用CAS原子操作与synchronized保证并发
底层采用数组+链表+红黑树的实现
一些常量:
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final float LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
一些成员变量:
transient volatile Node<K,V>[] table;
private transient volatile Node<K,V>[] nextTable;
private transient volatile long baseCount;
//size controll
private transient volatile int sizeCtl;
private transient volatile int transferIndex;
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;
构造函数:
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//初始容量需要大于等于并发等级
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
//size*loadFactor = capacity
//size:那个数组的长度
//capacity:
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
//cap是大于等于size的一个2的次幂
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
//tableSizeFor会返回大于等于size的2的次幂
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
要摒弃jdk1.7对于CHM的想法。
CHM主要还是为了实现一个线程安全的HashMap,而线程安全主要是体现在了对于map的put与get操作上。所以我们主要看一下CHM的put与get。
注意几个变量:f代表Node,fh代表Node的hashcode。
其中,fh=-1代表此时数组正在扩容。fh=-2代表当前树根节点的hash。fh>0代表此时的node是链表node。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //两次hash,减少hash冲突,可以均匀分布
int binCount = 0;
for (Node<K,V>[] tab = table;;) { //对这个table进行迭代
Node<K,V> f; int n, i, fh;
//这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)//如果在进行扩容,则先进行扩容操作
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //表示该节点是链表结构
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//这里涉及到相同的key进行put就会覆盖原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //插入链表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//红黑树结构
Node<K,V> p;
binCount = 2;
//红黑树结构旋转插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { //如果链表的长度大于8时就会进行红黑树的转换
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//统计size,并且检查是否需要扩容
return null;
}
put操作对于hashmap而言,主要是要先遍历数组,找到要put的位置,然后查看有无冲突,有冲突则放入链表/红黑树中,无冲突则直接放入。大概的逻辑是这样的,细节慢慢来。
我们看到在CHM中,对于put的主要逻辑体现在:
for (Node<K,V>[] tab = table;;) { //对这个table进行迭代
Node<K,V> f; int n, i, fh;
//这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)//如果在进行扩容,则先进行扩容操作
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //表示该节点是链表结构
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//这里涉及到相同的key进行put就会覆盖原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //插入链表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//红黑树结构
Node<K,V> p;
binCount = 2;
//红黑树结构旋转插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { //如果链表的长度大于8时就会进行红黑树的转换
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//统计size,并且检查是否需要扩容
分解看就是:
先遍历整个数组table:for (Node<K,V>[] tab = table;;)
然后这里需要注意的一点,这里使用了懒加载的方式,在第一次put的时候才会初始化map。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
初始化了之后,又分为几种情况插入:
1.如果要插入的位置没有元素,则直接插入:
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
这里需要注意,使用了tabAt
:
@SuppressWarnings("unchecked") // ASHIFT等均为private static final
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { // 获取索引i处Node
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
在这里查看此时要插入的地方有没有值得时候,才用了“安全”的方法。因为这里有可能出现多个线程查看某个位置是否有值得情况。需要保证数据的可见性,如果在线程A看到这个位置没有值,准备插入,但是线程B也看到了这个位置没有值,但是线程B在插入的时候线程A已经插入了值,这时候就会出现线程安全的问题。
在确保了这里没有值的时候,再使用cas操作插入某个值:
// 利用CAS算法设置i位置上的Node节点(将c和table[i]比较,相同则插入v)。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
保证插入没有线程安全问题。
2.如果此时数组table正在扩容,则先进行扩容,因为这里是采用的无限循环的方式,所以在扩容完成之后,跳出此次循环之后还是会进入循环继续进行插入操作。
注意,这里查看数组是否正在进行扩容的方式。比较有意思,而我也没怎么理解。
3.要插入的地方有值,存在hash冲突。
这时候就要加锁了,防止多个线程同时操作时造成线程安全问题。
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //表示该节点是链表结构
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//这里涉及到相同的key进行put就会覆盖原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //插入链表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//红黑树结构
Node<K,V> p;
binCount = 2;
//红黑树结构旋转插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //表示该节点是链表结构
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//这里涉及到相同的key进行put就会覆盖原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //插入链表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//红黑树结构
Node<K,V> p;
binCount = 2;
//红黑树结构旋转插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
这里对某一个结点加锁,保证一次只有一个线程可以进入。如果是链表结构(fh>0),则直接插入链表尾。如果是树结构,则红黑树旋转插入。最后查看书否需要将链表结构转化为树结构。
至于扩容操作自己也没看咋懂或者说好多看不下去了这里留一个todo
接下来看看get方法:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); //计算两次hash
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素
if ((eh = e.hash) == h) { //如果该节点就是首节点就返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
//查找,查找到就返回
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get方法比较简单,大概就是计算hash,在这里有可能碰到在取元素的时候map正在扩容,就调用find方法去查找元素。
最后,为什么要使用红黑树代替链表。
因为对于链表而言,要是链表长度太长,会出现遍历操作很慢的情况(链表的特点,插入删除快,遍历慢),所以采用了红黑树(平衡的二叉搜索树)。至于为什么不适用数组,是因为插入删除慢。。。。。。至于为什么不使用二叉搜索树。因为,二叉搜索树有可能出现很不平衡的状况,使得遍历的效率跟链表差不多。
本篇博客参考了https://www.cnblogs.com/study-everyday/p/6430462.html,如果大家要深入的去了解,建议看看这篇。
原文:https://www.cnblogs.com/GaryZz/p/11341265.html