# Redis限流的实现方法有哪些 ## 目录 1. [限流技术概述](#限流技术概述) 2. [Redis实现限流的优势](#Redis实现限流的优势) 3. [基于计数器的限流](#基于计数器的限流) 4. [滑动窗口限流](#滑动窗口限流) 5. [令牌桶算法](#令牌桶算法) 6. [漏桶算法](#漏桶算法) 7. [分布式限流方案](#分布式限流方案) 8. [Redis模块扩展](#Redis模块扩展) 9. [实际应用场景](#实际应用场景) 10. [性能优化建议](#性能优化建议) 11. [常见问题与解决方案](#常见问题与解决方案) 12. [总结](#总结) --- ## 限流技术概述 限流(Rate Limiting)是保障系统稳定性的核心手段,通过控制单位时间内的请求量,防止系统因突发流量导致资源耗尽。常见的限流维度包括: - QPS(每秒查询率) - 并发连接数 - 用户/API级别配额 传统单机限流存在一致性难题,而Redis凭借其单线程特性、原子操作和丰富的数据结构,成为分布式限流的首选方案。 --- ## Redis实现限流的优势 1. **原子性保证**:INCR、SETNX等命令的原子执行 2. **高性能**:内存操作达到10万+ QPS 3. **持久化支持**:RDB/AOF保障限流数据不丢失 4. **分布式协调**:多节点共享限流状态 5. **丰富数据结构**:String/Hash/ZSet等灵活组合 --- ## 基于计数器的限流 最简单的限流实现,适用于对精度要求不高的场景。 ### 基础实现 ```python def is_allowed(key, limit, period): current = redis.incr(key) if current == 1: redis.expire(key, period) return current <= limit
-- KEYS[1]: 限流key -- ARGV[1]: 时间窗口(秒) -- ARGV[2]: 最大阈值 local counter = redis.call("INCR", KEYS[1]) if counter == 1 then redis.call("EXPIRE", KEYS[1], ARGV[1]) end return counter <= tonumber(ARGV[2]) and 1 or 0
通过多时间片实现更平滑的流量控制,典型实现方式:
def sliding_window(key, window_size, max_count): now = time.time() pipeline = redis.pipeline() pipeline.zadd(key, {now: now}) pipeline.zremrangebyscore(key, 0, now - window_size) pipeline.zcard(key) _, _, count = pipeline.execute() return count <= max_count
将窗口划分为多个子区间(如60个1秒区间),使用Hash存储:
local current_time = tonumber(ARGV[1]) local window_size = tonumber(ARGV[2]) local limit = tonumber(ARGV[3]) local slots = {} for i=1, window_size do slots[i] = current_time - window_size + i end local sum = 0 for _, slot in ipairs(slots) do sum = sum + tonumber(redis.call("HGET", KEYS[1], slot) or 0) end if sum >= limit then return 0 end redis.call("HSET", KEYS[1], current_time, (redis.call("HGET", KEYS[1], current_time) or 0) + 1) redis.call("EXPIRE", KEYS[1], window_size * 2) return 1
允许突发流量的经典算法,Redis实现要点:
key = { "last_time": 最后更新时间戳, "tokens": 当前令牌数, "rate": 每秒生成速率, "capacity": 桶容量 }
local key = KEYS[1] local now = tonumber(ARGV[1]) local tokens_requested = tonumber(ARGV[2]) local bucket = redis.call("HMGET", key, "last_time", "tokens", "rate", "capacity") local last_time = tonumber(bucket[1]) local tokens = tonumber(bucket[2]) local rate = tonumber(bucket[3]) local capacity = tonumber(bucket[4]) -- 初始化桶 if not last_time then last_time = now tokens = capacity redis.call("HMSET", key, "last_time", last_time, "tokens", tokens, "rate", rate, "capacity", capacity) end -- 计算新增令牌 local elapsed = now - last_time local new_tokens = elapsed * rate if new_tokens > 0 then tokens = math.min(tokens + new_tokens, capacity) last_time = now end -- 检查令牌是否充足 if tokens >= tokens_requested then tokens = tokens - tokens_requested redis.call("HMSET", key, "last_time", last_time, "tokens", tokens) return 1 end return 0
恒定速率输出的流量整形算法,与令牌桶的区别: 1. 处理速率恒定 2. 不支持突发流量
local key = KEYS[1] local now = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local rate = tonumber(ARGV[3]) local bucket = redis.call("HMGET", key, "water", "last_time") local water = tonumber(bucket[1] or 0) local last_time = tonumber(bucket[2] or now) -- 计算流出量 local elapsed = now - last_time water = math.max(0, water - elapsed * rate) -- 尝试加水 if water + 1 <= capacity then redis.call("HMSET", key, "water", water + 1, "last_time", now) return 1 end return 0
方案 | 优点 | 缺点 |
---|---|---|
中心化计数器 | 实现简单 | 单点压力大 |
分片计数 | 负载均衡 | 精度下降 |
Redis+Celery | 准实时聚合 | 延迟较高 |
# 使用Redis集群的hash tag保证相同用户路由到同一节点 def cluster_limiter(user_id, limit): key = f"limiter:{{{user_id}}}" return redis.incr(key) <= limit
提供原子化的令牌桶实现:
CL.THROTTLE user123 100 400 60 1 参数说明: - key=user123 - 最大容量=100 - 每秒补充400个令牌 - 时间窗口=60秒 - 本次申请令牌数=1
方法 | QPS | 内存消耗 |
---|---|---|
原生Lua | 35,000 | 低 |
redis-cell | 75,000 | 中 |
多级缓存 | 120,000 | 高 |
location /api/ { lua_shared_dict my_limit_req_store 100m; access_by_lua_file /path/to/redis_limiter.lua; }
// Redisson实现 RRateLimiter rateLimiter = redisson.getRateLimiter("seckill"); rateLimiter.trySetRate(RateType.OVERALL, 1000, 1, RateIntervalUnit.SECONDS); if(rateLimiter.tryAcquire()) { // 处理订单 }
# Scrapy中间件示例 def process_request(self, request, spider): domain = urlparse(request.url).netloc if not check_redis_limit(f"crawl:{domain}", 5, 60): raise IgnoreRequest("Domain rate limited")
redis-cli info stats | grep instantaneous_ops redis-cli slowlog get
现象:时间窗口边缘出现流量突增
解决方案:采用滑动窗口+子区间划分
现象:限流检查导致请求延迟
解决方案: - 设置合理的连接超时 - 降级策略:本地限流+Redis限流组合
现象:内存快速增长
解决方案: - 设置自动过期时间 - 定期扫描清理(使用SCAN替代KEYS)
Redis限流方案选型指南:
算法 | 适用场景 | 实现复杂度 | 精准度 |
---|---|---|---|
固定窗口 | 简单快速拦截 | ★☆☆☆☆ | 低 |
滑动窗口 | API精准控制 | ★★★☆☆ | 高 |
令牌桶 | 允许突发流量 | ★★★★☆ | 中 |
漏桶 | 恒定速率输出 | ★★★☆☆ | 高 |
技术演进趋势: 1. 硬件加速:Redis 7.0的Function加速 2. 混合架构:本地限流+Redis协调 3. 自适应限流:基于实时指标的动态调整 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。