首页 > 其他 > 详细

架构师养成记--16.disruptor并发框架中RingBuffer的使用

时间:2017-01-11 10:06:20      阅读:319      评论:0      收藏:0      [点我收藏+]

很多时候我们只需要消息中间件这样的功能,那么直需要RinBuffer就可以了。

入口:

 1 import java.util.concurrent.Callable;
 2 import java.util.concurrent.ExecutorService;
 3 import java.util.concurrent.Executors;
 4 import java.util.concurrent.Future;
 5 
 6 import com.lmax.disruptor.BatchEventProcessor;
 7 import com.lmax.disruptor.EventFactory;
 8 import com.lmax.disruptor.RingBuffer;
 9 import com.lmax.disruptor.SequenceBarrier;
10 import com.lmax.disruptor.YieldingWaitStrategy;
11 
12 public class Main1 {  
13    
14     public static void main(String[] args) throws Exception {  
15         int BUFFER_SIZE=1024;  
16         int THREAD_NUMBERS=4;  
17         /* 
18          * createSingleProducer创建一个单生产者的RingBuffer, 
19          * 第一个参数叫EventFactory,从名字上理解就是"事件工厂",其实它的职责就是产生数据填充RingBuffer的区块。 
20          * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 
21          * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 
22          */  
23         final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
24             @Override  
25             public Trade newInstance() {  
26                 return new Trade();  
27             }  
28         }, BUFFER_SIZE, new YieldingWaitStrategy());  
29         
30         //创建线程池  
31         ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
32         
33         //创建SequenceBarrier  
34         SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
35           
36         /****************** @beg 消费者消费数据 2017-1-11******************/
37         //创建消息处理器  
38         BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
39                 ringBuffer, sequenceBarrier, new TradeHandler());  
40           
41         //这一步的目的就是把消费者的位置信息引用注入到生产者    如果只有一个消费者的情况可以省略 
42         ringBuffer.addGatingSequences(transProcessor.getSequence());  
43           
44         //把消息处理器提交到线程池  
45         executors.submit(transProcessor);  
46         /****************** @end 消费者消费数据 2017-1-11******************/
47         
48         //如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类  
49           
50         /****************** @beg 生产者生产数据  2017-1-11******************/
51 
52         Future<?> future= executors.submit(new Callable<Void>() {  
53             @Override  
54             public Void call() throws Exception {  
55                 long seq;  
56                 for(int i=0;i<10;i++){  
57                     seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块  
58                     ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据 
59                     ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见  
60                 }  
61                 return null;  
62             }  
63         }); 
64 
65         /****************** @end 生产者生产数据 2017-1-11******************/
66         
67         future.get();//等待生产者结束  
68         Thread.sleep(1000);//等上1秒,等消费都处理完成  
69         transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)  
70         executors.shutdown();//终止线程  
71     }  
72 }  

 

消费者:

 1 import java.util.UUID;
 2 
 3 import com.lmax.disruptor.EventHandler;
 4 import com.lmax.disruptor.WorkHandler;
 5 
 6 public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
 7       
 8     @Override  
 9     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
10         this.onEvent(event);  
11     }  
12   
13     @Override  
14     public void onEvent(Trade event) throws Exception {  
15         //这里做具体的消费逻辑  
16         event.setId(UUID.randomUUID().toString());//简单生成下ID  
17         System.out.println(event.getId());  
18     }  
19 }  

 

 

数据对象:

 1 import java.util.concurrent.atomic.AtomicInteger;
 2 
 3 public class Trade {  
 4     
 5     private String id;//ID  
 6     private String name;
 7     private double price;//金额  
 8     private AtomicInteger count = new AtomicInteger(0);
 9     
10     public String getId() {
11         return id;
12     }
13     public void setId(String id) {
14         this.id = id;
15     }
16     public String getName() {
17         return name;
18     }
19     public void setName(String name) {
20         this.name = name;
21     }
22     public double getPrice() {
23         return price;
24     }
25     public void setPrice(double price) {
26         this.price = price;
27     }
28     public AtomicInteger getCount() {
29         return count;
30     }
31     public void setCount(AtomicInteger count) {
32         this.count = count;
33     } 
34       
35       
36 }  

 

架构师养成记--16.disruptor并发框架中RingBuffer的使用

原文:http://www.cnblogs.com/sigm/p/6272384.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!