@@ -157,6 +157,7 @@ def __init__(
157157 consumer_id : str = "$" ,
158158 mkstream : bool = True ,
159159 xread_block : int = 10000 ,
160+ maxlen : Optional [int ] = None ,
160161 additional_streams : Optional [Dict [str , str ]] = None ,
161162 ** connection_kwargs : Any ,
162163 ) -> None :
@@ -176,6 +177,8 @@ def __init__(
176177 :param mkstream: create stream if it does not exist.
177178 :param xread_block: block time in ms for xreadgroup.
178179 Better to set it to a bigger value, to avoid unnecessary calls.
180+ :param maxlen: sets the maximum length of the stream
181+ trims (the old values of) the stream each time a new element is added
179182 :param additional_streams: additional streams to read from.
180183 Each key is a stream name, value is a consumer id.
181184 """
@@ -193,6 +196,7 @@ def __init__(
193196 self .consumer_id = consumer_id
194197 self .mkstream = mkstream
195198 self .block = xread_block
199+ self .maxlen = maxlen
196200 self .additional_streams = additional_streams or {}
197201
198202 async def _declare_consumer_group (self ) -> None :
@@ -223,7 +227,11 @@ async def kick(self, message: BrokerMessage) -> None:
223227 :param message: message to append.
224228 """
225229 async with self ._acquire_master_conn () as redis_conn :
226- await redis_conn .xadd (self .queue_name , {b"data" : message .message })
230+ await redis_conn .xadd (
231+ self .queue_name ,
232+ {b"data" : message .message },
233+ maxlen = self .maxlen ,
234+ )
227235
228236 def _ack_generator (self , id : str ) -> Callable [[], Awaitable [None ]]:
229237 async def _ack () -> None :
0 commit comments