]> woffs.de Git - fd/haskell-amqp-utils.git/blob - plane.hs
formatting
[fd/haskell-amqp-utils.git] / plane.hs
1 {-# LANGUAGE OverloadedStrings #-}
2
3 -- generic AMQP rpc client
4 import Control.Concurrent
5 import qualified Control.Exception as X
6 import Control.Monad
7 import qualified Data.ByteString.Lazy.Char8 as BL
8 import qualified Data.Text as T
9 import Data.Time
10 import Data.Time.Clock.POSIX
11 import Data.Version (showVersion)
12 import Network.AMQP
13 import Network.AMQP.Utils.Connection
14 import Network.AMQP.Utils.Helpers
15 import Network.AMQP.Utils.Options
16 import Paths_amqp_utils (version)
17 import System.Environment
18 import System.Exit
19 import System.IO
20
21 main :: IO ()
22 main = do
23   hr "starting"
24   tid <- myThreadId
25   args <- getArgs >>= parseargs 'p'
26   X.onException
27     (printparam "rpc_timeout" [show (rpc_timeout args), "s"])
28     (error $ "invalid rpc_timeout")
29   printparam "client version" ["amqp-utils", showVersion version]
30   printparam "destination queue" $ tmpQName args
31   (conn, chan) <- connect args
32   addChannelExceptionHandler chan (X.throwTo tid)
33   (q, _, _) <- declareQueue chan newQueue {queueExclusive = True}
34   if (currentExchange args /= "")
35     then do
36       printparam "exchange" $ currentExchange args
37       bindQueue chan q (T.pack $ currentExchange args) q
38     else return ()
39   printparam "input file" $ inputFile args
40   message <-
41     if inputFile args == "-"
42       then BL.getContents
43       else BL.readFile (inputFile args)
44   printparam "output file" $ outputFile args
45   h <-
46     if outputFile args == "-"
47       then return stdout
48       else openBinaryFile (outputFile args) WriteMode
49   ctag <- consumeMsgs chan q NoAck (rpcClientCallback h tid args)
50   printparam "consumer tag" ctag
51   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
52   hr "publishing request"
53   _ <-
54     publishMsg
55       chan
56       (T.pack $ currentExchange args)
57       (T.pack $ tmpQName args)
58       newMsg
59         { msgBody = message
60         , msgReplyTo = Just q
61         , msgCorrelationID = corrid args
62         , msgExpiration = msgexp args
63         , msgTimestamp = Just now
64         , msgHeaders = msgheader args
65         }
66   hr "waiting for answer"
67   _ <-
68     forkIO
69       (threadDelay (floor (1000000 * rpc_timeout args)) >>
70        throwTo tid TimeoutException)
71   X.catch
72     (forever $ threadDelay 200000)
73     (\x -> do
74        ec <- exceptionHandler x
75        hr "closing connection"
76        closeConnection conn
77        printparam "exiting" ec
78        exitWith ec)
79
80 exceptionHandler :: RpcException -> IO (ExitCode)
81 exceptionHandler ReceivedException = return ExitSuccess
82 exceptionHandler TimeoutException = return $ ExitFailure 1
83
84 rpcClientCallback :: Handle -> ThreadId -> Args -> (Message, Envelope) -> IO ()
85 rpcClientCallback h tid a m@(_, env) = do
86   let numstring = show $ envDeliveryTag env
87   hr $ "received " ++ numstring
88   now <- getZonedTime
89   _ <-
90     X.catch
91       (printmsg (Just h) m (anRiss a) now)
92       (\x -> X.throwTo tid (x :: X.SomeException) >> return ([], []))
93   throwTo tid ReceivedException
94
95 data RpcException
96   = ReceivedException
97   | TimeoutException
98   deriving (Show)
99
100 instance X.Exception RpcException