Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,11 @@ fn into_event_batch(
pub async fn create_stream_if_not_exists(
stream_name: &str,
stream_type: &str,
) -> Result<(), PostError> {
) -> Result<bool, PostError> {
let mut stream_exists = false;
if STREAM_INFO.stream_exists(stream_name) {
return Ok(());
stream_exists = true;
return Ok(stream_exists);
}
match &CONFIG.parseable.mode {
Mode::All | Mode::Query => {
Expand Down Expand Up @@ -459,7 +461,7 @@ pub async fn create_stream_if_not_exists(
.map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?;
}
}
Ok(())
Ok(stream_exists)
}

#[derive(Debug, thiserror::Error)]
Expand Down
13 changes: 8 additions & 5 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,8 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let ingestor_stats = if CONFIG.parseable.mode == Mode::Query
&& STREAM_INFO.stream_type(&stream_name).unwrap() == StreamType::UserDefined.to_string()
&& STREAM_INFO.stream_type(&stream_name).unwrap()
== Some(StreamType::UserDefined.to_string())
{
Some(fetch_stats_from_ingestors(&stream_name).await?)
} else {
Expand Down Expand Up @@ -957,7 +958,7 @@ pub async fn put_stream_hot_tier(
return Err(StreamError::StreamNotFound(stream_name));
}

if STREAM_INFO.stream_type(&stream_name).unwrap() == StreamType::Internal.to_string() {
if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
return Err(StreamError::Custom {
msg: "Hot tier can not be updated for internal stream".to_string(),
status: StatusCode::BAD_REQUEST,
Expand Down Expand Up @@ -1060,10 +1061,12 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
}

pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
if create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string())
.await
.is_ok()
if let Ok(stream_exists) =
create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
{
if stream_exists {
return Ok(());
}
let mut header_map = HeaderMap::new();
header_map.insert(
HeaderName::from_str(STREAM_TYPE_KEY).unwrap(),
Expand Down
46 changes: 22 additions & 24 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct LogStreamMetadata {
pub custom_partition: Option<String>,
pub static_schema_flag: Option<String>,
pub hot_tier_enabled: Option<bool>,
pub stream_type: String,
pub stream_type: Option<String>,
}

// It is very unlikely that panic will occur when dealing with metadata.
Expand Down Expand Up @@ -303,7 +303,7 @@ impl StreamInfo {
} else {
static_schema
},
stream_type: stream_type.to_string(),
stream_type: Some(stream_type.to_string()),
..Default::default()
};
map.insert(stream_name, metadata);
Expand Down Expand Up @@ -365,12 +365,12 @@ impl StreamInfo {
self.read()
.expect(LOCK_EXPECT)
.iter()
.filter(|(_, v)| v.stream_type == StreamType::Internal.to_string())
.filter(|(_, v)| v.stream_type.clone().unwrap() == StreamType::Internal.to_string())
.map(|(k, _)| k.clone())
.collect()
}

pub fn stream_type(&self, stream_name: &str) -> Result<String, MetadataError> {
pub fn stream_type(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
Expand Down Expand Up @@ -431,26 +431,24 @@ pub async fn update_data_type_time_partition(
let mut schema = schema.clone();
if meta.time_partition.is_some() {
let time_partition = meta.time_partition.unwrap();
let time_partition_data_type = schema
.field_with_name(&time_partition)
.unwrap()
.data_type()
.clone();
if time_partition_data_type != DataType::Timestamp(TimeUnit::Millisecond, None) {
let mut fields = schema
.fields()
.iter()
.filter(|field| *field.name() != time_partition)
.cloned()
.collect::<Vec<Arc<Field>>>();
let time_partition_field = Arc::new(Field::new(
time_partition,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
));
fields.push(time_partition_field);
schema = Schema::new(fields);
storage.put_schema(stream_name, &schema).await?;
if let Ok(time_partition_field) = schema.field_with_name(&time_partition) {
if time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None)
{
let mut fields = schema
.fields()
.iter()
.filter(|field| *field.name() != time_partition)
.cloned()
.collect::<Vec<Arc<Field>>>();
let time_partition_field = Arc::new(Field::new(
time_partition,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
));
fields.push(time_partition_field);
schema = Schema::new(fields);
storage.put_schema(stream_name, &schema).await?;
}
}
}
Ok(schema)
Expand Down
6 changes: 3 additions & 3 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub struct ObjectStoreFormat {
pub static_schema_flag: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub hot_tier_enabled: Option<bool>,
pub stream_type: String,
pub stream_type: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
Expand All @@ -122,7 +122,7 @@ pub struct StreamInfo {
pub custom_partition: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub static_schema_flag: Option<String>,
pub stream_type: String,
pub stream_type: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
Expand Down Expand Up @@ -175,7 +175,7 @@ impl Default for ObjectStoreFormat {
Self {
version: CURRENT_SCHEMA_VERSION.to_string(),
objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
stream_type: StreamType::UserDefined.to_string(),
stream_type: Some(StreamType::UserDefined.to_string()),
created_at: Local::now().to_rfc3339(),
first_event_at: None,
owner: Owner::new("".to_string(), "".to_string()),
Expand Down
2 changes: 1 addition & 1 deletion server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ pub trait ObjectStorage: Sync + 'static {
let permission = Permisssion::new(CONFIG.parseable.username.clone());
format.permissions = vec![permission];
format.created_at = Local::now().to_rfc3339();
format.stream_type = stream_type.to_string();
format.stream_type = Some(stream_type.to_string());
if time_partition.is_empty() {
format.time_partition = None;
} else {
Expand Down