首页 > 编程语言 > 详细

java并发实践笔记

时间:2014-03-25 10:48:27      阅读:1329      评论:0      收藏:0      [点我收藏+]


底层的并发功能与并发语义不存在一一对应的关系。同步和条件等底层机制在实现应用层协议与策略须始终保持一致。(需要设计级别策略。----底层机制与设计级策略不一致问题)。

简介

1.并发简史。(资源利用率/公平性/便利性),进程通信通过粗粒度通信机制:文件/套接字/信号量/信号处理器/共享内存。高效做事----串行和异步好的平衡。

线程共享文件句柄和内存句柄,都有自己的程序计数器、栈、局部变量;都访问堆中内存,需要更细粒度的内存共享机制。

2.线程优势

降低程序开发维护成本,提升性能(将异步工作流转为串行工作流,模拟人类交互;降低代码复杂度)

(1)发挥多处理器计算能力

(2)建模的简单性(多项任务,串行编写--在关键位置同步)。将执行逻辑与调度机制的细节、交替执行的操作、异步IO以及资源等待等问题分离。Servlet/RMI.

(3)异步事件的简化处理.使用线程和同步IO。(非阻塞io的复杂性高,不如多线程用阻塞Io)。java.nio (早期os可建立的线程少,需要非阻塞io).

(4)响应更灵敏的用户界面。

3.线程风险

(1)线程安全. 竟态条件,多线程下能否返回唯一的值。

(2)活跃性问题。 无意中造成的无限循环,使得循环后的代码无法执行。 (活跃性的目标——-正确的事情最终会发生)。死锁、饥饿、活锁等。(依赖不同线程事件发生时序)

(3)性能问题。频繁出现上下文切换(保存和恢复上下文,丢失局部性,更多时间在线程调度非运行)。(同步机制让内存缓存数据无效,增加总线同步流量)

4.线程广泛使用。gui创建线程管理用户界面事件;Timer将创建线程执行延迟任务。(框架创建线程池调用线程方法)。

访问框架多线程的应用程序状态的代码路径必须线程安全。  线程安全的要求源自多线程模块,可能会延伸到整个程序。

(1)Timer.   TimerTask在Timer管理的线程运行,不由应用程序管理。TimerTask访问应用程序其他线程会访问的数据,都需要用线程安全的方式访问。(最好把线程安全封装在共享对象内部)

(2)Servlet/jsp。Servlet需要满足同时被多个线程调用,需要线程安全的。与其他servlet共享的信息,如serveltContext中的对象/HttpSession.(当多个servlet访问其中的对象时保证线程安全)。

(3)rmi。同一个远程对象的远程方法会不会被多个rmi线程同时调用。(协同多个对象中共享的状态,对远程对象本身状态的访问)

(4)swing/awt. JTable不是线程安全的,事件处理器访问由其他线程访问的状态要考虑。

一、并发编程的基本理论

(一)线程安全性

核心对状态访问的操作进行管理(特别可变的和共享的状态)

可变性:状态可变的。状态不仅依赖于对象的域,还包括依赖对象的域。

共享:访问模式(多线程访问),同步机制synchronized、volatile、显式锁、原子变量。

修复线程问题的办法:不共享变量;状态变量改为不可变;访问状态变量时用同步。一开始设计线程安全类,比后改容易的多(找那些位置访问同一个变量复杂)

封装有助于写出线程安全的类,易对所有访问实现同步、易找出变量在哪些条件访问,封装越好越易实现线程安全。设计线程安全类起作用的特性:面向对象/不可修改性/明晰的不变性规则。(先让程序跑起来,再优化)

1.线程安全

多个线程访问某个类时,不管运行环境用哪种调度方式或者这些线程将如何交替执行,并且在主调函数里不需要额外的同步或协同,这个类都能表现出正确的行为,就称这个类是线程安全的。

无状态的servlet,没有状态变量。大部分servelt线程安全

2.原子性

(1)竟态条件。计算的正确性取决于多线程交替执行的顺序时。情况:先检查后执行、读取修改写入。

(2)复合操作。如果竟态条件的复合操作是原子操作,竟态条件就不会发生。单个状态变量时可以使用原子变量类。如AtomicLong/AutomicReference.

3.加锁机制

(1)多线程操作的类(如servlet)有多个状态的时候用原子类满足不了要求了,需要用加锁机制。因为竟态条件又出现。不可变条件不能破坏(如因式分解的积与因子的不变性)

(2)内置锁。synchronized.    加在实例方法上,默认是用的本实例做锁对象,用在类方法(静态方法)上用本类对象做锁对象。synchronized(lock){}  

相当于互斥体,锁保护的代码会原子执行。任意一个执行同步代码块的线程都不能看到有其他线程在执行由同一个锁保护的同步代码块。

(3)重入:获取锁操作的粒度是线程不是调用,实现方式是为每个锁关联一个获取计数值和一个所有者线程。有了重入机制,子类重写了synchronized方法时调用父类方法不会死锁。【与pthread-posix线程不同,pthread以调用为粒度】

4.用锁保护状态

锁保护代码以串行的方式访问。

(1)仅用锁不够,用同步来协调对某个变量的访问,那么在访问这个变量的所有位置上都要同步且用同一个锁。(同时保证原子性和可见性)

只对写操作同步也不对,需要在读操作上用同步才能保证可见性。

(2)内置锁与状态没关系,用内置锁为了避免显式的创建对象;每个共享可变变量都要用同一个锁,且使维护人员指导。FindBugs会找出类似bug.

(3)对含有多个变量的不变性条件,其中涉及的所有变量都要用同一个锁保护。

5.活跃性与性能

(1)缩小同步块的范围,也不能过小。

(2)必须用synchronized的地方不要用原子类。 同步块大小需要简单性/安全性/性能平衡。

(3)执行时间长的计算或者可能无法快速完成的操作(网络io/控制台io)一定不要持有锁。

(二)对象的共享(可见性和共享对象的发布)。同步的另一方面:内存可见性。

1.可见性

代码会重排序,不能对操作的执行顺序进行判断。使用正确的同步保证多线程共享。

(1)失效数据。缺乏同步程序可能错误的情况。(几个变量可能仅有一部分失效),引入令人困惑的故障。

对get也要synchronized后才会获得可见性。

(2)非原子的64位操作。long/double.  除非synchronize或者volatile保证。

(3)加锁与可见性

加锁不只实现原子性还包括可见性,确保所有线程都能看到共享变量的最新值,所有执行读操作或者写操作的线程必须在同一个锁上同步。

(4)volatile关键字

保证可见性和内存操作不会重排序。volatile关键字修饰的变量不会存在与寄存器或者其他处理器不可见的地方。volatile不进行加锁就没有阻塞,轻量级的同步。

写入volatile变量相当于退出锁,读取相当于进入锁。不要过度依赖volatile变量。

只有能简化代码实现或者对同步策略验证时才用:确保自身状态可见性,确保引用状态可见性,标识生命周期事件的发生(初始化或关闭)。

volatile通常用于标识某个操作完成/中断或者其他状态标志。

满足下面所有条件才用volatile

---对变量写入不依赖原来值或者只单线程更新

--变量不会与其他变量一起纳入不变性条件

--访问变量时不需要加锁

2.发布和逸出

(1)发布位置:一个指向该对象的引用保存到其他代码可以访问的地方,或者在某个非私有方法中返回该引用,或者引用传递到其他方法中。不该发布的对象发布叫逸出。

(2)公共静态变量

(3)返回自身引用

(4)传入外部方法。【不是private也不是final的方法】

这些情况必须假定会误用该对象,需要使用封装的主要原因。封装:使得对正确性分析可能,无意中破坏设计约束条件变难。

(5)发布一个内部类的实例。不要在构造函数让this引用逸出,不正确构造。 this引用常见错误,在构造函数启动线程。(可以构造但是不要在构造函数启动)

构造函数注册监听器或启动线程,额可以用私有构造函数和公共的工厂方法来实现。

3.线程封闭

单线程访问就不需要同步了。 如connnection不是线程安全,可以让获取connection的方法安全,每个线程一个connection,不可能同时几个线程共用即可。

(1)ad-hoc线程封闭。线程封闭性的职责由程序实现承担。Ad-hoc封闭脆弱。事实上对线程封闭的对象常保存在公有变量中。

通常使用特定的子系统实现为单线程子系统(如gui). volatile变量存在特有的线程封闭,保证只有一个线程写即可。

(2)栈封闭。 封闭在执行线程中,只有局部变量访问对象,任何方法都无法获得对基本类型的引用,适合做栈封闭。栈封闭需要程序员保证不会逸出。

(3)TheadLocal类。用于防止对可变的单实例变量和全局变量共享。

如可以将jdbc连接放入threadlocal中。

bubuko.com,布布扣

这种机制很方便,经常滥用(作为隐藏方法参数手段),但是会降低代码的可重用性。

4.不变性

(1)不可变定义:创建以后不能改状态;所有字段final;对象正确创建(this没逸出)。 不可变对象和不可变对象引用有差异,对象不可变但是引用可以变化。

(2)final域。c++ const受限版本。java内存模型能保证final域初始化过程的安全性,可以不受限制的访问不可变对象,共享时不需要同步。能Final的都声明final.

volatile类型发布不可变对象。(多个状态字段组成一个不可变对象,每次重新构建一个,然后volatile修饰)

5.安全发布

(1)不正确发布,正确对象破坏。 (放到公共区域的字段容易出现)

(2)不可变对象提供初始化安全性保证,没有同步也可以安全访问。(发布时也可不用同步)。没有满足三个条件,只是Final类型的域如果可变还需要同步。

(3)安全发布常用模式。

   在静态初始化块中初始化对象引用;

   将对象应用保存到volatile类型的域或者AtomicReference域中;

   对象引用保存到某个正确构造对象的final类型域中;

   对象引用保存到一个由锁保存的域中。

【向线程安全的集合类加对象会保证安全发布; 还有数据传递机制--Future/Exchanger】

(4)事实不可变对象。没有额外同步,任何线程都可以安全用被安全发布的事实不可变对象。

(5)可变对象。安全发布只能确保发布当时的状态;不仅发布时要同步,每次访问都要用同步保证修改操作可见性。

对象发布取决于它的可见性:不可变对象任意发布;事实不可变通过安全方式发布;可变对象安全发布,必须线程安全或者锁机制保护起来。

(6)安全的共享对象。

发布对象时说明对象的访问方式。

线程封闭。

只读共享(不可变和事实不可变)

线程安全共享(线程安全的对象内部实现同步)

保护对象(通过持有锁访问)。

(三)对象的组合。(不希望每次写都要分析确保安全,希望组装线程安全类)

1.设计线程安全的类。封装使不对程序分析就可以判断类的安全。

(1)设计线程安全类,需要包含三个基本要素:找出构成对象的所有变量;找出约束变量的不变性;建立对象状态访问管理策略。

