在新增的 java.util.concurrent 包中,BlockingQueue 很好地解决了多线程环境下如何高效、安全地“传输”数据的问题。这些高效且线程安全的队列类,为我们快速搭建高质量的多线程程序带来了极大的便利。本文详细介绍了 BlockingQueue 家族中的所有成员,包括它们各自的功能以及常见的使用场景。
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer/consumer demo built on a bounded {@link BlockingQueue}.
 *
 * <p>Three {@link Producer}s and one {@link Consumer} share a queue with a
 * capacity of 10. The producers run for ten seconds and are then asked to
 * stop; the consumer drains what is left and exits by itself once the queue
 * has stayed empty for two seconds.
 *
 * <p>{@code Consumer} and {@code Producer} are declared package-private only
 * so that the whole demo compiles as a single source file; move each one to
 * its own file if it needs to be {@code public}.
 *
 * @author jackyuj
 */
public class BlockingQueueTest {

    /**
     * Starts the workers, lets them run for ten seconds, then stops the
     * producers and waits for the pool to finish.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *         sleeping or while waiting for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        // Bounded buffer with a capacity of 10, shared by every worker.
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        // Run the workers on a cached thread pool.
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            service.execute(producer1);
            service.execute(producer2);
            service.execute(producer3);
            service.execute(consumer);

            // Let the workers run for 10 seconds.
            Thread.sleep(10 * 1000);
        } finally {
            // Always stop the producers and close the pool, even when main is
            // interrupted: the pool's worker threads are non-daemon, so
            // producers that are never stopped would keep the JVM alive.
            producer1.stop();
            producer2.stop();
            producer3.stop();
            service.shutdown();
        }

        // Wait for the workers to wind down instead of sleeping for a guessed
        // amount of time; force-cancel whatever is still running afterwards.
        if (!service.awaitTermination(30, TimeUnit.SECONDS)) {
            service.shutdownNow();
        }
    }
}

/**
 * Consumer task: takes strings from the shared queue until the queue has
 * stayed empty for two seconds.
 *
 * @author jackyuj
 */
class Consumer implements Runnable {

    /** Exclusive upper bound, in milliseconds, of the simulated work time. */
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    private final BlockingQueue<String> queue;

    /**
     * @param queue the queue to take data from
     */
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /**
     * Polls the queue in a loop, simulating work with a random pause after
     * each element. Returns when a poll times out or the thread is
     * interrupted; in the latter case the interrupt flag is restored.
     */
    @Override
    public void run() {
        System.out.println("启动消费者线程!");
        Random r = new Random();
        try {
            while (true) {
                System.out.println("正从队列获取数据...");
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (data == null) {
                    // Nothing arrived within 2s: assume every producer has
                    // exited and stop consuming.
                    break;
                }
                System.out.println("拿到数据:" + data);
                System.out.println("正在消费数据:" + data);
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so the owning executor can observe it.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出消费者线程!");
        }
    }
}

/**
 * Producer task: generates numbered strings and offers them to the shared
 * queue until {@link #stop()} is called.
 *
 * @author jackyuj
 */
class Producer implements Runnable {

    /** Exclusive upper bound, in milliseconds, of the simulated work time. */
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    /** Sequence number shared by all producers. */
    private static final AtomicInteger count = new AtomicInteger();

    private final BlockingQueue<String> queue;

    /** Written by {@link #stop()} from another thread, hence volatile. */
    private volatile boolean isRunning = true;

    /**
     * @param queue the queue to put data into
     */
    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /**
     * Produces one element per iteration after a random pause. An element
     * that cannot be enqueued within two seconds is reported and dropped.
     * Returns once {@link #stop()} has been called or the thread is
     * interrupted; in the latter case the interrupt flag is restored.
     */
    @Override
    public void run() {
        Random r = new Random();

        System.out.println("启动生产者线程!");
        try {
            while (isRunning) {
                System.out.println("正在生产数据...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

                String data = "data:" + count.incrementAndGet();
                System.out.println("将数据:" + data + "放入队列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("放入数据失败:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so the owning executor can observe it.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出生产者线程!");
        }
    }

    /** Asks the producer to exit after its current iteration. */
    public void stop() {
        isRunning = false;
    }
}
原文:http://www.cnblogs.com/ilinuxer/p/4827585.html