转载自https://blog.csdn.net/defonds/article/details/44021605#t8
本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.jenkov.com/java-util-concurrent/index.html。
本指南已做成中英文对照阅读版的 pdf 文档,有兴趣的朋友可以去 Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf[带书签] 进行下载。
本文很大程度上还是个 "半成品",所以当你发现一些被漏掉的类或接口时,请耐心等待。在我空闲的时候会把它们加进来的。
抛异常 | 特定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) |
检查 | element(o) | peek(o) |
- public class BlockingQueueExample {
-
- public static void main(String[] args) throws Exception {
-
- BlockingQueue queue = new ArrayBlockingQueue(1024);
-
- Producer producer = new Producer(queue);
- Consumer consumer = new Consumer(queue);
-
- new Thread(producer).start();
- new Thread(consumer).start();
-
- Thread.sleep(4000);
- }
- }
- public class Producer implements Runnable{
-
- protected BlockingQueue queue = null;
-
- public Producer(BlockingQueue queue) {
- this.queue = queue;
- }
-
- public void run() {
- try {
- queue.put("1");
- Thread.sleep(1000);
- queue.put("2");
- Thread.sleep(1000);
- queue.put("3");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- public class Consumer implements Runnable{
-
- protected BlockingQueue queue = null;
-
- public Consumer(BlockingQueue queue) {
- this.queue = queue;
- }
-
- public void run() {
- try {
- System.out.println(queue.take());
- System.out.println(queue.take());
- System.out.println(queue.take());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- BlockingQueue queue = new ArrayBlockingQueue(1024);
-
- queue.put("1");
-
- Object object = queue.take();
- BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
-
- queue.put("1");
-
- String string = queue.take();
- public interface Delayed extends Comparable<Delayed< {
-
- public long getDelay(TimeUnit timeUnit);
-
- }
- DAYS
- HOURS
- MINUTES
- SECONDS
- MILLISECONDS
- MICROSECONDS
- NANOSECONDS
- public class DelayQueueExample {
-
- public static void main(String[] args) {
- DelayQueue queue = new DelayQueue();
-
- Delayed element1 = new DelayedElement();
-
- queue.put(element1);
-
- Delayed element2 = queue.take();
- }
- }
DelayedElement 是我所创建的一个 DelayedElement 接口的实现类,它不在 java.util.concurrent 包里。你需要自行创建你自己的 Delayed 接口的实现以使用 DelayQueue 类。
- BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
- BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);
-
- bounded.put("Value");
-
- String value = bounded.take();
- BlockingQueue queue = new PriorityBlockingQueue();
-
- //String implements java.lang.Comparable
- queue.put("Value");
-
- String value = queue.take();
据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。
抛异常 | 特定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) |
移除 | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) |
检查 | getFirst(o) | peekFirst(o) |
抛异常 | 特定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | addLast(o) | offerLast(o) | putLast(o) | offerLast(o, timeout, timeunit) |
移除 | removeLast(o) | pollLast(o) | takeLast(o) | pollLast(timeout, timeunit) |
检查 | getLast(o) | peekLast(o) |
BlockingQueue | BlockingDeque |
---|---|
add() | addLast() |
offer() x 2 | offerLast() x 2 |
put() | putLast() |
remove() | removeFirst() |
poll() x 2 | pollFirst() |
take() | takeFirst() |
element() | getFirst() |
peek() | peekFirst() |
- BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
-
- deque.addFirst("1");
- deque.addLast("2");
-
- String two = deque.takeLast();
- String one = deque.takeFirst();
- BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
-
- deque.addFirst("1");
- deque.addLast("2");
-
- String two = deque.takeLast();
- String one = deque.takeFirst();
- ConcurrentMap concurrentMap = new ConcurrentHashMap();
-
- concurrentMap.put("key", "value");
-
- Object value = concurrentMap.get("key");
NavigableMap 中的方法不再赘述,本小节我们来看一下 ConcurrentNavigableMap 添加的方法。
- ConcurrentNavigableMap map = new ConcurrentSkipListMap();
-
- map.put("1", "one");
- map.put("2", "two");
- map.put("3", "three");
-
- ConcurrentNavigableMap headMap = map.headMap("2");
- ConcurrentNavigableMap map = new ConcurrentSkipListMap();
-
- map.put("1", "one");
- map.put("2", "two");
- map.put("3", "three");
-
- ConcurrentNavigableMap tailMap = map.tailMap("2");
- ConcurrentNavigableMap map = new ConcurrentSkipListMap();
-
- map.put("1", "one");
- map.put("2", "two");
- map.put("3", "three");
-
- ConcurrentNavigableMap subMap = map.subMap("2", "3");
关于这些方法更多信息参考官方 Java 文档。
- CountDownLatch latch = new CountDownLatch(3);
-
- Waiter waiter = new Waiter(latch);
- Decrementer decrementer = new Decrementer(latch);
-
- new Thread(waiter) .start();
- new Thread(decrementer).start();
-
- Thread.sleep(4000);
-
- public class Waiter implements Runnable{
-
- CountDownLatch latch = null;
-
- public Waiter(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- System.out.println("Waiter Released");
- }
- }
-
- public class Decrementer implements Runnable {
-
- CountDownLatch latch = null;
-
- public Decrementer(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public void run() {
-
- try {
- Thread.sleep(1000);
- this.latch.countDown();
-
- Thread.sleep(1000);
- this.latch.countDown();
-
- Thread.sleep(1000);
- this.latch.countDown();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
CyclicBarrier barrier = new CyclicBarrier(2);
barrier.await();
barrier.await(10, TimeUnit.SECONDS);
- Runnable barrierAction = ... ;
- CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);
- Runnable barrier1Action = new Runnable() {
- public void run() {
- System.out.println("BarrierAction 1 executed ");
- }
- };
- Runnable barrier2Action = new Runnable() {
- public void run() {
- System.out.println("BarrierAction 2 executed ");
- }
- };
-
- CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
- CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);
-
- CyclicBarrierRunnable barrierRunnable1 =
- new CyclicBarrierRunnable(barrier1, barrier2);
-
- CyclicBarrierRunnable barrierRunnable2 =
- new CyclicBarrierRunnable(barrier1, barrier2);
-
- new Thread(barrierRunnable1).start();
- new Thread(barrierRunnable2).start();
- public class CyclicBarrierRunnable implements Runnable{
-
- CyclicBarrier barrier1 = null;
- CyclicBarrier barrier2 = null;
-
- public CyclicBarrierRunnable(
- CyclicBarrier barrier1,
- CyclicBarrier barrier2) {
-
- this.barrier1 = barrier1;
- this.barrier2 = barrier2;
- }
-
- public void run() {
- try {
- Thread.sleep(1000);
- System.out.println(Thread.currentThread().getName() +
- " waiting at barrier 1");
- this.barrier1.await();
-
- Thread.sleep(1000);
- System.out.println(Thread.currentThread().getName() +
- " waiting at barrier 2");
- this.barrier2.await();
-
- System.out.println(Thread.currentThread().getName() +
- " done!");
-
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- }
Thread-1 done!
- Exchanger exchanger = new Exchanger();
-
- ExchangerRunnable exchangerRunnable1 =
- new ExchangerRunnable(exchanger, "A");
-
- ExchangerRunnable exchangerRunnable2 =
- new ExchangerRunnable(exchanger, "B");
-
- new Thread(exchangerRunnable1).start();
- new Thread(exchangerRunnable2).start();
ExchangerRunnable 代码:- public class ExchangerRunnable implements Runnable{
-
- Exchanger exchanger = null;
- Object object = null;
-
- public ExchangerRunnable(Exchanger exchanger, Object object) {
- this.exchanger = exchanger;
- this.object = object;
- }
-
- public void run() {
- try {
- Object previous = this.object;
-
- this.object = this.exchanger.exchange(this.object);
-
- System.out.println(
- Thread.currentThread().getName() +
- " exchanged " + previous + " for " + this.object
- );
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
Thread-1 exchanged B for A
如果你将信号量用于保护一个重要部分,试图进入这一部分的代码通常会首先尝试获得一个许可,然后才能进入重要部分(代码块),执行完之后,再把许可释放掉。比如这样:
- Semaphore semaphore = new Semaphore(1);
-
- //critical section
- semaphore.acquire();
-
- ...
-
- semaphore.release();
Semaphore semaphore = new Semaphore(1, true);
这些方法的细节请参考 Java 文档。
- ExecutorService executorService = Executors.newFixedThreadPool(10);
-
- executorService.execute(new Runnable() {
- public void run() {
- System.out.println("Asynchronous task");
- }
- });
-
- executorService.shutdown();
- ExecutorService executorService1 = Executors.newSingleThreadExecutor();
-
- ExecutorService executorService2 = Executors.newFixedThreadPool(10);
-
- ExecutorService executorService3 = Executors.newScheduledThreadPool(10);
- ExecutorService executorService = Executors.newSingleThreadExecutor();
-
- executorService.execute(new Runnable() {
- public void run() {
- System.out.println("Asynchronous task");
- }
- });
-
- executorService.shutdown();
- Future future = executorService.submit(new Runnable() {
- public void run() {
- System.out.println("Asynchronous task");
- }
- });
-
- future.get(); //returns null if the task has finished correctly.
- Future future = executorService.submit(new Callable(){
- public Object call() throws Exception {
- System.out.println("Asynchronous Callable");
- return "Callable Result";
- }
- });
-
- System.out.println("future.get() = " + future.get());
- ExecutorService executorService = Executors.newSingleThreadExecutor();
-
- Set<Callable<String>> callables = new HashSet<Callable<String>>();
-
- callables.add(new Callable<String>() {
- public String call() throws Exception {
- return "Task 1";
- }
- });
- callables.add(new Callable<String>() {
- public String call() throws Exception {
- return "Task 2";
- }
- });
- callables.add(new Callable<String>() {
- public String call() throws Exception {
- return "Task 3";
- }
- });
-
- String result = executorService.invokeAny(callables);
-
- System.out.println("result = " + result);
-
- executorService.shutdown();
- ExecutorService executorService = Executors.newSingleThreadExecutor();
-
- Set<Callable<String>> callables = new HashSet<Callable<String>>();
-
- callables.add(new Callable<String>() {
- public String call() throws Exception {
- return "Task 1";
- }
- });
- callables.add(new Callable<String>() {
- public String call() throws Exception {
- return "Task 2";
- }
- });
- callables.add(new Callable<String>() {
- public String call() throws Exception {
- return "Task 3";
- }
- });
-
- List<Future<String>> futures = executorService.invokeAll(callables);
-
- for(Future<String> future : futures){
- System.out.println("future.get = " + future.get());
- }
-
- executorService.shutdown();
如果你想要立即关闭 ExecutorService,你可以调用 shutdownNow() 方法。这样会立即尝试停止所有执行中的任务,并忽略掉那些已提交但尚未开始处理的任务。无法担保执行任务的正确执行。可能它们被停止了,也可能已经执行结束。
- int corePoolSize = 5;
- int maxPoolSize = 10;
- long keepAliveTime = 5000;
-
- ExecutorService threadPoolExecutor =
- new ThreadPoolExecutor(
- corePoolSize,
- maxPoolSize,
- keepAliveTime,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()
- );
但是,除非你确实需要显式为 ThreadPoolExecutor 定义所有参数,使用 java.util.concurrent.Executors 类中的工厂方法之一会更加方便,正如 ExecutorService 小节所述。
- ScheduledExecutorService scheduledExecutorService =
- Executors.newScheduledThreadPool(5);
-
- ScheduledFuture scheduledFuture =
- scheduledExecutorService.schedule(new Callable() {
- public Object call() throws Exception {
- System.out.println("Executed!");
- return "Called!";
- }
- },
- 5,
- TimeUnit.SECONDS);
- ScheduledExecutorService scheduledExecutorService =
-
- Executors.newScheduledThreadPool(5);
- ScheduledExecutorService scheduledExecutorService =
- Executors.newScheduledThreadPool(5);
-
- ScheduledFuture scheduledFuture =
- scheduledExecutorService.schedule(new Callable() {
- public Object call() throws Exception {
- System.out.println("Executed!");
- return "Called!";
- }
- },
- 5,
- TimeUnit.SECONDS);
-
- System.out.println("result = " + scheduledFuture.get());
-
- scheduledExecutorService.shutdown();
你可以使用从 ExecutorService 接口继承来的 shutdown() 或 shutdownNow() 方法将 ScheduledExecutorService 关闭。参见 ExecutorService 关闭部分以获取更多信息。
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.RecursiveAction;
-
- public class MyRecursiveAction extends RecursiveAction {
-
- private long workLoad = 0;
-
- public MyRecursiveAction(long workLoad) {
- this.workLoad = workLoad;
- }
-
- @Override
- protected void compute() {
-
- //if work is above threshold, break tasks up into smaller tasks
- if(this.workLoad > 16) {
- System.out.println("Splitting workLoad : " + this.workLoad);
-
- List<MyRecursiveAction> subtasks =
- new ArrayList<MyRecursiveAction>();
-
- subtasks.addAll(createSubtasks());
-
- for(RecursiveAction subtask : subtasks){
- subtask.fork();
- }
-
- } else {
- System.out.println("Doing workLoad myself: " + this.workLoad);
- }
- }
-
- private List<MyRecursiveAction> createSubtasks() {
- List<MyRecursiveAction> subtasks =
- new ArrayList<MyRecursiveAction>();
-
- MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
- MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
-
- subtasks.add(subtask1);
- subtasks.add(subtask2);
-
- return subtasks;
- }
-
- }
- MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
-
- forkJoinPool.invoke(myRecursiveAction);
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.RecursiveTask;
-
-
- public class MyRecursiveTask extends RecursiveTask<Long> {
-
- private long workLoad = 0;
-
- public MyRecursiveTask(long workLoad) {
- this.workLoad = workLoad;
- }
-
- protected Long compute() {
-
- //if work is above threshold, break tasks up into smaller tasks
- if(this.workLoad > 16) {
- System.out.println("Splitting workLoad : " + this.workLoad);
-
- List<MyRecursiveTask> subtasks =
- new ArrayList<MyRecursiveTask>();
- subtasks.addAll(createSubtasks());
-
- for(MyRecursiveTask subtask : subtasks){
- subtask.fork();
- }
-
- long result = 0;
- for(MyRecursiveTask subtask : subtasks) {
- result += subtask.join();
- }
- return result;
-
- } else {
- System.out.println("Doing workLoad myself: " + this.workLoad);
- return workLoad * 3;
- }
- }
-
- private List<MyRecursiveTask> createSubtasks() {
- List<MyRecursiveTask> subtasks =
- new ArrayList<MyRecursiveTask>();
-
- MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
- MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);
-
- subtasks.add(subtask1);
- subtasks.add(subtask2);
-
- return subtasks;
- }
- }
- MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
-
- long mergedResult = forkJoinPool.invoke(myRecursiveTask);
-
- System.out.println("mergedResult = " + mergedResult);
在你计划在自己的项目里使用 ForkJoinPool 之前最好读一下该篇文章。
- Lock lock = new ReentrantLock();
-
- lock.lock();
-
- //critical section
-
- lock.unlock();
unlock() 方法对 Lock 实例解锁。一个 Lock 实现将只允许锁定了该对象的线程来调用此方法。其他(没有锁定该 Lock 对象的线程)线程对 unlock() 方法的调用将会抛一个未检查异常(RuntimeException)。
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-
-
- readWriteLock.readLock().lock();
-
- // multiple readers can enter this section
- // if not locked for writing, and not writers waiting
- // to lock for writing.
-
- readWriteLock.readLock().unlock();
-
-
- readWriteLock.writeLock().lock();
-
- // only one writer can enter this section,
- // and only if no threads are currently reading.
-
- readWriteLock.writeLock().unlock();
注意如何使用 ReadWriteLock 对两种锁实例的持有。一个对读访问进行保护,一个队写访问进行保护。
AtomicBoolean atomicBoolean = new AtomicBoolean();
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
- AtomicBoolean atomicBoolean = new AtomicBoolean(true);
-
- boolean value = atomicBoolean.get();
- AtomicBoolean atomicBoolean = new AtomicBoolean(true);
-
- atomicBoolean.set(false);
- AtomicBoolean atomicBoolean = new AtomicBoolean(true);
-
- boolean oldValue = atomicBoolean.getAndSet(false);
- AtomicBoolean atomicBoolean = new AtomicBoolean(true);
-
- boolean expectedValue = true;
- boolean newValue = false;
-
- boolean wasNewValueSet = atomicBoolean.compareAndSet(
- expectedValue, newValue);
本示例对 AtomicBoolean 的当前值与 true 值进行比较,如果相等,将 AtomicBoolean 的值更新为 false。
AtomicInteger atomicInteger = new AtomicInteger();
AtomicInteger atomicInteger = new AtomicInteger(123);
- AtomicInteger atomicInteger = new AtomicInteger(123);
-
- int theValue = atomicInteger.get();
- AtomicInteger atomicInteger = new AtomicInteger(123);
-
- atomicInteger.set(234);
- AtomicInteger atomicInteger = new AtomicInteger(123);
-
- int expectedValue = 123;
- int newValue = 234;
- atomicInteger.compareAndSet(expectedValue, newValue);
- AtomicInteger atomicInteger = new AtomicInteger();
-
-
- System.out.println(atomicInteger.getAndAdd(10));
- System.out.println(atomicInteger.addAndGet(10));
decrementAndGet() 将 AtomicInteger 的值减一,并返回减一后的值。getAndDecrement() 也将 AtomicInteger 的值减一,但它返回的是减一之前的值。
AtomicLong atomicLong = new AtomicLong();
AtomicLong atomicLong = new AtomicLong(123);
- AtomicLong atomicLong = new AtomicLong(123);
-
- long theValue = atomicLong.get();
- AtomicLong atomicLong = new AtomicLong(123);
-
- atomicLong.set(234);
AtomicLong 类也有一个原子性的 compareAndSet() 方法。这一方法将 AtomicLong 实例的当前值与一个期望值进行比较,如果两种相等,为 AtomicLong 实例设置一个新值。AtomicLong.compareAndSet() 使用示例:
- AtomicLong atomicLong = new AtomicLong(123);
-
- long expectedValue = 123;
- long newValue = 234;
- atomicLong.compareAndSet(expectedValue, newValue);
- AtomicLong atomicLong = new AtomicLong();
-
-
- System.out.println(atomicLong.getAndAdd(10));
- System.out.println(atomicLong.addAndGet(10));
decrementAndGet() 将 AtomicLong 的值减一,并返回减一后的值。getAndDecrement() 也将 AtomicLong 的值减一,但它返回的是减一之前的值。
AtomicReference atomicReference = new AtomicReference();
- String initialReference = "the initially referenced string";
- AtomicReference atomicReference = new AtomicReference(initialReference);
- AtomicReference<String> atomicStringReference =
- new AtomicReference<String>();
- String initialReference = "the initially referenced string";
- AtomicReference<String> atomicStringReference =
- new AtomicReference<String>(initialReference);
- AtomicReference atomicReference = new AtomicReference("first value referenced");
-
- String reference = (String) atomicReference.get();
- AtomicReference<String> atomicReference =
- new AtomicReference<String>("first value referenced");
-
- String reference = atomicReference.get();
- AtomicReference atomicReference =
- new AtomicReference();
-
- atomicReference.set("New object referenced");
- String initialReference = "initial value referenced";
-
- AtomicReference<String> atomicStringReference =
- new AtomicReference<String>(initialReference);
-
- String newReference = "new value referenced";
- boolean exchanged = atomicStringReference.compareAndSet(initialReference, newReference);
- System.out.println("exchanged: " + exchanged);
-
- exchanged = atomicStringReference.compareAndSet(initialReference, newReference);
- System.out.println("exchanged: " + exchanged);
Java 并发工具包 java.util.concurrent 用户指南
原文:https://www.cnblogs.com/edda/p/12676992.html