同步策略:不违背对象不变性或者后验条件的情况下对状态访问协同。(规定了如何用不可变性,封闭性,加锁等结合,规定哪些变量由哪些锁保护,确保方便分析维护(同步策略形成文档)

(2)收集同步需求。状态区间尽量少(多用Final).  后验条件确保迁移有效(++,17下一个条件18).   ------------不变性条件和状态转换施加了很多约束,需要同步和封装。

上下界NumberRange/原子性需求。 (满足约束和有效条件,借助于原子性和封装性)

(3)依赖状态的操作。(先验条件)等待某个条件为真的各种内置机制都与加锁机制有关(含等待和通知机制)。 使用现有库(BlockingQueue/信号量semaphore)来实现依赖行为。

(4)状态所有权。根节点对象图的子集构成状态。(只考虑对象拥有的数据,所有权在java中没有很好体现 hashmap---自己和Map.entry哪些对象)。

gc让我们避免处理所有权问题,减少了许多引用共享方面常见的错误。所有权与封装性关连--对象拥有状态,对他封装状态有所有权;状态变量的所有者决定采用何种枷锁协议维持状态完整性,所有权意味着控制权;发布应用,共享控制权;构造函数或者方法中传递进来的对象类通常不拥有他们,除非设计来转移所有权---如同步容器封装的工场方法。

容器类--所有权分离:容器类拥有自身状态;客户代码拥有容器中对象的状态。如ServletContext  注册和get都必须线程安全,使用里面的对象需要同步,容器只是暂时管理。防止并发引起的问题需要保护起来或者线程安全对象。

2.实例封闭

(1)对象封装在类的内部,可以将数据的访问限制在方法上,更容易确保线程访问数据时总持有正确的锁。(封装后代码路径已知). 对象一定不要超出他们既定的作用域(超出就逸出)。

实例封闭是构建线程安全类最简单方式,不同的状态可以用不同的锁来锁定。本该封闭的对象发布出去,也会破坏封闭性。

java中有很多线程封闭的实例,Collections.synchronized*()方法的用途就是把不安全的变成线程安全的。(通过装饰器模式)---对底层的访问都通过包装器进行。

(2)监视器模式:将对象的所有可变状态封装起来,主要优势简单。这种模式是对代码的一种约定,对于任何一种锁对象自始至终都要用这个对象。如private final Object myLock=new Object(). 用私有的锁,有更好封装性,让客户端得不到锁(也可以共有方法提供,参与同步策略);如果获取了错误锁,会引发活跃性问题。

(3)所有用到可变对象的地方都加入synchronized。  监视器模式有时性能有问题。

3.线程安全性的委托(用现有的安全类)

(1)包装一个不变的对象做底层(如不变的Point类)。容器对象使用concurrentMap.  如果修改不变的Point类,每次用replace替换一个新值。(这个会反应实时变化;如果不要反应变化可以返回拷贝。)

(2)多个独立的状态变量,可以委托到底层安全类。多个状态变量变量相关时,委托机制会失效。(可以将多个变量封装成一个类,维护不变性)----------一个类由多个独立且线程安全的状态变量组成,在所有操作中都不包含无效状态变换,可以安全性委托给底层的状态变量(跟volatile规则类似);如果可能破坏,需要枷锁机制。

(3)发布底层状态变量。状态变量发布的条件,它是安全的;没有任何条件来约束它的值,变量上的操作也不存在任何状态转换,就可以发布这个变量。

似有构造函数的捕获模式。可以用Private A(Int[]) 来让public A(int ,int)不产生竞态条件。

4.现有的线程安全类添加功能

(1)客户端加锁:获得一个跟原有类一样的锁再添加。(或者修改原有类/扩展类)

直接用synchronize并不能获得线程安全性,与原类方法使用了不同的锁,需要知道要扩展的类的锁才可以。

(2)组合。上面的方法强耦合。

5.将同步策略文档化

在文档中说明客户代码了解的线程安全性,以及代码维护人员了解的同步策略。(synchronized/volatile/其他安全类都对应于某种策略,用于并发时的数据安全性。

包括--哪些操作原子,哪些声明为volatile/哪些变量锁保护/锁保护哪些变量。 没有声明安全就不要认为安全。

有些接口的安全性从基本功能上能猜测---httpsession/datasource/servletContext.

(四)基础构建模块

1.同步容器类. Vector/HashTable/Collections.synchronized*()方法。(这些类实现方式:状态封装,每个共有方法同步)

(1)问题:ArrayIndexOutOfBoundsException,在读取的同时,其他线程修改容器。修改办法----客户端加锁的办法,获取同步容器类的实例做锁。size()和get()迭代时,还是可能抛出异常,迭代时加锁会降低并发性。

(2)ConcurrentModificationException. Iterator遍历时容器发生改变。(hasNext/next会访问是否改变的状态,变化了就抛出这个异常)

解决:客户端加锁,迭代时加锁影响并发。  另一个解决办法就是克隆容器,在副本迭代。(也有性能开销,好坏取决于大小/元素上执行的操作/迭代的频率)

(3)隐藏迭代器:必须对所有用到迭代器地方都要加锁。hashcode/equals/tostring/containsAll/removeAll/retainAll.

2.并发容器.增加常见操作,增加并发性。(QUEUE/BlockingQueue/ConcurrentLinkingQueue/PriorityQueue) ConcurrentSkipListMap,ConcurrentSkipListSet(对应treeset/treemap)

(1)ConcurrentHashMap:使用了不同于同步的hashmap的策略,不是在每个方法上同步并使得只有一个线性容器;使用了更细粒度的加锁机制来实现更大程度共享,称为分段锁(lock stripping)。

这种机制下任意数量的读取线程可以并发访问Map,一定数量的写入线程可以并发修改map(多线程更高吞吐量,单线程轻微损耗)。

不会有ConcurrentModificationException。 返回的迭代器有弱一致性,并非及时失败(弱一致性容忍并发修改;遍历容器可以但不保证修改反应给容器);size/isempty的结果可能已经过期。

没有实现加锁进行独占访问。(不常见的情况需要这个功能:如保证迭代的顺序)。 只有需要对map独占访问时才不用这个类。

(2)额外的map操作:putIfAbsent/remove 映射到v才删除/replace  (k,o,n) 值是o才替换为N/replace(k,n)有值时才替换。

(3)copyOnWriteList:遍历为主的操作用它。每次修改就发布一个副本。迭代多于修改时才用它。

3.阻塞队列和生产者-消费者模式

(1)常见:线程池与工作队列的组合;Executor执行框架。

offer方法数据项不能添加到队列中,会返回失败状态;可以用来处理负载过载的情况,如减轻负载,将多余的工作序列化写入磁盘,减少生产者线程数量(通过某种方式抑制生产者线程)。设置右边界的阻塞队列考虑的越早越好,如果阻塞队列不满足条件,还可以用semaphore创建更多的阻塞数据结构。

实现:LinkedBlockingQueue/ArrayBlockingQueue/PriorityBlockingQueue(Comparable/comparator). SynchronousQueue没有存储(take/put一直阻塞,不用入队和出队;足够的消费者时适合用这种)

(2)分解为更简单的组件。消费者线程永远不会退出,程序无法终止,后面介绍如何解决;还可以通过Executor任务执行框架实现(本身也是生产消费者模式)。

(3)串行线性封闭。 所有权的安全递交。除了阻塞队列,ConcurrentMap的原子方法remove或者AutomicReference的原子方法CompareAndSet来完成工作。

(4)双端队列,工作密取:Deque/BolockingDeque.  (生产者消费者中所有消费者有一个共享的工作队列,工作密取模式中每个消费者都有各自的双端队列,一个线程完成了双端队列的所有工作,可以从其他消费者的双端队列秘密的获取工作----更高的伸缩性,不在单个共享的队列上操作,在自己的队列操作,需要从其他线程获取时不从头从尾部获取)。适应:既是生产者又是消费者的问题---如网页爬虫,爬着发现有更多的页面要处理。 搜索图算法--垃圾回收阶段对堆进行标记(工作密取高效并行)----加入自己队列(或在工作共享模式中加入其它工作者线程的队列)。

4.阻塞方法与中断方法。Blocked/waiting/timed_waiting.(与执行长时间的方法区别---阻塞是因为必须等待不受控制的事件完成)

BlockingQueue的put/take等方法会抛出异常---抛出InterruptedException时表示是一个阻塞方法。Thread的interrupt方法,用于中断线程或查询线程是否已经终端(每个线程有个表示是否中断的属性,表示这个状态或者终端时设置)。

中断是协作机制,一个线程不能强制其他线程中止(A终端B,仅仅是要求B在可以暂停的地方停止正在运行的操作,前提B愿意停下来).中断最常用的操作还是取消这个操作(中断请求响应度越高,越容易及时取消执行时间很长的操作)。调用了抛出InterruptedException异常方法自己的方法就成了阻塞方法,就必须对中断处理,库代码处理的两种方式:传递InterruptedException,恢复中断(Thread.currentThread().interrupt())。

5.同步工具类:可以是任何对象,只要它能根据自己的状态协调线程的控制流。

(1)闭锁(Latch)。CountDownLatch.              latch.countDown() ----等latch降到0时,在latch.await()方法处所有的线程都可以通过。

(2)FutureTask.  通过Callable实现,类似于可以产生结果的Runnable. 可以有三种状态:等待运行、正在运行和运行完成。

执行结束的状态:正常/异常结束/取消结束。Future.get行为没有完成会阻塞等到完成。完成了会返回结果或者抛异常。FutureTask常在executor框架中表示异步任务,还可以执行时间较长的运算。futureTask可以传递给Thread.

-----ExecutionException的处理三种情况:Callable抛出的受检查异常;RuntimeException;Error.

(3)信号量。Semaphore.可以用实现资源池。new Semaphore(bound).   remove是移除一个许可,release是创建许可(semaphore并不受限于创建时的大小)

(2)栅栏(Barrier)。与闭锁的区别:所有线程都到达栅栏才会继续(阻塞直到某个事件发生。CylicBarrier.(所有线程都调用await()));闭锁用于等待事件,栅栏用于等待其他线程。

所有线程通过栅栏后,栅栏可以重置(await中断,所有的await抛出BrokenBarrierException.) 成功通过栅栏,await会为每个线程返回一个唯一的到达索引号,可以用这些索引选择领导者线程,并在下次迭代中由该领导线程执行一些特殊的工作。还可以给cylincBarrier传递runnable在通过栅栏后执行。new Barrier的时候要指定barrier的数量和runnable.

另一种形式的栅栏是Exchanger,一种两方栅栏,各方在栅栏位置处交换数据(两方执行不对称的操作时,Exchanger会很有用;如一个线程向缓冲区写入数据,另一个读入数据-线程可以用exchanger汇合,可以把这俩对象安全发布到另一方。)

数据交换的时机取决于应用程序响应需求。最简单缓冲区满的时候,由填充任务进行交换;空的时候由清空任务进行交换(这样交换次数降到了最低)。(另一个方法填满交换---填充到一定程度保持一段时间也要交换)。

6.构建高效且可伸缩的结果缓存

(1)计算任务缓存。 ConcurrentHashMap /Futuretask来缓存任务(putIfAbsent())。

(2)缓存逾期的问题。通过FutureTask解决,为每个结果指定一个逾期时间,定期扫描逾期元素。(没有解决缓存清理问题,移除旧的计算结果)

(五)总结-主要概念和原则

1.可变状态是至关重要的(所有并发问题都可以归结为如何协调并发状态访问。可变状态越少,越容易确保线程安全性)。

2.尽量将域声明为final类型,除非需要他们是可变的。

3.不可变对象一定是线程安全的。不可变对象能极大地降低并发编程的复杂性。他们更为简单且安全,可以任意共享而无须使用加锁或保护性复制等机制。

4.封装有利于管理复杂性。编写线程安全的程序时,虽可以都保存在全局变量中,但封装在对象中更易于维持不变性条件:将同步机制封装在对象中,更容易遵循同步策略

5.用锁来保护每个可变变量。

6.当保护同一个不变性条件中的所有变量时,要使用同一个锁。

7.在执行复合操作期间,要持有锁。

8.如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会出现问题。

9.不要故作聪明地推断出不使用同步。

10.在设计过程中考虑线程安全,或者在文档中明确地指出它不是线程安全的。

11.将同步策略文档化。

二、并发应用程序的构造理论

(一)任务执行

1.线程中执行的任务。串行/每个请求一个线程..    自然的事务边界。

(1)清洗的任务边界和明确的任务执行策略。

(2)结论:任务处理过程从主线程中分离出来,使得主线程能更快的等待下一个到来的连接,提高响应;同时服务多个线程;代码必须线程安全。

(3)每个任务分配线程不足:线程生命周期开销很高;资源消耗;稳定性。

2.Executor执行框架。任务是一组逻辑工作单元,线程是任务异步执行机制。java.util.concurrent提供了灵活的线程池作为executor一部分。任务执行的主要对象不是thread,而是executor.  public interface Executor {void executo(Runnable command);} 异步执行框架的基础。(这个框架支持不同类型的执行策略)。

(1)任务的提交和执行解耦;Executor的实现还提供对声明周期的支持,统计信息收集,应用程序管理机制和性能监视等机制。

executor基于生产者消费者模式。提交任务相当于生产者,执行任务相当于消费者。execute(runnable)-提交;execute方法的具体实现是执行。

executor的配置是一次性的,而提交任务的代码会不断扩充到整个程序。

(2)执行策略:what/when/where/how. 执行策略是一种资源管理工具,最佳策略取决于可用资源及对服务质量的需求。(限制并发任务数量). new Thread(runnable).start都可以用executor代替thread.

(3)线程池。 工作队列(work queue).

Executors 静态工场方法:newFixedThreadPool/newCachedThreadPool/newSingleThreadExecutor/newScheduledThreadPool.

newFixedThreadPool/newCachedThreadPool 返回ThreadPoolExecutor实例,可以用用来构造专门用途的Executor.

(4)Executor的生命周期。 ExecutorService.(关闭粗暴/平缓).

 

public interface ExecutorService extends Executor {

   void shutdown(); //平缓                    关闭后提交的任务由拒绝执行处理器(rejected Execution handler来处理,抛弃任务)。

   List<Runnable> shutdownNow();//粗暴

   boolean isShutdown();

   boolean isTerminated();

   boolean awaitTermination(long timeout, TimeUnit unit)  //等待到终止状态。

       throws InterruptedException;

   //  ... additional convenience methods for task submission

 

 

}

(5)延迟任务与周期任务。Timer负责这类任务,但是存在缺陷,应该考虑用ScheduledThreadPoolExecutor代替它。ScheduledThreadPoolExecutor的搞糟函数或者通过new ScheduledThreadPool工场方法创建。

Timer缺点:只在一个线程Timer管理的线程中运行,长时间任务会影响执行;如果抛出异常,Timer会关闭(新任务不调度--线程泄漏);只支持绝对时间,对系统时钟变化敏感。

如果要自己构建调度任务,可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。DelayQueue管理一组Delay对象,delay对象都有一个相应的延迟时间;delayqueue只有元素逾期后才能从delayqueue中执行take.从delayQueue中返回的对象将根据延迟时间排序。

3.找出可利用的并发性。(1)异构任务并发没有太大优势。只有大量独立同构的任务可以并发处理时,才能体现出将程序的工作负载分配到多个任务带来的提升。

(2)携带结果的任务Callable与Future. Executor一些辅助方法能将其他类型的任务封装为Callable,如Runnable/java.security.PriviledgedAction.

Callable/Runnable都是抽象的计算任务,有范围(都有明确起始点,并且会结束)---创建/提交/开始/完成。

Executor框架中,已提交未开始的任务可以取消,已经开始的只能响应中断取消。

Future表示一个任务的声明周期,提供了相应的方法,提供方法判断完成/取消,以及获得任务的结果和取消任务等。

有许多方法可以创建Future描述任务,ExecutorService的submit方法会返回一个Future.(提交Runnable/Callable). ---调用newTaskFor/--然后调用new FutureTask.

Future get有状态依赖特性,调用者不依赖任务的状态(提交和获得结果包含安全发布属性也确保这个方法是线程安全的)。Future.get()---两个问题(任务本身遇到Exception/get结果之前被中断)。

(3)CompletionService/BlockingQueue.   ExecutorCompletionService. 将executor与blockingqueue融合在一起,将Callable任务提交给他来执行,用类似队列take/poll方法获取结果,结果完成时封装为Future.

ExecutorCompletionService实现简单,在构造函数中创建一个BlockingQueue,当计算完成时,调用FutureTask的done方法(重写done,继承FutureTask,包装为QueueingFuture改写done)

多个ExecutorCompletionService可以共享一个Executor.

(4)为任务设置时限。不在指定的时间内完成就不需要结果了。支持时间先吃的Future.get方法Future.get(timenum,unit)

使用限时任务注意,,停止超时的任务(超时会抛出TimeoutException 可在异常处理中调用Future.cancel())。如果编写的任务是可以取消的可以用Future.cancel()。

(5)创建n个任务给线程池,保留n个future.使用限时的get方法通过Future获取每个结果。还可以用线程池的invokeAll方法。返回future的列表。

 

(二)取消与关闭

java只提供了中断协作机制,使一个线程终止另一个线程的当前工作。

1.任务取消。外部代码能在某个操作正常完成之前就将其置入“完成”状态,那么这个操作就可以称为可取消的(Cancellable).

取消的原因:用户请求取消;有限时间的操作;应用程序事件;错误;关闭。java中没有抢占式方法停止线程,遵循商量好的协议。

(1)方法一请求取消标志。任务定期的查看已请求取消标志,如果设置了它那么任务提前结束)。cacelled=true.

可取消的任务要有取消策略(how/when/what. 其他代码如何请求)

(2)中断。用标志法一个阻塞任务可能永远不检查取消标志,永远不会结束。特殊的阻塞库方法支持中断(中断是协作机制)通过这种机制通知另一个线程,告诉它在合适的或者可能情况下停止线程)。 虽然没有将中断与取消语义关联,取消以外的其他操作使用中断,一般都是不合适的--难以支撑大应用。

