ConcurrentHashMap原理分析

/ 默认分类 / 0 条评论 / 1144浏览

ConcurrentHashMap原理分析

  1. ConcurrentHashMap是HashTable的改进版,HashTable是线程安全的,但直接使用方法级别的同步锁,也就等于每次操作都锁住整张hash表,ConcurrentHashMap 在其基础上改进,使用分段锁(JDK1.7使用Segment对某端hash表进行上锁,1.8则针对单个hash槽),每次操作只锁定当前操作的hash表的段落,其他hash表位置数据还是可以被并发操作,这样大大提高了效率。

  2. ConcurrenntHashMap也不支持null key和null value

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();

3.ConcurrenntHashMap计算hash值的方法和HashMap一样,同样是hashCode的高低位异或

int hash = spread(key.hashCode());


static final int spread(int h) {
        //HASH_BITS=0x7fffffff,很明显与它是用来去除符号位的
        return (h ^ (h >>> 16)) & HASH_BITS;
    }
  1. 基本原理分析

主要总结为下面一句话: 多线程环境下,ConcurrentHashMap通过读取volatile(内存可见性)修饰的sizeCtl属性,来判断当前所处的并发状态。 通过CAS设置sizeCtl属性,告知其他线程当前的状态变化。
所以,不同值的sizeCtl所代表的含义也不同:

未初始化:
sizeCtl=0:表示没有指定初始容量。
sizeCtl>0|:表示初始容量。

初始化中:
sizeCtl=-1,标记作用,告知其他线程,正在初始化

正常状态:
sizeCtl=0.75n ,扩容阈值

扩容中:
sizeCtl < 0 : 表示有其他线程正在执行扩容
sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此时只有一个线程在执行扩容
private transient volatile int sizeCtl;

ps:sizeCtl参数在构造函数时初始化为当前hash表容量(非负数,默认没有指定容量时为0)

下面是第一次插入数据的初始化方法执行过程

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果是第一次插入数据,那么就在initTable方法中初始化数组
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //如果sizeCtl值小于0则表示当前有其他线程在初始化了,那么当前线程就请求让出时间片,等待下一次cpu调度执行 
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //否则使用CAS为sizeCtl赋值为-1,如果赋值成功则说明乐观锁没有失败,当前只有这一个线程要初始化hash表,那么就开始初始化操作
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                //最终如果当前线程初始化完毕了,重置sizeCtl,保证值为-1表示当前有一个线程在初始化  
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

ps:上面的U.compareAndSwapInt(this, SIZECTL, sc, -1),可以了解下java中的Unsafe类,该类为java提供了类似c/c++的底层内存数据操作,ConcurretHashMap中 的乐观锁就是使用Unsafe中的CAS实现的(先比较,满足期望再赋值 Compare and Swap),下面按照ConcurrentHashMap中的使用简单介绍下

//在对象o中,如果内存中对象o所在地址偏移位为offset位置的数据和expected期望数据一样,则修改使地址偏移位为offset位置的数据为x
public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

所以在ConcurrentHashMap中SIZECTL其实就是对象中的sizeCtl变量,具体可以看下面ConcurrentHashMap中的一段代码:

// Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final long SIZECTL;
    private static final long TRANSFERINDEX;
    private static final long BASECOUNT;
    private static final long CELLSBUSY;
    private static final long CELLVALUE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            //这里在静态块中就已经初始化了SIZECTL值就是ConcurrentHashMap中sizeCtl变量在内存的偏移量,其他的同理
            SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
            Class<?> ak = Node[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }

ConcurrentHashMap对于并发的控制,在jdk1.7和jdk1.8有不一样的实现,1.7中使用Segment分段锁,Segment是继承了ReentrantLock的类,其中包含 hash表中的某一段数据,这样不同的线程操作就可以按照操作位置使用不同位置的Segment来进行线程安全操作,这样就等于实现了hash表的分段锁定,在1.8中 没有使用分段锁,而是使用 CAS + synchronized来实现的并发控制

下面总结几个ConcurrentHashMap的问题:

1. ConcurrentHashMap的get操作没有加锁,那么它是怎么保证读取的数据是最新的?

transient volatile Node<K,V>[] table;

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

可以看到,ConcurrentHashMap将table修饰为volatile,这样保证了在扩容过程中对其他线程的可见性,但是为了保证get获取到的数组中的节点 数据是最新的,可以看到,数组节点对象中的next和val值都是volatile修饰的,这样保证数据被其他线程修改,当前get获取的线程可以及时更新到最新 数据; 这样的解决办法也是ConcurrentHashMap相比于使用get加锁来保证线程安全大大提高了效率的关键

2. ConcurrentHashMap怎样保证put操作线程安全的?

当添加数据时,先判断是否有初始化,如果没有就初始化,这个上面有详细介绍

if (tab == null || (n = tab.length) == 0)
    tab = initTable();

计算hash位置,判断当前元素需要插入哪个位置

i = (n - 1) & hash

如果这个位置为空则直接添加,如果不为空,则取出这个节点

 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }

如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制

else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);

最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作(这里和hashmap一样可能有转树的操作)

3. ConcurrentHashMap中,如果线程发现当前正在扩容,那么就帮助扩容

else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);

其中相关的方法是:helpTransfer和transfer()

说实话,这里的逻辑真的很多,我是参考的网上大佬的注释半解半疑,有兴趣可以详细了解下
参考文章

当前线程如果发现有线程在扩容了,并且当前线程目前待插入的数据所在hash槽的数据已经都迁移了,那么当前线程就加入到迁移其他hash槽的队伍中,这里迁移 数据也就是扩容操作,这个和hashmap中一样,也就是先生成长度是2倍大小的新数组,然后迁移数据(重新hash),并且在迁移过程中,ConcurrentHashMap也是使用 了分段锁,但是相比于jdk1.7,这里是针对单个hash槽来加锁的

Node<K,V> f; int fh;
.....
synchronized (f) {
.....

ps:ConcurrentHashMap真的是很牛的一个数据结构,关于这个类的知识点还有太多太多,但由于我个人水平有限,有时间再好好研究研究.