]> woffs.de Git - fd/haskell-amqp-utils.git/blob - plane.hs
manpages
[fd/haskell-amqp-utils.git] / plane.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 {-# LANGUAGE OverloadedStrings #-}
6
7 -- generic AMQP rpc client
8 import           Control.Concurrent
9 import qualified Control.Exception             as X
10 import           Control.Monad
11 import qualified Data.ByteString.Lazy.Char8    as BL
12 import qualified Data.Text                     as T
13 import           Data.Time
14 import           Data.Time.Clock.POSIX
15 import           Data.Version                  (showVersion)
16 import           Network.AMQP
17 import           Network.AMQP.Utils.Connection
18 import           Network.AMQP.Utils.Helpers
19 import           Network.AMQP.Utils.Options
20 import           Paths_amqp_utils              (version)
21 import           System.Environment
22 import           System.Exit
23 import           System.IO
24
25 main :: IO ()
26 main = do
27   hr "starting"
28   tid <- myThreadId
29   args <- getArgs >>= parseargs 'p'
30   X.onException
31     (printparam "rpc_timeout" [show (rpc_timeout args), "s"])
32     (error $ "invalid rpc_timeout")
33   printparam "client version" ["amqp-utils", showVersion version]
34   printparam "routing key" $ rKey args
35   (conn, chan) <- connect args
36   addChannelExceptionHandler chan (X.throwTo tid)
37   (q, _, _) <- declareQueue chan newQueue {queueExclusive = True}
38   if (currentExchange args /= "")
39     then do
40       printparam "exchange" $ currentExchange args
41       bindQueue chan q (T.pack $ currentExchange args) q
42     else return ()
43   let inputFile' = firstInputFile (inputFiles args)
44   printparam "input file" $ inputFile'
45   message <-
46     if inputFile' == "-"
47       then BL.getContents
48       else readFileRawLazy inputFile'
49   printparam "output file" $ outputFile args
50   h <-
51     if outputFile args == "-"
52       then return stdout
53       else openBinaryFile (outputFile args) WriteMode
54   ctag <- consumeMsgs chan q NoAck (rpcClientCallback h tid args)
55   printparam "consumer tag" ctag
56   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
57   hr "publishing request"
58   _ <-
59     publishMsg
60       chan
61       (T.pack $ currentExchange args)
62       (T.pack $ rKey args)
63       newMsg
64         { msgBody = message
65         , msgReplyTo = Just q
66         , msgCorrelationID = corrid args
67         , msgExpiration = msgexp args
68         , msgTimestamp = Just now
69         , msgHeaders = msgheader args
70         }
71   hr "waiting for answer"
72   _ <-
73     forkIO
74       (threadDelay (floor (1000000 * rpc_timeout args)) >>
75        throwTo tid TimeoutException)
76   X.catch
77     (forever $ threadDelay 200000)
78     (\x -> do
79        ec <- exceptionHandler x
80        hr "closing connection"
81        closeConnection conn
82        printparam "exiting" ec
83        exitWith ec)
84
85 exceptionHandler :: RpcException -> IO (ExitCode)
86 exceptionHandler ReceivedException = hr "success" >> (return ExitSuccess)
87 exceptionHandler TimeoutException  = hr "timeout" >> (return $ ExitFailure 1)
88
89 rpcClientCallback :: Handle -> ThreadId -> Args -> (Message, Envelope) -> IO ()
90 rpcClientCallback h tid a m@(_, env) = do
91   let numstring = show $ envDeliveryTag env
92   hr $ "received " ++ numstring
93   now <- getZonedTime
94   _ <-
95     X.catch
96       (printmsg (Just h) m (anRiss a) now)
97       (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
98   throwTo tid ReceivedException
99
100 data RpcException
101   = ReceivedException
102   | TimeoutException
103   deriving (Show)
104
105 instance X.Exception RpcException