温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Redis怎么实现分布式锁和等待序列

发布时间:2021-06-04 10:59:30 来源:亿速云 阅读:193 作者:小新 栏目:数据库

这篇文章主要介绍了Redis怎么实现分布式锁和等待序列,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 ReentrankLock 这些锁的作用范围都是 JVM ,说白了在集群下没啥用。这时我们就需要能在多台 JVM 之间决定执行顺序的锁了,现在分布式锁主要有 redis 、 Zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。

背景

最近在做一个消费 Kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 Redis 的实现方式(因为网上例子多)

分析

redis 实现的分布式锁,实现原理是 set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式

  • 丢弃

  • 等待重试 由于我们的系统需要这些数据,那么只能重新尝试获取。这里使用 redis 的 List 类型实现等待序列的作用

代码

直接上代码 其实直接redis的工具类就可以解决了

package com.test import redis.clients.jedis.Jedis; import java.util.Collections; import java.util.List; /**  * @desc redis队列实现方式  * @anthor   * @date   **/ public class RedisUcUitl {   private static final String LOCK_SUCCESS = "OK";   private static final String SET_IF_NOT_EXIST = "NX";   private static final String SET_WITH_EXPIRE_TIME = "PX";   private static final Long RELEASE_SUCCESS = 1L;   private RedisUcUitl() {   }   /**    * logger    **/   /**    * 存储redis队列顺序存储 在队列首部存入    *    * @param key  字节类型    * @param value 字节类型    */   public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {     return jedis.lpush(key, value);      }   /**    * 移除列表中最后一个元素 并将改元素添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时    *    * @param srckey    * @param dstkey    * @param timeout 0 表示永不超时    * @return    */   public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {     return jedis.brpoplpush(srckey, dstkey, timeout);   }   /**    * 返回制定的key,起始位置的redis数据    * @param redisKey    * @param start    * @param end -1 表示到最后    * @return    */   public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {          return jedis.lrange(redisKey, start, end);   }   /**    * 删除key    * @param redisKey    */   public static void delete(Jedis jedis, final byte[] redisKey) {           return jedis.del(redisKey);   }   /**    * 尝试加锁    * @param lockKey key名称    * @param requestId 身份标识    * @param expireTime 过期时间    * @return    */   public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {     String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);     return LOCK_SUCCESS.equals(result);   }   /**    * 释放锁    * @param lockKey key名称    * @param requestId 身份标识    * @return    */   public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {     final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";     jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));     return RELEASE_SUCCESS.equals(result);   } }

业务逻辑主要代码如下

1.先消耗队列中的

while(true){   // 消费队列   try{     // 被放入redis队列的数据 序列化后的     byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);     if(bytes == null || bytes.isEmpty()){       // 队列中没数据时退出       break;     }     // 反序列化对象     Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);     // 塞入唯一的值 防止被其他线程误解锁     String requestId = UUID.randomUUID().toString();     boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);     if(lockGetFlag){       // 成功获取锁 进行业务处理       //TODO       // 处理完毕释放锁        boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);     }else{       // 未能获得锁放入等待队列      RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));        }        }catch(Exception e){     break;   }    }

2.处理最新接到的数据

同样是走尝试获取锁,获取不到放入队列的流程

一般序列化用 fastJson 之列的就可以了,这里用的是 JDK 自带的,工具类如下

public class ObjectSerialUtil {   private ObjectSerialUtil() { //    工具类   }   /**    * 将Object对象序列化为byte[]    *    * @param obj 对象    * @return byte数组    * @throws Exception    */   public static byte[] objectToBytes(Object obj) throws IOException {     ByteArrayOutputStream bos = new ByteArrayOutputStream();     ObjectOutputStream oos = new ObjectOutputStream(bos);     oos.writeObject(obj);     byte[] bytes = bos.toByteArray();     bos.close();     oos.close();     return bytes;   }   /**    * 将bytes数组还原为对象    *    * @param bytes    * @return    * @throws Exception    */   public static Object bytesToObject(byte[] bytes) {     try {       ByteArrayInputStream bin = new ByteArrayInputStream(bytes);       ObjectInputStream ois = new ObjectInputStream(bin);       return ois.readObject();     } catch (Exception e) {       throw new BaseException("反序列化出错!", e);     }   } }

感谢你能够认真阅读完这篇文章,希望小编分享的“Redis怎么实现分布式锁和等待序列”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI