Control.Pipeline
Description
Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a promise (future) response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will pipeline each thread's request right away without waiting.
A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. It also closes itself when garbage collected, or you can close it explicitly.
- data Pipeline handle bytes
- newPipeline :: (Stream h b, Resource IO h) => (Size -> b) -> (b -> Size) -> h -> IO (Pipeline h b)
- send :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO ()
- call :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO (IO b)
- type Size = Int
- class Length list where
- class Resource m r where
- class Flush handle where
- class (Length bytes, Monoid bytes, Flush handle) => Stream handle bytes where
- getN :: Stream h b => h -> Int -> IO b
Pipeline
Arguments
:: (Stream h b, Resource IO h) | |
=> (Size -> b) | Convert Size to bytes of fixed length. Every Int must translate to same number of bytes. |
-> (b -> Size) | Convert bytes of fixed length to Size. Must be exact inverse of encodeSize. |
-> h | Underlying socket (handle) this pipeline will read/write from |
-> IO (Pipeline h b) |
Create new Pipeline with given encodeInt, decodeInt, and handle. You should close
pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
send :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO ()Source
Send messages all together to destination (no messages will be interleaved between them). None of the messages can induce a response, i.e. the destination must not reply to any of these messages (otherwise future call
s will get these responses instead of their own).
Each message is preceeded by its length when written to socket.
Raises IOError and closes pipeline if send fails
call :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO (IO b)Source
Send messages all together to destination (no messages will be interleaved between them), and return promise of response from one message only. One and only one message in the list must induce a response, i.e. the destination must reply to exactly one message only (otherwise promises will have the wrong responses in them). Each message is preceeded by its length when written to socket. Likewise, the response must be preceeded by its length. Raises IOError and closes pipeline if send fails, likewise for reply.
Util
Instances
class (Length bytes, Monoid bytes, Flush handle) => Stream handle bytes whereSource
Methods
put :: handle -> bytes -> IO ()Source
Write bytes to handle
get :: handle -> Int -> IO bytesSource
Read up to N bytes from handle; if EOF return empty bytes, otherwise block until at least 1 byte is available
Instances