Zookeeper是一个分布式的、开源的分布式应用程序的协调服务,基于ZAB协议(ZooKeeper Atomic Broadcast)实现分布式数据一致性

Zookeeper内部的数据模型类似文件系统的树形结构(ZNode),每个节点可存储不超过1MB的数据

Zookeeper提供的主要功能包括:

  • 服务注册与发现
  • 配置管理
  • 分布式锁
  • 集群管理

Zookeeper数据模型

ZooKeeper是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构

这里面的每一个节点都被称为ZNode,每个节点上都会保存自己的数据和节点信息

节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下(可以通过jute.maxbuffer修改单个节点数据大小限制)

节点可以分为四大类:

  • PERSISTENT:持久化节点,创建后永久存在(除非显式删除)
  • EPHEMERAL:临时节点 -e,会话结束后自动删除(用于实现服务注册与心跳检测)
  • PERSISTENT_SEQUENTIAL:持久化顺序节点 -s,顺序节点的名称后面会自动追加单调递增序号(如 /lock/seq-0000000001
  • EPHEMERAL_SEQUENTIAL :临时顺序节点 -es
  • TTL节点:3.6.0+ 版本支持,设置存活时间后自动删除

Zookeeper常用命令

服务端

  • 启动ZooKeeper服务: ./zkServer.sh start
  • 查看ZooKeeper服务状态: ./zkServer.sh status
  • 停止ZooKeeper服务: ./zkServer.sh stop
  • 重启ZooKeeper服务: ./zkServer.sh restart

客户端

  • 连接ZooKeeper服务端:./zkCli.sh –server ip:port
  • 断开连接:quit
  • 设置节点值:set /节点path value
  • 删除单个节点:delete /节点path
  • 显示指定目录下节点:ls 目录
  • 删除带有子节点的节点:deleteall /节点path
  • 创建节点:create /节点path value
  • 获取节点值:get /节点path
  • 创建临时节点:create -e /节点path value
  • 创建顺序节点:create -s /节点path value
  • 查询节点详细信息:ls –s /节点path
    • czxid:节点被创建的事务ID
    • ctime: 创建时间
    • mzxid: 最后一次被更新的事务ID
    • mtime: 修改时间
    • pzxid:子节点列表最后一次被更新的事务ID
    • cversion:子节点的版本号
    • dataversion:数据版本号
    • aclversion:权限版本号
    • ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
    • dataLength:节点存储的数据的长度
    • numChildren:当前节点的子节点个数

ZooKeeper JavaAPI 操作

Curator是Netflix开源的高级客户端,可以简化ZooKeeper客户端的使用

Curator API常用操作有:

  • 建立连接
  • 添加节点
  • 删除节点
  • 修改节点
  • 查询节点
  • Watch事件
  • 监听
  • 分布式锁实现

添加Maven依赖并启动Zookeeper服务:

<!-- Curator 核心库 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.4.0</version>
</dependency>

<!-- Curator 扩展工具 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>

创建客户端连接:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorDemo {
private static final String ZK_ADDRESS = "localhost:2181";

public static CuratorFramework createClient() {
// 重试策略:初始等待1秒,最多重试3次
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);

return CuratorFrameworkFactory.builder()
.connectString(ZK_ADDRESS) // ip地址+端口
.sessionTimeoutMs(5000) // 会话超时时间(毫秒)
.connectionTimeoutMs(3000) // 连接建立超时时间(毫秒)
.namespace("/demo") // 设置命名空间(所有操作路径自动添加前缀)
.retryPolicy(retry)
.build();
}

public static void main(String[] args) throws Exception {
CuratorFramework client = createClient();
client.start(); // 启动连接
System.out.println("ZooKeeper 连接状态: " + client.getState());

// ...

client.close(); // 关闭连接
}
}

添加节点:

// 创建持久节点(默认开放权限)
String path = client.create()
.forPath("/persistent-node", "data".getBytes());

