首页 > 编程语言 > 详细

深入理解Java之多线程

时间:2016-03-28 08:45:28      阅读:164      评论:0      收藏:0      [点我收藏+]

一、为什么使用多线程

1. 并发与并行

    我们知道,在单核机器上,“多进程”并不是真正的多个进程在同时执行,而是通过CPU时间分片,操作系统快速在进程间切换而模拟出来的多进程。我们通常把这种情况成为并发,也就是多个进程的运行行为是“一并发生”的,但不是同时执行的,因为CPU核数的限制(PC和通用寄存器只有一套,严格来说在同一时刻只能存在一个进程的上下文)。
    现在,我们使用的计算机基本上都搭载了多核CPU,这时,我们能真正的实现多个进程并行执行,这种情况叫做并行,因为多个进程是真正“一并执行”的(具体多少个进程可以并行执行取决于CPU核数)。综合以上,我们知道,并发是一个比并行更加宽泛的概念。也就是说,在单核情况下,并发只是并发;而在多核的情况下,并发就变味了并行。下文中我们将统一用并发来指代这一概念。

2. 阻塞与非阻塞

    UNIX系统内核提供了一个名为read的函数,用来读取文件的内容:
typedef ssize_t int;
typedef size_t unsigned;

ssize_t read(int fd, void *buf, size_t n);

 

    这个函数从描述符为fd的当前文件位置复制至多n个字节到内存缓冲区buf。若执行成功则返回读取到的字节数;若失败则返回-1。read系统调用默认会阻塞,也就是说系统会一直等待这个函数执行完毕直到它产生一个返回值。然而我们知道,磁盘通常是一种慢速I/O设备,这意味着我们用read函数读取磁盘文件内容时,往往需要比较长的时间(相对于访问内存或者计算一些数值来说)。那么阻塞的时候我们当然不想让系统傻等着,我们想在这期间做点儿别的事情,等着磁盘准备好了通知我们一下,我们再来读取文件内容。实际上,操作系统正是这样做的。当阻塞在read这类系统调用中的时候,操作系统通常都会让该进程暂时休眠,调度一个别的进程来执行,以免干等着浪费时间,等到磁盘准备好了可以让我们来进行I/O了,它会发送一个中断信号通知操作系统,这时候操作系统重新调度原来的进程来继续执行read函数。这就是通过多进程实现的并发。
 

3. 多进程 vs 多线程

    进程就是一个执行中的程序实例,而线程可以看作一个进程的最小执行单元。线程与进程间的一个显著区别在于每个进程都有一整套变量,而同一个进程间的多个线程共享该进程的数据。多进程实现的并发通常在进程创建以及数据共享等方面的开销要比多线程更大,线程的实现通常更加轻量,相应的开销也就更小,因此在一般客户端开发场景下,我们更加倾向于使用多线程来实现并发。
    然而,有时候,多线程共享数据的便捷容易可能会成为一个让我们头疼的问题,我们在后文中会具体提到常见的问题及相应的解决方案。在上面的read函数的例子中,如果我们使用多线程,可以使用一个主线程去进行I/O的工作,再用一个或几个工作线程去执行一些轻量计算任务,这样当主线程阻塞时,线程调度程序会调度我们的工作线程来执行计算任务,从而更加充分的利用CPU时间片。而且,在多核机器上,我们的多个线程可以并行执行在多个核上,进一步提升效率。
 

 

二、如何使用多线程

1. 线程执行模型

    每个进程刚被创建时都只含有一个线程,这个线程通常被称作主线程(main thread)。而后随着进程的执行,若遇到创建新线程的代码,就会创建出新线程,而后随着新线程被启动,多个线程就会并发地运行。某时刻,主线程阻塞在一个慢速系统调用中(比如前面提到的read函数),这时线程调度程序会让主线程暂时休眠, 调度另一个线程来作为当前运行的线程。每个线程也有自己的一套变量,但相比于进程来说要少得多,因此线程切换的开销更小。
 

2. 创建一个新线程

    在Java中,有两种方法可以创建一个新线程。第一种方法是定义一个实现Runnable接口的类并实例化,然后将这个对象传入Thread的构造器来创建一个新线程,如以下代码所示:
