温馨提示×

如何使用Zookeeper实现分布式锁

小樊
46
2025-08-03 12:55:06
栏目: 大数据

使用 ZooKeeper 实现分布式锁是一种常见的方法,ZooKeeper 提供了强一致性的协调服务,可以用来管理分布式系统中的锁机制。以下是使用 ZooKeeper 实现分布式锁的基本步骤:

1. 创建 ZooKeeper 客户端

首先,你需要创建一个 ZooKeeper 客户端连接到 ZooKeeper 集群。

import org.apache.zookeeper.ZooKeeper; public class DistributedLock { private static final String ZK_ADDRESS = "localhost:2181"; private static final int SESSION_TIMEOUT = 3000; private ZooKeeper zk; public DistributedLock() throws IOException { zk = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event -> { // 处理连接事件 }); } } 

2. 创建锁节点

在 ZooKeeper 中创建一个临时顺序节点来表示锁。

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

// Path of this client's lock node, or null when it has none.
// This is a field of DistributedLock; declare it once in the class.
private String lockPath;

/**
 * Creates this client's ephemeral sequential node under the persistent parent
 * {@code /lock}, creating the parent first if it is missing.
 *
 * <p>Calling it again while a node already exists is a no-op, so one client never
 * queues behind its own earlier node.
 *
 * @throws KeeperException if ZooKeeper rejects one of the operations
 * @throws InterruptedException if the calling thread is interrupted
 */
public void createLockNode() throws KeeperException, InterruptedException {
    if (lockPath != null) {
        return;
    }

    // Ephemeral nodes need an existing parent, and the waiters are found by listing
    // the children of "/lock", so the parent must be a persistent znode.
    Stat parent = zk.exists("/lock", false);
    if (parent == null) {
        try {
            zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException ignored) {
            // Another client created the parent between exists() and create().
        }
    }

    // The path must point inside "/lock". Creating a sequential node with the path
    // "/lock" itself would yield "/lock0000000000" next to the parent, not under it.
    lockPath = zk.create("/lock/node-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("Created lock node: " + lockPath);
}

3. 获取锁

获取锁的过程包括:检查当前节点是否是序号最小的节点;如果不是,则监听前一个节点的删除事件,并在该节点被删除后重新检查。

import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; public boolean acquireLock() throws KeeperException, InterruptedException { String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Created lock node: " + lockPath); while (true) { List<String> children = zk.getChildren("/lock", false); Collections.sort(children); String currentNode = lockPath.substring(lockPath.lastIndexOf("/") + 1); if (currentNode.equals(children.get(0))) { // 当前节点是最小的节点,获取锁成功 return true; } else { // 监听前一个节点的删除事件 int previousNodeIndex = Collections.binarySearch(children, currentNode) - 1; String previousNodePath = "/lock/" + children.get(previousNodeIndex); Stat stat = zk.exists(previousNodePath, event -> { if (event.getType() == Watcher.Event.EventType.NodeDeleted) { synchronized (this) { notifyAll(); } } }); if (stat != null) { synchronized (this) { wait(); } } } } } 

4. 释放锁

释放锁就是删除当前客户端在获取锁时创建的那个节点。

// Path of the lock node this client currently owns, or null when it holds none.
// This is a field of DistributedLock; declare it once in the class. The code that
// creates the lock node must store the created path here.
private String lockPath;

/**
 * Releases the lock by deleting the node this client created for it.
 *
 * <p>It does nothing when no lock node is recorded. Creating a fresh node here and
 * deleting that one would leave the real lock node in place until the session ends.
 *
 * @throws KeeperException if ZooKeeper rejects the delete
 * @throws InterruptedException if the calling thread is interrupted
 */
public void releaseLock() throws KeeperException, InterruptedException {
    if (lockPath == null) {
        return;
    }
    // Version -1 matches any version of the node.
    zk.delete(lockPath, -1);
    System.out.println("Released lock node: " + lockPath);
    lockPath = null;
}

5. 关闭 ZooKeeper 客户端

在程序结束时,关闭 ZooKeeper 客户端。

/**
 * Closes the ZooKeeper client held by this lock.
 *
 * @throws InterruptedException if the calling thread is interrupted while closing
 */
public void close() throws InterruptedException {
    this.zk.close();
}

完整示例

以下是一个完整的示例代码:

import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; public class DistributedLock { private static final String ZK_ADDRESS = "localhost:2181"; private static final int SESSION_TIMEOUT = 3000; private ZooKeeper zk; public DistributedLock() throws IOException { zk = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event -> { // 处理连接事件 }); } public void createLockNode() throws KeeperException, InterruptedException { String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Created lock node: " + lockPath); } public boolean acquireLock() throws KeeperException, InterruptedException { String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Created lock node: " + lockPath); while (true) { List<String> children = zk.getChildren("/lock", false); Collections.sort(children); String currentNode = lockPath.substring(lockPath.lastIndexOf("/") + 1); if (currentNode.equals(children.get(0))) { // 当前节点是最小的节点,获取锁成功 return true; } else { // 监听前一个节点的删除事件 int previousNodeIndex = Collections.binarySearch(children, currentNode) - 1; String previousNodePath = "/lock/" + children.get(previousNodeIndex); Stat stat = zk.exists(previousNodePath, event -> { if (event.getType() == Watcher.Event.EventType.NodeDeleted) { synchronized (this) { notifyAll(); } } }); if (stat != null) { synchronized (this) { wait(); } } } } } public void releaseLock() throws KeeperException, InterruptedException { String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); zk.delete(lockPath, -1); System.out.println("Released lock node: " + lockPath); } public void close() throws InterruptedException { zk.close(); } public static void main(String[] args) { try { DistributedLock lock = new DistributedLock(); 
lock.createLockNode(); if (lock.acquireLock()) { System.out.println("Lock acquired!"); // 执行业务逻辑 Thread.sleep(5000); lock.releaseLock(); System.out.println("Lock released!"); } lock.close(); } catch (IOException | KeeperException | InterruptedException e) { e.printStackTrace(); } } } 

这个示例展示了如何使用 ZooKeeper 实现一个简单的分布式锁。在实际应用中,你还需要处理更多的异常情况和边界条件,例如会话过期、连接断开后的重连、获取锁的超时以及锁的可重入性。

0