Skip to content

Conversation

@RoeyPrat
Copy link

@andymccurdy @nicois
this should give us support of all streams commands in redis 5.0
based on nicois work in #920, and commits from @itamarhaber.
also fixed some issues with tests, and changed CI to run on newer redis version.

@andymccurdy
Copy link
Contributor

Great, thanks. I’m visiting family for a few days and will be back on Wednesday. I’ll plan on reviewing/merging this then.

One observation already: looks like redis 5.0 is fully released now. Can we update the Travis config to pull the full release version and update the skip test decorators back to 5.0?

@itamarhaber
Copy link
Member

@andymccurdy enjoy the visit :)

@RoeyPrat
Copy link
Author

@andymccurdy updated travis config to redis5.
this PR is not running travis by automation, but you can see the run here

Copy link
Contributor

@andymccurdy andymccurdy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made a bunch of inline notes. Most are python 2/3 compatibility and Token-encoding literal string issues. I tried to provide suggested changes for all of those.

There's a few non-comment strings that are using double quotes instead of single quotes. Let's try to be consistent with the rest of the code base and use single quotes for non-comment strings.

Also I didn't see a response callback or any tests for the XPENDING command.

redis/client.py Outdated
return self.execute_command('SUNIONSTORE', dest, *args)

# STREAMS COMMANDS
def xadd(self, _name, id='*', maxlen=None, approximate=True, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the key/value pairs of the entry should be specified as a single dict arg rather than kwargs. This is one of the things I regret and intend to change about ZADD. I've seen a number of bug reports with ZADD where a user wanted to use an entry name that conflicted with one of the command arguments. This is also the reason redis-py hasn't included the the newer ZADD args like nx/xx/etc. yet -- it would be backwards incompatible to anyone with a sorted set including an element named nx.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. changing it to "fields" and setting it as a must arg

redis/client.py Outdated
"""
pieces = []
if maxlen is not None:
if not isinstance(maxlen, int) or maxlen < 1:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not isinstance(maxlen, int) or maxlen < 1:
if not isinstance(maxlen, (int, long)) or maxlen < 1:

for python 2/3 compat we need isinstance to check both int and long.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

redis/client.py Outdated
pieces.append(str(maxlen))
pieces.append(id)
for pair in iteritems(kwargs):
pieces.append(pair[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pieces.extend(pair) is slightly more efficient.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

redis/client.py Outdated
"""
pieces = [start, finish]
if count is not None:
if not isinstance(count, int) or count < 1:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not isinstance(count, int) or count < 1:
if not isinstance(count, (int, long)) or count < 1:

py 2/3 compat

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

redis/client.py Outdated
if not isinstance(param, int):
raise RedisError("XCLAIM {} must be an integer"
.format(param_name))
pieces.append(str(param))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to supply the actual int values here in addition to the option names.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually that is the int value. what i forgot to add was the param name...
renaming param to param_value, and adding param_name to pieces.

redis/client.py Outdated
return int(response)


def stream_key(response):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why we need this callback as it's only returning the response. I think it can be removed and XADD can be omitted from the callback list.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XADD removing unnecessary stream_key parse function

redis/client.py Outdated
def stream_list(response):
if response is None:
return None
result = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pairs_to_dict callback already does the dict construction that we need. This can simply be:

return [(r[0], pairs_to_dict(r[1])) for r in response]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. stream_list should reuse pairs_to_dict

redis/client.py Outdated
return None
result = dict()
for r in response:
result[r[0].decode('utf-8')] = stream_list(r[1])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
result[r[0].decode('utf-8')] = stream_list(r[1])
result[nativestr(r[0])] = stream_list(r[1])

The response type can vary based on the decode_responses flag. Use nativestr to only decode if we're dealing with bytes on py3 or str on py2

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.
multi_stream_list should use nativestr for compatibility

redis/client.py Outdated
return stream_list(response)


def multi_stream_list(response):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several options for the return structure of multi_stream_list:

# a list of of 2-item stream/messages lists [ [ 'stream-1', [ [ '1540456647578-0', {'key-1': 'value-1', 'key-2': 'value-2'} ], [ '1540456647578-1', {'key-1': 'value-1', 'key-2': 'value-2'} ], ], ], [ 'stream-2', [ [ '1540456647578-0', {'key-1': 'value-1', 'key-2': 'value-2'} ], [ '1540456647578-1', {'key-1': 'value-1', 'key-2': 'value-2'} ], ], ], ] # this allows all list-based iterating.... for stream, messages in r.xread(streams={'stream-1': 0, 'stream-2': 0}): for message_id, message_data in messages: .... # alternatively, the existing code represents this return value # as a dict of streams to lists of message id: dict attr pairs { 'stream-1': [ [ '1540456647578-0', {'key-1': 'value-1', 'key-2': 'value-2'} ], [ '1540456647578-1', {'key-1': 'value-1', 'key-2': 'value-2'} ], ], 'stream-2': [ [ '1540456662589-0', {'key-1': 'value-1', 'key-2': 'value-2'} ], [ '1540456662589-1', {'key-1': 'value-1', 'key-2': 'value-2'} ], ] } # iterating here requires the extra .items() call for stream, messages in r.xread(streams={'stream-1': 0, 'stream-2': 0}).items() for message_id, message_data in messages: .... # given redis returns null when there are no messages to read, # you can't reliably index directly into the dictionary, which seems to # take away much of the value of the dictionary # this blows up with a TypeError if there's no message to read # since xread()'s return value will be None. r.xread(streams={'stream-1': 99999999999})['stream-1']

I think the first option might be the best choice here. I don't see any value from making the return value a dict of {stream: messages} pairs when you can't reliably index into the dictionary. It seems like this just forces users to type more. Am I missing something?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.
allow list based iterating on XREADGROUP results

redis/client.py Outdated
Copy link
Contributor

@andymccurdy andymccurdy Oct 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be more explicit, response callbacks can accept **options. These come from passing **kwargs to execute_command. In the xtrim_range function, you could pass a parse_detail=True to execute_command, then here say something like:

if options.get('parse_detail', False): return parse_range_xpending(response)
return self.execute_command('XPENDING', name, groupname)

def xpending_range(self, name, groupname, start='-', end='+', count=-1,
consumername=None):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The count=-1 is something I had been using, but it is not supported going forward I don't believe. See:

https://github.com/antirez/redis/pull/5459#issuecomment-433929399

@coleifer
Copy link

coleifer commented Nov 1, 2018

Just FYI, I maintain a project called walrus that implements (among other things) a subclass of the redis.Redis object and has support for the streams APIs.

Many of the signatures and behaviors are the same as in this PR, but I thought I'd mention a few slight differences for what they're worth.

XREAD / XREADGROUP are kind of awkward...they supports multiple streams which can all have different IDs. So the natural value from the user is a dict of stream -> id as you have. What I found, though, is that I usually want to just read from a stream at the max ID (if XREAD) or from the last-read-message (if XREADGROUP), or maybe from a couple streams at the max ID, so I allow the following values, and then normalize them to a dict of stream -> id:

  • stream name as a string -> becomes {stream: '$'} (or > for xreadgroup)
  • list of stream names -> becomes {stream1: '$', stream2: '$'} (or > for xreadgroup)
  • stream to explicit id -> no transformation needed

Additionally, the response from the XREADs is kind of weird, because it returns up to count rows from each stream passed in...The response is then read as a dict of {stream: [list of messages], other_stream: [list of other stream messages]}... which makes sense (and is what happens by default in my implementation). But it might also be nice if a single stream is passed-in to just skip the dict business and return the list? IDK...special cases and all that. I've implemented a high-level TimeSeries class that just interleaves the messages from multiple streams to form a single list with a total ordering and that's very handy, but not suitable for a low-level API like this.

Changes look nice, just thought I'd join the mix.

raise RedisError('XREAD streams must be a non empty dict')
pieces.append(Token.get_token('STREAMS'))
pieces.extend(streams.keys())
pieces.extend(streams.values())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it guaranteed that these will line up in earlier Python? Since dicts aren't technically ordered (at least until 3.7) it might be safer to be explicit here.

keys, values = zip(*streams.items())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://docs.python.org/2/library/stdtypes.html#dict.items Specifically:

"If items(), keys(), values(), iteritems(), iterkeys(), and itervalues() are called with no intervening modifications to the dictionary, the lists will directly correspond."

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but ya, streams.items() seems better. I'll fix. thx.

entry in the PEL even if certain specified IDs are not already in the
PEL assigned to a different client.
justid: optional boolean, false by default. Return just an array of IDs
of messages successfully claimed, without returning the actual message
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The redis docs mention some of these parameters are more targeted towards internal usage:

The command has multiple options, however most are mainly for internal use in order to transfer the effects of XCLAIM or other commands to the AOF file and to propagate the same effects to the slaves, and are unlikely to be useful to normal users:

Do you think it's worthwhile exposing more than just name/group/consumer/min idle/id list?

@andymccurdy
Copy link
Contributor

XREAD / XREADGROUP are kind of awkward...they supports multiple streams which can all have different IDs....

Currently I'm fine just support the single, explicit dict form.

Additionally, the response from the XREADs is kind of weird, because it returns up to count rows from each stream passed in...The response is then read as a dict of {stream: [list of messages], other_stream: [list of other stream messages]}... which makes sense (and is what happens by default in my implementation). But it might also be nice if a single stream is passed-in to just skip the dict business and return the list? IDK...special cases and all that. I've implemented a high-level TimeSeries class that just interleaves the messages from multiple streams to form a single list with a total ordering and that's very handy, but not suitable for a low-level API like this.

@RoeyPrat initially implemented this has a dict with stream names as keys and a list of messages as the value. We talked a bit and found that having a list of (stream, message_list) pairs made for easier iteration. In practice, I can't see anyone using any of the dict functions. You (almost always) want to consume all messages returned to you, especially with XREADGROUP. The new implementation allows for:

for stream, messages in r.xread(...): for message_id, fields in messages: ...
@andymccurdy andymccurdy merged commit a32a8e6 into redis:master Nov 1, 2018
@coleifer
Copy link

coleifer commented Nov 1, 2018

We talked a bit and found that having a list of (stream, message_list) pairs made for easier iteration.

Brilliant, I must've misread the code. Probably no point in trying to interleave the messages to provide a total ordering, right?

Looking forward to getting these changes integrated into ol' walrus.

@andymccurdy
Copy link
Contributor

@coleifer It's early and I haven't had coffee yet. What do you mean by "interleaving the messages to provide a total ordering"?

@itamarhaber
Copy link
Member

I ass-u-me @coleifer means "order the messages by their ids"?

@itamarhaber
Copy link
Member

BTW congrats on this merge - hoping to see more action in this repo ;)

