Skip to content

Conversation

@nicois
Copy link

@nicois nicois commented Nov 4, 2017

See http://antirez.com/news/114 for more information.

@nicois nicois force-pushed the master branch 2 times, most recently from bd46c7b to 7495a20 Compare November 4, 2017 02:15
@nicois
Copy link
Author

nicois commented Nov 4, 2017

The tests have been decorated with an arbitrary redis version as we don't yet know the number @antirez will use for his stable release. Once it is available I can update the decorators.

@nicois
Copy link
Author

nicois commented Nov 4, 2017

Also, if you are generally happy with this PR, I can look at updating the documentation. I'll hold off in case there are any fundamental problems you see here.

@nskalis
Copy link

nskalis commented Jan 16, 2018

I would love to see this PR merged, given that the redis team is working on the consumer groups implementation and the specification is already available

@vtermanis
Copy link

vtermanis commented Jan 19, 2018

def xread(self, *full_streams, **kwargs)

@nicois: The current signature cannot handle "partial streams" if their names are either block or count. Might be it worthwhile to have expose them explicitly?

  • def xread(self, *full_streams, count=None, block=False, **partial_streams) or
  • def xread(self, *streams, block=None, count=None, default_id='$') (different approach)

Also, quick question (as someone who's fairly new to the redis-py, wanting to understand what is the right way to extend the library):
What's the advantage of defining separate string_keys_to_dict entries for the response parsers over appending them to the large sub-dict?

@nicois
Copy link
Author

nicois commented Jan 19, 2018

@vtermanis thanks, that's a good point.
In the absence of other perspectives, I'd prefer to stay with the current approach, as it is more flexible with specifying different message_id values for each stream.

I can't really comment about the pros and cons of response parser definitions. I was hoping to get some feedback from @andymccurdy . Partly to confirm the signatures were correct, after which the documentation can be updated.

Also, I will make a simple modification to support XREVRANGE and XLEN, which I wasn't aware of when I first made this PR.

@nicois nicois force-pushed the master branch 3 times, most recently from 196df57 to 6ca2bc2 Compare January 19, 2018 12:04
@nicois
Copy link
Author

nicois commented Jan 19, 2018

@vtermanis I've reverted the count/block to what I originally had. The problem is python2 support. If this were my repository I would ditch python2, but it's not up to me. And python2 requires kwargs with defaults to precede *args.

@nicois nicois force-pushed the master branch 3 times, most recently from 27fb05c to 0487886 Compare January 19, 2018 12:21
@vtermanis
Copy link

Fair enough, makes sense!

@nicois nicois force-pushed the master branch 2 times, most recently from aec486a to 0186808 Compare January 19, 2018 19:05
@nicois
Copy link
Author

nicois commented Jan 19, 2018

Actually, sleeping on it, the *full_streams argument is not very helpful, in practice: the only time you would want to XREAD without providing a message ID is the first iteration around a loop. Every subsequent iteration you'll provide the previous highest message_id.

e.g.

streams = dict(foo='$', bar='$') while True: stream_activity = redis.xread(block=10000, count=10000, **streams) if stream_activity is None: continue for stream_name, messages in stream_activity.items(): for message_id, payload in messages: do_something(stream_name, message_id, payload) streams[stream_name] = message_id

.. so I have removed the *full_streams, allowing me to implement your suggestion.

Secondly, in answer to your question around the usage of string_keys_to_dict: it actually makes no appreciable difference. When the class is initialised, the dict_merge call aggregates the strings into a single dict. So from that point forth, execution speed is identical. Any time variation in execution time is a one-off cost and would be too small to measure.

@nicois nicois changed the title Added support for XADD / XREAD / XRANGE operations Added support for XADD / XREAD / XRANGE / XREVRANGE / XLEN operations Jan 19, 2018
This includes: XADD, XREAD, XRANGE, XREVRANGE, XLEN. See http://antirez.com/news/114 for more information. Consumer groups is not yet supported, as its details are still being finalised upstream.
@ArminGruner
Copy link

Hi, I'm already using your patches to redis-py, but came to an unpleasant limitation:

My streams are named "log/xxx", "log/yyy", and unfortunately the signature you've chosen for
xadd() and xread() do not permit such key names, although they are perfectly valid REDIS key names.

Could you perhaps reconsider the binding? Having keyword arguments for the input parameters put a severe limit on the stream key names, that is, they must be valid Python identifiers.

What if you perhaps just transport the desired streams as normal dictionary?

xread(self, count=None, block=None, streams)

@vtermanis
Copy link

@ArminGruner that's why proposed this approach (earlier in this issue). I agree that having stream names as keyword arguments limiting their names is not great. A workaround with the the pull request the way it currently works it to pass the streams in as a dictionary explicitly, e.g.:

redis.xread(**{'log/xxx': '$', 'log/yyy': 'some_id'})
@ariddell
Copy link

FYI: full documentation on streams was recently added. For example, a general introduction was added on May 18, https://redis.io/topics/streams-intro .

@perkfly
Copy link

perkfly commented Jun 20, 2018

Does this commit support XREADGROUP etc. ?

@oeo
Copy link

oeo commented Jul 2, 2018

Merge please!

@ryanalexanderson
Copy link

Hi all,
I've created a simple iterator object based on nicois' fork that I'd love to get some feedback on. Details at:
https://www.reddit.com/r/redis/comments/8vsxp8/seeking_feedback_on_a_redis_python_stream/

@SylvanG
Copy link

SylvanG commented Aug 1, 2018

I'd like to know when it will be merged

@nicois
Copy link
Author

nicois commented Aug 1, 2018

I have given up on this maintainer. I have just switched to using the async python client, which works really well and already has streams support. I've made a really nice websocket cache to sit downstream of redis streams, which would have been a pain with the non-async library anyway.

@bsergean
Copy link

@nicois do you have links to your async python client ? Is it related to aioredis?

@zvesp
Copy link

zvesp commented Aug 16, 2018

@bsergean they might have been referring to the aredis project, which is listed as an official client and which supports streams.

@Pilen
Copy link

Pilen commented Aug 17, 2018

I just tried to use this (with python 3.5.2) but it fails if redis.StrictRedis(decode_responses=True) as multi_stream_list calls r[0].decode('utf-8') on what is already decoded to a string.

I am not sure if the correct solution is to make a check here, or if the correct design would be to not decode, and letting the user handle it either manually or through decode_responses.

@alkasm
Copy link

alkasm commented Aug 19, 2018

@zvesp it's more likely @nicois is using aioredis as @bsergean mentioned, given that he has it forked on his GitHub.

@AngusP
Copy link
Contributor

AngusP commented Aug 28, 2018

As Redis 5 with streams hasn't been stable-released yet, might it make sense to add a redis5 branch and merge Stream and Redis 5 based PRs in there, given that broken or incorrect or removed commands targeting a release candidate probably shouldn't make it in to a library release (if one happens between now and the first stable Redis to include Streams)?

Also @nicois for the test decorators, the 'Redis 5' release candidates started numbering from 4.9.100

@alkasm
Copy link

alkasm commented Aug 28, 2018

@AngusP 5.0 is on rc4 and the API has remained constant for a long time (with Salvatore saying "this will be the API" about a year ago), so I think we can assume it will not be changing. Regardless, this PR does not implement the full streams API (consumer groups and XACK and all that) so I agree, it would be nice to see it as a branch that can be collaborated on (improving performance, returning iterators for e.g.) as people are less likely to pick up and work from a PR than an active branch. But, the maintainer @andymccurdy has not been very active on this repo for about a year.

@AngusP
Copy link
Contributor

AngusP commented Aug 31, 2018

@andymccurdy would you be open to letting others help maintain this library? I'd be happy to help!

@alkasm the API does seem final at this stage, the redis.io docs are missing a few commands (XACK being a good example) but the consumer group ones are also given here https://gist.github.com/antirez/4e7049ce4fce4aa61bf0cfbc3672e64d (and obviously also in the redis source)

Given this PR is old and in need of some tweaks now as things have changed it seems to me a branch in this report would be the best bet rather than PR'ing downstream?!

@coleifer
Copy link

coleifer commented Oct 9, 2018

I've pushed some streams-related changes to my library walrus. The client (which subclasses redis-py's Redis client) adds methods for all the streams APIs. Additionally there are some high-level container types for working with streams within the context of a consumer group. You can see the high-level APIs in (contrived) action here.

Implementation of low-level APIs: https://github.com/coleifer/walrus/blob/82985ab1ec712cedada9e4b1234178242b1f84f0/walrus/database.py#L126-L431

Higher-level container types: https://github.com/coleifer/walrus/blob/82985ab1ec712cedada9e4b1234178242b1f84f0/walrus/containers.py#L1018-L1397

@itamarhaber
Copy link
Member

This has been merged via #1040 - closing & ty @nicois.

@itamarhaber itamarhaber closed this Nov 2, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet