Dubbo是一款开源的RPC中间件框架,底层数据传输默认使用的Netty,那么请求的处理理论上是异步的,为什么我们在使用的时候是同步的呢?肯定是Dubbo框架,做了异步转同步的处理。
首先我们来梳理下,异步转同步,我们的需求是怎样的?
1、调用方请求远程服务之后,需要等待结果,此刻,请求线程应该阻塞
2、远程服务返回结果后,唤醒请求线程,调用方得到结果
Dubbo异步转同步,核心类是DefaultFuture,核心方法是get(),received(Channel channel, Response response)。
DefaultFuture构造函数:
1 private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); 2 3 // 每次请求都会生成一个DefaultFuture对象,然后保存到FUTURES中,请求返回结果时,根据id从FUTURES中找到对应的DefaultFuture对象,并删除 4 private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); 5 6 // AtomicLong从0开始递增,创建Request对象时生成的id 7 private final long id; 8 private final Channel channel; 9 // 请求对象 10 private final Request request; 11 // 超时的设置 12 private final int timeout; 13 // 这里使用Lock和Condition实现等待通知机制 14 private final Lock lock = new ReentrantLock(); 15 private final Condition done = lock.newCondition(); 16 private final long start = System.currentTimeMillis(); 17 private volatile long sent; 18 // 请求的返回结果 19 private volatile Response response; 20 private volatile ResponseCallback callback; 21 22 public DefaultFuture(Channel channel, Request request, int timeout) { 23 this.channel = channel; 24 this.request = request; 25 this.id = request.getId(); 26 this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 27 // put into waiting map. 28 FUTURES.put(id, this); 29 CHANNELS.put(id, channel); 30 }
get():
1 public Object get(int timeout) throws RemotingException { 2 if (timeout <= 0) { 3 timeout = Constants.DEFAULT_TIMEOUT; 4 } 5 // isDone()方法就是判断Response是否有值(即是否有返回结果) 6 if (!isDone()) { 7 long start = System.currentTimeMillis(); 8 lock.lock(); 9 try { 10 while (!isDone()) { 11 // 超时等待 12 done.await(timeout, TimeUnit.MILLISECONDS); 13 // 如果有返回结果了,或者,超时了,就退出循环 14 if (isDone() || System.currentTimeMillis() - start > timeout) { 15 break; 16 } 17 } 18 } catch (InterruptedException e) { 19 throw new RuntimeException(e); 20 } finally { 21 lock.unlock(); 22 } 23 // 如果是超时了,就抛出异常 24 if (!isDone()) { 25 throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); 26 } 27 } 28 // 远程服务正常返回结果,则返回给调用方 29 return returnFromResponse(); 30 }
received(Channel channel, Response response):
1 public static void received(Channel channel, Response response) { 2 try { 3 // 根据请求id从FUTURES中获取DefaultFuture,并删除 4 DefaultFuture future = FUTURES.remove(response.getId()); 5 if (future != null) { 6 future.doReceived(response); 7 } else { 8 logger.warn("The timeout response finally returned at " 9 + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 10 + ", response " + response 11 + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 12 + " -> " + channel.getRemoteAddress())); 13 } 14 } finally { 15 // CHANNELS也删除 16 CHANNELS.remove(response.getId()); 17 } 18 }
1 private void doReceived(Response res) { 2 lock.lock(); 3 try { 4 response = res; 5 if (done != null) { 6 // 唤醒阻塞的线程 7 done.signal(); 8 } 9 } finally { 10 lock.unlock(); 11 } 12 if (callback != null) { 13 invokeCallback(callback); 14 } 15 }
总结:Dubbo异步转同步的原理,其实就是利用Lock和Condition实现了等待通知机制。请求与返回结果进行匹配,则是通过传递以及接收请求id实现的。
原文:https://www.cnblogs.com/dushenzi/p/12369955.html