@coleifer
Copy link

coleifer commented Nov 1, 2018

Yeap to both

@andymccurdy
Copy link
Contributor

@itamarhaber @RoeyPrat @coleifer

The following commands all return representations of stream messages: XCLAIM, XINFO STREAM (first-entry, last-entry), XRANGE, XREVRANGE, XREAD and XREADGROUP. Currently these messages are represented as a two item tuple in the form of (message_id, dict_of_fields). The stream name that contains the message is somewhere outside the message, either "higher up" in the data structure, or inferred based on the argument the user supplied when calling the command.

Another idea would be to instead represent each message as a dict in the following format:

{ 'stream': 'stream-name', 'id': '1234-0', 'fields': {'field-one', 'val-1'} }

I think this has three benefits:

  1. The dict representation is more explicit. It's obvious what the fields mean as opposed to the current implementation where you have to remember that message[0] is the message_id and message[1] is is the field dict.

  2. Having the stream name within the message itself makes things easier to debug in user processes. e.g., a worker process needs only to log the message to provide enough debug information about the message, whereas currently it would need to separately also log the stream name.

  3. It makes iterating over XREAD and XREADGROUP responses easier. With this the user doesn't need to perform two loops (currently the first loop iterates over (stream_name, list_of_messages) pairs and the second loop iterates over (message_id, fields) pairs. Instead, all the information they need is provided in each message, so these commands would simply return a list of message dicts.

