]> woffs.de Git - fd/haskell-amqp-utils.git/blob - plane.hs
whitespace
[fd/haskell-amqp-utils.git] / plane.hs
1 {-# LANGUAGE OverloadedStrings #-}
2
3 -- generic AMQP rpc client
4 import           Control.Concurrent
5 import qualified Control.Exception             as X
6 import           Control.Monad
7 import qualified Data.ByteString.Lazy.Char8    as BL
8 import qualified Data.Text                     as T
9 import           Data.Time
10 import           Data.Time.Clock.POSIX
11 import           Data.Version                  (showVersion)
12 import           Network.AMQP
13 import           Network.AMQP.Utils.Connection
14 import           Network.AMQP.Utils.Helpers
15 import           Network.AMQP.Utils.Options
16 import           Paths_amqp_utils              (version)
17 import           System.Environment
18 import           System.Exit
19 import           System.IO
20
21 main :: IO ()
22 main = do
23   hr "starting"
24   tid <- myThreadId
25   args <- getArgs >>= parseargs 'p'
26   X.onException
27     (printparam "rpc_timeout" [show (rpc_timeout args), "s"])
28     (error $ "invalid rpc_timeout")
29   printparam "client version" ["amqp-utils", showVersion version]
30   printparam "routing key" $ rKey args
31   (conn, chan) <- connect args
32   addChannelExceptionHandler chan (X.throwTo tid)
33   (q, _, _) <- declareQueue chan newQueue {queueExclusive = True}
34   if (currentExchange args /= "")
35     then do
36       printparam "exchange" $ currentExchange args
37       bindQueue chan q (T.pack $ currentExchange args) q
38     else return ()
39   let inputFile' = firstInputFile (inputFiles args)
40   printparam "input file" $ inputFile'
41   message <-
42     if inputFile' == "-"
43       then BL.getContents
44       else BL.readFile (inputFile')
45   printparam "output file" $ outputFile args
46   h <-
47     if outputFile args == "-"
48       then return stdout
49       else openBinaryFile (outputFile args) WriteMode
50   ctag <- consumeMsgs chan q NoAck (rpcClientCallback h tid args)
51   printparam "consumer tag" ctag
52   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
53   hr "publishing request"
54   _ <-
55     publishMsg
56       chan
57       (T.pack $ currentExchange args)
58       (T.pack $ rKey args)
59       newMsg
60         { msgBody = message
61         , msgReplyTo = Just q
62         , msgCorrelationID = corrid args
63         , msgExpiration = msgexp args
64         , msgTimestamp = Just now
65         , msgHeaders = msgheader args
66         }
67   hr "waiting for answer"
68   _ <-
69     forkIO
70       (threadDelay (floor (1000000 * rpc_timeout args)) >>
71        throwTo tid TimeoutException)
72   X.catch
73     (forever $ threadDelay 200000)
74     (\x -> do
75        ec <- exceptionHandler x
76        hr "closing connection"
77        closeConnection conn
78        printparam "exiting" ec
79        exitWith ec)
80
81 exceptionHandler :: RpcException -> IO (ExitCode)
82 exceptionHandler ReceivedException = hr "success" >> (return ExitSuccess)
83 exceptionHandler TimeoutException  = hr "timeout" >> (return $ ExitFailure 1)
84
85 rpcClientCallback :: Handle -> ThreadId -> Args -> (Message, Envelope) -> IO ()
86 rpcClientCallback h tid a m@(_, env) = do
87   let numstring = show $ envDeliveryTag env
88   hr $ "received " ++ numstring
89   now <- getZonedTime
90   _ <-
91     X.catch
92       (printmsg (Just h) m (anRiss a) now)
93       (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
94   throwTo tid ReceivedException
95
96 data RpcException
97   = ReceivedException
98   | TimeoutException
99   deriving (Show)
100
101 instance X.Exception RpcException