Skip to content

Commit be6657d

Browse files
authored
feat: hilbert clustering (#17045)
* hilbert_clustering
* fix test
* vacuum temp files after recluster
* fix
* add test
* fix
* fix
* add hilbert_clustering_information
* update
* update
* add comments
1 parent 4470ee5 commit be6657d

File tree

80 files changed

+2136
-644
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

80 files changed

+2136
-644
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -184,6 +184,7 @@ databend-enterprise-attach-table = { path = "src/query/ee_features/attach_table"
184184
databend-enterprise-background-service = { path = "src/query/ee_features/background_service" }
185185
databend-enterprise-data-mask-feature = { path = "src/query/ee_features/data_mask" }
186186
databend-enterprise-fail-safe = { path = "src/query/ee_features/fail_safe" }
187+
databend-enterprise-hilbert-clustering = { path = "src/query/ee_features/hilbert_clustering" }
187188
databend-enterprise-inverted-index = { path = "src/query/ee_features/inverted_index" }
188189
databend-enterprise-meta = { path = "src/meta/ee" }
189190
databend-enterprise-query = { path = "src/query/ee" }

src/common/exception/src/exception_code.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -426,8 +426,9 @@ build_exceptions! {
426426
// recluster error codes
427427
NoNeedToRecluster(4011),
428428
NoNeedToCompact(4012),
429+
UnsupportedClusterType(4013),
429430

430-
RefreshTableInfoFailure(4012),
431+
RefreshTableInfoFailure(4021),
431432
}
432433

433434
// Service errors [5001,6000].

src/common/license/src/license.rs

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -75,6 +75,8 @@ pub enum Feature {
7575
AmendTable,
7676
#[serde(alias = "system_management", alias = "SYSTEM_MANAGEMENT")]
7777
SystemManagement,
78+
#[serde(alias = "hilbert_clustering", alias = "HILBERT_CLUSTERING")]
79+
HilbertClustering,
7880
#[serde(other)]
7981
Unknown,
8082
}
@@ -122,6 +124,7 @@ impl fmt::Display for Feature {
122124
}
123125
Feature::AmendTable => write!(f, "amend_table"),
124126
Feature::SystemManagement => write!(f, "system_management"),
127+
Feature::HilbertClustering => write!(f, "hilbert_clustering"),
125128
Feature::Unknown => write!(f, "unknown"),
126129
}
127130
}
@@ -169,7 +172,8 @@ impl Feature {
169172
| (Feature::InvertedIndex, Feature::InvertedIndex)
170173
| (Feature::VirtualColumn, Feature::VirtualColumn)
171174
| (Feature::AttacheTable, Feature::AttacheTable)
172-
| (Feature::StorageEncryption, Feature::StorageEncryption) => Ok(true),
175+
| (Feature::StorageEncryption, Feature::StorageEncryption)
176+
| (Feature::HilbertClustering, Feature::HilbertClustering) => Ok(true),
173177
(_, _) => Ok(false),
174178
}
175179
}
@@ -337,6 +341,11 @@ mod tests {
337341
serde_json::from_str::<Feature>("\"amend_table\"").unwrap()
338342
);
339343

344+
assert_eq!(
345+
Feature::HilbertClustering,
346+
serde_json::from_str::<Feature>("\"hilbert_clustering\"").unwrap()
347+
);
348+
340349
assert_eq!(
341350
Feature::Unknown,
342351
serde_json::from_str::<Feature>("\"ssss\"").unwrap()
@@ -370,11 +379,12 @@ mod tests {
370379
storage_usage: Some(1),
371380
}),
372381
Feature::AmendTable,
382+
Feature::HilbertClustering,
373383
]),
374384
};
375385

376386
assert_eq!(
377-
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,background_service,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,inverted_index,license_info,storage_encryption,storage_quota(storage_usage: 1),stream,vacuum,virtual_column] }",
387+
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,background_service,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,storage_encryption,storage_quota(storage_usage: 1),stream,vacuum,virtual_column] }",
378388
license_info.to_string()
379389
);
380390
}

src/query/catalog/src/plan/partition.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -391,3 +391,10 @@ impl ReclusterParts {
391391
}
392392
}
393393
}
394+
395+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
396+
pub struct ReclusterInfoSideCar {
397+
pub merged_blocks: Vec<Arc<BlockMeta>>,
398+
pub removed_segment_indexes: Vec<usize>,
399+
pub removed_statistics: Statistics,
400+
}

