温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么在Java中利用zookeeper实现一个分布式锁

发布时间:2021-04-20 17:07:03 来源:亿速云 阅读:214 作者:Leah 栏目:编程语言

怎么在Java中利用zookeeper实现一个分布式锁?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

Java是什么

Java是一门面向对象编程语言,可以编写桌面应用程序、Web应用程序、分布式系统和嵌入式系统应用程序。

1、创建zookeeper的client

首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client

public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean {  private static final Logger LOGGER = LoggerFactory.getLogger(ContractFileInfoController.class);  private String connectionString;  private int sessionTimeoutMs;  private int connectionTimeoutMs;  private RetryPolicy retryPolicy;  private CuratorFramework client;  public CuratorFactoryBean(String connectionString) {   this(connectionString, 500, 500);  }  public CuratorFactoryBean(String connectionString, int sessionTimeoutMs, int connectionTimeoutMs) {   this.connectionString = connectionString;   this.sessionTimeoutMs = sessionTimeoutMs;   this.connectionTimeoutMs = connectionTimeoutMs;  }  @Override  public void destroy() throws Exception {   LOGGER.info("Closing curator framework...");   this.client.close();   LOGGER.info("Closed curator framework.");  }  @Override  public CuratorFramework getObject() throws Exception {   return this.client;  }  @Override  public Class<?> getObjectType() {    return this.client != null ? this.client.getClass() : CuratorFramework.class;  }  @Override  public boolean isSingleton() {   return true;  }  @Override  public void afterPropertiesSet() throws Exception {   if (StringUtils.isEmpty(this.connectionString)) {    throw new IllegalStateException("connectionString can not be empty.");   } else {    if (this.retryPolicy == null) {     this.retryPolicy = new ExponentialBackoffRetry(1000, 2147483647, 180000);    }    this.client = CuratorFrameworkFactory.newClient(this.connectionString, this.sessionTimeoutMs, this.connectionTimeoutMs, this.retryPolicy);    this.client.start();    this.client.blockUntilConnected(30, TimeUnit.MILLISECONDS);   }  }  public void setConnectionString(String connectionString) {   this.connectionString = connectionString;  }  public void setSessionTimeoutMs(int sessionTimeoutMs) {   this.sessionTimeoutMs = sessionTimeoutMs;  }  public void setConnectionTimeoutMs(int connectionTimeoutMs) {   this.connectionTimeoutMs = connectionTimeoutMs;  }  public void setRetryPolicy(RetryPolicy retryPolicy) {   this.retryPolicy = retryPolicy;  }  public void setClient(CuratorFramework client) {   this.client = client;  } }

2、封装分布式锁

根据CuratorFramework创建InterProcessMutex(分布式可重入排它锁)对一行数据进行上锁

 public InterProcessMutex(CuratorFramework client, String path) {   this(client, path, new StandardLockInternalsDriver());  }

使用 acquire方法
1、acquire() :入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(long time, TimeUnit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。

 public void acquire() throws Exception {   if (!this.internalLock(-1L, (TimeUnit)null)) {    throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);   }  }  public boolean acquire(long time, TimeUnit unit) throws Exception {   return this.internalLock(time, unit);  }

释放锁 mutex.release();

public void release() throws Exception {   Thread currentThread = Thread.currentThread();   InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);   if (lockData == null) {    throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);   } else {    int newLockCount = lockData.lockCount.decrementAndGet();    if (newLockCount <= 0) {     if (newLockCount < 0) {      throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);     } else {      try {       this.internals.releaseLock(lockData.lockPath);      } finally {       this.threadData.remove(currentThread);      }     }    }   }  }

封装后的DLock代码
1、调用InterProcessMutex processMutex = dLock.mutex(path);

2、手动释放锁processMutex.release();

3、需要手动删除路径dLock.del(path);

推荐 使用:
都是 函数式编程
在业务代码执行完毕后 会释放锁和删除path
1、这个有返回结果
public T mutex(String path, ZkLockCallback zkLockCallback, long time, TimeUnit timeUnit)
2、这个无返回结果
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit)

