上文我们介绍了 NetFlix Dynomite 客户端 DynoJedisClient 的 连接管理和拓扑感知部分,本文将继续分析自动发现和故障转移。
我们还是要回顾下基本思路和图例。
因为要为上层屏蔽信息,所以 DynoJedisClient 就需要应对各种复杂信息,需要对系统有深刻的了解,,比如:
因此,DynoJedisClient 的思路是:java驱动提供多个策略接口,可以用来驱动程序行为调优。包括负载均衡,重试请求,管理节点连接等等。
目前图例如下:
自动发现 是在 ConnectionPoolImpl 的 start 方法中,启动了线程,定期刷新host状态,进行update。
刷新线程逻辑如下,就是定期:
@Override
public Future<Boolean> start() throws DynoException {
HostSupplier hostSupplier = cpConfiguration.getHostSupplier();
HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
Collection<Host> hostsUp = hostStatus.getActiveHosts();
final ExecutorService threadPool = Executors.newFixedThreadPool(Math.max(10, hostsUp.size()));
final List<Future<Void>> futures = new ArrayList<Future<Void>>();
// 初始化,添加host
for (final Host host : hostsUp) {
// Add host connection pool, but don‘t init the load balancer yet
futures.add(threadPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
addHost(host, false);
return null;
}
}));
}
boolean success = started.compareAndSet(false, true);
if (success) {
idling.set(false);
idleThreadPool.shutdownNow();
selectionStrategy = initSelectionStrategy();
cpHealthTracker.start();
// 启动定时线程
connPoolThreadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
// 调用 hostsUpdater 来获得最新的状态
HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
cpMonitor.setHostCount(hostStatus.getHostCount());
// 调用updateHosts来依据这些状态 更新 ConnectionPoolImpl 内部成员变量
updateHosts(hostStatus.getActiveHosts(), hostStatus.getInactiveHosts());
}
}
}, 15 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
}
return getEmptyFutureTask(true);
}
上面代码中,具体 updateHosts 完成更新 host 操作,分别就是添加,删除。
@Override
public Future<Boolean> updateHosts(Collection<Host> hostsUp, Collection<Host> hostsDown) {
boolean condition = false;
if (hostsUp != null && !hostsUp.isEmpty()) {
for (Host hostUp : hostsUp) {
condition |= addHost(hostUp);
}
}
if (hostsDown != null && !hostsDown.isEmpty()) {
for (Host hostDown : hostsDown) {
condition |= removeHost(hostDown);
}
}
return getEmptyFutureTask(condition);
}
updateHosts 方法调用了 addHost,可以看到,其对于 selectionStrategy,cpMonitor,cpHealthTracker,cpMap 都进行相应操作,因为这些都需要针对 host 的变化做处理。
public boolean addHost(Host host, boolean refreshLoadBalancer) {
HostConnectionPool<CL> connPool = cpMap.get(host);
final HostConnectionPool<CL> hostPool = hostConnPoolFactory.createHostConnectionPool(host, this);
HostConnectionPool<CL> prevPool = cpMap.putIfAbsent(host, hostPool);
if (prevPool == null) {
// This is the first time we are adding this pool.
try {
int primed = hostPool.primeConnections();
if (hostPool.isActive()) {
if (refreshLoadBalancer) {
selectionStrategy.addHost(host, hostPool);
}
cpHealthTracker.initializePingHealthchecksForPool(hostPool);
cpMonitor.hostAdded(host, hostPool);
} else {
cpMap.remove(host);
}
return primed > 0;
}
}
}
此时逻辑如下:
+--------------------------------------------------------------------------+
| |
| ConnectionPoolImpl |
| |
| Timer |
| +----------------------+ |
| refreshHosts | | |
| hostsUpdater +----------------> | connPoolThreadPool | |
| | | |
| +------------+---------+ |
| | |
| | |
| updateHosts | |
| | |
| +----------------+-----------------------------+ |
| | | | | |
| | | | | |
| v v v v |
| cpHealthTracker selectionStrategy cpMonitor cpMap |
| |
| |
| |
+--------------------------------------------------------------------------+
发现是通过 HostsUpdater 进行操作。
此类的主要作用是调用 HostSupplier 进行刷新:
public class HostsUpdater {
private final HostSupplier hostSupplier;
private final TokenMapSupplier tokenMapSupplier;
private final AtomicReference<HostStatusTracker> hostTracker = new AtomicReference<HostStatusTracker>(null);
public HostStatusTracker refreshHosts() {
List<Host> allHostsFromHostSupplier = hostSupplier.getHosts();
/**
* HostTracker should return the hosts that we get from TokenMapSupplier.
* Hence get the hosts from HostSupplier and map them to TokenMapSupplier
* and return them.
*/
Set<Host> hostSet = new HashSet<>(allHostsFromHostSupplier);
// Create a list of host/Tokens
List<HostToken> hostTokens = tokenMapSupplier.getTokens(hostSet);
// The key here really needs to be a object that is overlapping between
// the host from HostSupplier and TokenMapSupplier. Since that is a
// subset of the Host object itself, Host is the key as well as value here.
Map<Host, Host> allHostSetFromTokenMapSupplier = new HashMap<>();
for (HostToken ht : hostTokens) {
allHostSetFromTokenMapSupplier.put(ht.getHost(), ht.getHost());
}
for (Host hostFromHostSupplier : allHostsFromHostSupplier) {
if (hostFromHostSupplier.isUp()) {
// 设置 up 状态
Host hostFromTokenMapSupplier = allHostSetFromTokenMapSupplier.get(hostFromHostSupplier);
// 用hostFromHostSupplier的属性来各种设置upHostBuilder,比如ip, hostname, status, port, DatastorePort, rack, datacenter, hashtag, password...
HostBuilder upHostBuilder = new HostBuilder()
.setHostname()......;
hostsUpFromHostSupplier.add(upHostBuilder.createHost());
allHostSetFromTokenMapSupplier.remove(hostFromTokenMapSupplier);
} else {
// 设置 down 状态
Host hostFromTokenMapSupplier = allHostSetFromTokenMapSupplier.get(hostFromHostSupplier);
// downHostBuilder,比如ip, hostname, status, port, DatastorePort, rack, datacenter, hashtag, password...
HostBuilder downHostBuilder = new HostBuilder()
.setHostname()......;
hostsDownFromHostSupplier.add(downHostBuilder.createHost());
allHostSetFromTokenMapSupplier.remove(hostFromTokenMapSupplier);
}
}
HostStatusTracker newTracker = hostTracker.get().computeNewHostStatus(hostsUpFromHostSupplier, hostsDownFromHostSupplier);
hostTracker.set(newTracker);
return hostTracker.get();
}
}
此类作用是记录,其他模块从这里提取信息。
public class HostStatusTracker {
// the set of active and inactive hosts
private final Set<Host> activeHosts = new HashSet<Host>();
private final Set<Host> inactiveHosts = new HashSet<Host>();
}
HostSupplier就是具体刷新,有很多实现。Dynomite 的具体的业务工作本来也就是需要依托给其他具体功能类来实现。
我们以EurekaHostsSupplier为例,就是调用 EurekaClient discoveryClient 获取信息。
public class EurekaHostsSupplier implements HostSupplier {
private final EurekaClient discoveryClient;
@Override
public List<Host> getHosts() {
return getUpdateFromEureka();
}
private List<Host> getUpdateFromEureka() {
Application app = discoveryClient.getApplication(applicationName);
List<Host> hosts = new ArrayList<Host>();
List<InstanceInfo> ins = app.getInstances();
hosts = Lists.newArrayList(Collections2.transform(ins, info -> {
Host.Status status = info.getStatus() == InstanceStatus.UP ? Host.Status.Up : Host.Status.Down;
String rack = null;
try {
if (info.getDataCenterInfo() instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) info.getDataCenterInfo();
rack = amazonInfo.get(MetaDataKey.availabilityZone);
}
}
Host host = new HostBuilder().setHostname(info.getHostName()).setIpAddress(info.getIPAddr()).setRack(rack).setStatus(status).createHost();
return host;
}));
return hosts;
}
}
因此,此时逻辑拓展如下:
+--------------------------------------------------------------------------------------+
| ConnectionPoolImpl |
| |
| |
| +-----------------------------------+ |
| | hostsUpdater | |
| | | |
| | | |
| | | Timer |
| | | +----------------------+ |
| | | refreshHosts | | |
| | HostStatusTracker -------------------------> | connPoolThreadPool | |
| | ^ | | | |
| | | | +------------+---------+ |
| | getUpdateFromEureka | | | |
| | | | | |
| | | | updateHosts | |
| | +----------------------+ | | |
| | | HostSupplier | | | +-----------------------------+ |
| | | | | | | | | |
| | | + | | | | | |
| | | EurekaClient | | v v v |
| | | | | selectionStrategy cpMonitor cpMap |
| | +----------------------+ | |
| +-----------------------------------+ |
+--------------------------------------------------------------------------------------+
既然我们正在运行一个群集而不是一个实例,那么我们将在故障转移时采取一些保护措施。
Dynomite 之中,错误主要有3种:
依据错误级别,错误处理 分别有 重试 与 fallback选择 两种,我们下面按照错误级别进行介绍。
当节点发生故障或无法访问时,驱动程序会自动并透明地尝试其他节点并安排重新连接到后台中的死节点。
但是 由于网络条件的临时更改也会使节点显示为脱机,因此驱动程序还提供了一种 retry策略 来重试因网络相关错误而失败的查询。这消除了在客户端代码中编写重试逻辑的需要。
retry策略确定当请求超时或节点不可用时要采用的默认行为。
Java驱动程序提供了几个RetryPolicy实现:
具体在执行命令时,我们可以看到,驱动会透明的尝试其他节点并在后台调度重新连接死亡节点:
简略版代码如下:
@Override
public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws DynoException {
RetryPolicy retry = cpConfiguration.getRetryPolicyFactory().getRetryPolicy();
retry.begin();
DynoException lastException = null;
do {
Connection<CL> connection = null;
try {
connection = selectionStrategy.getConnectionUsingRetryPolicy(op,
cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS, retry);
OperationResult<R> result = connection.execute(op); // 执行操作
retry.success(); // 如果成功,就执行操作策略的success方法,跳出循环
return result;
} catch (DynoException e) {
retry.failure(e); // 如果失败,就执行操作策略的failure方法
lastException = e;
if (connection != null) {
cpMonitor.incOperationFailure(connection.getHost(), e);
if (retry.allowRetry()) { // 调用具体 retry 实现策略
cpMonitor.incFailover(connection.getHost(), e);
}
}
}
} while (retry.allowRetry()); // 如果允许重试,就继续执行循环
}
具体我们以RetryNTimes为例。
可以看出来,就是通过sucess,failure来设置内部变量,以此决定是否允许重试。
public class RetryNTimes implements RetryPolicy {
private int n;
private final AtomicReference<RetryState> state = new AtomicReference<>(new RetryState(0, false));
public RetryNTimes(int n, boolean allowFallback) {
this.n = n;
this.allowCrossZoneFallback = allowFallback;
}
@Override
public void success() {
boolean success = false;
RetryState rs;
while (!success) {
rs = state.get(); // 设置内部变量
success = state.compareAndSet(rs, new RetryState(rs.count + 1, true));
}
}
@Override
public void failure(Exception e) {
boolean success = false;
RetryState rs;
while (!success) {
rs = state.get(); // 设置内部变量
success = state.compareAndSet(rs, new RetryState(rs.count + 1, false));
}
}
@Override
public boolean allowRetry() {
final RetryState rs = state.get();
return !rs.success && rs.count <= n;
}
private class RetryState {
private final int count;
private final boolean success;
public RetryState(final int count, final boolean success) {
this.count = count;
this.success = success;
}
}
}
因为重试有时候不能解决问题,所以下面我们谈谈解决更加严重问题 的 fallback 选择策略。
驱动可以对集群中的任何节点进行查询,然后将其称为该查询的协调节点。根据查询的内容,协调器可以与其他节点通信以满足查询。如果客户端要在同一节点上引导其所有查询,则会在集群上产生不平衡负载,尤其是在其他客户端执行相同操作的情况下。
为了防止单节点作为过多请求的协调节点,DynoJedisClient 驱动程序提供了一个可插拔的机制来平衡多个节点之间的查询负载。通过选择 HostSelectionStrategy 策略接口的实现来实现负载平衡。
每个 HostSelectionStrategy 将群集中的每个节点分类为本地,远程或忽略。驱动程序更喜欢与本地节点的交互,并且与远程节点保持与本地节点的更多连接。
HostSelectionStrategy 在构建时在群集上设置。驱动程序提供了两种基本的负载平衡实现:RoundRobin Policy 和 TokenAwareSelection。
HostSelectionWithFallback 是选择协调器。
下面我们看看具体定义。
HostSelectionWithFallback 的具体成员变量为:
具体类定义如下:
public class HostSelectionWithFallback<CL> {
// Only used in calculating replication factor
private final String localDataCenter;
// tracks the local zone
private final String localRack;
// The selector for the local zone
private final HostSelectionStrategy<CL> localSelector;
// Track selectors for each remote zone
private final ConcurrentHashMap<String, HostSelectionStrategy<CL>> remoteRackSelectors = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Host, HostToken> hostTokens = new ConcurrentHashMap<>();
private final TokenMapSupplier tokenSupplier;
private final ConnectionPoolConfiguration cpConfig;
private final ConnectionPoolMonitor cpMonitor;
private final AtomicInteger replicationFactor = new AtomicInteger(-1);
// Represents the *initial* topology from the token supplier. This does not affect selection of a host connection
// pool for traffic. It only affects metrics such as failover/fallback
private final AtomicReference<TokenPoolTopology> topology = new AtomicReference<>(null);
// list of names of remote zones. Used for RoundRobin over remote zones when local zone host is down
private final CircularList<String> remoteRackNames = new CircularList<>(new ArrayList<>());
private final HostSelectionStrategyFactory<CL> selectorFactory;
}
HostSelectionStrategy是选择Host策略,具体实现是 RoundRobinSelection 和 TokenAwareSelection。
负载平衡负责建立与整个集群(不仅在一个节点上)的连接,并维护与集群中每个主机的连接池。负载平衡还确定主机是本地主机还是远程主机。
它具有将某些请求发送到某些节点的逻辑。与哪些主机建立连接以及向哪些主机发送请求由负载平衡策略确定。
实际上,对每个请求都会算出一个查询计划。查询计划确定向哪个主机发送请求以及以哪个顺序发送(取决于推测执行策略和重试策略)。
此策略如字面意思,使用 ROUND ROBIN 策略,以线程安全方式在环形数据结构上提供 RR 负载均衡。
public class RoundRobinSelection<CL> implements HostSelectionStrategy<CL> {
// The total set of host pools. Once the host is selected, we ask it‘s corresponding pool to vend a connection
private final ConcurrentHashMap<Long, HostConnectionPool<CL>> tokenPools = new ConcurrentHashMap<Long, HostConnectionPool<CL>>();
// the circular list of Host over which we load balance in a round robin fashion
private final CircularList<HostToken> circularList = new CircularList<HostToken>(null);
@Override
public HostConnectionPool<CL> getPoolForOperation(BaseOperation<CL, ?> op, String hashtag) throws NoAvailableHostsException {
int numTries = circularList.getSize();
HostConnectionPool<CL> lastPool = null;
while (numTries > 0) {
lastPool = getNextConnectionPool();
numTries--;
if (lastPool.isActive() && lastPool.getHost().isUp()) {
return lastPool;
}
}
// If we reach here then we haven‘t found an active pool. Return the last inactive pool anyways,
// and HostSelectionWithFallback can choose a fallback pool from another dc
return lastPool;
}
@Override
public void initWithHosts(Map<HostToken, HostConnectionPool<CL>> hPools) {
for (HostToken token : hPools.keySet()) {
tokenPools.put(token.getToken(), hPools.get(token));
}
circularList.swapWithList(hPools.keySet());
}
@Override
public boolean addHostPool(HostToken host, HostConnectionPool<CL> hostPool) {
HostConnectionPool<CL> prevPool = tokenPools.put(host.getToken(), hostPool);
if (prevPool == null) {
List<HostToken> newHostList = new ArrayList<HostToken>(circularList.getEntireList());
newHostList.add(host);
circularList.swapWithList(newHostList);
}
return prevPool == null;
}
}
RR 策略大致如下,可以理解为从一个 circularList 里面顺序选择下一个策略:
+--------------------------+
|HostSelectionWithFallback |
+-------------+------------+
|
+--------------+--------------+
| |
v v
+---------+------------+ +-----------+--------+
| RoundRobinSelection | | TokenAwareSelection|
| | +--------------------+
| |
| circularList |
| + |
| | |
+----------------------+
|
|
v
+--> Pool1, Pool2, Pool3,..., Pooln +----+
| |
| |
+----------------------------------------+
TokenAwareSelection使用 TOKEN AWARE 算法进行处理。
TOKEN_AWARE就是根据主键token请求到相同的客户端,就是根据token把对同一条记录的请求,发到同一个节点。
所以此模块需要了解 dynomite ring topology,从而可以依据操作的key把其map到正确的token owner节点。
这种策略使用二分法查找来依据key得到token,然后通过token定位到dynomite topology ring。
提供不同的函数用来返回各种形式的连接,比如
public class TokenAwareSelection<CL> implements HostSelectionStrategy<CL> {
private final BinarySearchTokenMapper tokenMapper;
private final ConcurrentHashMap<Long, HostConnectionPool<CL>> tokenPools = new ConcurrentHashMap<Long, HostConnectionPool<CL>>();
public TokenAwareSelection(HashPartitioner hashPartitioner) {
this.tokenMapper = new BinarySearchTokenMapper(hashPartitioner);
}
/**
* Identifying the proper pool for the operation. A couple of things that may affect the decision
* (a) hashtags: In this case we will construct the key by decomposing from the hashtag
* (b) type of key: string keys vs binary keys.
* In binary keys hashtags do not really matter.
*/
@Override
public HostConnectionPool<CL> getPoolForOperation(BaseOperation<CL, ?> op, String hashtag) throws NoAvailableHostsException {
String key = op.getStringKey();
HostConnectionPool<CL> hostPool;
HostToken hToken;
if (key != null) {
// If a hashtag is provided by Dynomite then we use that to create the key to hash.
if (hashtag == null || hashtag.isEmpty()) {
hToken = this.getTokenForKey(key);
} else {
String hashValue = StringUtils.substringBetween(key, Character.toString(hashtag.charAt(0)), Character.toString(hashtag.charAt(1)));
hToken = this.getTokenForKey(hashValue);
}
hostPool = tokenPools.get(hToken.getToken());
} else {
// the key is binary
byte[] binaryKey = op.getBinaryKey();
hToken = this.getTokenForKey(binaryKey);
hostPool = tokenPools.get(hToken.getToken());
}
return hostPool;
}
@Override
public boolean addHostPool(HostToken hostToken, HostConnectionPool<CL> hostPool) {
HostConnectionPool<CL> prevPool = tokenPools.put(hostToken.getToken(), hostPool);
if (prevPool == null) {
tokenMapper.addHostToken(hostToken);
return true;
} else {
return false;
}
}
}
上面代码使用到了 BinarySearchTokenMapper,所以我们再看看。
其实这个类就是key与token的对应关系,查找时候使用了二分法。
public class BinarySearchTokenMapper implements HashPartitioner {
private final HashPartitioner partitioner;
private final AtomicReference<DynoBinarySearch<Long>> binarySearch = new AtomicReference<DynoBinarySearch<Long>>(null);
private final ConcurrentHashMap<Long, HostToken> tokenMap = new ConcurrentHashMap<Long, HostToken>();
@Override
public HostToken getToken(Long keyHash) {
Long token = binarySearch.get().getTokenOwner(keyHash);
return tokenMap.get(token);
}
public void initSearchMechanism(Collection<HostToken> hostTokens) {
for (HostToken hostToken : hostTokens) {
tokenMap.put(hostToken.getToken(), hostToken);
}
initBinarySearch();
}
public void addHostToken(HostToken hostToken) {
HostToken prevToken = tokenMap.putIfAbsent(hostToken.getToken(), hostToken);
if (prevToken == null) {
initBinarySearch();
}
}
private void initBinarySearch() {
List<Long> tokens = new ArrayList<Long>(tokenMap.keySet());
Collections.sort(tokens);
binarySearch.set(new DynoBinarySearch<Long>(tokens));
}
}
Token Aware 策略如下,就是一个 map,依据 token 做 key,来选择 Pool:
+--------------------------+
|HostSelectionWithFallback |
+-------------+------------+
|
+--------------+--------------+
| |
v v
+---------+------------+ +-----------+------------+
| RoundRobinSelection | | TokenAwareSelection |
| | | |
| | | |
| circularList | | ConcurrentHashMap |
| + | | + |
| | | | | |
+----------------------+ +------------------------+
| |
| |
v | +-------------------+
| | [token 1 : Pool 1]|
+--> Pool1, Pool2, Pool3,..., Pooln +----+ | | |
| | +---> | [token 2 : Pool 2]|
| | | |
+----------------------------------------+ | ...... |
| |
| [token 3 : Pool 3]|
+-------------------+
我们最后介绍下压缩的实现。
启用压缩可以减少驱动程序消耗的网络带宽,但代价是客户端和服务器的CPU使用量会增加。
驱动中,有两种压缩方式,就是简单的不压缩与限制压缩Threshold。
enum CompressionStrategy {
/**
* Disables compression
*/
NONE,
/**
* Compresses values that exceed {@link #getValueCompressionThreshold()}
*/
THRESHOLD
}
从代码看,使用
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
具体操作如下:
private abstract class CompressionValueOperation<T> extends BaseKeyOperation<T>
implements CompressionOperation<Jedis, T> {
@Override
public String compressValue(String value, ConnectionContext ctx) {
String result = value;
int thresholdBytes = connPool.getConfiguration().getValueCompressionThreshold();
try {
// prefer speed over accuracy here so rather than using
// getBytes() to get the actual size
// just estimate using 2 bytes per character
if ((2 * value.length()) > thresholdBytes) {
result = ZipUtils.compressStringToBase64String(value);
ctx.setMetadata("compression", true);
}
}
return result;
}
@Override
public String decompressValue(String value, ConnectionContext ctx) {
try {
if (ZipUtils.isCompressed(value)) {
ctx.setMetadata("decompression", true);
return ZipUtils.decompressFromBase64String(value);
}
}
return value;
}
}
以操作举例,当需要压缩时,就生成CompressionValueOperation。
public OperationResult<Map<String, String>> d_hgetAll(final String key) {
if (CompressionStrategy.NONE == connPool.getConfiguration().getCompressionStrategy()) {
return connPool.executeWithFailover(new BaseKeyOperation<Map<String, String>>(key, OpName.HGETALL) {
@Override
public Map<String, String> execute(Jedis client, ConnectionContext state) throws DynoException {
return client.hgetAll(key);
}
});
} else {
return connPool
.executeWithFailover(new CompressionValueOperation<Map<String, String>>(key, OpName.HGETALL) {
@Override
public Map<String, String> execute(final Jedis client, final ConnectionContext state) {
return CollectionUtils.transform(client.hgetAll(key),
new CollectionUtils.MapEntryTransform<String, String, String>() {
@Override
public String get(String key, String val) {
return decompressValue(val, state);
}
});
}
});
}
}
至此,DynoJedisClient 初步分析完毕,我们看到了 DynoJedisClient 是如何应对各种复杂信息,比如:
我们接下来引出 基于 DynoJedisClient 的 分布式延迟队列 Dyno-queues ,看看它是如何实现的。
http://www.ningoo.net/html/2010/cassandra_token.html
[源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(2)
原文:https://www.cnblogs.com/rossiXYZ/p/14364281.html