Architecture
Event format
The Fluent Bit wire protocol represents an Event as a two-element array with a nested array as the first element:
```
[[TIMESTAMP, METADATA], MESSAGE]
```

- `TIMESTAMP` is a timestamp in seconds as an integer or floating point value (not a string).
- `METADATA` is an object containing event metadata, and might be empty.
- `MESSAGE` is an object containing the event body.
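For illustration, here is that shape built and round-tripped in Python with MessagePack, the serialization Fluent Bit uses internally. This is a minimal sketch assuming the `msgpack` package; the body fields are made up:

```python
import time

import msgpack  # pip install msgpack

# [[TIMESTAMP, METADATA], MESSAGE]
event = [
    [time.time(), {}],  # TIMESTAMP in seconds (float), empty METADATA object
    {"level": "INFO", "message": "hello from the pipeline"},  # MESSAGE body
]

payload = msgpack.packb(event)
print(msgpack.unpackb(payload))
```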
record_accessor syntax
If you need the log group or stream name to be based on the contents of the log record itself, use record_accessor syntax.
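For example, the `cloudwatch_logs` output has `log_group_template` (and `log_stream_template`) options that accept record_accessor expressions. This is a sketch with made-up group names; when the template cannot be resolved for a record, the plugin falls back to `log_group_name`:

```
[OUTPUT]
    Name                cloudwatch_logs
    Match               application.*
    region              ${AWS_REGION}
    # fallback when the template below cannot be resolved
    log_group_name      /eks/fallback
    # record_accessor: derive the group name from the record itself
    log_group_template  /eks/$kubernetes['namespace_name']
    log_stream_prefix   ${HOST_NAME}-
    auto_create_group   true
```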
Fluent Bit on CloudWatch
- If you don't already have a namespace called amazon-cloudwatch, create one by entering the following command:
```bash
kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/cloudwatch-namespace.yaml
```

```yaml
# create amazon-cloudwatch namespace
apiVersion: v1
kind: Namespace
metadata:
  name: amazon-cloudwatch
  labels:
    name: amazon-cloudwatch
```

- Run the following command to create a ConfigMap named `fluent-bit-cluster-info` with the cluster name and the Region to send logs to. Replace `cluster-name` and `cluster-region` with your cluster's name and Region.
```bash
ClusterName=dev
RegionName=ap-northeast-2
FluentBitHttpPort=''
FluentBitReadFromHead='Off'
[[ ${FluentBitReadFromHead} = 'On' ]] && FluentBitReadFromTail='Off' || FluentBitReadFromTail='On'
[[ -z ${FluentBitHttpPort} ]] && FluentBitHttpServer='Off' || FluentBitHttpServer='On'
kubectl create configmap fluent-bit-cluster-info \
  --from-literal=cluster.name=${ClusterName} \
  --from-literal=http.server=${FluentBitHttpServer} \
  --from-literal=http.port=${FluentBitHttpPort} \
  --from-literal=read.head=${FluentBitReadFromHead} \
  --from-literal=read.tail=${FluentBitReadFromTail} \
  --from-literal=logs.region=${RegionName} -n amazon-cloudwatch
```

In the AWS sample, the Fluent Bit HTTP server for monitoring plugin metrics is on by default. Here it is turned off by setting `FluentBitHttpPort=''` (empty string) in the third line; to enable it, set a port instead (for example, `FluentBitHttpPort='2020'`).

Also by default, Fluent Bit reads log files from the tail and captures only new logs written after it is deployed. If you want the opposite, set `FluentBitReadFromHead='On'` and it will collect all logs in the file system.
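To sanity-check this step, inspect the ConfigMap you just created. If you did enable the HTTP server, Fluent Bit also serves plugin metrics on the configured port of each node; the port below assumes the suggested value of 2020:

```bash
# Inspect the ConfigMap values before deploying the DaemonSet
kubectl get configmap fluent-bit-cluster-info -n amazon-cloudwatch -o yaml

# Only if FluentBitHttpPort='2020' was set: plugin metrics on each node
curl -s http://127.0.0.1:2020/api/v1/metrics
```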
- Apply the Fluent Bit DaemonSet to the cluster:
```bash
kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluent-bit/fluent-bit.yaml
```

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
  namespace: amazon-cloudwatch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit-role
rules:
  - nonResourceURLs:
      - /metrics
    verbs:
      - get
  - apiGroups: [""]
    resources:
      - namespaces
      - pods
      - pods/logs
      - nodes
      - nodes/proxy
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit-role
subjects:
  - kind: ServiceAccount
    name: fluent-bit
    namespace: amazon-cloudwatch
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: amazon-cloudwatch
  labels:
    k8s-app: fluent-bit
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush                     5
        Grace                     30
        Log_Level                 error
        Daemon                    off
        Parsers_File              parsers.conf
        HTTP_Server               ${HTTP_SERVER}
        HTTP_Listen               0.0.0.0
        HTTP_Port                 ${HTTP_PORT}
        storage.path              /var/fluent-bit/state/flb-storage/
        storage.sync              normal
        storage.checksum          off
        storage.backlog.mem_limit 5M

    @INCLUDE application-log.conf
    @INCLUDE dataplane-log.conf
    @INCLUDE host-log.conf

  application-log.conf: |
    [INPUT]
        Name                tail
        Tag                 application.*
        Exclude_Path        /var/log/containers/cloudwatch-agent*, /var/log/containers/fluent-bit*, /var/log/containers/aws-node*, /var/log/containers/kube-proxy*, /var/log/containers/fluentd*
        Path                /var/log/containers/*.log
        multiline.parser    docker, cri
        DB                  /var/fluent-bit/state/flb_container.db
        Mem_Buf_Limit       50MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Rotate_Wait         30
        storage.type        filesystem
        Read_from_Head      ${READ_FROM_HEAD}

    [FILTER]
        Name                kubernetes
        Match               application.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_Tag_Prefix     application.var.log.containers.
        Merge_Log           On
        K8S-Logging.Parser  On
        K8S-Logging.Exclude On
        Labels              Off
        Annotations         On
        Use_Kubelet         On
        Kubelet_Port        10250
        Buffer_Size         0

    [FILTER]
        Name                grep
        Match               application.*
        Regex               $kubernetes['annotations']['fluentbit/enabled'] true

    [FILTER]
        Name                grep
        Match               application.*
        Regex               level .

    [FILTER]
        Name                nest
        Match               *
        Operation           lift
        Nested_under        kubernetes
        Add_prefix          k8s_

    [FILTER]
        Name                record_modifier
        Match               *
        Remove_key          stream
        Remove_key          _p
        Remove_key          log
        Remove_key          k8s_namespace_name
        Remove_key          k8s_pod_ip
        Remove_key          k8s_pod_name
        Remove_key          k8s_container_hash
        Remove_key          k8s_docker_id
        Remove_key          k8s_container_image
        Remove_key          k8s_container_name
        Remove_key          k8s_annotations
        Remove_key          kubernetes

    [FILTER]
        Name                nest
        Match               *
        Operation           nest
        Wildcard            k8s_*
        Nest_under          kubernetes
        Remove_prefix       k8s_

    [OUTPUT]
        Name                cloudwatch_logs
        Match               application.*
        region              ${AWS_REGION}
        log_group_name      /eks/${CLUSTER_NAME}/pipeline
        log_stream_prefix   ${HOST_NAME}-
        auto_create_group   true
        extra_user_agent    container-insights

  dataplane-log.conf: |
    [INPUT]
        Name                systemd
        Tag                 dataplane.systemd.*
        Systemd_Filter      _SYSTEMD_UNIT=docker.service
        Systemd_Filter      _SYSTEMD_UNIT=containerd.service
        Systemd_Filter      _SYSTEMD_UNIT=kubelet.service
        DB                  /var/fluent-bit/state/systemd.db
        Path                /var/log/journal
        Read_From_Tail      ${READ_FROM_TAIL}

    [INPUT]
        Name                tail
        Tag                 dataplane.tail.*
        Path                /var/log/containers/aws-node*, /var/log/containers/kube-proxy*
        multiline.parser    docker, cri
        DB                  /var/fluent-bit/state/flb_dataplane_tail.db
        Mem_Buf_Limit       50MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Rotate_Wait         30
        storage.type        filesystem
        Read_from_Head      ${READ_FROM_HEAD}

    [FILTER]
        Name                modify
        Match               dataplane.systemd.*
        Rename              _HOSTNAME hostname
        Rename              _SYSTEMD_UNIT systemd_unit
        Rename              MESSAGE message
        Remove_regex        ^((?!hostname|systemd_unit|message).)*$

    [FILTER]
        Name                aws
        Match               dataplane.*
        imds_version        v2

    [OUTPUT]
        Name                cloudwatch_logs
        Match               dataplane.*
        region              ${AWS_REGION}
        log_group_name      /aws/containerinsights/${CLUSTER_NAME}/dataplane
        log_stream_prefix   ${HOST_NAME}-
        auto_create_group   true
        extra_user_agent    container-insights

  host-log.conf: |
    [INPUT]
        Name                tail
        Tag                 host.dmesg
        Path                /var/log/dmesg
        Key                 message
        DB                  /var/fluent-bit/state/flb_dmesg.db
        Mem_Buf_Limit       5MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Read_from_Head      ${READ_FROM_HEAD}

    [INPUT]
        Name                tail
        Tag                 host.messages
        Path                /var/log/messages
        Parser              syslog
        DB                  /var/fluent-bit/state/flb_messages.db
        Mem_Buf_Limit       5MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Read_from_Head      ${READ_FROM_HEAD}

    [INPUT]
        Name                tail
        Tag                 host.secure
        Path                /var/log/secure
        Parser              syslog
        DB                  /var/fluent-bit/state/flb_secure.db
        Mem_Buf_Limit       5MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Read_from_Head      ${READ_FROM_HEAD}

    [FILTER]
        Name                aws
        Match               host.*
        imds_version        v2

    [OUTPUT]
        Name                cloudwatch_logs
        Match               host.*
        region              ${AWS_REGION}
        log_group_name      /aws/containerinsights/${CLUSTER_NAME}/host
        log_stream_prefix   ${HOST_NAME}.
        auto_create_group   true
        extra_user_agent    container-insights

  parsers.conf: |
    [PARSER]
        Name                syslog
        Format              regex
        Regex               ^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
        Time_Key            time
        Time_Format         %b %d %H:%M:%S

    [PARSER]
        Name                container_firstline
        Format              regex
        Regex               (?<log>(?<="log":")\S(?!\.).*?)(?<!\\)".*(?<stream>(?<="stream":").*?)".*(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{2}:\d{2}:\d{2}\.\w*).*(?=})
        Time_Key            time
        Time_Format         %Y-%m-%dT%H:%M:%S.%LZ

    [PARSER]
        Name                cwagent_firstline
        Format              regex
        Regex               (?<log>(?<="log":")\d{4}[\/-]\d{1,2}[\/-]\d{1,2}[ T]\d{2}:\d{2}:\d{2}(?!\.).*?)(?<!\\)".*(?<stream>(?<="stream":").*?)".*(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{2}:\d{2}:\d{2}\.\w*).*(?=})
        Time_Key            time
        Time_Format         %Y-%m-%dT%H:%M:%S.%LZ
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: amazon-cloudwatch
  labels:
    k8s-app: fluent-bit
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: fluent-bit
  template:
    metadata:
      labels:
        k8s-app: fluent-bit
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      containers:
        - name: fluent-bit
          # https://gallery.ecr.aws/aws-observability/aws-for-fluent-bit
          image: public.ecr.aws/aws-observability/aws-for-fluent-bit:3.0.0
          imagePullPolicy: Always
          env:
            - name: AWS_REGION
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: logs.region
            - name: CLUSTER_NAME
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: cluster.name
            - name: HTTP_SERVER
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: http.server
            - name: HTTP_PORT
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: http.port
            - name: READ_FROM_HEAD
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: read.head
            - name: READ_FROM_TAIL
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: read.tail
            - name: HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
            - name: HOSTNAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: CI_VERSION
              value: "k8s/1.3.37"
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 100Mi
          volumeMounts:
            # Please don't change below read-only permissions
            - name: fluentbitstate
              mountPath: /var/fluent-bit/state
            - name: varlog
              mountPath: /var/log
              readOnly: true
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: fluent-bit-config
              mountPath: /fluent-bit/etc/
            - name: runlogjournal
              mountPath: /run/log/journal
              readOnly: true
            - name: dmesg
              mountPath: /var/log/dmesg
              readOnly: true
      terminationGracePeriodSeconds: 10
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      volumes:
        - name: fluentbitstate
          hostPath:
            path: /var/fluent-bit/state
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
        - name: fluent-bit-config
          configMap:
            name: fluent-bit-config
        - name: runlogjournal
          hostPath:
            path: /run/log/journal
        - name: dmesg
          hostPath:
            path: /var/log/dmesg
      serviceAccountName: fluent-bit
      nodeSelector:
        kubernetes.io/os: linux
```
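After applying, confirm that a Fluent Bit pod is scheduled and running on every node before relying on the logs:

```bash
kubectl rollout status daemonset/fluent-bit -n amazon-cloudwatch
kubectl get pods -n amazon-cloudwatch -l k8s-app=fluent-bit -o wide
```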
""" minwook 2025-10 Format logger for kubernetes """ import logging import json import sys import atexit class PipelineLogger: _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init() return cls._instance def _init(self): handler = logging.StreamHandler(sys.stdout) handler.setFormatter(_JsonFormatter()) self.logger = logging.getLogger("perception") # logger name "perception" self.logger.propagate = False self.logger.setLevel(logging.INFO) self.logger.addHandler(handler) self._context = {} # flush when exit atexit.register(self.flush) def flush(self): for handler in self.logger.handlers: try: handler.flush() except Exception: pass try: sys.stdout.flush() except Exception: pass def set_context(self, module: str, site: str, date: str, sync=None, cam_num=None): self._context = {"ai_module": module, "site": site, "date": date} if sync: self._context["sync"] = sync if cam_num: self._context["cam_num"] = cam_num def info(self, msg, *args, **kwargs): kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context} self.logger.info(msg, *args, **kwargs) def warning(self, msg, *args, **kwargs): kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context} self.logger.warning(msg, *args, **kwargs) def error(self, msg, *args, **kwargs): kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context} self.logger.error(msg, *args, **kwargs) def exception(self, msg, *args, exc_info=True, **kwargs): kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context} self.logger.exception(msg, *args, exc_info=exc_info, **kwargs) def critical(self, msg, *args, **kwargs): kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context} self.logger.critical(msg, *args, **kwargs) class _JsonFormatter(logging.Formatter): def format(self, record) -> str: log_data = { "level": record.levelname, "message": record.getMessage(), } for key, value in record.__dict__.items(): if key not in [ "name", "msg", "args", "created", "filename", "funcName", "levelname", "levelno", "lineno", "module", "msecs", "message", "pathname", "process", "processName", "relativeCreated", "thread", "threadName", "exc_info", "exc_text", "stack_info", ]: log_data[key] = value return json.dumps(log_data) usage
usage

```python
def graceful_shutdown(signum, frame):
    global messages, main_pid, logger, is_processing
    cur_pid = os.getpid()
    if cur_pid != main_pid:
        return
    if is_processing:
        logger.warning("Process is busy, but SIGTERM occurred in the main process.")
    logger.info(f"Received signal: {signum}. Shutting down gracefully.")
    for msg in messages[message_idx:]:
        reset_visibility(msg)
    logger.flush()
    sys.exit(0)


signal.signal(signal.SIGTERM, graceful_shutdown)

...

def main():
    global messages, message_idx, is_processing
    MaxNumberOfMessages = 3
    VisibilityTimeout = 720 * MaxNumberOfMessages
    while True:
        try:
            # Receive a message from the SQS queue
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MaxNumberOfMessages,
                WaitTimeSeconds=10,  # Long polling
                VisibilityTimeout=VisibilityTimeout,
            )
            messages = response.get("Messages", [])
            for message_idx, message in enumerate(messages):
                ...
                logger.set_context(
                    module="pose",
                    site=site,
                    date=date,
                    sync=sync_nnn,
                    cam_num=cam_num,
                )
        except Exception:
            ...  # error handling elided in the original post


if __name__ == "__main__":
    logger = PipelineLogger()
```
K8s annotation

```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: ...
spec:
  jobTargetRef:
    template:
      metadata:
        annotations:
          fluentbit/enabled: "true"
```

Pods created from this template carry the annotation, so they pass the grep filter above (`Regex $kubernetes['annotations']['fluentbit/enabled'] true`); only annotated workloads have their logs shipped to CloudWatch.
