Copyright | (c) Tim Watson 2012 - 2014 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <watson.timothy@gmail.com> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | None |
Language | Haskell98 |
Control.Distributed.Process.Execution.Exchange
Description
- Message Exchanges
The concept of a message exchange is borrowed from the world of messaging and enterprise integration. The exchange acts like a kind of mailbox, accepting inputs from producers and forwarding these messages to one or more consumers, depending on the implementation's semantics.
This module provides some basic types of message exchange and exposes an API for defining your own custom exchange types.
- Broadcast Exchanges
The broadcast exchange type, started via broadcastExchange, forwards its inputs to all registered consumers (as the name suggests). This exchange type is highly optimised for local (intra-node) traffic and provides two kinds of client binding. The first, bindToBroadcaster, causes messages to be delivered directly to the client's mailbox. The second, broadcastClient, provides a separate stream of messages that can be read using the expect and receiveX family of messaging primitives (and thus composed with other forms of input selection, such as typed channels and selective reads on the process mailbox).
Important: When a ProcessId is registered via bindToBroadcaster, only the payload of the Message (i.e., the underlying Serializable datum) is forwarded to the consumer, not the whole Message itself.
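The difference between the two binding styles can be pictured with a small pure model. This is only a sketch using base: Msg, BindingStyle, Delivery and broadcast are hypothetical names, payloads are plain Strings rather than Serializable data, and nothing here is the library's implementation (which works with processes and mailboxes).

```haskell
-- Toy, pure model of broadcast delivery (hypothetical names; not the library's code).
data Msg = Msg { key :: String, headers :: [(String, String)], payload :: String }
  deriving (Eq, Show)

-- How a client chose to bind to the exchange.
data BindingStyle
  = MailboxBinding  -- models bindToBroadcaster
  | StreamBinding   -- models broadcastClient

-- What a client ends up receiving under each binding style.
data Delivery
  = ToMailbox String  -- payload only, delivered to the mailbox
  | ToStream Msg      -- whole message, delivered on a separate stream
  deriving (Eq, Show)

type ClientId = Int

-- Every registered client receives a copy of every message.
broadcast :: [(ClientId, BindingStyle)] -> Msg -> [(ClientId, Delivery)]
broadcast clients m = [ (c, shape style) | (c, style) <- clients ]
  where
    shape MailboxBinding = ToMailbox (payload m)
    shape StreamBinding  = ToStream m

main :: IO ()
main = mapM_ print (broadcast [(1, MailboxBinding), (2, StreamBinding)] (Msg "" [] "hello"))
```

In this model client 1 sees only "hello", while client 2 sees the full Msg value, key and headers included.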
- Router Exchanges
The router API provides a means to selectively route messages to one or more clients, depending on the content of the Message. Two modes of binding (and client selection) are provided out of the box: the first matches on the message key, the second matches on a name and value from the headers. Alternative mechanisms for content-based routing can be derived by modifying the BindingSelector expression passed to router. See messageKeyRouter and headerContentRouter for the built-in routing exchanges, and router for the extensible routing API.
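To make the role of a binding selector concrete, here is a minimal pure sketch. The documented type is BindingSelector k = Message -> Process k; the Selector type below drops the Process monad so the example runs with base alone. Msg, Selector, byKey, byHeader and byPayloadSize are hypothetical names, and the use of Maybe for a missing header is an assumption of this sketch.

```haskell
-- Pure stand-ins for the documented types (hypothetical; not the library's code).
data Msg = Msg { key :: String, headers :: [(String, String)], payload :: String }
  deriving (Eq, Show)

-- Models `BindingSelector k = Message -> Process k` without the Process monad.
type Selector k = Msg -> k

-- Select on the message key (the idea behind a key-based router).
byKey :: Selector String
byKey = key

-- Select on the value of one named header; Nothing when the header is absent.
byHeader :: String -> Selector (Maybe String)
byHeader name = lookup name . headers

-- A custom selector: route on the size of the payload.
byPayloadSize :: Selector Int
byPayloadSize = length . payload

main :: IO ()
main = do
  let m = Msg "orders" [("region", "eu")] "42 widgets"
  print (byKey m)
  print (byHeader "region" m)
  print (byPayloadSize m)
```

Swapping the selector is the only change needed to move from key-based to header-based or fully custom routing, which is the extensibility point the router function exposes.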
- Custom Exchange Types
Both the broadcast and router exchanges are implemented as custom exchange types. The mechanism for defining custom exchange behaviours such as these is very simple. Raw exchanges are started by evaluating startExchange with a specific ExchangeType record. This type is parameterised by the internal state it holds, and defines two API callbacks in its configureEx and routeEx fields. The former is evaluated whenever a client process evaluates configureExchange, the latter whenever a client evaluates post or postMessage.
The configureEx callback takes a raw Message (from Control.Distributed.Process) and is responsible for decoding the message and updating its own state (if required). It is via this callback that custom exchange types can receive information about clients and handle it in their own way. The routeEx callback is evaluated with the exchange type's own internal state and the Message originally sent to the exchange process (via post) and is responsible for delivering the message to its clients in whatever way makes sense for that exchange type.
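The interplay of the two callbacks can be sketched with a pure model. Everything below is hypothetical and uses base only: ExType stands in for the ExchangeType record, configuration messages are plain Strings (client names) rather than raw messages that need decoding, and the routing callback returns the deliveries it would have made instead of performing them in the Process monad. The example defines a round-robin exchange, which is not one of the built-in types.

```haskell
-- Toy model of a custom exchange type (hypothetical; not the library's code).
data Msg = Msg { key :: String, headers :: [(String, String)], payload :: String }
  deriving (Eq, Show)

-- State `s` plus two callbacks, each returning the (possibly updated) state.
data ExType s = ExType
  { exState   :: s
  , configure :: s -> String -> s                     -- models configureEx
  , route     :: s -> Msg -> (s, [(String, String)])  -- models routeEx; output is (client, payload)
  }

-- Round-robin: the state is the queue of registered client names.
roundRobin :: ExType [String]
roundRobin = ExType
  { exState   = []
  , configure = \clients newClient -> clients ++ [newClient]
  , route     = \clients m -> case clients of
      []         -> ([], [])                          -- nobody bound: drop the message
      (c : rest) -> (rest ++ [c], [(c, payload m)])   -- deliver, then rotate the queue
  }

-- Traffic arriving at the exchange: configuration changes or posted messages.
data Event = Configure String | Post Msg

-- Thread the state through the callbacks and collect the deliveries in order.
run :: ExType s -> [Event] -> [(String, String)]
run ex = go (exState ex)
  where
    go _ []                 = []
    go s (Configure c : es) = go (configure ex s c) es
    go s (Post m : es)      = let (s', out) = route ex s m in out ++ go s' es

main :: IO ()
main = print (run roundRobin
  [ Configure "a", Configure "b"
  , Post (Msg "" [] "1"), Post (Msg "" [] "2"), Post (Msg "" [] "3") ])
```

Running this prints [("a","1"),("b","2"),("a","3")]: the configuration events register the clients, and each posted message goes to the client at the head of the rotating queue.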
- data Exchange
- data Message = Message {}
- startExchange :: forall s. ExchangeType s -> Process Exchange
- startSupervised :: forall s. ExchangeType s -> SupervisorPid -> Process Exchange
- startSupervisedRef :: forall s. ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
- runExchange :: forall s. ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
- post :: Serializable a => Exchange -> a -> Process ()
- postMessage :: Exchange -> Message -> Process ()
- configureExchange :: Serializable m => Exchange -> m -> Process ()
- createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
- broadcastExchange :: Process Exchange
- broadcastExchangeT :: Process BroadcastExchange
- broadcastClient :: Exchange -> Process (InputStream Message)
- bindToBroadcaster :: Exchange -> Process ()
- type BroadcastExchange = ExchangeType BroadcastEx
- type HeaderName = String
- data Binding
  - = BindKey { bindingKey :: !String }
  - | BindHeader { bindingKey :: !String, headerName :: !HeaderName }
  - | BindNone
- class (Hashable k, Eq k, Serializable k) => Bindable k
- type BindingSelector k = Message -> Process k
- data RelayType
- router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
- supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
- route :: Serializable m => Exchange -> m -> Process ()
- routeMessage :: Exchange -> Message -> Process ()
- messageKeyRouter :: RelayType -> Process Exchange
- bindKey :: String -> Exchange -> Process ()
- headerContentRouter :: RelayType -> HeaderName -> Process Exchange
- bindHeader :: HeaderName -> String -> Exchange -> Process ()
- data ExchangeType s = ExchangeType {}
- applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a
Fundamental API
data Exchange
Opaque handle to an exchange.
data Message
Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.
Constructors
Message
Starting/Running an Exchange
startExchange :: forall s. ExchangeType s -> Process Exchange
Starts an exchange process with the given ExchangeType.
startSupervised :: forall s. ExchangeType s -> SupervisorPid -> Process Exchange
Starts an exchange as part of a supervision tree.
Example:

> childSpec = toChildStart $ startSupervised exType
startSupervisedRef :: forall s. ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
Starts an exchange as part of a supervision tree.
Example:

> childSpec = toChildStart $ startSupervisedRef exType
runExchange :: forall s. ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
Client Facing API
post :: Serializable a => Exchange -> a -> Process ()
Posts an arbitrary Serializable datum to an exchange. The raw datum is wrapped in the Message data type, with its key set to "" and its headers to [].
configureExchange :: Serializable m => Exchange -> m -> Process ()
Sends an arbitrary Serializable datum to an exchange, for use as a configuration change - see configureEx for details.
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
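The relationship between post and createMessage can be sketched as follows. This is a pure model using base only: Msg, mkMsg and wrap are hypothetical names, payloads are Strings rather than Serializable data, and the argument order of mkMsg (key, then headers, then payload) is an assumption read off the createMessage signature.

```haskell
-- Toy model of message construction (hypothetical; not the library's code).
data Msg = Msg { key :: String, headers :: [(String, String)], payload :: String }
  deriving (Eq, Show)

-- Models createMessage; assumed argument order: key, headers, payload.
mkMsg :: String -> [(String, String)] -> String -> Msg
mkMsg = Msg

-- Models the wrapping described for post: empty key, no headers.
wrap :: String -> Msg
wrap = mkMsg "" []

main :: IO ()
main = do
  print (wrap "ping")
  print (mkMsg "orders" [("region", "eu")] "ping")
```

A message that should be matched by a key or header router needs an explicit key or headers, so it would be built with createMessage and sent whole, rather than handed to post as a bare datum.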
Broadcast Exchange
broadcastExchange :: Process Exchange
Start a new broadcast exchange and return a handle to the exchange.
broadcastExchangeT :: Process BroadcastExchange
The ExchangeType of a broadcast exchange. Can be combined with the startSupervisedRef and startSupervised APIs.
broadcastClient :: Exchange -> Process (InputStream Message)
Create a binding to the given broadcast exchange for the calling process and return an InputStream that can be used in the expect and receiveWait family of messaging primitives. This form of client interaction helps avoid cluttering the caller's mailbox with Message data, since the InputStream provides a separate input stream (in a similar fashion to a typed channel). Example:

> is <- broadcastClient ex
> msg <- receiveWait [ matchInputStream is ]
> handleMessage (payload msg)
bindToBroadcaster :: Exchange -> Process ()
type BroadcastExchange = ExchangeType BroadcastEx
Routing (Content Based)
type HeaderName = String
data Binding
The binding key used by the built-in key and header based routers.
Constructors
BindKey
Fields
bindingKey :: !String
BindHeader
Fields
bindingKey :: !String
headerName :: !HeaderName
BindNone
class (Hashable k, Eq k, Serializable k) => Bindable k
Things that can be used as binding keys in a router.
Instances
(Hashable k, Eq k, Serializable k) => Bindable k
type BindingSelector k = Message -> Process k
data RelayType
Given to a router to indicate whether clients should receive Message payloads only, or the whole Message object itself.
Constructors
PayloadOnly
WholeMessage
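The effect of the two RelayType constructors can be modelled in a few lines. This sketch uses base only: Msg, Relayed and relay are hypothetical names, payloads are Strings, and the real router sends to processes rather than returning a value.

```haskell
-- Toy model of relay behaviour (hypothetical; not the library's code).
data Msg = Msg { key :: String, headers :: [(String, String)], payload :: String }
  deriving (Eq, Show)

-- Mirrors the two documented constructors.
data RelayType = PayloadOnly | WholeMessage

-- What a bound client would receive.
data Relayed = Payload String | Whole Msg
  deriving (Eq, Show)

-- Pick the shape of the relayed value from the router's relay type.
relay :: RelayType -> Msg -> Relayed
relay PayloadOnly  = Payload . payload
relay WholeMessage = Whole

main :: IO ()
main = do
  let m = Msg "orders" [("region", "eu")] "ping"
  print (relay PayloadOnly m)
  print (relay WholeMessage m)
```

PayloadOnly suits clients that only care about the datum itself; WholeMessage suits clients that also need to inspect the key or headers that caused the match.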
Starting a Router
router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
Defines a router exchange. The BindingSelector is used to construct a binding (i.e., an instance of the Bindable type k) for each incoming Message. Such bindings are matched against bindings stored in the exchange. Clients of a router exchange are identified by a binding, mapped to one or more ProcessIds.
The format of the bindings, the nature of their storage and the mechanism for submitting new bindings are implementation dependent (i.e., will vary by exchange type). For example, the messageKeyRouter and headerContentRouter implementations both use the Binding data type, which can represent a Message key or a HeaderName and content. As with all custom exchange types, bindings should be submitted by evaluating configureExchange with a suitable data type.
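The matching step described above (select a binding from the message, then look it up among the stored bindings) can be sketched with a pure model. It uses base only; Msg, Table, bind and routeTo are hypothetical names, client identifiers are Ints rather than ProcessIds, and the association list is a simplification chosen for brevity, not a claim about how the exchange stores its bindings.

```haskell
import Data.Maybe (fromMaybe)

-- Toy model of a router's binding table (hypothetical; not the library's code).
data Msg = Msg { key :: String, headers :: [(String, String)], payload :: String }
  deriving (Eq, Show)

type ClientId = Int

-- Each stored binding maps to one or more clients.
type Table k = [(k, [ClientId])]

-- Register a client under a binding, creating the entry if needed.
bind :: Eq k => k -> ClientId -> Table k -> Table k
bind k c table = case lookup k table of
  Nothing -> table ++ [(k, [c])]
  Just _  -> [ (k', if k' == k then cs ++ [c] else cs) | (k', cs) <- table ]

-- Compute the binding for a message with the selector, then look it up.
routeTo :: Eq k => (Msg -> k) -> Table k -> Msg -> [ClientId]
routeTo select table m = fromMaybe [] (lookup (select m) table)

main :: IO ()
main = do
  let table = bind "b" 3 (bind "a" 2 (bind "a" 1 []))
  print (routeTo key table (Msg "a" [] "x"))  -- both clients bound to "a"
  print (routeTo key table (Msg "c" [] "x"))  -- no binding matches
```

Here the selector is simply the key field, so the model behaves like a key-based router; any other function of the message could be passed instead.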
supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
Defines a router that can be used in a supervision tree.
Routing (Publishing) API
route :: Serializable m => Exchange -> m -> Process ()
Send a Serializable message to the supplied Exchange. The given datum will be converted to a Message, with the key set to "" and the headers to [].
The routing behaviour will be dependent on the choice of BindingSelector given when initialising the router.
routeMessage :: Exchange -> Message -> Process ()
Send a Message to the supplied Exchange. The routing behaviour will be dependent on the choice of BindingSelector given when initialising the router.
Routing via message/binding keys
bindKey :: String -> Exchange -> Process ()
Add a binding (for the calling process) to a messageKeyRouter exchange.
Routing via message headers
headerContentRouter :: RelayType -> HeaderName -> Process Exchange
A router that matches on a specific (named) header. To bind a client Process to such an exchange, use the bindHeader function.
bindHeader :: HeaderName -> String -> Exchange -> Process ()
Add a binding (for the calling process) to a headerContentRouter exchange.
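Header-based matching can be illustrated end to end with a pure model built around a copy of the documented Binding constructors. The sketch uses base only and makes two labelled assumptions: that a binding holds the header's value in bindingKey, and that a message lacking the watched header selects BindNone. The names bindHeaderModel, headerSelector and recipients are hypothetical, and client identifiers are Ints rather than ProcessIds.

```haskell
-- Toy model of header-based routing (hypothetical; not the library's code).
data Msg = Msg { key :: String, headers :: [(String, String)], payload :: String }
  deriving (Eq, Show)

-- Mirrors the documented Binding constructors (strictness annotations omitted).
data Binding
  = BindKey    { bindingKey :: String }
  | BindHeader { bindingKey :: String, headerName :: String }
  | BindNone
  deriving (Eq, Show)

type ClientId = Int

-- Models bindHeader: a client registers interest in one header value.
-- Assumption: the header's value is stored in bindingKey.
bindHeaderModel :: String -> String -> ClientId -> (Binding, ClientId)
bindHeaderModel name value client =
  (BindHeader { bindingKey = value, headerName = name }, client)

-- Selector for a router watching one named header.
-- Assumption: a message without that header selects BindNone.
headerSelector :: String -> Msg -> Binding
headerSelector name m = case lookup name (headers m) of
  Just value -> BindHeader { bindingKey = value, headerName = name }
  Nothing    -> BindNone

-- Clients whose binding equals the binding selected for the message.
recipients :: String -> [(Binding, ClientId)] -> Msg -> [ClientId]
recipients name bindings m = [ c | (b, c) <- bindings, b == headerSelector name m ]

main :: IO ()
main = do
  let bindings = [ bindHeaderModel "region" "eu" 1, bindHeaderModel "region" "us" 2 ]
  print (recipients "region" bindings (Msg "" [("region", "eu")] "hello"))
  print (recipients "region" bindings (Msg "" [] "no header"))
```

With these bindings, a message carrying region=eu reaches client 1 only, and a message without a region header reaches nobody.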
Defining Custom Exchange Types
data ExchangeType s
Different exchange types are defined using record syntax. The configureEx and routeEx API functions are called during the exchange lifecycle when incoming traffic arrives. Configuration messages are completely arbitrary types and the exchange type author is entirely responsible for decoding them. Messages posted to the exchange (see the Message data type) are passed to the routeEx API function along with the exchange type's own internal state. Both API functions return a new (potentially updated) state and run in the Process monad.
Constructors
ExchangeType