1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
3 -- SPDX-License-Identifier: GPL-3.0-only
5 {-# LANGUAGE OverloadedStrings #-}
7 -- generic AMQP rpc client
8 import Control.Concurrent
9 import qualified Control.Exception as X
11 import qualified Data.ByteString.Lazy.Char8 as BL
12 import qualified Data.Text as T
14 import Data.Time.Clock.POSIX
15 import Data.Version (showVersion)
17 import Network.AMQP.Utils.Connection
18 import Network.AMQP.Utils.Helpers
19 import Network.AMQP.Utils.Options
20 import Paths_amqp_utils (version)
21 import System.Environment
29 args <- getArgs >>= parseargs 'p'
30 hSetBuffering stdout LineBuffering
31 hSetBuffering stderr LineBuffering
33 (printparam "rpc_timeout" [show (rpc_timeout args), "s"])
34 (error $ "invalid rpc_timeout")
35 printparam "client version" ["amqp-utils", showVersion version]
36 printparam "routing key" $ rKey args
37 (conn, chan) <- connect args
38 addChannelExceptionHandler chan (X.throwTo tid)
39 (q, _, _) <- declareQueue chan newQueue {queueExclusive = True}
40 if (currentExchange args /= "")
42 printparam "exchange" $ currentExchange args
43 bindQueue chan q (T.pack $ currentExchange args) q
45 let inputFile' = firstInputFile (inputFiles args)
46 printparam "input file" $ inputFile'
50 else readFileRawLazy inputFile'
51 printparam "output file" $ outputFile args
53 if outputFile args == "-"
55 else openBinaryFile (outputFile args) WriteMode
56 ctag <- consumeMsgs chan q NoAck (rpcClientCallback h tid args)
57 printparam "consumer tag" ctag
58 now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
59 hr "publishing request"
63 (T.pack $ currentExchange args)
68 , msgCorrelationID = corrid args
69 , msgExpiration = msgexp args
70 , msgTimestamp = Just now
71 , msgHeaders = msgheader args
73 hr "waiting for answer"
76 (threadDelay (floor (1000000 * rpc_timeout args)) >>
77 throwTo tid TimeoutException)
79 (forever $ threadDelay 200000)
81 ec <- exceptionHandler x
82 hr "closing connection"
84 printparam "exiting" ec
87 exceptionHandler :: RpcException -> IO (ExitCode)
88 exceptionHandler ReceivedException = hr "success" >> (return ExitSuccess)
89 exceptionHandler TimeoutException = hr "timeout" >> (return $ ExitFailure 1)
91 rpcClientCallback :: Handle -> ThreadId -> Args -> (Message, Envelope) -> IO ()
92 rpcClientCallback h tid a m@(_, env) = do
93 let numstring = show $ envDeliveryTag env
94 hr $ "received " ++ numstring
98 (printmsg (Just h) m (anRiss a) now)
99 (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
100 throwTo tid ReceivedException
107 instance X.Exception RpcException