class MyRunnable implements Runnable {
     ...
    public void run() {
         //这里是新线程需要执行的任务
    }
}
 
Runnable r = new MyRunnable();
Thread t = new Thread(r);

 

 

    第二种创建一个新线程的方法是直接定义一个Thread的子类并实例化,从而创建一个新线程。比如以下代码:
class MyThread extends Thread {
    public void run() {
        //这里是线程要执行的任务
    }
}

 

    创建了一个线程对象后,我们直接对其调用start方法即可启动这个线程:
t.start();

 


3. 线程的属性

(1)线程的状态

    线程在它的声明周期中可能处于以下几种状态之一:
  • New(新生):线程对象刚刚被创建出来;
  • Runnable(可运行):在线程对象上调用start方法后,相应线程便会进入Runnable状态,若被线程调度程序调度,这个线程便会成为当前运行(Running)的线程;
  • Blocked(被阻塞):若一段代码被线程A”上锁“,此时线程B尝试执行这段代码,线程B就会进入Blocked状态;
  • Waiting(等待):当线程等待另一个线程通知线程调度器一个条件时,它本身就会进入Waiting状态;
  • Time Waiting(计时等待):计时等待与等待的区别是,线程只等待一定的时间,若超时则不再等待;
  • Terminated(被终止):线程的run方法执行完毕或者由于一个未捕获的异常导致run方法意外终止会进入Terminated状态。

(2)线程的优先级

    在Java中,每个线程都有一个优先级,默认情况下,线程会继承它的父线程的优先级。可以用setPriority方法来改变线程的优先级。Java中定义了三个描述线程优先级的常量:MAX_PRIORITY、NORM_PRIORITY、MIN_PRIORITY。
    每当线程调度器要调度一个新的线程时,它会首先选择优先级较高的线程。然而线程优先级是高度依赖与操作系统的,在有些系统的Java虚拟机中,甚至会忽略线程的优先级。因此我们不应该将程序逻辑的正确性依赖于优先级。线程优先级相关的API如下:
void setPriority(int newPriority) //设置线程的优先级,可以使用系统提供的三个优先级常量
static void yield() //使当前线程处于让步状态,这样当存在其他优先级大于等于本线程的线程时,线程调度程序会调用那个线程

 

4. 同步

(1)race condition

    当两个或两个以上的线程同时修改同一数据对象时,可能会产生不正确的结果,我们称这个时候存在一个竞争条件(race condition)。在多线程程序中,我们必须要充分考虑到多个线程同时访问一个数据时可能出现的各种情况,确保对数据进行同步存取,以防止错误结果的产生。请考虑以下代码:
public class Counter {
    private long count = 0;
    public void add(long value) {
        this.count = this.count + value;  
    }
}

 

    我们注意一下改变count值的那一行,通常这个操作不是一步完成的,它大概分为以下三步:
  • 第一步,把count的值加载到寄存器中;
  • 第二步,把相应寄存器的值加上value的值;
  • 第三步,把寄存器的值写回count变量。
    我们可以编译以上代码然后用javap查看下编译器为我们生成的字节码:
技术分享

    我们可以看到,大致过程和我们以上描述的基本一样。那么我们考虑下面这样一个场景:假设count的初值为0,首先线程A加载了count到寄存器中,并且加上了1,而就当它要写回之前,线程B进入了add方法,它加载了count到寄存器中(由于此时线程A还没有把count写回,因此count还是0),并加上了2,然后线程B写回了count。在线程B完成了写回后,线程调度程序调度了线程A,线程A也写回了count。注意,此时count的值为1而不是我们希望的三。我们不希望一个线程在执行add方法时被其他线程打断,因为这会造成数据的破坏。我们希望的情况是这样的:线程A完整执行完毕add方法后,待count变量的值更新为1时,线程B开始执行add方法,在线程B完整执行完毕之前, 没有别的线程能够打断它,若有别的线程想调用add,也得等线程B执行完毕写回count值后。
    像add这种方法代码所在的内存区,我们称之为临界区(critical area)。对于临界区,在同一时刻我们只希望有一个线程能够访问它,我们希望在一个线程进入临界区后把通往这个区的门“上锁”,离开后把门"解锁“,这样当一个线程执行临界区的代码时其他想要进来的线程只能在门外等着,这样可以保证了多个线程共享的数据不会被破坏。下面我们来介绍下为临界区“上锁”的方法。

(2)锁对象

    Java类库中为我们提供了能够给临界区“上锁”的ReentrantLock类。使用方法很简单,下面的代码即完成了给add方法“上锁”:
Lock myLock = new ReentrantLock();
public void add(long value) {
    myLock.lock();
    try {
        this.count = this.count + value;
    } finally {
        myLock.unlock();
    }
}

 


    ReentrantLock类实现了Lock接口,使用它来上锁时只需要先获取一个它的实例。然后通过lock方法进行上锁,通过unlock方法进行解锁。注意,我们使用了一个try-finally块,以确保即使发生异常也总是会解锁,不然其他线程会一直无法执行add方法。当一个线程执行完“myLock.lock()”时,它就获得了一个锁对象,这就相当于它给临界区上了锁,其他线程都无法进来,只有这个线程执行完“myLock.unlock()"时,释放了锁对象,其他线程才能再通过“myLock.lock()"获得锁对象,从而进入临界区。也就是说,当一个线程获取了锁对象后,其他尝试获取锁对象的线程都会被阻塞,进入Blocked状态,直至获取锁对象的线程释放了锁对象。
    有了锁对象,尽管线程A在执行add方法的过程中被线程调度程序剥夺了运行权,其他的线程也进入不了临界区,因为线程A还在持有锁对象。这样一来,我们就很好的保护了临界区。还有一点我们要知道的是,每个Counter类对象都有一个锁对象,它每次只能把这个锁对象给一个线程,两个线程可以同时获取不同Counter对象的锁。
    锁是可重入的,这意味着线程可以重复获得已经持有的锁,每个锁对象内部都持有一个计数,每当线程获取依次锁对象,这个计数就加1,释放一次就减1。只有当计数值变为0时,才意味着这个线程释放了锁对象,其他线程才可以来获取。

(3)条件对象

    有些时候,线程进入临界区后不能立即执行,它需要等某一条件满足后才开始执行。比如,我们希望count值大于5的时候才增加它的值,我们最先想到的是加个条件判断:
public void add(int value) {    
    if (this.count > 5) {
        this.count = this.count + value;
    }
}

 

    然而上面的代码存在一个问题。假设线程A执行完了条件判断并的值count值大于5,而在此时该线程被线程调度程序中断执行,转而调度线程B,线程B对统一counter对象的count值进行了修改,使得它不再大于5,这时线程调度程序又来调度线程A,线程A刚才判定了条件为真,所以会执行add方法,尽管此时count值已不再大于5。显然,这与我们所希望的情况的不符的。对于这种问题,我们想到了可以再条件判断前后加锁与解锁:
public void add(int value) {
    myLock.lock();
    try {
        while (counter.getCount() <= 5) {
            //等待直到大于5
        }
        this.count = this.count + value;
    } finally {
        myLock.unlock();
    }
}

 

   在以上代码中,若线程A发现count值小于等于5,它会一直等到别的线程增加它的值直到它大于5。然而线程A此时持有锁对象,其他线程无法进入临界区来改变count的值,所以当线程A进入临界区时若count小于等于5,线程A会一直在循环中等待,其他的线程也无法进入临界区。这种情况下,我们可以使用条件对象来管理那些已经获得了一个锁却不能开始干活的线程。一个锁对象可以有一个或多个相关的条件对象,在锁对象上调用newCondition方法就可以获得一个条件对象。比如我们可以为“count值大于5”获得一个条件对象:
