首页 > 编程语言 > 详细

java并发编程的艺术,读书笔记第六章 concurrentHashMap以及并发容器的介绍

时间:2017-08-28 19:37:52      阅读:373      评论:0      收藏:0      [点我收藏+]

ConcurrentHashMap的原理

将数据一段一段的存储然后给每一段数据分配一把锁,当线程访问数据的一段时,为每段分配一把锁,同时其他段的数据可以被其他线程数据访问

2)concurrentHashMap 的结构

concurrentHashMap 由segament数组和hashentry数组结构组成,segament是一种可靠的重入锁,在里面扮演锁的角色,hashentry用于存储键值对数据,一个segament里面包含一个hashentry数组,每个hashentry是一个链表结构的元素,每个segament守护着一个hashentry数组里的元素,当hashentry数组的数据进行修改时,必须首先获得与它对应的锁

初始化segament数组

if(concurrencyLevel>max_segament){

concurrencyLevel=max_segament;

}

int sshift=0;

int sszie=1;

while(ssize<concurrentcyLevel){

++sshift;

ssize<<=1;

}

segamentshift=32-sshift;

segamentMask=ssize-1;

this.segaments=Segament.newArray(ssize);

ssize 是2的n次方最接近concurrencylevel的数字。

segamentshift为段偏移量

segamentmark为段掩码

2定位segament:在插入或者获取元素的时候,必须通过散列算法对hashcode进行二次散列,之所以二次散列目的是减少散列冲突,使元素能够均匀的分布在不同的segament上,最坏情况,所有的值都散列在一个segament上

计算segament位置的伪代码如下:

final  Segament<K,V> segamentfor(int hash){

return segaments[hash>>>segamentshift&segamentMask]

}

ConcurrentHashMap 的操作:get put size

get伪代码如下:

public v get(Object hashcode)){

//此函数的作用是对hashcode进行二次散列,因为对效率有要求,故采用位移算法的方式

int hash = hash(hashcode);

return segamentfor(hash).get(key,hash);

}

get操作的高效之处在于在读取时不用加锁,除非是读到空值,重新加锁,那么如何做到不加锁?主要的方法在于使用volatitle变量,使之在线程间可见。能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写,根据happen-before原则对volatitle的写优先于读

定位hashentry和segament的算法虽然一样,但是值不一样

两个方法如下

hash>>>segamentshift&segamentMarsk 定位segament所使用的hash算法

int index=hash&(tab.length-1) //定位hashentry的算法

目的是在segament和hashentry中同时散列开

put操作:

put需要对共享变量进行写入,因此为了线程安全需要加锁

步骤1)判断是否需要扩容

注意:hashmap线插入后扩容,有可能产生无效扩容

concurrenthashmap 先扩容在插入。

扩容方式:将hashentry数组扩容至原来两倍,重新散列后插入,同时只会对segament进行扩容,不会全部扩容。

size操作

统计size的安全方式:在put clean remove方法时锁住变量,但这种效率非常低下

concurrenthashmap采用的方式是实用modcount变量 每进行一次 put  clean remove操作时 将变量 +1 然后在统计size前后进行比较时否相等,从而得知容器时否发生变化

concurrentlinkedqueue非阻塞队列

队列有两种实现方式阻塞队列和非阻塞队列,阻塞队列是采用阻塞算法,即使用锁来实现,非阻塞队列是使用cas循环来实现

concurrentlinkedqueue是一种无界队列采用了先进先出的方式进行了排序 ,采用了 wait -free算法

最简单的算法

offer(入队列)算法:

public boolean offer(E e){

Node node = new Node(e);

while(true){

Node tail=gettail();

if(tail.getnext().casNext(null,node)&&node.casTail(tail,node)){

return true;

}

}

}

//出队列 个人简化的伪代码,可能不准确欢迎指正

public e poll(){

while(true){

Node head = getHead();

currentNode = head.getNode();

//将head节点替换为下一个节点

if(head.cas(currentNode,currentNode.getNext())){

//断绝引用

currentNode.setNext()=null;

return current;

}

}

注意next 节点可以使用atomicReference 实现。

这样就可以实现cas操作

阻塞队列

阻塞队列是一个支持两个附加操作的队列,这两个操作支持阻塞的插入和移除

1)阻塞插入:当队列满时,队列阻塞所有的插入操作,直到队列不满

2)阻塞移除:当队列为空时,队列阻塞所有的移除操作,直到队列不空

阻塞队列常用语生产者和消费者场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程,阻塞队列是存放元素的容器

队列阻塞的四种处理方式

1)抛出异常

2)返回特殊值

3)一直阻塞

4)超时退出

java中常见的阻塞队列

1)ArrayBlockingQueue:是用数组实现的有界阻塞队列,此队列按照先进先出的原理进行排序

2)linkedBlockingQueue:是用链表实现的有界阻塞队列此队列的最大长度为Integer.MAX_VALUE 值

3)PriorityBlockingQueue:通过compartor方法或者compareTo方法进行排序

