Skip to content

Commit 6afd539

Browse files
authored
Add disk usage limit configuration to datafusion-cli (apache#15586)
* added disk limit option * run prettier and cargo fmt * help line update
1 parent c0b1fbc commit 6afd539

File tree

2 files changed

+43
-10
lines changed

2 files changed

+43
-10
lines changed

datafusion-cli/src/main.rs

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datafusion::error::{DataFusionError, Result};
2525
use datafusion::execution::context::SessionConfig;
2626
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
2727
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
28+
use datafusion::execution::DiskManager;
2829
use datafusion::prelude::SessionContext;
2930
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
3031
use datafusion_cli::functions::ParquetMetadataFunc;
@@ -39,6 +40,7 @@ use datafusion_cli::{
3940
use clap::Parser;
4041
use datafusion::common::config_err;
4142
use datafusion::config::ConfigOptions;
43+
use datafusion::execution::disk_manager::DiskManagerConfig;
4244
use mimalloc::MiMalloc;
4345

4446
#[global_allocator]
@@ -125,6 +127,14 @@ struct Args {
125127

126128
#[clap(long, help = "Enables console syntax highlighting")]
127129
color: bool,
130+
131+
#[clap(
132+
short = 'd',
133+
long,
134+
help = "Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')",
135+
value_parser(extract_disk_limit)
136+
)]
137+
disk_limit: Option<usize>,
128138
}
129139

130140
#[tokio::main]
@@ -165,6 +175,18 @@ async fn main_inner() -> Result<()> {
165175
rt_builder = rt_builder.with_memory_pool(pool)
166176
}
167177

178+
// set disk limit
179+
if let Some(disk_limit) = args.disk_limit {
180+
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
181+
182+
let disk_manager = Arc::try_unwrap(disk_manager)
183+
.expect("DiskManager should be a single instance")
184+
.with_max_temp_directory_size(disk_limit.try_into().unwrap())?;
185+
186+
let disk_config = DiskManagerConfig::new_existing(Arc::new(disk_manager));
187+
rt_builder = rt_builder.with_disk_manager(disk_config);
188+
}
189+
168190
let runtime_env = rt_builder.build_arc()?;
169191

170192
// enable dynamic file query
@@ -300,7 +322,7 @@ impl ByteUnit {
300322
}
301323
}
302324

303-
fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
325+
fn parse_size_string(size: &str, label: &str) -> Result<usize, String> {
304326
static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
305327
LazyLock::new(|| {
306328
let mut m = HashMap::new();
@@ -322,25 +344,33 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
322344
let lower = size.to_lowercase();
323345
if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
324346
let num_str = caps.get(1).unwrap().as_str();
325-
let num = num_str.parse::<usize>().map_err(|_| {
326-
format!("Invalid numeric value in memory pool size '{}'", size)
327-
})?;
347+
let num = num_str
348+
.parse::<usize>()
349+
.map_err(|_| format!("Invalid numeric value in {} '{}'", label, size))?;
328350

329351
let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
330-
let unit = &BYTE_SUFFIXES
352+
let unit = BYTE_SUFFIXES
331353
.get(suffix)
332-
.ok_or_else(|| format!("Invalid memory pool size '{}'", size))?;
333-
let memory_pool_size = usize::try_from(unit.multiplier())
354+
.ok_or_else(|| format!("Invalid {} '{}'", label, size))?;
355+
let total_bytes = usize::try_from(unit.multiplier())
334356
.ok()
335357
.and_then(|multiplier| num.checked_mul(multiplier))
336-
.ok_or_else(|| format!("Memory pool size '{}' is too large", size))?;
358+
.ok_or_else(|| format!("{} '{}' is too large", label, size))?;
337359

338-
Ok(memory_pool_size)
360+
Ok(total_bytes)
339361
} else {
340-
Err(format!("Invalid memory pool size '{}'", size))
362+
Err(format!("Invalid {} '{}'", label, size))
341363
}
342364
}
343365

366+
/// Parses a memory pool size string into a byte count (`usize`) by delegating
/// to `parse_size_string`.
///
/// # Errors
///
/// Returns the `String` error produced by `parse_size_string`; its messages
/// are labelled with "memory pool size".
pub fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
367+
parse_size_string(size, "memory pool size")
368+
}
369+
370+
/// Parses a disk limit string (e.g. `10g`, as shown in the `--disk-limit` help
/// text) into a byte count (`usize`) by delegating to `parse_size_string`.
///
/// Used as the clap `value_parser` for the `disk_limit` argument.
///
/// # Errors
///
/// Returns the `String` error produced by `parse_size_string`; its messages
/// are labelled with "disk limit".
pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
371+
parse_size_string(size, "disk limit")
372+
}
373+
344374
#[cfg(test)]
345375
mod tests {
346376
use super::*;

docs/source/user-guide/cli/usage.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ OPTIONS:
5757
--mem-pool-type <MEM_POOL_TYPE>
5858
Specify the memory pool type 'greedy' or 'fair', default to 'greedy'
5959

60+
-d, --disk-limit <DISK_LIMIT>
61+
Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')
62+
6063
-p, --data-path <DATA_PATH>
6164
Path to your data, default to current directory
6265

0 commit comments

Comments
 (0)