Skip to content

Commit d1fc118

Browse files
committed
Add Streams support
1 parent 476a34a commit d1fc118

File tree

6 files changed

+1122
-186
lines changed

6 files changed

+1122
-186
lines changed

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ matrix:
5252
env: DRIVER=ruby REDIS_BRANCH=3.2 LOW_TIMEOUT=0.3
5353
- rvm: jruby-9.1.17.0
5454
env: DRIVER=ruby REDIS_BRANCH=4.0 LOW_TIMEOUT=0.3
55+
- rvm: 2.5.3
56+
env: DRIVER=ruby REDIS_BRANCH=5.0
5557

5658
notifications:
5759
irc:

lib/redis.rb

Lines changed: 368 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2803,6 +2803,322 @@ def geodist(key, member1, member2, unit = 'm')
28032803
end
28042804
end
28052805

2806+
# Returns the stream information each subcommand.
2807+
#
2808+
# @example stream
2809+
# redis.xinfo(:stream, 'mystream')
2810+
# @example groups
2811+
# redis.xinfo(:groups, 'mystream')
2812+
# @example consumers
2813+
# redis.xinfo(:consumers, 'mystream', 'mygroup')
2814+
#
2815+
# @param subcommand [String] e.g. `stream` `groups` `consumers`
2816+
# @param key [String] the stream key
2817+
# @param group [String] the consumer group name, required if subcommand is `consumers`
2818+
#
2819+
# @return [Hash] information of the stream if subcommand is `stream`
2820+
# @return [Array<Hash>] information of the consumer groups if subcommand is `groups`
2821+
# @return [Array<Hash>] information of the consumers if subcommand is `consumers`
2822+
def xinfo(subcommand, key, group = nil)
2823+
args = [:xinfo, subcommand, key, group].compact
2824+
synchronize do |client|
2825+
client.call(args) do |reply|
2826+
case subcommand.to_s.downcase
2827+
when 'stream' then Hashify.call(reply)
2828+
when 'groups', 'consumers' then reply.map { |arr| Hashify.call(arr) }
2829+
else reply
2830+
end
2831+
end
2832+
end
2833+
end
2834+
2835+
# Add new entry to the stream.
2836+
#
2837+
# @example Without options
2838+
# redis.xadd('mystream', f1: 'v1', f2: 'v2')
2839+
# @example With options
2840+
# redis.xadd('mystream', { f1: 'v1', f2: 'v2' }, id: '0-0', maxlen: 1000, approximately: true)
2841+
#
2842+
# @param key [String] the stream key
2843+
# @param entry [Hash] one or multiple field-value pairs
2844+
# @param opts [Hash] several options for `XADD` command
2845+
#
2846+
# @option opts [String] :id the entry id, default value is `*`, it means auto generation
2847+
# @option opts [Integer] :maxlen max length of entries
2848+
# @option opts [Boolean] :approximately whether to add `~` modifier of maxlen or not
2849+
#
2850+
# @return [String] the entry id
2851+
def xadd(key, entry, opts = {})
2852+
args = [:xadd, key]
2853+
args.concat(['MAXLEN', (opts[:approximately] ? '~' : nil), opts[:maxlen]].compact) if opts[:maxlen]
2854+
args << (opts[:id] || '*')
2855+
args.concat(entry.to_a.flatten)
2856+
synchronize { |client| client.call(args) }
2857+
end
2858+
2859+
# Trims older entries of the stream if needed.
2860+
#
2861+
# @example Without options
2862+
# redis.xtrim('mystream', 1000)
2863+
# @example With options
2864+
# redis.xtrim('mystream', 1000, approximately: true)
2865+
#
2866+
# @param key [String] the stream key
2867+
# @param mexlen [Integer] max length of entries
2868+
# @param approximately [Boolean] whether to add `~` modifier of maxlen or not
2869+
#
2870+
# @return [Integer] the number of entries actually deleted
2871+
def xtrim(key, maxlen, approximately: false)
2872+
args = [:xtrim, key, 'MAXLEN', (approximately ? '~' : nil), maxlen].compact
2873+
synchronize { |client| client.call(args) }
2874+
end
2875+
2876+
# Delete entries by entry ids.
2877+
#
2878+
# @example With splatted entry ids
2879+
# redis.xdel('mystream', '0-1', '0-2')
2880+
# @example With arrayed entry ids
2881+
# redis.xdel('mystream', ['0-1', '0-2'])
2882+
#
2883+
# @param key [String] the stream key
2884+
# @param ids [Array<String>] one or multiple entry ids
2885+
#
2886+
# @return [Integer] the number of entries actually deleted
2887+
def xdel(key, *ids)
2888+
args = [:xdel, key].concat(ids.flatten)
2889+
synchronize { |client| client.call(args) }
2890+
end
2891+
2892+
# Fetches entries of the stream.
2893+
#
2894+
# @example Without options
2895+
# redis.xrange('mystream')
2896+
# @example With first entry id option
2897+
# redis.xrange('mystream', first: '0-1')
2898+
# @example With first and last entry id options
2899+
# redis.xrange('mystream', first: '0-1', last: '0-3')
2900+
# @example With count options
2901+
# redis.xrange('mystream', count: 10)
2902+
#
2903+
# @param key [String] the stream key
2904+
# @param first [String] first entry id of range, default value is `-`
2905+
# @param last [String] last entry id of range, default value is `+`
2906+
# @param count [Integer] the number of entries as limit
2907+
#
2908+
# @return [Hash{String => Hash}] the entries
2909+
def xrange(key, first: '-', last: '+', count: nil)
2910+
args = [:xrange, key, first, last]
2911+
args.concat(['COUNT', count]) if count
2912+
synchronize { |client| client.call(args, &HashifyStreamEntries) }
2913+
end
2914+
2915+
# Fetches entries of the stream in descending order.
2916+
#
2917+
# @example Without options
2918+
# redis.xrevrange('mystream')
2919+
# @example With first entry id option
2920+
# redis.xrevrange('mystream', first: '0-1')
2921+
# @example With first and last entry id options
2922+
# redis.xrevrange('mystream', first: '0-1', last: '0-3')
2923+
# @example With count options
2924+
# redis.xrevrange('mystream', count: 10)
2925+
#
2926+
# @param key [String] the stream key
2927+
# @param first [String] first entry id of range, default value is `-`
2928+
# @param last [String] last entry id of range, default value is `+`
2929+
# @param count [Integer] the number of entries as limit
2930+
#
2931+
# @return [Hash{String => Hash}] the entries
2932+
def xrevrange(key, first: '-', last: '+', count: nil)
2933+
args = [:xrevrange, key, last, first]
2934+
args.concat(['COUNT', count]) if count
2935+
synchronize { |client| client.call(args, &HashifyStreamEntries) }
2936+
end
2937+
2938+
# Returns the number of entries inside a stream.
2939+
#
2940+
# @example With key
2941+
# redis.xlen('mystream')
2942+
#
2943+
# @param key [String] the stream key
2944+
#
2945+
# @return [Integer] the number of entries
2946+
def xlen(key)
2947+
synchronize { |client| client.call([:xlen, key]) }
2948+
end
2949+
2950+
# Fetches entries from one or multiple streams. Optionally blocking.
2951+
#
2952+
# @example With a key
2953+
# redis.xread('mystream', '0-0')
2954+
# @example With multiple keys
2955+
# redis.xread(%w[mystream1 mystream2], %w[0-0 0-0])
2956+
# @example With count option
2957+
# redis.xread('mystream', '0-0', count: 2)
2958+
# @example With block option
2959+
# redis.xread('mystream', '$', block: 1000)
2960+
#
2961+
# @param keys [Array<String>] one or multiple stream keys
2962+
# @param ids [Array<String>] one or multiple entry ids
2963+
# @param count [Integer] the number of entries as limit per stream
2964+
# @param block [Integer] the number of milliseconds as blocking timeout
2965+
#
2966+
# @return [Hash{String => Hash{String => Hash}}] the entries
2967+
def xread(keys, ids, count: nil, block: nil)
2968+
args = [:xread]
2969+
args.concat(['COUNT', count]) if count
2970+
args.concat(['BLOCK', block.to_i]) if block
2971+
_xread(args, keys, ids, block)
2972+
end
2973+
2974+
# Manages the consumer group of the stream.
2975+
#
2976+
# @example With `create` subcommand
2977+
# redis.xgroup(:create, 'mystream', 'mygroup', '$')
2978+
# @example With `setid` subcommand
2979+
# redis.xgroup(:setid, 'mystream', 'mygroup', '$')
2980+
# @example With `destroy` subcommand
2981+
# redis.xgroup(:destroy, 'mystream', 'mygroup')
2982+
# @example With `delconsumer` subcommand
2983+
# redis.xgroup(:delconsumer, 'mystream', 'mygroup', 'consumer1')
2984+
#
2985+
# @param subcommand [String] `create` `setid` `destroy` `delconsumer`
2986+
# @param key [String] the stream key
2987+
# @param group [String] the consumer group name
2988+
# @param id_or_consumer [String]
2989+
# * the entry id or `$`, required if subcommand is `create` or `setid`
2990+
# * the consumer name, required if subcommand is `delconsumer`
2991+
# @param mkstream [Boolean] whether to create an empty stream automatically or not
2992+
#
2993+
# @return [String] `OK` if subcommand is `create` or `setid`
2994+
# @return [Integer] effected count if subcommand is `destroy` or `delconsumer`
2995+
def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
2996+
args = [:xgroup, subcommand, key, group, id_or_consumer, (mkstream ? 'MKSTREAM' : nil)].compact
2997+
synchronize { |client| client.call(args) }
2998+
end
2999+
3000+
# Fetches a subset of the entries from one or multiple streams related with the consumer group.
3001+
# Optionally blocking.
3002+
#
3003+
# @example With a key
3004+
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>')
3005+
# @example With multiple keys
3006+
# redis.xreadgroup('mygroup', 'consumer1', %w[mystream1 mystream2], %w[> >])
3007+
# @example With count option
3008+
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', count: 2)
3009+
# @example With block option
3010+
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', block: 1000)
3011+
# @example With noack option
3012+
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', noack: true)
3013+
#
3014+
# @param group [String] the consumer group name
3015+
# @param consumer [String] the consumer name
3016+
# @param keys [Array<String>] one or multiple stream keys
3017+
# @param ids [Array<String>] one or multiple entry ids
3018+
# @param opts [Hash] several options for `XREADGROUP` command
3019+
#
3020+
# @option opts [Integer] :count the number of entries as limit
3021+
# @option opts [Integer] :block the number of milliseconds as blocking timeout
3022+
# @option opts [Boolean] :noack whether message loss is acceptable or not
3023+
#
3024+
# @return [Hash{String => Hash{String => Hash}}] the entries
3025+
def xreadgroup(group, consumer, keys, ids, opts = {})
3026+
args = [:xreadgroup, 'GROUP', group, consumer]
3027+
args.concat(['COUNT', opts[:count]]) if opts[:count]
3028+
args.concat(['BLOCK', opts[:block].to_i]) if opts[:block]
3029+
args << 'NOACK' if opts[:noack]
3030+
_xread(args, keys, ids, opts[:block])
3031+
end
3032+
3033+
# Removes one or multiple entries from the pending entries list of a stream consumer group.
3034+
#
3035+
# @example With a entry id
3036+
# redis.xack('mystream', 'mygroup', '1526569495631-0')
3037+
# @example With splatted entry ids
3038+
# redis.xack('mystream', 'mygroup', '0-1', '0-2')
3039+
# @example With arrayed entry ids
3040+
# redis.xack('mystream', 'mygroup', %w[0-1 0-2])
3041+
#
3042+
# @param key [String] the stream key
3043+
# @param group [String] the consumer group name
3044+
# @param ids [Array<String>] one or multiple entry ids
3045+
#
3046+
# @return [Integer] the number of entries successfully acknowledged
3047+
def xack(key, group, *ids)
3048+
args = [:xack, key, group].concat(ids.flatten)
3049+
synchronize { |client| client.call(args) }
3050+
end
3051+
3052+
# Changes the ownership of a pending entry
3053+
#
3054+
# @example With splatted entry ids
3055+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-1', '0-2')
3056+
# @example With arrayed entry ids
3057+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2])
3058+
# @example With idle option
3059+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], idle: 1000)
3060+
# @example With time option
3061+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], time: 1542866959000)
3062+
# @example With retrycount option
3063+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], retrycount: 10)
3064+
# @example With force option
3065+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], force: true)
3066+
# @example With justid option
3067+
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], justid: true)
3068+
#
3069+
# @param key [String] the stream key
3070+
# @param group [String] the consumer group name
3071+
# @param consumer [String] the consumer name
3072+
# @param min_idle_time [Integer] the number of milliseconds
3073+
# @param ids [Array<String>] one or multiple entry ids
3074+
# @param opts [Hash] several options for `XCLAIM` command
3075+
#
3076+
# @option opts [Integer] :idle the number of milliseconds as last time it was delivered of the entry
3077+
# @option opts [Integer] :time the number of milliseconds as a specific Unix time
3078+
# @option opts [Integer] :retrycount the number of retry counter
3079+
# @option opts [Boolean] :force whether to create the pending entry to the pending entries list or not
3080+
# @option opts [Boolean] :justid whether to fetch just an array of entry ids or not
3081+
#
3082+
# @return [Hash{String => Hash}] the entries successfully claimed
3083+
# @return [Array<String>] the entry ids successfully claimed if justid option is `true`
3084+
def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
3085+
args = [:xclaim, key, group, consumer, min_idle_time].concat(ids.flatten)
3086+
args.concat(['IDLE', opts[:idle].to_i]) if opts[:idle]
3087+
args.concat(['TIME', opts[:time].to_i]) if opts[:time]
3088+
args.concat(['RETRYCOUNT', opts[:retrycount]]) if opts[:retrycount]
3089+
args << 'FORCE' if opts[:force]
3090+
args << 'JUSTID' if opts[:justid]
3091+
blk = opts[:justid] ? Noop : HashifyStreamEntries
3092+
synchronize { |client| client.call(args, &blk) }
3093+
end
3094+
3095+
# Fetches not acknowledging pending entries
3096+
#
3097+
# @example With key and group
3098+
# redis.xpending('mystream', 'mygroup')
3099+
# @example With range options
3100+
# redis.xpending('mystream', 'mygroup', first: '-', last: '+', count: 10)
3101+
# @example With range and consumer options
3102+
# redis.xpending('mystream', 'mygroup', 'consumer1', first: '-', last: '+', count: 10)
3103+
#
3104+
# @param key [String] the stream key
3105+
# @param group [String] the consumer group name
3106+
# @param consumer [String] the consumer name
3107+
# @param opts [Hash] several options for `XPENDING` command
3108+
#
3109+
# @option opts [String] :first first entry id of range
3110+
# @option opts [String] :last last entry id of range
3111+
# @option opts [Integer] :count the number of entries as limit
3112+
#
3113+
# @return [Hash] the summary of pending entries
3114+
# @return [Array<Hash>] the pending entries details if options were specified
3115+
def xpending(key, group, consumer = nil, **opts)
3116+
args = [:xpending, key, group, opts[:first], opts[:last], opts[:count], consumer].compact
3117+
summary_needed = consumer.nil? && opts.empty?
3118+
blk = summary_needed ? HashifyStreamPendings : HashifyStreamPendingDetails
3119+
synchronize { |client| client.call(args, &blk) }
3120+
end
3121+
28063122
# Interact with the sentinel command (masters, master, slaves, failover)
28073123
#
28083124
# @param [String] subcommand e.g. `masters`, `master`, `slaves`
@@ -2948,6 +3264,43 @@ def method_missing(command, *args)
29483264
end.compact]
29493265
}
29503266

