实现Leader选举通过 Curator

  1. Curator Framework 深入了解
    1. 选举功能实现 (Leader Election)
      1. LeaderLatch
      2. LeaderSelector

Curator Framework 深入了解

本文受到 colobu 前辈文章的指引,深入了解 Curator Framework 的工作流程,十分感谢 colobu 前辈的博文给予的启发和指导。

选举功能实现 (Leader Election)

Curator 提供了 Leader 选举的功能,用于在分布式计算中选举出一个节点作为一组节点的 Leader。Curator 提供了两种 Leader Election 的 Recipe:

LeaderLatch

构造方法:

// LeaderLatch.class

public LeaderLatch(CuratorFramework client, String latchPath)

public LeaderLatch(CuratorFramework client, String latchPath, String id/*zk的 path:value 中的 value*/)

同之前几章的使用风格,需要 start() 方法调用了才会开启选举。 start() 方法之后会调用真正的工作开始方法:

// LeaderLatch.class
private synchronized void internalStart() {
        if ( state.get() == State.STARTED ) { // 状态标记为开始 start()会完成
            // 很重要的一条实践,客户端需要注册一个 lisenter 用来监听和 zk 连接的状态,比如中断、重连等
            client.getConnectionStateListenable().addListener(listener);
            //...
            // 开始选举相关的工作
            reset();
            //...
        }
}

reset() 是一个会重复执行的方法,用来争抢当前的 leader:

// LeaderLatch.class
void reset() throws Exception {
        setLeadership(false); // 当前不是leader,先置为 false;如果是leader不会进行这个操作
        setNode(null); // 成为leader后会创建他的节点,存储起来方便下次删除旧节点
        // Curator 方法非常通用的一种设计,专门用来做回调
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                // 不知道这个 debugResetWaitLatch 这个什么用... 一开始就被赋值 null,没有修改过。看起来开发开发另一个新特性的 hook。
                // volatile CountDownLatch debugResetWaitLatch = null;
                if ( debugResetWaitLatch != null ) {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }
                // 节点创建成功
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
                    setNode(event.getName()); // 将当前 path 的名称记录下来,方便后续删除
                    if ( state.get() == State.CLOSED ) {
                        setNode(null); // 这应该是一个安全检测,如果这时候leaderLatch被 close() 了,这里的 node 也就不存了。下面创建的也是临时节点。
                    } else {
                        getChildren(); // 获取latchPath(构造方法中传入的)下所有的节点,用来关键的判断谁拿到了 leader 权限
                    }
                } else {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        //这里可以看到创建的是一个临时节点,value的值就是 id
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }

checkLeadership() 是关键的终结方法了,他用来判断是谁拿到了 leader 权限:

// LeaderLatch.class
private void getChildren() throws Exception {
        BackgroundCallback callback = new BackgroundCallback() {
            public void processResult(... )throws Exception {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
                    // 终结方法,找到对应的 leader
                    checkLeadership(event.getChildren());
                }
            }
        };
    // 获取 latchPath 所有的节点
client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

篇幅有限,checkLeadership() 只介绍获得 leader 身份的情况了:

// LeaderLatch.class
private void checkLeadership(List<String> children) throws Exception {
        final String localOurPath = ourPath.get(); // 当前 LeaderLacth 获取的节点
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); // 排序 latchPath 下所有的节点
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; // 很明白的代码,查询当前 LeaderLacth 类的节点是否出现在排序数组中
        if ( ourIndex < 0 ) {// 没有出现,就 reset() 方法重新来
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        }
        else if ( ourIndex == 0 ) {// 这里就是关键了, == 0,排在第一位,获得 leader 权限
            setLeadership(true);
        } else { /*...*/}
}

至此一个 Leader 选举的过程就完成了,Curator 利用了 ZooKeeper 的各种特性可谓是玩出了花儿…

这里还介绍一个阻塞的方法等待当前对象获取到 Leader 身份:

