Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/clickbench/benchmark_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ echo 'Start databend-query...'
cat <<EOF >config.toml
[query]
tenant_id = "benchmark"
cluster_id = "${BENCHMARK_ID}"
warehouse_id = "${BENCHMARK_ID}"
[[query.users]]
name = "root"
auth_type = "no_password"
Expand Down
2 changes: 1 addition & 1 deletion benchmark/clickbench/benchmark_local_merge_into.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ echo 'Start databend-query...'
cat <<EOF >config.toml
[query]
tenant_id = "benchmark"
cluster_id = "${BENCHMARK_ID}"
warehouse_id = "${BENCHMARK_ID}"
[[query.users]]
name = "root"
auth_type = "no_password"
Expand Down
2 changes: 1 addition & 1 deletion docker/query-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ flight_sql_handler_host = "0.0.0.0"
flight_sql_handler_port = 8900

tenant_id = "default"
cluster_id = "default"
warehouse_id = "default"

[log]

Expand Down
2 changes: 1 addition & 1 deletion scripts/distribution/configs/databend-query.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ flight_sql_handler_host = "0.0.0.0"
flight_sql_handler_port = 8900

tenant_id = "default"
cluster_id = "default"
warehouse_id = "default"

table_engine_memory_enabled = true

Expand Down
6 changes: 3 additions & 3 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async fn precheck_services(conf: &InnerConfig) -> Result<()> {
}

let tenant = conf.query.tenant_id.clone();
let cluster_id = conf.query.cluster_id.clone();
let warehouse_id = conf.query.warehouse_id.clone();
let flight_addr = conf.query.flight_api_address.clone();

let mut _sentry_guard = None;
Expand All @@ -108,7 +108,7 @@ async fn precheck_services(conf: &InnerConfig) -> Result<()> {
..Default::default()
})));
sentry::configure_scope(|scope| scope.set_tag("tenant", tenant.name()));
sentry::configure_scope(|scope| scope.set_tag("cluster_id", cluster_id));
sentry::configure_scope(|scope| scope.set_tag("cluster_id", warehouse_id));
sentry::configure_scope(|scope| scope.set_tag("address", flight_addr));
}

Expand Down Expand Up @@ -228,7 +228,7 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> {
.await?;
info!(
"Databend query has been registered:{:?} to metasrv:{:?}.",
conf.query.cluster_id, conf.meta.endpoints
conf.query.warehouse_id, conf.meta.endpoints
);
}

