Curator Framework 深入了解
本文受到 colobu 前辈文章的指引,深入了解 Curator Framework 的工作流程,十分感谢 colobu 前辈的博文给予的启发和指导。
选举功能实现 (Leader Election)
Curator 提供了 Leader 选举的功能,用于在分布式计算中选举出一个节点作为一组节点的 Leader。Curator 提供了两种 Leader Election 的 Recipe:
LeaderLatch
构造方法:
// LeaderLatch.class
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id/*zk的 path:value 中的 value*/)
同之前几章的使用风格,需要 start()
方法调用了才会开启选举。 start()
方法之后会调用真正的工作开始方法:
// LeaderLatch.class
private synchronized void internalStart() {
if ( state.get() == State.STARTED ) { // 状态标记为开始 start()会完成
// 很重要的一条实践,客户端需要注册一个 lisenter 用来监听和 zk 连接的状态,比如中断、重连等
client.getConnectionStateListenable().addListener(listener);
//...
// 开始选举相关的工作
reset();
//...
}
}
reset()
是一个会重复执行的方法,用来争抢当前的 leader:
// LeaderLatch.class
void reset() throws Exception {
setLeadership(false); // 当前不是leader,先置为 false;如果是leader不会进行这个操作
setNode(null); // 成为leader后会创建他的节点,存储起来方便下次删除旧节点
// Curator 方法非常通用的一种设计,专门用来做回调
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
// 不知道这个 debugResetWaitLatch 这个什么用... 一开始就被赋值 null,没有修改过。看起来开发开发另一个新特性的 hook。
// volatile CountDownLatch debugResetWaitLatch = null;
if ( debugResetWaitLatch != null ) {
debugResetWaitLatch.await();
debugResetWaitLatch = null;
}
// 节点创建成功
if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
setNode(event.getName()); // 将当前 path 的名称记录下来,方便后续删除
if ( state.get() == State.CLOSED ) {
setNode(null); // 这应该是一个安全检测,如果这时候leaderLatch被 close() 了,这里的 node 也就不存了。下面创建的也是临时节点。
} else {
getChildren(); // 获取latchPath(构造方法中传入的)下所有的节点,用来关键的判断谁拿到了 leader 权限
}
} else {
log.error("getChildren() failed. rc = " + event.getResultCode());
}
}
};
//这里可以看到创建的是一个临时节点,value的值就是 id
client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
checkLeadership()
是关键的终结方法了,他用来判断是谁拿到了 leader 权限:
// LeaderLatch.class
private void getChildren() throws Exception {
BackgroundCallback callback = new BackgroundCallback() {
public void processResult(... )throws Exception {
if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
// 终结方法,找到对应的 leader
checkLeadership(event.getChildren());
}
}
};
// 获取 latchPath 所有的节点
client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}
篇幅有限,checkLeadership()
只介绍获得 leader 身份的情况了:
// LeaderLatch.class
private void checkLeadership(List<String> children) throws Exception {
final String localOurPath = ourPath.get(); // 当前 LeaderLacth 获取的节点
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); // 排序 latchPath 下所有的节点
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; // 很明白的代码,查询当前 LeaderLacth 类的节点是否出现在排序数组中
if ( ourIndex < 0 ) {// 没有出现,就 reset() 方法重新来
log.error("Can't find our node. Resetting. Index: " + ourIndex);
reset();
}
else if ( ourIndex == 0 ) {// 这里就是关键了, == 0,排在第一位,获得 leader 权限
setLeadership(true);
} else { /*...*/}
}
至此一个 Leader 选举的过程就完成了,Curator 利用了 ZooKeeper 的各种特性可谓是玩出了花儿…
这里还介绍一个阻塞的方法等待当前对象获取到 Leader 身份:
// LeaderLatch.class
public void await() throws InterruptedException, EOFException {
synchronized(this) { // 锁住当前对象
while ((state.get() == State.STARTED) && !hasLeadership.get()){
wait(); // 等待成为 Leader,这里 setLeadership(true) 的方法里会 notifyAll()来唤醒的
}
}
if ( state.get() != State.STARTED ) {
throw new EOFException();
}
}
// 超时版本
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
LeaderSelector
Curator还提供了另外一种选举方法,注意涉及以下四个类:
- LeaderSelector
- LeaderSelectorListener
- LeaderSelectorListenerAdapter
- CancelLeadershipException
// LeaderSelector.class
// 构造函数
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)
需要分析 LeaderSelector
依旧需要从 start()
方法开始,但在开始之前还有一个重要的方法 autoRequeue()
。如果需要该实例不停的去尝试获取 leader 身份,就需要调用此方法一次,在构造好该对象之后先调用 autoRequeue()
再 start()
。
start()
的逻辑是 :
start() -> requeue() -> internalRequeue() ----
↑ ↓ autoRequeue == true
--------------
在 internalRequeue()
中配置了一个 Future
任务执行 doWorkLoop()
方法,每次调用 internalRequeue()
是同步的,并且 Future
任务执行也是同步的,也就是必须一次一次同步的去尝试获取 leader 身份。
// LeaderSelector.class
void doWork() throws Exception {
hasLeadership = false;
try {
// 这里就是关键了,这是一个分布式锁
// InterProcessMutex mutex
// 一旦这个拿到了就是持有锁了
// 下面只需要 takeLeadership 方法阻塞住方法,不让这边执行到 finally 代码块就好了
mutex.acquire();
hasLeadership = true;
try {/*...*/}
catch(/**/){/**/}
finally {
clearIsQueued();
}
}
catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
throw e;
}
finally {
if ( hasLeadership ) {
hasLeadership = false;
try {
mutex.release(); // 释放了锁,其他的可以去竞争 leader 了
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("The leader threw an exception", e);
// ignore errors - this is just a safety
}
}
}
}
异常处理LeaderSelectorListener
类继承了 ConnectionStateListener
,LeaderSelector
必须小心连接状态的改变。如果实例成为 leader, 当 SUSPENDED
状态出现时, 实例必须假定在重新连接成功之前它可能不再是 leader了。 如果 LOST
状态出现, 实例不再是 leader, takeLeadership()
方法返回。
重要:推荐处理方式是当收到 SUSPENDED
或 LOST
时抛出 CancelLeadershipException
异常。 这会导致 LeaderSelector
实例中断并取消执行 takeLeadership()
方法的异常。Curator 提供了 LeaderSelectorListenerAdapter
以供继承,此 Adapter
提供了推荐的处理逻辑。
public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState ){
if ( client.getConnectionStateErrorPolicy().isErrorState(newState) ){
throw new CancelLeadershipException();
}
}
}
这里跑出异常以中断 takeLeadership()
方法只能抛出CancelLeadershipException
异常:
// LeaderSelector.WrappedListener.class
public void stateChanged(CuratorFramework client, ConnectionState newState) {
try{
listener.stateChanged(client, newState);
} catch ( CancelLeadershipException dummy ) {
// 中断逻辑
leaderSelector.interruptLeadership();
}
}
与 LeaderLatch
相比, 通过 LeaderSelectorListener
可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 [email protected]