Condition enoughCount = myLock.newCondition();

 

    然后,线程A发现count值不够时,调用“enoughCount.await()”即可,这时它便会进入Waiting状态,放弃它持有的锁对象,以便其他线程能够进入临界区。当线程B进入临界区修改了count值后,发现了count值大于5,线程B可通过"enoughCount.signalAll()"来“唤醒所有等待这一条件满足的线程(这里只有线程A)。此时线程A会从Waiting状态进入Runnable状态。当线程A再次被调度时,它便会从await方法返回,重新获得锁并接着刚才继续执行。注意,此时线程A会再次测试条件是否满足,若满足则执行相应操作。也就是说signalAll方法仅仅是通知线程A一声count的值可能大于5了,应该再测试一下。还有一个signal方法,会随机唤醒一个正在等待某条件的线程,这个方法的风险在于若随机唤醒的线程测试条件后发现仍然不满足,它还是会再次进入Waiting状态,若以后不再有线程唤醒它,它便不能再运行了。

(4)synchronized关键字

    Java中的每个对象都有一个内部锁,若一个实例方法用synchronized关键字修饰,那么这个对象的内部锁会“保护”此方法,我们称此方法为同步方法。这意味着只有获取了该对象内部锁的线程才能够执行此方法。也就是说,以下的代码:
public synchronized void add(int value) {
    ...
}

 

等价于:
public void add(int value) {
    this.<em>innerLock</em>.lock();
    try {
        ...
    } finally {
        this.<em>innerLock</em>.unlock();
    }
}

 

    这意味着,我们通过给add方法加上synchronized关键字即可保护它,加锁解锁的工作不需要我们再手动完成。对象的内部锁在同一时刻只能由一个线程持有,其他尝试获取的线程都会被阻塞直至该线程释放锁。
    内部锁对象只有一个相关条件。wait方法添加一个线程到这个条件的等待集中;notifyAll / notify方法会唤醒等待集中的线程。也就是说wait() / notify()等价于enoughCount.await() / enoughCount.signAll()。以上add方法我们可以这么实现:
public synchronized void add(int value) {
    while (this.count <= 5) {
        wait(); 
    }
    this.count += value;
    notifyAll();
}

 

    这份代码显然比我们上面的实现要简洁得多,实际开发中也更加常用。
    
    我们也可以用synchronized关键字修饰静态方法,这样的话,进入该方法的线程或获取相关类的Class对象的内部锁。例如,若Counter中含有一个synchronized关键字修饰的静态方法,那么进入该方法的线程会获得Bank.class的内部锁。这意味着其他任何线程不能执行Counter类的任何同步静态方法。
   
     对象内部锁存在一些局限性:
  • 不能中断一个正在试图获取锁的线程;
  • 试图获取锁时不能设定超时;
  • 每个锁仅有一个相关条件;
     那么我们究竟应该使用Lock/Condition还是synchronized关键字呢?答案是能不用尽量都不用,我们应尽可能使用java.util.concurrent包中提供给我们的相应机制(后面会介绍)。非得使用的情况下应优先考虑synchronized关键字,因为它的简洁性能减少我们出错的可能。只有我们需要Lock/Condition的特性时,才应该考虑使用它(比如多个条件还有计时等待版本的await函数)。
 

(5)同步阻塞

    上面我们提到了一个线程调用synchronized方法可以获得对象的内部锁(前提是还未被其他线程获取),获得对象内部锁的另一种方法就是通过同步阻塞:
synchronized (obj) {
    //临界区
}

 

    一个线程执行上面的代码块便可以获取obj对象的内部锁,直至它离开这个代码块才会释放锁。
    我们经常会看到一种特殊的锁,如下所示:
public class Counter {
    private Object lock = new Object();

    synchronized (lock) {
        //临界区
    }
    ...
}

 

    那么这种使用这种锁有什么好处呢?我们知道Counter对象只有一个内部锁,这个内部锁在同一时刻只能被一个对象持有,那么设想Counter对象中定义了两个synchronized方法。在某一时刻,线程A进入了其中一个synchronized方法并获取了内部锁,此时线程B尝试进去另一个synchronized方法时由于对象内部锁还没有被线程A释放,因此线程B只能被阻塞。然而我们的两个synchronized方法是两个不同的临界区,它们不会相互影响,所以它们可以在同一时刻被不同的线程所执行。这时我们就可以使用如上面所示的显式的锁对象,它允许不同的方法同步在不同的锁上。

