各个项目之间相互调用的控制逻辑。通过这一特性可以实现服务提供者的灰度发布等操作。具体信息可以参照官网说明)
对于版本2.7+之后的来说,dubbo服务注册粒度可选为:应用级别(application)、服务级别(service)这两个级别。所有在路由配置这一块来说,也是分为这两个粒度。具体的配置内容可以参考 官网
简单的来说,条件路由
加载过程分为两个部分初始化
和实际作用阶段
。在初始化阶段中,从配置中心拉取该服务(该应用)配置的规则信息(如果有的话),进行装载;在项目启动完后,当有消费者调用服务提供者的时候,调用的过程经过如下步骤:
路由(条件/标签)
的筛选得到合适的invoker(服务提供者)
信息;在项目启动的过程中,ServiceDiscoveryRegistryDirectory
调用buildChain()
来从配置中心获取到自己关联的服务配置的路由策略信息,在javadoc的介绍中,条件路由的具体实现已从2.6.6的ConditionRouter
变为ServiceRouter
;
// ListenableRouter
public ListenableRouter(URL url, String ruleKey) {
super(url); // 通过url信息获取ruleRepository(zk、apollo、nacos....)
this.setForce(false);
// ruleKey格式(servicepath + ":" + group + ":" + version): cn.com.dubbo.demo.provider.service.MyEmpService::
this.init(ruleKey);
}
private synchronized void init(String ruleKey) {
if (StringUtils.isEmpty(ruleKey)) {
return;
}
// key末尾条件常量:.condition-router
String routerKey = ruleKey + RULE_SUFFIX;
// 添加监听,及时获取到变化更新信息
this.getRuleRepository().addListener(routerKey, this);
// 从配置中心获取配置的路由信息(如果有的话)
// DefaultGovernanceRuleRepositoryImpl#getRule()
String rule = this.getRuleRepository().getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP);
if (StringUtils.isNotEmpty(rule)) {
this.process(new ConfigChangedEvent(routerKey, DynamicConfiguration.DEFAULT_GROUP, rule));
}
}
// DefaultGovernanceRuleRepositoryImpl#getRule()
@Override
public String getRule(String key, String group, long timeout) throws IllegalStateException {
// 通过application.properties等系统配置文件获取到连接配置中心客户端并且初始化相关的信息
DynamicConfiguration dynamicConfiguration = getDynamicConfiguration();
if (dynamicConfiguration != null) {
// 获取配置的路有信息
return dynamicConfiguration.getConfig(key, group, timeout);
}
return null;
}
@Override
public String getConfig(String key, String group, long timeout) throws IllegalStateException {
return (String) iterateConfigOperation(configuration -> configuration.getConfig(key, group, timeout));
}
// func参数是一个函数,通过apply()触发func被调用
private Object iterateConfigOperation(Function<DynamicConfiguration, Object> func) {
Object value = null;
for (DynamicConfiguration configuration : configurations) {
// 触发 configuration -> configuration.getConfig(key, group, timeout)
value = func.apply(configuration);
if (value != null) {
break;
}
}
return value;
}
@Override
public synchronized void process(ConfigChangedEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("Notification of condition rule, change type is: " + event.getChangeType() +
", raw rule is:\n " + event.getContent());
}
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
routerRule = null;
conditionRouters = Collections.emptyList();
} else {
try {
routerRule = ConditionRuleParser.parse(event.getContent());
// 初始化 ConditionRouter,当后调用的时候,从ConditionRouter中获取到相关的路由策略来判断是否invokers是否符合
generateConditions(routerRule);
} catch (Exception e) {
logger.error("Failed to parse the raw condition rule and it will not take effect, please check " +
"if the condition rule matches with the template, the raw rule is:\n " + event.getContent(), e);
}
}
}
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
// Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
// if (contextAttachments != null && contextAttachments.size() != 0) {
// ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(contextAttachments);
// }
// 获取可用的服务提供者列表
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
其中路由的作用也是在list()
中体现的,具体的路由匹配RouterChain.route()
如下:
public List<Invoker<T>> route(URL url, Invocation invocation) {
AddrCache<T> cache = this.cache.get();
if (cache == null) {
throw new RpcException(RpcException.ROUTER_CACHE_NOT_BUILD, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + url.getServiceInterface()
+ ". address cache not build "
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion()
+ ".");
}
BitList<Invoker<T>> finalBitListInvokers = new BitList<>(invokers, false);
for (StateRouter stateRouter : stateRouters) {
if (stateRouter.isEnable()) {
RouterCache<T> routerCache = cache.getCache().get(stateRouter.getName());
finalBitListInvokers = stateRouter.route(finalBitListInvokers, routerCache, url, invocation);
}
}
List<Invoker<T>> finalInvokers = new ArrayList<>(finalBitListInvokers.size());
for(Invoker<T> invoker: finalBitListInvokers) {
finalInvokers.add(invoker);
}
for (Router router : routers) {
finalInvokers = router.route(finalInvokers, url, invocation);
}
return finalInvokers;
}
有五个默认的router
,每一种类型的路由都有一个专属的handler来做相应的处理,对于条件路由来说,需要关注的就是ServiceRouter
的处理逻辑。
ConditionRouter.route()
,遍历所有的服务提供者信息,来匹配过滤已经设定好的路由规则得到符合条件的服务提供者信息。
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
throws RpcException {
if (!enabled) {
return invokers;
}
if (CollectionUtils.isEmpty(invokers)) {
return invokers;
}
try {
if (!matchWhen(url, invocation)) {
return invokers;
}
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
if (thenCondition == null) {
logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
return result;
}
for (Invoker<T> invoker : invokers) {
if (matchThen(invoker.getUrl(), url)) {
result.add(invoker);
}
}
if (!result.isEmpty()) {
return result;
} else if (this.isForce()) {
logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(RULE_KEY));
return result;
}
} catch (Throwable t) {
logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
}
return invokers;
}
原文:https://www.cnblogs.com/KevinStark/p/15260946.html