Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/storage/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,53 @@ impl ObjectStorage for BlobStore {
Ok(streams)
}

async fn list_hours(
&self,
stream_name: &str,
date: &str,
) -> Result<Vec<String>, ObjectStorageError> {
let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date));
let resp = self.client.list_with_delimiter(Some(&pre)).await?;

let hours = resp
.common_prefixes
.iter()
.filter_map(|path| {
path.as_ref()
.strip_prefix(&format!("{}/{}/", stream_name, date))
.and_then(|s| s.strip_suffix('/'))
.map(String::from)
})
.filter(|dir| dir.starts_with("hour="))
.collect();

Ok(hours)
}

async fn list_minutes(
&self,
stream_name: &str,
date: &str,
hour: &str,
) -> Result<Vec<String>, ObjectStorageError> {
let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour));
let resp = self.client.list_with_delimiter(Some(&pre)).await?;

let minutes = resp
.common_prefixes
.iter()
.filter_map(|path| {
path.as_ref()
.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour))
.and_then(|s| s.strip_suffix('/'))
.map(String::from)
})
.filter(|dir| dir.starts_with("minute="))
.collect();

Ok(minutes)
}

async fn list_manifest_files(
&self,
stream_name: &str,
Expand Down
47 changes: 47 additions & 0 deletions src/storage/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,53 @@ impl ObjectStorage for Gcs {
Ok(streams)
}

async fn list_hours(
&self,
stream_name: &str,
date: &str,
) -> Result<Vec<String>, ObjectStorageError> {
let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date));
let resp = self.client.list_with_delimiter(Some(&pre)).await?;

let hours = resp
.common_prefixes
.iter()
.filter_map(|path| {
path.as_ref()
.strip_prefix(&format!("{}/{}/", stream_name, date))
.and_then(|s| s.strip_suffix('/'))
.map(String::from)
})
.filter(|dir| dir.starts_with("hour="))
.collect();

Ok(hours)
}

async fn list_minutes(
&self,
stream_name: &str,
date: &str,
hour: &str,
) -> Result<Vec<String>, ObjectStorageError> {
let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour));
let resp = self.client.list_with_delimiter(Some(&pre)).await?;

let minutes = resp
.common_prefixes
.iter()
.filter_map(|path| {
path.as_ref()
.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour))
.and_then(|s| s.strip_suffix('/'))
.map(String::from)
})
.filter(|dir| dir.starts_with("minute="))
.collect();

Ok(minutes)
}

async fn list_manifest_files(
&self,
stream_name: &str,
Expand Down
37 changes: 37 additions & 0 deletions src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,43 @@ impl ObjectStorage for LocalFS {
Ok(dates.into_iter().flatten().collect())
}

async fn list_hours(
&self,
stream_name: &str,
date: &str,
) -> Result<Vec<String>, ObjectStorageError> {
let path = self.root.join(stream_name).join(date);
let directories = ReadDirStream::new(fs::read_dir(&path).await?);
let entries: Vec<DirEntry> = directories.try_collect().await?;
let entries = entries.into_iter().map(dir_name);
let hours: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?;
Ok(hours
.into_iter()
.flatten()
.filter(|dir| dir.starts_with("hour="))
.collect())
}

async fn list_minutes(
&self,
stream_name: &str,
date: &str,
hour: &str,
) -> Result<Vec<String>, ObjectStorageError> {
let path = self.root.join(stream_name).join(date).join(hour);
// Propagate any read_dir errors instead of swallowing them
let directories = ReadDirStream::new(fs::read_dir(&path).await?);
let entries: Vec<DirEntry> = directories.try_collect().await?;
let entries = entries.into_iter().map(dir_name);
let minutes: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?;
// Filter down to only the "minute=" prefixed directories
Ok(minutes
.into_iter()
.flatten()
.filter(|dir| dir.starts_with("minute="))
.collect())
}

async fn list_manifest_files(
&self,
_stream_name: &str,
Expand Down
Loading
Loading