Zookeeper是一个分布式的、开源的分布式应用程序的协调服务,基于ZAB协议(ZooKeeper Atomic Broadcast)实现分布式数据一致性
Zookeeper内部的数据模型类似文件系统的树形结构(ZNode),每个节点可存储不超过1MB的数据
Zookeeper提供的主要功能包括:
Zookeeper数据模型
ZooKeeper是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构
这里面的每一个节点都被称为ZNode,每个节点上都会保存自己的数据和节点信息
节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下(可以通过jute.maxbuffer修改单个节点数据大小限制)
节点可以分为四大类:
- PERSISTENT:持久化节点,创建后永久存在(除非显式删除)
- EPHEMERAL:临时节点 -e,会话结束后自动删除(用于实现服务注册与心跳检测)
- PERSISTENT_SEQUENTIAL:持久化顺序节点 -s,顺序节点的名称后面会自动追加单调递增序号(如
/lock/seq-0000000001
)
- EPHEMERAL_SEQUENTIAL :临时顺序节点 -es
- TTL节点:3.6.0+ 版本支持,设置存活时间后自动删除

Zookeeper常用命令
服务端
- 启动ZooKeeper服务: ./zkServer.sh start
- 查看ZooKeeper服务状态: ./zkServer.sh status
- 停止ZooKeeper服务: ./zkServer.sh stop
- 重启ZooKeeper服务: ./zkServer.sh restart
客户端
- 连接ZooKeeper服务端:./zkCli.sh –server ip:port
- 断开连接:quit
- 设置节点值:set /节点path value
- 删除单个节点:delete /节点path
- 显示指定目录下节点:ls 目录
- 删除带有子节点的节点:deleteall /节点path
- 创建节点:create /节点path value
- 获取节点值:get /节点path
- 创建临时节点:create -e /节点path value
- 创建顺序节点:create -s /节点path value
- 查询节点详细信息:ls –s /节点path
- czxid:节点被创建的事务ID
- ctime: 创建时间
- mzxid: 最后一次被更新的事务ID
- mtime: 修改时间
- pzxid:子节点列表最后一次被更新的事务ID
- cversion:子节点的版本号
- dataversion:数据版本号
- aclversion:权限版本号
- ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
- dataLength:节点存储的数据的长度
- numChildren:当前节点的子节点个数
ZooKeeper JavaAPI 操作
Curator是Netflix开源的高级客户端,可以简化ZooKeeper客户端的使用
Curator API常用操作有:
- 建立连接
- 添加节点
- 删除节点
- 修改节点
- 查询节点
- Watch事件
- 监听
- 分布式锁实现
添加Maven依赖并启动Zookeeper服务:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.4.0</version> </dependency>
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.4.0</version> </dependency>
|
创建客户端连接:
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorDemo { private static final String ZK_ADDRESS = "localhost:2181"; public static CuratorFramework createClient() { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3); return CuratorFrameworkFactory.builder() .connectString(ZK_ADDRESS) .sessionTimeoutMs(5000) .connectionTimeoutMs(3000) .namespace("/demo") .retryPolicy(retry) .build(); }
public static void main(String[] args) throws Exception { CuratorFramework client = createClient(); client.start(); System.out.println("ZooKeeper 连接状态: " + client.getState()); client.close(); } }
|
添加节点:
String path = client.create() .forPath("/persistent-node", "data".getBytes());
String seqPath = client.create() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath("/temp-seq-", "data".getBytes());
|
方法链 |
作用 |
示例 |
.withMode() |
指定节点类型(共6种) |
CreateMode.PERSISTENT_SEQUENTIAL |
.withACL() |
设置自定义权限列表 |
.withACL(ZooDefs.Ids.CREATOR_ALL_ACL) |
.creatingParentsIfNeeded() |
自动创建父节点(递归) |
适用于深度路径如 /a/b/c |
.withTtl() |
设置TTL(需ZK 3.5+,且需配置ExtendedFeatureType.TTL ) |
.withTtl(10000L) // 10秒后自动删除 |
.withProtection() |
保护模式(防重放攻击) |
配合临时顺序节点使用 |
删除节点:
client.delete() .forPath("/path");
client.delete() .withVersion(stat.getVersion()) .forPath("/path");
|
方法链 |
作用 |
示例 |
.deletingChildrenIfNeeded() |
递归删除子节点 |
用于删除非空目录 |
.guaranteed() |
确保删除成功(持续重试直到ZK确认) |
配合.withVersion() 使用 |
.withVersion() |
指定版本号(实现CAS删除) |
.withVersion(5) |
修改节点:
client.setData() .forPath("/path", "new-data".getBytes());
Stat newStat = client.setData() .withVersion(stat.getVersion()) .forPath("/path", "cas-data".getBytes());
client.setData() .withVersion(-1) .forPath("/path", "force-data".getBytes());
client.setData() .inBackground((curator, event) -> { System.out.println("异步更新结果: " + event.getResultCode()); }) .forPath("/async-path", "data".getBytes());
|
获取节点数据:
byte[] data = client.getData() .forPath("/path");
Stat stat = new Stat(); byte[] dataWithStat = client.getData() .storingStatIn(stat) .forPath("/path"); System.out.println("版本号: " + stat.getVersion());
|
子节点查询:
List<String> children = client.getChildren() .forPath("/parent");
List<String> watchedChildren = client.getChildren() .usingWatcher((CuratorWatcher) event -> { System.out.println("子节点变化: " + event.getType()); }).forPath("/parent");
|
存在性检查:
Stat existsStat = client.checkExists() .forPath("/path");
client.checkExists() .usingWatcher((CuratorWatcher) event -> { if (event.getType() == Watcher.Event.EventType.NodeCreated) { System.out.println("节点被创建"); } }).forPath("/path-to-watch");
|
事务操作:
CuratorTransactionFinal txFinal = client.inTransaction() .create().forPath("/txn-node1", "data1".getBytes()).and() .setData().forPath("/txn-node2", "data2".getBytes()).and() .delete().forPath("/txn-node3").and() .commit();
for (CuratorTransactionResult result : txFinal.getResults()) { System.out.println("操作类型: " + result.getType() + ", 路径: " + result.getForPath()); }
|
批量操作:
List<CuratorOp> ops = Arrays.asList( client.transactionOp().create().forPath("/batch/new", "data".getBytes()), client.transactionOp().setData().forPath("/batch/existing", "update".getBytes()), client.transactionOp().delete().forPath("/batch/old") );
List<CuratorTransactionResult> results = client.transaction().forOperations(ops);
|
Watch事件监听
ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper实现分布式协调服务的重要特性。
ZooKeeper中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。Curator引入了Cache来实现对 ZooKeeper服务端事件的监听。
ZooKeeper提供了三种Watcher:
- NodeCache:只是监听某一个特定的节点
- PathChildrenCache:监控一个ZNode的子节点
- TreeCache:可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
一次侦听:
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.WatchedEvent;
public class OneTimeWatcherDemo { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000)); client.start();
byte[] data = client.getData() .usingWatcher((CuratorWatcher) event -> { System.out.println("【一次性监听】事件类型: " + event.getType() + ", 路径: " + event.getPath()); }) .forPath("/test-watch");
client.setData().forPath("/test-watch", "new-data".getBytes()); client.setData().forPath("/test-watch", "another-data".getBytes());
Thread.sleep(3000); client.close(); } }
|
PathChildrenCache持续侦听:
import org.apache.curator.framework.recipes.cache.*;
public class PersistentWatchDemo { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000)); client.start(); client.create().creatingParentsIfNeeded().forPath("/test-parent");
PathChildrenCache cache = new PathChildrenCache(client, "/test-parent", true); cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
cache.getListenable().addListener((client1, event) -> { switch (event.getType()) { case CHILD_ADDED: System.out.println("【子节点新增】路径: " + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("【子节点更新】路径: " + event.getData().getPath() + ", 数据: " + new String(event.getData().getData())); break; case CHILD_REMOVED: System.out.println("【子节点删除】路径: " + event.getData().getPath()); break; } });
client.create().forPath("/test-parent/child1", "data1".getBytes()); client.setData().forPath("/test-parent/child1", "updated".getBytes()); client.delete().forPath("/test-parent/child1");
Thread.sleep(1000); cache.close(); client.close(); } }
|
NodeCache节点全量监听:
import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener;
public class NodeCacheDemo { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000)); client.start(); client.create().forPath("/test-node", "init".getBytes());
NodeCache nodeCache = new NodeCache(client, "/test-node"); nodeCache.start(true);
nodeCache.getListenable().addListener(() -> { if (nodeCache.getCurrentData() != null) { System.out.println("【节点数据变更】当前数据: " + new String(nodeCache.getCurrentData().getData())); } else { System.out.println("【节点被删除】"); } });
client.setData().forPath("/test-node", "changed".getBytes()); client.delete().forPath("/test-node");
Thread.sleep(1000); nodeCache.close(); client.close(); } }
|
TreeCache树形结构监听:
import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
public class TreeCacheDemo { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000)); client.start(); client.create().creatingParentsIfNeeded().forPath("/test-tree/sub1");
TreeCache treeCache = TreeCache.newBuilder(client, "/test-tree").build(); treeCache.start();
treeCache.getListenable().addListener((client1, event) -> { if (event.getData() != null) { System.out.printf("【树形事件】类型: %s, 路径: %s, 数据: %s\n", event.getType(), event.getData().getPath(), event.getData().getData() != null ? new String(event.getData().getData()) : "null"); } });
client.create().forPath("/test-tree/sub2", "sub2-data".getBytes()); client.setData().forPath("/test-tree/sub1", "update".getBytes()); client.delete().forPath("/test-tree/sub1");
Thread.sleep(1000); treeCache.close(); client.close(); } }
|
事件类型 |
触发条件 |
适用监听器 |
NodeCreated |
节点被创建 |
Watcher/NodeCache |
NodeDeleted |
节点被删除 |
所有监听器 |
NodeDataChanged |
节点数据变更 |
所有监听器 |
NodeChildrenChanged |
子节点数量变化(不包含数据变更) |
PathChildrenCache |
INITIALIZED |
TreeCache初始化完成 |
TreeCache |
CONNECTION_LOST |
连接断开 |
所有监听器 |
Zookeeper分布式锁
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
- 客户端获取锁时,在lock节点下创建临时顺序节点。
- 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
- 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。(每个节点只监听比自己小的、最大的那个节点)
- 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
锁类型及其应用
互斥锁(InterProcessMutex):基于临时顺序节点实现,通过竞争最小序号获得锁
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
public class MutexLockDemo { public static void main(String[] args) { CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 1000)); client.start(); InterProcessMutex lock = new InterProcessMutex(client, "/locks/mutex"); try { if (lock.acquire(10, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " 获得锁"); Thread.sleep(5000); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (lock.isAcquiredInThisProcess()) { lock.release(); System.out.println(Thread.currentThread().getName() + " 释放锁"); } } catch (Exception e) { e.printStackTrace(); } client.close(); } } }
|
读写锁(InterProcessReadWriteLock):
InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/locks/rwlock");
InterProcessMutex readLock = rwLock.readLock(); readLock.acquire();
InterProcessMutex writeLock = rwLock.writeLock(); writeLock.acquire();
|
联锁(InterProcessMultiLock):同时锁定多个路径,原子性获取/释放多个锁。
List<InterProcessLock> locks = Arrays.asList( new InterProcessMutex(client, "/locks/lock1"), new InterProcessMutex(client, "/locks/lock2") );
InterProcessMultiLock multiLock = new InterProcessMultiLock(locks); multiLock.acquire(); multiLock.release();
|
信号量(InterProcessSemaphoreV2):控制同时访问资源的客户端数量。
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/locks/semaphore", 3); Lease lease = semaphore.acquire(); semaphore.returnLease(lease);
|
Zookeeper集群
节点角色:
角色 |
职责 |
数量要求 |
Leader |
处理所有写请求,发起提案投票 |
1(唯一活跃) |
Follower |
参与投票,处理读请求(可能返回旧数据) |
≥1 |
Observer |
仅同步数据,不参与投票(扩展读性能) |
可选(可水平扩展) |
Leader的选举(Fast Leader Election算法)
Serverid:服务器ID,比如有三台服务器,编号分别是1,2,3。编号越大在选择算法中的权重越大。
Zxid:数据ID,服务器中存放的最大数据ID。值越大说明数据越新,在选举算法中数据越新权重越大。
在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了。
并且在选取leader后新增节点,只要leader节点不挂,就不会重新选举
核心机制
ZAB协议(ZooKeeper Atomic Broadcast)
- 两种模式
- 崩溃恢复:选举新Leader并同步数据
- 消息广播:Leader将写请求转化为Proposal广播给所有Follower
- 数据一致性
- 顺序一致性:所有请求按全局顺序执行
- 原子性:更新要么全部成功,要么全部失败
会话管理
- 会话周期:客户端连接时创建,超时或断开时结束
- 临时节点:会话结束自动删除(用于实现服务注册)
数据同步流程
- 客户端向Leader发起写请求
- Leader生成Proposal广播给所有Follower
- Follower持久化Proposal后返回ACK
- Leader收到半数以上ACK后提交Commit
- Leader通知所有节点应用变更
常见问题
- 如果Leader挂了,其他节点没挂会怎么样?
那么会在其他Follower中选取新Leader,如果节点数量过少,就直接显示不可用
- 如果Follower挂了会怎么样?
不怎么样,其他节点继续工作,但是如果剩余节点过少,就显示不可用