public class DLock {  private final Logger logger;  private static final long TIMEOUT_D = 100L;  private static final String ROOT_PATH_D = "/dLock";  private String lockRootPath;  private CuratorFramework client;  public DLock(CuratorFramework client) {   this("/dLock", client);  }  public DLock(String lockRootPath, CuratorFramework client) {   this.logger = LoggerFactory.getLogger(DLock.class);   this.lockRootPath = lockRootPath;   this.client = client;  }  public InterProcessMutex mutex(String path) {   if (!StringUtils.startsWith(path, "/")) {    path = Constant.keyBuilder(new Object[]{"/", path});   }   return new InterProcessMutex(this.client, Constant.keyBuilder(new Object[]{this.lockRootPath, "", path}));  }  public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback) throws ZkLockException {   return this.mutex(path, zkLockCallback, 100L, TimeUnit.MILLISECONDS);  }  public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {   String finalPath = this.getLockPath(path);   InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);   try {    if (!mutex.acquire(time, timeUnit)) {     throw new ZkLockException("acquire zk lock return false");    }   } catch (Exception var13) {    throw new ZkLockException("acquire zk lock failed.", var13);   }   T var8;   try {    var8 = zkLockCallback.doInLock();   } finally {    this.releaseLock(finalPath, mutex);   }   return var8;  }  private void releaseLock(String finalPath, InterProcessMutex mutex) {   try {    mutex.release();    this.logger.info("delete zk node path:{}", finalPath);    this.deleteInternal(finalPath);   } catch (Exception var4) {    this.logger.error("dlock", "release lock failed, path:{}", finalPath, var4); //   LogUtil.error(this.logger, "dlock", "release lock failed, path:{}", new Object[]{finalPath, var4});   }  }  public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {   String finalPath = this.getLockPath(path);   InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);   try {    if (!mutex.acquire(time, timeUnit)) {     throw new ZkLockException("acquire zk lock return false");    }   } catch (Exception var13) {    throw new ZkLockException("acquire zk lock failed.", var13);   }   try {    zkLockCallback.response();   } finally {    this.releaseLock(finalPath, mutex);   }  }  public String getLockPath(String customPath) {   if (!StringUtils.startsWith(customPath, "/")) {    customPath = Constant.keyBuilder(new Object[]{"/", customPath});   }   String finalPath = Constant.keyBuilder(new Object[]{this.lockRootPath, "", customPath});   return finalPath;  }  private void deleteInternal(String finalPath) {   try {    ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(finalPath);   } catch (Exception var3) {    this.logger.info("delete zk node path:{} failed", finalPath);   }  }  public void del(String customPath) {   String lockPath = "";   try {    lockPath = this.getLockPath(customPath);    ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(lockPath);   } catch (Exception var4) {    this.logger.info("delete zk node path:{} failed", lockPath);   }  } }
@FunctionalInterface public interface ZkLockCallback<T> {  T doInLock(); } @FunctionalInterface public interface ZkVoidCallBack {  void response(); } public class ZkLockException extends Exception {  public ZkLockException() {  }  public ZkLockException(String message) {   super(message);  }  public ZkLockException(String message, Throwable cause) {   super(message, cause);  } }

配置CuratorConfig

@Configuration public class CuratorConfig {  @Value("${zk.connectionString}")  private String connectionString;  @Value("${zk.sessionTimeoutMs:500}")  private int sessionTimeoutMs;  @Value("${zk.connectionTimeoutMs:500}")  private int connectionTimeoutMs;  @Value("${zk.dLockRoot:/dLock}")  private String dLockRoot;  @Bean  public CuratorFactoryBean curatorFactoryBean() {   return new CuratorFactoryBean(connectionString, sessionTimeoutMs, connectionTimeoutMs);  }  @Bean  @Autowired  public DLock dLock(CuratorFramework client) {   return new DLock(dLockRoot, client);  } }

测试代码

@RestController @RequestMapping("/dLock") public class LockController {  @Autowired  private DLock dLock;  @RequestMapping("/lock")  public Map testDLock(String no){   final String path = Constant.keyBuilder("/test/no/", no);   Long mutex=0l;   try {    System.out.println("在拿锁:"+path+System.currentTimeMillis());     mutex = dLock.mutex(path, () -> {     try {      System.out.println("拿到锁了" + System.currentTimeMillis());      Thread.sleep(10000);      System.out.println("操作完成了" + System.currentTimeMillis());     } finally {      return System.currentTimeMillis();     }    }, 1000, TimeUnit.MILLISECONDS);   } catch (ZkLockException e) {    System.out.println("拿不到锁呀"+System.currentTimeMillis());   }   return Collections.singletonMap("ret",mutex);  }  @RequestMapping("/dlock")  public Map testDLock1(String no){   final String path = Constant.keyBuilder("/test/no/", no);   Long mutex=0l;   try {    System.out.println("在拿锁:"+path+System.currentTimeMillis());    InterProcessMutex processMutex = dLock.mutex(path);    processMutex.acquire();    System.out.println("拿到锁了" + System.currentTimeMillis());    Thread.sleep(10000);    processMutex.release();    System.out.println("操作完成了" + System.currentTimeMillis());   } catch (ZkLockException e) {    System.out.println("拿不到锁呀"+System.currentTimeMillis());    e.printStackTrace();   }catch (Exception e){    e.printStackTrace();   }   return Collections.singletonMap("ret",mutex);  }  @RequestMapping("/del")  public Map delDLock(String no){   final String path = Constant.keyBuilder("/test/no/", no);   dLock.del(path);   return Collections.singletonMap("ret",1);  } }

关于怎么在Java中利用zookeeper实现一个分布式锁问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI