Skip to content

A Clojure library designed to provide a simple interface to consume from Kafka for debugging purposes

License

Notifications You must be signed in to change notification settings

ashwinbhaskar/kafka-util

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

kafka-util

A Clojure library designed to provide a simple interface to consume from Kafka for debugging purposes. For example, you can quickly retrieve records for the queries like

  • Give me latest records from partition 1
  • Give me records for all partitions from 15 minutes ago
  • Give me records for partition 1 starting from offset 23434
  • Give me records for partition 1 from 45 minutes ago
  • Compute the partition for a given key

    The library uses clojure.core.async to communicate back the kafka records.

Add Dependency

Add the following to the dependencies section of project.clj

[io.github.ashwinbhaskar/kafka-util "0.1.2"]

Usage

(:require [kafka-util.core :as ku] [clojure.core.async :refer [thread chan >!! <!! close! timeout]]) (def consumer-settings {:broker "localhost" :port 9092 :security-protocol "PLAIN_TEXT" :decode-value-as-json true :key-deserializer :string}) (defn process-records [records] (->> records (run! (fn [{:keys [value partition offset topic headers]}] (println value))))) (comment (let [topic "my-topic" channel (timeout 1000000) minutes-ago 60 partition 2 offset 564646 total-partitions 8] (thread (ku/consume-records-latest consumer-settings topic channel) (ku/consume-records-latest consumer-settings topic channel partition) (ku/consume-records-minutes-ago consumer-settings topic channel minutes-ago) (ku/consume-records-minutes-ago consumer-settings topic channel minutes-ago partition) (ku/consume-records-offset consumer-settings topic channel partition offset) (ku/compute-partition (.getBytes "my-key") total-partitions)) (loop [] (if-let [records (<!! channel)] (do (process-records records) (recur)) (println "Channel is closed!"))))) 

About

A Clojure library designed to provide a simple interface to consume from Kafka for debugging purposes

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published