背景
最近一周,一直在做squirrel-ha-service高可用的改进。简单介绍下squirrel-ha-service。squirrel-ha-service(后文都简写为ha)是线上持续监控redis集群,保证redis集群高可用的一个服务。会通过redis cluster nodes命令获取redis集群所有节点的状态,如果某个节点宕机了,ha一旦发现宕机节点,首先会通过修改zk通知redis客户端不再访问该节点,然后自动替换宕机节点。替换完节点后,再次通过修改zk通知redis客户端刷新本地路由,将新添加的节点加入本地路由表。
上述左图是我们目前线上ha大致架构,基本能保证redis集群的整体可用性。但是最近我们在做机房容灾相关,这个架构就有问题了。
上述是右图我们线上一个redis集群的部署情况,基本都是3机房部署。图中是dx,yf,gh三个机房,每个机房都有一个主节点一个从节点。redis集群这样部署,能保证最高的可用性。即使线上某个机房发生故障,剩下的两个机房也能继续提供服务。
这里我们假设现在gh机房发生网络故障,先忽视图中红色框。gh-master节点原来有一个dx-slave节点,一旦光环机房不可用,dx-slave节点会发起提升自己为master的请求,dx和yf机房的两个主节点投票通过,这样一个新的集群dx(2个master节点),yf(一个master,一个slave)可以继续对外提供服务。
从redis集群角度看,如果机房是三机房主从均匀部署,单个机房发生故障另外两个机房依然能继续提供服务。但是此时我们的squirrel-ha服务因为也处于gh机房(我们线上一个redis集群唯一对应一个ha监控服务),由于gh机房和另外两个机房dx,yf网络不通,此时ha服务无法刷新zk,通知dx和yf的redis客户端更新路由了。之所以发生这个问题,是因为ha自身没有保证高可用,所以我们考虑引进zk选举来保证ha服务的可用性。如上图红框,当gh机房发生网络故障,之前dx-ha-watcher会替代gh-ha成为新的leader,开始监控redis集群。
Curator选主
上面我们说了,在发生机房网络分区时候,ha自身不能保证高可用。所以接下来我们会将ha接入zk,通过zk的选举功能保证ha自身的高可用。我们会使用Netflix开源的curator来实现选主,这个框架解决了原生zookeeper client断线重连相关问题,并且提供了2套选主方案。
- LeaderLatch:随机从候选着中选出一台作为leader,选中之后除非调用close()释放leadship,否则其他的后选择无法成为leader。这种策略适合主备应用,当主节点意外宕机之后,多个从节点会自动选举其中一个为新的主节点
- Leader Election:这种选举策略跟Leader Latch选举策略不同之处在于每个实例都能公平获取领导权,而且当获取领导权的实例在释放领导权之后,该实例还有机会再次获取领导权。另外,选举出来的leader不会一直占有领导权,当 takeLeadership(CuratorFramework client) 方法执行结束之后会自动释放领导权。
具体选择哪种策略,还是要用户根据自己的需求选择。
LeaderLatch
public class ZKLeaderUtils {
private static String KEY_LEADER_PATH = "/test";
public static void main(String[] args) throws Exception {
List<LeaderLatch> list = new ArrayList<>();
final List<CuratorFramework> clients = new ArrayList<>();
for(int i=0; i<3; i++){
CuratorFramework curatorClient = CuratorFrameworkFactory.newClient("localhost:2181", 60 * 1000, 30 * 1000,
new RetryNTimes(3, 1000));
curatorClient.start();
final LeaderLatch leaderLatch = new LeaderLatch(curatorClient,KEY_LEADER_PATH, String.valueOf(i));
leaderLatch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
System.out.println(leaderLatch.getId() + ":I am leader. I am doing jobs!");
}
@Override
public void notLeader() {
System.out.println(leaderLatch.getId() + ":I am not leader. I will do nothing!");
}
});
leaderLatch.start();
list.add(leaderLatch);
clients.add(curatorClient);
}
Thread.sleep(2000);
int i=0;
for(LeaderLatch leaderLatch : list){
leaderLatch.close();
clients.get(i).close();
i++;
}
Thread.sleep(100000);
}
}
上述代码,创建了3个LeaderLatch实例,然后sleep 2s,让3个实例进行选主。最后依次调用close方法,释放leader。
控制台会随机输出:
2:I am leader. I am doing jobs!
重复执行几次,可以看到不同的client随机获得leader。
Leader Election
private static class ExampleClient extends LeaderSelectorListenerAdapter{
private volatile boolean isLeaderRelease = false;
private LeaderSelector leaderSelector;
ExampleClient(CuratorFramework client, String path){
leaderSelector = new LeaderSelector(client, path, this);
//释放leader后,还可以重新获取leader
leaderSelector.autoRequeue();
}
public void start(){
//leader选举会在后台线程进行,该方法会立即返回
leaderSelector.start();
}
/**
* 能进入到takeLeadership方法,表示leaderSelector已经成为leader了。只要该方法不退出或者
* 不发生其他异常,就一直持有leader
* @param client
* @throws Exception
*/
public void takeLeadership(CuratorFramework client) throws Exception {
while (!isLeaderRelease){
Thread.sleep(1000);
}
}
public boolean isLeader() {
return leaderSelector.hasLeadership();
}
/**
* 设置isLeaderRelease 为false,让takeLeadership方法退出,这样leader就释放了。
* 注意在成功释放leader后,我们又重新设置isLeaderRelease 为true,这样再次获得
* leader后,如果不主动释放或者出现其他异常情况,又可以一直持有leader。
*/
public void releaseLeader() {
this.isLeaderRelease = true;
while (leaderSelector.hasLeadership()){
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {}
}
this.isLeaderRelease = false;
}
}
上述代码ExampleClient主要关注以下四点
- 构造方法中leaderSelector.autoRequeue();这个确保了leaderSelector在释放leader后,还可以重新获取leader。
- takeLeaderShip方法,一旦进入这个方法,就表示leaderSelector已经成为leader,从这个方法退出,就释放leader。可以看到这就同LeaderLatch不一样,LeaderLatch只能通过主动close释放leader。
- releaseLeader方法我们通过设置isLeaderRelease为true,让takeLeaderShip能退出循环,达到释放leader目的。
- 通过继承了LeaderSelectorListenerAdapter类,一旦出现SUSPENDED或者LOST连接问题,能主动释放leader,这个下面会详细说下。
LeaderSelectorListenerAdapter
public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( (newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST) )
{
throw new CancelLeadershipException();
}
}
}
一旦LeaderSelector启动,它会向curator客户端添加监听器。 使用LeaderSelector必须时刻注意连接的变化。一旦出现连接问题如SUSPENDED,或者LOST,curator实例必须确保其不再是leader并且其takeLeadership()应该直接退出。
推荐的做法是,如果发生SUSPENDED或者LOST连接问题,最好直接抛CancelLeadershipException,此时,leaderSelector实例会尝试中断并且取消正在执行takeLeadership()方法的线程。 建议扩展LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已经提供了推荐的处理方式 。
LeaderSelector
private static class WrappedListener implements LeaderSelectorListener
{
private final LeaderSelector leaderSelector;
private final LeaderSelectorListener listener;
public WrappedListener(LeaderSelector leaderSelector, LeaderSelectorListener listener)
{
this.leaderSelector = leaderSelector;
this.listener = listener;
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
listener.takeLeadership(client);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
try
{
listener.stateChanged(client, newState);
}
catch ( CancelLeadershipException dummy )
{
leaderSelector.interruptLeadership();
}
}
}
/**
* Attempt to cancel and interrupt the current leadership if this instance has leadership
*/
public synchronized void interruptLeadership()
{
Future<?> task = ourTask.get();
if ( task != null )
{
task.cancel(true);
}
}
可以看到一旦catch到listener.stateChanged抛出的CancelLeadershipException异常,会调用leaderSelector.interruptLeadership()尝试中断,所以我们上面的ExampleClient的takeLeaderShip方法必须要是可以响应中断的
/**
* 能进入到takeLeadership方法,表示leaderSelector已经成为leader了。只要该方法不退出或者
* 不发生其他异常,就一直持有leader
* @param client
* @throws Exception
*/
public void takeLeadership(CuratorFramework client) throws Exception {
while (!isLeaderRelease){
Thread.sleep(1000);
}
}
上述方法Thread.sleep确实可以响应中断,所以一旦出现SUSPENDED或者LOST连接问题,就会从takeLeaderShip方法退出并释放leader。
下面我们来测试下上述的ExampleClient
public static void main(String[] args) throws InterruptedException {
List<ExampleClient> clients = new ArrayList<>();
for(int i=0; i<3; i++){
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
final ExampleClient exampleClient = new ExampleClient(client, "/test");
exampleClient.start();
clients.add(exampleClient);
final int index = i;
new Thread(new Runnable() {
@Override
public void run() {
//外层while(true)循环防止exampleClient还没有获取leader,就直接退出线程了
while (true){
//内层循环如果成为leader就打印日志
while (exampleClient.isLeader()){
System.out.println("haLeaderService " + index + " is leader");
try {
Thread.sleep(2000);
} catch (InterruptedException ignore) {}
}
try {
Thread.sleep(1000);
} catch (InterruptedException ignore) {}
}
}
}).start();
Thread.sleep(2000);
}
while (true){
Thread.sleep(7000);
for(ExampleClient exampleClient : clients){
if(exampleClient.isLeader()){
exampleClient.releaseLeader();
break;
}
}
}
}
上述我们创建了3个ExampleClient,如果成为leader就会打印日志。后续每隔7s,又会主动释放leader,这样其他follower就会成为leader。
控制台输出如下:
haLeaderService 0 is leader
haLeaderService 0 is leader
haLeaderService 0 is leader
haLeaderService 0 is leader
haLeaderService 1 is leader
haLeaderService 1 is leader
haLeaderService 1 is leader
haLeaderService 1 is leader
haLeaderService 2 is leader
haLeaderService 2 is leader
haLeaderService 2 is leader
haLeaderService 2 is leader
haLeaderService 0 is leader
haLeaderService 0 is leader
haLeaderService 0 is leader
可以看到每隔7s,leader确实会切换一次。
上图是我们通过zkCli命令连接到zk server获取的信息。我们创建3个ExampleClient,会在/test节点下面分别创建3个临时节点,观察后面的数字0019,0020,0021。
其实Leader Election内部通过一个分布式锁来实现选主;并且选主结果是公平的,zk会按照各节点请求的次序成为主节点,当前最小序号的节点成为主节点,其他节点会添加一个对于当前最小节点的监听watcher。一旦发现最小节点不存在,第二小的节点就会成为leader。