4 changes: 3 additions & 1 deletion src/alerts/alerts_utils.rs
@@ -31,6 +31,7 @@ use tracing::trace;

 use crate::{
     alerts::AggregateCondition,
+    parseable::PARSEABLE,
     query::{TableScanVisitor, QUERY_SESSION},
     rbac::{
         map::SessionKey,
@@ -137,8 +138,9 @@ async fn execute_base_query(
         AlertError::CustomError(format!("Table name not found in query- {}", original_query))
     })?;

+    let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
     query
-        .get_dataframe(stream_name)
+        .get_dataframe(time_partition.as_ref())
         .await
         .map_err(|err| AlertError::CustomError(err.to_string()))
 }
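Note: `get_dataframe` now takes the stream's time partition (an `Option<&String>`) instead of the stream name, so the stream lookup happens once at the call site. A minimal standalone sketch of the `Option::as_ref` conversion this call relies on; the function and column names here are illustrative, not Parseable APIs:

```rust
fn describe(time_partition: Option<&String>) -> String {
    // When no time partition is configured, callers fall back to a default
    // timestamp column (hypothetical wording for this sketch).
    match time_partition {
        Some(col) => format!("partitioned on {col}"),
        None => "partitioned on the default timestamp column".to_string(),
    }
}

fn main() {
    let time_partition: Option<String> = Some("event_time".to_string());
    // `as_ref()` converts Option<String> to Option<&String> without moving
    // the inner String, so the owner can keep using it afterwards.
    println!("{}", describe(time_partition.as_ref()));
    println!("{}", describe(None));
}
```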
12 changes: 7 additions & 5 deletions src/alerts/mod.rs
@@ -37,7 +37,7 @@ use ulid::Ulid;
 pub mod alerts_utils;
 pub mod target;

-use crate::parseable::PARSEABLE;
+use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::map::SessionKey;
 use crate::storage;
@@ -514,17 +514,16 @@ impl AlertConfig {

         // for now proceed in a similar fashion as we do in query
         // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
-        let stream_name = if let Some(stream_name) = query.first_table_name() {
-            stream_name
-        } else {
+        let Some(stream_name) = query.first_table_name() else {
             return Err(AlertError::CustomError(format!(
                 "Table name not found in query- {}",
                 self.query
             )));
         };

+        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
         let base_df = query
-            .get_dataframe(stream_name)
+            .get_dataframe(time_partition.as_ref())
             .await
             .map_err(|err| AlertError::CustomError(err.to_string()))?;

@@ -704,6 +703,8 @@ pub enum AlertError {
     CustomError(String),
     #[error("Invalid State Change: {0}")]
     InvalidStateChange(String),
+    #[error("{0}")]
+    StreamNotFound(#[from] StreamNotFound),
 }

 impl actix_web::ResponseError for AlertError {
@@ -717,6 +718,7 @@ impl actix_web::ResponseError for AlertError {
             Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             Self::CustomError(_) => StatusCode::BAD_REQUEST,
             Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
+            Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
         }
     }
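The `#[from]` attribute on the new variant is what lets `PARSEABLE.get_stream(&stream_name)?` use `?` inside functions returning `AlertError`. A reduced sketch of that thiserror pattern, with both error types simplified from the real ones:

```rust
use thiserror::Error;

#[derive(Debug, Error)]
#[error("stream {0} not found")]
pub struct StreamNotFound(pub String);

#[derive(Debug, Error)]
pub enum AlertError {
    // #[from] derives From<StreamNotFound> for AlertError, which is
    // exactly the conversion the `?` operator applies.
    #[error("{0}")]
    StreamNotFound(#[from] StreamNotFound),
}

fn get_stream(name: &str) -> Result<(), StreamNotFound> {
    Err(StreamNotFound(name.to_owned()))
}

fn evaluate(stream: &str) -> Result<(), AlertError> {
    get_stream(stream)?; // StreamNotFound converts into AlertError here
    Ok(())
}

fn main() {
    println!("{:?}", evaluate("demo"));
}
```

Mapping the variant to `StatusCode::NOT_FOUND` in `ResponseError` then turns a missing stream into a 404 instead of a generic client or server error.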
12 changes: 4 additions & 8 deletions src/handlers/airplane.rs
@@ -38,7 +38,7 @@ use crate::handlers::http::query::{into_query, update_schema_when_distributed};
 use crate::handlers::livetail::cross_origin_config;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::parseable::PARSEABLE;
-use crate::query::{TableScanVisitor, QUERY_SESSION};
+use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
 use crate::utils::arrow::flight::{
     append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
     send_to_ingester,
@@ -216,13 +216,9 @@ impl FlightService for AirServiceImpl {
         })?;
         let time = Instant::now();

-        let stream_name_clone = stream_name.clone();
-        let (records, _) =
-            match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
-                Ok(Ok((records, fields))) => (records, fields),
-                Ok(Err(e)) => return Err(Status::internal(e.to_string())),
-                Err(err) => return Err(Status::internal(err.to_string())),
-            };
+        let (records, _) = execute(query, &stream_name)
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;

         /*
          * INFO: No returning the schema with the data.
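The nested `match` existed because `tokio::task::spawn_blocking` returns a `Result<Result<_, _>, JoinError>`; the shared `execute` helper (added later in src/query/mod.rs) absorbs the join layer, so call sites collapse to one `map_err` and `?`. A toy sketch of the double unwrapping the old code spelled out by hand:

```rust
use tokio::task;

// Stand-in for the old pattern: run a blocking "query" and flatten the
// JoinError layer and the query error into a single Result.
async fn run_blocking_query() -> Result<u64, String> {
    let joined: Result<Result<u64, String>, task::JoinError> =
        task::spawn_blocking(|| Ok::<u64, String>(42)).await;
    match joined {
        Ok(Ok(value)) => Ok(value),                 // task ran, query succeeded
        Ok(Err(query_err)) => Err(query_err),       // task ran, query failed
        Err(join_err) => Err(join_err.to_string()), // task panicked or was cancelled
    }
}

#[tokio::main]
async fn main() {
    assert_eq!(run_blocking_query().await, Ok(42));
}
```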
21 changes: 6 additions & 15 deletions src/handlers/http/query.rs
@@ -19,7 +19,6 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
-use arrow_array::RecordBatch;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -40,9 +39,9 @@ use crate::handlers::http::fetch_schema;
 use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
-use crate::parseable::PARSEABLE;
+use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::error::ExecuteError;
-use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
+use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
@@ -131,7 +130,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons

         return Ok(HttpResponse::Ok().json(response));
     }
-    let (records, fields) = execute_query(query, table_name.clone()).await?;
+
+    let (records, fields) = execute(query, &table_name).await?;

     let response = QueryResponse {
         records,
@@ -150,17 +150,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
     Ok(response)
 }

-async fn execute_query(
-    query: LogicalQuery,
-    stream_name: String,
-) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
-    match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
-        Ok(Ok(result)) => Ok(result),
-        Ok(Err(e)) => Err(QueryError::Execute(e)),
-        Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
-    }
-}
-
 pub async fn get_counts(
     req: HttpRequest,
     counts_request: Json<CountsRequest>,
@@ -330,6 +319,8 @@ Description: {0}"#
     ActixError(#[from] actix_web::Error),
     #[error("Error: {0}")]
     Anyhow(#[from] anyhow::Error),
+    #[error("Error: {0}")]
+    StreamNotFound(#[from] StreamNotFound),
 }

 impl actix_web::ResponseError for QueryError {
2 changes: 1 addition & 1 deletion src/parseable/mod.rs
@@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
 use once_cell::sync::Lazy;
 pub use staging::StagingError;
 use streams::StreamRef;
-pub use streams::{StreamNotFound, Streams};
+pub use streams::{Stream, StreamNotFound, Streams};
 use tracing::error;

 #[cfg(feature = "kafka")]
43 changes: 29 additions & 14 deletions src/query/mod.rs
@@ -42,6 +42,7 @@ use std::ops::Bound;
 use std::sync::Arc;
 use stream_schema_provider::collect_manifest_files;
 use sysinfo::System;
+use tokio::runtime::Runtime;

 use self::error::ExecuteError;
 use self::stream_schema_provider::GlobalSchemaProvider;
@@ -60,6 +61,21 @@ use crate::utils::time::TimeRange;
 pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));

+/// Dedicated multi-threaded runtime to run all queries on
+pub static QUERY_RUNTIME: Lazy<Runtime> =
+    Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));
+
+pub async fn execute(
+    query: Query,
+    stream_name: &str,
+) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
+    let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
+    QUERY_RUNTIME
+        .spawn(async move { query.execute(time_partition.as_ref()).await })
+        .await
+        .expect("The Join should have been successful")
+}
+
 // A query request by client
 #[derive(Debug)]
 pub struct Query {
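Design note: the old `#[tokio::main(flavor = "multi_thread")]` attribute on `Query::execute` (removed in the next hunk) built a fresh runtime for every query; `QUERY_RUNTIME` replaces that with one lazily-created runtime shared by all queries, kept separate from the server's runtime so heavy DataFusion work does not starve request handling. A minimal sketch of the same lazy-runtime-plus-spawn pattern, standalone rather than using the Parseable types:

```rust
use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

// One multi-threaded runtime, created on first use and reused thereafter.
static QUERY_RUNTIME: Lazy<Runtime> =
    Lazy::new(|| Runtime::new().expect("Runtime should be constructible"));

async fn expensive_query(x: u64) -> u64 {
    x * 2 // placeholder for actual plan execution
}

fn main() {
    // Spawn the future onto the dedicated runtime, then block this thread
    // until the spawned task joins.
    let handle = QUERY_RUNTIME.spawn(expensive_query(21));
    let result = QUERY_RUNTIME.block_on(handle).expect("join should succeed");
    assert_eq!(result, 42);
}
```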
@@ -129,15 +145,12 @@ impl Query {
         SessionContext::new_with_state(state)
     }

-    #[tokio::main(flavor = "multi_thread")]
     pub async fn execute(
         &self,
-        stream_name: String,
+        time_partition: Option<&String>,
     ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
-        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
-
         let df = QUERY_SESSION
-            .execute_logical_plan(self.final_logical_plan(&time_partition))
+            .execute_logical_plan(self.final_logical_plan(time_partition))
             .await?;

         let fields = df
@@ -153,21 +166,23 @@ impl Query {
         }

         let results = df.collect().await?;
+
         Ok((results, fields))
     }

-    pub async fn get_dataframe(&self, stream_name: String) -> Result<DataFrame, ExecuteError> {
-        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
-
+    pub async fn get_dataframe(
+        &self,
+        time_partition: Option<&String>,
+    ) -> Result<DataFrame, ExecuteError> {
         let df = QUERY_SESSION
-            .execute_logical_plan(self.final_logical_plan(&time_partition))
+            .execute_logical_plan(self.final_logical_plan(time_partition))
             .await?;

         Ok(df)
     }

     /// return logical plan with all time filters applied through
-    fn final_logical_plan(&self, time_partition: &Option<String>) -> LogicalPlan {
+    fn final_logical_plan(&self, time_partition: Option<&String>) -> LogicalPlan {
         // see https://github.com/apache/arrow-datafusion/pull/8400
         // this can be eliminated in later version of datafusion but with slight caveat
         // transform cannot modify stringified plans by itself
@@ -487,7 +502,7 @@ fn transform(
     plan: LogicalPlan,
     start_time: NaiveDateTime,
     end_time: NaiveDateTime,
-    time_partition: &Option<String>,
+    time_partition: Option<&String>,
 ) -> Transformed<LogicalPlan> {
     plan.transform(&|plan| match plan {
         LogicalPlan::TableScan(table) => {
@@ -545,7 +560,7 @@

 fn table_contains_any_time_filters(
     table: &datafusion::logical_expr::TableScan,
-    time_partition: &Option<String>,
+    time_partition: Option<&String>,
 ) -> bool {
     table
         .filters
@@ -559,8 +574,8 @@ fn table_contains_any_time_filters(
         })
         .any(|expr| {
             matches!(&*expr.left, Expr::Column(Column { name, .. })
-                if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
-                    (!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
+                if (time_partition.is_some_and(|field| field == name) ||
+                    (time_partition.is_none() && name == event::DEFAULT_TIMESTAMP_KEY)))
         })
 }
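The last hunk trades the `is_some() && ...unwrap()` pairing for `Option::is_some_and`, which evaluates a predicate only when the value is present and so drops the `unwrap`. A small standalone comparison; `p_timestamp` is assumed here as the default timestamp key:

```rust
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; // assumed default key

fn filters_time_column(time_partition: Option<&String>, name: &str) -> bool {
    // is_some_and returns false for None, otherwise applies the predicate
    // to the contained value, so no unwrap is needed.
    time_partition.is_some_and(|field| field == name)
        || (time_partition.is_none() && name == DEFAULT_TIMESTAMP_KEY)
}

fn main() {
    let partition = "event_time".to_string();
    assert!(filters_time_column(Some(&partition), "event_time"));
    assert!(filters_time_column(None, "p_timestamp"));
    assert!(!filters_time_column(Some(&partition), "p_timestamp"));
}
```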
