Skip to content
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ lint:
# Cargo.toml file formatter(make setup to install)
taplo fmt
# Python file formatter(make setup to install)
ruff format tests/
# ruff format tests/
# Bash file formatter(make setup to install)
shfmt -l -w scripts/*

Expand Down
187 changes: 149 additions & 38 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ use databend_common_base::runtime::ParentMemStat;
use databend_common_base::runtime::ThreadTracker;
use databend_common_base::runtime::GLOBAL_MEM_STAT;
use databend_common_config::GlobalConfig;
use databend_common_expression::DataSchemaRef;
use databend_common_exception::ErrorCode;
use databend_common_expression::DataSchema;
use databend_common_management::WorkloadGroupResourceManager;
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
use fastrace::prelude::*;
use headers::Header;
use headers::HeaderMapExt;
use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
Expand All @@ -47,8 +50,10 @@ use poem::put;
use poem::web::Json;
use poem::web::Path;
use poem::EndpointExt;
use poem::FromRequest;
use poem::IntoResponse;
use poem::Request;
use poem::RequestBody;
use poem::Response;
use poem::Route;
use serde::Deserialize;
Expand All @@ -60,6 +65,7 @@ use super::query::ExecuteStateKind;
use super::query::HttpQuery;
use super::query::HttpQueryRequest;
use super::query::HttpQueryResponseInternal;
use super::query::ResponseState;
use crate::clusters::ClusterDiscovery;
use crate::servers::http::error::HttpErrorCode;
use crate::servers::http::error::QueryError;
Expand Down Expand Up @@ -123,7 +129,7 @@ pub struct QueryResponseField {
}

impl QueryResponseField {
pub fn from_schema(schema: DataSchemaRef) -> Vec<Self> {
pub fn from_schema(schema: &DataSchema) -> Vec<Self> {
schema
.fields()
.iter()
Expand Down Expand Up @@ -165,17 +171,34 @@ pub struct QueryResponse {
}

impl QueryResponse {
pub(crate) fn from_internal(
fn from_internal(
id: String,
r: HttpQueryResponseInternal,
HttpQueryResponseInternal {
data,
session_id,
session,
node_id,
result_timeout_secs,
state:
ResponseState {
has_result_set,
schema,
running_time_ms,
progresses,
state,
affect,
error,
warnings,
},
}: HttpQueryResponseInternal,
is_final: bool,
) -> impl IntoResponse {
let state = r.state.clone();
body_format: BodyFormat,
) -> Response {
let (data, next_uri) = if is_final {
(Arc::new(BlocksSerializer::empty()), None)
} else {
match state.state {
ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data {
match state {
ExecuteStateKind::Running | ExecuteStateKind::Starting => match data {
None => (
Arc::new(BlocksSerializer::empty()),
Some(make_state_uri(&id)),
Expand All @@ -192,7 +215,7 @@ impl QueryResponse {
Arc::new(BlocksSerializer::empty()),
Some(make_final_uri(&id)),
),
ExecuteStateKind::Succeeded => match r.data {
ExecuteStateKind::Succeeded => match data {
None => (
Arc::new(BlocksSerializer::empty()),
Some(make_final_uri(&id)),
Expand All @@ -208,39 +231,64 @@ impl QueryResponse {
}
};

if let Some(err) = &r.state.error {
if let Some(err) = &error {
metrics_incr_http_response_errors_count(err.name(), err.code());
}

let session_id = r.session_id.clone();
let stats = QueryStats {
progresses: state.progresses.clone(),
running_time_ms: state.running_time_ms,
};
let rows = data.num_rows();

Json(QueryResponse {
data,
state: state.state,
schema: state.schema.clone(),
session_id: Some(session_id),
node_id: r.node_id,
session: r.session,
stats,
affect: state.affect,
warnings: r.state.warnings,
let mut res = QueryResponse {
id: id.clone(),
session_id: Some(session_id),
node_id,
state,
session,
stats: QueryStats {
progresses,
running_time_ms,
},
schema: vec![],
data: Arc::new(BlocksSerializer::empty()),
affect,
warnings,
next_uri,
stats_uri: Some(make_state_uri(&id)),
final_uri: Some(make_final_uri(&id)),
kill_uri: Some(make_kill_uri(&id)),
error: r.state.error.map(QueryError::from_error_code),
has_result_set: r.state.has_result_set,
result_timeout_secs: Some(r.result_timeout_secs),
})
.with_header(HEADER_QUERY_ID, id.clone())
.with_header(HEADER_QUERY_STATE, state.state.to_string())
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
error: error.map(QueryError::from_error_code),
has_result_set,
result_timeout_secs: Some(result_timeout_secs),
};

match body_format {
BodyFormat::Arrow if !schema.fields.is_empty() && !data.is_empty() => {
let buf: Result<_, ErrorCode> = try {
const META_KEY: &str = "response_header";
let json_res = serde_json::to_string(&res)?;
data.to_arrow_ipc(&schema, vec![(META_KEY.to_string(), json_res)])?
};

match buf {
Ok(buf) => Response::builder()
.header(HEADER_QUERY_ID, id)
.header(HEADER_QUERY_STATE, state.to_string())
.header(HEADER_QUERY_PAGE_ROWS, rows)
.content_type(body_format.content_type())
.body(buf),
Err(err) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(err.to_string()),
}
}
_ => {
res.data = data;
res.schema = QueryResponseField::from_schema(&schema);
Json(res)
.with_header(HEADER_QUERY_ID, id)
.with_header(HEADER_QUERY_STATE, state.to_string())
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
.into_response()
}
}
}
}

Expand Down Expand Up @@ -307,6 +355,7 @@ impl StateResponse {
#[poem::handler]
async fn query_final_handler(
ctx: &HttpQueryContext,
body_format: BodyFormat,
Path(query_id): Path<String>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
Expand Down Expand Up @@ -335,7 +384,12 @@ async fn query_final_handler(
// it is safe to set these 2 fields to None, because client now check for null/None first.
response.session = None;
response.state.affect = None;
Ok(QueryResponse::from_internal(query_id, response, true))
Ok(QueryResponse::from_internal(
query_id,
response,
true,
body_format,
))
}
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
}
Expand Down Expand Up @@ -408,6 +462,7 @@ async fn query_state_handler(
#[poem::handler]
async fn query_page_handler(
ctx: &HttpQueryContext,
body_format: BodyFormat,
Path((query_id, page_no)): Path<(String, usize)>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
Expand Down Expand Up @@ -459,7 +514,12 @@ async fn query_page_handler(
query
.update_expire_time(false, resp.is_data_drained())
.await;
Ok(QueryResponse::from_internal(query_id, resp, false))
Ok(QueryResponse::from_internal(
query_id,
resp,
false,
body_format,
))
}
}
};
Expand All @@ -483,13 +543,12 @@ async fn query_page_handler(

#[poem::handler]
#[async_backtrace::framed]
#[fastrace::trace]
pub(crate) async fn query_handler(
ctx: &HttpQueryContext,
body_format: BodyFormat,
Json(mut req): Json<HttpQueryRequest>,
) -> PoemResult<impl IntoResponse> {
let session = ctx.session.clone();

let query_handle = async {
let agent_info = ctx
.user_agent
Expand Down Expand Up @@ -553,7 +612,10 @@ pub(crate) async fn query_handler(
query
.update_expire_time(false, resp.is_data_drained())
.await;
Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response())
Ok(
QueryResponse::from_internal(query.id.to_string(), resp, false, body_format)
.into_response(),
)
}
}
};
Expand Down Expand Up @@ -924,3 +986,52 @@ pub(crate) fn get_http_tracing_span(
Span::root(name, SpanContext::new(trace_id, SpanId(rand::random())))
.with_properties(|| ctx.to_fastrace_properties())
}

/// Wire format of the HTTP response body, negotiated through the request's
/// `Accept` header (see the `Header` impl below). Defaults to JSON.
#[derive(Debug, Clone, Copy)]
enum BodyFormat {
    /// Serialize the response body as JSON.
    Json,
    /// Serialize the response body as an Arrow IPC stream
    /// (`application/vnd.apache.arrow.stream`).
    Arrow,
}

impl Header for BodyFormat {
fn name() -> &'static http::HeaderName {
&http::header::ACCEPT
}

fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
where
Self: Sized,
I: Iterator<Item = &'i HeaderValue>,
{
if let Some(v) = values.next() {
match v.to_str() {
Ok(s) if s == BodyFormat::Arrow.content_type() => return Ok(BodyFormat::Arrow),
Err(_) => return Err(headers::Error::invalid()),
_ => {}
};
}
Ok(BodyFormat::Json)
}

fn encode<E: Extend<HeaderValue>>(&self, values: &mut E) {
values.extend([HeaderValue::from_static(self.content_type())]);
}
}

impl BodyFormat {
pub const fn content_type(&self) -> &'static str {
match self {
BodyFormat::Json => "application/json",
BodyFormat::Arrow => "application/vnd.apache.arrow.stream",
}
}
}

impl<'a> FromRequest<'a> for BodyFormat {
    /// Poem extractor: read the negotiated body format from the request's
    /// typed `Accept` header, falling back to JSON when the header is
    /// absent or fails to decode.
    async fn from_request(req: &'a Request, _body: &mut RequestBody) -> Result<Self, PoemError> {
        let negotiated = req.headers().typed_get::<BodyFormat>();
        Ok(negotiated.unwrap_or(BodyFormat::Json))
    }
}
28 changes: 28 additions & 0 deletions src/query/service/src/servers/http/v1/query/blocks_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
use std::cell::RefCell;
use std::ops::DerefMut;

use arrow_ipc::writer::IpcWriteOptions;
use arrow_ipc::writer::StreamWriter;
use arrow_ipc::CompressionType;
use arrow_ipc::MetadataVersion;
use databend_common_exception::Result;
use databend_common_expression::types::date::date_to_string;
use databend_common_expression::types::interval::interval_to_string;
use databend_common_expression::types::timestamp::timestamp_to_string;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_formats::field_encoder::FieldEncoderValues;
use databend_common_io::ewkb_to_geo;
use databend_common_io::geo_to_ewkb;
Expand Down Expand Up @@ -100,6 +106,28 @@ impl BlocksSerializer {
/// Total row count summed over every buffered `(columns, num_rows)` chunk.
pub fn num_rows(&self) -> usize {
    let mut total = 0;
    for (_, rows) in &self.columns {
        total += *rows;
    }
    total
}

/// Encode all buffered blocks as a single Arrow IPC stream.
///
/// `ext_meta` key/value pairs are merged into the Arrow schema metadata
/// (the HTTP layer uses this to embed a serialized response header).
/// The stream is written with 8-byte alignment, IPC metadata version V5
/// and LZ4_FRAME compression.
pub fn to_arrow_ipc(
    &self,
    data_schema: &DataSchema,
    ext_meta: Vec<(String, String)>,
) -> Result<Vec<u8>> {
    // Convert the Databend schema to an Arrow schema and attach the
    // caller-supplied metadata entries.
    let mut arrow_schema = arrow_schema::Schema::from(data_schema);
    arrow_schema.metadata.extend(ext_meta);

    let write_options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?
        .try_with_compression(Some(CompressionType::LZ4_FRAME))?;

    let mut out = Vec::new();
    let mut writer = StreamWriter::try_new_with_options(&mut out, &arrow_schema, write_options)?;
    // Each buffered chunk becomes one record batch in the stream.
    for (columns, _num_rows) in self.columns.iter() {
        let batch = DataBlock::new_from_columns(columns.clone())
            .to_record_batch_with_dataschema(data_schema)?;
        writer.write(&batch)?;
    }
    writer.finish()?;
    Ok(out)
}
}

impl serde::Serialize for BlocksSerializer {
Expand Down
Loading
Loading