第0章:简介
第1章:阻塞队列BlockingQueue
第0节:札记
* BlockingQueue是一种特殊的Queue,若BlockingQueue是空的,
* 从BlockingQueue取东西的操作将会被阻断进入等待状态直到BlocingkQueue进了新货才会被唤醒。
* 同样,如果BlockingQueue是满的任何试图往里存东西的操作也会被阻断进入等待状态,
* 直到BlockingQueue里有新的空间才会被唤醒继续操作。
* BlockingQueue提供的方法主要有:
* add(anObject): 把anObject加到BlockingQueue里,如果BlockingQueue可以容纳返回true,否则抛出IllegalStateException异常。
* offer(anObject):把anObject加到BlockingQueue里,如果BlockingQueue可以容纳返回true,否则返回false。
* put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,调用此方法的线程被阻断直到BlockingQueue里有新的空间再继续。
* poll(time):取出BlockingQueue里排在首位的对象,若不能立即取出可等time参数规定的时间。取不到时返回null。
* take():取出BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。
*
* 根据不同的需要BlockingQueue有4种具体实现:
* (1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
* (2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,
* 若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO(先入先出)顺序排序的。
* LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,
* 导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue。
* (3)PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
* (4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。
第1节:实例
package com.mcc.core.test.thread;
import com.mcc.core.concurrent.ExecutorServiceUtils;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
* BlockingQueue的生产消费实例
*
* @author <a href="mailto:417877417@qq.com">menergy</a>
* DateTime: 13-12-30 下午2:05
*/
public class BlockingQueueTest {
public static void main(String args[]){
//原子计数器
final AtomicInteger productNum = new AtomicInteger(0);
//阻塞队列
final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
//final BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
//final BlockingQueue<String> blockingQueue = new PriorityBlockingQueue<String>();
//final BlockingQueue<String> blockingQueue = new SynchronousQueue<String>();
ExecutorService executorService = ExecutorServiceUtils.getExecutor("test", 2);
//生产
executorService.execute(new Runnable() {
@Override
public void run() {
try {
while (true) {
// 生产产品
productNum.getAndIncrement();
blockingQueue.put("产品--" + productNum.get());
System.out.println("生产产品:"+ productNum.get() + ",阻塞队列:"+ blockingQueue.toString());
// 休眠300ms
Thread.sleep(300);
}
} catch (InterruptedException ex) {
}
}
});
//消费
executorService.execute(new Runnable(){
@Override
public void run() {
try {
while (true) {
// 消费产品
String product = blockingQueue.take();
System.out.println("消费产品:"+ product + ",阻塞队列:"+ blockingQueue.toString());
// 休眠1000ms
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
}
}
});
// 程序运行5s后,所有任务停止
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
executorService.shutdownNow();
System.out.println("main thread finished");
}
}
第2章:监视器Condition
第0节:札记
第1节:实例
package com.mcc.core.test.thread;
import com.mcc.core.concurrent.ExecutorServiceUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Condition实现生产消费实例
*
* Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。
*
* @author <a href="mailto:417877417@qq.com">menergy</a>
* DateTime: 13-12-31 上午10:12
*/
public class ConditionTest {
public static void main(String args[]){
final Lock lock = new ReentrantLock();
final Condition produced = lock.newCondition();
final Condition consumed = lock.newCondition();
//资源持有开关,假设产品架最多只能放3个
final AtomicInteger productNum = new AtomicInteger(0);
ExecutorService executorService = ExecutorServiceUtils.getExecutor("test", 10);
//5个生产者
for(int i = 0; i < 5; i++){
//生产
final int produceId = i;
executorService.execute(new Runnable() {
@Override
public void run() {
lock.lock();
try {
if (productNum.get() == 3){
System.out.println("产品架满,生产者" + produceId + "等待,产品数量:" + productNum.get());
//放弃锁,进入睡眠,等待消费者
consumed.await();
}
System.out.println("生产者" + produceId + "开始生产产品,产品数量:" + productNum.get());
// 生产产品
TimeUnit.MILLISECONDS.sleep(500);
productNum.getAndIncrement();
System.out.println("生产者" + produceId + "生产完,产品数量:" + productNum.get());
//唤醒某个等待的生产线程
produced.signal();
//唤醒所有等待的生产线程
//produced.signalAll();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
});
}
//5个消费者
for(int i = 0; i < 5; i++){
//消费
final int consumeId = i;
executorService.execute(new Runnable(){
@Override
public void run() {
lock.lock();
try {
if (productNum.get() == 0){
System.out.println("产品架空,消费者" + consumeId + "等待,产品数量:" + productNum.get());
//放弃锁,进入睡眠,等待生产者
produced.await();
}
System.out.println("消费者" + consumeId + "开始消费产品,产品数量:" + productNum.get());
// 生产产品
TimeUnit.MILLISECONDS.sleep(300);
productNum.getAndDecrement();
System.out.println("消费者" + consumeId + "消费完,产品数量:" + productNum.get());
//唤醒某个等待的消费线程
consumed.signal();
//唤醒所有等待的消费线程
//consumed.signalAll();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
});
}
}
}
第3章:线程计数器CountDownLacth
第0节:札记
* CountDownLatch 是一个通用同步工具,它有很多用途。
* 将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:
* 在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。
* 用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,
* 或者使其在某项操作完成 N 次之前一直等待。
第1节:实例
package com.mcc.core.test.thread;
import com.mcc.core.concurrent.ExecutorServiceUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
/**
* CountDownLacth线程计数器使用实例
*
* @author <a href="mailto:417877417@qq.com">menergy</a>
* DateTime: 13-12-27 下午2:39
*/
public class CountDownLacthTest {
public static void main(String args[]){
// 实例化线程计数器
int size = 3;
final CountDownLatch lacth = new CountDownLatch(size);
Executor executor = ExecutorServiceUtils.getExecutor("test",size);
for(int i = 0; i < 3; i++){
final int threadNum = i;
executor.execute(new Runnable(){
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread running:" + threadNum);
lacth.countDown();
}
});
}
// 主线程等待子线程,一直到程序计数器为0
try {
lacth.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("main trhread finished !");
}
}
第4章:线程计数器(栅栏)CyclicBarrier
第0节:札记
* CyclicBarrier类似于CountDownLatch也是个计数器,
* 不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数,
* 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。
* CyclicBarrier就象它名字的意思一样,可看成是个障碍,
* 所有的线程必须到齐后才能一起通过这个障碍。
* CyclicBarrier初始时还可带一个Runnable的参数,
* 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。
第1节:实例
package com.mcc.core.test.thread;
import com.mcc.core.concurrent.ExecutorServiceUtils;
import java.util.concurrent.*;
/**
* CyclicBarrierTest线程计数器使用实例
*
* @author <a href="mailto:417877417@qq.com">menergy</a>
* DateTime: 13-12-30 上午10:39
*/
public class CyclicBarrierTest {
public static void main(String args[]){
// 实例化线程计数器
int size = 3;
final CyclicBarrier barrier = new CyclicBarrier(size,new Runnable(){
@Override
public void run() {
System.out.println("barrier thread running !");
}
});
Executor executor = ExecutorServiceUtils.getExecutor("test", size);
//这里测试7个线程
for(int i = 0; i < size * 2 + 1; i++){
final int threadNum = i;
executor.execute(new Runnable(){
@Override
public void run() {
try {
System.out.println("thead wait:" + threadNum);
//线程等待,barrier初始化为3,所以要等齐三个线程抵达栅栏时才能一起通过栅栏,前6个线程需分两批3个通过,第7个通不过,除非到达超时
//barrier.await();
try {
barrier.await(5000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
System.out.println("thread timeout:"+ threadNum);
}
//线程通过栅栏
System.out.println("thread running:" + threadNum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
// 主线程等待子线程,一直到程序计数器为0
System.out.println("main trhread finished !");
}
}
第5章:延迟队列DelayQueue
第0节:札记
* DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。
* 这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。
* Delayed接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
第1节:实例
package com.mcc.core.test.thread;
import com.mcc.core.concurrent.ExecutorServiceUtils;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* DelayQueue实例测试
*
* @author <a href="mailto:417877417@qq.com">menergy</a>
* DateTime: 13-12-31 下午3:20
*/
public class DelayQueueTest {
static class DelayTask implements Runnable,Delayed{
//任务名
private String name;
//时间
private long time = 0;
public DelayTask(String name,long delay) {
this.name = name;
time = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(delay, TimeUnit.MILLISECONDS);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
long result = ((DelayTask) o).getTime() - this.getTime();
if (result < 0) {
return 1;
}
if (result > 0) {
return -1;
}
return 0;
}
@Override
public void run() {
System.out.println("产品被消费:" + this.toString());
}
public String toString() {
return "{name:" + this.getName() + ",延时:" + this.getDelay(TimeUnit.MILLISECONDS) + "}";
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
}
public static void main(String args[]){
//原子计数器
final AtomicInteger productNum = new AtomicInteger(0);
final DelayQueue<DelayTask> delayQueue = new DelayQueue<DelayTask>();
ExecutorService executorService = ExecutorServiceUtils.getExecutor("test", 2);
//生产
executorService.execute(new Runnable() {
@Override
public void run() {
long[] times = new long[]{3000,1000,5000,500,2000,5000};
try {
for(int i = 0; i < 5; i++) {
// 生产产品
productNum.getAndIncrement();
delayQueue.add(new DelayTask("产品-" + productNum.get(), times[i]));
// 休眠300ms
Thread.sleep(300);
System.out.println("生产产品:" + productNum.get() + ",无界阻塞队列:" + delayQueue.toString());
}
} catch (InterruptedException ex) {
}
}
});
//消费
executorService.execute(new Runnable(){
@Override
public void run() {
try {
while (true) {
// 获取延时任务
DelayTask delayTask = delayQueue.take();
//消费产品
delayTask.run();
// 休眠1000ms
Thread.sleep(1000);
System.out.println("消费完产品:"+ delayTask.getName() + ",无界阻塞队列:"+ delayQueue.toString());
}
} catch (InterruptedException ex) {
}
}
});
// 程序运行5s后,所有任务停止
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
}
executorService.shutdownNow();
System.out.println("main thread finished");
}
}
第6章:线程间通信Exchanger
第0节:札记
* Exchanger让两个线程可以互换信息。
* 每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。
* Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。
第1节:实例
package com.mcc.core.test.thread;
import com.mcc.core.concurrent.ExecutorServiceUtils;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Exchanger实例测试,实例以字符串接龙方式呈现
*
*
* @author <a href="mailto:417877417@qq.com">menergy</a>
* DateTime: 13-12-30 下午4:58
*/
public class ExchangerTest {
public static void main(String args[]){
//原子计数器1
final AtomicInteger productNum1 = new AtomicInteger(0);
//原子计数器2
final AtomicInteger productNum2 = new AtomicInteger(0);
//资源持有开关,假设只有一个资源
final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
// 初始化一个Exchanger,交换的信息以字符串呈现
final Exchanger<String> exchanger = new Exchanger<String>();
//交换的信息
final String[] info = {""};
ExecutorService executorService = ExecutorServiceUtils.getExecutor("test", 2);
//甲线程
executorService.execute(new Runnable() {
@Override
public void run() {
try {
while (true) {
if(!atomicBoolean.compareAndSet(false,true)){
// 处理消息
TimeUnit.MILLISECONDS.sleep(200);
productNum1.getAndIncrement();
System.out.println("甲" + productNum1.get() + "处理完成,等待乙");
//处理完,等待乙交换
info[0] = info[0] + "甲" + productNum1.get() + "--";
exchanger.exchange(info[0]);
System.out.println("甲" + productNum1.get() + "处理完成,交换给乙,信息:" + info[0]);
atomicBoolean.compareAndSet(true, false);
}else{
System.out.println("甲不处理,等待乙");
//处理完,等待乙交换
info[0] = exchanger.exchange(info[0]);
System.out.println("甲不处理,和乙交换,信息:" + info[0]);
}
}
} catch (InterruptedException ex) {
}
}
});
//乙线程
executorService.execute(new Runnable(){
@Override
public void run() {
try {
while (true) {
if(!atomicBoolean.compareAndSet(false,true)){
// 处理消息
TimeUnit.MILLISECONDS.sleep(500);
productNum2.getAndIncrement();
System.out.println("乙" + productNum2.get() + "处理完成,等待甲");
//处理完,等待乙交换
info[0] = info[0] + "乙" + productNum2.get() + "--";
exchanger.exchange(info[0]);
System.out.println("乙" + productNum2.get() + "处理完成,交换给甲,信息:" + info[0]);
atomicBoolean.compareAndSet(true, false);
}else{
System.out.println("乙不处理,等待甲");
//处理完,等待乙交换
info[0] = exchanger.exchange(info[0]);
System.out.println("乙不处理,和甲交换,信息:" + info[0]);
}
}
} catch (InterruptedException ex) {
}
}
});
// 程序运行10s后,所有任务停止
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
executorService.shutdownNow();
System.out.println("main thread finished");
}
}
第7章:线程间调用Join
第0节:札记
第1节:实例
package com.mcc.core.test.thread;
import com.mcc.core.concurrent.ExecutorServiceUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 一个线程等待另一个线程测试
*
*
* @author <a href="mailto:417877417@qq.com">menergy</a>
* DateTime: 13-12-31 下午1:09
*/
public class JoinTest {
/**
* A线程
*/
static class AThread implements Runnable{
@Override
public void run() {
try {
System.out.println(" A线程开始处理");
TimeUnit.MILLISECONDS.sleep(2000);
System.out.println(" A线程处理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* B线程
*/
static class BThread implements Runnable{
private Thread aThread;
public BThread(Thread aThread) {
this.aThread = aThread;
}
@Override
public void run() {
try {
System.out.println("B线程开始处理");
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println("B线程处理到一半,启动A线程,等待A线程处理");
//启动A线程
aThread.start();
//等待A线程完成
aThread.join();
System.out.println("B线程继续处理另一半");
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println("B线程处理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]){
Thread aThread = new Thread(new AThread());
ExecutorService executorService = ExecutorServiceUtils.getExecutor("test", 2);
//启动B线程
executorService.execute(new BThread(aThread));
}
}
第8章:读写锁ReadWriteLock
第0节:札记
* ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。
* 只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。
* 从理论上讲,与互斥锁相比,使用读-写锁所允许的并发性增强将带来更大的性能提高。
* 与互斥锁相比,使用读-写锁能否提升性能则取决于读写操作期间读取数据相对于修改数据的频率,
* 以及数据的争用——即在同一时间试图对该数据执行读取或写入操作的线程数。
* 此锁最多支持 65535 个递归写入锁和 65535 个读取锁。试图超出这些限制将导致锁方法抛出 Error。
第1节:实例
package com.mcc.core.test.thread;
import com.mcc.core.concurrent.ExecutorServiceUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ReadWriteLock读写锁测试
*
*
* @author <a href="mailto:417877417@qq.com">menergy</a>
* DateTime: 13-12-31 上午11:26
*/
public class ReadWriteLockTest {
public static void main(String args[]){
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
final Lock readLock = readWriteLock.readLock();
final Lock writeLock = readWriteLock.writeLock();
ExecutorService executorServiceMain = ExecutorServiceUtils.getExecutor("Main", 10);
final ExecutorService executorService = ExecutorServiceUtils.getExecutor("test", 10);
executorServiceMain.execute(new Runnable() {
@Override
public void run() {
//10个读线程
for(int i = 0; i < 10; i++){
final int readId = i;
executorService.execute(new Runnable() {
@Override
public void run() {
readLock.lock();
try {
System.out.println("线程" + readId +"开始读");
// 读处理
TimeUnit.MILLISECONDS.sleep(300);
System.out.println("线程" + readId +"读完成");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
readLock.unlock();
}
}
});
}
}
});
executorServiceMain.execute(new Runnable() {
@Override
public void run() {
//2个写线程
for(int i = 0; i < 2; i++){
//消费
final int writeId = i;
executorService.execute(new Runnable(){
@Override
public void run() {
writeLock.lock();
try {
System.out.println("线程" + writeId +"开始写");
// 读处理
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("线程" + writeId + "写完成");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
writeLock.unlock();
}
}
});
}
}
});
}
}
第9章:信号量Semaphore
第0节:札记
* 获得一项前,每个线程必须从信号量获取许可,从而保证可以使用该项。
* 该线程结束后,将项返回到池中并将许可返回到该信号量,从而允许其他线程获取该项。
* 注意,调用 acquire() 时无法保持同步锁,因为这会阻止将项返回到池中。
* 信号量封装所需的同步,以限制对池的访问,这同维持该池本身一致性所需的同步是分开的。
第1节:实例
package com.mcc.core.test.thread;
import com.mcc.core.concurrent.ExecutorServiceUtils;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 信号量线程控制实例测试
*
* @author <a href="mailto:417877417@qq.com">menergy</a>
* DateTime: 13-12-30 下午3:47
*/
public class SemaphoreTest {
public static void main(String args[]){
//信号量,初始化2
final Semaphore semaphore = new Semaphore(2);
//同步锁
final Lock lock = new ReentrantLock();
//资源池
final ArrayList<String> resourcePool = new ArrayList<String>();
for (int i = 0; i < 8; i++) {
resourcePool.add("Resource " + i);
}
ExecutorService executorService = ExecutorServiceUtils.getExecutor("test", 5);
//启动5个线程执行5个任务
for (int i = 0; i < 5; i++) {
executorService.submit(new Runnable(){
@Override
public void run() {
try {
//获取通行证,只有得到通行证后才能得到资源
semaphore.acquire();
System.out.println("获取通行证,可用数量:" + semaphore.availablePermits());
//取走资源,需同步
lock.lock();
String resource = resourcePool.remove(0);
System.out.println("取走资源:" + resource + "资源池:" + resourcePool.toString());
lock.unlock();
//使用资源
System.out.println("使用资源:" + resource + "资源池:" + resourcePool.toString());
TimeUnit.MILLISECONDS.sleep(2000);
//归还资源,需同步
lock.lock();
resourcePool.add(resource);
System.out.println("归还资源:" + resource + "资源池:" + resourcePool.toString());
lock.unlock();
//释放许可证,可以给其它线程使用
semaphore.release();
System.out.println("释放通行证,可用数量:" + semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
}
java多线程(2):并发编程实践
原文:http://blog.csdn.net/menergy/article/details/19114531