hw-kafka-client: Kafka bindings for Haskell

[ database, library, mit ]

Apache Kafka bindings backed by the librdkafka C library.

Features include:

  • Consumer groups: auto-rebalancing consumers

  • Keyed and keyless messages producing/consuming

  • Batch producing messages


Versions: 1.0.0, 1.1.0, 1.1.1, 1.1.2, 1.1.3, 1.1.4, 2.0.0, 2.0.1, 2.0.2, 2.0.3, 2.0.4, 2.1.0, 2.1.1, 2.1.2, 2.1.3, 2.2.0, 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.4.0, 2.4.1, 2.4.3, 2.4.4, 2.5.0, 2.6.0, 2.6.1, 3.0.0, 3.1.0, 3.1.1, 3.1.2, 4.0.0, 4.0.1, 4.0.2, 4.0.3, 4.0.4, 5.0.0, 5.3.0
Dependencies: base (>=4.6 && <5), bifunctors, bytestring, containers, hw-kafka-client, temporary, transformers, unix
License: MIT
Author: Alexey Raga <alexey.raga@gmail.com>
Maintainer: Alexey Raga <alexey.raga@gmail.com>
Category: Database
Home page: https://github.com/haskell-works/hw-kafka-client
Bug tracker: https://github.com/haskell-works/hw-kafka-client/issues
Source repo: head: git clone git://github.com/haskell-works/hw-kafka-client.git
Uploaded: by alexeyraga at 2017-02-19T10:05:44Z
Distributions: LTSHaskell:5.3.0, NixOS:5.3.0, Stackage:5.3.0
Reverse Dependencies: 9 direct, 1 indirect
Executables: kafka-client-example
Downloads: 22024 total (160 in the last 30 days)
Rating: 2.25 (votes: 4) [estimated by Bayesian average]
Status: Docs uploaded by user
Build status: unknown [no reports yet]

Readme for hw-kafka-client-1.1.1

hw-kafka-client

Kafka bindings for Haskell backed by the librdkafka C library.

Credits

This project is inspired by Haskakafka, which unfortunately doesn't seem to be actively maintained.

Ecosystem

The HaskellWorks Kafka ecosystem is described here: https://github.com/haskell-works/hw-kafka

Consumer

High-level consumers are supported by librdkafka starting from version 0.9.
High-level consumers provide an abstraction for consuming messages from multiple partitions and topics. They also address scalability (up to the number of partitions) by providing automatic rebalancing. When a new consumer joins a consumer group, the consumers in the group attempt to "rebalance" the load by reassigning partitions among themselves.
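
To make the rebalancing idea concrete, here is a minimal pure-Haskell sketch that deals partitions out to the members of a group round-robin. It is only an illustration: the names `ConsumerName`, `PartitionNo` and `assign` are hypothetical, none of this is part of hw-kafka-client, and the real assignment strategy used by Kafka and librdkafka is configurable and may differ.

```haskell
-- Hypothetical names for illustration; not part of hw-kafka-client.
type ConsumerName = String
type PartitionNo  = Int

-- Deal partitions out to consumers one at a time, round-robin.
-- An empty group gets no assignments.
-- Assumes consumer names are unique within the group.
assign :: [ConsumerName] -> [PartitionNo] -> [(ConsumerName, [PartitionNo])]
assign []        _          = []
assign consumers partitions =
    [ (c, [p | (p, owner) <- owned, owner == c]) | c <- consumers ]
  where
    -- Pair every partition with the consumer that owns it.
    owned = zip partitions (cycle consumers)
```

With six partitions, `assign ["a", "b"] [0 .. 5]` gives `[("a",[0,2,4]),("b",[1,3,5])]`. When a third consumer joins, `assign ["a", "b", "c"] [0 .. 5]` gives `[("a",[0,3]),("b",[1,4]),("c",[2,5])]`. With more consumers than partitions, the extra consumers receive nothing, which is why scaling is bounded by the number of partitions.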

Example:

A working consumer example can be found here: ConsumerExample.hs

    import Data.Monoid ((<>))
    import Kafka
    import Kafka.Consumer

    -- Global consumer properties
    consumerProps :: ConsumerProperties
    consumerProps = consumerBrokersList [BrokerAddress "localhost:9092"]
                 <> groupId (ConsumerGroupId "consumer_example_group")
                 <> noAutoCommit
                 <> consumerDebug [DebugAll]

    -- Subscription to topics
    consumerSub :: Subscription
    consumerSub = topics [TopicName "kafka-client-example-topic"]
               <> offsetReset Earliest

    -- Running an example
    runConsumerExample :: IO ()
    runConsumerExample = do
        res <- runConsumer consumerProps consumerSub processMessages
        print res

    -------------------------------------------------------------------
    processMessages :: KafkaConsumer -> IO (Either KafkaError ())
    processMessages kafka = do
        mapM_ (\_ -> do
                       msg1 <- pollMessage kafka (Timeout 1000)
                       putStrLn $ "Message: " <> show msg1
                       err <- commitAllOffsets kafka OffsetCommit
                       putStrLn $ "Offsets: " <> maybe "Committed." show err
              ) [0 .. 10]
        return $ Right ()

Producer

The kafka-client producer supports sending messages to multiple topics. The target topic name is part of each message sent by produceMessage.

A working producer example can be found here: ProducerExample.hs

Example

    import Control.Monad (forM_)
    import Kafka
    import Kafka.Producer

    -- Global producer properties
    producerProps :: ProducerProperties
    producerProps = producerBrokersList [BrokerAddress "localhost:9092"]

    -- Topic to send messages to
    targetTopic :: TopicName
    targetTopic = TopicName "kafka-client-example-topic"

    -- Run an example
    runProducerExample :: IO ()
    runProducerExample = do
        res <- runProducer producerProps sendMessages
        print res

    sendMessages :: KafkaProducer -> IO (Either KafkaError ())
    sendMessages prod = do
        err1 <- produceMessage prod ProducerRecord
                                      { prTopic = targetTopic
                                      , prPartition = UnassignedPartition
                                      , prKey = Nothing
                                      , prValue = Just "test from producer"
                                      }
        forM_ err1 print

        err2 <- produceMessage prod ProducerRecord
                                      { prTopic = targetTopic
                                      , prPartition = UnassignedPartition
                                      , prKey = Just "key"
                                      , prValue = Just "test from producer (with key)"
                                      }
        forM_ err2 print

        return $ Right ()
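
Both records in the producer example use UnassignedPartition, so the partition is not fixed by the caller. The sketch below shows, in plain Haskell, one common way a partition can be derived from a message: hash the key when there is one, and rotate through partitions otherwise. This is an assumption-laden illustration only. `toyHash` and `choosePartition` are hypothetical names, the hash is a toy, and librdkafka's actual partitioner is configurable and differs in detail.

```haskell
import Data.Char (ord)
import Data.List (foldl')

-- Hypothetical helper, not part of hw-kafka-client or librdkafka.
-- A toy string hash (multiply by 31), used only to make the idea concrete.
toyHash :: String -> Int
toyHash = foldl' (\h c -> (h * 31 + ord c) `mod` 1000003) 7

-- Pick a partition for a message. Assumes numPartitions > 0.
--   * Keyed messages: the same key always maps to the same partition.
--   * Keyless messages: fall back to a caller-supplied counter (round-robin).
choosePartition :: Int -> Int -> Maybe String -> Int
choosePartition numPartitions counter mKey =
    case mKey of
      Just key -> toyHash key `mod` numPartitions
      Nothing  -> counter `mod` numPartitions
```

The point of the keyed branch is ordering: because `choosePartition 4 n (Just "key")` returns the same partition for every value of the counter `n`, all messages with that key land in one partition and are consumed in the order they were produced.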

Installation

Installing librdkafka

Although librdkafka is available on many platforms, most of the distribution packages are too old to support kafka-client. As such, we suggest you install from the source:

    git clone https://github.com/edenhill/librdkafka
    cd librdkafka
    ./configure
    make && sudo make install

Sometimes it is helpful to specify openssl includes explicitly:

LDFLAGS=-L/usr/local/opt/openssl/lib CPPFLAGS=-I/usr/local/opt/openssl/include ./configure 

Installing Kafka

The full Kafka guide is at http://kafka.apache.org/documentation.html#quickstart

Alternatively, docker-compose can be used to run Kafka locally inside a Docker container. To run Kafka inside Docker:

$ docker-compose up