1000字范文,内容丰富有趣,学习的好帮手!
1000字范文 > dubbo源码分析系列——dubbo-cluster模块源码分析

dubbo源码分析系列——dubbo-cluster模块源码分析

时间:2023-05-28 11:55:36

相关推荐

dubbo源码分析系列——dubbo-cluster模块源码分析

独角兽企业重金招聘Python工程师标准>>>

模块功能介绍

该模块的使用介绍请参考dubbo官方用户手册如下章节内容。

集群容错负载均衡路由规则配置规则注册中心参考手册

其中注册中心其实是对于目录服务的一种实现方式,本文不会对注册中心进行详细讲解。

核心类图

核心源码分析

核心接口概念及关系

各节点关系:

这里的Invoker是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。Directory代表多个Invoker,可以把它看成List<Invoker>,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。Router负责从多个Invoker中按路由规则选出子集,比如读写分离,应用隔离等。LoadBalance负责从多个Invoker中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选。

由于每种接口都有多种实现类,篇幅和时间有限,我们选择其中最为典型的一种来进行源码分析。

Cluster

扩展接口介绍

集群的源码如下。

package com.alibaba.dubbo.rpc.cluster;import com.mon.extension.Adaptive;import com.mon.extension.SPI;import com.alibaba.dubbo.rpc.Invoker;import com.alibaba.dubbo.rpc.RpcException;import com.alibaba.dubbo.rpc.cluster.support.FailoverCluster;/*** Cluster. (SPI, Singleton, ThreadSafe)* * <a href="/wiki/Computer_cluster">Cluster</a>* <a href="/wiki/Fault-tolerant_system">Fault-Tolerant</a>* * @author william.liangf*/@SPI(FailoverCluster.NAME)public interface Cluster {/*** Merge the directory invokers to a virtual invoker.* * @param <T>* @param directory* @return cluster invoker* @throws RpcException*/@Adaptive<T> Invoker<T> join(Directory<T> directory) throws RpcException;}

该接口只有一个方法,就是将directory对象中的多个invoker的集合整合成一个invoker对象。该方法被ReferenceConfig类的createProxy方法调用,调用它的代码如下。

// 对有注册中心的Cluster 只用 AvailableClusterURL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers));

Cluster内置有9个扩展实现类,都实现了不同的集群容错策略,我们只分析默认的自动故障转移的扩展实现FailoverCluster。

FailoverCluster

源码如下,只是构造了一个类型为FailoverClusterInvoker的invoker对象。

public class FailoverCluster implements Cluster {public final static String NAME = "failover";public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new FailoverClusterInvoker<T>(directory);}}

我们进入看看FailoverClusterInvoker的源码。

/*** 失败转移,当出现失败,重试其它服务器,通常用于读操作,但重试会带来更长延迟。* * <a href="/wiki/Failover">Failover</a>* * @author william.liangf* @author chao.liuc*/public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);public FailoverClusterInvoker(Directory<T> directory) {super(directory);}@SuppressWarnings({ "unchecked", "rawtypes" })public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyinvokers = invokers;checkInvokers(copyinvokers, invocation);int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;if (len <= 0) {len = 1;}// retry loop.RpcException le = null; // last exception.List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.Set<String> providers = new HashSet<String>(len);for (int i = 0; i < len; i++) {//重试时,进行重新选择,避免重试时invoker列表已发生变化.//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变if (i > 0) {checkWheatherDestoried();copyinvokers = list(invocation);//重新检查一下checkInvokers(copyinvokers, invocation);}Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);invoked.add(invoker);RpcContext.getContext().setInvokers((List)invoked);try {Result result = invoker.invoke(invocation);if (le != null && logger.isWarnEnabled()) {logger.warn("Although retry the method " + invocation.getMethodName()+ " in the service " + getInterface().getName()+ " was successful by the provider " + invoker.getUrl().getAddress()+ ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size()+ ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion() + ". Last error is: "+ le.getMessage(), le);}return result;} catch (RpcException e) {if (e.isBiz()) { // biz exception.throw e;}le = e;} catch (Throwable e) {le = new RpcException(e.getMessage(), e);} finally {providers.add(invoker.getUrl().getAddress());}}throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "+ invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "+ Version.getVersion() + ". Last error is: "+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);}}

该类又继承自抽象实现类AbstractClusterInvoker,使用该类的一些方法,因此也要结合该类的源码一起看。

