通过分析如下代码,大致了解Disruptor的原理
1 public static void main(String[] args)throws Exception{ 2 EventFactory<LongEvent> eventFactory = new LongEventFactory(); 3 int ringBufferSize = 1024; 4 ExecutorService executors = Executors.newCachedThreadPool(); 5 final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executors,ProducerType.SINGLE, new BlockingWaitStrategy()); 6 //final Disruptor<IntEvent> disruptor = new Disruptor<IntEvent>(eventFactory, ringBufferSize, executors,ProducerType.MULTI, new BlockingWaitStrategy()); //多生产者 7 Consumer consumer1 = new Consumer(); 8 Consumer consumer2 = new Consumer(); 9 EventHandlerGroup<LongEvent> firstLevel = disruptor.handleEventsWith(consumer1,consumer2); 10 Consumer consumer21 = new Consumer(); 11 Consumer consumer22 = new Consumer(); 12 EventHandlerGroup<LongEvent> secondLevel = firstLevel.then(consumer21,consumer22); 13 disruptor.start(); //启动disruptor,consumer开始等待,消费数据 14 Producer producer = new Producer(disruptor); 15 16 //启动生产者 17 new Thread(producer).start(); 18 }
1. 第2行代码 EventFactory<LongEvent> eventFactory = new LongEventFactory();
数据工厂类构造单个数据,disruptor使用此工厂类预分配数据。
2. 第5行代码 final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executors,ProducerType.SINGLE, new BlockingWaitStrategy());
预分配数据,构建RingBuffer,指定生产者类型(单生产者、多生产者)、消费者执行的线程池、生产者等待可发布数据空间和消费者等待可消费数据的策略。
3. 第9行代码 EventHandlerGroup<LongEvent> firstLevel = disruptor.handleEventsWith(consumer1,consumer2);
每个消费者Handler都会被封装为一个Processor,其可消费序号由其sequence barrier决定。
4. 第12行代码 EventHandlerGroup<LongEvent> secondLevel = firstLevel.then(consumer21,consumer22);
5. 第13行代码 disruptor.start(); //启动disruptor,consumer开始等待,消费数据
6. 第14-17行代码,创建启动生产者,发布数据
原文:http://www.cnblogs.com/airchina/p/5295064.html