- Notifications
You must be signed in to change notification settings - Fork 2.6k
supports for all streams operations #1040
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| Great, thanks. I’m visiting family for a few days and will be back on Wednesday. I’ll plan on reviewing/merging this then. One observation already: looks like redis 5.0 is fully released now. Can we update the Travis config to pull the full release version and update the skip test decorators back to 5.0? |
| @andymccurdy enjoy the visit :) |
| @andymccurdy updated travis config to redis5. |
andymccurdy left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made a bunch of inline notes. Most are python 2/3 compatibility and Token-encoding literal string issues. I tried to provide suggested changes for all of those.
There's a few non-comment strings that are using double quotes instead of single quotes. Let's try to be consistent with the rest of the code base and use single quotes for non-comment strings.
Also I didn't see a response callback or any tests for the XPENDING command.
redis/client.py Outdated
| return self.execute_command('SUNIONSTORE', dest, *args) | ||
| | ||
| # STREAMS COMMANDS | ||
| def xadd(self, _name, id='*', maxlen=None, approximate=True, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the key/value pairs of the entry should be specified as a single dict arg rather than kwargs. This is one of the things I regret and intend to change about ZADD. I've seen a number of bug reports with ZADD where a user wanted to use an entry name that conflicted with one of the command arguments. This is also the reason redis-py hasn't included the the newer ZADD args like nx/xx/etc. yet -- it would be backwards incompatible to anyone with a sorted set including an element named nx.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. changing it to "fields" and setting it as a must arg
redis/client.py Outdated
| """ | ||
| pieces = [] | ||
| if maxlen is not None: | ||
| if not isinstance(maxlen, int) or maxlen < 1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if not isinstance(maxlen, int) or maxlen < 1: | |
| if not isinstance(maxlen, (int, long)) or maxlen < 1: |
for python 2/3 compat we need isinstance to check both int and long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
redis/client.py Outdated
| pieces.append(str(maxlen)) | ||
| pieces.append(id) | ||
| for pair in iteritems(kwargs): | ||
| pieces.append(pair[0]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pieces.extend(pair) is slightly more efficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
redis/client.py Outdated
| """ | ||
| pieces = [start, finish] | ||
| if count is not None: | ||
| if not isinstance(count, int) or count < 1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if not isinstance(count, int) or count < 1: | |
| if not isinstance(count, (int, long)) or count < 1: |
py 2/3 compat
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
redis/client.py Outdated
| if not isinstance(param, int): | ||
| raise RedisError("XCLAIM {} must be an integer" | ||
| .format(param_name)) | ||
| pieces.append(str(param)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to supply the actual int values here in addition to the option names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually that is the int value. what i forgot to add was the param name...
renaming param to param_value, and adding param_name to pieces.
redis/client.py Outdated
| return int(response) | ||
| | ||
| | ||
| def stream_key(response): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see why we need this callback as it's only returning the response. I think it can be removed and XADD can be omitted from the callback list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
XADD removing unnecessary stream_key parse function
redis/client.py Outdated
| def stream_list(response): | ||
| if response is None: | ||
| return None | ||
| result = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pairs_to_dict callback already does the dict construction that we need. This can simply be:
return [(r[0], pairs_to_dict(r[1])) for r in response]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. stream_list should reuse pairs_to_dict
redis/client.py Outdated
| return None | ||
| result = dict() | ||
| for r in response: | ||
| result[r[0].decode('utf-8')] = stream_list(r[1]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| result[r[0].decode('utf-8')] = stream_list(r[1]) | |
| result[nativestr(r[0])] = stream_list(r[1]) |
The response type can vary based on the decode_responses flag. Use nativestr to only decode if we're dealing with bytes on py3 or str on py2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.
multi_stream_list should use nativestr for compatibility
redis/client.py Outdated
| return stream_list(response) | ||
| | ||
| | ||
| def multi_stream_list(response): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several options for the return structure of multi_stream_list:
# a list of of 2-item stream/messages lists [ [ 'stream-1', [ [ '1540456647578-0', {'key-1': 'value-1', 'key-2': 'value-2'} ], [ '1540456647578-1', {'key-1': 'value-1', 'key-2': 'value-2'} ], ], ], [ 'stream-2', [ [ '1540456647578-0', {'key-1': 'value-1', 'key-2': 'value-2'} ], [ '1540456647578-1', {'key-1': 'value-1', 'key-2': 'value-2'} ], ], ], ] # this allows all list-based iterating.... for stream, messages in r.xread(streams={'stream-1': 0, 'stream-2': 0}): for message_id, message_data in messages: .... # alternatively, the existing code represents this return value # as a dict of streams to lists of message id: dict attr pairs { 'stream-1': [ [ '1540456647578-0', {'key-1': 'value-1', 'key-2': 'value-2'} ], [ '1540456647578-1', {'key-1': 'value-1', 'key-2': 'value-2'} ], ], 'stream-2': [ [ '1540456662589-0', {'key-1': 'value-1', 'key-2': 'value-2'} ], [ '1540456662589-1', {'key-1': 'value-1', 'key-2': 'value-2'} ], ] } # iterating here requires the extra .items() call for stream, messages in r.xread(streams={'stream-1': 0, 'stream-2': 0}).items() for message_id, message_data in messages: .... # given redis returns null when there are no messages to read, # you can't reliably index directly into the dictionary, which seems to # take away much of the value of the dictionary # this blows up with a TypeError if there's no message to read # since xread()'s return value will be None. r.xread(streams={'stream-1': 99999999999})['stream-1']I think the first option might be the best choice here. I don't see any value from making the return value a dict of {stream: messages} pairs when you can't reliably index into the dictionary. It seems like this just forces users to type more. Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.
allow list based iterating on XREADGROUP results
This includes: XADD, XREAD, XRANGE, XREVRANGE, XLEN. See http://antirez.com/news/114 for more information. Consumer groups is not yet supported, as its details are still being finalised upstream.
Co-Authored-By: RoeyPrat <roey.prat@redislabs.com>
…t arg rather than kwargs
redis/client.py Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be more explicit, response callbacks can accept **options. These come from passing **kwargs to execute_command. In the xtrim_range function, you could pass a parse_detail=True to execute_command, then here say something like:
if options.get('parse_detail', False): return parse_range_xpending(response)| return self.execute_command('XPENDING', name, groupname) | ||
| | ||
| def xpending_range(self, name, groupname, start='-', end='+', count=-1, | ||
| consumername=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The count=-1 is something I had been using, but it is not supported going forward I don't believe. See:
https://github.com/antirez/redis/pull/5459#issuecomment-433929399
| Just FYI, I maintain a project called walrus that implements (among other things) a subclass of the Many of the signatures and behaviors are the same as in this PR, but I thought I'd mention a few slight differences for what they're worth. XREAD / XREADGROUP are kind of awkward...they supports multiple streams which can all have different IDs. So the natural value from the user is a dict of stream -> id as you have. What I found, though, is that I usually want to just read from a stream at the max ID (if XREAD) or from the last-read-message (if XREADGROUP), or maybe from a couple streams at the max ID, so I allow the following values, and then normalize them to a dict of stream -> id:
Additionally, the response from the XREADs is kind of weird, because it returns up to Changes look nice, just thought I'd join the mix. |
| raise RedisError('XREAD streams must be a non empty dict') | ||
| pieces.append(Token.get_token('STREAMS')) | ||
| pieces.extend(streams.keys()) | ||
| pieces.extend(streams.values()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it guaranteed that these will line up in earlier Python? Since dicts aren't technically ordered (at least until 3.7) it might be safer to be explicit here.
keys, values = zip(*streams.items())There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See https://docs.python.org/2/library/stdtypes.html#dict.items Specifically:
"If items(), keys(), values(), iteritems(), iterkeys(), and itervalues() are called with no intervening modifications to the dictionary, the lists will directly correspond."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but ya, streams.items() seems better. I'll fix. thx.
| entry in the PEL even if certain specified IDs are not already in the | ||
| PEL assigned to a different client. | ||
| justid: optional boolean, false by default. Return just an array of IDs | ||
| of messages successfully claimed, without returning the actual message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The redis docs mention some of these parameters are more targeted towards internal usage:
The command has multiple options, however most are mainly for internal use in order to transfer the effects of XCLAIM or other commands to the AOF file and to propagate the same effects to the slaves, and are unlikely to be useful to normal users:
Do you think it's worthwhile exposing more than just name/group/consumer/min idle/id list?
Currently I'm fine just support the single, explicit dict form.
@RoeyPrat initially implemented this has a dict with stream names as keys and a list of messages as the value. We talked a bit and found that having a list of (stream, message_list) pairs made for easier iteration. In practice, I can't see anyone using any of the dict functions. You (almost always) want to consume all messages returned to you, especially with for stream, messages in r.xread(...): for message_id, fields in messages: ... |
Brilliant, I must've misread the code. Probably no point in trying to interleave the messages to provide a total ordering, right? Looking forward to getting these changes integrated into ol' walrus. |
| @coleifer It's early and I haven't had coffee yet. What do you mean by "interleaving the messages to provide a total ordering"? |
| I ass-u-me @coleifer means "order the messages by their ids"? |
| BTW congrats on this merge - hoping to see more action in this repo ;) |
| Yeap to both |
| @itamarhaber @RoeyPrat @coleifer The following commands all return representations of stream messages: Another idea would be to instead represent each message as a dict in the following format: { 'stream': 'stream-name', 'id': '1234-0', 'fields': {'field-one', 'val-1'} }I think this has three benefits:
Thoughts? |
| I wondered about similar things...Redis return values can be awkward at times. My sense is that, since the message ID is discrete from any ID key/value within the message body, it makes sense to treat the message/body as a 2-tuple. It is also close to what the protocol is doing, so it should be easy to understand if you're coming from the protocol docs to the python client. Similarly with the list of (stream, message-list) for read responses. I get the sense that redis-py is first-and-foremost a straight up redis client, so any "interpretation" beyond basic translation into python types is left to the application developer (?). That's not to say you wouldn't add a higher-level |
| groupname: name of the consumer group. | ||
| id: ID of the last item in the stream to consider already delivered. | ||
| """ | ||
| return self.execute_command('XGROUP CREATE', name, groupname, id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a new option to make the streams, "MKSTREAM".
Walrus impl for comparison:
def xgroup_create(self, key, group, id='$', mkstream=False): """ Create a consumer group. :param key: stream key -- must exist before creating a group if mkstream is ``False`` (default). :param group: consumer group name :param id: set the id of the last-received-message :param mkstream: create the stream automatically """ cmd = ['XGROUP', 'CREATE', key, group, id] if mkstream: cmd.append('MKSTREAM') return self.execute_command(*cmd) == b'OK'There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this something in beta? I see no mention of it on https://redis.io/commands/xgroup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added the MKSTREAM option in ff3bbdf. Thanks for the heads up.
redis/redis-py#1040 contains changes to add support for the streams APIs, making it no longer necessary to maintain a separate implementation. The high-level container-type APIs have not changed significantly, however: When reading from the context of a consumer group, the return value is no longer a dict of {stream: [messages]}, but is rather a list of [(stream, [messages]), ...].
| Thanks for this excellent patch. I removed all the overrides from my own lib here: hell yeaaa: |
| Just a heads-up this command is currently in unstable and I'd assume it'll make its way into the next 5.x release: Note that the comment regarding the command arguments is actually incorrect. I've created a ticket for this here: https://github.com/antirez/redis/issues/5519 |
@andymccurdy @nicois
this should give us support of all streams commands in redis 5.0
based on nicois work in #920, and commits from @itamarhaber.
also fixed some issues with tests, and changed CI to run on newer redis version.