Managing logs at scale can be painful, especially when all of them end up in the same Kafka topic. What if you could route logs into pod-specific topics (or any custom topic) directly from Fluent Bit? This makes filtering and querying much easier, especially in multi-tenant or large Kubernetes environments. In this post, we’ll explore how to set up dynamic Kafka topics in Fluent Bit using Kubernetes metadata.
The problem
Fluent Bit’s Kafka output plugin supports dynamic topic assignment via the Dynamic_topic and Topic_key options. However, the documentation is vague about using nested fields, such as the Kubernetes metadata fields, as the topic key.
Example:
{ "timestamp":"2025-05-13T19:27:31.954980134Z", "log":"Server is listening on port 5000", "kubernetes":{ "namespace_name":"default", "pod_id":"de70b4a7-2803-485b-9015-3d17791969ae", "labels":{ "app":"dy-7d8eeb3d4fb6da81d88da56280de7d42", "pod-template-hash":"5744dfc46c" }, "container_name":"dy-7d8eeb3d4fb6da81d88da56280de7d42", "annotations":{ "kubectl.kubernetes.io/restartedAt":"2025-05-13T22:27:29+03:00" }, "container_image":"docker.io/library/dy-7d8eeb3d4fb6da81d88da56280de7d42:latest", "container_hash":"sha256:d110c044a3e9fed6c3b3c9222a1b4d5cf4bc5f128a1d5f47980abee7ff98f745", "host":"kind-control-plane", "docker_id":"da27432083a32dada6ccac359d72233a4e93d98d1a139e6b2d4bee7d167c49cc", "pod_name":"dy-7d8eeb3d4fb6da81d88da56280de7d42-5744dfc46c-dgsjf" } }
At first glance, it looks like we should be able to use $kubernetes['pod_name'] as the Topic_key, since the docs show this record accessor syntax in filter examples. But in practice, it doesn’t work for outputs, only for filters.
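For comparison, here is a minimal sketch of how the same record accessor is used where it is supported, a filter. (The grep filter and the pod-name pattern here are just an illustration, not part of the setup below.)

```
[FILTER]
    Name    grep
    Match   kube.*
    Regex   $kubernetes['pod_name'] ^dy-.*
```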
❌ Configuration that doesn't work:
```
[OUTPUT]
    Name           kafka
    Match          *
    Brokers        kafka:9093
    Topics         kube
    Topic_key      $kubernetes['pod_name']   # This syntax doesn't work
    Dynamic_topic  on
```
The solution
After some trial and error, I figured out why this configuration doesn’t work:
Fluent Bit’s Kafka output plugin only supports top-level keys in Topic_key.
That means if your field is nested (like kubernetes.pod_name), Fluent Bit won’t resolve it directly.
The trick is to promote the nested field to the top level before it reaches the output plugin.
We can do this using a Lua filter.
Step 1: Original log (nested field inside kubernetes)
{ "timestamp":"2025-05-13T19:27:31.954980134Z", "log":"Server is listening on port 5000", "kubernetes": { "namespace_name":"default", "pod_name":"dy-7d8eeb3d4fb6da81d88da56280de7d42-5744dfc46c-dgsjf" } }
Step 2: Lua filter to promote nested field
```lua
function promote_pod_name(tag, timestamp, record)
    local k8s = record["kubernetes"]
    if k8s and k8s["pod_name"] then
        record["topic_name"] = k8s["pod_name"]
    else
        record["topic_name"] = "default-topic" -- fallback
    end
    return 1, timestamp, record
end
```
Quick note:
- Returning 1 means "keep the record" (with the modifications applied)
- timestamp is passed through the filter unchanged
- record is the updated log, now with topic_name at the top level
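If you're curious about the other return codes, here is a hedged variant of the same function; the kube-system drop is purely for illustration and isn't part of the setup in this post.

```lua
function promote_pod_name(tag, timestamp, record)
    local k8s = record["kubernetes"]
    if k8s == nil then
        -- 0 = keep the record exactly as it is
        return 0, timestamp, record
    end
    if k8s["namespace_name"] == "kube-system" then
        -- -1 = drop the record entirely
        return -1, timestamp, record
    end
    record["topic_name"] = k8s["pod_name"] or "default-topic"
    -- 1 = keep the record, using the returned timestamp and record
    return 1, timestamp, record
end
```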
Step 3: Log after Lua filter
{ "timestamp":"2025-05-13T19:27:31.954980134Z", "log":"Server is listening on port 5000", "topic_name":"dy-7d8eeb3d4fb6da81d88da56280de7d42-5744dfc46c-dgsjf", "kubernetes": { ... } }
Now Fluent Bit can safely use topic_name as the Topic_key.
Step 4: Fluent Bit config
```
[FILTER]
    Name    lua
    Match   kube.*
    script  /fluent-bit/scripts/promote_pod_name.lua
    call    promote_pod_name

[OUTPUT]
    Name           kafka
    Match          *
    Brokers        kafka:9093
    Topics         kube
    Topic_key      topic_name
    Dynamic_topic  on
    Format         json
```

promote_pod_name.lua:

```lua
function promote_pod_name(tag, timestamp, record)
    local k8s = record["kubernetes"]
    if k8s and k8s["pod_name"] then
        record["topic_name"] = k8s["pod_name"]
    else
        record["topic_name"] = "default-topic" -- fallback
    end
    return 1, timestamp, record
end
```
Here's what a typical Fluent Bit ConfigMap for processing logs from Kubernetes pods would look like:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
data:
  fluent-bit.conf: |
    [INPUT]
        Name              tail
        Path              /var/log/containers/*.log
        multiline.parser  docker, cri
        Tag               kube.*
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On

    [FILTER]
        Name                kubernetes
        Match               kube.*
        Merge_Log           On
        Keep_Log            Off
        Kube_Tag_Prefix     kube.var.log.containers.
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        K8S-Logging.Exclude On

    [FILTER]
        Name    lua
        Match   kube.*
        script  /fluent-bit/scripts/promote_pod_name.lua
        call    promote_pod_name

    [OUTPUT]
        Name           kafka
        Match          *
        Brokers        kafka:9093
        Topics         kube
        Topic_key      topic_name
        Dynamic_topic  on
        Format         json

  promote_pod_name.lua: |
    function promote_pod_name(tag, timestamp, record)
        local k8s = record["kubernetes"]
        if k8s and k8s["pod_name"] then
            record["topic_name"] = k8s["pod_name"]
        else
            record["topic_name"] = "default-topic" -- fallback
        end
        return 1, timestamp, record
    end
```
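One detail the ConfigMap alone doesn't cover: the Lua filter points at /fluent-bit/scripts/promote_pod_name.lua, so that ConfigMap key has to be mounted into the Fluent Bit pod at that path. A rough sketch of the relevant DaemonSet fragment (the container and volume names are assumptions):

```yaml
# Fragment of the Fluent Bit DaemonSet pod spec
      containers:
        - name: fluent-bit
          volumeMounts:
            - name: fluent-bit-config
              mountPath: /fluent-bit/etc/fluent-bit.conf
              subPath: fluent-bit.conf
            - name: fluent-bit-config
              mountPath: /fluent-bit/scripts/promote_pod_name.lua
              subPath: promote_pod_name.lua
      volumes:
        - name: fluent-bit-config
          configMap:
            name: fluent-bit-config
```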
This approach isn’t limited to pod_name: you can promote any nested Kubernetes field or label to a top-level field and use it as your Kafka topic key (there's a sketch after this list). This is useful for:
- Splitting logs by namespace
- Creating per-application Kafka topics
- Enforcing multi-tenant log isolation
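For example, here is a minimal Lua sketch that targets the namespace or an app label instead of the pod name; the fallback topic and the sanitization step are my own choices, based on Kafka only accepting alphanumerics, '.', '_' and '-' in topic names.

```lua
function promote_topic(tag, timestamp, record)
    local k8s = record["kubernetes"]
    local topic = "default-topic"
    if k8s then
        -- per-namespace topics:
        topic = k8s["namespace_name"] or topic
        -- or per-application topics, when an "app" label is present:
        if k8s["labels"] and k8s["labels"]["app"] then
            topic = k8s["labels"]["app"]
        end
    end
    -- labels may contain characters Kafka won't accept in a topic name,
    -- so replace anything outside [A-Za-z0-9._-] with an underscore
    topic = string.gsub(topic, "[^%w._-]", "_")
    record["topic_name"] = topic
    return 1, timestamp, record
end
```

Point the Lua filter's call parameter at promote_topic and the rest of the configuration stays the same.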
If you try this in your environment or run into edge cases, I’d love to hear from you! Feel free to connect with me on LinkedIn or drop me a message.