Thoughts?

@coleifer
Copy link

coleifer commented Nov 1, 2018

I wondered about similar things...Redis return values can be awkward at times. My sense is that, since the message ID is discrete from any ID key/value within the message body, it makes sense to treat the message/body as a 2-tuple. It is also close to what the protocol is doing, so it should be easy to understand if you're coming from the protocol docs to the python client. Similarly with the list of (stream, message-list) for read responses. I get the sense that redis-py is first-and-foremost a straight up redis client, so any "interpretation" beyond basic translation into python types is left to the application developer (?). That's not to say you wouldn't add a higher-level Stream / whatever type -- similar to how pub/sub is handled -- but having the x____ commands stick close to the protocol seems good. People can always manipulate the data how they want, since nothing is lost.

groupname: name of the consumer group.
id: ID of the last item in the stream to consider already delivered.
"""
return self.execute_command('XGROUP CREATE', name, groupname, id)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a new option to make the streams, "MKSTREAM".

Walrus impl for comparison:

 def xgroup_create(self, key, group, id='$', mkstream=False): """  Create a consumer group.   :param key: stream key -- must exist before creating a group if  mkstream is ``False`` (default).  :param group: consumer group name  :param id: set the id of the last-received-message  :param mkstream: create the stream automatically  """ cmd = ['XGROUP', 'CREATE', key, group, id] if mkstream: cmd.append('MKSTREAM') return self.execute_command(*cmd) == b'OK'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something in beta? I see no mention of it on https://redis.io/commands/xgroup

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the MKSTREAM option in ff3bbdf. Thanks for the heads up.

coleifer added a commit to coleifer/walrus that referenced this pull request Nov 2, 2018
redis/redis-py#1040 contains changes to add support for the streams APIs, making it no longer necessary to maintain a separate implementation. The high-level container-type APIs have not changed significantly, however: When reading from the context of a consumer group, the return value is no longer a dict of {stream: [messages]}, but is rather a list of [(stream, [messages]), ...].
@coleifer
Copy link

coleifer commented Nov 2, 2018

Thanks for this excellent patch. I removed all the overrides from my own lib here:
ef25195a653cdd862fc64123e9686ea1480362ac

hell yeaaa: 7 files changed, 153 insertions(+), 653 deletions(-)

@coleifer
Copy link

coleifer commented Nov 2, 2018

Just a heads-up this command is currently in unstable and I'd assume it'll make its way into the next 5.x release: XSETID <stream> <id> for setting the max-id for a stream outside the context of a consumer group.

Note that the comment regarding the command arguments is actually incorrect. I've created a ticket for this here: https://github.com/antirez/redis/issues/5519

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

4 participants