Skip to content
5 changes: 5 additions & 0 deletions server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const AUTHORIZATION_KEY: &str = "authorization";
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const SEPARATOR: char = '^';

const OIDC_SCOPE: &str = "openid profile email";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
const USER_COOKIE_NAME: &str = "username";

// constants for Log Source values for known sources
const LOG_SOURCE_KINESIS: &str = "kinesis";
const LOG_SOURCE_OTEL: &str = "otel";
1 change: 1 addition & 0 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use self::middleware::{DisAllowRootUser, RouteExt};
mod about;
mod health_check;
mod ingest;
mod kinesis;
mod llm;
mod logstream;
mod middleware;
Expand Down
47 changes: 39 additions & 8 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,25 @@
*
*/

use std::collections::HashMap;
use std::sync::Arc;

use actix_web::http::header::ContentType;
use actix_web::{HttpRequest, HttpResponse};
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
use arrow_schema::Field;
use bytes::Bytes;
use http::StatusCode;
use serde_json::Value;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use crate::event::error::EventError;
use crate::event::format::EventFormat;
use crate::event::{self, format};
use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY};
use crate::handlers::{
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
STREAM_NAME_HEADER_KEY,
};
use crate::metadata::STREAM_INFO;
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};

use super::kinesis;
use super::logstream::error::CreateStreamError;

// Handler for POST /api/v1/ingest
Expand All @@ -46,19 +48,48 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name).await?;
push_logs(stream_name, req, body).await?;

flatten_and_push_logs(req, body, stream_name).await?;
Ok(HttpResponse::Ok().finish())
} else {
Err(PostError::Header(ParseHeaderError::MissingStreamName))
}
}

async fn flatten_and_push_logs(
req: HttpRequest,
body: Bytes,
stream_name: String,
) -> Result<(), PostError> {
//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
let log_source: String = log_source.to_str().unwrap().to_owned();
match log_source.as_str() {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
LOG_SOURCE_OTEL => {}
_ => {
log::warn!("Unknown log source: {}", log_source);
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
}
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
} else {
push_logs(stream_name.to_string(), req, body).await?;
}
Ok(())
}

// Handler for POST /api/v1/logstream/{logstream}
// only ingests events into the specified logstream
// fails if the logstream does not exist
pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
push_logs(stream_name, req, body).await?;

flatten_and_push_logs(req, body, stream_name).await?;
Ok(HttpResponse::Ok().finish())
}

Expand Down
66 changes: 66 additions & 0 deletions server/src/handlers/http/kinesis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use base64::{engine::general_purpose::STANDARD, Engine as _};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::BTreeMap;
use std::str;

#[derive(Serialize, Deserialize, Debug)]
struct Message {
#[serde(rename = "records")]
records: Vec<Data>,
#[serde(rename = "requestId")]
request_id: String,
timestamp: u64,
}
#[derive(Serialize, Deserialize, Debug)]
struct Data {
#[serde(rename = "data")]
data: String,
}

pub fn flatten_kinesis_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
let body_str = std::str::from_utf8(body).unwrap();
let message: Message = serde_json::from_str(body_str).unwrap();
let mut vec_kinesis_json: Vec<BTreeMap<String, Value>> = Vec::new();

for record in message.records.iter() {
let bytes = STANDARD.decode(record.data.clone()).unwrap();
let json_string: String = String::from_utf8(bytes).unwrap();
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
let mut kinesis_json: BTreeMap<String, Value> = match serde_json::from_value(json) {
Ok(value) => value,
Err(error) => panic!("Failed to deserialize JSON: {}", error),
};

kinesis_json.insert(
"requestId".to_owned(),
Value::String(message.request_id.clone()),
);
kinesis_json.insert(
"timestamp".to_owned(),
Value::String(message.timestamp.to_string()),
);

vec_kinesis_json.push(kinesis_json);
}
vec_kinesis_json
}
9 changes: 8 additions & 1 deletion server/src/handlers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use actix_web::{
};
use futures_util::future::LocalBoxFuture;

use crate::handlers::{AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY};
use crate::handlers::{
AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS,
STREAM_NAME_HEADER_KEY,
};
use crate::{
option::CONFIG,
rbac::Users,
Expand Down Expand Up @@ -149,6 +152,10 @@ where
header::HeaderValue::from_str(&message.common_attributes.x_p_stream.clone())
.unwrap(),
);
req.headers_mut().insert(
HeaderName::from_static(LOG_SOURCE_KEY),
header::HeaderValue::from_static(LOG_SOURCE_KINESIS),
);
}

/* ## Section end */
Expand Down