Skip to content
Prev Previous commit
Next Next commit
removed prepare init container, use keystores from mounted folder, do…
… not add system truststore certs
  • Loading branch information
maltesander committed Sep 13, 2023
commit 02df17df9b5d30bb1abb7be659d0e58cde0c3686
1 change: 0 additions & 1 deletion rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ impl Storage {
#[serde(rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case")]
pub enum Container {
Prepare,
Vector,
KcatProber,
GetService,
Expand Down
181 changes: 18 additions & 163 deletions rust/crd/src/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl KafkaTlsSecurity {
pub const INTERNAL_PORT: u16 = 19092;
pub const SECURE_INTERNAL_PORT: u16 = 19093;
// - TLS global
const SSL_STORE_PASSWORD: &'static str = "changeit";
const SSL_STORE_PASSWORD: &'static str = "";
// - TLS client
const CLIENT_SSL_KEYSTORE_LOCATION: &'static str = "listener.name.client.ssl.keystore.location";
const CLIENT_SSL_KEYSTORE_PASSWORD: &'static str = "listener.name.client.ssl.keystore.password";
Expand Down Expand Up @@ -81,20 +81,12 @@ impl KafkaTlsSecurity {
const INTER_SSL_TRUSTSTORE_TYPE: &'static str = "listener.name.internal.ssl.truststore.type";
const INTER_SSL_CLIENT_AUTH: &'static str = "listener.name.internal.ssl.client.auth";
// directories
const SYSTEM_TRUST_STORE_DIR: &'static str = "/etc/pki/java/cacerts";
// for kcat container
const STACKABLE_TLS_CERT_SERVER_MOUNT_DIR: &'static str = "/stackable/tls_cert_server_mount";
const STACKABLE_TLS_CERT_SERVER_MOUNT_DIR_NAME: &'static str = "tls-cert-server-mount";
// prepare and kafka container
const STACKABLE_TLS_KEYSTORE_SERVER_MOUNT_DIR: &'static str =
"/stackable/tls_keystore_server_mount";
const STACKABLE_TLS_KEYSTORE_SERVER_MOUNT_DIR_NAME: &'static str = "tls-keystore-server-mount";
const STACKABLE_TLS_CERT_SERVER_DIR: &'static str = "/stackable/tls_cert_server_mount";
const STACKABLE_TLS_CERT_SERVER_DIR_NAME: &'static str = "tls-cert-server-mount";
// kafka container
const STACKABLE_TLS_KEYSTORE_SERVER_DIR: &'static str = "/stackable/tls_keystore_server";
const STACKABLE_TLS_KEYSTORE_SERVER_DIR_NAME: &'static str = "tls-keystore-server";
const STACKABLE_TLS_KEYSTORE_INTERNAL_MOUNT_DIR: &'static str =
"/stackable/tls_keystore_internal_mount";
const STACKABLE_TLS_KEYSTORE_INTERNAL_MOUNT_DIR_NAME: &'static str =
"tls-keystore-internal-mount";
const STACKABLE_TLS_KEYSTORE_INTERNAL_DIR: &'static str = "/stackable/tls_keystore_internal";
const STACKABLE_TLS_KEYSTORE_INTERNAL_DIR_NAME: &'static str = "tls-keystore-internal";

Expand Down Expand Up @@ -196,52 +188,6 @@ impl KafkaTlsSecurity {
}
}

/// Returns required (init) container commands to generate keystores and truststores
/// depending on the tls and authentication settings.
pub fn prepare_container_command_args(&self) -> Vec<String> {
let mut args = vec![];

// We set either client tls with authentication or client tls without authentication
// If authentication is explicitly required we do not want to have any other CAs to
// be trusted.
if self.tls_client_authentication_class().is_some() {
args.extend(Self::import_keystore(
Self::STACKABLE_TLS_KEYSTORE_SERVER_MOUNT_DIR,
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR,
));
args.extend(Self::import_truststore(
Self::STACKABLE_TLS_KEYSTORE_SERVER_MOUNT_DIR,
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR,
));
} else if self.tls_server_secret_class().is_some() {
// Copy system truststore to stackable truststore
args.extend(Self::import_system_truststore(
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR,
));
args.extend(Self::import_keystore(
Self::STACKABLE_TLS_KEYSTORE_SERVER_MOUNT_DIR,
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR,
));
args.extend(Self::import_truststore(
Self::STACKABLE_TLS_KEYSTORE_SERVER_MOUNT_DIR,
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR,
));
}

if self.tls_internal_secret_class().is_some() {
args.extend(Self::import_keystore(
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_MOUNT_DIR,
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_DIR,
));
args.extend(Self::import_truststore(
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_MOUNT_DIR,
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_DIR,
));
}

args
}

/// Returns SVC container command to retrieve the node port service port.
pub fn svc_container_commands(&self) -> String {
let port_name = self.client_port_name();
Expand All @@ -258,14 +204,12 @@ impl KafkaTlsSecurity {
args.push("-b".to_string());
args.push(format!("localhost:{}", port));
args.extend(Self::kcat_client_auth_ssl(
Self::STACKABLE_TLS_CERT_SERVER_MOUNT_DIR,
Self::STACKABLE_TLS_CERT_SERVER_DIR,
));
} else if self.tls_server_secret_class().is_some() {
args.push("-b".to_string());
args.push(format!("localhost:{}", port));
args.extend(Self::kcat_client_ssl(
Self::STACKABLE_TLS_CERT_SERVER_MOUNT_DIR,
));
args.extend(Self::kcat_client_ssl(Self::STACKABLE_TLS_CERT_SERVER_DIR));
} else {
args.push("-b".to_string());
args.push(format!("localhost:{}", port));
Expand Down Expand Up @@ -305,76 +249,40 @@ impl KafkaTlsSecurity {
pub fn add_volume_and_volume_mounts(
&self,
pod_builder: &mut PodBuilder,
cb_prepare: &mut ContainerBuilder,
cb_kcat_prober: &mut ContainerBuilder,
cb_kafka: &mut ContainerBuilder,
) {
// add tls (server or client authentication volumes) if required
if let Some(tls_server_secret_class) = self.get_tls_secret_class() {
// We have to mount tls pem files for kcat (the mount can be used directly)
cb_kcat_prober.add_volume_mount(
Self::STACKABLE_TLS_CERT_SERVER_MOUNT_DIR_NAME,
Self::STACKABLE_TLS_CERT_SERVER_MOUNT_DIR,
Self::STACKABLE_TLS_CERT_SERVER_DIR_NAME,
Self::STACKABLE_TLS_CERT_SERVER_DIR,
);
pod_builder.add_volume(Self::create_tls_volume(
Self::STACKABLE_TLS_CERT_SERVER_MOUNT_DIR_NAME,
tls_server_secret_class,
));
// We have to use the TLS keystore mounts in the prepare container to copy / recreate
// in an empty dir. We should not write / add anything to a secret-op volume mount.
cb_prepare.add_volume_mount(
Self::STACKABLE_TLS_KEYSTORE_SERVER_MOUNT_DIR_NAME,
Self::STACKABLE_TLS_KEYSTORE_SERVER_MOUNT_DIR,
);
pod_builder.add_volume(Self::create_tls_keystore_volume(
Self::STACKABLE_TLS_KEYSTORE_SERVER_MOUNT_DIR_NAME,
Self::STACKABLE_TLS_CERT_SERVER_DIR_NAME,
tls_server_secret_class,
));
// Empty dir shared for prepare and kafka container to be writeable and eventually
// add other certs etc. to the keystores.
cb_prepare.add_volume_mount(
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR_NAME,
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR,
);
pod_builder.add_volume(
VolumeBuilder::new(Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR_NAME)
.with_empty_dir(Some(""), None)
.build(),
);
// We only need the empty dir keystore mount in the kafka container
// Mounted keystores fore the kafka container
cb_kafka.add_volume_mount(
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR_NAME,
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR,
);
pod_builder.add_volume(Self::create_tls_keystore_volume(
Self::STACKABLE_TLS_KEYSTORE_SERVER_DIR_NAME,
tls_server_secret_class,
));
}

if let Some(tls_internal_secret_class) = self.tls_internal_secret_class() {
// We have to use the TLS keystore mounts in the prepare container to copy / recreate in an empty dir
// We should not write / add to a secret-op volume mount.
cb_prepare.add_volume_mount(
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_MOUNT_DIR_NAME,
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_MOUNT_DIR,
);
pod_builder.add_volume(Self::create_tls_keystore_volume(
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_MOUNT_DIR_NAME,
tls_internal_secret_class,
));
// Empty dir shared for prepare and kafka container to be writeable and eventually
// add other certs etc.
cb_prepare.add_volume_mount(
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_DIR_NAME,
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_DIR,
);
pod_builder.add_volume(
VolumeBuilder::new(Self::STACKABLE_TLS_KEYSTORE_INTERNAL_DIR_NAME)
.with_empty_dir(Some(""), None)
.build(),
);
// We only need the empty dir keystore mount in the kafka container
cb_kafka.add_volume_mount(
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_DIR_NAME,
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_DIR,
);
pod_builder.add_volume(Self::create_tls_keystore_volume(
Self::STACKABLE_TLS_KEYSTORE_INTERNAL_DIR_NAME,
tls_internal_secret_class,
));
}
}

Expand Down Expand Up @@ -523,59 +431,6 @@ impl KafkaTlsSecurity {
.build()
}

/// Generates the shell script to import a secret operator provided keystore without password
/// into a new keystore with password in a writeable empty dir
///
/// # Arguments
/// - `source_directory` - The directory of the source keystore.
/// Should usually be a secret operator volume mount.
/// - `destination_directory` - The directory of the destination keystore.
/// Should usually be an empty dir.
fn import_keystore(source_directory: &str, destination_directory: &str) -> Vec<String> {
vec![
// The source directory is a secret-op mount and we do not want to write / add anything in there
// Therefore we import all the contents to a keystore in "writeable" empty dirs.
// Keytool is only barking if a password is not set for the destination keystore (which we set)
// and do provide an empty password for the source keystore coming from the secret-operator.
// Using no password will result in a warning.
format!("echo Importing {source_directory}/keystore.p12 to {destination_directory}/keystore.p12"),
format!("keytool -importkeystore -srckeystore {source_directory}/keystore.p12 -srcstoretype PKCS12 -srcstorepass \"\" -destkeystore {destination_directory}/keystore.p12 -deststoretype PKCS12 -deststorepass {pw} -noprompt", pw = Self::SSL_STORE_PASSWORD),
]
}

/// Generates the shell script to import a secret operator provided truststore without password
/// into a new truststore with password in a writeable empty dir
///
/// # Arguments
/// - `source_directory` - The directory of the source truststore.
/// Should usually be a secret operator volume mount.
/// - `destination_directory` - The directory of the destination truststore.
/// Should usually be an empty dir.
fn import_truststore(source_directory: &str, destination_directory: &str) -> Vec<String> {
vec![
// The source directory is a secret-op mount and we do not want to write / add anything in there
// Therefore we import all the contents to a truststore in "writeable" empty dirs.
// Keytool is only barking if a password is not set for the destination truststore (which we set)
// and do provide an empty password for the source truststore coming from the secret-operator.
// Using no password will result in a warning.
// All secret-op generated truststores have one entry with alias "1". We generate a UUID for
// the destination truststore to avoid conflicts when importing multiple secret-op generated
// truststores. We do not use the UUID rust crate since this will continuously change the STS... and
// leads to never-ending reconciles.
format!("echo Importing {source_directory}/truststore.p12 to {destination_directory}/truststore.p12"),
format!("keytool -importkeystore -srckeystore {source_directory}/truststore.p12 -srcstoretype PKCS12 -srcstorepass \"\" -srcalias 1 -destkeystore {destination_directory}/truststore.p12 -deststoretype PKCS12 -deststorepass {pw} -destalias $(cat /proc/sys/kernel/random/uuid) -noprompt", pw = Self::SSL_STORE_PASSWORD),
]
}

/// Import the system truststore to a truststore named `truststore.p12` in `destination_directory`.
fn import_system_truststore(destination_directory: &str) -> Vec<String> {
vec![
format!("echo Importing {system_truststore_dir} to {destination_directory}/truststore.p12", system_truststore_dir = Self::SYSTEM_TRUST_STORE_DIR),
format!("keytool -importkeystore -srckeystore {system_truststore_dir} -srcstoretype jks -srcstorepass {pw} -destkeystore {destination_directory}/truststore.p12 -deststoretype pkcs12 -deststorepass {pw} -noprompt",
system_truststore_dir = Self::SYSTEM_TRUST_STORE_DIR, pw = Self::SSL_STORE_PASSWORD),
]
}

fn kcat_client_auth_ssl(cert_directory: &str) -> Vec<String> {
vec![
"-X".to_string(),
Expand Down
37 changes: 0 additions & 37 deletions rust/operator/src/kafka_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,12 +673,6 @@ fn build_broker_rolegroup_statefulset(
.rolegroup(rolegroup_ref)
.context(InternalOperatorSnafu)?;

let prepare_container_name = Container::Prepare.to_string();
let mut cb_prepare =
ContainerBuilder::new(&prepare_container_name).context(InvalidContainerNameSnafu {
name: prepare_container_name.clone(),
})?;

let get_svc_container_name = Container::GetService.to_string();
let mut cb_get_svc =
ContainerBuilder::new(&get_svc_container_name).context(InvalidContainerNameSnafu {
Expand All @@ -702,7 +696,6 @@ fn build_broker_rolegroup_statefulset(
// Add TLS related volumes and volume mounts
kafka_security.add_volume_and_volume_mounts(
&mut pod_builder,
&mut cb_prepare,
&mut cb_kcat_prober,
&mut cb_kafka,
);
Expand Down Expand Up @@ -730,35 +723,6 @@ fn build_broker_rolegroup_statefulset(
.add_volume_mount("tmp", STACKABLE_TMP_DIR)
.resources(merged_config.resources.clone().into());

let mut prepare_container_args = vec![];

if let Some(ContainerLogConfig {
choice: Some(ContainerLogConfigChoice::Automatic(log_config)),
}) = merged_config.logging.containers.get(&Container::Prepare)
{
prepare_container_args.push(product_logging::framework::capture_shell_output(
STACKABLE_LOG_DIR,
&prepare_container_name,
log_config,
));
}

prepare_container_args.extend(kafka_security.prepare_container_command_args());

cb_prepare
.image_from_product_image(resolved_product_image)
.command(vec![
"/bin/bash".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
])
.args(vec![prepare_container_args.join(" && ")])
.add_volume_mount(LOG_DIRS_VOLUME_NAME, STACKABLE_DATA_DIR)
.add_volume_mount("tmp", STACKABLE_TMP_DIR)
.add_volume_mount("log", STACKABLE_LOG_DIR)
.resources(merged_config.resources.clone().into());

let pvcs = merged_config.resources.storage.build_pvcs();

let mut env = broker_config
Expand Down Expand Up @@ -916,7 +880,6 @@ fn build_broker_rolegroup_statefulset(
.with_label(pod_svc_controller::LABEL_ENABLE, "true")
})
.image_pull_secrets_from_product_image(resolved_product_image)
.add_init_container(cb_prepare.build())
.add_init_container(cb_get_svc.build())
.add_container(cb_kafka.build())
.add_container(cb_kcat_prober.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ customConfig:
condition: >-
.pod == "test-kafka-broker-automatic-log-config-0" &&
.container == "vector"
filteredAutomaticLogConfigBrokerPrepare:
type: filter
inputs: [vector]
condition: >-
.pod == "test-kafka-broker-automatic-log-config-0" &&
.container == "prepare"
filteredCustomLogConfigBrokerKafka:
type: filter
inputs: [vector]
Expand All @@ -50,12 +44,6 @@ customConfig:
condition: >-
.pod == "test-kafka-broker-custom-log-config-0" &&
.container == "vector"
filteredCustomLogConfigBrokerPrepare:
type: filter
inputs: [vector]
condition: >-
.pod == "test-kafka-broker-custom-log-config-0" &&
.container == "prepare"
filteredInvalidEvents:
type: filter
inputs: [vector]
Expand Down
2 changes: 1 addition & 1 deletion tests/templates/kuttl/tls/test_client_auth_tls.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 20 ; echo '')
BAD_TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 20 ; echo '')

# write client config
echo $'security.protocol=SSL\nssl.keystore.location=/stackable/tls_keystore_server/keystore.p12\nssl.keystore.password=changeit\nssl.truststore.location=/stackable/tls_keystore_server/truststore.p12\nssl.truststore.password=changeit' > /tmp/client.config
echo $'security.protocol=SSL\nssl.keystore.location=/stackable/tls_keystore_server/keystore.p12\nssl.keystore.password=\nssl.truststore.location=/stackable/tls_keystore_server/truststore.p12\nssl.truststore.password=' > /tmp/client.config

if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$SERVER" --command-config /tmp/client.config
then
Expand Down
2 changes: 1 addition & 1 deletion tests/templates/kuttl/tls/test_client_tls.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 20 ; echo '')
BAD_TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 20 ; echo '')

# write client config
echo $'security.protocol=SSL\nssl.truststore.location=/stackable/tls_keystore_server/truststore.p12\nssl.truststore.password=changeit' > /tmp/client.config
echo $'security.protocol=SSL\nssl.truststore.location=/stackable/tls_keystore_server/truststore.p12\nssl.truststore.password=' > /tmp/client.config

if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$SERVER" --command-config /tmp/client.config
then
Expand Down