A Clojure library that provides a simple interface for consuming from Kafka for debugging purposes. For example, you can quickly retrieve records for queries like:
- Give me the latest records from partition 1
- Give me records for all partitions from 15 minutes ago
- Give me records for partition 1 starting from offset 23434
- Give me records for partition 1 from 45 minutes ago
- Compute the partition for a given key
The library uses clojure.core.async channels to deliver the Kafka records back to the caller.
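The channel hand-off can be sketched without Kafka at all. This is a minimal illustration, not the library's implementation: `fake-consume` is a hypothetical stand-in for a consume function, the record maps are made up, and closing the channel explicitly is an assumption made so the example terminates.

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!! close!]])

;; Hypothetical stand-in for a consume function: puts each batch of
;; records on the channel from a separate thread, then closes the channel.
(defn fake-consume [channel batches]
  (thread
    (doseq [batch batches]
      (>!! channel batch))
    (close! channel)))

;; Blocks on the channel and collects batches until it is closed.
;; `<!!` returns nil once the channel is closed and drained.
(defn drain [channel]
  (loop [acc []]
    (if-let [batch (<!! channel)]
      (recur (conj acc batch))
      acc)))

(let [channel (chan 10)]
  (fake-consume channel [[{:value "a" :partition 0 :offset 1}]
                         [{:value "b" :partition 0 :offset 2}]])
  (drain channel))
```

The caller only ever sees batches arriving on the channel and a `nil` once it closes, so the reading loop stays the same regardless of which query produced the records.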
Add the following to the dependencies section of project.clj:
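
    [io.github.ashwinbhaskar/kafka-util "0.1.2"]

Then require the library and read the records from a channel:

    ;; Goes inside your namespace's ns form
    (:require [kafka-util.core :as ku]
              [clojure.core.async :refer [thread chan >!! <!! close! timeout]])

    (def consumer-settings
      {:broker "localhost"
       :port 9092
       :security-protocol "PLAIN_TEXT"
       :decode-value-as-json true
       :key-deserializer :string})

    ;; Each item taken from the channel is a collection of record maps
    (defn process-records [records]
      (->> records
           (run! (fn [{:keys [value partition offset topic headers]}]
                   (println value)))))

    (comment
      (let [topic "my-topic"
            ;; A timeout channel closes itself after the given number of milliseconds
            channel (timeout 1000000)
            minutes-ago 60
            partition 2
            offset 564646
            total-partitions 8]
        (thread
          ;; Latest records from all partitions
          (ku/consume-records-latest consumer-settings topic channel)
          ;; Latest records from a single partition
          (ku/consume-records-latest consumer-settings topic channel partition)
          ;; Records from `minutes-ago` minutes ago, all partitions
          (ku/consume-records-minutes-ago consumer-settings topic channel minutes-ago)
          ;; Records from `minutes-ago` minutes ago, single partition
          (ku/consume-records-minutes-ago consumer-settings topic channel minutes-ago partition)
          ;; Records from a single partition, starting at the given offset
          (ku/consume-records-offset consumer-settings topic channel partition offset)
          ;; Partition for a given key
          (ku/compute-partition (.getBytes "my-key") total-partitions))
        ;; Process batches until the channel is closed
        (loop []
          (if-let [records (<!! channel)]
            (do (process-records records)
                (recur))
            (println "Channel is closed!")))))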
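For context on computing a partition from a key, here is a sketch of how Kafka's default partitioner maps a non-null key to a partition: it takes the murmur2 hash of the key bytes, makes it non-negative, and reduces it modulo the partition count. The function name `default-partition` is hypothetical, the sketch assumes the kafka-clients jar is on the classpath, and whether `ku/compute-partition` follows exactly this scheme is an assumption that this README does not confirm.

```clojure
(import '[org.apache.kafka.common.utils Utils])

;; Hypothetical helper mirroring Kafka's default partitioning of keyed records:
;; murmur2 hash of the key bytes, made non-negative, modulo the partition count.
(defn default-partition [^bytes key-bytes total-partitions]
  (mod (Utils/toPositive (Utils/murmur2 key-bytes)) total-partitions))

;; The same key bytes always map to the same partition for a fixed partition count.
(default-partition (.getBytes "my-key" "UTF-8") 8)
```

The result depends on the exact bytes of the key, so the key must be encoded the same way the producer's key serializer encodes it.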