(6)volatile域

    有时候,仅仅为了同步一两个实例域就使用synchronized关键字或是Lock/Condition,会造成很多不必要的开销。这时候我们可以使用volatile关键字,使用volatile关键字修饰一个实例域会告诉编译器和虚拟机这个域可能会被多线程并发访问,这样编译器和虚拟机就能确保它的值总是我们所期望的。
    volatile关键字的实现原理大致是这样的:我们在访问内存中的变量时,通常都会把它缓存在寄存器中,以后再需要读它的值时,只需从相应寄存器中读取,若要对该变量进行写操作,则直接写相应寄存器,最后写回该变量所在的内存单元。若线程A把count变量的值缓存在寄存器中,并将count加2(将相应寄存器的值加2),这时线程B被调度,它读取count变量加2后并写回。然后线程A又被调度,它会接着刚才的操作,也就是会把count值写回,此时线程A是直接把寄存器中的值写回count所在单元,而这个值是过期的。若count被volatile关键字修饰,这个问题便可被圆满解决。volatile变量有一个性质,就是任何时候读取它的值时,都会直接去相应内存单元读取,而不是读取缓存在寄存器中的值。这样一来,在上面那个场景中,线程A把count写回时,会从内存中读取count最新的值,从而确保了count的值总是我们所期望的。

(7)死锁

    假设现在进程中只有线程A和线程B这两个线程,考虑下面这样一种情形:

    线程A获取了counterA对象的内部锁,线程B获取了counterB对象的内部锁。而线程A只有在获取counterB的内部锁后才能继续执行,线程B只有在获取线程A的内部锁后才能继续执行。这样一来,两个线程在互相等待对方释放锁从而谁也没法继续执行,这种现象就叫做死锁(deadlock)。

    除了以上情况,还有一种类似的死锁情况是两个线程获取锁后都不满足条件从而进入条件的等待集中,相互等待对方唤醒自己。

    Java没有为解决死锁提供内在机制,因此我们只有在开发时格外小心,以避免死锁的发生。

 

(8)中断线程

    介绍中断线程之前,我们先来介绍一下join方法,它是一个实例方法。我们可以在主线程中对某个子线程调用join方法,这样主线程就会阻塞直到那个子线程执行完毕后主线程才跟着它后头继续运行(所以叫join)。那么,有时候子线程会运行很长时间,这时候它可能想要告诉主线程一声“我还得好久,你不用等我了”。这种情况下,我们可以通过中断线程来实现。在一个线程上调用interrupt方法即可中断它,实际上这个方法通过修改相应线程的中断状态来告诉它“你被中断了”。每个线程都有一个被称为中断状态的boolean类型的标识。对于不在阻塞状态的线程,对其调用这个方法仅仅会修改它的中断状态;而对于通过调用可中断的阻塞方法(sleep、join、wait、await等等)而被阻塞的线程,当其收到“中断信号“时,会先把中断状态设为true,而后抛出一个InterruptedException异常,同时把中断状态设回false。这样一来我们就可以捕获InterruptedException并进行相应处理了,比如以上我们举的主线程join子线程的例子中,我们就可以在主线程的InterruptedException处理器中接着做之前的工作。
    若InterruptedException异常未被捕获,相应的线程会被终止,进入Terminated状态。以下代码即可获取当前线程的中断状态:

boolean isInterrupted =Thread.currentThread().isInterrupted();

 

   

(9)锁测试与超时

    lockObject.lock方法默认是阻塞的,也就是说当锁未被释放时,申请获取锁的线程会一直阻塞,知道得到一个锁。tryLock方法也用来获取锁,不过当它发现锁被占用时会立刻返回false,若此时锁未被占用会获取锁并返回true。线程便可以去做其他工作。比如以下代码:

if (myLock.tryLock()) {
    try {
        …
    } finally {
        myLock.unlock();
    }
} else {
    //做其他的工作
}

 

    tryLock方法还有一个计时等待版本,可以指定最长等待多长时间,如以下代码最多等待100毫秒,若还无法获取锁即返回:

