Skip to content

Commit 71dcac7

Browse files
committed
feat: Support graceful shutdown
1 parent 39cc8b8 commit 71dcac7

File tree

13 files changed

+177
-39
lines changed

13 files changed

+177
-39
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
1111
- Support PodDisruptionBudgets ([#625]).
1212
- Support new versions 2.8.2, 3.4.1, 3.5.1 ([#627]).
1313
- Document internal clusterId check ([#631]).
14+
- Support graceful shutdown ([#XXX]).
1415

1516
### Changed
1617

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ repository = "https://github.com/stackabletech/kafka-operator"
1414
built = { version = "0.6", features = ["chrono", "git2"] }
1515
clap = "4.3"
1616
futures = { version = "0.3", features = ["compat"] }
17+
indoc = "2.0"
18+
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.6.0" }
1719
rstest = "0.18"
1820
semver = "1.0"
1921
serde = { version = "1.0", features = ["derive"] }
2022
serde_json = "1.0"
2123
serde_yaml = "0.9"
2224
snafu = "0.7"
2325
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.56.0" }
24-
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.6.0" }
2526
strum = { version = "0.25", features = ["derive"] }
2627
tokio = { version = "1.29", features = ["full"] }
2728
tracing = "0.1"

deploy/helm/kafka-operator/crds/crds.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,10 @@ spec:
507507
type: array
508508
type: object
509509
type: object
510+
gracefulShutdownTimeout:
511+
description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
512+
nullable: true
513+
type: string
510514
logging:
511515
default:
512516
enableVectorAgent: null
@@ -3991,6 +3995,10 @@ spec:
39913995
type: array
39923996
type: object
39933997
type: object
3998+
gracefulShutdownTimeout:
3999+
description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
4000+
nullable: true
4001+
type: string
39944002
logging:
39954003
default:
39964004
enableVectorAgent: null

rust/crd/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ repository.workspace = true
99
publish = false
1010

1111
[dependencies]
12+
indoc.workspace = true
1213
semver.workspace = true
13-
serde.workspace = true
1414
serde_json.workspace = true
15+
serde.workspace = true
1516
snafu.workspace = true
1617
stackable-operator.workspace = true
1718
strum.workspace = true

rust/crd/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use stackable_operator::{
3333
role_utils::{GenericRoleConfig, Role, RoleGroup, RoleGroupRef},
3434
schemars::{self, JsonSchema},
3535
status::condition::{ClusterCondition, HasStatusCondition},
36+
time::Duration,
3637
};
3738
use std::{collections::BTreeMap, str::FromStr};
3839
use strum::{Display, EnumIter, EnumString, IntoEnumIterator};
@@ -55,6 +56,9 @@ pub const STACKABLE_TMP_DIR: &str = "/stackable/tmp";
5556
pub const STACKABLE_DATA_DIR: &str = "/stackable/data";
5657
pub const STACKABLE_CONFIG_DIR: &str = "/stackable/config";
5758
pub const STACKABLE_LOG_CONFIG_DIR: &str = "/stackable/log_config";
59+
pub const STACKABLE_LOG_DIR: &str = "/stackable/log";
60+
61+
const DEFAULT_BROKER_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_minutes_unchecked(30);
5862

5963
#[derive(Snafu, Debug)]
6064
pub enum Error {
@@ -382,10 +386,16 @@ pub enum Container {
382386
pub struct KafkaConfig {
383387
#[fragment_attrs(serde(default))]
384388
pub logging: Logging<Container>,
389+
385390
#[fragment_attrs(serde(default))]
386391
pub resources: Resources<Storage, NoRuntimeLimits>,
392+
387393
#[fragment_attrs(serde(default))]
388394
pub affinity: StackableAffinity,
395+
396+
/// Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
397+
#[fragment_attrs(serde(default))]
398+
pub graceful_shutdown_timeout: Option<Duration>,
389399
}
390400

391401
impl KafkaConfig {
@@ -410,6 +420,7 @@ impl KafkaConfig {
410420
},
411421
},
412422
affinity: get_affinity(cluster_name, role),
423+
graceful_shutdown_timeout: Some(DEFAULT_BROKER_GRACEFUL_SHUTDOWN_TIMEOUT),
413424
}
414425
}
415426
}

rust/crd/src/security.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,28 @@
44
//! and helper functions
55
//!
66
//! This is required due to overlaps between TLS encryption and e.g. mTLS authentication or Kerberos
7+
use std::collections::BTreeMap;
78

8-
use crate::{
9-
authentication, authentication::ResolvedAuthenticationClasses, listener, tls, KafkaCluster,
10-
SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR, STACKABLE_TMP_DIR,
11-
};
12-
13-
use crate::listener::KafkaListenerConfig;
9+
use indoc::formatdoc;
1410
use snafu::{ResultExt, Snafu};
1511
use stackable_operator::builder::SecretFormat;
12+
use stackable_operator::product_logging::framework::{
13+
create_vector_shutdown_file_command, remove_vector_shutdown_file_command,
14+
};
1615
use stackable_operator::{
1716
builder::{ContainerBuilder, PodBuilder, SecretOperatorVolumeSourceBuilder, VolumeBuilder},
1817
client::Client,
1918
commons::authentication::{AuthenticationClass, AuthenticationClassProvider},
2019
k8s_openapi::api::core::v1::Volume,
20+
utils::COMMON_BASH_TRAP_FUNCTIONS,
21+
};
22+
23+
use crate::STACKABLE_LOG_DIR;
24+
use crate::{
25+
authentication::{self, ResolvedAuthenticationClasses},
26+
listener::{self, KafkaListenerConfig},
27+
tls, KafkaCluster, SERVER_PROPERTIES_FILE, STACKABLE_CONFIG_DIR, STACKABLE_TMP_DIR,
2128
};
22-
use std::collections::BTreeMap;
2329

2430
#[derive(Snafu, Debug)]
2531
pub enum Error {
@@ -225,23 +231,26 @@ impl KafkaTlsSecurity {
225231
kafka_listeners: &KafkaListenerConfig,
226232
opa_connect_string: Option<&str>,
227233
) -> Vec<String> {
228-
vec![
229-
"bin/kafka-server-start.sh".to_string(),
230-
format!("{STACKABLE_CONFIG_DIR}/{SERVER_PROPERTIES_FILE}"),
231-
"--override \"zookeeper.connect=$ZOOKEEPER\"".to_string(),
232-
format!("--override \"listeners={}\"", kafka_listeners.listeners()),
233-
format!(
234-
"--override \"advertised.listeners={}\"",
235-
kafka_listeners.advertised_listeners()
236-
),
237-
format!(
238-
"--override \"listener.security.protocol.map={}\"",
239-
kafka_listeners.listener_security_protocol_map()
240-
),
241-
opa_connect_string.map_or("".to_string(), |opa| {
242-
format!("--override \"opa.authorizer.url={}\"", opa)
243-
}),
244-
]
234+
vec![formatdoc! {"
235+
{COMMON_BASH_TRAP_FUNCTIONS}
236+
{remove_vector_shutdown_file_command}
237+
prepare_signal_handlers
238+
bin/kafka-server-start.sh {STACKABLE_CONFIG_DIR}/{SERVER_PROPERTIES_FILE} --override \"zookeeper.connect=$ZOOKEEPER\" --override \"listeners={listeners}\" --override \"advertised.listeners={advertised_listeners}\" --override \"listener.security.protocol.map={listener_security_protocol_map}\"{opa_config} &
239+
wait_for_termination $!
240+
{create_vector_shutdown_file_command}
241+
",
242+
remove_vector_shutdown_file_command =
243+
remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
244+
create_vector_shutdown_file_command =
245+
create_vector_shutdown_file_command(STACKABLE_LOG_DIR),
246+
listeners = kafka_listeners.listeners(),
247+
advertised_listeners = kafka_listeners.advertised_listeners(),
248+
listener_security_protocol_map = kafka_listeners.listener_security_protocol_map(),
249+
opa_config = match opa_connect_string {
250+
None => "".to_string(),
251+
Some(opa_connect_string) => format!(" --override \"opa.authorizer.url={opa_connect_string}\""),
252+
}
253+
}]
245254
}
246255

247256
/// Adds required volumes and volume mounts to the pod and container builders

rust/operator/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ publish = false
1212
stackable-kafka-crd = { path = "../crd" }
1313

1414
futures.workspace = true
15-
serde.workspace = true
15+
product-config.workspace = true
1616
serde_json.workspace = true
17+
serde.workspace = true
18+
snafu.workspace = true
1719
stackable-operator.workspace = true
18-
product-config.workspace = true
1920
strum.workspace = true
2021
tokio.workspace = true
2122
tracing.workspace = true
22-
snafu.workspace = true
2323

2424
[dev-dependencies]
2525
serde_yaml.workspace = true

0 commit comments

Comments
 (0)