A stream adaptor that chunks up items with timeout support. Items are flushed when:
- The buffer reaches capacity or
- A timeout occurs
Based on the Chunks adaptor from futures-util, with added timeout functionality.
Note: Originally called tokio-batch, but renamed since it has no dependency on Tokio.
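
The two flush conditions described at the top (capacity reached, or timeout elapsed) can be illustrated with a small blocking sketch that uses only the standard library. This is not how futures-batch is implemented: `next_batch` is a hypothetical helper, the channel stands in for the stream, and starting the timer when the first item of a batch arrives is an assumption made for this example.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of futures-batch): collect one batch from `rx`.
/// Returns when `capacity` items are buffered or when `timeout` has elapsed
/// since the first item of the batch arrived (assumed timer start).
/// Returns `None` once the channel is closed and nothing is left to read.
fn next_batch<T>(rx: &Receiver<T>, capacity: usize, timeout: Duration) -> Option<Vec<T>> {
    // Block until the first item arrives; the deadline is set only after that.
    let first = rx.recv().ok()?;
    let mut batch = Vec::with_capacity(capacity);
    batch.push(first);
    let deadline = Instant::now() + timeout;

    while batch.len() < capacity {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Timeout: flush a partial batch. Disconnected: flush what is buffered.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    Some(batch)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..7 {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so the final partial batch is flushed

    // The first batch is flushed because it is full, the second because input ended.
    while let Some(batch) = next_batch(&rx, 5, Duration::from_millis(50)) {
        println!("{:?}", batch);
    }
}
```

With seven queued items and a capacity of 5, the sketch yields one full batch followed by one partial batch.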
Add to your Cargo.toml:

```toml
[dependencies]
futures-batch = "0.7"
```

Use as a stream combinator:

```rust
use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::ChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let results = stream::iter(0..10)
        .chunks_timeout(5, Duration::from_secs(10))
        .collect::<Vec<_>>()
        .await;

    assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], results);
}
```

This creates chunks of up to 5 items with a 10-second timeout.
For streams that yield Result values, use try_chunks_timeout to batch successful values and immediately propagate errors:

```rust
use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::TryChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let results = stream::iter((0..10).map(|i| if i == 5 { Err("error") } else { Ok(i) }))
        .try_chunks_timeout(3, Duration::from_secs(10))
        .collect::<Vec<_>>()
        .await;

    // Results in: [Ok([0, 1, 2]), Ok([3, 4]), Err("error"), Ok([6, 7, 8]), Ok([9])]
    println!("{:?}", results);
}
```

This batches Ok values until the buffer is full or the timeout occurs, while immediately propagating any Err values.
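
To make the ordering of batches and errors concrete, here is a synchronous, standard-library-only model of the rule: buffer Ok values, and when an Err arrives, flush whatever is buffered before passing the error on. It is a sketch under stated assumptions, not the crate's implementation: `try_chunks` is a hypothetical function, it works on an iterator instead of a stream, and it ignores the timeout entirely.

```rust
/// Hypothetical synchronous model (not part of futures-batch); the timeout is ignored.
/// Ok values are grouped into batches of at most `capacity`; an Err first
/// flushes the pending partial batch and is then emitted on its own.
fn try_chunks<T, E>(
    items: impl IntoIterator<Item = Result<T, E>>,
    capacity: usize,
) -> Vec<Result<Vec<T>, E>> {
    assert!(capacity > 0, "capacity must be greater than zero");
    let mut out = Vec::new();
    let mut batch = Vec::with_capacity(capacity);

    for item in items {
        match item {
            Ok(value) => {
                batch.push(value);
                if batch.len() == capacity {
                    // Buffer is full: emit it and start a fresh one.
                    out.push(Ok(std::mem::replace(&mut batch, Vec::with_capacity(capacity))));
                }
            }
            Err(err) => {
                // Flush buffered values first so the original order is preserved.
                if !batch.is_empty() {
                    out.push(Ok(std::mem::take(&mut batch)));
                }
                out.push(Err(err));
            }
        }
    }

    // End of input: emit any remaining partial batch.
    if !batch.is_empty() {
        out.push(Ok(batch));
    }
    out
}

fn main() {
    let input = (0..10).map(|i| if i == 5 { Err("error") } else { Ok(i) });
    let results = try_chunks(input, 3);
    // Prints: [Ok([0, 1, 2]), Ok([3, 4]), Err("error"), Ok([6, 7, 8]), Ok([9])]
    println!("{:?}", results);
}
```

For the same input as the example above, this model produces the same sequence of batches and errors.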
Enable Sink support for bidirectional streams:

```toml
[dependencies]
futures-batch = { version = "0.7", features = ["sink"] }
```

When enabled, both ChunksTimeout and TryChunksTimeout implement Sink and forward sink operations to the underlying stream.
futures-batch has minimal overhead and is suitable for high-performance applications:
- Used for batching syscalls in production
- Built on futures-timer with microsecond resolution
- Zero allocations for chunk creation (reuses capacity)
Benchmarks show consistent ~20ns per operation across different batch sizes.
Thanks to arielb1, alexcrichton, doyoubi, leshow, spebern, and wngr for their contributions!