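Creating the Redis client: BinaryJedisCluster
The official redis-cluster demo provided by Jedis shows that the client is built by constructing a BinaryJedisCluster. This class is the basic interface Jedis gives us for talking to a Redis Cluster; once we have an instance, we can run all kinds of read and write operations.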
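import java.util.HashSet;
import java.util.Set;
import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.HostAndPort;

Set<HostAndPort> set = new HashSet<>();
set.add(new HostAndPort("127.0.0.1", 7000)); // seed node used to discover the cluster
BinaryJedisCluster client = new BinaryJedisCluster(set);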
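- Stepping through the BinaryJedisCluster constructor in a debugger, we can see that the class has a JedisClusterConnectionHandler member variable, which the constructor assigns. That assignment calls the JedisSlotBasedConnectionHandler constructor, which in turn calls the constructor of its parent class.
- JedisClusterConnectionHandler has a JedisClusterInfoCache member variable, which its constructor initializes. JedisSlotBasedConnectionHandler is a subclass of JedisClusterConnectionHandler and implements the two methods of the abstract parent, getConnection and getConnectionFromSlot(int).
- JedisClusterInfoCache caches the slot (shard) information of the Redis cluster. Let's look at it next.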
abstract class JedisClusterConnectionHandler {
    protected final JedisClusterInfoCache cache;
    private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
        for (HostAndPort hostAndPort : startNodes) {
            Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
            try {
                cache.discoverClusterNodesAndSlots(jedis);
                break; // the first node that answers is enough
            } catch (JedisConnectionException e) {
                // this start node is unreachable, try the next one
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
    }
}
public class JedisClusterInfoCache {
    private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
    private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
    public void discoverClusterNodesAndSlots(Jedis jedis) {
        w.lock();
        try {
            reset();
            List<Object> slots = jedis.clusterSlots();
            for (Object slotInfoObj : slots) {
                List<Object> slotInfo = (List<Object>) slotInfoObj;
                if (slotInfo.size() <= MASTER_NODE_INDEX) {
                    continue;
                }
                List<Integer> slotNums = getAssignedSlotArray(slotInfo);
                // hostInfos
                int size = slotInfo.size();
                for (int i = MASTER_NODE_INDEX; i < size; i++) {
                    List<Object> hostInfos = (List<Object>) slotInfo.get(i);
                    if (hostInfos.size() <= 0) {
                        continue;
                    }
                    HostAndPort targetNode = generateHostAndPort(hostInfos);
                    setupNodeIfNotExist(targetNode);
                    if (i == MASTER_NODE_INDEX) {
                        assignSlotsToNode(slotNums, targetNode);
                    }
                }
            }
        } finally {
            w.unlock();
        }
    }
}
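As we can see, JedisClusterInfoCache.discoverClusterNodesAndSlots takes any Jedis instance, calls jedis.clusterSlots() to fetch the slot assignment of the cluster, and caches the result in two JedisPool maps: nodes holds a pool for every host listed in the reply, and slots maps each slot number to the pool of the master that owns it (only the entry at MASTER_NODE_INDEX gets slots assigned). With that, the startup phase is essentially complete.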
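To make the two-map layout concrete, here is a minimal sketch of the same bookkeeping without Jedis. It is not the library's implementation: SlotCacheSketch, SlotRange, and the use of plain strings in place of JedisPool are assumptions made for illustration, and a SlotRange stands in for one entry of the CLUSTER SLOTS reply with the master listed first.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative stand-in for JedisClusterInfoCache; strings replace JedisPool. */
final class SlotCacheSketch {
    // "host:port" -> pool for that node (masters and replicas alike)
    private final Map<String, String> nodes = new HashMap<>();
    // slot number -> pool of the master that owns the slot
    private final Map<Integer, String> slots = new HashMap<>();

    /** Hypothetical simplified form of one CLUSTER SLOTS entry: range plus hosts, master first. */
    static final class SlotRange {
        final int start;
        final int end;
        final List<String> hosts;

        SlotRange(int start, int end, List<String> hosts) {
            this.start = start;
            this.end = end;
            this.hosts = hosts;
        }
    }

    /** Rebuilds both maps from a full reply, like discoverClusterNodesAndSlots. */
    void discover(List<SlotRange> reply) {
        nodes.clear();
        slots.clear();
        for (SlotRange range : reply) {
            for (int i = 0; i < range.hosts.size(); i++) {
                String host = range.hosts.get(i);
                // every listed host gets a pool, created only once
                nodes.putIfAbsent(host, "pool(" + host + ")");
                if (i == 0) {
                    // only the master is registered as the owner of the slots
                    for (int slot = range.start; slot <= range.end; slot++) {
                        slots.put(slot, nodes.get(host));
                    }
                }
            }
        }
    }

    String poolForSlot(int slot) {
        return slots.get(slot);
    }

    int nodeCount() {
        return nodes.size();
    }
}
```

After discover runs, looking up a slot is a single map read, which is why a command in the normal case needs no extra round trip to find its node.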
Interacting with redis-cluster through BinaryJedisCluster
We start from the following code:
BinaryJedisCluster client = new BinaryJedisCluster(set);
client.get("key".getBytes());
public class BinaryJedisCluster {
    @Override
    public byte[] get(final byte[] key) {
        return new JedisClusterCommand<byte[]>(connectionHandler, maxAttempts) {
            @Override
            public byte[] execute(Jedis connection) {
                return connection.get(key);
            }
        }.runBinary(key);
    }
}
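The simple get above ends up calling JedisClusterCommand.runBinary(key) (the method belongs to the anonymous JedisClusterCommand subclass, not to BinaryJedisCluster itself), which does its work in runWithRetries: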
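The shape of this call is a template method: the caller supplies only execute, while the base class decides which connection the command runs on. A minimal sketch of that pattern follows. All names in it (FakeConnection, ClusterCommandSketch, ClusterClientSketch) are hypothetical, and the node choice uses hashCode as a placeholder for the real slot lookup.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a Jedis connection to one node. */
final class FakeConnection {
    private final Map<String, String> data = new HashMap<>();

    void set(String key, String value) {
        data.put(key, value);
    }

    String get(String key) {
        return data.get(key);
    }
}

/** Base class owns connection selection; subclasses only say what to run. */
abstract class ClusterCommandSketch<T> {
    private final FakeConnection[] nodes;

    ClusterCommandSketch(FakeConnection[] nodes) {
        this.nodes = nodes;
    }

    /** Supplied by the caller, like JedisClusterCommand.execute. */
    protected abstract T execute(FakeConnection connection);

    /** Picks the node for the key, then runs the command on it. */
    final T run(String key) {
        // placeholder routing; the real client maps the key to a slot first
        int index = Math.floorMod(key.hashCode(), nodes.length);
        return execute(nodes[index]);
    }
}

/** Each public operation is a tiny anonymous command, as in BinaryJedisCluster.get. */
final class ClusterClientSketch {
    private final FakeConnection[] nodes;

    ClusterClientSketch(int nodeCount) {
        nodes = new FakeConnection[nodeCount];
        for (int i = 0; i < nodeCount; i++) {
            nodes[i] = new FakeConnection();
        }
    }

    String get(final String key) {
        return new ClusterCommandSketch<String>(nodes) {
            @Override
            protected String execute(FakeConnection connection) {
                return connection.get(key);
            }
        }.run(key);
    }

    void set(final String key, final String value) {
        new ClusterCommandSketch<Void>(nodes) {
            @Override
            protected Void execute(FakeConnection connection) {
                connection.set(key, value);
                return null;
            }
        }.run(key);
    }
}
```

Because routing lives in one place, retry and redirection handling can be added to run without touching any of the individual commands, which is the role runWithRetries plays in Jedis.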
public abstract class JedisClusterCommand<T> {
    private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
        if (attempts <= 0) {
            throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
        }
        Jedis connection = null;
        try {
            if (asking) {
                // TODO: Pipeline asking with the original command to make it
                // faster....
                connection = askConnection.get();
                connection.asking();
                // if asking success, reset asking flag
                asking = false;
            } else {
                if (tryRandomNode) {
                    connection = connectionHandler.getConnection();
                } else {
                    connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
                }
            }
            return execute(connection);
        } catch (JedisNoReachableClusterNodeException jnrcne) {
            throw jnrcne;
        } catch (JedisConnectionException jce) {
            // release current connection before recursion
            releaseConnection(connection);
            connection = null;
            if (attempts <= 1) {
                //We need this because if node is not reachable anymore - we need to finally initiate slots renewing,
                //or we can stuck with cluster state without one node in opposite case.
                //But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed request.
                //TODO make tracking of successful/unsuccessful operations for node - do renewing only
                //if there were no successful responses from this node last few seconds
                this.connectionHandler.renewSlotCache();
                //no more redirections left, throw original exception, not JedisClusterMaxRedirectionsException, because it's not MOVED situation
                throw jce;
            }
            return runWithRetries(key, attempts - 1, tryRandomNode, asking);
        } catch (JedisRedirectionException jre) {
            // if MOVED redirection occurred,
            if (jre instanceof JedisMovedDataException) {
                // it rebuilds cluster's slot cache
                // recommended by Redis cluster specification
                this.connectionHandler.renewSlotCache(connection);
            }
            // release current connection before recursion or renewing
            releaseConnection(connection);
            connection = null;
            if (jre instanceof JedisAskDataException) {
                asking = true;
                askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
            } else if (jre instanceof JedisMovedDataException) {
                // nothing more to do: the slot cache was already renewed above
            } else {
                throw new JedisClusterException(jre);
            }
            return runWithRetries(key, attempts - 1, false, asking);
        } finally {
            releaseConnection(connection);
        }
    }
}
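- Normal case: connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)) computes the slot from the key, then uses the slots map that JedisClusterInfoCache filled during startup (private final Map<Integer, JedisPool> slots) to find the JedisPool for that slot. It takes a Jedis connection from the pool and finally executes connection.get(key).
- Exception case 1: JedisNoReachableClusterNodeException. No node is reachable at all, so the exception is rethrown to the caller without further handling.
- Exception case 2: JedisConnectionException. The command is retried. If this was the last attempt (attempts <= 1), this.connectionHandler.renewSlotCache() refreshes the slot cache and the original exception is rethrown.
- Exception case 3: JedisRedirectionException. For a MOVED redirection the handling is close to case 2 in that the slot cache is refreshed; the difference is that the current connection still works, so it is passed straight into renewSlotCache(connection), which avoids the extra polling for a usable Jedis instance inside renewSlotCache. For an ASK redirection the slot cache is left alone: the asking flag is set and a connection to the target node is stored in askConnection. In both cases the command is then retried; any other redirection type is wrapped in a JedisClusterException.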
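The key-to-slot step in the normal case can be sketched independently of Jedis. The class below follows the rule in the Redis Cluster specification (CRC16 of the key, modulo 16384, hashing only the content of the first non-empty {...} hash tag when one exists). KeySlotSketch and its method names are made up for this example, and the bitwise CRC is written for readability; it is an assumption that JedisClusterCRC16 computes the same value by a different (table-based) route.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative key-to-slot calculation following the Redis Cluster specification. */
final class KeySlotSketch {
    private static final int SLOT_COUNT = 16384;

    private KeySlotSketch() {
    }

    /** CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no bit reflection. */
    static int crc16(byte[] bytes, int start, int end) {
        int crc = 0;
        for (int i = start; i < end; i++) {
            crc ^= (bytes[i] & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = ((crc & 0x8000) != 0) ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF; // keep the register at 16 bits
            }
        }
        return crc;
    }

    /** Returns the slot (0..16383) that owns the given key. */
    static int slot(String key) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int start = 0;
        int end = bytes.length;
        int open = indexOf(bytes, (byte) '{', 0);
        if (open >= 0) {
            int close = indexOf(bytes, (byte) '}', open + 1);
            // hash only the tag content, and only if the tag is not empty
            if (close > open + 1) {
                start = open + 1;
                end = close;
            }
        }
        return crc16(bytes, start, end) % SLOT_COUNT;
    }

    private static int indexOf(byte[] bytes, byte target, int from) {
        for (int i = from; i < bytes.length; i++) {
            if (bytes[i] == target) {
                return i;
            }
        }
        return -1;
    }
}
```

The specification gives 0x31C3 as the CRC16 of "123456789", which makes a convenient check for any implementation. Keys that share a hash tag, such as {user1000}.following and {user1000}.followers, land in the same slot and therefore resolve to the same JedisPool.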