1 文章概述
我们在服务端开发时如果需要实现异步调用,首先声明一个线程池,并将调用业务方法封装成一个任务提交至线程池,如果不需要获取返回值则封装为Runnable,需要获取返回值则封装为Callable并通过Future对象接受结果。
class CalcTask1 implements Callable
@Override
public Integer call() throws Exception {
System.out.println("task1耗时计算");
Thread.sleep(1000L);
return 100;
}
}
class CalcTask2 implements Callable
@Override
public Integer call() throws Exception {
System.out.println("task2耗时计算");
Thread.sleep(3000L);
return 200;
}
}
public class CallableTest {
public static void test1() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
CalcTask1 task1 = new CalcTask1();
Future
CalcTask2 task2 = new CalcTask2();
Future
Integer result1 = f1.get();
Integer result2 = f2.get();
System.out.println("final result=" + (result1 + result2));
executor.shutdown();
}
public static void test2() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
List<Callable
CalcTask1 task1 = new CalcTask1();
CalcTask2 task2 = new CalcTask2();
tasks.add(task1);
tasks.add(task2);
for (int i = 0; i < tasks.size(); i++) {
Future
System.out.println("result=" + future.get());
}
executor.shutdown();
}
}
1.1 什么是消费异步化
在使用DUBBO进行异步化调用时不需要这么麻烦,DUBBO基于NIO非阻塞能力使得服务消费者无需启用多线程就可以实现并行调用多个服务,在此我们给出基于2.7.0版本调用实例。
1.1.1 生产者
(1) 服务声明
public interface CalcSumService {
public Integer sum(int a, int b);
}
public class CalcSumServiceImpl implements CalcSumService {
@Override
public Integer sum(int a, int b) {
return a + b;
}
}
public interface CalcSubtractionService {
public Integer subtraction(int a, int b);
}
public class CalcSubtractionServiceImpl implements CalcSubtractionService {
@Override
public Integer subtraction(int a, int b) {
return a - b;
}
}
(2) 配置文件
<dubbo:application name="java-front-provider" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:protocol name="dubbo" port="9999" />
<dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSumService" ref="calcSumService" />
<dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" ref="calcSubtractionService" />
(3) 服务发布
public class Provider {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/dubbo-provider.xml");
context.start();
System.out.println(context);
System.in.read();
}
}
1.1.2 消费者
(1) 配置文件
<dubbo:application name="java-front-consumer" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:reference id="calcSumService" interface="com.java.front.dubbo.demo.provider.service.CalcSumService" timeout="10000">
<dubbo:method name="sum" async="true" />
</dubbo:reference>
<dubbo:reference id="calcSubtractionService" interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" timeout="10000">
<dubbo:method name="subtraction" async="true" />
</dubbo:reference>
(2) 服务消费
public class Consumer {
public static void main(String[] args) throws Exception {
testAsync();
System.in.read();
}
public static void testAsync() {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/** 加法运算 /
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
CompletableFuture
/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
CompletableFuture
/
int sumResult = futureSum.get();
int subtractionResult = futureSubtraction.get();
System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.2 为什么消费异步化
异步化可以将原本串行的调用并行化,减少执行时间从而提升性能。假设上述实例加法服务需要100ms,减法服务需要200ms,那么串行化执行时间为二者之和300ms:
DUBBO消费异步化实例与源码分析
如果消费异步化那么执行时间减少为二者最大值200ms,异步化所带来的性能提升不言而喻:
DUBBO消费异步化实例与源码分析
2 保护性暂停模式
分析DUBBO源码之前我们首先介绍一种多线程设计模式:保护性暂停模式。我们设想这样一种场景:线程A生产数据,线程B读取这个数据。我们必须面对一种情况:线程B准备读取数据时,此时线程A还没有生产出数据。在这种情况下线程B不能一直空转,也不能立即退出,线程B要等到生产数据完成并拿到数据之后才退出。
那么在数据没有生产出这段时间,线程B需要执行一种等待机制,这样可以达到对系统保护目的,这就是保护性暂停。
public class MyData implements Serializable {
private static final long serialVersionUID = 1L;
private String message;
public MyData(String message) {
this.message = message;
}
}
class Resource {
private MyData data;
private Object lock = new Object();
public MyData getData() {
synchronized (lock) {
while (data == null) {
try {
// 没有数据则释放锁并暂停等待被唤醒
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
// 生产数据后唤醒消费线程
this.data = data;
lock.notifyAll();
}
}
}
public class ProtectDesignTest {
public static void main(String[] args) {
Resource resource = new Resource();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData();
System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
}, "t2").start();
}
}
在上述代码实例中线程1生产数据,线程2消??数据,Resource类通过wait/notify实现了保护性暂停模式,关于保护性暂停模式请参看我之前《保护性暂停模式详解以及其在DUBBO应用源码分析》这篇文章。
3 源码分析
本章节我们分析对比2.6.9和2.7.0两个版本源码,之所以选取这两个版本是因为2.7.0是一个里程碑版本,异步化能力得到了明显增强。
3.1 version_2.6.9
3.1.1 异步调用
我们首先看看这个版本异步调用使用方式,生产者内容和消费者配置文件同第一章节不再赘述,我们重点分析服务消费代码。
public class AsyncConsumer {
public static void main(String[] args) throws Exception {
test1();
System.in.read();
}
public static void test1() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/ 加法运算 /
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
Future
/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
Future
/
int sumResult = futureSum.get();
int subtractionResult = futureSubtraction.get();
System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
}
}
消费者最终执行DubboInvoker.doInvoke,这个方法包含异步调用核心:
public class DubboInvoker
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 单向调用
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
}
// 异步调用
else if (isAsync) {
// 发起请求给生产者
ResponseFuture future = currentClient.request(inv, timeout);
// 设置future对象至上下文
RpcContext.getContext().setFuture(new FutureAdapter
原文:https://www.cnblogs.com/a509968115/p/14848366.html