/** Copyright 1999- Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* */licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.alibaba.dubbo.rpc.cluster.support;import java.util.ArrayList;import java.util.List;import com.mon.Constants;import com.mon.URL;import com.mon.Version;import com.mon.extension.ExtensionLoader;import com.mon.logger.Logger;import com.mon.logger.LoggerFactory;import com.Utils;import com.alibaba.dubbo.rpc.Invocation;import com.alibaba.dubbo.rpc.Invoker;import com.alibaba.dubbo.rpc.Result;import com.alibaba.dubbo.rpc.RpcException;import com.alibaba.dubbo.rpc.cluster.Directory;import com.alibaba.dubbo.rpc.cluster.LoadBalance;import com.alibaba.dubbo.rpc.support.RpcUtils;/*** AbstractClusterInvoker* * @author william.liangf* @author chao.liuc*/public abstract class AbstractClusterInvoker<T> implements Invoker<T> {private static final Loggerlogger = LoggerFactory.getLogger(AbstractClusterInvoker.class);protected final Directory<T>directory;protected final booleanavailablecheck;private volatile boolean destroyed = false;private volatile Invoker<T>stickyInvoker = null;public AbstractClusterInvoker(Directory<T> directory) {this(directory, directory.getUrl());}public AbstractClusterInvoker(Directory<T> directory, URL url) {if (directory == null)throw new IllegalArgumentException("service directory == null");this.directory = directory ;//sticky 需要检测 avaliablecheck this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK) ;}public Class<T> getInterface() {return directory.getInterface();}public URL getUrl() {return directory.getUrl();}public boolean isAvailable() {Invoker<T> invoker = stickyInvoker;if (invoker != null) {return invoker.isAvailable();}return directory.isAvailable();}public void destroy() {directory.destroy();destroyed = true;}/*** 使用loadbalance选择invoker.</br>* a)先lb选择,如果在selected列表中 或者 不可用且做检验时,进入下一步(重选),否则直接返回</br>* b)重选验证规则:selected > available .保证重选出的结果尽量不在select中,并且是可用的 * * @param availablecheck 如果设置true,在选择的时候先选invoker.available == true* @param selected 已选过的invoker.注意:输入保证不重复* */protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (invokers == null || invokers.size() == 0)return null;String methodName = invocation == null ? "" : invocation.getMethodName();boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;{//ignore overloaded methodif ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){stickyInvoker = null;}//ignore cucurrent problemif (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){if (availablecheck && stickyInvoker.isAvailable()){return stickyInvoker;}}}Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);if (sticky){stickyInvoker = invoker;}return invoker;}private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (invokers == null || invokers.size() == 0)return null;if (invokers.size() == 1)return invokers.get(0);// 如果只有两个invoker,退化成轮循if (invokers.size() == 2 && selected != null && selected.size() > 0) {return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);}Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);//如果 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试.if( (selected != null && selected.contains(invoker))||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){try{Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);if(rinvoker != null){invoker = rinvoker;}else{//看下第一次选的位置,如果不是最后,选+1位置.int index = invokers.indexOf(invoker);try{//最后在避免碰撞invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;}catch (Exception e) {logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);}}}catch (Throwable t){logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);}}return invoker;} /*** 重选,先从非selected的列表中选择,没有在从selected列表中选择.* @param loadbalance* @param invocation* @param invokers* @param selected* @return* @throws RpcException*/private Invoker<T> reselect(LoadBalance loadbalance,Invocation invocation,List<Invoker<T>> invokers, List<Invoker<T>> selected ,boolean availablecheck)throws RpcException {//预先分配一个,这个列表是一定会用到的.List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size());//先从非select中选if( availablecheck ){ //选isAvailable 的非selectfor(Invoker<T> invoker : invokers){if(invoker.isAvailable()){if(selected ==null || !selected.contains(invoker)){reselectInvokers.add(invoker);}}}if(reselectInvokers.size()>0){return loadbalance.select(reselectInvokers, getUrl(), invocation);}}else{ //选全部非selectfor(Invoker<T> invoker : invokers){if(selected ==null || !selected.contains(invoker)){reselectInvokers.add(invoker);}}if(reselectInvokers.size()>0){return loadbalance.select(reselectInvokers, getUrl(), invocation);}}//最后从select中选可用的. {if(selected != null){for(Invoker<T> invoker : selected){if((invoker.isAvailable()) //优先选available && !reselectInvokers.contains(invoker)){reselectInvokers.add(invoker);}}}if(reselectInvokers.size()>0){return loadbalance.select(reselectInvokers, getUrl(), invocation);}}return null;}public Result invoke(final Invocation invocation) throws RpcException {checkWheatherDestoried();LoadBalance loadbalance;List<Invoker<T>> invokers = list(invocation);if (invokers != null && invokers.size() > 0) {loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));} else {loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);}RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);return doInvoke(invocation, invokers, loadbalance);}protected void checkWheatherDestoried() {if(destroyed){throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()+ " use dubbo version " + Version.getVersion()+ " is now destroyed! Can not invoke any more.");}}@Overridepublic String toString() {return getInterface() + " -> " + getUrl().toString();}protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) {if (invokers == null || invokers.size() == 0) {throw new RpcException("Failed to invoke the method "+ invocation.getMethodName() + " in the service " + getInterface().getName() + ". No provider available for the service " + directory.getUrl().getServiceKey()+ " from registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion()+ ". Please check if the providers have been started and registered.");}}protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,LoadBalance loadbalance) throws RpcException;protected List<Invoker<T>> list(Invocation invocation) throws RpcException {List<Invoker<T>> invokers = directory.list(invocation);return invokers;}}

源码实现分析。

AbstractClusterInvoker的invoke方法提供了一个骨架实现。逻辑是检查对象是否销毁状态,从directory获得invoker列表,获得loadbalance扩展实现对象,然后调用抽象方法doInvoke去执行真正的逻辑,交由具体子类实现。AbstractClusterInvoker实现了一个公共的protected的方法select,该方法实现了使用loadbalance选择合适的invoker对象。在选择方法的实现中支持粘滞连接特性。作为一个公共特性,所有的子类都支持。最后再调用private方法doselect实现进一步选择逻辑。AbstractClusterInvoker的doselect方法实现了真正的选择invoker逻辑。首先检查可选invoker,若没有则返回null;如果有2个可选invoker则退化为轮询;否则继续调用loadbalance的select方法选择一个invoker;然后在检查选中的invoker是否已经使用过或者不可用,如果不可用则会调用reselect重新选择,若重新选择成功则使用它,否则则使用invoker列表中当前index+1的invoker,如果已经是最后一个则直接使用当前的invoker。AbstractClusterInvoker的reselect方法的实现逻辑是:如果availablecheck标志为true,则只将未被selected的可用状态的invoker交给loadbalance进行选择,否则将所有的未被selected的invoker交给loadbalance选择,若可重新选择的invoker为空,则将selected的invoker列表交给loadbalance进行选择。FailoverClusterInvoker的doInvoke方法的实现逻辑为:检查invoker列表状态;获得参数中重试次数,默认次数是2;从directory获得invoker列表;调用select方法选择一个invoker;将选择的invoker加入到invoked集合,表示已经选择和使用的;调用invoker.invoke()方法,若成功则返回result,若抛出的是业务异常则抛出,否则继续重试选择并调用下一个invoker;

LoadBalance

负载均衡器

扩展接口定义

@SPI(RandomLoadBalance.NAME)public interface LoadBalance {/*** select one invoker in list.* * @param invokers invokers.* @param url refer url* @param invocation invocation.* @return selected invoker.*/@Adaptive("loadbalance")<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}

上述源码所示,负载均衡只定义了一个方法,就是在候选的invokers中选择一个invoker对象出来。默认的扩展实现是random。那我么就分析RandomLoadBalance的源码。

RandomLoadBalance

/** Copyright 1999- Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* */licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.alibaba.dubbo.rpc.cluster.loadbalance;import java.util.List;import java.util.Random;import com.mon.URL;import com.alibaba.dubbo.rpc.Invocation;import com.alibaba.dubbo.rpc.Invoker;/*** random load balance.** @author qianlei* @author william.liangf*/public class RandomLoadBalance extends AbstractLoadBalance {public static final String NAME = "random";private final Random random = new Random();protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size(); // 总个数int totalWeight = 0; // 总权重boolean sameWeight = true; // 权重是否都一样for (int i = 0; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);totalWeight += weight; // 累计总权重if (sameWeight && i > 0&& weight != getWeight(invokers.get(i - 1), invocation)) {sameWeight = false; // 计算所有权重是否一样}}if (totalWeight > 0 && ! sameWeight) {// 如果权重不相同且权重大于0则按总权重数随机int offset = random.nextInt(totalWeight);// 并确定随机值落在哪个片断上for (int i = 0; i < length; i++) {offset -= getWeight(invokers.get(i), invocation);if (offset < 0) {return invokers.get(i);}}}// 如果权重相同或权重为0则均等随机return invokers.get(random.nextInt(length));}}

该类继承了抽象类AbstractLoadBalance,因此我们也要结合该类一起分析。

/** Copyright 1999- Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* */licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.alibaba.dubbo.rpc.cluster.loadbalance;import java.util.List;import com.mon.Constants;import com.mon.URL;import com.alibaba.dubbo.rpc.Invoker;import com.alibaba.dubbo.rpc.Invocation;import com.alibaba.dubbo.rpc.cluster.LoadBalance;/*** AbstractLoadBalance* * @author william.liangf*/public abstract class AbstractLoadBalance implements LoadBalance {public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {if (invokers == null || invokers.size() == 0)return null;if (invokers.size() == 1)return invokers.get(0);return doSelect(invokers, url, invocation);}protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);protected int getWeight(Invoker<?> invoker, Invocation invocation) {int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);if (weight > 0) {long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L);if (timestamp > 0L) {int uptime = (int) (System.currentTimeMillis() - timestamp);int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);if (uptime > 0 && uptime < warmup) {weight = calculateWarmupWeight(uptime, warmup, weight);}}}return weight;}static int calculateWarmupWeight(int uptime, int warmup, int weight) {int ww = (int) ( (float) uptime / ( (float) warmup / (float) weight ) );return ww < 1 ? 1 : (ww > weight ? weight : ww);}}

源码分析如下:

AbstractLoadBalance的select方法实现,只是做了参数校验,invoker列表若0个则返回null,1个元素则直接返回;否则调用抽象方法doSelect交给子类实现。AbstractLoadBalance定义了公共方法getWeight。该方法是获取invoker的权重的方法,公式是:(int) ( (float) uptime / ( (float) warmup / (float) weight ) );如果未设置权重或者权重值都一样,则直接调用random.nextInt()随机获得一个invoker;若设置了权重并且不一样,则在总权重中随机,分布在哪个invoker的分片上,则选择该invoker对象,实现了按照权重随机。

Router

接口定义

public interface Router extends Comparable<Router> {/*** get the router url.* * @return url*/URL getUrl();/*** route.* * @param invokers* @param url refer url* @param invocation* @return routed invokers* @throws RpcException*/<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}

路由器就定义了上述2个方法,核心方法是route,从大的invoker列表结合中根据规则过滤出一个子集合。我们这里只分析实现类ConditionRouter的源码。

ConditionRouter

/** Copyright 1999- Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* */licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.alibaba.dubbo.rpc.cluster.router.condition;import java.text.ParseException;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.List;import java.util.Map;import java.util.Set;import java.util.regex.Matcher;import java.util.regex.Pattern;import com.mon.Constants;import com.mon.URL;import com.mon.logger.Logger;import com.mon.logger.LoggerFactory;import com.Utils;import com.mon.utils.StringUtils;import com.mon.utils.UrlUtils;import com.alibaba.dubbo.rpc.Invocation;import com.alibaba.dubbo.rpc.Invoker;import com.alibaba.dubbo.rpc.RpcException;import com.alibaba.dubbo.rpc.cluster.Router;/*** ConditionRouter* * @author william.liangf*/public class ConditionRouter implements Router, Comparable<Router> {private static final Logger logger = LoggerFactory.getLogger(ConditionRouter.class);private final URL url;private final int priority;private final boolean force;private final Map<String, MatchPair> whenCondition;private final Map<String, MatchPair> thenCondition;public ConditionRouter(URL url) {this.url = url;this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);this.force = url.getParameter(Constants.FORCE_KEY, false);try {String rule = url.getParameterAndDecoded(Constants.RULE_KEY);if (rule == null || rule.trim().length() == 0) {throw new IllegalArgumentException("Illegal route rule!");}rule = rule.replace("consumer.", "").replace("provider.", "");int i = rule.indexOf("=>");String whenRule = i < 0 ? null : rule.substring(0, i).trim();String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);// NOTE: When条件是允许为空的,外部业务来保证类似的约束条件this.whenCondition = when;this.thenCondition = then;} catch (ParseException e) {throw new IllegalStateException(e.getMessage(), e);}}public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)throws RpcException {if (invokers == null || invokers.size() == 0) {return invokers;}try {if (! matchWhen(url)) {return invokers;}List<Invoker<T>> result = new ArrayList<Invoker<T>>();if (thenCondition == null) {logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());return result;}for (Invoker<T> invoker : invokers) {if (matchThen(invoker.getUrl(), url)) {result.add(invoker);}}if (result.size() > 0) {return result;} else if (force) {logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));return result;}} catch (Throwable t) {logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);}return invokers;}public URL getUrl() {return url;}public int compareTo(Router o) {if (o == null || o.getClass() != ConditionRouter.class) {return 1;}ConditionRouter c = (ConditionRouter) o;return this.priority == c.priority ? url.toFullString().compareTo(c.url.toFullString()) : (this.priority > c.priority ? 1 : -1);}public boolean matchWhen(URL url) {return matchCondition(whenCondition, url, null);}public boolean matchThen(URL url, URL param) {return thenCondition != null && matchCondition(thenCondition, url, param);}private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param) {Map<String, String> sample = url.toMap();for (Map.Entry<String, String> entry : sample.entrySet()) {String key = entry.getKey();MatchPair pair = condition.get(key);if (pair != null && ! pair.isMatch(entry.getValue(), param)) {return false;}}return true;}private static Pattern ROUTE_PATTERN = pile("([&!=,]*)\\s*([^&!=,\\s]+)");private static Map<String, MatchPair> parseRule(String rule)throws ParseException {Map<String, MatchPair> condition = new HashMap<String, MatchPair>();if(StringUtils.isBlank(rule)) {return condition;} // 匹配或不匹配Key-Value对MatchPair pair = null;// 多个Value值Set<String> values = null;final Matcher matcher = ROUTE_PATTERN.matcher(rule);while (matcher.find()) { // 逐个匹配String separator = matcher.group(1);String content = matcher.group(2);// 表达式开始if (separator == null || separator.length() == 0) {pair = new MatchPair();condition.put(content, pair);}// KV开始else if ("&".equals(separator)) {if (condition.get(content) == null) {pair = new MatchPair();condition.put(content, pair);} else {condition.put(content, pair);}}// KV的Value部分开始else if ("=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule \""+ rule + "\", The error char '" + separator+ "' at index " + matcher.start() + " before \""+ content + "\".", matcher.start());values = pair.matches;values.add(content);}// KV的Value部分开始else if ("!=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule \""+ rule + "\", The error char '" + separator+ "' at index " + matcher.start() + " before \""+ content + "\".", matcher.start());values = pair.mismatches;values.add(content);}// KV的Value部分的多个条目else if (",".equals(separator)) { // 如果为逗号表示if (values == null || values.size() == 0)throw new ParseException("Illegal route rule \""+ rule + "\", The error char '" + separator+ "' at index " + matcher.start() + " before \""+ content + "\".", matcher.start());values.add(content);} else {throw new ParseException("Illegal route rule \"" + rule+ "\", The error char '" + separator + "' at index "+ matcher.start() + " before \"" + content + "\".", matcher.start());}}return condition;}private static final class MatchPair {final Set<String> matches = new HashSet<String>();final Set<String> mismatches = new HashSet<String>();public boolean isMatch(String value, URL param) {for (String match : matches) {if (! UrlUtils.isMatchGlobPattern(match, value, param)) {return false;}}for (String mismatch : mismatches) {if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {return false;}}return true;}}}

该源码实现了如下条件路由器功能。

基于条件表达式的路由规则,如:

规则:

"=>"之前的为消费者匹配条件,所有参数和消费者的URL进行对比,当消费者满足匹配条件时,对该消费者执行后面的过滤规则。"=>"之后为提供者地址列表的过滤条件,所有参数和提供者的URL进行对比,消费者最终只拿到过滤后的地址列表。如果匹配条件为空,表示对所有消费方应用,如:=> host != 10.20.153.11如果过滤条件为空,表示禁止访问,如:host = 10.20.153.10 =>

表达式:

参数支持: 服务调用信息,如:method,argument等(暂不支持参数路由)URL本身的字段,如:protocol, host, port 等以及URL上的所有参数,如:application, organization 等条件支持: 等号"="表示"匹配",如:host = 10.20.153.10不等号"!="表示"不匹配",如:host != 10.20.153.10值支持: 以逗号","分隔多个值,如:host != 10.20.153.10,10.20.153.11以星号"*"结尾,表示通配,如:host != 10.20.*以美元符"$"开头,表示引用消费者参数,如:host = $host

Directory

接口定义

public interface Directory<T> extends Node {/*** get service type.* * @return service type.*/Class<T> getInterface();/*** list invokers.* * @return invokers*/List<Invoker<T>> list(Invocation invocation) throws RpcException;}

目录服务定义了一个核心接口list,就是列举出某个接口在目录中的所有服务列表。

抽象实现AbstractDirectory

提供了一个抽象的目录实现类,源码如下。

/*** 增加router的Directory* * @author chao.liuc*/public abstract class AbstractDirectory<T> implements Directory<T> {// 日志输出private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);private final URL url ;private volatile boolean destroyed = false;private volatile URL consumerUrl ;private volatile List<Router> routers;public AbstractDirectory(URL url) {this(url, null);}public AbstractDirectory(URL url, List<Router> routers) {this(url, url, routers);}public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {if (url == null)throw new IllegalArgumentException("url == null");this.url = url;this.consumerUrl = consumerUrl;setRouters(routers);}public List<Invoker<T>> list(Invocation invocation) throws RpcException {if (destroyed){throw new RpcException("Directory already destroyed .url: "+ getUrl());}List<Invoker<T>> invokers = doList(invocation);List<Router> localRouters = this.routers; // local referenceif (localRouters != null && localRouters.size() > 0) {for (Router router: localRouters){try {if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {invokers = router.route(invokers, getConsumerUrl(), invocation);}} catch (Throwable t) {logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);}}}return invokers;}public URL getUrl() {return url;}public List<Router> getRouters(){return routers;}public URL getConsumerUrl() {return consumerUrl;}public void setConsumerUrl(URL consumerUrl) {this.consumerUrl = consumerUrl;}protected void setRouters(List<Router> routers){// copy listrouters = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);// append url routerString routerkey = url.getParameter(Constants.ROUTER_KEY);if (routerkey != null && routerkey.length() > 0) {RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);routers.add(routerFactory.getRouter(url));}// append mock invoker selectorrouters.add(new MockInvokersSelector());Collections.sort(routers);this.routers = routers;}public boolean isDestroyed() {return destroyed;}public void destroy(){destroyed = true;}protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException ;}

list方法的实现逻辑是:先检查目录是否销毁状态,若已经销毁则抛出异常;调用抽象方法doList实现真正的从目录服务中获取invoker列表,该方法需要子类实现;循环对象中的路由器列表,若路由器url为null或者参数runtime为true则调用该路由器的route方法进行路由,将返回的invoker列表替换为路由后的结果; 返回最终的invoker列表。

setRouters方法是设置路由器列表,除了参数参入的routers之外,还会追加2个默认的路由器,一个是参数router指定的routerFactory获得的router,另外一个是MockInvokersSelector对象;

默认实现StaticDirectory

模块还提供了一个默认目录实现类StaticDirectory,它是一个静态的内存缓存目录服务实现。源码如下:

public class StaticDirectory<T> extends AbstractDirectory<T> {private final List<Invoker<T>> invokers;public StaticDirectory(List<Invoker<T>> invokers){this(null, invokers, null);}public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers){this(null, invokers, routers);}public StaticDirectory(URL url, List<Invoker<T>> invokers) {this(url, invokers, null);}public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers);if (invokers == null || invokers.size() == 0)throw new IllegalArgumentException("invokers == null");this.invokers = invokers;}public Class<T> getInterface() {return invokers.get(0).getInterface();}public boolean isAvailable() {if (isDestroyed()) {return false;}for (Invoker<T> invoker : invokers) {if (invoker.isAvailable()) {return true;}}return false;}public void destroy() {if(isDestroyed()) {return;}super.destroy();for (Invoker<T> invoker : invokers) {invoker.destroy();}invokers.clear();}@Overrideprotected List<Invoker<T>> doList(Invocation invocation) throws RpcException {return invokers;}}

它的doList方法的实现是直接将属性invokers的值返回,非常简单。

此外还有一个RegistryDirectory的实现类,该类是整合了注册中心和目录服务。

NEXT

因为考虑到本模块与dubbo-registry相关性较大,接下来我们将研究dubbo-registry-api和dubbo-registry-default模块的源码。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。