首页 > 编程语言 > 详细

JAVA并发(4)-并发队列ConcurrentLinkedQueue

时间:2021-06-05 11:24:35      阅读:34      评论:0      收藏:0      [点我收藏+]

本文开始介绍并发队列,为后面介绍线程池打下基础。并发队列莫非也是出队、入队操作,还有一个比较重要的点就是如何保证其线程安全性,有些并发队列保证线程安全是通过lock,有些是通过CAS。
我们从ConcurrentLinkedQueue开始吧。

1. 介绍

ConcurrentLinkedQueue是集合框架的一员,是一个***限且线程安全,基于单向链表的队列。该队列的顺序是FIFO。当多线程访问公共集合时,使用这个类是一个不错的选择。不允许null元素。是一个非阻塞的队列。

它的迭代器是弱一致性的,不会抛出java.util.ConcurrentModificationException,也可能在迭代期间,其他操作也正在进行。size()方法,不能保证是正确的,因为在迭代时,其他线程也可以操作该队列。

1.1 类图

技术分享图片
(显示的方法都是公有方法)

public?class?ConcurrentLinkedQueueextends?AbstractQueueimplements?Queue

继承至AbstractQueue,他提供了队列操作的一个框架,有基本的方法,add、remove,element等等,这些方法基于offer,poll,peek(最主要看这几个方法)。

2. 源码分析

2.1 类的整体结构

队列中的元素Node

private?static?class?Node{
????????//?保证两个字段的可见性
????????volatile?E?item;
????????volatile?Nodenext;

????????/**
?????????*?Constructs?a?new?node.??Uses?relaxed?write?because?item?can
?????????*?only?be?seen?after?publication?via?casNext.
?????????*/
????????Node(E?item)?{
????????????UNSAFE.putObject(this,?itemOffset,?item);
????????}

????????boolean?casItem(E?cmp,?E?val)?{
????????????return?UNSAFE.compareAndSwapObject(this,?itemOffset,?cmp,?val);
????????}

????????void?lazySetNext(Nodeval)?{
????????????//?putOrderedXXX是putXXXVolatile的延迟版本,设置某个值不会被其他线程立即看到(可见性)
????????????//?putOrderedXXX设置的值的修饰应该是volatile,这样该方法才有用

????????????//?关于为什么使用这个方法,主要目的肯定是提高效率,但是具体原理,我只能告诉大家跟内存屏障有关(我也不太清楚这一块,待我研究后,再写一篇文章)
????????????UNSAFE.putOrderedObject(this,?nextOffset,?val);
????????}

????????boolean?casNext(Nodecmp,?Nodeval)?{
????????????return?UNSAFE.compareAndSwapObject(this,?nextOffset,?cmp,?val);
????????}

????????//?Unsafe类中的东西,可以去了解一下

????????private?static?final?sun.misc.Unsafe?UNSAFE;
????????private?static?final?long?itemOffset;
????????private?static?final?long?nextOffset;

????????static?{
????????????try?{
????????????????UNSAFE?=?sun.misc.Unsafe.getUnsafe();
????????????????Class?k?=?Node.class;
????????????????itemOffset?=?UNSAFE.objectFieldOffset
????????????????????(k.getDeclaredField("item"));
????????????????nextOffset?=?UNSAFE.objectFieldOffset
????????????????????(k.getDeclaredField("next"));
????????????}?catch?(Exception?e)?{
????????????????throw?new?Error(e);
????????????}
????????}
????}

构造器1:

????//?private?transient?volatile?Nodehead;
????//?private?transient?volatile?Nodetail;
????public?ConcurrentLinkedQueue()?{
????????head?=?tail?=?new?Node(null);
????}

构造器2:

public?ConcurrentLinkedQueue(Collection?c)?{
????????Nodeh?=?null,?t?=?null;
????????for?(E?e?:?c)?{
????????????checkNotNull(e);
????????????NodenewNode?=?new?Node(e);
????????????if?(h?==?null)
????????????????h?=?t?=?newNode;
????????????else?{
????????????????t.lazySetNext(newNode);
????????????????t?=?newNode;
????????????}
????????}
????????if?(h?==?null)
????????????h?=?t?=?new?Node(null);
????????head?=?h;
????????tail?=?t;
????}

下面开始讲方法,从offer,poll,peek从这几个方法入手

2.2 offer

添加元素到队尾。因为队列是***的,这个方法永远不会返回false

