Skip to content
Prev Previous commit
Next Next commit
remove redundancy
  • Loading branch information
nikhilsinhaparseable committed Aug 18, 2025
commit 1f0c72651a6459b1921737a8589ee5638485b67c
16 changes: 2 additions & 14 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,12 @@ pub async fn ingest(

//if stream exists, fetch the stream log source
//return error if the stream log source is otel traces or otel metrics
let stream = validate_stream_for_ingestion(&stream_name)?;
validate_stream_for_ingestion(&stream_name)?;

PARSEABLE
.add_update_log_source(&stream_name, log_source_entry)
.await?;

if stream.get_time_partition().is_some() {
return Err(PostError::CustomError(
"Ingestion is not allowed to stream with time partition".to_string(),
));
}

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;

Ok(HttpResponse::Ok().finish())
Expand Down Expand Up @@ -397,13 +391,7 @@ pub async fn post_event(

//if stream exists, fetch the stream log source
//return error if the stream log source is otel traces or otel metrics
let stream = validate_stream_for_ingestion(&stream_name)?;

if stream.get_time_partition().is_some() {
return Err(PostError::Invalid(anyhow::anyhow!(
"Ingestion is not allowed to stream with time partition. Please upgrade to Parseable enterprise to enable this feature"
)));
}
validate_stream_for_ingestion(&stream_name)?;

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;

Expand Down
14 changes: 7 additions & 7 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use opentelemetry_proto::tonic::{
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
};
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;
use tracing::warn;

use crate::{
Expand All @@ -39,7 +39,7 @@ use crate::{
},
},
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
parseable::{PARSEABLE, Stream},
parseable::PARSEABLE,
storage::StreamType,
utils::json::{convert_array_to_object, flatten::convert_to_array},
};
Expand Down Expand Up @@ -268,7 +268,7 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
Ok(())
}

pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<Arc<Stream>, PostError> {
pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<(), PostError> {
let stream = PARSEABLE.get_stream(stream_name)?;

// Validate that the stream's log source is compatible
Expand All @@ -283,12 +283,12 @@ pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<Arc<Stream>, P

// Check for time partition
if stream.get_time_partition().is_some() {
return Err(PostError::CustomError(
"Ingestion is not allowed to stream with time partition".to_string(),
));
return Err(PostError::Invalid(anyhow::anyhow!(
"Ingestion with time partition is not supported in Parseable OSS"
)));
}

Ok(stream)
Ok(())
}

#[cfg(test)]
Expand Down
Loading