if (myLock.tryLock(100,TimeUnit.MILLISECONDS)) …

 

 

    除了MILLISECONDS,我们还可以指定不同的时间单位:SECONDS、MICROSECONDS、NANOSECONDS。

    tryLock方法与lock方法还有一个显著的差异,那就是lock方法不能被中断,而tryLock方法可以被中断。

 

    await方法也有计时等待的版本,比如以下代码: 

enougnCountCondition.await(100, TimeUnit.MILLISECONDS);

 

 

(10)读/写锁

    若很多线程从一个内存区域读取数据,但其中只有极少的一部分线程会对其中的数据进行修改,此时我们希望所有Reader线程共享数据,而所有Writer线程对数据的访问要互斥。我们可以使用读/写锁来达到这一目的。
    Java中的读/写锁对应着ReentrantReadWriteLock类,它的使用步骤通常如下所示:
//构造一个ReentrantReadWriteLock对象
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

//分别从中“提取”读锁和写锁
private Lock readLock = rwl.readLock();
private Lock writeLock = rwl.writeLock();

//对所有的Reader线程加读锁
readLock.lock();
try {
    //读操作可并发,但写操作会互斥
} finally {
    readLock.unlock();
}

//对所有的Writer线程加写锁
writeLock.lock();
try {
    //排斥所有其他线程的读和写操作
} finally {
    writeLock.unlock();
}

 

5. 阻塞队列

    以上我们所介绍的都属于Java并发机制的底层基础设施。在实际编程我们应该尽量避免使用以上介绍的较为底层的机制,而使用Java类库中提供给我们封装好的较高层次的抽象。对于许多同步问题,我们可以通过使用一个或多个队列来解决:生产者线程向队列中插入元素,消费者线程则取出他们。考虑一下我们最开始提到的Counter类,我们可以通过队列来这样解决它的同步问题:增加计数值的线程不能直接访问Counter对象,而是把add指令对象插入到队列中,然后由另一个可访问Counter对象的线程从队列中取出add指令对象并执行add操作(只有这个线程能访问Counter对象,因此无需采取额外措施来同步)。
    当试图向满队列中添加元素或者向空队列中移除元素时,阻塞队列(blocking queue)会导致线程阻塞。通过阻塞队列,我们可以按以下模式来工作:工作者线程可以周期性的将中间结果放入阻塞队列中,其他线程可取出中间结果并进行进一步操作。若前者工作的比较慢(还没来得及向队列中插入元素),后者会等待它(试图从空队列中取元素从而阻塞);若前者运行的快(试图向满队列中插元素),它会等待其他线程。阻塞队列提供了以下方法:
  • add方法:添加一个元素。若队列已满,会抛出IllegalStateException异常。
  • element方法:返回队列的头元素。若队列为空,会抛出NoSuchElementException异常。
  • offer方法:添加一个元素,若成功则返回true。若队列已满,则返回false。
  • peek方法:返回队列的头元素。若队列为空,则返回null。
  • poll方法:删除并返回队列的头元素。若队列为空,则返回null。
  • put方法:添加一个元素。若队列已满,则阻塞。
  • remove方法:移除并返回头元素。若队列为空,会抛出NoSuchElementException。
  • take方法:移除并返回头元素。若队列为空,则阻塞。
    java.util.concurrent包提供了阻塞队列的几个变种:
  • LinkedBlockingQueue是一个基于链表实现的阻塞队列。默认容量没有上限,但也有可以指定最大容量的构造方法。它有的“双端队列版本”为LinkedBlockingDeque。
  • ArrayBlockingQueue是一个基于数组实现的阻塞队列,它在构造时需要指定容量。它还有一个构造方法可以指定一个公平性参数,若这个参数为true,那么等待了最长时间的线程会得到优先处理(指定公平性参数会降低性能)。
  • PriorityBlockingQueue是一个基于堆实现的带优先级的阻塞队列。元素会按照它们的优先级被移除队列。
    下面我们来看一个使用阻塞队列的示例:
public class BlockingQueueTest {
    private int size = 20;
    private ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(size);
     
    public static void main(String[] args)  {
        BlockingQueueTest test = new BlockingQueueTest();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
         
        producer.start();
        consumer.start();
    }
     
    class Consumer extends Thread{
        @Override
        public void run() {
             while(true){
                try {
                    //从阻塞队列中取出一个元素
                    queue.take();
                    System.out.println("队列剩余" + queue.size() + "个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
     
    class Producer extends Thread{         
        @Override
        public void run() {
            while (true) {
                try {
                    //向阻塞队列中插入一个元素
                    queue.put(1);
                    System.out.println("队列剩余空间:" + (size - queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 


    在以上代码中,我们有一个生产者线程不断地向一个阻塞队列中插入元素,同时消费者线程从这个队列中取出元素。若生产者生产的比较快,消费者取的比较慢导致队列满,此时生产者再尝试插入时就会阻塞在put方法中,直到消费者取出一个元素;反过来,若消费者消费的比较快,生产者生产的比较慢导致队列空,此时消费者尝试从中取出时就会阻塞在take方法中,直到生产者插入一个元素。
 

 

6. 执行器

    创建一个新线程涉及和操作系统的交互,因此会产生一定的开销。在有些应用场景下,我们会在程序中创建大量生命周期很短的线程,这时我们应该使用线程池(thread pool)。通常,一个线程池中包含一些准备运行的空闲线程,每次将Runnable对象交给线程池,就会有一个线程执行run方法。当run方法执行完毕时,线程不会进入Terminated
状态,而是在线程池中准备等下一次Runnable到来时提供服务。使用线程池统一管理线程可以减少并发线程的数目,线程数过多往往会在线程上下文切换上以及同步操作上浪费过多时间。
    执行器类(java.util.concurrent.Executors)提供了许多静态工厂方法来构建线程池。
 
(1)线程池
    以下三个方法会返回一个实现了ExecutorService接口的ThreadPoolExecutor类的对象:
ExecutorService newCachedThreadPool() //返回一个带缓存的线程池,该池在必要的时候创建线程,在线程空闲60s后终止线程
ExecutorService newFixedThreadPool(int threads) //返回一个线程池,线程数目由threads参数指明
ExecutorService newSingleThreadExecutor() //返回只含一个线程的线程池,它在一个单一的线程中依次执行各个任务
    对于newCachedThreadPool方法返回的线程池:对每个任务,若有空闲线程可用,则立即让它执行任务;若没有可用的空闲线程,它就会创建一个新线程并加入线程池中。
    newFixed方法返回的线程池里的线程数目由创建时指定,并一直保持不变。若提交给它的任务多于线程池中的空闲线程数目,那么就会把任务放到队列中,当其他任务执行完毕后再来执行它们。
    newSingleThreadExecutor会返回一个大小为1的线程池,由一个线程执行提交的任务。
    
   以下方法可将一个Runnable对象或Callable对象提交给线程池:
Future<T> submit(Callable<T> task)
Future<T> submit(Runnable task, T result)
Future<?> submit(Runnable task)
    调用submit方法会返回一个Future对象,可通过这个对象查询该任务的状态。我们可以在这个Future对象上调用isDone、cancle、isCanceled等方法(Future接口会在下面进行介绍)。第一个submit方法提交一个Callable对象到线程池中;第二个方法提交一个Runnable对象,并且Future的get方法在完成的时候返回指定的result对象。
    当我们使用完线程池时,就调用shutdown方法,该方法会启动该线程池的关闭例程。被关闭的线程池不能再接受新的任务,当关闭前已存在的任务执行完毕后,线程池死亡。shutdownNow方法可以取消线程池中尚未开始的任务并尝试中断所有线程池中正在运行的线程。
    在使用线程池时,我们通常应该按照以下步骤来进行:
  • 调用Executors中相关方法构建一个线程池;
  • 调用submit方法提交一个Runnable对象或Callable对象到线程池中;
  • 若想要取消一个任务,需要保存submit返回的Future对象;
  • 当不再提交任何任务时,调用shutdown方法。
    
(2)预定执行
    ScheduledExecutorService接口含有为预定执行(Scheduled Execution)或重复执行的任务专门设计的方法。Executors类newScheduledThreadPool和newSingleThreadScheduledExecutor方法会返回实现了ScheduledExecutorService接口的对象。可以使用以下方法来预定执行的任务:
ScheduledFuture<V> schedule(Callable<V> task, long time, TimeUnit unit)
ScheduledFuture<?> schedule(Runnable task, long time, TimeUnit unit)
//以上两个方法预定在指定时间过后执行任务
SchedukedFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) //在指定的延迟(initialDelay)过后,周期性地执行给定任务
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) //在指定延迟(initialDelay)过后周期性的执行任务,每两个任务间的间隔为delay指定的时间

 

 
(3)控制任务组
    对ExecutorService对象调用invokeAny方法可以把一个Callable对象集合提交到相应的线程池中执行,并返回某个已经完成的任务的结果,该方法的定义如下:
T invokeAny(Collection<Callable<T>> tasks)
T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)

 

    该方法可以指定一个超时参数。这个方法的不足再于我们无法知道它返回的结果是哪个任务执行的结果。如果集合中的任意Callable对象的执行结果都能满足我们的需求的话,使用invokeAny方法是很好的。
    invokeAll方法也会提交Callable对象集合到相应的线程池中,并返回一个Future对象列表,代表所有任务的解决方案。该方法的定义如下:
List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)

 

 

7. Callable与Future

    我们之前提到了创建线程的两种方式,它们有一个共同的缺点,那就是异步方法run没有返回值,也就是说我们无法直接获取它的执行结果,只能通过共享变量或者线程间通信等方式来获取。好消息是通过使用Callable和Future,我们可以方便的获得线程的执行结果。
    Callable接口与Runnable接口类似,区别在于它定义的异步方法call有返回值。Callable接口的定义如下:
public interface Callable<V> {
    V call() throws Exception;
}

 

    类型参数V即为异步方法call的返回值类型。
 
    Future可以对具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成以及获取结果。可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。Future接口的定义如下:
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

 

    在Future接口中声明了5个方法,每个方法的作用如下:
  • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false(即如果取消已经完成的任务会返回false);如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
  • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone方法表示任务是否已经完成,若任务完成,则返回true;
  • get()方法用来获取执行结果,这个方法会阻塞,一直等到任务执行完才返回;
  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

  也就是说Future提供了三种功能:

  1. 判断任务是否完成;
  2. 能够中断任务;
  3. 能够获取任务执行结果。

     Future接口的实现类是FutureTask:

public class FutureTask<V> implements RunnableFuture<V>

 

    FutureTask类实现了RunnableFuture接口,这个接口的定义如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
    可以看到RunnableFuture接口扩展了Runnable接口和Future接口。

 

    FutureTask类有如下两个构造器:

public FutureTask(Callable<V> callable) 
public FutureTask(Runnable runnable, V result) 

 

8. 同步器

    java.util.concurrent包提供了几个帮助我们管理相互合作的线程集的类,这些类的主要功能和适用场景如下:

  • CyclicBarrier:它允许线程集等待直至其中预定数目的线程到达一个公共障栅(barrier),然后可以选择执行一个处理障栅的动作。适用场景:当大量的线程需要在它们的结果可用之前完成时,我们可以考虑使用此类。
  • CountDownLatch:允许线程集等待直到计数器减为0。适用场景:当一个或多个线程需要等待直到指定数目的事件发生。
  • Exchanger:允许两个线程在要交换的对象准备好时交换对象。适用场景:当两个线程工作在统一数据结构的两个实例上时,一个向示例中添加数据,另一个从实例中移除数据。
  • Semaphore:允许线程集等待知道被允许继续运行为止。适用场景:限制访问资源的线程总数。若许可数为1,通常阻塞线程知道另一个线程给出许可为止。
  • SynchronousQueue:允许一个线程把对象交给另一个线程。适用场景:在没有显式同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个线程。

 

三、参考资料

  1.  Java并发编程:Callable、Future和FutureTask
  2. 《Java核心技术(卷一)》
    

深入理解Java之多线程

原文:http://www.cnblogs.com/absfree/p/5327678.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!