Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions questdb-rs/src/ingress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::collections::HashMap;
use std::convert::Infallible;
use std::fmt::{Debug, Display, Formatter, Write};
use std::io::{self, BufRead, BufReader, ErrorKind, Write as IoWrite};
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::path::PathBuf;
use std::str::FromStr;
Expand Down Expand Up @@ -430,11 +431,13 @@ impl OpCase {
}
}

#[derive(Debug, Clone)]
// IMPORTANT: This struct MUST remain `Copy` to ensure that
// there are no heap allocations when performing marker operations.
#[derive(Debug, Clone, Copy)]
struct BufferState {
op_case: OpCase,
row_count: usize,
first_table: Option<String>,
first_table_len: Option<NonZeroUsize>,
transactional: bool,
}

Expand All @@ -443,17 +446,10 @@ impl BufferState {
Self {
op_case: OpCase::Init,
row_count: 0,
first_table: None,
first_table_len: None,
transactional: true,
}
}

fn clear(&mut self) {
self.op_case = OpCase::Init;
self.row_count = 0;
self.first_table = None;
self.transactional = true;
}
}

/// A reusable buffer to prepare a batch of ILP messages.
Expand Down Expand Up @@ -494,11 +490,11 @@ impl BufferState {
/// * A row always starts with [`table`](Buffer::table).
/// * A row must contain at least one [`symbol`](Buffer::symbol) or
/// column (
/// [`column_bool`](Buffer::column_bool),
/// [`column_i64`](Buffer::column_i64),
/// [`column_f64`](Buffer::column_f64),
/// [`column_str`](Buffer::column_str),
/// [`column_ts`](Buffer::column_ts)).
/// [`column_bool`](Buffer::column_bool),
/// [`column_i64`](Buffer::column_i64),
/// [`column_f64`](Buffer::column_f64),
/// [`column_str`](Buffer::column_str),
/// [`column_ts`](Buffer::column_ts)).
/// * Symbols must appear before columns.
/// * A row must be terminated with either [`at`](Buffer::at) or
/// [`at_now`](Buffer::at_now).
Expand Down Expand Up @@ -630,7 +626,7 @@ impl Buffer {
)
));
}
self.marker = Some((self.output.len(), self.state.clone()));
self.marker = Some((self.output.len(), self.state));
Ok(())
}

Expand Down Expand Up @@ -663,7 +659,7 @@ impl Buffer {
/// [`capacity`](Buffer::capacity).
pub fn clear(&mut self) {
self.output.clear();
self.state.clear();
self.state = BufferState::new();
self.marker = None;
}

Expand Down Expand Up @@ -729,16 +725,31 @@ impl Buffer {
let name: TableName<'a> = name.try_into()?;
self.validate_max_name_len(name.name)?;
self.check_op(Op::Table)?;
let table_begin = self.output.len();
write_escaped_unquoted(&mut self.output, name.name);
let table_end = self.output.len();
self.state.op_case = OpCase::TableWritten;

// A buffer stops being transactional if it targets multiple tables.
if let Some(first_table) = &self.state.first_table {
if first_table != name.name {
if let Some(first_table_len) = &self.state.first_table_len {
let first_table = &self.output[0..(first_table_len.get())];
let this_table = &self.output[table_begin..table_end];
if first_table != this_table {
self.state.transactional = false;
}
} else {
self.state.first_table = Some(name.name.to_owned());
debug_assert!(table_begin == 0);

// This is a bit confusing, so worth explaining:
// `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
// but we know that `table_end` is never 0 here, we just need an option type
// anyway, so we don't bother unwrapping it to then wrap it again.
let first_table_len = NonZeroUsize::new(table_end);

// Instead we just assert that it's `Some`.
debug_assert!(first_table_len.is_some());

self.state.first_table_len = first_table_len;
}
Ok(self)
}
Expand Down
72 changes: 72 additions & 0 deletions questdb-rs/src/tests/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,78 @@ fn test_row_count() -> TestResult {
Ok(())
}

#[test]
fn test_transactional() -> TestResult {
let mut buffer = Buffer::new();

// transactional since there are no recorded tables yet
assert_eq!(buffer.row_count(), 0);
assert!(buffer.transactional());

buffer.set_marker()?;
buffer.table("table 1.test")?.symbol("a", "b")?.at_now()?;
assert_eq!(buffer.row_count(), 1); // tables={'table 1.test'}

// still transactional since there is only one single table.
assert!(buffer.transactional());

buffer.table("table 2.test")?.symbol("c", "d")?.at_now()?;

// not transactional since we have both tables "x" and "y".
assert_eq!(buffer.row_count(), 2); // tables={'table 1.test', 'table 2.test'}
assert!(!buffer.transactional());

buffer.rewind_to_marker()?;
// no tables, so we're transactional again
assert_eq!(buffer.row_count(), 0); // tables=[]
assert!(buffer.transactional());
assert!(buffer.is_empty());

// We add another new and different table, so we are still transactional.
buffer.table("test=table=3")?.symbol("e", "f")?.at_now()?;
assert_eq!(buffer.row_count(), 1); // tables={'test=table=3'}
assert!(buffer.transactional());

// Same table again, so we are still transactional.
buffer.table("test=table=3")?.symbol("g", "h")?.at_now()?;
assert_eq!(buffer.row_count(), 2); // tables={'test=table=3'}
assert!(buffer.transactional());

buffer.set_marker()?;
// We add a new different table: Name differs in length.
buffer.table("test=table=3 ")?.symbol("i", "j")?.at_now()?;
assert_eq!(buffer.row_count(), 3); // tables={'test=table=3', 'test=table=3 '}
assert!(!buffer.transactional());

buffer.rewind_to_marker()?;
assert_eq!(buffer.row_count(), 2); // tables={'test=table=3'}
assert!(buffer.transactional());

buffer.set_marker()?;
// We add a new different table: Name differs in content, but not in length.
buffer.table("test=table=4")?.symbol("k", "l")?.at_now()?;
assert_eq!(buffer.row_count(), 3); // tables={'test=table=3', 'test=table=4'}
assert!(!buffer.transactional());

buffer.rewind_to_marker()?;
assert_eq!(buffer.row_count(), 2); // tables={'test=table=3'}
assert!(buffer.transactional());

buffer.clear();
assert_eq!(buffer.row_count(), 0); // tables=[]
assert!(buffer.transactional());
assert!(buffer.is_empty());

// We add three rows of the same new table, so we are still transactional.
buffer.table("test=table=5")?.symbol("m", "n")?.at_now()?;
buffer.table("test=table=5")?.symbol("o", "p")?.at_now()?;
buffer.table("test=table=5")?.symbol("q", "r")?.at_now()?;
assert_eq!(buffer.row_count(), 3); // tables={'test=table=5'}
assert!(buffer.transactional());

Ok(())
}

#[test]
fn test_auth_inconsistent_keys() -> TestResult {
test_bad_key("fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU", // d
Expand Down
Loading