Skip to content

Commit 933d544

Browse files
committed
add hilbert_clustering_information
1 parent a2fd938 commit 933d544

File tree

7 files changed

+272
-7
lines changed

7 files changed

+272
-7
lines changed

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_storages_fuse::table_functions::FuseEncodingFunc;
2727
use databend_common_storages_fuse::table_functions::FuseStatisticsFunc;
2828
use databend_common_storages_fuse::table_functions::FuseTimeTravelSizeFunc;
2929
use databend_common_storages_fuse::table_functions::FuseVacuumTemporaryTable;
30+
use databend_common_storages_fuse::table_functions::HilbertClusteringInfoFunc;
3031
use databend_common_storages_fuse::table_functions::TableFunctionTemplate;
3132
use databend_common_storages_stream::stream_status_table_func::StreamStatusTable;
3233
use databend_storages_common_table_meta::table_id_ranges::SYS_TBL_FUC_ID_END;
@@ -186,6 +187,14 @@ impl TableFunctionFactory {
186187
),
187188
);
188189

190+
creators.insert(
191+
"hilbert_clustering_information".to_string(),
192+
(
193+
next_id(),
194+
Arc::new(TableFunctionTemplate::<HilbertClusteringInfoFunc>::create),
195+
),
196+
);
197+
189198
creators.insert(
190199
"fuse_vacuum_temporary_table".to_string(),
191200
(

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -403,10 +403,10 @@ impl FuseTable {
403403
}
404404

405405
pub fn linear_cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Vec<RemoteExpr<String>> {
406-
let Some(cluster_type) = self.cluster_type() else {
407-
return vec![];
408-
};
409-
if matches!(cluster_type, ClusterType::Hilbert) {
406+
if self
407+
.cluster_type()
408+
.is_none_or(|v| matches!(v, ClusterType::Hilbert))
409+
{
410410
return vec![];
411411
}
412412

src/query/storages/fuse/src/table_functions/clustering_information.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ impl<'a> ClusteringInformation<'a> {
211211
.get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear);
212212
if matches!(typ, ClusterType::Hilbert) {
213213
return Err(ErrorCode::UnsupportedClusterType(
214-
"Unsupported 'hilbert' type",
214+
"Unsupported 'hilbert' type, please use `hilbert_clustering_information` instead",
215215
));
216216
}
217217
}

src/query/storages/fuse/src/table_functions/fuse_segment.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use databend_common_expression::TableDataType;
2525
use databend_common_expression::TableField;
2626
use databend_common_expression::TableSchema;
2727
use databend_common_expression::TableSchemaRefExt;
28-
use databend_storages_common_table_meta::meta::SegmentInfo;
28+
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
2929
use databend_storages_common_table_meta::meta::TableSnapshot;
3030

3131
use crate::io::SegmentsIO;
@@ -85,7 +85,7 @@ impl TableMetaFunc for FuseSegment {
8585
std::cmp::min(ctx.get_settings().get_max_threads()? as usize * 4, len).max(1);
8686
for chunk in segment_locations.chunks(chunk_size) {
8787
let segments = segments_io
88-
.read_segments::<SegmentInfo>(chunk, true)
88+
.read_segments::<Arc<CompactSegmentInfo>>(chunk, true)
8989
.await?;
9090

9191
for (idx, segment) in segments.into_iter().enumerate() {
src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use chrono::Utc;
18+
use databend_common_catalog::catalog::CATALOG_DEFAULT;
19+
use databend_common_catalog::plan::DataSourcePlan;
20+
use databend_common_catalog::table::Table;
21+
use databend_common_catalog::table_args::TableArgs;
22+
use databend_common_catalog::table_context::TableContext;
23+
use databend_common_exception::ErrorCode;
24+
use databend_common_exception::Result;
25+
use databend_common_expression::types::DataType;
26+
use databend_common_expression::types::NumberDataType;
27+
use databend_common_expression::types::NumberScalar;
28+
use databend_common_expression::BlockEntry;
29+
use databend_common_expression::DataBlock;
30+
use databend_common_expression::Scalar;
31+
use databend_common_expression::TableDataType;
32+
use databend_common_expression::TableField;
33+
use databend_common_expression::TableSchema;
34+
use databend_common_expression::TableSchemaRef;
35+
use databend_common_expression::TableSchemaRefExt;
36+
use databend_common_expression::Value;
37+
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
38+
use databend_storages_common_table_meta::table::ClusterType;
39+
use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
40+
41+
use crate::io::SegmentsIO;
42+
use crate::table_functions::parse_db_tb_args;
43+
use crate::table_functions::string_literal;
44+
use crate::table_functions::SimpleArgFunc;
45+
use crate::table_functions::SimpleArgFuncTemplate;
46+
use crate::FuseTable;
47+
48+
/// Arguments of the `hilbert_clustering_information` table function.
///
/// Built from positional table-function arguments via
/// `TryFrom<(&str, TableArgs)>` and convertible back into [`TableArgs`].
pub struct HilbertClusteringInfoArgs {
    /// Name of the database that contains the target table.
    database_name: String,
    /// Name of the table whose clustering state is reported.
    table_name: String,
}
52+
53+
impl From<&HilbertClusteringInfoArgs> for TableArgs {
54+
fn from(args: &HilbertClusteringInfoArgs) -> Self {
55+
let tbl_args = vec![
56+
string_literal(args.database_name.as_str()),
57+
string_literal(args.table_name.as_str()),
58+
];
59+
TableArgs::new_positioned(tbl_args)
60+
}
61+
}
62+
63+
impl TryFrom<(&str, TableArgs)> for HilbertClusteringInfoArgs {
64+
type Error = ErrorCode;
65+
fn try_from(
66+
(func_name, table_args): (&str, TableArgs),
67+
) -> std::result::Result<Self, Self::Error> {
68+
let (database_name, table_name) = parse_db_tb_args(&table_args, func_name)?;
69+
70+
Ok(Self {
71+
database_name,
72+
table_name,
73+
})
74+
}
75+
}
76+
77+
/// [`SimpleArgFuncTemplate`] instantiated with [`HilbertClusteringInfo`]; this is the type
/// exported from the module for the `hilbert_clustering_information` table function.
pub type HilbertClusteringInfoFunc = SimpleArgFuncTemplate<HilbertClusteringInfo>;
78+
/// Field-less marker type that carries the [`SimpleArgFunc`] implementation.
pub struct HilbertClusteringInfo;
79+
80+
#[async_trait::async_trait]
81+
impl SimpleArgFunc for HilbertClusteringInfo {
82+
type Args = HilbertClusteringInfoArgs;
83+
84+
fn schema() -> TableSchemaRef {
85+
HilbertClusteringInfoImpl::schema()
86+
}
87+
88+
async fn apply(
89+
ctx: &Arc<dyn TableContext>,
90+
args: &Self::Args,
91+
_plan: &DataSourcePlan,
92+
) -> Result<DataBlock> {
93+
let tenant_id = ctx.get_tenant();
94+
let tbl = ctx
95+
.get_catalog(CATALOG_DEFAULT)
96+
.await?
97+
.get_table(
98+
&tenant_id,
99+
args.database_name.as_str(),
100+
args.table_name.as_str(),
101+
)
102+
.await?;
103+
104+
let tbl = FuseTable::try_from_table(tbl.as_ref())?;
105+
106+
HilbertClusteringInfoImpl::new(ctx.clone(), tbl)
107+
.get_clustering_info()
108+
.await
109+
}
110+
}
111+
112+
struct HilbertClusteringInfoImpl<'a> {
113+
pub ctx: Arc<dyn TableContext>,
114+
pub table: &'a FuseTable,
115+
}
116+
117+
impl<'a> HilbertClusteringInfoImpl<'a> {
118+
fn new(ctx: Arc<dyn TableContext>, table: &'a FuseTable) -> Self {
119+
Self { ctx, table }
120+
}
121+
122+
#[async_backtrace::framed]
123+
async fn get_clustering_info(&self) -> Result<DataBlock> {
124+
let Some(cluster_key_str) = self.table.cluster_key_str() else {
125+
return Err(ErrorCode::UnclusteredTable(format!(
126+
"Unclustered table {}",
127+
self.table.table_info.desc
128+
)));
129+
};
130+
let cluster_type = self
131+
.table
132+
.get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear);
133+
if matches!(cluster_type, ClusterType::Linear) {
134+
return Err(ErrorCode::UnsupportedClusterType(
135+
"Unsupported `linear` type, please use `clustering_information` instead",
136+
));
137+
}
138+
139+
let snapshot = self.table.read_table_snapshot().await?;
140+
let now = Utc::now();
141+
let timestamp = snapshot
142+
.as_ref()
143+
.map_or(now, |s| s.timestamp.unwrap_or(now))
144+
.timestamp_micros();
145+
let mut total_segment_count = 0;
146+
let mut stable_segment_count = 0;
147+
let mut partial_segment_count = 0;
148+
let mut unclustered_segment_count = 0;
149+
if let Some(snapshot) = snapshot {
150+
let total_count = snapshot.segments.len();
151+
total_segment_count = total_count as u64;
152+
let chunk_size = std::cmp::min(
153+
self.ctx.get_settings().get_max_threads()? as usize * 4,
154+
total_count,
155+
)
156+
.max(1);
157+
let segments_io = SegmentsIO::create(
158+
self.ctx.clone(),
159+
self.table.operator.clone(),
160+
self.table.schema(),
161+
);
162+
for chunk in snapshot.segments.chunks(chunk_size) {
163+
let segments = segments_io
164+
.read_segments::<Arc<CompactSegmentInfo>>(chunk, true)
165+
.await?;
166+
for segment in segments {
167+
let segment = segment?;
168+
let Some(level) = segment.summary.cluster_stats.as_ref().map(|v| v.level)
169+
else {
170+
unclustered_segment_count += 1;
171+
continue;
172+
};
173+
if level == -1 {
174+
stable_segment_count += 1;
175+
} else {
176+
partial_segment_count += 1;
177+
}
178+
}
179+
}
180+
}
181+
Ok(DataBlock::new(
182+
vec![
183+
BlockEntry::new(
184+
DataType::String,
185+
Value::Scalar(Scalar::String(cluster_key_str.to_string())),
186+
),
187+
BlockEntry::new(
188+
DataType::String,
189+
Value::Scalar(Scalar::String("hilbert".to_string())),
190+
),
191+
BlockEntry::new(
192+
DataType::Timestamp,
193+
Value::Scalar(Scalar::Timestamp(timestamp)),
194+
),
195+
BlockEntry::new(
196+
DataType::Number(NumberDataType::UInt64),
197+
Value::Scalar(Scalar::Number(NumberScalar::UInt64(total_segment_count))),
198+
),
199+
BlockEntry::new(
200+
DataType::Number(NumberDataType::UInt64),
201+
Value::Scalar(Scalar::Number(NumberScalar::UInt64(stable_segment_count))),
202+
),
203+
BlockEntry::new(
204+
DataType::Number(NumberDataType::UInt64),
205+
Value::Scalar(Scalar::Number(NumberScalar::UInt64(partial_segment_count))),
206+
),
207+
BlockEntry::new(
208+
DataType::Number(NumberDataType::UInt64),
209+
Value::Scalar(Scalar::Number(NumberScalar::UInt64(
210+
unclustered_segment_count,
211+
))),
212+
),
213+
],
214+
1,
215+
))
216+
}
217+
218+
fn schema() -> Arc<TableSchema> {
219+
TableSchemaRefExt::create(vec![
220+
TableField::new("cluster_key", TableDataType::String),
221+
TableField::new("type", TableDataType::String),
222+
TableField::new("timestamp", TableDataType::Timestamp),
223+
TableField::new(
224+
"total_segment_count",
225+
TableDataType::Number(NumberDataType::UInt64),
226+
),
227+
TableField::new(
228+
"stable_segment_count",
229+
TableDataType::Number(NumberDataType::UInt64),
230+
),
231+
TableField::new(
232+
"partial_segment_count",
233+
TableDataType::Number(NumberDataType::UInt64),
234+
),
235+
TableField::new(
236+
"unclustered_segment_count",
237+
TableDataType::Number(NumberDataType::UInt64),
238+
),
239+
])
240+
}
241+
}

src/query/storages/fuse/src/table_functions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod fuse_snapshot;
2424
mod fuse_statistic;
2525
mod fuse_time_travel_size;
2626
mod fuse_vacuum_temporary_table;
27+
mod hilbert_clustering_information;
2728
mod table_args;
2829

2930
pub use clustering_information::ClusteringInformationFunc;
@@ -43,4 +44,5 @@ pub use fuse_statistic::FuseStatisticsFunc;
4344
pub use fuse_time_travel_size::FuseTimeTravelSize;
4445
pub use fuse_time_travel_size::FuseTimeTravelSizeFunc;
4546
pub use fuse_vacuum_temporary_table::FuseVacuumTemporaryTable;
47+
pub use hilbert_clustering_information::HilbertClusteringInfoFunc;
4648
pub use table_args::*;

tests/sqllogictests/suites/ee/07_hilbert_clustering/07_0000_recluster_final.test

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ select count() from fuse_snapshot('test_hilbert','t');
4141
----
4242
4
4343

44+
statement error 4013
45+
select * EXCLUDE(timestamp) from clustering_information('test_hilbert','t');
46+
47+
query TTIIII
48+
select * EXCLUDE(timestamp) from hilbert_clustering_information('test_hilbert','t');
49+
----
50+
(a, b) hilbert 4 0 0 4
51+
4452
statement ok
4553
alter table t recluster final;
4654

@@ -63,6 +71,11 @@ insert into t values(9, 9);
6371
statement ok
6472
alter table t recluster final;
6573

74+
query TTIIII
75+
select * EXCLUDE(timestamp) from hilbert_clustering_information('test_hilbert','t');
76+
----
77+
(a, b) hilbert 3 2 1 0
78+
6679
query I
6780
select count() from fuse_snapshot('test_hilbert','t');
6881
----

0 commit comments

Comments
 (0)