分为三种情况进行分析(一定自己跟着代码debug,一步步的走)

  1. 单线程时(使用IDEA debug一直进入的是 else if把我搞迷茫了,我会写一个博客来解释原因)
????????ConcurrentLinkedQueuequeue?=?new?ConcurrentLinkedQueue<>();
????????queue.offer("A");
????????queue.offer("B");

以上面的代码,分析每一个步骤。
执行构造函数后:
技术分享图片

此时链表的head与tail指向哨兵节点

插入"A", 此时没有设置tail(‘两跳机制‘,这里的原因后面详见)

技术分享图片

插入"B",
技术分享图片

单线程情况比较简单

  1. 多线程offer时
?public?boolean?offer(E?e)?{
????????checkNotNull(e);
????????final?NodenewNode?=?new?Node(e);

????????for?(Nodet?=?tail,?p?=?t;;)?{
????????????Nodeq?=?p.next;
????????????if?(q?==?null)?{
????????????????//?p?is?last?node
????????????????//?只有一个线程能够CAS成功,其余的都重试
????????????????if?(p.casNext(null,?newNode))?{

????????????????????//?延迟设置tail,第一个node入队不会设置tail,第二个node入队才会设置tail
????????????????????//以此类推,?‘两跳机制‘
????????????????????if?(p?!=?t)?//?hop?two?nodes?at?a?time
????????????????????????casTail(t,?newNode);??//?Failure?is?OK.
????????????????????return?true;
????????????????}
????????????????//?Lost?CAS?race?to?another?thread;?re-read?next
????????????}
????????????//?这里是有其他线程正在poll操作才会进入,此时只考虑多线程offer的情况,暂不分析
????????????else?if?(p?==?q)
????????????????//?We?have?fallen?off?list.??If?tail?is?unchanged,?it
????????????????//?will?also?be?off-list,?in?which?case?we?need?to
????????????????//?jump?to?head,?from?which?all?live?nodes?are?always
????????????????//?reachable.??Else?the?new?tail?is?a?better?bet.
????????????????p?=?(t?!=?(t?=?tail))???t?:?head;
????????????else
????????????????//?Check?for?tail?updates?after?two?hops.
????????????????//?存在tail被更改前,和更改后的两种情况
????????????????p?=?(p?!=?t?&&?t?!=?(t?=?tail))???t?:?q;
????????}
????}

结合上面的代码,看图

  • 步骤一,线程A、线程B都执行到
???if?(p.casNext(null,?newNode))

技术分享图片

  • 步骤二,只有一个线程执行成功,假设线程A成功,线程B失败
    技术分享图片
    因为p(a) == t(a), 此时不执行casTail,tail不变。q = p.next, 所以此时q(b) = Node2 ,那么 p(b) != q(b), 线程B执行p = (p != t && t != (t = tail)) ? t : q;

线程B即将执行

???p?=?(p?!=?t?&&?t?!=?(t?=?tail))???t?:?q;
  • 步骤三 此时线程C进入。
    此时,p(c) != q(c), 线程C执行
???p?=?(p?!=?t?&&?t?!=?(t?=?tail))???t?:?q;

执行完后,q(c)赋值给p(c). 再次循环,此时,q(c) == null, 设置p(c)的next,线程C将值入队
技术分享图片

  • 步骤四 p(c) != t(c), 线程C执行casTail(t, newNode), 线程C设置尾结点
    技术分享图片
  • 此时线程B执行
???p?=?(p?!=?t?&&?t?!=?(t?=?tail))???t?:?q;

因为p(b) == t(b),所以 q(b) 赋值给 p(b)。继续循环,最后得到
技术分享图片

  1. 多线程的另一种情况,回到步骤三,此时线程C把值入队了,但是还没有设置tail
    技术分享图片
  • 线程B,将值入队成功
    在步骤三的基础上,线程B入队成功后,目前的状况如下:
    技术分享图片

此时,线程C执行casTail(t, newNode),但是现在的tail != t(c), CAS失败, 直接返回。

2.2.1 小结

上面不管是多线程还是单线程,都是努力的去寻找next为null的节点,若为next节点为null,再判断是否满足设置tail的条件。

多线程offer的第一种情况存在设置tail滞后的问题,我把它称之为"两跳机制",后面讲使用这种机制的原因。
我们看到上面的情况一直没有进入else if (p == q)分支,进入else if分支只会发生在有其他线程在poll时,我们先讲讲poll,再讲讲何时进入else if分支。

2.3 poll

删除并返回头结点的值

