温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

使用Redisson怎么实现一个分布式锁

发布时间:2021-06-17 11:18:41 来源:亿速云 阅读:166 作者:Leah 栏目:编程语言

使用Redisson怎么实现一个分布式锁,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

Redisson锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等。加上Redisson中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁。 下面我们对其加锁、解锁过程中的源码细节进行一一分析。

锁的接口定义了一下方法:

使用Redisson怎么实现一个分布式锁

分布式锁当中加锁,我们常用的加锁接口:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

下面我们来看一下方法的具体实现:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {   long time = unit.toMillis(waitTime);   long current = System.currentTimeMillis();   final long threadId = Thread.currentThread().getId();   Long ttl = tryAcquire(leaseTime, unit, threadId);   // lock acquired   if (ttl == null) {    return true;   }      time -= (System.currentTimeMillis() - current);   if (time <= 0) {    acquireFailed(threadId);    return false;   }      current = System.currentTimeMillis();   final RFuture subscribeFuture = subscribe(threadId);   if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {    if (!subscribeFuture.cancel(false)) {     subscribeFuture.addListener(new FutureListener() {      @Override      public void operationComplete(Future future) throws Exception {       if (subscribeFuture.isSuccess()) {        unsubscribe(subscribeFuture, threadId);       }      }     });    }    acquireFailed(threadId);    return false;   }   try {    time -= (System.currentTimeMillis() - current);    if (time <= 0) {     acquireFailed(threadId);     return false;    }       while (true) {     long currentTime = System.currentTimeMillis();     ttl = tryAcquire(leaseTime, unit, threadId);     // lock acquired     if (ttl == null) {      return true;     }     time -= (System.currentTimeMillis() - currentTime);     if (time = 0 && ttl < time) {      getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);     } else {      getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);     }     time -= (System.currentTimeMillis() - currentTime);     if (time <= 0) {      acquireFailed(threadId);      return false;     }    }   } finally {    unsubscribe(subscribeFuture, threadId);   } //  return get(tryLockAsync(waitTime, leaseTime, unit));  }

首先我们看到调用tryAcquire尝试获取锁,在这里是否能获取到锁,是根据锁名称的过期时间TTL来判定的(TTL

下面我们接着看一下tryAcquire的实现:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {  return get(tryAcquireAsync(leaseTime, unit, threadId)); }

可以看到真正获取锁的操作经过一层get操作里面执行的,这里为何要这么操作,本人也不是太理解,如有理解错误,欢迎指正。

get 是由CommandAsyncExecutor(一个线程Executor)封装的一个Executor

设置一个单线程的同步控制器CountDownLatch,用于控制单个线程的中断信息。个人理解经过中间的这么一步:主要是为了支持线程可中断操作。

public V get(RFuture future) {  if (!future.isDone()) {   final CountDownLatch l = new CountDownLatch(1);   future.addListener(new FutureListener() {    @Override    public void operationComplete(Future future) throws Exception {     l.countDown();    }   });      boolean interrupted = false;   while (!future.isDone()) {    try {     l.await();    } catch (InterruptedException e) {     interrupted = true;    }   }      if (interrupted) {    Thread.currentThread().interrupt();   }  }  // commented out due to blocking issues up to 200 ms per minute for each thread:由于每个线程的阻塞问题,每分钟高达200毫秒  // future.awaitUninterruptibly();  if (future.isSuccess()) {   return future.getNow();  }  throw convertException(future); }

我们进一步往下看:

private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {  if (leaseTime != -1) {   return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);  }  RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);  ttlRemainingFuture.addListener(new FutureListener() {   @Override   public void operationComplete(Future future) throws Exception {    if (!future.isSuccess()) {     return;    }    Long ttlRemaining = future.getNow();    // lock acquired    if (ttlRemaining == null) {     scheduleExpirationRenewal(threadId);    }   }  });  return ttlRemainingFuture; }

首先判断锁是否有超时时间,有过期时间的话,会在后面获取锁的时候设置进去。没有过期时间的话,则会用默认的

private long lockWatchdogTimeout = 30 * 1000;

下面我们在进一步往下分析真正获取锁的操作:

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {  internalLockLeaseTime = unit.toMillis(leaseTime);  return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,     "if (redis.call('exists', KEYS[1]) == 0) then " +      "redis.call('hset', KEYS[1], ARGV[2], 1); " +      "redis.call('pexpire', KEYS[1], ARGV[1]); " +      "return nil; " +     "end; " +     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +      "redis.call('pexpire', KEYS[1], ARGV[1]); " +      "return nil; " +     "end; " +     "return redis.call('pttl', KEYS[1]);",     Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

我把里面的重点信息做了以下三点总结:

1:真正执行的是一段具有原子性的Lua脚本,并且最终也是由CommandAsynExecutor去执行。

2:锁真正持久化到Redis时,用的hash类型key field value

3:获取锁的三个参数:getName()是逻辑锁名称,例如:分布式锁要锁住的methodName+params;internalLockLeaseTime是毫秒单位的锁过期时间;getLockName则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:UUID+":"threadId。有的同学可能会问,这样不是很缜密:不同的JVM可能会生成相同的threadId,所以Redission这里加了一个区分度很高的UUID;

Lua脚本中的执行分为以下三步:

1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过期时间ARGV[2],返回;

2:如果检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间

3:key不存,直接返回key的剩余过期时间(-2)

关于使用Redisson怎么实现一个分布式锁问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI