Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。
介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。
从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。
但是对 volatile类型的变量进行 CAS 操作,存在伪共享问题,下面介绍一下
CPU的缓存系统是以缓存行(cache line)为单位存储的,一般的大小为64bytes。在多线程程序的执行过程中,存在着一种情况,多个需要频繁修改的变量存在同一个缓存行当中。
假设:有两个线程分别访问并修改X和Y这两个变量,X和Y恰好在同一个缓存行上,这两个线程分别在不同的CPU上执行。那么每个CPU分别更新好X和Y时将缓存行刷入内存时,发现有别的修改了各自缓存行内的数据,这时缓存行会失效,从L3中重新获取。这样的话,程序执行效率明显下降。为了减少这种情况的发生,其实就是避免X和Y在同一个缓存行中,可以主动添加一些无关变量将缓存行填充满,比如在X对象中添加一些变量,让它有64 Byte那么大,正好占满一个缓存行。
简单的说,就是 以空间换时间: 使用占位字节,将变量的所在的 缓冲行 塞满。
disruptor 无锁框架就是这么干的。
在Disruptor中有一个重要的类Sequence,该类包装了一个volatile修饰的long类型数据value,无论是Disruptor中的基于数组实现的缓冲区RingBuffer,还是生产者,消费者,都有各自独立的Sequence,RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。对于生产者和消费者来说,Sequence标示着它们的事件序号。
/**
* 停车场问题.
* 1) 事件对象Event
* 2)三个消费者Handler
* 3)一个生产者Processer
* 4)执行Main方法
*/
public class DisruptorCar {
private static final Integer NUM = 1; // 1,10,100,1000
/**
* 测试入口 ,
* 一个生产者(汽车进入停车场);
* 三个消费者(一个记录汽车信息,一个发送消息给系统,一个发送消息告知司机)
* 前两个消费者同步执行,都有结果了再执行第三个消费者
*/
@Test
public void main() throws InterruptedException {
long beginTime = System.currentTimeMillis();
int bufferSize = 2048; // 2的N次方
try {
// 创建线程池,负责处理Disruptor的四个消费者
ExecutorService executor = Executors.newFixedThreadPool(4);
// 初始化一个 Disruptor
Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
@Override
public MyInParkingDataEvent newInstance() {
return new MyInParkingDataEvent(); // Event 初始化工厂
}
}, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 使用disruptor创建消费者组 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
// 当上面两个消费者处理结束后在消耗 smsHandler
MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
handlerGroup.then(myParkingDataSmsHandler);
// 启动Disruptor
disruptor.start();
CountDownLatch countDownLatch = new CountDownLatch(1); // 一个生产者线程准备好了就可以通知主线程继续工作了
// 生产者生成数据
executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
countDownLatch.await(); // 等待生产者结束
disruptor.shutdown();
executor.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
}
public class MyInParkingDataEvent {
private String carLicense; // 车牌号
public String getCarLicense() {
return carLicense;
}
public void setCarLicense(String carLicense) {
this.carLicense = carLicense;
}
}
/**
* Handler 第一个消费者,负责保存进场汽车的信息
*/
public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> {
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
long threadId = Thread.currentThread().getId(); // 获取当前线程id
String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
System.out.println(String.format("Thread Id %s 保存 %s 到数据库中 ....", threadId, carLicense));
}
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
this.onEvent(myInParkingDataEvent);
}
}
/**
* 第二个消费者,负责发送通知告知工作人员(Kafka是一种高吞吐量的分布式发布订阅消息系统)
*/
public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> {
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
long threadId = Thread.currentThread().getId(); // 获取当前线程id
String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
System.out.println(String.format("Thread Id %s 发送 %s 进入停车场信息给 kafka系统...", threadId, carLicense));
}
}
/**
* 第三个消费者,sms短信服务,告知司机你已经进入停车场,计费开始。
*/
public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> {
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
long threadId = Thread.currentThread().getId(); // 获取当前线程id
String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
System.out.println(String.format("Thread Id %s 给 %s 的车主发送一条短信,并告知他计费开始了 ....", threadId, carLicense));
}
}
/**
* 生产者,进入停车场的车辆
*/
public class MyInParkingDataEventPublisher implements Runnable {
private CountDownLatch countDownLatch; // 用于监听初始化操作,等初始化执行完毕后,通知主线程继续工作
private Disruptor<MyInParkingDataEvent> disruptor;
public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
Disruptor<MyInParkingDataEvent> disruptor) {
this.countDownLatch = countDownLatch;
this.disruptor = disruptor;
}
@Override
public void run() {
MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
try {
for (int i = 0; i < NUM; i++) {
disruptor.publishEvent(eventTranslator);
Thread.sleep(1000); // 假设一秒钟进一辆车
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown(); // 执行完毕后通知 await()方法
System.out.println(NUM + "辆车已经全部进入进入停车场!");
}
}
}
class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
@Override
public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
this.generateData(myInParkingDataEvent);
}
private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
myInParkingDataEvent.setCarLicense("车牌号: 鄂A-" + (int) (Math.random() * 100000)); // 随机生成一个车牌号
System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event");
return myInParkingDataEvent;
}
}
}
原文:https://www.cnblogs.com/lori/p/14853661.html