Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ use datafusion::{
logical_expr::{BinaryExpr, Literal, Operator},
prelude::{col, lit, DataFrame, Expr},
};
use tracing::trace;
use tokio::task::JoinSet;
use tracing::{trace, warn};

use crate::{
alerts::LogicalOperator, parseable::PARSEABLE, query::QUERY_SESSION, utils::time::TimeRange,
alerts::LogicalOperator,
handlers::http::query::update_schema_when_distributed,
parseable::PARSEABLE,
query::{resolve_stream_names, QUERY_SESSION},
utils::time::TimeRange,
};

use super::{
Expand Down Expand Up @@ -71,11 +76,37 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert

let session_state = QUERY_SESSION.state();
let select_query = alert.get_base_query();
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;

let time_range = TimeRange::parse_human_time(start_time, end_time)
.map_err(|err| AlertError::CustomError(err.to_string()))?;

let streams = resolve_stream_names(&select_query)?;
let raw_logical_plan = match session_state.create_logical_plan(&select_query).await {
Ok(plan) => plan,
Err(_) => {
let mut join_set = JoinSet::new();
for stream_name in streams {
let stream_name = stream_name.clone();
join_set.spawn(async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
}

(stream_name, result)
});
}

while let Some(result) = join_set.join_next().await {
if let Err(join_error) = result {
warn!("Task join error: {}", join_error);
}
}
session_state.create_logical_plan(&select_query).await?
}
};
Ok(crate::query::Query {
raw_logical_plan,
time_range,
Expand All @@ -87,11 +118,18 @@ async fn execute_base_query(
query: &crate::query::Query,
original_query: &str,
) -> Result<DataFrame, AlertError> {
let stream_name = query.first_table_name().ok_or_else(|| {
let streams = resolve_stream_names(original_query)?;
let stream_name = streams.first().ok_or_else(|| {
AlertError::CustomError(format!("Table name not found in query- {original_query}"))
})?;

let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
update_schema_when_distributed(&streams)
.await
.map_err(|err| {
AlertError::CustomError(format!(
"Failed to update schema for distributed streams: {err}"
))
})?;
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
query
.get_dataframe(time_partition.as_ref())
.await
Expand Down
4 changes: 4 additions & 0 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use actix_web::http::header::ContentType;
use async_trait::async_trait;
use chrono::Utc;
use datafusion::sql::sqlparser::parser::ParserError;
use derive_more::derive::FromStr;
use derive_more::FromStrError;
use http::StatusCode;
Expand Down Expand Up @@ -860,6 +861,8 @@ pub enum AlertError {
InvalidTargetModification(String),
#[error("Can't delete a Target which is being used")]
TargetInUse,
#[error("{0}")]
ParserError(#[from] ParserError),
}

impl actix_web::ResponseError for AlertError {
Expand All @@ -880,6 +883,7 @@ impl actix_web::ResponseError for AlertError {
Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST,
Self::InvalidTargetModification(_) => StatusCode::BAD_REQUEST,
Self::TargetInUse => StatusCode::CONFLICT,
Self::ParserError(_) => StatusCode::BAD_REQUEST,
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Correlations {
.iter()
.map(|t| t.table_name.clone())
.collect_vec();
if user_auth_for_datasets(&permissions, tables).is_ok() {
if user_auth_for_datasets(&permissions, tables).await.is_ok() {
user_correlations.push(correlation.clone());
}
}
Expand Down Expand Up @@ -281,7 +281,7 @@ impl CorrelationConfig {
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_datasets(&permissions, tables)?;
user_auth_for_datasets(&permissions, tables).await?;

// to validate table config, we need to check whether the mentioned fields
// are present in the table or not
Expand Down
38 changes: 12 additions & 26 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use arrow_array::RecordBatch;
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::PollInfo;
use arrow_schema::ArrowError;

use datafusion::common::tree_node::TreeNode;
use serde_json::json;
use std::net::SocketAddr;
use std::time::Instant;
Expand All @@ -35,11 +33,11 @@ use tonic_web::GrpcWebLayer;

use crate::handlers::http::cluster::get_node_info;
use crate::handlers::http::modal::{NodeMetadata, NodeType};
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
use crate::handlers::http::query::into_query;
use crate::handlers::livetail::cross_origin_config;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::parseable::PARSEABLE;
use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
use crate::query::{execute, resolve_stream_names, QUERY_SESSION};
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
Expand Down Expand Up @@ -131,40 +129,26 @@ impl FlightService for AirServiceImpl {

let ticket =
get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;

let streams = resolve_stream_names(&ticket.query).map_err(|e| {
error!("Failed to extract table names from SQL: {}", e);
Status::invalid_argument("Invalid SQL query syntax")
})?;
info!("query requested to airplane: {:?}", ticket);

// get the query session_state
let session_state = QUERY_SESSION.state();

// get the logical plan and extract the table name
let raw_logical_plan = session_state
.create_logical_plan(&ticket.query)
.await
.map_err(|err| {
error!("Datafusion Error: Failed to create logical plan: {}", err);
Status::internal("Failed to create logical plan")
})?;

let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
.map_err(|e| Status::internal(e.to_string()))?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let streams = visitor.into_inner();

let stream_name = streams
.first()
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();

update_schema_when_distributed(&streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;

// map payload to query
let query = into_query(&ticket, &session_state, time_range)
let query = into_query(&ticket, &session_state, time_range, &streams)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

Expand Down Expand Up @@ -214,9 +198,11 @@ impl FlightService for AirServiceImpl {

let permissions = Users.get_permissions(&key);

user_auth_for_datasets(&permissions, &streams).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
user_auth_for_datasets(&permissions, &streams)
.await
.map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();

let (records, _) = execute(query, &stream_name, false)
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn get(
.map(|t| t.table_name.clone())
.collect_vec();

user_auth_for_datasets(&permissions, tables)?;
user_auth_for_datasets(&permissions, tables).await?;

Ok(web::Json(correlation))
}
Expand Down
Loading
Loading