简单提一下单线程与多线程的poll,着重分析一下poll与offer共存的情况

  1. 单线程时
    技术分享图片
    单线程比较简单,就不画图了,按照上面的queue,进行一步一步的debug就行了

  2. 多线程,只有poll时

?public?E?poll()?{
????????restartFromHead:
????????for?(;;)?{
????????????for?(Nodeh?=?head,?p?=?h,?q;;)?{
????????????????E?item?=?p.item;

????????????????//?casItem这里只有一个线程能够成功,其余的继续下面的代码
????????????????if?(item?!=?null?&&?p.casItem(item,?null))?{
????????????????????//?Successful?CAS?is?the?linearization?point
????????????????????//?for?item?to?be?removed?from?this?queue.
????????????????????if?(p?!=?h)?//?hop?two?nodes?at?a?time
????????????????????????updateHead(h,?((q?=?p.next)?!=?null)???q?:?p);
????????????????????return?item;
????????????????}
????????????????else?if?((q?=?p.next)?==?null)?{
????????????????????updateHead(h,?p);
????????????????????return?null;
????????????????}
????????????????else?if?(p?==?q)
????????????????????continue?restartFromHead;
????????????????else
????????????????????p?=?q;
????????????}
????????}
????}
????final?void?updateHead(Nodeh,?Nodep)?{
????????if?(h?!=?p?&&?casHead(h,?p))
????????????//?将之前的头节点,自己指向自己,等待被GC
????????????h.lazySetNext(h);
????}

从上面代码可以看出,修改item与head都会使用CAS,这些变量都是被volatile修饰,所以保证了这些变量的线程安全性。不管是单线程还是多线程的poll,它们都是去寻找一个有效的头节点,删除并返回该值,若不是有效的就继续找,若队列为空了,就返回null。

最后分析一下,offer与poll共存的情况

  • 线程A做offer操作,线程B做poll操作,初始的状态如下:
    技术分享图片

  • 线程A进入。
    技术分享图片

  • 线程A将要执行

Nodeq?=?p.next;

线程B进入,进行poll操作
此时,线程B执行了一次内循环,将q(b)赋值给了p(b);
技术分享图片

  • 线程B再次执行内循环,此时将p(b).item置空,将p(b)赋值给head,之前的h(b)的next指向自己,线程B退出
    技术分享图片

  • 线程A执行

??Nodeq?=?p.next;

技术分享图片

此时,p(a).next 指向自己(等待被GC), 进入else if (p == q)分支,线程A退出,经过一番执行后,最后得到的状态,如下:
技术分享图片

进入else if (p == q)分支的情况,只会发生在poll与offer共存的情况下。

2.4 peek

获取首个有效的节点,并返回

public?E?peek()?{
????????restartFromHead:
????????for?(;;)?{
????????????for?(Nodeh?=?head,?p?=?h,?q;;)?{
????????????????E?item?=?p.item;
????????????????if?(item?!=?null?||?(q?=?p.next)?==?null)?{
????????????????????updateHead(h,?p);
????????????????????return?item;
????????????????}
????????????????else?if?(p?==?q)
????????????????????continue?restartFromHead;
????????????????else
????????????????????p?=?q;
????????????}
????????}
????}

peek与poll的操作类似,这里就贴一下代码就是了。

3. 总结

ConcurrentLinkedQueue是使用非阻塞的方式保证线程的安全性,在设置关系到整个Queue结构的变量时(这些变量都被volatile修饰),都使用CAS的方式对它们进行赋值。

  • size方法是线程不安全的,返回的结果可能不准确

关于“两跳机制”(自己取得名字),

Both head and tail are permitted to lag. ?In fact, failing to update them every time one could is a significant optimization (fewer CASes). As with LinkedTransferQueue (see the internal documentation for that class), we use a slack threshold of two; that is, we update head/tail when the current pointer appears to be two or more steps away from the first/last node.

Since head and tail are updated concurrently and independently, it is possible for tail to lag behind head (why not)? -- ConcurrentLinkedQueue

大致意思,head与tail允许被延迟设置。不是每次更新它们是一个重大的优化,这样做就可以更少的CAS(这样在很多线程使用时,积少成多,效率更高)。它的延迟阈值是2,设置head/tail时,当前的结点离first/last有两步或更多的距离。 这就是“两跳机制”

我们想不通的地方,可能是这个类或方法的一个优化的地方。向着大佬看齐~

JAVA并发(4)-并发队列ConcurrentLinkedQueue

原文:https://blog.51cto.com/u_15162069/2868260

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!