amqp-worker: Type-safe AMQP workers

[ bsd3, library, network, program ]
Versions 0.2.0, 0.2.1, 0.2.2, 0.2.3, 0.2.4, 0.2.5, 0.3.2, 0.4.0, 1.0.0, 2.0.0, 2.0.1
Change log CHANGELOG.md
Dependencies aeson (>=2.0 && <2.3), amqp (>=0.20 && <1), amqp-worker, base (>=4.9 && <5), bytestring (>=0.11 && <0.13), data-default (>=0.7 && <0.9), exceptions (>=0.10 && <0.11), monad-loops (>=0.4 && <0.5), mtl (>=2.2 && <2.4), resource-pool (>=0.4 && <0.5), text (>=2 && <3)
License BSD-3-Clause
Copyright Orbital Labs
Author Sean Hess
Maintainer seanhess@gmail.com
Category Network
Home page https://github.com/seanhess/amqp-worker#readme
Bug tracker https://github.com/seanhess/amqp-worker/issues
Source repo head: git clone https://github.com/seanhess/amqp-worker
Uploaded by seanhess at 2025-05-21T17:31:41Z
Distributions
Executables example
Downloads 4982 total (5 in the last 30 days)
Status Docs uploaded by user
Build status unknown [no reports yet]

Readme for amqp-worker-2.0.1

AMQP Worker

Type-safe AMQP workers. Compatible with RabbitMQ.

{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Concurrent (forkIO)
import Control.Monad.Catch (SomeException)
import Data.Aeson (FromJSON, ToJSON)
import Data.Function ((&))
import Data.Text (Text)
import GHC.Generics (Generic)
import Network.AMQP.Worker
import qualified Network.AMQP.Worker as Worker
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)

newtype Greeting = Greeting
  {message :: Text}
  deriving (Generic, Show, Eq)

instance FromJSON Greeting
instance ToJSON Greeting

newGreetings :: Key Bind Greeting
newGreetings = key "greetings" & word "new"

anyGreetings :: Key Bind Greeting
anyGreetings = key "greetings" & any1

example :: IO ()
example = do
  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
  simple conn

publishing :: Connection -> IO ()
publishing conn = do
  Worker.publish conn newGreetings $ Greeting "Hello"

-- | Create a queue to process messages
simple :: Connection -> IO ()
simple conn = do
  -- create a queue to receive them
  q <- Worker.queue conn def newGreetings

  -- publish a message (delivered to queue)
  Worker.publish conn newGreetings $ Greeting "Hello"

  -- We cannot publish to anyGreetings because it is a binding key (with wildcards in it)
  -- Worker.publish conn anyGreetings $ Greeting "Compiler Error"

  -- Loop and print any values received
  Worker.worker conn def q onError (print . value)

-- | Multiple queues with distinct names will each get copies of published messages
multiple :: Connection -> IO ()
multiple conn = do
  -- create two separate queues
  one <- Worker.queue conn "one" newGreetings
  two <- Worker.queue conn "two" newGreetings

  -- publish a message (delivered to both)
  Worker.publish conn newGreetings $ Greeting "Hello"

  -- Each of these workers will receive the same message
  _ <- forkIO $ Worker.worker conn def one onError $ \m -> putStrLn "one" >> print (value m)
  _ <- forkIO $ Worker.worker conn def two onError $ \m -> putStrLn "two" >> print (value m)

  putStrLn "Press any key to exit"
  _ <- getLine
  return ()

-- | Create multiple workers on the same queue to load balance between them
balance :: Connection -> IO ()
balance conn = do
  -- create a single queue
  q <- Worker.queue conn def newGreetings

  -- publish two messages
  Worker.publish conn newGreetings $ Greeting "Hello1"
  Worker.publish conn newGreetings $ Greeting "Hello2"

  -- Each worker will receive one of the messages
  _ <- forkIO $ Worker.worker conn def q onError $ \m -> putStrLn "one" >> print (value m)
  _ <- forkIO $ Worker.worker conn def q onError $ \m -> putStrLn "two" >> print (value m)

  putStrLn "Press any key to exit"
  _ <- getLine
  return ()

-- | You can bind to messages dynamically with wildcards in Binding Keys
dynamic :: Connection -> IO ()
dynamic conn = do
  -- anyGreetings matches `greetings.*`
  q <- Worker.queue conn def anyGreetings

  -- You can only publish to a Routing Key.
  -- Publishing to anyGreetings will give a compile error
  Worker.publish conn newGreetings $ Greeting "Hello"

  -- This queue listens for anything under `greetings.`
  Worker.worker conn def q onError $ \m -> putStrLn "Got: " >> print (value m)

onError :: WorkerException SomeException -> IO ()
onError e = do
  putStrLn "Do something with errors"
  print e

test :: (Connection -> IO ()) -> IO ()
test action = do
  hSetBuffering stdout LineBuffering
  hSetBuffering stderr LineBuffering
  conn <- Worker.connect (fromURI "amqp://guest:guest@localhost:5672")
  action conn

main :: IO ()
main = example
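
The comments in the example say that publishing to a key containing wildcards is a compile error. The following is a minimal, standalone sketch of one way such a restriction can be expressed with a phantom type parameter. It is an illustration only, not the library's actual implementation: the tag types `Routing` and `Binding`, the helper `render`, and the function `publishTo` are hypothetical names, and the `key`, `word` and `any1` defined here are simplified stand-ins that only share their names with the README's functions.

```haskell
module Main where

import Data.Function ((&))
import Data.List (intercalate)

-- Hypothetical tags (assumption): they record whether a key contains wildcards.
data Routing
data Binding

-- The phantom parameter `a` never appears on the right-hand side.
newtype Key a = Key [String]

-- A fresh key has no wildcards, so it starts out as a routing key.
key :: String -> Key Routing
key s = Key [s]

-- Appending a plain word keeps whatever tag the key already had.
word :: String -> Key a -> Key a
word s (Key ss) = Key (ss ++ [s])

-- Appending a wildcard always yields a binding key.
any1 :: Key a -> Key Binding
any1 (Key ss) = Key (ss ++ ["*"])

-- Render the segments as a dotted AMQP-style key.
render :: Key a -> String
render (Key ss) = intercalate "." ss

-- Hypothetical publish function: accepts routing keys only.
publishTo :: Key Routing -> String -> String
publishTo k body = render k ++ " <- " ++ body

main :: IO ()
main = do
  putStrLn (publishTo (key "greetings" & word "new") "Hello")
  putStrLn (render (key "greetings" & any1))
  -- The next line does not type check, because any1 returns a Key Binding:
  -- putStrLn (publishTo (key "greetings" & any1) "Hello")
```

Because the tag exists only at the type level, the check costs nothing at runtime; a key built with a wildcard simply cannot be passed where a routing key is required.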