Copyright | (c) Tim Watson 2013 - 2014 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <[email protected]> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | None |
Language | Haskell2010 |
Control.Distributed.Process.Execution
Description
- Inter-Process Traffic Management
The Execution Framework provides tools for load regulation, workload shedding and remote hand-off. The current implementation provides only a subset of the plumbing required, comprising tools for event management, mailbox buffering and message routing.
Synopsis
- __remoteTable :: RemoteTable -> RemoteTable
- acceptEverything :: Closure (Message -> Process FilterResult)
- acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)
- active :: Mailbox -> Filter -> Process ()
- createMailbox :: BufferType -> Limit -> Process Mailbox
- deliver :: Mailbox -> Process ()
- monitor :: Mailbox -> Process MonitorRef
- notify :: Mailbox -> Process ()
- resize :: Mailbox -> Integer -> Process ()
- startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
- startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox
- statistics :: Mailbox -> Process MailboxStats
- data BufferType
- data Delivery = Delivery {}
- data FilterResult
- type Limit = Integer
- data Mailbox
- data MailboxStats = MailboxStats {}
- data NewMail = NewMail !Mailbox !Integer
- bindToBroadcaster :: Exchange -> Process ()
- broadcastClient :: Exchange -> Process (InputStream Message)
- broadcastExchange :: Process Exchange
- broadcastExchangeT :: Process BroadcastExchange
- applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a
- configureExchange :: Serializable m => Exchange -> m -> Process ()
- createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
- post :: Serializable a => Exchange -> a -> Process ()
- postMessage :: Exchange -> Message -> Process ()
- runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process ()
- startExchange :: ExchangeType s -> Process Exchange
- startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message)
- bindHeader :: HeaderName -> String -> Exchange -> Process ()
- bindKey :: String -> Exchange -> Process ()
- headerContentRouter :: RelayType -> HeaderName -> Process Exchange
- messageKeyRouter :: RelayType -> Process Exchange
- route :: Serializable m => Exchange -> m -> Process ()
- routeMessage :: Exchange -> Message -> Process ()
- router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange
- supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange
- type BroadcastExchange = ExchangeType BroadcastEx
- data Exchange
- data ExchangeType s = ExchangeType {}
- data Message = Message {}
- class (Hashable k, Eq k, Serializable k) => Bindable k
- data Binding
- = BindKey {
- bindingKey :: !String
- }
- | BindHeader {
- bindingKey :: !String
- headerName :: !HeaderName
- }
- | BindNone
- type BindingSelector k = Message -> Process k
- type HeaderName = String
- data RelayType
Mailbox Buffering
acceptEverything :: Closure (Message -> Process FilterResult) Source #
A do-nothing filter that accepts all messages (i.e., returns Keep
for any input).
acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult) Source #
A filter that takes a Closure (Message -> Process FilterResult)
holding
the filter function and applies it remotely (i.e., in the mailbox's own
managed process).
active :: Mailbox -> Filter -> Process () Source #
Instructs the mailbox to send a Delivery
as soon as any mail is
available, or immediately (if the buffer already contains data).
NB: signals are only delivered to the mailbox's owning process.
createMailbox :: BufferType -> Limit -> Process Mailbox Source #
Start a mailbox for the calling process.
create = getSelfPid >>= start
deliver :: Mailbox -> Process () Source #
Instructs the mailbox to deliver all pending messages to the owner.
resize :: Mailbox -> Integer -> Process () Source #
Alters the mailbox's limit - this might cause messages to be dropped!
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox Source #
Start a mailbox for the supplied ProcessId
.
start = spawnLocal $ run
startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox Source #
As startMailbox
, but suitable for use in supervisor child specs.
statistics :: Mailbox -> Process MailboxStats Source #
Obtain statistics (from/to anywhere) about a mailbox.
data BufferType Source #
Describes the different types of buffer.
Constructors
Queue | FIFO buffer, limiter drops the eldest message (queue head) |
Stack | unordered buffer, limiter drops the newest (top) message |
Ring | FIFO buffer, limiter refuses (i.e., drops) new messages |
Instances
Show BufferType Source # | |
Defined in Control.Distributed.Process.Execution.Mailbox Methods showsPrec :: Int -> BufferType -> ShowS # show :: BufferType -> String # showList :: [BufferType] -> ShowS # | |
Eq BufferType Source # | |
Mail delivery.
Constructors
Delivery | |
Instances
data FilterResult Source #
Instances
Binary FilterResult Source # | |||||
Generic FilterResult Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
type Rep FilterResult Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep FilterResult = D1 ('MetaData "FilterResult" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-GHvg2ng8MkwGCKwzWo95YX" 'False) (C1 ('MetaCons "Keep" 'PrefixI 'False) (U1 :: Type -> Type) :+: (C1 ('MetaCons "Skip" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Send" 'PrefixI 'False) (U1 :: Type -> Type))) |
Represents the maximum number of messages the internal buffer can hold.
Opaque handle to a mailbox.
Instances
Binary Mailbox Source # | |
Addressable Mailbox Source # | |
Linkable Mailbox Source # | |
Resolvable Mailbox Source # | |
Routable Mailbox Source # | |
Defined in Control.Distributed.Process.Execution.Mailbox Methods sendTo :: (Serializable m, Resolvable Mailbox) => Mailbox -> m -> Process () # unsafeSendTo :: (NFSerializable m, Resolvable Mailbox) => Mailbox -> m -> Process () # | |
Generic Mailbox Source # | |
Show Mailbox Source # | |
Eq Mailbox Source # | |
type Rep Mailbox Source # | |
data MailboxStats Source #
Bundle of statistics data, available on request via
the statistics
API call.
Constructors
MailboxStats | |
Fields |
Instances
Binary MailboxStats Source # | |||||
Generic MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
Show MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Methods showsPrec :: Int -> MailboxStats -> ShowS # show :: MailboxStats -> String # showList :: [MailboxStats] -> ShowS # | |||||
type Rep MailboxStats Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep MailboxStats = D1 ('MetaData "MailboxStats" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-GHvg2ng8MkwGCKwzWo95YX" 'False) (C1 ('MetaCons "MailboxStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "pendingMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer) :*: S1 ('MetaSel ('Just "droppedMessages") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Integer)) :*: (S1 ('MetaSel ('Just "currentLimit") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Limit) :*: S1 ('MetaSel ('Just "owningProcess") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 ProcessId)))) |
Marker message indicating to the owning process that mail has arrived.
Instances
Binary NewMail Source # | |||||
Generic NewMail Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox Associated Types
| |||||
Show NewMail Source # | |||||
type Rep NewMail Source # | |||||
Defined in Control.Distributed.Process.Execution.Mailbox type Rep NewMail = D1 ('MetaData "NewMail" "Control.Distributed.Process.Execution.Mailbox" "distributed-process-execution-0.1.5.0-GHvg2ng8MkwGCKwzWo95YX" 'False) (C1 ('MetaCons "NewMail" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Mailbox) :*: S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Integer))) |
Message Exchanges
bindToBroadcaster :: Exchange -> Process () Source #
broadcastClient :: Exchange -> Process (InputStream Message) Source #
Create a binding to the given broadcast exchange for the calling process
and return an InputStream
that can be used in the expect
and
receiveWait
family of messaging primitives. This form of client interaction
helps avoid cluttering the caller's mailbox with Message
data, since the
InputStream
provides a separate input stream (in a similar fashion to
a typed channel).
Example:
is <- broadcastClient ex
msg <- receiveWait [ matchInputStream is ]
handleMessage (payload msg)
broadcastExchange :: Process Exchange Source #
Start a new broadcast exchange and return a handle to the exchange.
broadcastExchangeT :: Process BroadcastExchange Source #
The ExchangeType
of a broadcast exchange. Can be combined with the
startSupervisedRef
and startSupervised
APIs.
applyHandlers :: a -> Message -> [Message -> Process (Maybe a)] -> Process a Source #
Utility for custom exchange type authors - evaluates a set of primitive
message handlers from left to right, returning the result of the first
handler which evaluates to Just a, or the initial value e (the first
argument) if all the handlers yield Nothing.
configureExchange :: Serializable m => Exchange -> m -> Process () Source #
Sends an arbitrary Serializable
datum to an exchange, for use as a
configuration change - see configureEx
for details.
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message Source #
post :: Serializable a => Exchange -> a -> Process () Source #
Posts an arbitrary Serializable
datum to an exchange. The raw datum is
wrapped in the Message
data type, with its key
set to ""
and its
headers
to []
.
runExchange :: ExchangeType s -> MVar (ControlPort ControlMessage) -> Process () Source #
startExchange :: ExchangeType s -> Process Exchange Source #
Starts an exchange process with the given ExchangeType
.
startSupervisedRef :: ExchangeType s -> SupervisorPid -> Process (ProcessId, Message) Source #
Starts an exchange as part of a supervision tree.
Example:
childSpec = toChildStart $ startSupervisedRef exType
bindHeader :: HeaderName -> String -> Exchange -> Process () Source #
Add a binding (for the calling process) to a headerContentRouter
exchange.
bindKey :: String -> Exchange -> Process () Source #
Add a binding (for the calling process) to a messageKeyRouter
exchange.
headerContentRouter :: RelayType -> HeaderName -> Process Exchange Source #
A router that matches on a specific (named) header. To bind a client
Process
to such an exchange, use the bindHeader
function.
route :: Serializable m => Exchange -> m -> Process () Source #
Send a Serializable
message to the supplied Exchange
. The given datum
will be converted to a Message
, with the key
set to ""
and the
headers
to []
.
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
routeMessage :: Exchange -> Message -> Process () Source #
Send a Message
to the supplied Exchange
.
The routing behaviour will be dependent on the choice of BindingSelector
given when initialising the router.
router :: Bindable k => RelayType -> BindingSelector k -> Process Exchange Source #
Defines a router exchange. The BindingSelector
is used to construct
a binding (i.e., an instance of the Bindable
type k
) for each incoming
Message
. Such bindings are matched against bindings stored in the exchange.
Clients of a router exchange are identified by a binding, mapped to
one or more ProcessId
s.
The format of the bindings, nature of their storage and mechanism for
submitting new bindings is implementation dependent (i.e., will vary by
exchange type). For example, the messageKeyRouter
and headerContentRouter
implementations both use the Binding
data type, which can represent a
Message
key or a HeaderName
and content. As with all custom exchange
types, bindings should be submitted by evaluating configureExchange
with
a suitable data type.
supervisedRouter :: Bindable k => RelayType -> BindingSelector k -> SupervisorPid -> Process Exchange Source #
Defines a router that can be used in a supervision tree.
type BroadcastExchange = ExchangeType BroadcastEx Source #
Opaque handle to an exchange.
Instances
Binary Exchange Source # | |
Linkable Exchange Source # | |
Resolvable Exchange Source # | |
Generic Exchange Source # | |
Show Exchange Source # | |
Eq Exchange Source # | |
type Rep Exchange Source # | |
data ExchangeType s Source #
Different exchange types are defined using record syntax.
The configureEx
and routeEx
API functions are called during the exchange
lifecycle when incoming traffic arrives. Configuration messages are
completely arbitrary types and the exchange type author is entirely
responsible for decoding them. Messages posted to the exchange (see the
Message
data type) are passed to the routeEx
API function along with the
exchange type's own internal state. Both API functions return a new
(potentially updated) state and run in the Process
monad.
Constructors
ExchangeType | |
Messages sent to an exchange can optionally provide a routing key and a list of (key, value) headers in addition to the underlying payload.
Constructors
Message | |
Instances
Binary Message Source # | |||||
NFData Message Source # | |||||
Generic Message Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Internal Associated Types
| |||||
Show Message Source # | |||||
type Rep Message Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Internal type Rep Message = D1 ('MetaData "Message" "Control.Distributed.Process.Execution.Exchange.Internal" "distributed-process-execution-0.1.5.0-GHvg2ng8MkwGCKwzWo95YX" 'False) (C1 ('MetaCons "Message" 'PrefixI 'True) (S1 ('MetaSel ('Just "key") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: (S1 ('MetaSel ('Just "headers") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 [(String, String)]) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Message)))) |
class (Hashable k, Eq k, Serializable k) => Bindable k Source #
Things that can be used as binding keys in a router.
The binding key used by the built-in key and header based routers.
Constructors
BindKey | |
Fields
| |
BindHeader | |
Fields
| |
BindNone |
Instances
Binary Binding Source # | |||||
NFData Binding Source # | |||||
Generic Binding Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Router Associated Types
| |||||
Show Binding Source # | |||||
Eq Binding Source # | |||||
Hashable Binding Source # | |||||
type Rep Binding Source # | |||||
Defined in Control.Distributed.Process.Execution.Exchange.Router type Rep Binding = D1 ('MetaData "Binding" "Control.Distributed.Process.Execution.Exchange.Router" "distributed-process-execution-0.1.5.0-GHvg2ng8MkwGCKwzWo95YX" 'False) (C1 ('MetaCons "BindKey" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String)) :+: (C1 ('MetaCons "BindHeader" 'PrefixI 'True) (S1 ('MetaSel ('Just "bindingKey") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 String) :*: S1 ('MetaSel ('Just "headerName") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 HeaderName)) :+: C1 ('MetaCons "BindNone" 'PrefixI 'False) (U1 :: Type -> Type))) |
type BindingSelector k = Message -> Process k Source #
type HeaderName = String Source #
Given to a router to indicate whether clients should
receive Message
payloads only, or the whole Message
object
itself.
Constructors
PayloadOnly | |
WholeMessage |