// 创建临时顺序节点(会话结束后自动删除,名称追加序号)
String seqPath = client.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath("/temp-seq-", "data".getBytes());
方法链 作用 示例
.withMode() 指定节点类型(共6种) CreateMode.PERSISTENT_SEQUENTIAL
.withACL() 设置自定义权限列表 .withACL(ZooDefs.Ids.CREATOR_ALL_ACL)
.creatingParentsIfNeeded() 自动创建父节点(递归) 适用于深度路径如 /a/b/c
.withTtl() 设置TTL(需ZK 3.5+,且需配置ExtendedFeatureType.TTL .withTtl(10000L) // 10秒后自动删除
.withProtection() 保护模式(防重放攻击) 配合临时顺序节点使用

删除节点:

// 删除叶子节点
client.delete()
.forPath("/path");

// 带版本号的删除(CAS)
client.delete()
.withVersion(stat.getVersion())
.forPath("/path");
方法链 作用 示例
.deletingChildrenIfNeeded() 递归删除子节点 用于删除非空目录
.guaranteed() 确保删除成功(持续重试直到ZK确认) 配合.withVersion()使用
.withVersion() 指定版本号(实现CAS删除) .withVersion(5)

修改节点:

// 基础更新
client.setData()
.forPath("/path", "new-data".getBytes());

// 带版本号的原子更新(CAS操作)
Stat newStat = client.setData()
.withVersion(stat.getVersion()) // 使用之前获取的版本号
.forPath("/path", "cas-data".getBytes());

// 无条件强制更新
client.setData()
.withVersion(-1) // 忽略版本冲突
.forPath("/path", "force-data".getBytes());

// 异步回调更新
client.setData()
.inBackground((curator, event) -> {
System.out.println("异步更新结果: " + event.getResultCode());
})
.forPath("/async-path", "data".getBytes());

获取节点数据:

// 基本读取(返回byte[])
byte[] data = client.getData()
.forPath("/path");

// 带状态读取(获取Stat元信息)
Stat stat = new Stat();
byte[] dataWithStat = client.getData()
.storingStatIn(stat) // 存储节点状态
.forPath("/path");
System.out.println("版本号: " + stat.getVersion());

子节点查询:

// 获取子节点列表
List<String> children = client.getChildren()
.forPath("/parent");

// 带监听器的查询(子节点变化时触发)
List<String> watchedChildren = client.getChildren()
.usingWatcher((CuratorWatcher) event -> {
System.out.println("子节点变化: " + event.getType());
}).forPath("/parent");

存在性检查:

// 检查节点是否存在(返回Stat或null)
Stat existsStat = client.checkExists()
.forPath("/path");

// 带监听的存在检查
client.checkExists()
.usingWatcher((CuratorWatcher) event -> {
if (event.getType() == Watcher.Event.EventType.NodeCreated) {
System.out.println("节点被创建");
}
}).forPath("/path-to-watch");

事务操作:

// 创建事务
CuratorTransactionFinal txFinal = client.inTransaction()
.create().forPath("/txn-node1", "data1".getBytes()).and()
.setData().forPath("/txn-node2", "data2".getBytes()).and()
.delete().forPath("/txn-node3").and()
.commit(); // 原子化执行

// 检查结果
for (CuratorTransactionResult result : txFinal.getResults()) {
System.out.println("操作类型: " + result.getType() + ", 路径: " + result.getForPath());
}

批量操作:

// 创建批量操作
List<CuratorOp> ops = Arrays.asList(
client.transactionOp().create().forPath("/batch/new", "data".getBytes()),
client.transactionOp().setData().forPath("/batch/existing", "update".getBytes()),
client.transactionOp().delete().forPath("/batch/old")
);

// 执行批量操作
List<CuratorTransactionResult> results = client.transaction().forOperations(ops);

Watch事件监听

ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper实现分布式协调服务的重要特性。

ZooKeeper中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。Curator引入了Cache来实现对 ZooKeeper服务端事件的监听。

ZooKeeper提供了三种Watcher:

  • NodeCache:只是监听某一个特定的节点
  • PathChildrenCache:监控一个ZNode的子节点
  • TreeCache:可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

一次侦听:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;

public class OneTimeWatcherDemo {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
client.start();

// 添加一次性Watcher(触发后自动移除)
byte[] data = client.getData()
.usingWatcher((CuratorWatcher) event -> {
System.out.println("【一次性监听】事件类型: " + event.getType() + ", 路径: " + event.getPath());
})
.forPath("/test-watch");

// 测试触发(修改节点数据)
client.setData().forPath("/test-watch", "new-data".getBytes());
// 再次修改不会触发,因为Watcher已被移除
client.setData().forPath("/test-watch", "another-data".getBytes());

Thread.sleep(3000);
client.close();
}
}

PathChildrenCache持续侦听:

import org.apache.curator.framework.recipes.cache.*;

public class PersistentWatchDemo {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
client.start();
client.create().creatingParentsIfNeeded().forPath("/test-parent");

// 创建PathChildrenCache监听器
PathChildrenCache cache = new PathChildrenCache(client, "/test-parent", true);
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

// 添加监听回调
cache.getListenable().addListener((client1, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("【子节点新增】路径: " + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("【子节点更新】路径: " + event.getData().getPath() +
", 数据: " + new String(event.getData().getData()));
break;
case CHILD_REMOVED:
System.out.println("【子节点删除】路径: " + event.getData().getPath());
break;
}
});

// 测试操作
client.create().forPath("/test-parent/child1", "data1".getBytes());
client.setData().forPath("/test-parent/child1", "updated".getBytes());
client.delete().forPath("/test-parent/child1");

Thread.sleep(1000);
cache.close();
client.close();
}
}

NodeCache节点全量监听:

import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;

public class NodeCacheDemo {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
client.start();
client.create().forPath("/test-node", "init".getBytes());

// 创建NodeCache监听指定节点
NodeCache nodeCache = new NodeCache(client, "/test-node");
nodeCache.start(true);

// 添加监听器
nodeCache.getListenable().addListener(() -> {
if (nodeCache.getCurrentData() != null) {
System.out.println("【节点数据变更】当前数据: " +
new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("【节点被删除】");
}
});

// 测试操作
client.setData().forPath("/test-node", "changed".getBytes());
client.delete().forPath("/test-node");

Thread.sleep(1000);
nodeCache.close();
client.close();
}
}