Expand Down
6 changes: 3 additions & 3 deletions src/common/metrics/src/metrics/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ static CLUSTER_CLUSTER_ERROR_COUNT: LazyLock<FamilyCounter<Vec<(&'static str, St
static CLUSTER_DISCOVERED_NODE_GAUGE: LazyLock<FamilyGauge<Vec<(&'static str, String)>>> =
LazyLock::new(|| register_gauge_family("cluster_discovered_node"));

pub fn metric_incr_cluster_heartbeat_count(
pub fn metric_incr_warehouse_heartbeat_count(
local_id: &str,
flight_address: &str,
cluster_id: &str,
warehouse_id: &str,
tenant_id: &str,
result: &str,
) {
let labels = &vec![
("local_id", String::from(local_id)),
("flight_address", String::from(flight_address)),
("cluster_id", cluster_id.to_string()),
("cluster_id", warehouse_id.to_string()),
("tenant_id", tenant_id.to_string()),
("result", result.to_string()),
];
Expand Down
20 changes: 18 additions & 2 deletions src/query/catalog/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use std::sync::Arc;

use databend_common_meta_types::NodeInfo;

pub struct Cluster {
pub struct Warehouse {
pub local_id: String,
pub nodes: Vec<Arc<NodeInfo>>,
}

impl Cluster {
impl Warehouse {
/// If this cluster is empty?
///
/// # Note
Expand All @@ -40,4 +40,20 @@ impl Cluster {
pub fn is_empty(&self) -> bool {
self.nodes.len() <= 1
}

pub fn create(nodes: Vec<Arc<NodeInfo>>, local_id: String) -> Arc<Warehouse> {
Arc::new(Warehouse { local_id, nodes })
}

pub fn is_local(&self, node: &NodeInfo) -> bool {
node.id == self.local_id
}

pub fn local_id(&self) -> String {
self.local_id.clone()
}

pub fn get_nodes(&self) -> Vec<Arc<NodeInfo>> {
self.nodes.to_vec()
}
}
4 changes: 2 additions & 2 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use parking_lot::RwLock;
use xorf::BinaryFuse16;

use crate::catalog::Catalog;
use crate::cluster_info::Cluster;
use crate::cluster_info::Warehouse;
use crate::merge_into_join::MergeIntoJoin;
use crate::plan::DataSourcePlan;
use crate::plan::PartInfoPtr;
Expand Down Expand Up @@ -189,7 +189,7 @@ pub trait TableContext: Send + Sync {
fn get_connection_id(&self) -> String;
fn get_settings(&self) -> Arc<Settings>;
fn get_shared_settings(&self) -> Arc<Settings>;
fn get_cluster(&self) -> Arc<Cluster>;
fn get_warehouse(&self) -> Arc<Warehouse>;
fn get_processes_info(&self) -> Vec<ProcessInfo>;
fn get_queued_queries(&self) -> Vec<ProcessInfo>;
fn get_queries_profile(&self) -> HashMap<String, Vec<Arc<Profile>>>;
Expand Down
8 changes: 6 additions & 2 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,9 @@ pub struct QueryConfig {
#[clap(long, value_name = "VALUE", default_value_t)]
pub cluster_id: String,

#[clap(long, value_name = "VALUE", default_value_t)]
pub warehouse_id: String,

#[clap(long, value_name = "VALUE", default_value_t)]
pub num_cpus: u64,

Expand Down Expand Up @@ -1673,7 +1676,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
Ok(InnerQueryConfig {
tenant_id: Tenant::new_or_err(self.tenant_id, "")
.map_err(|_e| ErrorCode::InvalidConfig("tenant-id can not be empty"))?,
cluster_id: self.cluster_id,
warehouse_id: self.cluster_id,
node_id: "".to_string(),
num_cpus: self.num_cpus,
mysql_handler_host: self.mysql_handler_host,
Expand Down Expand Up @@ -1753,7 +1756,8 @@ impl From<InnerQueryConfig> for QueryConfig {
fn from(inner: InnerQueryConfig) -> Self {
Self {
tenant_id: inner.tenant_id.name().to_string(),
cluster_id: inner.cluster_id,
cluster_id: inner.warehouse_id.clone(),
warehouse_id: inner.warehouse_id,
num_cpus: inner.num_cpus,
mysql_handler_host: inner.mysql_handler_host,
mysql_handler_port: inner.mysql_handler_port,
Expand Down
6 changes: 3 additions & 3 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ impl Debug for InnerConfig {
pub struct QueryConfig {
/// Tenant id for get the information from the MetaSrv.
pub tenant_id: Tenant,
/// ID for construct the cluster.
pub cluster_id: String,
/// ID for construct the warehouse.
pub warehouse_id: String,
// ID for the query node.
// This only initialized when InnerConfig::load().
pub node_id: String,
Expand Down Expand Up @@ -238,7 +238,7 @@ impl Default for QueryConfig {
fn default() -> Self {
Self {
tenant_id: Tenant::new_or_err("admin", "default()").unwrap(),
cluster_id: "".to_string(),
warehouse_id: "".to_string(),
node_id: "".to_string(),
num_cpus: 0,
mysql_handler_host: "127.0.0.1".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/src/background_service/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn get_background_service_user(conf: &InnerConfig) -> UserInfo {
format!(
"{}-{}-background-svc",
conf.query.tenant_id.name(),
conf.query.cluster_id.clone()
conf.query.warehouse_id.clone()
)
.as_str(),
"0.0.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/src/test_kits/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl TestFixture {
.await?;
info!(
"Databend query has been registered:{:?} to metasrv:{:?}.",
config.query.cluster_id, config.meta.endpoints
config.query.warehouse_id, config.meta.endpoints
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/management/src/cluster/cluster_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use databend_common_meta_types::NodeInfo;

#[async_trait::async_trait]
pub trait ClusterApi: Sync + Send {
// Add a new node info to /tenant/cluster_id/node-name.
// Add a new node info to /tenant/warehouse_id/node-name.
async fn add_node(&self, node: NodeInfo) -> Result<u64>;

// Get the tenant's cluster all nodes.
Expand Down
4 changes: 2 additions & 2 deletions src/query/management/src/cluster/cluster_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl ClusterMgr {
pub fn create(
metastore: MetaStore,
tenant: &str,
cluster_id: &str,
warehouse_id: &str,
lift_time: Duration,
) -> Result<Self> {
if tenant.is_empty() {
Expand All @@ -58,7 +58,7 @@ impl ClusterMgr {
"{}/{}/{}/databend_query",
CLUSTER_API_KEY_PREFIX,
escape_for_key(tenant)?,
escape_for_key(cluster_id)?
warehouse_id,
),
})
}
Expand Down
Loading