Skip to content

Commit 3437ddd

Browse files
[kafka] add system tests to consumer and producer data streams (#15334)
* [kafka] add system tests to consumer and producer data streams * update PR link * producer docker healthcheck * consumer docker healthcheck * update producer sample_event.json * moved docker tests setup from individual data streams to top level kafka _dev dir * revert raft data stream changes * fix file name
1 parent f33e18a commit 3437ddd

File tree

22 files changed

+171
-225
lines changed

22 files changed

+171
-225
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
FROM apache/kafka:4.0.0
2+
3+
USER root
4+
5+
# Copy jolokia agent jar from host into container
6+
COPY jolokia/jolokia-jvm-agent.jar /opt/jolokia/jolokia-jvm.jar
7+
8+
# Copy startup script
9+
COPY startup.sh /opt/startup.sh
10+
COPY healthcheck.sh /opt/healthcheck.sh
11+
RUN chmod +x /opt/startup.sh /opt/healthcheck.sh && chown -R 1001:0 /opt
12+
13+
USER 1001
14+
15+
ENTRYPOINT ["/opt/startup.sh"]
Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,20 @@ services:
44
default:
55
aliases:
66
- svc-kafka
7-
image: docker.elastic.co/observability-ci/kafka-with-jolokia:4.0.0
7+
build: .
88
ports:
99
- "9092:9092"
1010
- "9093:9093"
1111
- "9999:9999"
1212
- "8780:8780"
13+
- "8774:8774"
14+
- "8775:8775"
15+
healthcheck:
16+
test: [ "CMD-SHELL", "/opt/healthcheck.sh" ]
17+
interval: 15s
18+
timeout: 10s
19+
retries: 5
20+
start_period: 60s
1321
environment:
1422
KAFKA_NODE_ID: 1
1523
KAFKA_PROCESS_ROLES: broker,controller
@@ -29,4 +37,4 @@ services:
2937
-Dcom.sun.management.jmxremote.port=9999
3038
-Dcom.sun.management.jmxremote.rmi.port=9999
3139
volumes:
32-
- ./server.properties:/opt/kafka/config/kraft/server.properties
40+
- ./server.properties:/opt/kafka/config/kraft/server.properties
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#!/bin/bash
2+
# Exit immediately if a command exits with a non-zero status.
3+
set -e
4+
5+
# 1. Check for the Kafka Broker process
6+
pgrep -f 'kafka.Kafka' > /dev/null || { echo "Healthcheck failed: Kafka broker not running."; exit 1; }
7+
8+
# 2. Check for the Console Producer process
9+
pgrep -f 'kafka.tools.ConsoleProducer' > /dev/null || { echo "Healthcheck failed: Console producer not running."; exit 1; }
10+
11+
# 3. Check for the Console Consumer process
12+
pgrep -f 'kafka.tools.consumer.ConsoleConsumer' > /dev/null || { echo "Healthcheck failed: Console consumer not running."; exit 1; }
13+
14+
# If all checks pass, exit with 0 to indicate the container is healthy.
15+
exit 0
667 KB
Binary file not shown.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#!/bin/bash
2+
set -e
3+
4+
# --- 1. CONFIGURE AND START KAFKA BROKER ---
5+
6+
# Add Jolokia agent to the broker's JMX options
7+
export KAFKA_JMX_OPTS="
8+
-Dcom.sun.management.jmxremote
9+
-Dcom.sun.management.jmxremote.local.only=false
10+
-Dcom.sun.management.jmxremote.authenticate=false
11+
-Dcom.sun.management.jmxremote.ssl=false
12+
-Djava.rmi.server.hostname=0.0.0.0
13+
-Dcom.sun.management.jmxremote.port=${KAFKA_JMX_PORT:-9999}
14+
-Dcom.sun.management.jmxremote.rmi.port=${KAFKA_JMX_PORT:-9999}
15+
-javaagent:/opt/jolokia/jolokia-jvm.jar=port=8780,host=0.0.0.0
16+
"
17+
18+
LOG_DIR="/tmp/kraft-combined-logs"
19+
META_FILE="$LOG_DIR/meta.properties"
20+
CLUSTER_ID=${KAFKA_CLUSTER_ID:-"_ABcDEf1gHiJkLmNoPqRsT"} # Using a default Base64 ID
21+
22+
# Ensure log directory exists and has correct permissions
23+
mkdir -p "$LOG_DIR"
24+
chown -R 1001:0 "$LOG_DIR"
25+
26+
# Initialize storage if not formatted yet
27+
if [ ! -f "$META_FILE" ]; then
28+
echo "Formatting KRaft storage with Cluster ID: $CLUSTER_ID"
29+
/opt/kafka/bin/kafka-storage.sh format -t "$CLUSTER_ID" -c /opt/kafka/config/kraft/server.properties
30+
else
31+
echo "KRaft storage already formatted."
32+
fi
33+
34+
# Start the Kafka server in the background
35+
echo "Starting Kafka server..."
36+
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties &
37+
# Store the Process ID (PID) of the Kafka server
38+
kafka_pid=$!
39+
40+
# --- 2. WAIT FOR BROKER AND CREATE TOPIC ---
41+
42+
# Wait for the broker to become available on port 9092
43+
echo "Waiting for Kafka broker to be ready..."
44+
until KAFKA_JMX_OPTS= /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list > /dev/null 2>&1; do
45+
echo "Broker not ready yet, sleeping..."
46+
sleep 2
47+
done
48+
echo "Broker is ready!"
49+
50+
# Create a topic for the producer and consumer
51+
echo "Creating topic 'my-topic'..."
52+
KAFKA_JMX_OPTS= /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --if-not-exists --topic my-topic --partitions 1 --replication-factor 1
53+
54+
# --- 3. START PRODUCER AND CONSUMER ---
55+
56+
# Start the console producer in the background
57+
echo "Starting producer with Jolokia on port 8775..."
58+
export KAFKA_OPTS="-javaagent:/opt/jolokia/jolokia-jvm.jar=port=8775,host=0.0.0.0"
59+
(while true; do echo "Test msg"; sleep 5; done) | \
60+
KAFKA_JMX_OPTS= /opt/kafka/bin/kafka-console-producer.sh \
61+
--bootstrap-server localhost:9092 \
62+
--topic my-topic &
63+
64+
# Start the console consumer in the background
65+
echo "Starting consumer with Jolokia on port 8774..."
66+
export KAFKA_OPTS="-javaagent:/opt/jolokia/jolokia-jvm.jar=port=8774,host=0.0.0.0"
67+
KAFKA_JMX_OPTS= /opt/kafka/bin/kafka-console-consumer.sh \
68+
--bootstrap-server localhost:9092 \
69+
--topic my-topic \
70+
--from-beginning &
71+
72+
# --- 4. WAIT FOR KAFKA SERVER TO EXIT ---
73+
echo "Startup complete. All processes are running."
74+
# The 'wait' command blocks the script until the Kafka server process (kafka_pid) exits.
75+
# This keeps the container running.
76+
wait $kafka_pid

packages/kafka/changelog.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
# newer versions go on top
2+
- version: "1.23.1"
3+
changes:
4+
- description: Add system tests for consumer and producer data streams.
5+
type: enhancement
6+
link: https://github.com/elastic/integrations/pull/15334
27
- version: "1.23.0"
38
changes:
49
- description: Add support for producer and consumer data streams.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
vars:
2+
hosts:
3+
- http://{{Hostname}}:8780
4+
metrics_path:
5+
- /jolokia
6+
data_stream:
7+
vars:
8+
period: 10s
9+
jolokia_hosts:
10+
- http://{{Hostname}}:8774

packages/kafka/data_stream/jvm/_dev/deploy/docker/docker-compose.yml

Lines changed: 0 additions & 32 deletions
This file was deleted.

packages/kafka/data_stream/jvm/_dev/deploy/docker/server.properties

Lines changed: 0 additions & 8 deletions
This file was deleted.

0 commit comments

Comments
 (0)