I am going to describe how to create new Kubernetes cluster and install Knative eventing, Kafka flavor, in it. I am actually going to create two Kafka clusters with mirroring enabled, to be able to perform some experiments later on.
I am also going to describe steps one can follow to ensure Knative scales well enough when messages volume increases. And I am going to point to the resources on how to install monitoring for such cluster.
Kubernetes cluster with Knative eventing should fit in Google Cloud trial quotas, but monitoring and scaling workload on top of that might not.
Cluster creation
Create new Kubernetes cluster, one zone, 4-6 nodes, node is Standard compute-optimized (c2-standard-4 at least), 100 Gb disk (best if pd-ssd, but can be pd-standard or pd-balanced). Trial quota is 4 nodes c2-standard-4.
Installing Kafka and Knative
Create namespace knative-eventing
.
Follow Strimzi quickstart to install kafka
in knative-eventing
namespace, but use different Kafka cluster definition, see below. Knative workloads are expecting to be run in knative-eventing
namespace, otherwise issues arise. And it's easier to keep Knative and Kafka in one namespace.
Use kafka-cluster.yaml as kafka cluster resource instead of the one used in Strimzi quickstart (kafka-single-persistent.yaml
). If you're not limited on disk, best to set storage: size: 50Gi
or 100Gb
in kafka-cluster yaml, and at least 25Gb for zookeeper storage. For trial quota, you're limited to 20Gb and 10Gb for zookeeper (if we're doing 2 Kafka clusters, if one - can be more).
Follow knative docs to install Knative eventing. Install all Kafka components too: Kafka sink, Kafka broker, Kafka event source. Use this publication to configure broker config to be Kafka broker class
(replication: 1).
Also make sure to install Kafka source. kafka-source-dispatcher will have 0 pods until some Kafka sources are created.
Autoscaling Knative
For trial quota GCP, you'll likely won't have space for Keda controller or upscaled Knative workloads. Otherwise,
Follow this blog to configure HA for Knative workloads. I would set HA to 6 though, and keep an eye on memory/CPU consumption of the workloads in case you're got significant events traffic going through the system. Otherwise there's going to be slowdown in events delivery.
Install scaling controller for Kafka sources - Keda autoscaler. HPA parameters are controlled by annotations on the Kafka source yaml definition:
metadata: annotations: autoscaling.knative.dev/class: keda.autoscaling.knative.dev autoscaling.knative.dev/minScale: "0" autoscaling.knative.dev/maxScale: "5" keda.autoscaling.knative.dev/pollingInterval: "30" keda.autoscaling.knative.dev/cooldownPeriod: "30" keda.autoscaling.knative.dev/kafkaLagThreshold: "10"
Kafka of course has it's own parallelism mechanism - creating more brokers, which enables higher partitions amount for a given topic.
Monitoring Knative and Kafka
Follow this publication to setup Prometeus monitoring for Kafka cluster. DataDog has a nice description of what those metrics mean.
Knative has a tutorial on how to setup monitoring. However I ended up creating Service
and ServiceMonitor
by hand for Knative workloads to be able to monitor them.
Here's example Service
and ServiceMon
for kafka-sink-receiver
:
apiVersion: v1 kind: Service metadata: name: knative-sink-service labels: app: knative-sink-service spec: selector: app: kafka-sink-receiver ports: - name: http-metrics protocol: TCP port: 9090 target-port: http-metrics --- apiVersion: monitoring.coreos.com/v1 kind: ServiceMonitor metadata: name: knative-sink-service-monitor labels: app: knative-sink-service-mon spec: selector: matchLabels: app: knative-sink-service endpoints: - port: http-metrics
Knative exposes a couple of it's own metrics (like processing delays) and also exposes a huge amount of Kafka metrics for it's consumers/producers. I ended up curl-ing Knative Services on the metrics port, and scripting a tool that would help to create primitive Grafana dashboard for the list of metric names and uid of datasource. See readme on how to use the tool. Or can replace datasource uid in the dashboard-*.json
with your datasource uid, and make sure job
selectors in the dashboard JSON match the service name that sends metrics.
Knative dashboards together with Kafka's dashboards it sheds light on almost any aspect of what's going on in the system.
More tuning
Some useful production-grade considerations for Knative could be found here
Knative exposes consumer and producer configs for brokers and other workloads as configmap
. I had more luck with setting
auto.offset.reset=latest enable.auto.commit=true commit interval to be about 1.5 seconds, heartbeat interval/2
for Knative sink-receiver config.
More on Kafka consumer and producer tuning
https://strimzi.io/blog/2021/01/07/consumer-tuning/
https://strimzi.io/blog/2020/10/15/producer-tuning/
Make sure it works
You can create a Kafka topic which messages are transferred to another topic using Knative machinery:
input-topic -> knative source -> knative broker -> knative trigger (opt: filter by message headers) -> knative sink -> output-topic
Example definitions to use are below. Apply topics and broker, make sure they've got status Ready (kubectl get kafkatopic -n knative-eventing
, kubectl get broker -n knative-eventing
). Then apply sink and source, also make sure they're ready. Last apply trigger.
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: input-topic namespace: knative-eventing labels: strimzi.io/cluster: my-cluster spec: partitions: 1 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824 --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: output-topic namespace: knative-eventing labels: strimzi.io/cluster: my-cluster spec: partitions: 1 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824 --- apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: my-broker namespace: knative-eventing annotations: eventing.knative.dev/broker.class: Kafka spec: {} --- apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: input-topic-source namespace: knative-eventing # keda autoscaler annotations here if using keda # see Autoscaling section of blog, above spec: consumerGroup: input-topic-source-group bootstrapServers: - my-cluster-kafka-bootstrap.knative-eventing:9092 topics: - input-topic sink: ref: apiVersion: eventing.knative.dev/v1 kind: Broker name: my-broker --- apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink metadata: name: output-topic-sink namespace: knative-eventing spec: topic: output-topic bootstrapServers: - my-cluster-kafka-bootstrap.knative-eventing:9092 --- apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: output-trigger namespace: knative-eventing spec: broker: my-broker # can define a filter for messages based on header, input Kafka headers get `kafkaheader` prefix. So if message was sent on `input-topic` with header `Ce-my-header: my-value`, it's filter here will be `kafkaheadercemyheader: my-value` # filter: # attributes: # kafkaheadercemyheader: my-value subscriber: ref: apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink name: output-topic-sink
Here's primitive Python web app that simply logs message upon arrival. Can use echo app as destination sink instead of second topic. Deployment for web app echo should be in namespace knative-eventing
, and expose ClusterIP
type Service
that maps port 80 map to 8083. If you're not familiar with how to create deployment and service for it, use k8s docs or use Google Console "new deployment button" (gotta upload image to dockerhub or another artifact registry first though).
Let's send some messages.
Launch listener for output-topic:
kubectl -n knative-eventing run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.37.0-kafka-3.5.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic output-topic --from-beginning --property print.headers=true
In other tab, launch client for input-topic:
kubectl -n knative-eventing run kafka-producer -ti --image=quay.io/strimzi/kafka:0.37.0-kafka-3.5.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic input-topic --property parse.headers=true --property headers.delimiter=\t --property headers.separator=, --property headers.key.separator=:
And post following payload to input-topic:
Ce-my-header:my-value\t{"msg":"content"}
The same message should arrive to output-topic, with original headers having kafkaheader prefix:
ce_specversion:1.0,ce_id:...,ce_source:...,content-type:application/json; charset=utf-8,kafkaheadercemyheader:my-value {"msg":"content"}
Top comments (0)