每个线程都有一个boolean的中断状态,中断时这个线程的这个状态为true.(Thread中包含了中断线程/检查线程中断状态方法)

 

public class Thread {

   public void interrupt() { ... }

   public boolean isInterrupted() { ... }

   public static boolean interrupted() { ... }  //清除中断状态。

   ...

}库方法Thread.sleep/Object.wait相应中断时的操作,清除中断状态,抛出InterruptedException表示由于中断而结束。

线程在非阻塞下中断,中断状态会被设置;然后根据被取消的操作轮询这个状态监测中断,这样如果没有抛出InterruptedException,这个状态会一直保留直到显式清除状态。(调用interrupt并不意味立即停止进行工作,只是传递请求中断消息。)

中断操作的正确理解:并不会真正中断在运行的线程,只是发出中断请求,然后线程的下一个合适时机中断自己(这些时机称为取消点)-wait/sleep/join严格处理这种请求,自己设计好的方法可以忽略只要用代码对终端请求进行某种处理即可。

静态的interrupted小心,消除状态,调用时返回true,出发想屏蔽,必须进行处理(可以跑出InterruptedException/或者再interrupt恢复中断状态).

两个位置可以判断中断:阻塞的方法调用中和循环开始处查询中断状态时(显式检查状态对快速响应起到一定帮助) while(true)改为while(Thread.currentThread().isInterrupted())

(3)中断策略:正如任务Future/ExecuteService应该有取消策略一样,线程应该有中断策略。中断策略:规定线程如何解释某个中断请求。

合理的中断策略是线程级别或者服务级别(service level).  (尽快推出,执行必要清理,通知所有者线程已经推出;其他策略-重启/暂停服务;对于非标准策略的线程/线程池只能用于了解策略的任务中)。

区分任务和线程的对中断反应很重要。一个中断请求可以有一个或多个接受者(取消当前任务或者关闭工作者线程)。

任务不是在自己拥有的线程中执行,而在某个服务(如线程池)拥有的线程中执行。对于非线程所有者的代码应小心保留线程的状态。(拥有线程的代码才能处理中断),即使非所有者线程也能响应。----大多数可阻塞的库函数只是抛出InterruptedException.(不知道自己由哪个线程运行,这里体现了最合理策略:尽快推出,将中断传递给调用者)。

检查中断请求,任务不需要放弃所有操作,可以推迟中断处理,可以先执行完,然后抛出异常。

任务也不应该对执行任务线程的中断策略进行假设,出发该任务专门设计为服务中运行,服务包含中断策略。抛出InterruptedException策略外,还可以调用Thread.currentThread().interrupt();

任务代码不该对线程中断假设,取消代码也不该对线程中断策略做假设。线程应该由所有者中断。所有者可以将中断策略封装到合适的取消机制中,如shutdown方法。(线程自身拥有中断线程,除非知道中断含义,否则就不该中断。)

(4)响应中断。最常见:传递INterruptedException/恢复中断interupt方法。

不想或无法传递想另外的办法----一种是恢复状态interrupt()方法。只有实现了线程中断策略的代码可以屏蔽中断请求(常规任务和库代码都不该这样做)。

 

(三)线程池的使用

1.任务和执行策略之间的隐性耦合:依赖性任务;线程封闭机制的任务;响应性敏感的任务;使用ThreadLocal的任务(线程池不太适合用-丢了会新生成一个,只有线程本地值受限于任务的生命周期,才适合用,线程池线程间不适合用threadlocal任务间传递值);

----线程饥饿死锁:单线程executor中执行依赖任务,需要将线程池足够大才可能不死锁;运行时间长的任务---不死锁等待也久,缓解办法---限时办法(等待时间长,将其标记失败,重新放入队列;总充满了阻塞任务,表明线程池太小)

2.设置线程池大小

(1)取决于提交任务的类型以及部署的特性。

(2)是否使用jdbc/file等稀缺资源。 (与链接池等关连)

(3)不受限制的。nthreads=ncpu* ucpu* (1+w/c).     ucpu是cpu使用率,w/c阻塞/计算。  (计算密集型,ncpu+1比较好)。 ncpu=Runtime.getRuntime().availableProcessors()

3.配置ThreadPoolExecutor.

(1) public ThreadPoolExecutor(int corePoolSize,int maxmumpoolsize,long keepalivetime,TimeUnit unit,Blockingqueue,threadfactory,rejectexecutionhandler).

newCachedThreadPool将线程池最大大小设为最大,存活时间设置为1分钟(线程池无限扩展),需求降低自动收缩。 空闲时间超过了存活时间---就会回收超出coresize部分。

(2)管理队列任务:无限创建线程,将导致不稳定,用固定大小的线程也会耗费掉内存。(都提交到队列中了)。

基本任务排队方法:有界队列、无界队列和同步移交。

如果用有界队列,到了最大大小,将会根据默认的rejectexecutionhandler会拒绝,(只有线程池无界或可以拒绝任务时synchronosQueue才有实际价值)

(3)饱和策略:有四个:AbortPolicy/CallerRunnsPolicy/DiscardPolicy/DiscardOldPolicy.   abort默认会抛出RejectedExecutionException.

调用者运行(CallerRunsPolicy)实现了一种调节机制,既不会抛弃也不会抛出异常,跑到调用者,降低新任务量。(执行时没法收到新任务,如server.accept()没法运行,新请求会保存在tcp队列不再应用程序队列)---过载蔓延:线程工作队列-应用程序-tcp层-客户端。

除了这些策略还可以在启动线程的executor中限制提交速率。 用semaphore.   semophore.acqure()-----   semophore.release().

(4)线程工厂:ThreadFactory.   (可以定义UncaughtExceptionHandler/维护统计信息)

*/

public class MyThreadFactory implements ThreadFactory {

   private final String poolName;

 

   public MyThreadFactory(String poolName) {

       this.poolName = poolName;

   }

 

   public Thread newThread(Runnable runnable) {

       return new MyAppThread(runnable, poolName);

   }

}应用程序中利用安全策略控制代码库对某些特殊代码的访问权限,通过executor中的privilegedThreadFactory定制自己的线程工场。这种方式创建出来的线程将与privilegedThreadFactory有相同的访问权限、AccessControlContext、contextClassLoader. (没有它会从submit/execute中继承)

(5)调用构造函数后定制ThreadPoolExecutor。setter修改构造参数。 executors工厂函数创建的对象可以显转换。(single等或者其他不想转换可以用工厂函数executors.unconfigurableExecutorService包装,只暴露executorService的方法)

4.扩展ThreadPoolExecutor。beforeExecute/afterExecute/terminated.

beforeExecute/afterExecute添加日志监控等信息,线程池关闭会调用terminated,所有任务完成后,可以用来executors释放生命周期内分配的资源,发送通知/记录日志/收集finalzie统计信息。

如统计信息(beforeExecute/afterExecute 算一个任务的时间。 terminated计算平均时间)。

5.并发的谜题解答器:

 

public class ConcurrentPuzzleSolver <P, M> {
   private final Puzzle<P, M> puzzle;
   private final ExecutorService exec;
   private final ConcurrentMap<P, Boolean> seen;
   protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();
 
   public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
       this.puzzle = puzzle;
       this.exec = initThreadPool();
       this.seen = new ConcurrentHashMap<P, Boolean>();
       if (exec instanceof ThreadPoolExecutor) {
           ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
           tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
       }
   }
 
   private ExecutorService initThreadPool() {
       return Executors.newCachedThreadPool();
   }
 
   public List<M> solve() throws InterruptedException {
       try {
           P p = puzzle.initialPosition();
           exec.execute(newTask(p, null, null));
           // block until solution found
           PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
           return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
       } finally {
           exec.shutdown();
       }
   }
 
   protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
       return new SolverTask(p, m, n);
   }
 
   protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
       SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
           super(pos, move, prev);
       }
 
       public void run() {
           if (solution.isSet()
                   || seen.putIfAbsent(pos, true) != null)
               return; // already solved or seen this position
           if (puzzle.isGoal(pos))
               solution.setValue(this);
           else
               for (M m : puzzle.legalMoves(pos))
                   exec.execute(newTask(puzzle.move(pos, m), m, this));
       }
   }

}

 

 