4)DelayQueue:是一个支持延迟获取元素的无界阻塞队列,使用pritorityQueue队列,实现Delay接口

delayqueue非常有用

1)缓存设计:可以使用delayqueue保存缓存元素的有效期,使用一个线程循环查询delayqueue,当查到元素时,表示该缓存到期了

2)定时任务调度:使用delayqueue保存当天将会执行的任务和执行时间,当从delayqueue查询到了任务时就开始执行。

如何实现delay接口

1)在创建对象时初始化

2)实现getdelay方法,返回执行还需要的时间

3)实现compareTo方法,将返回时间最长的放在最后面

4)linkedblockingdqeque:由链表组成的双向阻塞队列;可以用在工作-窃取模式中

阻塞队列实现原理:

1)使用通知模式实现,主要通过condtion方式实现队列阻塞

伪代码如下:

Lock lock = new RetreenLock();

private final Condition notempty =lock.newCondition();//表示可以获取

private final  Condition notFull = lock.newConditon();//表示可以插入

//插入方法

public void put(E e){

final RetreenLock lock =this.lock;

lock.lockinterrupt();

try{

while(e.length=max_count){

notfull.await();

}

insert(e);

}finally{

lock.unlock();

}

private void insert(E e){

items[putIndex]=e;

putIndex=inc(e);

++count;

notEmpty.signal();//表示通知take线程可以获取

}

//获取方法

public E take(){

final Reentrantlock lock = this.lock;

lock.interruptibly();

try{

while(item.length==0){

//表示队列为空,线程阻塞

notempty.await();

}

return extract();

}finally{

lock.unlock();

}

}

fork/join框架

fork/join框架是jdk 1.7推出的新特性,用于一个并行执行的任务框架

tips:并行与并发

并发的实质是一个物理CPU(也可以多个物理CPU) 在若干道程序之间多路复用,并发性是对有限物理资源强制行使多用户共享以提高效率。

并行性指两个或两个以上事件或活动在同一时刻发生。在多道程序环境下,并行性使多个程序同一时刻可在不同CPU上同时执行。

原理:fork是把一个大任务拆分成若干个小任务,join就是合并这些小任务得到的结果。最后得到这个大任务的结果.

工作窃取算法(working-stealing)

是某个线程从其他队列里获取任务用来执行,应用场景:如某个大任务A拆分成多个互不依赖的子任务放入不同的队列中,当某些队列中的任务执行完毕,另外一些队列中的任务还未执行完,于是执行完队列的线程就会去执行其他还有任务的队列的任务,(比如 线程x 发现队列a中的任务没有了,那么就去执行队列b中的任务)

这样做的好处是;充分利用线程间并行计算,减少线程间竞争

这样做的坏处是:某些情况下存在竞争。如任务队列中只有一个任务。并且会消耗更多的系统资源,比如创建队列,创建线程池

注意点:为了减少竞争,通常采用双端队列,窃取方式为从队尾获取。正常线程从头部获取

fork/join 主要有两个工作步骤

1)分割任务

2)执行任务并合并结果

RecursiveAction 用于执行没有返回结果的任务

RecursiveTask 用于执行有返回结果的任务

forkjointask 需用由forkjoinpool来执行

使用方法:

首先继承RecursiveTask 通过重写compute方法来拆分任务并且fork执行,join合并

//main方法使用

public static void main(String[] args){

//创建线程池

ForkJoinPool pool = new ForkJoinPool();

//创建主任务

CountTask task = new CountTask();

//执行任务

Future<Integer> result = pool.submit(task);

system.out.println(result.get());

}

异常处理代码

//时否是异常处理完成

if(task.isCompletedAbnormally()){

Sysmte.out.println(task.getException());

FORK/JOIN框架的实现原理

1)fork的实现原理

调用puttask放入任务,采用异步调用

public final forkJoinTask<V> fork(){

((ForkJoinWorkThread)Thread.currentThread).putTask(this);

}

2)putTask把当前任务放入任务数组中,然后调用ForkJoinPool的signalwork来唤醒或者创建一个新的线程

伪代码如下

public final void pushTask(ForkJoinTask<> t){

ForkJoinTask <> [] q;int s,m;

if(q=queue)!=null){

//计算出偏移量

long u =( (s=queueTop)&(m=queueTop.length-1))<<ASHIFT+ABASE;

//根据偏移量直接刷新主内存

unsafe.putOrderObject(q,u,t);

queueTop=s+1;

if(s-=queueBase<=-2){

//唤醒工作线程

signalwork();

}else{

//创建新线程

growQueue();

}

}

}

1)join方法原理,首先查看任务时否执行完成,如果完成,则直接返回完成状态

2)如果没有完成,则从数组里取出任务并行执行,如果正常,则返回normal如果异常则返回exception

如果正常则返回结果


本文出自 “iter工作总结” 博客,请务必保留此出处http://5855156.blog.51cto.com/5845156/1959758

java并发编程的艺术,读书笔记第六章 concurrentHashMap以及并发容器的介绍

原文:http://5855156.blog.51cto.com/5845156/1959758

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!