TreeCache树形结构监听:

import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;

public class TreeCacheDemo {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
client.start();
client.create().creatingParentsIfNeeded().forPath("/test-tree/sub1");

// 创建TreeCache监听子树
TreeCache treeCache = TreeCache.newBuilder(client, "/test-tree").build();
treeCache.start();

// 添加监听器
treeCache.getListenable().addListener((client1, event) -> {
if (event.getData() != null) {
System.out.printf("【树形事件】类型: %s, 路径: %s, 数据: %s\n",
event.getType(),
event.getData().getPath(),
event.getData().getData() != null ?
new String(event.getData().getData()) : "null");
}
});

// 测试操作
client.create().forPath("/test-tree/sub2", "sub2-data".getBytes());
client.setData().forPath("/test-tree/sub1", "update".getBytes());
client.delete().forPath("/test-tree/sub1");

Thread.sleep(1000);
treeCache.close();
client.close();
}
}
事件类型 触发条件 适用监听器
NodeCreated 节点被创建 Watcher/NodeCache
NodeDeleted 节点被删除 所有监听器
NodeDataChanged 节点数据变更 所有监听器
NodeChildrenChanged 子节点数量变化(不包含数据变更) PathChildrenCache
INITIALIZED TreeCache初始化完成 TreeCache
CONNECTION_LOST 连接断开 所有监听器

Zookeeper分布式锁

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。

  1. 客户端获取锁时,在lock节点下创建临时顺序节点。
  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
  3. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。(每个节点只监听比自己小的、最大的那个节点)
  4. 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

锁类型及其应用

互斥锁(InterProcessMutex):基于临时顺序节点实现,通过竞争最小序号获得锁

import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class MutexLockDemo {
public static void main(String[] args) {
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000));
client.start();

InterProcessMutex lock = new InterProcessMutex(client, "/locks/mutex");

try {
// 获取锁(阻塞等待,支持超时设置)
if (lock.acquire(10, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " 获得锁");
// 模拟业务处理
Thread.sleep(5000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (lock.isAcquiredInThisProcess()) {
lock.release();
System.out.println(Thread.currentThread().getName() + " 释放锁");
}
} catch (Exception e) {
e.printStackTrace();
}
client.close();
}
}
}

读写锁(InterProcessReadWriteLock):

  • 读锁:共享锁,多个客户端可同时持有
  • 写锁:独占锁,与其他所有锁互斥
InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/locks/rwlock");

// 获取读锁
InterProcessMutex readLock = rwLock.readLock();
readLock.acquire();

// 获取写锁
InterProcessMutex writeLock = rwLock.writeLock();
writeLock.acquire();

联锁(InterProcessMultiLock):同时锁定多个路径,原子性获取/释放多个锁。

List<InterProcessLock> locks = Arrays.asList(
new InterProcessMutex(client, "/locks/lock1"),
new InterProcessMutex(client, "/locks/lock2")
);

InterProcessMultiLock multiLock = new InterProcessMultiLock(locks);
multiLock.acquire(); // 所有锁同时获取
multiLock.release(); // 所有锁同时释放

信号量(InterProcessSemaphoreV2):控制同时访问资源的客户端数量。

InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/locks/semaphore", 3);
Lease lease = semaphore.acquire(); // 获取一个许可
semaphore.returnLease(lease); // 释放许可

Zookeeper集群

节点角色:

角色 职责 数量要求
Leader 处理所有写请求,发起提案投票 1(唯一活跃)
Follower 参与投票,处理读请求(可能返回旧数据) ≥1
Observer 仅同步数据,不参与投票(扩展读性能) 可选(可水平扩展)

Leader的选举(Fast Leader Election算法)

Serverid:服务器ID,比如有三台服务器,编号分别是1,2,3。编号越大在选择算法中的权重越大。

Zxid:数据ID,服务器中存放的最大数据ID。值越大说明数据越新,在选举算法中数据越新权重越大。

在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了。

并且在选取leader后新增节点,只要leader节点不挂,就不会重新选举

核心机制

ZAB协议(ZooKeeper Atomic Broadcast)

  • 两种模式
    1. 崩溃恢复:选举新Leader并同步数据
    2. 消息广播:Leader将写请求转化为Proposal广播给所有Follower
  • 数据一致性
    • 顺序一致性:所有请求按全局顺序执行
    • 原子性:更新要么全部成功,要么全部失败

会话管理

  • 会话周期:客户端连接时创建,超时或断开时结束
  • 临时节点:会话结束自动删除(用于实现服务注册)

数据同步流程

  1. 客户端向Leader发起写请求
  2. Leader生成Proposal广播给所有Follower
  3. Follower持久化Proposal后返回ACK
  4. Leader收到半数以上ACK后提交Commit
  5. Leader通知所有节点应用变更

常见问题

  1. 如果Leader挂了,其他节点没挂会怎么样?

那么会在其他Follower中选取新Leader,如果节点数量过少,就直接显示不可用

  1. 如果Follower挂了会怎么样?

不怎么样,其他节点继续工作,但是如果剩余节点过少,就显示不可用