@@ -2808,6 +2808,322 @@ def geodist(key, member1, member2, unit = 'm')
28082808 end
28092809 end
28102810
2811+ # Returns the stream information each subcommand.
2812+ #
2813+ # @example stream
2814+ # redis.xinfo(:stream, 'mystream')
2815+ # @example groups
2816+ # redis.xinfo(:groups, 'mystream')
2817+ # @example consumers
2818+ # redis.xinfo(:consumers, 'mystream', 'mygroup')
2819+ #
2820+ # @param subcommand [String] e.g. `stream` `groups` `consumers`
2821+ # @param key [String] the stream key
2822+ # @param group [String] the consumer group name, required if subcommand is `consumers`
2823+ #
2824+ # @return [Hash] information of the stream if subcommand is `stream`
2825+ # @return [Array<Hash>] information of the consumer groups if subcommand is `groups`
2826+ # @return [Array<Hash>] information of the consumers if subcommand is `consumers`
2827+ def xinfo ( subcommand , key , group = nil )
2828+ args = [ :xinfo , subcommand , key , group ] . compact
2829+ synchronize do |client |
2830+ client . call ( args ) do |reply |
2831+ case subcommand . to_s . downcase
2832+ when 'stream' then Hashify . call ( reply )
2833+ when 'groups' , 'consumers' then reply . map { |arr | Hashify . call ( arr ) }
2834+ else reply
2835+ end
2836+ end
2837+ end
2838+ end
2839+
2840+ # Add new entry to the stream.
2841+ #
2842+ # @example Without options
2843+ # redis.xadd('mystream', f1: 'v1', f2: 'v2')
2844+ # @example With options
2845+ # redis.xadd('mystream', { f1: 'v1', f2: 'v2' }, id: '0-0', maxlen: 1000, approximate: true)
2846+ #
2847+ # @param key [String] the stream key
2848+ # @param entry [Hash] one or multiple field-value pairs
2849+ # @param opts [Hash] several options for `XADD` command
2850+ #
2851+ # @option opts [String] :id the entry id, default value is `*`, it means auto generation
2852+ # @option opts [Integer] :maxlen max length of entries
2853+ # @option opts [Boolean] :approximate whether to add `~` modifier of maxlen or not
2854+ #
2855+ # @return [String] the entry id
2856+ def xadd ( key , entry , opts = { } )
2857+ args = [ :xadd , key ]
2858+ args . concat ( [ 'MAXLEN' , ( opts [ :approximate ] ? '~' : nil ) , opts [ :maxlen ] ] . compact ) if opts [ :maxlen ]
2859+ args << ( opts [ :id ] || '*' )
2860+ args . concat ( entry . to_a . flatten )
2861+ synchronize { |client | client . call ( args ) }
2862+ end
2863+
2864+ # Trims older entries of the stream if needed.
2865+ #
2866+ # @example Without options
2867+ # redis.xtrim('mystream', 1000)
2868+ # @example With options
2869+ # redis.xtrim('mystream', 1000, approximate: true)
2870+ #
2871+ # @param key [String] the stream key
2872+ # @param mexlen [Integer] max length of entries
2873+ # @param approximate [Boolean] whether to add `~` modifier of maxlen or not
2874+ #
2875+ # @return [Integer] the number of entries actually deleted
2876+ def xtrim ( key , maxlen , approximate : false )
2877+ args = [ :xtrim , key , 'MAXLEN' , ( approximate ? '~' : nil ) , maxlen ] . compact
2878+ synchronize { |client | client . call ( args ) }
2879+ end
2880+
2881+ # Delete entries by entry ids.
2882+ #
2883+ # @example With splatted entry ids
2884+ # redis.xdel('mystream', '0-1', '0-2')
2885+ # @example With arrayed entry ids
2886+ # redis.xdel('mystream', ['0-1', '0-2'])
2887+ #
2888+ # @param key [String] the stream key
2889+ # @param ids [Array<String>] one or multiple entry ids
2890+ #
2891+ # @return [Integer] the number of entries actually deleted
2892+ def xdel ( key , *ids )
2893+ args = [ :xdel , key ] . concat ( ids . flatten )
2894+ synchronize { |client | client . call ( args ) }
2895+ end
2896+
2897+ # Fetches entries of the stream.
2898+ #
2899+ # @example Without options
2900+ # redis.xrange('mystream')
2901+ # @example With first entry id option
2902+ # redis.xrange('mystream', first: '0-1')
2903+ # @example With first and last entry id options
2904+ # redis.xrange('mystream', first: '0-1', last: '0-3')
2905+ # @example With count options
2906+ # redis.xrange('mystream', count: 10)
2907+ #
2908+ # @param key [String] the stream key
2909+ # @param first [String] first entry id of range, default value is `-`
2910+ # @param last [String] last entry id of range, default value is `+`
2911+ # @param count [Integer] the number of entries as limit
2912+ #
2913+ # @return [Hash{String => Hash}] the entries
2914+ def xrange ( key , first : '-' , last : '+' , count : nil )
2915+ args = [ :xrange , key , first , last ]
2916+ args . concat ( [ 'COUNT' , count ] ) if count
2917+ synchronize { |client | client . call ( args , &HashifyStreamEntries ) }
2918+ end
2919+
2920+ # Fetches entries of the stream in descending order.
2921+ #
2922+ # @example Without options
2923+ # redis.xrevrange('mystream')
2924+ # @example With first entry id option
2925+ # redis.xrevrange('mystream', first: '0-1')
2926+ # @example With first and last entry id options
2927+ # redis.xrevrange('mystream', first: '0-1', last: '0-3')
2928+ # @example With count options
2929+ # redis.xrevrange('mystream', count: 10)
2930+ #
2931+ # @param key [String] the stream key
2932+ # @param first [String] first entry id of range, default value is `-`
2933+ # @param last [String] last entry id of range, default value is `+`
2934+ # @param count [Integer] the number of entries as limit
2935+ #
2936+ # @return [Hash{String => Hash}] the entries
2937+ def xrevrange ( key , first : '-' , last : '+' , count : nil )
2938+ args = [ :xrevrange , key , last , first ]
2939+ args . concat ( [ 'COUNT' , count ] ) if count
2940+ synchronize { |client | client . call ( args , &HashifyStreamEntries ) }
2941+ end
2942+
2943+ # Returns the number of entries inside a stream.
2944+ #
2945+ # @example With key
2946+ # redis.xlen('mystream')
2947+ #
2948+ # @param key [String] the stream key
2949+ #
2950+ # @return [Integer] the number of entries
2951+ def xlen ( key )
2952+ synchronize { |client | client . call ( [ :xlen , key ] ) }
2953+ end
2954+
2955+ # Fetches entries from one or multiple streams. Optionally blocking.
2956+ #
2957+ # @example With a key
2958+ # redis.xread('mystream', '0-0')
2959+ # @example With multiple keys
2960+ # redis.xread(%w[mystream1 mystream2], %w[0-0 0-0])
2961+ # @example With count option
2962+ # redis.xread('mystream', '0-0', count: 2)
2963+ # @example With block option
2964+ # redis.xread('mystream', '$', block: 1000)
2965+ #
2966+ # @param keys [Array<String>] one or multiple stream keys
2967+ # @param ids [Array<String>] one or multiple entry ids
2968+ # @param count [Integer] the number of entries as limit per stream
2969+ # @param block [Integer] the number of milliseconds as blocking timeout
2970+ #
2971+ # @return [Hash{String => Hash{String => Hash}}] the entries
2972+ def xread ( keys , ids , count : nil , block : nil )
2973+ args = [ :xread ]
2974+ args . concat ( [ 'COUNT' , count ] ) if count
2975+ args . concat ( [ 'BLOCK' , block . to_i ] ) if block
2976+ _xread ( args , keys , ids , block )
2977+ end
2978+
2979+ # Manages the consumer group of the stream.
2980+ #
2981+ # @example With `create` subcommand
2982+ # redis.xgroup(:create, 'mystream', 'mygroup', '$')
2983+ # @example With `setid` subcommand
2984+ # redis.xgroup(:setid, 'mystream', 'mygroup', '$')
2985+ # @example With `destroy` subcommand
2986+ # redis.xgroup(:destroy, 'mystream', 'mygroup')
2987+ # @example With `delconsumer` subcommand
2988+ # redis.xgroup(:delconsumer, 'mystream', 'mygroup', 'consumer1')
2989+ #
2990+ # @param subcommand [String] `create` `setid` `destroy` `delconsumer`
2991+ # @param key [String] the stream key
2992+ # @param group [String] the consumer group name
2993+ # @param id_or_consumer [String]
2994+ # * the entry id or `$`, required if subcommand is `create` or `setid`
2995+ # * the consumer name, required if subcommand is `delconsumer`
2996+ # @param mkstream [Boolean] whether to create an empty stream automatically or not
2997+ #
2998+ # @return [String] `OK` if subcommand is `create` or `setid`
2999+ # @return [Integer] effected count if subcommand is `destroy` or `delconsumer`
3000+ def xgroup ( subcommand , key , group , id_or_consumer = nil , mkstream : false )
3001+ args = [ :xgroup , subcommand , key , group , id_or_consumer , ( mkstream ? 'MKSTREAM' : nil ) ] . compact
3002+ synchronize { |client | client . call ( args ) }
3003+ end
3004+
3005+ # Fetches a subset of the entries from one or multiple streams related with the consumer group.
3006+ # Optionally blocking.
3007+ #
3008+ # @example With a key
3009+ # redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>')
3010+ # @example With multiple keys
3011+ # redis.xreadgroup('mygroup', 'consumer1', %w[mystream1 mystream2], %w[> >])
3012+ # @example With count option
3013+ # redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', count: 2)
3014+ # @example With block option
3015+ # redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', block: 1000)
3016+ # @example With noack option
3017+ # redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', noack: true)
3018+ #
3019+ # @param group [String] the consumer group name
3020+ # @param consumer [String] the consumer name
3021+ # @param keys [Array<String>] one or multiple stream keys
3022+ # @param ids [Array<String>] one or multiple entry ids
3023+ # @param opts [Hash] several options for `XREADGROUP` command
3024+ #
3025+ # @option opts [Integer] :count the number of entries as limit
3026+ # @option opts [Integer] :block the number of milliseconds as blocking timeout
3027+ # @option opts [Boolean] :noack whether message loss is acceptable or not
3028+ #
3029+ # @return [Hash{String => Hash{String => Hash}}] the entries
3030+ def xreadgroup ( group , consumer , keys , ids , opts = { } )
3031+ args = [ :xreadgroup , 'GROUP' , group , consumer ]
3032+ args . concat ( [ 'COUNT' , opts [ :count ] ] ) if opts [ :count ]
3033+ args . concat ( [ 'BLOCK' , opts [ :block ] . to_i ] ) if opts [ :block ]
3034+ args << 'NOACK' if opts [ :noack ]
3035+ _xread ( args , keys , ids , opts [ :block ] )
3036+ end
3037+
3038+ # Removes one or multiple entries from the pending entries list of a stream consumer group.
3039+ #
3040+ # @example With a entry id
3041+ # redis.xack('mystream', 'mygroup', '1526569495631-0')
3042+ # @example With splatted entry ids
3043+ # redis.xack('mystream', 'mygroup', '0-1', '0-2')
3044+ # @example With arrayed entry ids
3045+ # redis.xack('mystream', 'mygroup', %w[0-1 0-2])
3046+ #
3047+ # @param key [String] the stream key
3048+ # @param group [String] the consumer group name
3049+ # @param ids [Array<String>] one or multiple entry ids
3050+ #
3051+ # @return [Integer] the number of entries successfully acknowledged
3052+ def xack ( key , group , *ids )
3053+ args = [ :xack , key , group ] . concat ( ids . flatten )
3054+ synchronize { |client | client . call ( args ) }
3055+ end
3056+
3057+ # Changes the ownership of a pending entry
3058+ #
3059+ # @example With splatted entry ids
3060+ # redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-1', '0-2')
3061+ # @example With arrayed entry ids
3062+ # redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2])
3063+ # @example With idle option
3064+ # redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], idle: 1000)
3065+ # @example With time option
3066+ # redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], time: 1542866959000)
3067+ # @example With retrycount option
3068+ # redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], retrycount: 10)
3069+ # @example With force option
3070+ # redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], force: true)
3071+ # @example With justid option
3072+ # redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], justid: true)
3073+ #
3074+ # @param key [String] the stream key
3075+ # @param group [String] the consumer group name
3076+ # @param consumer [String] the consumer name
3077+ # @param min_idle_time [Integer] the number of milliseconds
3078+ # @param ids [Array<String>] one or multiple entry ids
3079+ # @param opts [Hash] several options for `XCLAIM` command
3080+ #
3081+ # @option opts [Integer] :idle the number of milliseconds as last time it was delivered of the entry
3082+ # @option opts [Integer] :time the number of milliseconds as a specific Unix Epoch time
3083+ # @option opts [Integer] :retrycount the number of retry counter
3084+ # @option opts [Boolean] :force whether to create the pending entry to the pending entries list or not
3085+ # @option opts [Boolean] :justid whether to fetch just an array of entry ids or not
3086+ #
3087+ # @return [Hash{String => Hash}] the entries successfully claimed
3088+ # @return [Array<String>] the entry ids successfully claimed if justid option is `true`
3089+ def xclaim ( key , group , consumer , min_idle_time , *ids , **opts )
3090+ args = [ :xclaim , key , group , consumer , min_idle_time ] . concat ( ids . flatten )
3091+ args . concat ( [ 'IDLE' , opts [ :idle ] . to_i ] ) if opts [ :idle ]
3092+ args . concat ( [ 'TIME' , opts [ :time ] . to_i ] ) if opts [ :time ]
3093+ args . concat ( [ 'RETRYCOUNT' , opts [ :retrycount ] ] ) if opts [ :retrycount ]
3094+ args << 'FORCE' if opts [ :force ]
3095+ args << 'JUSTID' if opts [ :justid ]
3096+ blk = opts [ :justid ] ? Noop : HashifyStreamEntries
3097+ synchronize { |client | client . call ( args , &blk ) }
3098+ end
3099+
3100+ # Fetches not acknowledging pending entries
3101+ #
3102+ # @example With key and group
3103+ # redis.xpending('mystream', 'mygroup')
3104+ # @example With range options
3105+ # redis.xpending('mystream', 'mygroup', first: '-', last: '+', count: 10)
3106+ # @example With range and consumer options
3107+ # redis.xpending('mystream', 'mygroup', 'consumer1', first: '-', last: '+', count: 10)
3108+ #
3109+ # @param key [String] the stream key
3110+ # @param group [String] the consumer group name
3111+ # @param consumer [String] the consumer name
3112+ # @param opts [Hash] several options for `XPENDING` command
3113+ #
3114+ # @option opts [String] :first first entry id of range
3115+ # @option opts [String] :last last entry id of range
3116+ # @option opts [Integer] :count the number of entries as limit
3117+ #
3118+ # @return [Hash] the summary of pending entries
3119+ # @return [Array<Hash>] the pending entries details if options were specified
3120+ def xpending ( key , group , consumer = nil , **opts )
3121+ args = [ :xpending , key , group , opts [ :first ] , opts [ :last ] , opts [ :count ] , consumer ] . compact
3122+ summary_needed = consumer . nil? && opts . empty?
3123+ blk = summary_needed ? HashifyStreamPendings : HashifyStreamPendingDetails
3124+ synchronize { |client | client . call ( args , &blk ) }
3125+ end
3126+
28113127 # Interact with the sentinel command (masters, master, slaves, failover)
28123128 #
28133129 # @param [String] subcommand e.g. `masters`, `master`, `slaves`
@@ -2953,6 +3269,43 @@ def method_missing(command, *args)
29533269 end . compact ]
29543270 }
29553271
3272+ HashifyStreams =
3273+ lambda { |reply |
3274+ return { } if reply . nil?
3275+ reply . map do |stream_key , entries |
3276+ [ stream_key , HashifyStreamEntries . call ( entries ) ]
3277+ end . to_h
3278+ }
3279+
3280+ HashifyStreamEntries =
3281+ lambda { |reply |
3282+ reply . map do |entry_id , values |
3283+ [ entry_id , values . each_slice ( 2 ) . to_h ]
3284+ end . to_h
3285+ }
3286+
3287+ HashifyStreamPendings =
3288+ lambda { |reply |
3289+ {
3290+ 'size' => reply [ 0 ] ,
3291+ 'min_entry_id' => reply [ 1 ] ,
3292+ 'max_entry_id' => reply [ 2 ] ,
3293+ 'consumers' => reply [ 3 ] . nil? ? { } : Hash [ reply [ 3 ] ]
3294+ }
3295+ }
3296+
3297+ HashifyStreamPendingDetails =
3298+ lambda { |reply |
3299+ reply . map do |arr |
3300+ {
3301+ 'entry_id' => arr [ 0 ] ,
3302+ 'consumer' => arr [ 1 ] ,
3303+ 'elapsed' => arr [ 2 ] ,
3304+ 'count' => arr [ 3 ]
3305+ }
3306+ end
3307+ }
3308+
29563309 HashifyClusterNodeInfo =
29573310 lambda { |str |
29583311 arr = str . split ( ' ' )
@@ -3018,6 +3371,21 @@ def _subscription(method, timeout, channels, block)
30183371 @client = original
30193372 end
30203373 end
3374+
3375+ def _xread ( args , keys , ids , blocking_timeout_msec )
3376+ keys = keys . is_a? ( Array ) ? keys : [ keys ]
3377+ ids = ids . is_a? ( Array ) ? ids : [ ids ]
3378+ args . concat ( [ 'STREAMS' ] , keys , ids )
3379+
3380+ synchronize do |client |
3381+ if blocking_timeout_msec . nil?
3382+ client . call ( args , &HashifyStreams )
3383+ else
3384+ timeout = client . timeout . to_f + blocking_timeout_msec . to_f / 1000.0
3385+ client . call_with_timeout ( args , timeout , &HashifyStreams )
3386+ end
3387+ end
3388+ end
30213389end
30223390
30233391require_relative "redis/version"
0 commit comments