public class ValueLatch <T> {
   @GuardedBy("this") private T value = null;
   private final CountDownLatch done = new CountDownLatch(1);
 
   public boolean isSet() {
       return (done.getCount() == 0);
   }
 
   public synchronized void setValue(T newValue) {
       if (!isSet()) {
           value = newValue;
           done.countDown();
       }
   }
 
   public T getValue() throws InterruptedException {
       done.await();
       synchronized (this) {
           return value;
       }
   }

}缺点:遍历完了还没有结果,将会永远等下去。解决方案之一:记录活动任务的数量(该值为0时解答设为null).

 

解决器的几个约束条件:时间限制-易实现(valueLatch里面用限时的getvalue---限时await),超时关闭executor并声明失败;另一个结束条件--特定于谜题标准(搜索特定数量的位置,另外取消机制)。

 

public class PuzzleSolver <P,M> extends ConcurrentPuzzleSolver<P, M> {
   PuzzleSolver(Puzzle<P, M> puzzle) {
       super(puzzle);
   }
 
   private final AtomicInteger taskCount = new AtomicInteger(0);
 
   protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
       return new CountingSolverTask(p, m, n);
   }
 
   class CountingSolverTask extends SolverTask {
       CountingSolverTask(P pos, M move, PuzzleNode<P, M> prev) {
           super(pos, move, prev);
           taskCount.incrementAndGet();
       }
 
       public void run() {
           try {
               super.run();
           } finally {
               if (taskCount.decrementAndGet() == 0)
                   solution.setValue(null);
           }
       }
   }

}并发执行任务,executor是强而灵活框架,提供大量可调节的选项(建线程/关闭线程策略,处理任务策略,处理多任务的策略,钩子方法扩展行为newTaskFor/before/after..)

 

(四)图形GUI应用程序

1.gui单线程原因:输入事件的处理过程与gui的面向对象模型存在错误的交互。(响应事件--与重绘界面是相反的控制流程,容易引起死锁;另一方面mvc. 控制两边都调用,顺序不同,死锁。)。

2.串行事件处理:(1)封闭在单线程(2)几个方法能从其他线程安全调用。SwingUtilities.isEventDispatchThread.   SwingUtilities.invokeLater(). SwingUtilities.invokeAndWait().

SwingUtilities.repaint/revalidation; SwingUtilities.添加移除监听。

可以用SingleThreadExecutor创建出类似SwingUtilities的工具。

3.短时间的gui任务直接在事件线程执行。

4.长时间的gui任务。

(1)借助Executors/Future.  在执行主线程创建executor,提交listener.

(2)取消:cacelButteon.addlistner (....runtask.cancel)

(3)速度标识和进度标识。FutureTask (done方法---提示。---onCompletion())           BackGround包装FutureTask

(4)swingworker

5.共享数据模型 (1)线程安全的数据模型 (2)分解数据模型(增量变化)。

三、并发编程的性能调忧

(一)避免活跃性风险

1.死锁与资源死锁。(错误的加锁顺序)

(1)已经出现的死锁java并不能解决。-------锁顺序死锁。

(2)动态的锁顺序死锁。对两个参数加锁,参数的顺序不固定。

解决:对参数排序后再加锁。对于二者比较相同的参数,采取多加一个锁的办法。

(3)协作对象间发生的死锁。一个加锁的程序,调用了另一个同步的方法。(相当于两个锁) 。 检查死锁比较难一些。

解决--开放调用。

(4)开放调用。调用方法时不需要持有锁。(采用open call避免死锁类似于封装提供线程安全)分析开放调用的程序安全的多。

使同步代码块仅用于保护那些涉及共享状态的操作,收缩代码块保护范围还可提高伸缩性。 某些方法变为非原子调用可接受,有些不行。丢掉原子性引发错误的,需要另一种机制实现,添加监视状态灯。

(5)资源死锁。连接两个数据库时。 线程饥饿死锁。

2.死锁的避免与诊断

尽量减少潜在的加锁交互数量,将获取锁时需要遵循的协议写入文档。 细粒度锁程序中用两阶段锁策略检查代码中的死锁。(找出多锁集合;分析顺序)

(1)支持定时的锁。Lock.tryLock()(失败后并不知道失败的原因---但至少能记录,可以平缓的重启计算) ---可以获取失败再重试。

(2)通过线程转储信息来分析死锁。

线程转储包括各个运行中线程的栈追踪信息,类似于发生异常的栈追踪信息。(有哪些锁=栈帧中的锁=被阻塞线程等待的锁)。生成转储前搜索循环图找死锁。

转储:linux/unix--- kill -3 或者 ctrl-\      windows:ctrl-Break.

显式的Lock类不支持转储(5.0前,现在6.0上有信息但是还是少)。

3。其他活跃性危险。

(1)饥饿:无法访问它想要的资源。(优先级使用不当,持有无法结束锁的结构)尽量不用优先级,增加平台依赖。

(2)丢失信号:等待已经过去的信号。(如在wait之前不判断,导致没有notify唤醒。 用notifyall的地方用了notify,导致notify了不当的线程都沉睡了)

(3)糟糕的响应:开发框架把运行时间长或者io型任务放到后台线程。降低后台cpu密集型任务优先级;减少不良的锁。

(4)活锁:不会阻塞线程,也不继续执行,线程不断重复执行相同的操作,总会失败。发生在处理事务消息的应用程序需,不成功处理就重试(总放到消息开头,反复调用)

错误的把不可恢复的错误作为可修复的错误。

协作线程彼此相应改各自状态,如十字路口让路,解决:需要在重试机制中引入随机性。

(二)性能与可伸缩性

性能与安全性和活跃性冲突。要永远把安全性放到第一位(首先可运行,然后根据性能要求调优,考虑因素不是把性能提高到极限)。

1.对性能的思考。吞吐量、伸缩性与响应性。

多线程额外开销:线程协调;上下文切换;线程的创建和销毁;线程的调度等。  并发提高性能:有效利用现有资源;有新资源时利用好新资源。

(1)性能与可伸缩性。 衡量指标---多快;多少。两方面完全独立,有时矛盾。(性能主要是对单线程算法调优)

可伸缩性:增加计算资源时,程序的吞吐量和处理能力能相应增加。(可伸缩性调优主要是并行化)

三层模型要比单一程序单元性能低(任务排队-协调-数据复制)。单一系统会达到处理极限,会接受每个工作单元更长时间/消耗更多资源换取应用程序增加资源时处理更高的负载。

服务器应用程序多少---可伸缩性;吞吐量;生产量。(交互式-延迟更重要)

(2)评估性能的权衡因素:避免不成熟优化-先正确,再提速。 决策--用某种成本来降低另一种形式开销,也会增加开销换取安全性。(性能提升是并发错误的最大来源)。哪种方法速度快会有错觉(以测试为基准不要猜测。perfbar工具可以测试cpu忙碌程度信息)

2.Amdahl定律

bubuko.com,布布扣串行和并行部分。

F是串行部分的占比。所有并发程序都有串行部分(如果认为没有请检查)。  如从队列中取得数据,对结果进行汇总。

(1)各种框架隐藏的串行部分。如concurrentLinkedQueue/SynchronizedList(LinkedList) 二者吞吐量差异来自于不同比例的串行部分。SynchronizedList用单个锁,offer/move都持有;concurrentLinkedQueue用了更复杂的非阻塞队列,使用原子引用更新链接指针。

(2)amdahl定律应用。算法考虑多处理器下的速度---对可伸缩性局限有了解。

降低锁粒度技术:锁分解;锁分段。

3线程引入的开销.

(1)上下文切换。可运行线程大于cpu数量。(调度出线程,会导致上下文切换)。   调度--访问Os/jvm共享数据结构,缓存缺失。(占用了cpu周期--用于计算的减少) 。 锁竞争阻塞会把它交换出去;越多的竞争越多的阻塞,越多的上下文切换,降低调度开销,降低吞吐量。------无阻塞算法。

(2)内存同步。synchronized/volatile.(特殊指令--内存栅栏,抑制编译器优化功能)。

有竞争同步和无竞争同步(volatile都是)。无竞争同步2--250个时钟周期。jvm优化会去掉一些锁。 jvm通过逸出分析,找出不会发布到堆的对象/锁粒度优化。

某个线程同步会影响其他线程性能(共享内存总线的通信量)。

(3)阻塞:非竞争同步在jvm处理;竞争的同步需要操作系统接入,增加开销。   自旋等待/os挂起线程。(这两种方式取决于获取锁的时间与上下文切换开销)。

4.较少锁的竞争。(减少锁竞争可以提高性能/可伸缩性)。 可伸缩的最主要威胁--独占方式的资源锁。

两个因素影响竞争可能性:锁的请求频率以及锁上的请求时间。--二者的乘积越小越好。

三种方式降低锁的竞争程度:减少锁持有时间;降低锁的请求频率;使用带有协调机制的独占锁,允许更高的并发。

(1)缩小锁范围--快进快出。锁无关代码移除同步代码块。(实际把计算/阻塞操作从代码块移除时,才考虑代码块大小)。

(2)减小锁的粒度(锁分解)一个锁之保护一个变量,提高伸缩性。(对竞争适中的锁分解把这些锁变为非竞争的锁,从而提高性能伸缩性)。

(3)锁分段。一组独立对象上的锁进行分解,如concurrenthashmap.(包含16个锁的数组)---支持16个写入器。

劣势:获取多个锁实现独占访问更困难,开销更高。(扩展映射范围-值分到更大桶集合)

(4)避免热点域。  每个操作请求多个变量,粒度难降低。反复计算的结果缓存起来,造成一些热点域。(hashmap. size/empty. put/take时都更新一个状态count,在concurrentmap中变成了热点--看似优化。 避免热点---concurrentmap将每个段的数量计数,求的时候想加,不是全局的。)

(5)替代独占所方式。并发容器/读写锁/不可变对象/原子变量。

读写锁提高读取的吞吐/原子变量降低热点域的开销。

(6)监测cpu利用率。

vmstat/mpstat. windows--perfmon.

cpu没有充分利用的原因:负载不足;io密集;外部限制;锁竞争(转储查找)。

(7)不用对象池。对象循环利用。(降低了垃圾回收开销,但是对高开销意外的对象,仍然性能缺失)。对象分配线程本地的内存块,可以不用再堆结构上同步。协调对象池访问,可能造成阻塞,成为性能瓶颈。

5.比较map性能。map中只有一个锁。concurrentmap对大多数读操作不加锁(写加入了分段技术)。

6.减少上下文切换开销。切换--请求产生日志信息(日志调度行为--一种直接写到io/另一种后台线程专门写到日志)。

通过将io操作移动到一个专门线程,提高处理效率。

(三)并发程序测试。 (几个方面:吞吐量/响应性/可伸缩性) 安全性测试(采用测试不变形条件的形式)/活跃性测试。

1.正确性测试

(1) 跟串行一样写测试用例。

(2) 对阻塞操作的测试。(很少有工具创建线程和监视线程,要把失败信息传回主线程)。jsr166创建了测试基类(遵循规定等待所有线程结束才完成)。

该阻塞时阻塞就正常。(阻塞后再中断它抛出Interruption)。

(3)安全性测试。(并发类-并发测试类一样难设计)。找容易检查的属性。(生产者-消费者的检查比较好的方法检查放入队列和从队列取出的各个元素)。

----方法一:创建影子列表,判断影子列表的一些东西。(这种方法会干扰线程调度)

----方法二:对顺序敏感的校验和函数检查入列和出列(单-单);多生产者-多消费者用对顺序不敏感的校验和计算函数检查入列和出列。(多线程不要共享校验和变量,这样可以不用同步) 。同时不能让编译器能提前知道是什么整数(会优化掉),用随机数方式。

RNG生成xorshift.    static int xorshift(int y){y^=(y<<6);y^=(y>>>21);y^=(y<<7);}   N个线程放数N个线程取数(测试后合并起来)

为了让所有线程同时启动以及同时得到结果。(使用了开始栅栏和结束栅栏)。 开始阀门和结束阀门。

[测试线程数量要多于cpu数量]。 (测试框架要放弃指定时间内没有完成的测试)

(4)资源管理的测试。(测试都侧重与设计规范一致程度)。另一方面测试是否没有做不该做的事情(泄露;没有关闭资源等)

使用堆检查工具。

(5)使用回调。在对象生命周期的已知位置上,这些位置非常适合判断不变性条件是否被破坏ThreadPoolExecutor调用runnable/ThreadFactory.

自定义线程和线程工厂。

2.性能测试。   性能测试是要测端到端性能/根据经验调整限值(线程数量/缓存容量)

(1)putTakeTest增加计时功能。Barrierprimer  .   run()  {if(!started) {started=true;starttime=t;} else endtime=t;}  提供gettime方法。

测试生产消费者在不同参数下的吞吐量/不同线程下伸缩性/如何选择缓存大小。

(2)多种算法比较。缓存算法不高原因:put/take都要竞争锁,其他实现方法竞争的位置少很多。

LinkedBlockingQueue的伸缩性高于ArrayBlockingQueue.(开始看奇怪,虽然会有更多的内存分配和gc开销,因为优化后的连接队列算法能加队列的头结点更新与尾结点更新分离开来。因为内存分配通常线程本地,算法能通过多执行内存分配操作来降低竞争,算法有更高的可伸缩性)  ----直觉和提升性能需求是背道而驰的。

(3)响应性衡量。(更小的服务时间;更小的服务时间变动)。 公屏模式和非公平模式的差别明显。

公平性开销比较大主要是因为线程阻塞引起的。(沉睡/唤醒---两次上下文交换)。

3.避免性能测试陷阱

(1)垃圾回收. 看起来很高,实际发生了gc.

(2)动态编译.解释执行与本地执行差距明显

(3)对代码路径不真实采样。(倒退并解释执行)

(4)不真实的竞争程度。实际有一些线程本地计算,并不都是在锁上竞争、

(5)无用代码的消除,造成性能伪提升。

4.其他测试方法。

(1)代码审查.

(2)静态分析工具。 findbugs.

不一致的同步;调用thread.run;未被释放的锁;空同步块;双重检查加锁;构造函数中启动一个线程;通知错误;条件等待中的错误;Lock/conditon误用;休眠或者等待时持有锁;自旋循环;

(3)面向方面的测试技术。用spring的aop等。

(4)分析与检测工具。 商业工具。----大多都支持线程。

四、java并发编程的一些高级主题

(一)显式锁

1.Lock/ReentrantLock.

 

public interface Lock {

   void lock();

   void lockInterruptibly() throws InterruptedException;

   boolean tryLock();

   boolean tryLock(long timeout, TimeUnit unit)

       throws InterruptedException;

   void unlock();

   Condition newCondition();

}

 

(1)lock提供无条件的、可轮询的、定时的、可中断的锁获取操作。 可见性要与同步块同,但是在加锁语义、调度算法、顺序保证以及性能特性方面可以不同。

ReentrantLock.这种更灵活的机制能提供搞好的活跃性和性能。比内置锁复杂一些,必须在finally释放锁。

 

Lock lock = new ReentrantLock();

...

lock.lock();

try {

// update object state

// catch exceptions and restore invariants if necessary

} finally {

   lock.unlock();  //没在finally释放是定时炸弹。

}

 

(2)轮询锁与定时锁。 trylock 与 tryLock(long timeout, TimeUnit unit).没成功就重试。

 

while (true) {

       if (fromAcct.lock.tryLock()) {

           try {

               if (toAcct.lock.tryLock()) {

                   try {

                       if (fromAcct.getBalance().compareTo(amount)

                               < 0)

                           throw new InsufficientFundsException();

                       else {

                           fromAcct.debit(amount);

                           toAcct.credit(amount);

                           return true;

                       }

                   } finally {

                       toAcct.lock.unlock();

                   }

                }          } finally {

                fromAcct.lock.unlock();

            }

        }

        if (System.nanoTime() < stopTime)

            return false;

        NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);

    }

 

if (!lock.tryLock(nanosToLock, NANOSECONDS))

       return false;

   try {

       return sendOnSharedLine(message);

   } finally {

       lock.unlock();

   }

(2)可中断的锁获取。lock.lockInterruptibly();

 

public boolean sendOnSharedLine(String message)

       throws InterruptedException {

   lock.lockInterruptibly();

   try {

       return cancellableSendOnSharedLine(message);

   } finally {

       lock.unlock();

   }

}

private boolean cancellableSendOnSharedLine(String message)

   throws InterruptedException { ... }

(3)非块结构加锁。  内置锁都是基于代码块的。用类似分段技术降低锁的粒度,为链表节点使用一个独立的锁,使不同的线程能独立的对链表的不同部分操作。(每个节点的锁保护链接指针及该节点存储的数据。遍历或者修改时,持有本节点的锁,只有获取了下一个节点的锁时才释放前一个节点的锁。(连锁式加锁或者锁耦合)。

 

 

 

2.性能因素考虑

java6.0以后差不多了;性能是一个不断变化的指标。java5.0选取lock性能是个重要因素。

3.公平性

公平性和非公平性。   非公平性锁--允许插队。

公平性,永远加到队尾。非公平性--锁被某个线程持有时,信发出线程才会放入队列。      公平性由于挂起线程和恢复线程的开销大大降低性能。

性能高的原因:恢复一个被挂起的线程与该线程真正运行之间有严重的延迟。(延迟期间已经能处理完新来的请求)。

如果持有锁的时间较长或者请求锁的平均时间间隔长,应该用公平锁。,插队的性能提升不会体现。

4.synchronized与reentrantLock之间选择。

内置锁无法满足需求的情况下,ReentrantLock可以作为一种高级工具。高级功能时采用reentrantlock(定时/轮询/中断/公平队列/非块结构)

5.读写锁。(其实内部是一个锁,提供了一个互斥量)   一个调用tryaquireshared/一个调用tryacquire()

 

public interface ReadWriteLock {

   Lock readLock();

   Lock writeLock();

}

(1)readwritelock可选实现:释放优先;读线程插队;重入性;降级;升级。

(2)ReentrantReadWriteLock.只能有唯一的所有者,并由获得该锁的线程释放。

 

(二)构建自定义的同步工具

1.状态依赖的管理.   状态依赖类---不满足某个条件的时候阻塞或者抛异常。

依赖状态的操作可以一直阻塞直到可以继续运行,比先失败再实现起来要更为方便。依赖管理加锁模式有点特别:锁在操作的执行过程中被释放与重新获取。

构成前提条件的状态变量必须由锁来保护。

(1)抛失败给调用者。给使用者增加负担。 调用者处理,除了沉睡一段时间重试外,还可以调用thread.yield让出时间片。

(2)轮询与休眠来实现阻塞。如果符合条件就睡一段时间。方便调用者处理。但沉睡时间需要在响应性和cpu使用率间平衡。

处理InterruptedException.,可以实现取消。  如果有挂起和提醒方法简化实现。

(3)条件队列。wait/notify/notifyall()   这三个方法构成了内部条件队列的api.

条件队列名字:使得一组线程(集合)能够通过某种方式来等待特定的条件变成真。(传统队列是元素,这里是等待条件的线程)。

object--wait会释放锁,但是需要发生特定事情唤醒。

用条件队列优化了:cpu效率/上下文切换开销/响应性。  (但是事先的功能与轮询休眠是相同的)    支持定时的,可以用定时版本的object.wait.

2.使用条件队列

(1)条件谓词。使得某个操作成为依赖操作的前提条件。(需要将条件谓词与等待操作都写入文档)。

条件等待的三元关系:加锁/wait方法/一个条件谓词。 条件谓词中可以包含多个状态变量,条件谓词只能由一个锁来保护,锁对象与条件队列对象必须是同一个对象。wait被唤醒后在返回测试条件谓词前必须再获取锁。

条件谓词不为真---必须等待。

(2)过早唤醒。notifyAll会把所有唤醒,大多数都是过早唤醒。wait方法还可以假装返回不一定notify.    在重新获取锁的时间中可能有其他线程获取了这个锁改变了标志。

基于这个原因,从wait中唤醒获得锁后,需要再次测试条件谓词。

 

void stateDependentMethod() throws InterruptedException {

// condition predicate must be guarded by lock

   synchronized(lock) {

       while (!conditionPredicate())

           lock.wait();

// object is now in desired state

   }

}

使用条件等待时:通常都有一个条件谓词(状态测试);在调用wait之前先测试,从wait中返回需要再次测试;在一个循环中用wait;确保使用与条件队列相关锁来保护条件谓词相关的变量;调用wait/notify/notifyall时持有相关锁;检查条件谓词后开始操作前不要释放锁。

 

(3)丢失的信号。等待一个已经为真的条件。

出现原因:开始等待前没有检查条件谓词。(一般都是编码错误)

(4)通知。notify/notifyall.每当等待一个条件时一定要通过某种方式发出通知。

只有满足两个条件才用notify不用notifyall:所有等待线程类型相同(只有一个条件谓词,wait后返回执行相同操作);单进单出(每次通知最多唤醒一个)。

优化:可以在状态变幻时才wait和notify.

(5)子类的安全问题。不提倡用状态变幻时才wait和notify等优化,因为对实现有了依赖,子类可能会有安全问题。(等待和通知的协议要向子类公开)

禁止子类化。(如果子类破坏了notify,需要基类修复)。 不应该过度依赖notify.   用Notifyall子类更安全。

(6)封装条件队列。将条件队列封装,除了使用条件队列类,不在其他地方访问它。(可以单独用不被公开的条件队列对象,不用内置的)。 通常应该封装起来。

(7)入口协议和出口协议。每个依赖操作和修改依赖状态的操作,都该定义一个入口协议和出口协议。入口协议是操作的条件谓词;出口协议包括:检查被该操作修改的所有状态变量,并确认他们是否使某个其他的条件谓词为真.  在AbstractQueuedSynchronizer中使用出口协议,(这个类不是同步类执行自己的通知,而是要求同步器返回一个值表示该类的操作已经解除一个或者多个等待线程的阻塞),这种明确的api调用需求使得难以忘记通知。

3.显式的Condition对象

(1)lock可以有多个Condition对象,有更丰富和灵活的功能。会继承lock的公平性。

public interface Condition {

       void await() throws InterruptedException;

       boolean await(long time, TimeUnit unit)

       throws InterruptedException;

       long awaitNanos(long nanosTimeout) throws InterruptedException;

       void awaitUninterruptibly();

       boolean awaitUntil(Date deadline) throws InterruptedException;

       void signal();

       void signalAll();

}

(2)选择用conditon,与选择用lock是一样的。

4.Synchronizer剖析。 semaphore/reenterantLock都可以做阀门--可通过/等待/取消/支持公平和非公平队列操作。

他们都基于AbstractQueuedSynchronizer(AQS)实现,AQS是构建锁和同步器的框架。除了上面两个外还有CountDownLatch/ReentrantReadWriteLock/SynchronousQueue/FutureTask.

AQS解决了实现同步器时涉及的大量细节问题(如等待线程用fifo队列的操作顺序),不同的同步器定义了灵活的标准来通过还是等待。减少实现;不必在处理多个位置上的竞争问题。

5.AbstractQueuedSynchronizer.

(1)基本的操作是获取和释放操作。 (锁或者许可)。

一个类想成为状态依赖的类,那么它必须拥有一定的状态。AQS负责同步器类中的状态,管理了一个整数状态(通过getState/setState/COmpareAndSetState)等protected进行操作,整数可以表示任意含义。

reenterantlock用它表示进入锁次数;FutureTask表示状态。

同步器不同--获取操作可以是独占(reenterantLock), 也可以是非独占状态(semaphore/countdownlatch).

----首先同步器判断当前状态是否允许获得操作,如果是线程允许执行/不允许(阻塞)----由同步器的语义定义的。

----其次更新同步器的状态。获取同步器的线程可能会对其他线程获取该同步器是否造成影响。

同步器支持独占的操作,需要实现一些保护方法----tryAcquire/tryRelease/isHeldExclusively.      

支持共享操作,需要实现方法---tryAcquireShared/tryReleaseShared等方法。    (这写方法供acquire/acquireshared/release/releaseshared调用)。

通过这些方法的返回值状态告知获取和释放操作是否成功。tryAcquireShared返回负值,表示获取操作失败,返回0值表示同步器以独占方式获取,正值以非独占方式获取。 释放操作使得所有阻塞线程回复执行,需要tryRelease和tryReleased方法都为true.

闭锁(在打开状态之前tryAcquireShared的获取状态都是失败)。 同步类都不是通过扩展AQS实现的,都是委托给AQS的子类。

6.concurrent中的AQS。

(1)ReentrantLock.  除了实现独占操作的方法,还维护一个owner变量保存所有者线程。只有获取锁和释放的时候才修改这个变量。检查owner确保当前线程在Unlock前获得了锁。用它确定重入还是竞争。

用CompareAndSet修改状态。newCondition是Lock的内部类。

(2)Semaphore与CountDownLatch。保存当前许可的数量。   countdownlatch保存的是当前的计数值。

(3)FutureTask。Future.get(). AQS保存任务的状态。(运行/完成/取消)。维护引用指向正在执行计算任务的线程,如果取消,线程中断。

(4)ReentrantReadWriteLock。单个AQS子类同时管理读取加锁和写入加锁。用了一个16位状态保存写入锁计数;另一个16位状态保存读取锁计数。读取锁上用共享的获取方法与释放方法,写入锁上用独占的。

AQS内部维护等待队列,记录了某线程请求的是独占访问还是共享访问。

(三)原子变量和非阻塞同步机制

非锁的方案方案设计和实现上都复杂的多,在可伸缩性和活跃性有巨大的优势,更细的粒度调度减少调度开销。原子变量可做更好的volatile类型变量,更适合于做计数器、序列发生器和统计数据收集。

1.锁的劣势。锁 细粒度的操作,在锁上存在竞争时调度与工作的开销比很高。

volatile变量-轻量级的同步,但是有局限不能用于构建原子复合操作。在一个变量依赖其他变量,或者当变量的新值依赖于旧值时,就不能用volatile变量。(不能用互斥体/计数器).

锁的缺点:等待锁不能做其他事情,持有锁情况下被延迟执行/阻塞线程优先级高-持有的优先级低(优先级反转)。  

锁对细粒度的操作(如递增)是高代价的事情。

2.硬件对并发支持。锁是悲观机制。

乐观方法---不发生干扰时完成更新,需要借助冲突检查机制。

(1)比较并交换.

public synchronized int compareAndSwap(int expectedValue,

                                           int newValue) {

        int oldValue = value;

        if (oldValue == expectedValue)

            value = newValue;

        return oldValue;

    }

    public synchronized boolean compareAndSet(int expectedValue,

                                              int newValue) {

        return (expectedValue

                == compareAndSwap(expectedValue, newValue));

    } 根据true/false来进行重试。

 

(2)非阻塞的计数器。  simulateCas value (这个可以用原子变量类替代)。竞争比较低的时候,性能远远超过了基于锁的计数器。

cas的缺点:让调用者处理竞争问题,锁中自动处理竞争。cas性能随着处理器数量变化很大。(非竞争的锁在快速代码路径上的开销大约是cas开销两倍)。

(3)JVM对CAS的支持。java 5.0以后才加入的。

3.原子变量类

类似于一种泛化的volatile变量。  公有12个原子变量类,分为4组:标量类、更新器类、数组类以及复合变量类。最常用的是标量类--AtomicInteger/AtomicLong/AtomicBoolean/AtomicReference,所有这些都支持CAS.AtomicInteger/AtomicLong还支持算术运算。

原子数组类(INteger/Long/Reference)的元素可以实现原子更新(为数组的元素提供了volatile语义,普通数组不具备-volatile只在引用上volatile).

原子变量类扩展了Number没有扩展包装类。(实际上他们不能扩展,都是不变类型;而原子变量是可变类型)。原子变量没有重定义hashcode/equals,每个实例不同---不宜做散列容器的键值。

(1)更好的volatile。

(2)性能比较,锁与原子变量。在高度竞争的情况下,锁的性能优于原子变量性能。(锁阻塞会降低生产者线程)。真实的压力下,原子变量性能更好。

ThreadLocal性能最好。

 4.非阻塞算法。

锁算法有活跃性过账,非阻塞没有。(一个线程失败或挂起不会导致其他线程也失败或挂起,成为非阻塞算法;如果算法的每个步骤中都存在某个线程能够执行下去,无所算法。)  CAS--无锁/非阻塞。

非阻塞算法---实现栈/队列/优先队列/散列表等。

(1)非阻塞的栈。非阻塞更复杂。

关键在于:如何将原子修改的范围缩小到单个变量上。   

public void push(E item) {

        Node<E> newHead = new Node<E>(item);

        Node<E> oldHead;

        do {

            oldHead = top.get();

            newHead.next = oldHead;

        } while (!top.compareAndSet(oldHead, newHead));

    }

 

  (2)  非阻塞的链表。

更复杂,需要单独维护两个指针,头指针和尾指针。  两个指针位于尾部:当前最后一个元素的next指针以及尾节点。(原因需要更新上一个节点的指针,而栈只需要更新top指向对象的指针即可)。

技巧---多步骤更新确保总是处于一致状态。B到达发现A在更新,B等待A更新完成互不干扰(缺点,A失败后会丢失一个操作)。设计算法---一个线程失败让另一线程继续执行。

技巧二:如果B到达时发现A在修改数据结构,数据结构中有足够的信息,让B可以帮A完成更新操作。(多操作一次没事)。

空队列包含哨兵节点(sentinel)/哑节点(Dummy)节点。初始化首尾节点都应该指向哑节点。

@ThreadSafe

public class LinkedQueue <E> {

    private static class Node <E> {

       final E item;

        final AtomicReference<Node<E>> next;

        public Node(E item, Node<E> next) {

            this.item = item;

            this.next = new AtomicReference<Node<E>>(next);

        }

    }

    private final Node<E> dummy = new Node<E>(null, null);

    private final AtomicReference<Node<E>> head

            = new AtomicReference<Node<E>>(dummy);

    private final AtomicReference<Node<E>> tail

            = new AtomicReference<Node<E>>(dummy);

    public boolean put(E item) {

        Node<E> newNode = new Node<E>(item, null);

        while (true) {

            Node<E> curTail = tail.get();

            Node<E> tailNext = curTail.next.get();

            if (curTail == tail.get()) {

                if (tailNext != null) {                     

                    // Queue in intermediate state, advance tail

                    tail.compareAndSet(curTail, tailNext);   

                } else {

                    // In quiescent state, try inserting new node

                    if (curTail.next.compareAndSet(null, newNode)) { 

                        // Insertion succeeded, try advancing tail

                        tail.compareAndSet(curTail, newNode);        

                        return true;

                   } }} } }

}

 

 

(3)原子的域更新器。

上面说了linkedqueue的算法,实际中略有区别。ConcurrentLinkedQueue中没有用原子引用表示Node,而是用普通的volatile引用。

private class Node<E> {

    private final E item;

    private volatile Node<E> next;

    public Node(E item) {

        this.item = item;

    }

}

private static AtomicReferenceFieldUpdater<Node, Node> nextUpdater

        = AtomicReferenceFieldUpdater.newUpdater(Node.class,Node.class,"next");    这个next必须是volatile字段,第一个字段是第三个字段所在的类,第二个字段是更新对象next所属的类。 这个是一个基于反射的视图,能从已有的volatile域上用cas.(更新器没有构造函数,只有newUpdater工场方法,制定类和域的名字)。

域更新器类与某个特定的实例关联在一起,可以更新目标雷任意实例的域;原子性保证性比普通元子类弱一些(无法保证底层的域不被修改)。

用nextUpdater的compareAndSet方法更新Node的next,方法繁琐为了提升性能。(分配频繁,生命周期短暂的对象,如链接节点,如果能去掉每个Node的AtomicReference创建过程会降低插入开销)。 极少情况采用域更新器。

 

(4)ABA问题。

比较并交换---在判断与替换的中间,变动过一次。

解决办法:不更新某个引用的值,而是更新两个值,一个引用和一个版本号。 AtomicStampedReference(以及AtomicMarkableReference)支持两个变量上执行原子条件更新。(对象引用布尔值的二元组)

(四)java内存模型

1.什么是jmm,为什么需要他。

(1)串行结果不变,所有操作都允许。 所以又了重排序技术;全局寄存器。JMM规定了JVM遵循的最小保证,规定了变量写入何时对其他线程可见。

(2)平台的内存模型。每个处理器都有自己的缓存,定期与主线程协调(允许处理器看到不同值)

确保处理器了解其他处理器做的操作,开销很大;特殊指令-内存栅栏/栅栏,需要共享时指令实现存储协调保证。 乐观的模型--串行一致性(现代处理器都不提供这种一致性)

(3)重排序

JMM使得不同线程的执行顺序不同。 同步会限制编译器/运行时/硬件对内存操作的重排序方式,在实施重排序时不破坏jmm可见性保证。

(4)java内存模型简介--通过各种操作定义的(变量读写,监视器加锁解锁,线程的启动合并操作。)JMM为所有操作定义了偏序关系Happens-Before.

两个操作缺乏Happens-Before关系,就可以重排序。

多个线程读取至少一个线程写入,操作之间没有依照Happens-Before来排序,就会有数据竞争问题。正确同步没有数据竞争。

Happens-before规则:

程序顺序规则。(程序中操作A在操作B之前,线程中A操作在B操作前完成)

监视器锁规则。-----全序关系

volatile规则。 全序

线程启动规则

线程结束规则

中断规则

终结器规则

传递性。

在不同的锁上同步就不存在happens-before的关系。

(5)借助同步。可以借助happens顺序规则与其他某个顺序规则结合起来,对为被锁保护变量访问操作进行排序。(这项技术对语句顺序敏感,容易出错),最大限度提升类的性能时才用。

FutureTask使用了AbstractQueuedSynchronizer.

类库中提供的happens-before:

--一个元素放入线程安全容器操作,在另一个线程从该容器获取操作之前执行;

---在countdownlatch上的倒数操作将在闭锁上await方法返回之前执行。

---释放semaphoe许可的操作在该semaphore上获得一个许可之前执行。

---Future表示的任务所有操作都在从future.get返回之前完成

--Executor提交一个runnable或者Callable操作在任务开始之前执行。

---线程到达CylicBarrier或Exchanger的操作在其他到达该栅栏或交换点的线程被释放之前执行。

2.发布

(1)不安全的发布。缺少happens-before会发生重排序。(解释了为什么没有充分同步另一个线程会看到部分构造对象)

错误的延迟初始化;在构造函数中启动线程。

(2)安全的发布。Happens-before比安全发布提供了更强可见性和顺序发布。(happens-before是内存访问级别上的-并发汇编,安全发布更接近程序设计)

(3)安全的初始化模式。

synchronized.   初始化器以特殊方式处理静态化域,有安全保证。(可以在静态块中)  =-----这项技术和jvm延迟加载结合起来,一种延迟初始化技术。

(4)双重检查加锁。   

public class DoubleCheckedLocking {

    private static Resource resource;

    public static Resource getInstance() {

        if (resource == null) {

            synchronized (DoubleCheckedLocking.class) {

                if (resource == null)

                    resource = new Resource();

            }

        }

        return resource;

    }

} 问题在于返回时没有加锁,看到的是构造的未完全的对象,看到失效值。而且双重检查加锁并没有提高多少性能。

 

3.初始化过程的安全性。

不可变对象永远是安全的。(初始化安全性确保,所有线程看到的由构造函数为对象给final域设置的正确值,不管何种方式发布),而且可以通过final域到达的所有对象对其他线程可见。

final域对象初始化安全性可以防止对对象的初始化引用被重新排序到构造构成之前。(final域写入操作,都会被冻结)(final即使不安全的延迟初始化,也能正确发布)。

final只在开始时保证可见性,构造完成后可能改变的值,需要用同步保证可见性。

java并发实践笔记,布布扣,bubuko.com

java并发实践笔记

原文:http://www.cnblogs.com/wanghongjun/p/3622008.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!