src/query/catalog/src/table.rs

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,9 @@ use std::sync::Arc;
1919

2020
use chrono::DateTime;
2121
use chrono::Utc;
22+
use databend_common_ast::ast::Expr;
23+
use databend_common_ast::parser::parse_comma_separated_exprs;
24+
use databend_common_ast::parser::tokenize_sql;
2225
use databend_common_exception::ErrorCode;
2326
use databend_common_exception::Result;
2427
use databend_common_expression::BlockThresholds;
@@ -136,6 +139,28 @@ pub trait Table: Sync + Send {
136139
Some(cluster_type)
137140
}
138141

142+
fn resolve_cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Option<Vec<Expr>> {
143+
let Some((_, cluster_key_str)) = &self.cluster_key_meta() else {
144+
return None;
145+
};
146+
let tokens = tokenize_sql(cluster_key_str).unwrap();
147+
let sql_dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default();
148+
let mut ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect).unwrap();
149+
// unwrap tuple.
150+
if ast_exprs.len() == 1 {
151+
if let Expr::Tuple { exprs, .. } = &ast_exprs[0] {
152+
ast_exprs = exprs.clone();
153+
}
154+
} else {
155+
// Defensive check:
156+
// `ast_exprs` should always contain one element which can be one of the following:
157+
// 1. A tuple of composite cluster keys
158+
// 2. A single cluster key
159+
unreachable!("invalid cluster key ast expression, {:?}", ast_exprs);
160+
}
161+
Some(ast_exprs)
162+
}
163+
139164
fn change_tracking_enabled(&self) -> bool {
140165
false
141166
}

src/query/catalog/src/table_context.rs

Lines changed: 15 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -279,11 +279,23 @@ pub trait TableContext: Send + Sync {
279279
max_files: Option<usize>,
280280
) -> Result<FilteredCopyFiles>;
281281

282-
fn add_segment_location(&self, segment_loc: Location) -> Result<()>;
282+
fn add_written_segment_location(&self, segment_loc: Location) -> Result<()>;
283283

284-
fn clear_segment_locations(&self) -> Result<()>;
284+
fn clear_written_segment_locations(&self) -> Result<()>;
285285

286-
fn get_segment_locations(&self) -> Result<Vec<Location>>;
286+
fn get_written_segment_locations(&self) -> Result<Vec<Location>>;
287+
288+
fn add_selected_segment_location(&self, _segment_loc: Location) {
289+
unimplemented!()
290+
}
291+
292+
fn get_selected_segment_locations(&self) -> Vec<Location> {
293+
unimplemented!()
294+
}
295+
296+
fn clear_selected_segment_locations(&self) {
297+
unimplemented!()
298+
}
287299

288300
fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()>;
289301

src/query/ee/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ databend-enterprise-attach-table = { workspace = true }
4848
databend-enterprise-background-service = { workspace = true }
4949
databend-enterprise-data-mask-feature = { workspace = true }
5050
databend-enterprise-fail-safe = { workspace = true }
51+
databend-enterprise-hilbert-clustering = { workspace = true }
5152
databend-enterprise-inverted-index = { workspace = true }
5253
databend-enterprise-resources-management = { workspace = true }
5354
databend-enterprise-storage-encryption = { workspace = true }

src/query/ee/src/enterprise_services.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ use crate::attach_table::RealAttachTableHandler;
2121
use crate::background_service::RealBackgroundService;
2222
use crate::data_mask::RealDatamaskHandler;
2323
use crate::fail_safe::RealFailSafeHandler;
24+
use crate::hilbert_clustering::RealHilbertClusteringHandler;
2425
use crate::inverted_index::RealInvertedIndexHandler;
2526
use crate::license::license_mgr::RealLicenseManager;
2627
use crate::resource_management::init_resources_management;
@@ -47,6 +48,7 @@ impl EnterpriseServices {
4748
RealStorageQuotaHandler::init(&cfg)?;
4849
RealFailSafeHandler::init()?;
4950
init_resources_management(&cfg).await?;
51+
RealHilbertClusteringHandler::init()?;
5052
Ok(())
5153
}
5254
}

0 commit comments

Comments
 (0)