@@ -29,25 +29,19 @@ use crate::event::error::EventError;
2929use crate :: event:: format:: known_schema:: { self , KNOWN_SCHEMA_LIST } ;
3030use crate :: event:: format:: { self , EventFormat , LogSource , LogSourceEntry } ;
3131use crate :: event:: { self , FORMAT_KEY , USER_AGENT_KEY } ;
32- use crate :: handlers:: http:: MAX_EVENT_PAYLOAD_SIZE ;
33- use crate :: handlers:: http:: modal:: utils:: ingest_utils:: push_logs;
3432use crate :: handlers:: {
3533 CONTENT_TYPE_JSON , CONTENT_TYPE_PROTOBUF , EXTRACT_LOG_KEY , LOG_SOURCE_KEY ,
3634 STREAM_NAME_HEADER_KEY , TELEMETRY_TYPE_KEY , TelemetryType ,
3735} ;
3836use crate :: metadata:: SchemaVersion ;
3937use crate :: option:: Mode ;
40- use crate :: otel:: logs:: { OTEL_LOG_KNOWN_FIELD_LIST , flatten_otel_protobuf } ;
41- use crate :: otel:: metrics:: { OTEL_METRICS_KNOWN_FIELD_LIST , flatten_otel_metrics_protobuf } ;
42- use crate :: otel:: traces:: { OTEL_TRACES_KNOWN_FIELD_LIST , flatten_otel_traces_protobuf } ;
38+ use crate :: otel:: logs:: OTEL_LOG_KNOWN_FIELD_LIST ;
39+ use crate :: otel:: metrics:: OTEL_METRICS_KNOWN_FIELD_LIST ;
40+ use crate :: otel:: traces:: OTEL_TRACES_KNOWN_FIELD_LIST ;
4341use crate :: parseable:: { PARSEABLE , StreamNotFound } ;
4442use crate :: storage:: { ObjectStorageError , StreamType } ;
4543use crate :: utils:: header_parsing:: ParseHeaderError ;
4644use crate :: utils:: json:: { flatten:: JsonFlattenError , strict:: StrictValue } ;
47- use opentelemetry_proto:: tonic:: collector:: logs:: v1:: ExportLogsServiceRequest ;
48- use opentelemetry_proto:: tonic:: collector:: metrics:: v1:: ExportMetricsServiceRequest ;
49- use opentelemetry_proto:: tonic:: collector:: trace:: v1:: ExportTraceServiceRequest ;
50- use prost:: Message ;
5145
5246use super :: logstream:: error:: { CreateStreamError , StreamError } ;
5347use super :: modal:: utils:: ingest_utils:: { flatten_and_push_logs, get_custom_fields_from_header} ;
@@ -169,7 +163,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
169163}
170164
171165// Common validation and setup for OTEL ingestion
172- async fn setup_otel_stream (
166+ pub async fn setup_otel_stream (
173167 req : & HttpRequest ,
174168 expected_log_source : LogSource ,
175169 known_fields : & [ & str ] ,
@@ -241,18 +235,12 @@ async fn setup_otel_stream(
241235}
242236
243237// Common content processing for OTEL ingestion
244- async fn process_otel_content < T , F > (
238+ async fn process_otel_content (
245239 req : & HttpRequest ,
246240 body : web:: Bytes ,
247241 stream_name : & str ,
248242 log_source : & LogSource ,
249- decode_protobuf : F ,
250- flatten_protobuf : fn ( & T ) -> Vec < serde_json:: Value > ,
251- ) -> Result < ( ) , PostError >
252- where
253- T : prost:: Message + Default ,
254- F : FnOnce ( web:: Bytes ) -> Result < T , prost:: DecodeError > ,
255- {
243+ ) -> Result < ( ) , PostError > {
256244 let p_custom_fields = get_custom_fields_from_header ( req) ;
257245
258246 match req
@@ -270,27 +258,9 @@ where
270258 )
271259 . await ?;
272260 } else if content_type == CONTENT_TYPE_PROTOBUF {
273- // 10MB limit
274- if body. len ( ) > MAX_EVENT_PAYLOAD_SIZE {
275- return Err ( PostError :: Invalid ( anyhow:: anyhow!(
276- "Protobuf message size {} exceeds maximum allowed size of {} bytes" ,
277- body. len( ) ,
278- MAX_EVENT_PAYLOAD_SIZE
279- ) ) ) ;
280- }
281- match decode_protobuf ( body) {
282- Ok ( decoded) => {
283- for record in flatten_protobuf ( & decoded) {
284- push_logs ( stream_name, record, log_source, & p_custom_fields) . await ?;
285- }
286- }
287- Err ( e) => {
288- return Err ( PostError :: Invalid ( anyhow:: anyhow!(
289- "Failed to decode protobuf message: {}" ,
290- e
291- ) ) ) ;
292- }
293- }
261+ return Err ( PostError :: Invalid ( anyhow:: anyhow!(
262+ "Protobuf ingestion is not supported in Parseable OSS"
263+ ) ) ) ;
294264 } else {
295265 return Err ( PostError :: Invalid ( anyhow:: anyhow!(
296266 "Unsupported Content-Type: {}. Expected application/json or application/x-protobuf" ,
@@ -323,15 +293,7 @@ pub async fn handle_otel_logs_ingestion(
323293 )
324294 . await ?;
325295
326- process_otel_content (
327- & req,
328- body,
329- & stream_name,
330- & log_source,
331- ExportLogsServiceRequest :: decode,
332- flatten_otel_protobuf,
333- )
334- . await ?;
296+ process_otel_content ( & req, body, & stream_name, & log_source) . await ?;
335297
336298 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
337299}
@@ -351,15 +313,7 @@ pub async fn handle_otel_metrics_ingestion(
351313 )
352314 . await ?;
353315
354- process_otel_content (
355- & req,
356- body,
357- & stream_name,
358- & log_source,
359- ExportMetricsServiceRequest :: decode,
360- flatten_otel_metrics_protobuf,
361- )
362- . await ?;
316+ process_otel_content ( & req, body, & stream_name, & log_source) . await ?;
363317
364318 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
365319}
@@ -379,15 +333,7 @@ pub async fn handle_otel_traces_ingestion(
379333 )
380334 . await ?;
381335
382- process_otel_content (
383- & req,
384- body,
385- & stream_name,
386- & log_source,
387- ExportTraceServiceRequest :: decode,
388- flatten_otel_traces_protobuf,
389- )
390- . await ?;
336+ process_otel_content ( & req, body, & stream_name, & log_source) . await ?;
391337
392338 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
393339}
0 commit comments