Skip to content

Commit a2fd938

Browse files
committed
fix
1 parent d4a91b7 commit a2fd938

File tree

9 files changed

+50
-26
lines changed

9 files changed

+50
-26
lines changed

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use databend_common_storage::StageFileInfo;
4747
use databend_common_storages_factory::Table;
4848
use databend_common_storages_fuse::FuseTable;
4949
use databend_storages_common_table_meta::readers::snapshot_reader::TableSnapshotAccessor;
50+
use databend_storages_common_table_meta::table::ClusterType;
5051
use parking_lot::RwLock;
5152

5253
use crate::interpreters::common::check_deduplicate_label;
@@ -290,7 +291,10 @@ impl ReplaceInterpreter {
290291
.ctx
291292
.get_settings()
292293
.get_replace_into_bloom_pruning_max_column_number()?;
293-
let bloom_filter_column_indexes = if table.cluster_key_meta().is_some() {
294+
let bloom_filter_column_indexes = if table
295+
.cluster_type()
296+
.is_some_and(|v| v == ClusterType::Linear)
297+
{
294298
fuse_table
295299
.choose_bloom_filter_columns(
296300
self.ctx.clone(),

src/query/service/src/pipelines/builders/builder_commit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl PipelineBuilder {
4040
self.main_pipeline.add_async_accumulating_transformer(|| {
4141
let base_segments = if matches!(
4242
plan.mutation_kind,
43-
MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster(_)
43+
MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster
4444
) {
4545
vec![]
4646
} else {

src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use databend_common_storages_fuse::operations::TransformSerializeBlock;
2121
use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
2222
use databend_common_storages_fuse::FuseTable;
2323
use databend_common_storages_fuse::TableContext;
24-
use databend_storages_common_table_meta::table::ClusterType;
2524

2625
use crate::pipelines::PipelineBuilder;
2726

@@ -49,7 +48,7 @@ impl PipelineBuilder {
4948
transform_output_port,
5049
table,
5150
ClusterStatsGenerator::default(),
52-
MutationKind::Recluster(ClusterType::Hilbert),
51+
MutationKind::Recluster,
5352
)?;
5453
proc.into_processor()
5554
})

src/query/service/src/pipelines/builders/builder_recluster.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use databend_common_storages_factory::Table;
3434
use databend_common_storages_fuse::operations::TransformSerializeBlock;
3535
use databend_common_storages_fuse::FuseTable;
3636
use databend_common_storages_fuse::TableContext;
37-
use databend_storages_common_table_meta::table::ClusterType;
3837

3938
use crate::pipelines::builders::SortPipelineBuilder;
4039
use crate::pipelines::processors::TransformAddStreamColumns;
@@ -178,7 +177,7 @@ impl PipelineBuilder {
178177
transform_output_port,
179178
table,
180179
cluster_stats_gen.clone(),
181-
MutationKind::Recluster(ClusterType::Linear),
180+
MutationKind::Recluster,
182181
)?;
183182
proc.into_processor()
184183
})

src/query/sql/src/executor/physical_plans/common.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::fmt::Formatter;
1717

1818
use databend_common_expression::types::DataType;
1919
use databend_common_expression::Scalar;
20-
use databend_storages_common_table_meta::table::ClusterType;
2120

2221
use crate::plans::UDFField;
2322
use crate::plans::UDFType;
@@ -71,7 +70,7 @@ pub enum MutationKind {
7170
Delete,
7271
Update,
7372
Replace,
74-
Recluster(ClusterType),
73+
Recluster,
7574
Insert,
7675
Compact,
7776
MergeInto,
@@ -82,10 +81,7 @@ impl Display for MutationKind {
8281
match self {
8382
MutationKind::Delete => write!(f, "Delete"),
8483
MutationKind::Insert => write!(f, "Insert"),
85-
MutationKind::Recluster(cluster_type) => match cluster_type {
86-
ClusterType::Linear => write!(f, "Recluster"),
87-
ClusterType::Hilbert => write!(f, "Hilbert Recluster"),
88-
},
84+
MutationKind::Recluster => write!(f, "Recluster"),
8985
MutationKind::Update => write!(f, "Update"),
9086
MutationKind::Replace => write!(f, "Replace"),
9187
MutationKind::Compact => write!(f, "Compact"),

src/query/sql/src/executor/physical_plans/physical_recluster.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use databend_common_exception::ErrorCode;
2121
use databend_common_exception::Result;
2222
use databend_common_meta_app::schema::TableInfo;
2323
use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;
24-
use databend_storages_common_table_meta::table::ClusterType;
2524

2625
use crate::executor::physical_plans::CommitSink;
2726
use crate::executor::physical_plans::CompactSource;
@@ -90,7 +89,7 @@ impl PhysicalPlanBuilder {
9089
input: Box::new(plan),
9190
table_info,
9291
snapshot: Some(snapshot),
93-
mutation_kind: MutationKind::Recluster(ClusterType::Hilbert),
92+
mutation_kind: MutationKind::Recluster,
9493
update_stream_meta: vec![],
9594
merge_meta: false,
9695
deduplicated_label: None,
@@ -139,7 +138,7 @@ impl PhysicalPlanBuilder {
139138
input: Box::new(root),
140139
table_info,
141140
snapshot: Some(snapshot),
142-
mutation_kind: MutationKind::Recluster(ClusterType::Linear),
141+
mutation_kind: MutationKind::Recluster,
143142
update_stream_meta: vec![],
144143
merge_meta: false,
145144
deduplicated_label: None,

src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl SnapshotGenerator for MutationGenerator {
101101

102102
if matches!(
103103
self.mutation_kind,
104-
MutationKind::Compact | MutationKind::Recluster(_)
104+
MutationKind::Compact | MutationKind::Recluster
105105
) {
106106
// for compaction, a basic but very important verification:
107107
// the number of rows should be the same

src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,17 @@ use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
6363
pub struct TableMutationAggregator {
6464
ctx: Arc<dyn TableContext>,
6565
schema: TableSchemaRef,
66+
table_id: u64,
6667
dal: Operator,
6768
location_gen: TableMetaLocationGenerator,
68-
6969
thresholds: BlockThresholds,
7070
block_per_seg: usize,
71-
default_cluster_key_id: Option<u32>,
7271

72+
default_cluster_key_id: Option<u32>,
7373
base_segments: Vec<Location>,
7474
// Used for recluster.
7575
recluster_merged_blocks: Vec<Arc<BlockMeta>>,
76+
set_hilbert_level: bool,
7677

7778
mutations: HashMap<SegmentIndex, BlockMutations>,
7879
appended_segments: Vec<Location>,
@@ -83,7 +84,6 @@ pub struct TableMutationAggregator {
8384
kind: MutationKind,
8485
start_time: Instant,
8586
finished_tasks: usize,
86-
table_id: u64,
8787
}
8888

8989
// takes in table mutation logs and aggregates them (former mutation_transform)
@@ -115,7 +115,7 @@ impl AsyncAccumulatingTransform for TableMutationAggregator {
115115
},
116116
self.schema.clone(),
117117
)),
118-
MutationKind::Recluster(_) => self.apply_recluster(&mut new_segment_locs).await?,
118+
MutationKind::Recluster => self.apply_recluster(&mut new_segment_locs).await?,
119119
_ => self.apply_mutation(&mut new_segment_locs).await?,
120120
};
121121

@@ -137,13 +137,24 @@ impl TableMutationAggregator {
137137
removed_statistics: Statistics,
138138
kind: MutationKind,
139139
) -> Self {
140+
let set_hilbert_level = table
141+
.cluster_type()
142+
.is_some_and(|v| matches!(v, ClusterType::Hilbert))
143+
&& matches!(
144+
kind,
145+
MutationKind::Delete
146+
| MutationKind::MergeInto
147+
| MutationKind::Replace
148+
| MutationKind::Recluster
149+
);
140150
TableMutationAggregator {
141151
ctx,
142152
schema: table.schema(),
143153
dal: table.get_operator(),
144154
location_gen: table.meta_location_generator().clone(),
145155
thresholds: table.get_block_thresholds(),
146156
default_cluster_key_id: table.cluster_key_id(),
157+
set_hilbert_level,
147158
block_per_seg: table
148159
.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT),
149160
mutations: HashMap::new(),
@@ -262,6 +273,7 @@ impl TableMutationAggregator {
262273
let default_cluster_key = Some(default_cluster_key_id);
263274
let thresholds = self.thresholds;
264275
let block_per_seg = self.block_per_seg;
276+
let set_hilbert_level = self.set_hilbert_level;
265277
let kind = self.kind;
266278
for chunk in &merged_blocks.into_iter().chunks(chunk_size) {
267279
let new_blocks = chunk.collect::<Vec<_>>();
@@ -279,6 +291,7 @@ impl TableMutationAggregator {
279291
all_perfect,
280292
block_per_seg,
281293
kind,
294+
set_hilbert_level,
282295
)
283296
.await
284297
});
@@ -413,17 +426,19 @@ impl TableMutationAggregator {
413426
let thresholds = self.thresholds;
414427
let default_cluster_key_id = self.default_cluster_key_id;
415428
let block_per_seg = self.block_per_seg;
429+
let kind = self.kind;
430+
let set_hilbert_level = self.set_hilbert_level;
416431
let mut tasks = Vec::with_capacity(segment_indices.len());
417432
for index in segment_indices {
418433
let segment_mutation = self.mutations.remove(&index).unwrap();
419434
let location = self.base_segments.get(index).cloned();
420435
let schema = self.schema.clone();
421436
let op = self.dal.clone();
422437
let location_gen = self.location_gen.clone();
423-
let kind = self.kind;
424438

425-
let mut all_perfect = false;
426439
tasks.push(async move {
440+
let mut all_perfect = false;
441+
let mut set_level = false;
427442
let (new_blocks, origin_summary) = if let Some(loc) = location {
428443
// read the old segment
429444
let compact_segment_info =
@@ -453,6 +468,12 @@ impl TableMutationAggregator {
453468

454469
// assign back the mutated blocks to segment
455470
let new_blocks = block_editor.into_values().collect::<Vec<_>>();
471+
set_level = set_hilbert_level
472+
&& segment_info
473+
.summary
474+
.cluster_stats
475+
.as_ref()
476+
.is_some_and(|v| v.cluster_key_id == default_cluster_key_id.unwrap());
456477
(new_blocks, Some(segment_info.summary))
457478
} else {
458479
// use by compact.
@@ -478,6 +499,7 @@ impl TableMutationAggregator {
478499
all_perfect,
479500
block_per_seg,
480501
kind,
502+
set_level,
481503
)
482504
.await?;
483505

@@ -551,6 +573,7 @@ async fn write_segment(
551573
all_perfect: bool,
552574
block_per_seg: usize,
553575
kind: MutationKind,
576+
set_hilbert_level: bool,
554577
) -> Result<(String, Statistics)> {
555578
let location = location_gen.gen_segment_info_location();
556579
let mut new_summary = reduce_block_metas(&blocks, thresholds, default_cluster_key);
@@ -564,9 +587,13 @@ async fn write_segment(
564587
new_summary.perfect_block_count = new_summary.block_count;
565588
}
566589
}
567-
if matches!(kind, MutationKind::Recluster(ClusterType::Hilbert)) {
568-
assert!(new_summary.cluster_stats.is_none());
569-
let level = if new_summary.block_count >= block_per_seg as u64 {
590+
if set_hilbert_level {
591+
debug_assert!(new_summary.cluster_stats.is_none());
592+
let level = if new_summary.block_count >= block_per_seg as u64
593+
&& (new_summary.row_count as usize >= block_per_seg * thresholds.min_rows_per_block
594+
|| new_summary.uncompressed_byte_size as usize
595+
>= block_per_seg * thresholds.max_bytes_per_block)
596+
{
570597
-1
571598
} else {
572599
0

src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ impl Processor for TransformSerializeBlock {
336336
}
337337
}
338338

339-
if matches!(self.kind, MutationKind::Recluster(_)) {
339+
if matches!(self.kind, MutationKind::Recluster) {
340340
Self::mutation_logs(MutationLogEntry::ReclusterAppendBlock {
341341
block_meta: Arc::new(block_meta),
342342
})

0 commit comments

Comments
 (0)