Skip to content
176 changes: 96 additions & 80 deletions src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,99 +110,115 @@ pub async fn resolve_parseable_metadata(
.as_ref()
.map(|meta| serde_json::from_slice(meta).expect("parseable config is valid json"));

// Env Change needs to be updated
let check = determine_environment(staging_metadata, remote_metadata);
// flags for if metadata needs to be synced
let mut overwrite_staging = false;
let mut overwrite_remote = false;

let res = match check {
EnvChange::None(metadata) => {
// overwrite staging anyways so that it matches remote in case of any divergence
overwrite_staging = true;
if PARSEABLE.options.mode == Mode::All {
metadata.server_mode.standalone_after_distributed()?;
}
Ok(metadata)
},
EnvChange::NewRemote => {
Err("Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server")
}
EnvChange::NewStaging(mut metadata) => {
let env_change = determine_environment(staging_metadata, remote_metadata);

// if server is started in ingest mode,we need to make sure that query mode has been started
// i.e the metadata is updated to reflect the server mode = Query
if metadata.server_mode== Mode::All && PARSEABLE.options.mode == Mode::Ingest {
Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
} else {
create_dir_all(PARSEABLE.options.staging_dir())?;
metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?;
// this flag is set to true so that metadata is copied to staging
overwrite_staging = true;
// overwrite remote in all and query mode
// because staging dir has changed.
match PARSEABLE.options.mode {
Mode::All => {
metadata.server_mode.standalone_after_distributed()
.map_err(|err| {
ObjectStorageError::Custom(err.to_string())
})?;
overwrite_remote = true;
},
Mode::Query | Mode::Prism => {
overwrite_remote = true;
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
},
Mode::Ingest => {
// if ingest server is started fetch the metadata from remote
// update the server mode for local metadata
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
},
Mode::Index => {
// if index server is started fetch the metadata from remote
// update the server mode for local metadata
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
}
}
Ok(metadata)
}
}
EnvChange::CreateBoth => {
create_dir_all(PARSEABLE.options.staging_dir())?;
let metadata = StorageMetadata::default();
// new metadata needs to be set
// if mode is query or all then both staging and remote
match PARSEABLE.options.mode {
Mode::All | Mode::Query | Mode::Prism => overwrite_remote = true,
_ => (),
}
// else only staging
overwrite_staging = true;
Ok(metadata)
}
};

let mut metadata = res.map_err(|err| {
let err = format!("{}. {}", err, JOIN_COMMUNITY);
let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.into();
ObjectStorageError::UnhandledError(err)
})?;
let (mut metadata, overwrite_staging, overwrite_remote) = process_env_change(env_change)?;

metadata.server_mode = PARSEABLE.options.mode;

if overwrite_remote {
put_remote_metadata(&metadata).await?;
}

if overwrite_staging {
put_staging_metadata(&metadata)?;
}

Ok(metadata)
}

fn process_env_change(
env_change: EnvChange,
) -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
match env_change {
EnvChange::None(mut metadata) => handle_none_env(&mut metadata),
EnvChange::NewRemote => handle_new_remote_env(),
EnvChange::NewStaging(mut metadata) => handle_new_staging_env(&mut metadata),
EnvChange::CreateBoth => handle_create_both_env(),
}
}

fn handle_none_env(
metadata: &mut StorageMetadata,
) -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
let overwrite_staging = true;
let mut overwrite_remote = false;

match PARSEABLE.options.mode {
Mode::All => {
metadata.server_mode.standalone_after_distributed()?;
overwrite_remote = true;
update_metadata_mode_and_staging(metadata);
}
Mode::Query => {
overwrite_remote = true;
update_metadata_mode_and_staging(metadata);
}
_ => {}
}
if PARSEABLE.options.mode == Mode::All {
metadata.server_mode.standalone_after_distributed()?;
}
Ok((metadata.clone(), overwrite_staging, overwrite_remote))
}

fn handle_new_remote_env() -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
Err(ObjectStorageError::UnhandledError(format!(
"Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server. {}",
JOIN_COMMUNITY
).into()))
}

fn handle_new_staging_env(
metadata: &mut StorageMetadata,
) -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
if metadata.server_mode == Mode::All && PARSEABLE.options.mode == Mode::Ingest {
return Err(ObjectStorageError::UnhandledError(
format!(
"Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}",
JOIN_COMMUNITY
)
.into(),
));
}
create_dir_all(PARSEABLE.options.staging_dir())?;
metadata.staging = PARSEABLE.options.staging_dir().canonicalize()?;
let overwrite_staging = true;
let mut overwrite_remote = false;

match PARSEABLE.options.mode {
Mode::All => {
metadata
.server_mode
.standalone_after_distributed()
.map_err(|err| ObjectStorageError::Custom(err.to_string()))?;
overwrite_remote = true;
}
Mode::Query | Mode::Prism | Mode::Ingest | Mode::Index => {
update_metadata_mode_and_staging(metadata);
if matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) {
overwrite_remote = true;
}
}
}
Ok((metadata.clone(), overwrite_staging, overwrite_remote))
}

fn handle_create_both_env() -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
create_dir_all(PARSEABLE.options.staging_dir())?;
let metadata = StorageMetadata::default();
let overwrite_remote = matches!(
PARSEABLE.options.mode,
Mode::All | Mode::Query | Mode::Prism
);
let overwrite_staging = true;
Ok((metadata, overwrite_staging, overwrite_remote))
}

fn update_metadata_mode_and_staging(metadata: &mut StorageMetadata) {
metadata.server_mode = PARSEABLE.options.mode;
metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
}

pub fn determine_environment(
staging_metadata: Option<StorageMetadata>,
remote_metadata: Option<StorageMetadata>,
Expand Down
Loading