]> woffs.de Git - fd/haskell-amqp-utils.git/blob - plane.hs
WiP rabbit hole
[fd/haskell-amqp-utils.git] / plane.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 {-# LANGUAGE OverloadedStrings #-}
6
7 -- generic AMQP rpc client
8 import           Control.Concurrent
9 import qualified Control.Exception             as X
10 import           Control.Monad
11 import qualified Data.ByteString.Char8         as BS
12 import qualified Data.ByteString.Lazy.Char8    as BL
13 import qualified Data.Text                     as T
14 import           Data.Time
15 import           Data.Time.Clock.POSIX
16 import           Data.Version                  (showVersion)
17 import           Network.AMQP
18 import           Network.AMQP.Utils.Connection
19 import           Network.AMQP.Utils.Helpers
20 import           Network.AMQP.Utils.Options
21 import           Paths_amqp_utils              (version)
22 import           System.Environment
23 import           System.Exit
24 import           System.IO
25 import qualified System.File.OsPath               as FOS
26 import qualified System.OsPath                    as OS
27
28 main :: IO ()
29 main = do
30   hr "starting"
31   tid <- myThreadId
32   args <- getArgs >>= parseargs 'p'
33   X.onException
34     (printparam "rpc_timeout" [show (rpc_timeout args), "s"])
35     (error $ "invalid rpc_timeout")
36   printparam "client version" ["amqp-utils", showVersion version]
37   printparam "routing key" $ rKey args
38   (conn, chan) <- connect args
39   addChannelExceptionHandler chan (X.throwTo tid)
40   (q, _, _) <- declareQueue chan newQueue {queueExclusive = True}
41   if (currentExchange args /= "")
42     then do
43       printparam "exchange" $ currentExchange args
44       bindQueue chan q (T.pack $ currentExchange args) q
45     else return ()
46   let inputFile' = firstInputFile (inputFiles args)
47   printparam "input file" $ inputFile'
48   message <-
49     if inputFile' == "-"
50       then BL.getContents
51       else OS.encodeUtf (BS.unpack inputFile') >>= FOS.readFile
52   printparam "output file" $ outputFile args
53   h <-
54     if outputFile args == "-"
55       then return stdout
56       else openBinaryFile (outputFile args) WriteMode
57   ctag <- consumeMsgs chan q NoAck (rpcClientCallback h tid args)
58   printparam "consumer tag" ctag
59   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
60   hr "publishing request"
61   _ <-
62     publishMsg
63       chan
64       (T.pack $ currentExchange args)
65       (T.pack $ rKey args)
66       newMsg
67         { msgBody = message
68         , msgReplyTo = Just q
69         , msgCorrelationID = corrid args
70         , msgExpiration = msgexp args
71         , msgTimestamp = Just now
72         , msgHeaders = msgheader args
73         }
74   hr "waiting for answer"
75   _ <-
76     forkIO
77       (threadDelay (floor (1000000 * rpc_timeout args)) >>
78        throwTo tid TimeoutException)
79   X.catch
80     (forever $ threadDelay 200000)
81     (\x -> do
82        ec <- exceptionHandler x
83        hr "closing connection"
84        closeConnection conn
85        printparam "exiting" ec
86        exitWith ec)
87
88 exceptionHandler :: RpcException -> IO (ExitCode)
89 exceptionHandler ReceivedException = hr "success" >> (return ExitSuccess)
90 exceptionHandler TimeoutException  = hr "timeout" >> (return $ ExitFailure 1)
91
92 rpcClientCallback :: Handle -> ThreadId -> Args -> (Message, Envelope) -> IO ()
93 rpcClientCallback h tid a m@(_, env) = do
94   let numstring = show $ envDeliveryTag env
95   hr $ "received " ++ numstring
96   now <- getZonedTime
97   _ <-
98     X.catch
99       (printmsg (Just h) m (anRiss a) now)
100       (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
101   throwTo tid ReceivedException
102
103 data RpcException
104   = ReceivedException
105   | TimeoutException
106   deriving (Show)
107
108 instance X.Exception RpcException