Skip to content

Commit 231a770

Browse files
committed
Blocking reads without timeout
1 parent 9d69ff1 commit 231a770

File tree

3 files changed

+52
-6
lines changed

3 files changed

+52
-6
lines changed

src/cmd.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use std::{collections::BTreeMap, ops::Bound, u64};
1+
use std::{collections::BTreeMap, ops::Bound, time::Duration, u64};
2+
3+
use tokio::sync::mpsc;
24

35
use crate::{error::DBError, protocol::Protocol, server::Server, storage::now_in_millis};
46

@@ -19,7 +21,7 @@ pub enum Cmd {
1921
Type(String),
2022
Xadd(String, String, Vec<(String, String)>),
2123
Xrange(String, String, String),
22-
Xread(Vec<String>, Vec<String>),
24+
Xread(Vec<String>, Vec<String>, Option<u64>),
2325
}
2426

2527
impl Cmd {
@@ -28,7 +30,7 @@ impl Cmd {
2830
match protocol.clone().0 {
2931
Protocol::Array(p) => {
3032
let cmd = p.into_iter().map(|x| x.decode()).collect::<Vec<_>>();
31-
if cmd.len() == 0 {
33+
if cmd.is_empty() {
3234
return Err(DBError("cmd length is 0".to_string()));
3335
}
3436
Ok((
@@ -114,9 +116,20 @@ impl Cmd {
114116
if cmd.len() < 4 || cmd.len() % 2 != 0 {
115117
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
116118
}
117-
let cmd2 = &cmd[2..];
119+
let mut offset = 2;
120+
// block cmd
121+
let mut block = None;
122+
if cmd[1] == "block" {
123+
offset += 2;
124+
if let Ok(block_time) = cmd[2].parse() {
125+
block = Some(block_time);
126+
} else {
127+
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
128+
}
129+
}
130+
let cmd2 = &cmd[offset..];
118131
let len2 = cmd2.len() / 2;
119-
Cmd::Xread(cmd2[0..len2].to_vec(), cmd2[len2..].to_vec())
132+
Cmd::Xread(cmd2[0..len2].to_vec(), cmd2[len2..].to_vec(), block)
120133
}
121134
_ => return Err(DBError(format!("unsupported cmd {:?}", cmd[0]))),
122135
},
@@ -165,7 +178,9 @@ impl Cmd {
165178
}
166179

167180
Cmd::Xrange(stream_key, start, end) => xrange_cmd(server, stream_key, start, end).await,
168-
Cmd::Xread(stream_keys, starts) => xread_cmd(starts, server, stream_keys).await,
181+
Cmd::Xread(stream_keys, starts, block) => {
182+
xread_cmd(starts, server, stream_keys, block).await
183+
}
169184
};
170185
if ret.is_ok() {
171186
server.offset.fetch_add(
@@ -217,7 +232,23 @@ async fn xread_cmd(
217232
starts: &[String],
218233
server: &mut Server,
219234
stream_keys: &[String],
235+
block_millis: &Option<u64>,
220236
) -> Result<Protocol, DBError> {
237+
if let Some(t) = block_millis {
238+
if t > &0 {
239+
tokio::time::sleep(Duration::from_millis(*t)).await;
240+
} else {
241+
let (sender, mut receiver) = mpsc::channel(4);
242+
{
243+
let mut blocker = server.stream_reader_blocker.lock().await;
244+
blocker.push(sender.clone());
245+
}
246+
while let Some(_) = receiver.recv().await {
247+
println!("get new xadd cmd, release block");
248+
break;
249+
}
250+
}
251+
}
221252
let streams = server.streams.lock().await;
222253
let mut ret = Vec::new();
223254
for (i, stream_key) in stream_keys.iter().enumerate() {
@@ -344,6 +375,13 @@ async fn xadd_cmd(
344375
s.push((key.clone(), value.clone()));
345376
}
346377
}
378+
{
379+
let mut blocker = server.stream_reader_blocker.lock().await;
380+
for sender in blocker.iter() {
381+
sender.send(()).await?;
382+
}
383+
blocker.clear();
384+
}
347385
resp_and_replicate(
348386
server,
349387
Protocol::BulkString(offset.to_string()),

src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,8 @@ impl From<mpsc::error::SendError<(Protocol, u64)>> for DBError {
3737
DBError(item.to_string().clone())
3838
}
3939
}
40+
impl From<tokio::sync::mpsc::error::SendError<()>> for DBError {
41+
fn from(item: mpsc::error::SendError<()>) -> Self {
42+
DBError(item.to_string().clone())
43+
}
44+
}

src/server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77
use tokio::fs::OpenOptions;
88
use tokio::io::AsyncReadExt;
99
use tokio::io::AsyncWriteExt;
10+
use tokio::sync::mpsc::Sender;
1011
use tokio::sync::Mutex;
1112

1213
use crate::cmd::Cmd;
@@ -26,6 +27,7 @@ pub struct Server {
2627
pub option: options::DBOption,
2728
pub offset: Arc<AtomicU64>,
2829
pub master_repl_clients: Arc<Mutex<Option<MasterReplicationClient>>>,
30+
pub stream_reader_blocker: Arc<Mutex<Vec<Sender<()>>>>,
2931
master_addr: Option<String>,
3032
}
3133

@@ -55,6 +57,7 @@ impl Server {
5557
Arc::new(Mutex::new(None))
5658
},
5759
offset: Arc::new(AtomicU64::new(0)),
60+
stream_reader_blocker: Arc::new(Mutex::new(Vec::new())),
5861
master_addr,
5962
};
6063

0 commit comments

Comments
 (0)