Skip to content
34 changes: 12 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion src/common/hashtable/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

// To avoid RUSTFLAGS="-C target-feature=+sse4.2" warning.

use std::hash::BuildHasher;
use std::iter::TrustedLen;
use std::mem::MaybeUninit;
use std::num::NonZeroU64;
Expand Down Expand Up @@ -219,6 +218,8 @@ impl FastHash for u128 {
unsafe { _mm_crc32_u64(value, (*self >> 64) as u64) }
} else {
use std::hash::Hasher;
use std::hash::BuildHasher;

let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]);
let mut hasher = state.build_hasher();
hasher.write_u128(*self);
Expand Down Expand Up @@ -249,6 +250,8 @@ impl FastHash for i256 {
value
} else {
use std::hash::Hasher;
use std::hash::BuildHasher;

let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]);
let mut hasher = state.build_hasher();
for x in self.0 {
Expand All @@ -274,6 +277,8 @@ impl FastHash for U256 {
value
} else {
use std::hash::Hasher;
use std::hash::BuildHasher;

let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]);
let mut hasher = state.build_hasher();
for x in self.0 {
Expand Down Expand Up @@ -342,6 +347,8 @@ impl FastHash for [u8] {
value
} else {
use std::hash::Hasher;
use std::hash::BuildHasher;

let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]);
let mut hasher = state.build_hasher();
hasher.write(self);
Expand Down Expand Up @@ -373,6 +380,8 @@ impl<const N: usize> FastHash for ([u64; N], NonZeroU64) {
value
} else {
use std::hash::Hasher;
use std::hash::BuildHasher;

let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]);
let mut hasher = state.build_hasher();
for x in self.0 {
Expand Down Expand Up @@ -413,6 +422,8 @@ pub fn hash_join_fast_string_hash(key: &[u8]) -> u64 {
}
} else {
use std::hash::Hasher;
use std::hash::BuildHasher;

let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]);
let mut hasher = state.build_hasher();
hasher.write(key);
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ roaring = "0.10.1"
sha1 = "0.10.5"
sha2 = "0.10.6"
simdutf8 = "0.1.4"
simple_hll = { version = "0.0.1", features = ["serde_borsh"] }
siphasher = "0.3"
streaming_algorithms = { git = "https://github.com/ariesdevil/streaming_algorithms", rev = "2839d5d" }
strength_reduce = "0.2.3"
stringslice = "0.2.0"
twox-hash = "1.6.3"
Expand Down
147 changes: 85 additions & 62 deletions src/query/functions/src/aggregates/aggregate_approx_count_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::hash::Hash;
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::type_check::check_number;
use databend_common_expression::types::AnyType;
use databend_common_expression::types::DataType;
use databend_common_expression::types::DateType;
Expand All @@ -25,9 +26,12 @@ use databend_common_expression::types::StringType;
use databend_common_expression::types::TimestampType;
use databend_common_expression::types::UInt64Type;
use databend_common_expression::types::ValueType;
use databend_common_expression::types::F64;
use databend_common_expression::with_number_mapped_type;
use databend_common_expression::Expr;
use databend_common_expression::FunctionContext;
use databend_common_expression::Scalar;
use streaming_algorithms::HyperLogLog;
use simple_hll::HyperLogLog;

use super::aggregate_function::AggregateFunction;
use super::aggregate_function_factory::AggregateFunctionDescription;
Expand All @@ -37,38 +41,23 @@ use super::AggregateUnaryFunction;
use super::FunctionData;
use super::UnaryState;
use crate::aggregates::aggregator_common::assert_unary_arguments;
use crate::BUILTIN_FUNCTIONS;

/// Use Hyperloglog to estimate distinct of values
struct AggregateApproxCountDistinctState<T>
where T: ValueType
{
hll: HyperLogLog<T::Scalar>,
}
type AggregateApproxCountDistinctState<const HLL_P: usize> = HyperLogLog<HLL_P>;

impl<T> Default for AggregateApproxCountDistinctState<T>
where
T: ValueType + Send + Sync,
T::Scalar: Hash,
{
fn default() -> Self {
Self {
hll: HyperLogLog::<T::Scalar>::new(0.04),
}
}
}

impl<T> UnaryState<T, UInt64Type> for AggregateApproxCountDistinctState<T>
impl<const HLL_P: usize, T> UnaryState<T, UInt64Type> for AggregateApproxCountDistinctState<HLL_P>
where
T: ValueType + Send + Sync,
T::Scalar: Hash,
{
fn add(&mut self, other: T::ScalarRef<'_>) -> Result<()> {
self.hll.push(&T::to_owned_scalar(other));
self.add_object(&T::to_owned_scalar(other));
Ok(())
}

fn merge(&mut self, rhs: &Self) -> Result<()> {
self.hll.union(&rhs.hll);
self.merge(rhs);
Ok(())
}

Expand All @@ -77,18 +66,17 @@ where
builder: &mut Vec<u64>,
_function_data: Option<&dyn FunctionData>,
) -> Result<()> {
builder.push(self.hll.len() as u64);
builder.push(self.count() as u64);
Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, &self.hll)
borsh_serialize_state(writer, &self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
let hll = borsh_deserialize_state(reader)?;
Ok(Self { hll })
borsh_deserialize_state(reader)
}
}

Expand All @@ -99,64 +87,99 @@ pub fn try_create_aggregate_approx_count_distinct_function(
) -> Result<Arc<dyn AggregateFunction>> {
assert_unary_arguments(display_name, arguments.len())?;

let return_type = DataType::Number(NumberDataType::UInt64);
let mut p = 14;

if !params.is_empty() {
let error_rate = check_number::<_, F64>(
None,
&FunctionContext::default(),
&Expr::<usize>::Constant {
span: None,
scalar: params[0].clone(),
data_type: params[0].as_ref().infer_data_type(),
},
&BUILTIN_FUNCTIONS,
)?;
p = ((1.04f64 / *error_rate).log2() * 2.0).ceil() as u64;
p = p.clamp(4, 14);
}

match p {
4 => create_templated::<4>(display_name, params, arguments),
5 => create_templated::<5>(display_name, params, arguments),
6 => create_templated::<6>(display_name, params, arguments),
7 => create_templated::<7>(display_name, params, arguments),
8 => create_templated::<8>(display_name, params, arguments),
9 => create_templated::<9>(display_name, params, arguments),
10 => create_templated::<10>(display_name, params, arguments),
11 => create_templated::<11>(display_name, params, arguments),
12 => create_templated::<12>(display_name, params, arguments),
13 => create_templated::<13>(display_name, params, arguments),
14 => create_templated::<14>(display_name, params, arguments),
_ => unreachable!(),
}
}

fn create_templated<const P: usize>(
display_name: &str,
params: Vec<Scalar>,
arguments: Vec<DataType>,
) -> Result<Arc<dyn AggregateFunction>> {
let return_type = DataType::Number(NumberDataType::UInt64);
with_number_mapped_type!(|NUM_TYPE| match &arguments[0] {
DataType::Number(NumberDataType::NUM_TYPE) => {
let func = AggregateUnaryFunction::<
AggregateApproxCountDistinctState<NumberType<NUM_TYPE>>,
NumberType<NUM_TYPE>,
UInt64Type,
>::try_create(
display_name, return_type, params, arguments[0].clone()
)
.with_need_drop(true);
let func =
AggregateUnaryFunction::<HyperLogLog<P>, NumberType<NUM_TYPE>, UInt64Type>::try_create(
display_name,
return_type,
params,
arguments[0].clone(),
)
.with_need_drop(true);

Ok(Arc::new(func))
}
DataType::String => {
let func = AggregateUnaryFunction::<
AggregateApproxCountDistinctState<StringType>,
StringType,
UInt64Type,
>::try_create(
display_name, return_type, params, arguments[0].clone()
)
.with_need_drop(true);
let func =
AggregateUnaryFunction::<HyperLogLog<P>, StringType, UInt64Type>::try_create(
display_name,
return_type,
params,
arguments[0].clone(),
)
.with_need_drop(true);

Ok(Arc::new(func))
}
DataType::Date => {
let func = AggregateUnaryFunction::<
AggregateApproxCountDistinctState<DateType>,
DateType,
UInt64Type,
>::try_create(
display_name, return_type, params, arguments[0].clone()
let func = AggregateUnaryFunction::<HyperLogLog<P>, DateType, UInt64Type>::try_create(
display_name,
return_type,
params,
arguments[0].clone(),
)
.with_need_drop(true);

Ok(Arc::new(func))
}
DataType::Timestamp => {
let func = AggregateUnaryFunction::<
AggregateApproxCountDistinctState<TimestampType>,
TimestampType,
UInt64Type,
>::try_create(
display_name, return_type, params, arguments[0].clone()
)
.with_need_drop(true);
let func =
AggregateUnaryFunction::<HyperLogLog<P>, TimestampType, UInt64Type>::try_create(
display_name,
return_type,
params,
arguments[0].clone(),
)
.with_need_drop(true);

Ok(Arc::new(func))
}
_ => {
let func = AggregateUnaryFunction::<
AggregateApproxCountDistinctState<AnyType>,
AnyType,
UInt64Type,
>::try_create(
display_name, return_type, params, arguments[0].clone()
let func = AggregateUnaryFunction::<HyperLogLog<P>, AnyType, UInt64Type>::try_create(
display_name,
return_type,
params,
arguments[0].clone(),
)
.with_need_drop(true);

Expand Down
14 changes: 13 additions & 1 deletion src/query/storages/fuse/src/statistics/column_statistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;

use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberScalar;
use databend_common_expression::types::NumberType;
use databend_common_expression::types::ValueType;
use databend_common_expression::Column;
Expand All @@ -30,8 +31,19 @@ use databend_storages_common_index::RangeIndex;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use databend_storages_common_table_meta::meta::StatisticsOfColumns;

// Don't change this value
// 0.04f--> 10 buckets
const DISTINCT_ERROR_RATE: f64 = 0.04;

pub fn calc_column_distinct_of_values(column: &Column, rows: usize) -> Result<u64> {
let distinct_values = eval_aggr("approx_count_distinct", vec![], &[column.clone()], rows)?;
let distinct_values = eval_aggr(
"approx_count_distinct",
vec![Scalar::Number(NumberScalar::Float64(
DISTINCT_ERROR_RATE.into(),
))],
&[column.clone()],
rows,
)?;
let col = NumberType::<u64>::try_downcast_column(&distinct_values.0).unwrap();
Ok(col[0])
}
Expand Down
Loading