// LeaderLatch.class
public void await() throws InterruptedException, EOFException {
        synchronized(this) { // 锁住当前对象
            while ((state.get() == State.STARTED) && !hasLeadership.get()){
                wait(); // 等待成为 Leader,这里 setLeadership(true) 的方法里会 notifyAll()来唤醒的
            }
        }
        if ( state.get() != State.STARTED ) {
            throw new EOFException();
        }
}
// 超时版本
public boolean await(long timeout, TimeUnit unit) throws InterruptedException

LeaderSelector

Curator还提供了另外一种选举方法,注意涉及以下四个类:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException
// LeaderSelector.class
// 构造函数
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)

public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)

需要分析 LeaderSelector 依旧需要从 start() 方法开始,但在开始之前还有一个重要的方法 autoRequeue() 。如果需要该实例不停的去尝试获取 leader 身份,就需要调用此方法一次,在构造好该对象之后先调用 autoRequeue()start()

start() 的逻辑是 :

start() -> requeue() -> internalRequeue() ----
                               ↑             ↓ autoRequeue == true
                               --------------

internalRequeue() 中配置了一个 Future 任务执行 doWorkLoop() 方法,每次调用 internalRequeue() 是同步的,并且 Future 任务执行也是同步的,也就是必须一次一次同步的去尝试获取 leader 身份。

// LeaderSelector.class
void doWork() throws Exception {
        hasLeadership = false;
        try {
            // 这里就是关键了,这是一个分布式锁
            // InterProcessMutex mutex
            // 一旦这个拿到了就是持有锁了
            // 下面只需要 takeLeadership 方法阻塞住方法,不让这边执行到 finally 代码块就好了
            mutex.acquire();

            hasLeadership = true;
            try {/*...*/}
            catch(/**/){/**/}
            finally {
                clearIsQueued();
            }
        }
        catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
            throw e;
        }
        finally {
            if ( hasLeadership ) {
                hasLeadership = false;
                try {
                    mutex.release(); // 释放了锁,其他的可以去竞争 leader 了
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("The leader threw an exception", e);
                    // ignore errors - this is just a safety
                }
            }
        }
    }

异常处理
LeaderSelectorListener 类继承了 ConnectionStateListenerLeaderSelector 必须小心连接状态的改变。如果实例成为 leader, 当 SUSPENDED 状态出现时, 实例必须假定在重新连接成功之前它可能不再是 leader了。 如果 LOST 状态出现, 实例不再是 leader, takeLeadership() 方法返回。

重要:推荐处理方式是当收到 SUSPENDEDLOST 时抛出 CancelLeadershipException 异常。 这会导致 LeaderSelector 实例中断并取消执行 takeLeadership()方法的异常。Curator 提供了 LeaderSelectorListenerAdapter 以供继承,此 Adapter 提供了推荐的处理逻辑。

public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState ){
        if ( client.getConnectionStateErrorPolicy().isErrorState(newState) ){
            throw new CancelLeadershipException();
        }
    }
}

这里跑出异常以中断 takeLeadership()方法只能抛出CancelLeadershipException 异常:

// LeaderSelector.WrappedListener.class
public void stateChanged(CuratorFramework client, ConnectionState newState) {
       try{
                listener.stateChanged(client, newState);
       } catch ( CancelLeadershipException dummy ) {
                // 中断逻辑 
                leaderSelector.interruptLeadership();
       }
}

LeaderLatch 相比, 通过 LeaderSelectorListener 可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 [email protected]

Title:实现Leader选举通过 Curator

Count:1.6k

Author:nickChen

Created At:2018-04-17, 14:42:41

Updated At:2023-05-08, 23:27:10

Url:http://nickchenyx.github.io/2018/04/17/CuratorFramework-leader/

Copyright: 'Attribution-non-commercial-shared in the same way 4.0' Reprint please keep the original link and author.