3267+
HashifyStreams =
3268+
lambda { |reply|
3269+
return {} if reply.nil?
3270+
reply.map do |stream_key, entries|
3271+
[stream_key, HashifyStreamEntries.call(entries)]
3272+
end.to_h
3273+
}
3274+
3275+
HashifyStreamEntries =
3276+
lambda { |reply|
3277+
reply.map do |entry_id, values|
3278+
[entry_id, values.each_slice(2).to_h]
3279+
end.to_h
3280+
}
3281+
3282+
HashifyStreamPendings =
3283+
lambda { |reply|
3284+
{
3285+
'size' => reply[0],
3286+
'min_entry_id' => reply[1],
3287+
'max_entry_id' => reply[2],
3288+
'consumers' => reply[3].nil? ? {} : Hash[reply[3]]
3289+
}
3290+
}
3291+
3292+
HashifyStreamPendingDetails =
3293+
lambda { |reply|
3294+
reply.map do |arr|
3295+
{
3296+
'entry_id' => arr[0],
3297+
'consumer' => arr[1],
3298+
'elapsed' => arr[2],
3299+
'count' => arr[3]
3300+
}
3301+
end
3302+
}
3303+
29513304
HashifyClusterNodeInfo =
29523305
lambda { |str|
29533306
arr = str.split(' ')
@@ -3013,6 +3366,21 @@ def _subscription(method, timeout, channels, block)
30133366
@client = original
30143367
end
30153368
end
3369+
3370+
def _xread(args, keys, ids, blocking_timeout_msec)
3371+
keys = keys.is_a?(Array) ? keys : [keys]
3372+
ids = ids.is_a?(Array) ? ids : [ids]
3373+
args.concat(['STREAMS'], keys, ids)
3374+
3375+
synchronize do |client|
3376+
if blocking_timeout_msec.nil?
3377+
client.call(args, &HashifyStreams)
3378+
else
3379+
timeout = client.timeout.to_f + blocking_timeout_msec.to_f / 1000.0
3380+
client.call_with_timeout(args, timeout, &HashifyStreams)
3381+
end
3382+
end
3383+
end
30163384
end
30173385

30183386
require_relative "redis/version"

0 commit comments

Comments
 (0)