4、CyclicBarrier 栅栏
5、CountDownLatch 闭锁
6、Semaphore 信号量
在使用之前先明确 :
package thread.blogs.cooperation; import scala.Console; import java.util.concurrent.atomic.AtomicInteger; /** * Created by PerkinsZhu on 2017/8/21 10:25. */ public class TestWaitAndNotify { public static void main(String[] args) { TestWaitAndNotify test = new TestWaitAndNotify(); test.testWait(); } Object obj = new Object();//创建一个全局变量,用来协调各个线程 ThreadLocal<AtomicInteger> num = new ThreadLocal<AtomicInteger>();//设置一个线程wait和notify的触发条件 class MyRunner implements Runnable { @Override public void run() { num.set(new AtomicInteger(0)); while (true) { Console.println(Thread.currentThread().getName()); if (num.get().getAndIncrement() == 1) { synchronized (obj) {//如果要想调用wait方法,则必须持有该对象。否则将会抛出IllegalMonitorStateException try { Console.println(Thread.currentThread().getName() + "挂起等待"); obj.wait();//同一个线程可以wait多次,多个线程也可以使用同一个obj调用wait Console.println(Thread.currentThread().getName() + "唤醒!!!"); } catch (InterruptedException e) { e.printStackTrace(); } } } sleep(1000); } } } private void testWait() { MyRunner runner = new MyRunner(); new Thread(runner).start(); new Thread(runner).start(); AtomicInteger num03 = new AtomicInteger(0); Thread th03 = new Thread(new Runnable() { @Override public void run() { while (true) { synchronized (obj) {//调用notify/notifyAll和wait一样,同样需要持有该对象 if (num03.getAndIncrement() == 5) { obj.notify();//唤醒最先一个挂在obj上面的线程.每次只唤醒一个。这里是按照等待的先后顺序进行唤醒 } } sleep(1000); } } }); th03.start(); } private void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
例如,修改try catch代码代码块
Console.println(Thread.currentThread().getName() + "挂起等待"); obj.wait();//执行多次wait操作 obj.wait(); obj.wait(); Console.println(Thread.currentThread().getName() + "唤醒!!!");
new Thread(runner,"thread--01").start(); // new Thread(runner,"thread--02").start();
thread--01 thread--01 thread--01挂起等待
public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException ;
public final void wait(long timeout, int nanos) throws InterruptedException Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object, or some other thread interrupts the current thread, or a certain amount of real time has elapsed. This method is similar to the wait method of one argument, but it allows finer control over the amount of time to wait for a notification before giving up. The amount of real time, measured in nanoseconds, is given by: 1000000*timeout+nanos In all other respects, this method does the same thing as the method wait(long) of one argument. In particular, wait(0, 0) means the same thing as wait(0). The current thread must own this object‘s monitor. The thread releases ownership of this monitor and waits until either of the following two conditions has occurred: Another thread notifies threads waiting on this object‘s monitor to wake up either through a call to the notify method or the notifyAll method. The timeout period, specified by timeout milliseconds plus nanos nanoseconds arguments, has elapsed. The thread then waits until it can re-obtain ownership of the monitor and resumes execution. As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop: synchronized (obj) { while (<condition does not hold>) obj.wait(timeout, nanos); ... // Perform action appropriate to condition } This method should only be called by a thread that is the owner of this object‘s monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.
package thread.blogs.cooperation; import scala.Console; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Created by PerkinsZhu on 2017/8/21 11:59. */ public class TestCondition { public static void main(String[] args) { TestCondition test = new TestCondition(); test.testWait(); } ReentrantLock lock = new ReentrantLock(); ThreadLocal<AtomicInteger> num = new ThreadLocal<AtomicInteger>(); Condition condition = lock.newCondition(); private void testWait() { new Thread(new Runnable() { @Override public void run() { num.set(new AtomicInteger(1)); while (true) { if (num.get().getAndIncrement() == 5) { Console.println("signal---!!!"); try { lock.lock(); condition.signal(); } finally { lock.unlock(); } } Console.println("thread ---- 01"); sleep(1000); } } }).start(); new Thread(new Runnable() { @Override public void run() { num.set(new AtomicInteger(1)); while (true) { if (num.get().getAndIncrement() == 2) { try { //lock.tryLock(); //lock.tryLock(5000, TimeUnit.MILLISECONDS); lock.lock();//这里同样要加锁,否则会抛出IllegalMonitorStateException异常。注意的是这里不要使用synchronized进行加锁,而是使用lock condition.await();//注意这里不要调用wait!!! } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } Console.println("thread ---- 02"); sleep(1000); } } }).start(); } private void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
thread ---- 01 thread ---- 02 thread ---- 01 thread ---- 01 thread ---- 01 signal---!!! thread ---- 01 thread ---- 02 thread ---- 02 thread ---- 01
public static native void sleep(long millis) throws InterruptedException;
1 /** 2 * Causes the currently executing thread to sleep (temporarily cease 3 * execution) for the specified number of milliseconds, subject to 4 * the precision and accuracy of system timers and schedulers. The thread 5 * does not lose ownership of any monitors. 6 * 7 * @param millis 8 * the length of time to sleep in milliseconds 9 * 10 * @throws IllegalArgumentException 11 * if the value of {@code millis} is negative 12 * 13 * @throws InterruptedException 14 * if any thread has interrupted the current thread. The 15 * <i>interrupted status</i> of the current thread is 16 * cleared when this exception is thrown. 17 */
1 /** 2 * A hint to the scheduler that the current thread is willing to yield 3 * its current use of a processor. The scheduler is free to ignore this 4 * hint. 5 * 6 * <p> Yield is a heuristic attempt to improve relative progression 7 * between threads that would otherwise over-utilise a CPU. Its use 8 * should be combined with detailed profiling and benchmarking to 9 * ensure that it actually has the desired effect. 10 * 11 * <p> It is rarely appropriate to use this method. It may be useful 12 * for debugging or testing purposes, where it may help to reproduce 13 * bugs due to race conditions. It may also be useful when designing 14 * concurrency control constructs such as the ones in the 15 * {@link java.util.concurrent.locks} package. 16 */ 17 public static native void yield();
1 /** 2 * Waits for this thread to die. 3 * 4 * <p> An invocation of this method behaves in exactly the same 5 * way as the invocation 6 * 7 * <blockquote> 8 * {@linkplain #join(long) join}{@code (0)} 9 * </blockquote> 10 * 11 * @throws InterruptedException 12 * if any thread has interrupted the current thread. The 13 * <i>interrupted status</i> of the current thread is 14 * cleared when this exception is thrown. 15 */ 16 public final void join() throws InterruptedException;
1 public final synchronized void join(long millis) throws InterruptedException { 2 long base = System.currentTimeMillis(); 3 long now = 0; 4 if (millis < 0) { 5 throw new IllegalArgumentException("timeout value is negative"); 6 } 7 if (millis == 0) { 8 while (isAlive()) {//如果调用者依旧没有结束,让当前线程进行等待 9 wait(0);//注意这里的wait是等待的当前线程,而不是调用者线程 10 } 11 } else { 12 while (isAlive()) { 13 long delay = millis - now; 14 if (delay <= 0) { 15 break; 16 } 17 wait(delay);//指定等待的时间 18 now = System.currentTimeMillis() - base; 19 } 20 } 21 }
1 package thread.blogs.cooperation; 2 3 import scala.Console; 4 5 import java.util.concurrent.CyclicBarrier; 6 7 /** 8 * Created by PerkinsZhu on 2017/8/30 10:32. 9 */ 10 public class TestCyclicBarrier { 11 public static void main(String[] args) { 12 testCyclicBarrier(); 13 } 14 15 private static void testCyclicBarrier() { 16 /** 17 * 注意这里等待的是三个线程。这就相当于一个线程计数器,当指定个数的线程执行 barrier.await();方法之后,才会执行后续的代码,否则每个线程都会一直进行等待。 18 * 如果把3修改为4,则将永远等待下去,不会开始会议。 19 * 如果把3修改为2,则小张到达之后就会提前开始会议,不会继续等待小王。 20 */ 21 CyclicBarrier barrier = new CyclicBarrier(3); 22 23 Thread 小李 = new Thread(new MyRunner(barrier, "小李", 2000)); 24 小李.start(); 25 Thread 小张 = new Thread(new MyRunner(barrier, "小张", 4000)); 26 小张.start(); 27 Thread 小王 = new Thread(new MyRunner(barrier, "小王", 5000)); 28 小王.start(); 29 } 30 31 static class MyRunner implements Runnable { 32 CyclicBarrier barrier; 33 String name; 34 int time; 35 36 public MyRunner(CyclicBarrier barrier, String name, int time) { 37 this.barrier = barrier; 38 this.name = name; 39 this.time = time; 40 } 41 42 @Override 43 public void run() { 44 Console.println(name + " 开始出发去公司。"); 45 sleep(time); 46 Console.println(name + " 终于到会议室!!!"); 47 try { 48 barrier.await(); 49 } catch (Exception e) { 50 e.printStackTrace(); 51 } 52 startMeeting(name); 53 } 54 } 55 56 private static void startMeeting(String name) { 57 Console.println(name + "说:人齐了。会议开始!!"); 58 } 59 60 private static void sleep(int time) { 61 try { 62 Thread.sleep(time); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 } 67 }
1 小李 开始出发去公司。 2 小王 开始出发去公司。 3 小张 开始出发去公司。 4 小李 终于到会议室!!! 5 小张 终于到会议室!!! 6 小王 终于到会议室!!! 7 小王说:人齐了。会议开始!! 8 小李说:人齐了。会议开始!! 9 小张说:人齐了。会议开始!!
public CyclicBarrier(int parties, Runnable barrierAction) {}
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() { @Override public void run() { Console.println("======"); } });
小张 开始出发去公司。 小李 开始出发去公司。 小王 开始出发去公司。 小李 终于到会议室!!! 小张 终于到会议室!!! 小王 终于到会议室!!! ====== 小王说:人齐了。会议开始!! 小李说:人齐了。会议开始!! 小张说:人齐了。会议开始!!
使用示例如下: boss等待所有员工来开会,当所有人员都到齐之后,boss宣布开始会议!!!
1 package thread.blogs.cooperation; 2 3 import scala.Console; 4 5 import java.util.concurrent.CountDownLatch; 6 7 /** 8 * Created by PerkinsZhu on 2017/8/30 10:32. 9 */ 10 public class TestCyclicBarrier { 11 public static void main(String[] args) { 12 testCyclicBarrier(); 13 } 14 15 private static void testCyclicBarrier() { 16 17 CountDownLatch countDownLatch = new CountDownLatch(3);//注意这里的参数指定了等待的线程数量 18 19 new Thread(new MyRunner(countDownLatch, "小李", 2000)).start(); 20 new Thread(new MyRunner(countDownLatch, "小张", 4000)).start(); 21 new Thread(new MyRunner(countDownLatch, "小王", 5000)).start(); 22 23 try { 24 Console.println("等待员工到来开会。。。。。。。"); 25 countDownLatch.await();//注意这里是await。主线程将会一直等待在这里,当所有线程都执行 countDownLatch.countDown();之后当前线程才会继续执行 26 startMeeting("Boss"); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 } 31 32 static class MyRunner implements Runnable { 33 CountDownLatch countDownLatch; 34 String name; 35 int time; 36 37 public MyRunner(CountDownLatch countDownLatch, String name, int time) { 38 this.countDownLatch = countDownLatch; 39 this.name = name; 40 this.time = time; 41 } 42 43 @Override 44 public void run() { 45 Console.println(name + " 开始出发去公司。"); 46 sleep(time); 47 Console.println(name + " 终于到会议室!!!"); 48 countDownLatch.countDown();
Console.println(name + " 准备好了!!"); 49 } 50 } 51 52 private static void startMeeting(String name) { 53 Console.println(name + "说:人齐了。会议开始!!"); 54 } 55 56 private static void sleep(int time) { 57 try { 58 Thread.sleep(time); 59 } catch (InterruptedException e) { 60 e.printStackTrace(); 61 } 62 } 63 }
小王 开始出发去公司。
小张 开始出发去公司。
小李 开始出发去公司。
小李 终于到会议室!!!
小李 准备好了!!
小张 终于到会议室!!!
小张 准备好了!!
小王 终于到会议室!!!
小王 准备好了!!
1 package thread.blogs.cooperation; 2 3 import scala.Console; 4 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.Semaphore; 8 9 /** 10 * Created by PerkinsZhu on 2017/8/30 11:43. 11 */ 12 public class TestSemaphore { 13 public static void main(String[] args) { 14 testSemaphore(); 15 } 16 17 private static void testSemaphore() { 18 Semaphore semaphore = new Semaphore(2, true);//指定同时访问临界区资源的线程数量。第二个参数指定以公平方式访问临界区资源 19 ExecutorService excutorService = Executors.newFixedThreadPool(10); 20 for (int i = 0; i < 6; i++) {//启动10个线程请求资源 21 excutorService.execute(new MyRunner(semaphore)); 22 sleep(0);//逐个启动线程 23 } 24 excutorService.shutdown(); 25 } 26 27 static class MyRunner implements Runnable { 28 Semaphore semaphore; 29 30 public MyRunner(Semaphore semaphore) { 31 this.semaphore = semaphore; 32 } 33 34 @Override 35 public void run() { 36 String name = Thread.currentThread().getName(); 37 try { 38 Console.println(name + " ------请求资源!!"); 39 //semaphore.acquire(2);//设置请求资源的数量。必须有足够数量的资源才可进去临界区。不过释放的时候也要一起释放,请求几个就要调用几次release() 40 semaphore.acquire();//请求获取资源,如果有空闲资源则会立即获取,进入临界区,否则将会等待,一直等待到获取到临界区资源 41 Console.println(name + " ======获取资源!!"); 42 sleep(1000); 43 //semaphore.release(); 44 semaphore.release();//释放资源 45 Console.println(name + " ******释放资源!!"); 46 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 } 51 } 52 53 private static void sleep(int time) { 54 try { 55 Thread.sleep(time); 56 } catch (InterruptedException e) { 57 e.printStackTrace(); 58 } 59 } 60 }
pool-1-thread-1 ------请求资源!! pool-1-thread-2 ------请求资源!! pool-1-thread-6 ------请求资源!! pool-1-thread-5 ------请求资源!! pool-1-thread-3 ------请求资源!! pool-1-thread-4 ------请求资源!! pool-1-thread-2 ======获取资源!! pool-1-thread-1 ======获取资源!! pool-1-thread-1 ******释放资源!! pool-1-thread-6 ======获取资源!! pool-1-thread-5 ======获取资源!! pool-1-thread-2 ******释放资源!! pool-1-thread-6 ******释放资源!! pool-1-thread-4 ======获取资源!! pool-1-thread-3 ======获取资源!! pool-1-thread-5 ******释放资源!! pool-1-thread-4 ******释放资源!! pool-1-thread-3 ******释放资源!!
根据结果可以看出只有当有线程释放资源之后,才会有新的线程获取到资源。即控制了同一时间访问临界区资源的线程数量。当Semaphore(1)设置为1的时候,此时可以当做锁来使用。