]> woffs.de Git - fd/haskell-amqp-utils.git/blob - arbeite.hs
update reuse compat
[fd/haskell-amqp-utils.git] / arbeite.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 {-# LANGUAGE OverloadedStrings #-}
6
7 -- generic AMQP rpc server
8 import           Control.Concurrent
9 import qualified Control.Exception             as X
10 import           Control.Monad
11 import qualified Data.ByteString.Char8         as BS
12 import           Data.Map                      (singleton)
13 import           Data.Maybe
14 import qualified Data.Text                     as T
15 import           Data.Time
16 import           Data.Version                  (showVersion)
17 import           Network.AMQP
18 import           Network.AMQP.Types
19 import           Network.AMQP.Utils.Connection
20 import           Network.AMQP.Utils.Helpers
21 import           Network.AMQP.Utils.Options
22 import           Paths_amqp_utils              (version)
23 import           System.Environment
24 import           System.IO
25
26 main :: IO ()
27 main = do
28   hr "starting"
29   tid <- myThreadId
30   args <- getArgs >>= parseargs 'r'
31   hSetBuffering stdout LineBuffering
32   hSetBuffering stderr LineBuffering
33   X.onException
34     (printparam "worker" $ fromJust $ fileProcess args)
35     (error "-X option required")
36   printparam "cleanup temp file" $ cleanupTmpFile args
37   let addiArgs = reverse $ additionalArgs args
38   printparam "client version" ["amqp-utils", showVersion version]
39   (conn, chan) <- connect args
40   addChannelExceptionHandler chan (X.throwTo tid)
41   -- set prefetch
42   printparam "prefetch" $ preFetch args
43   qos chan 0 (preFetch args) False
44   queue <-
45     maybe
46       (declareQueue
47          chan
48          newQueue {queueExclusive = True, queueName = (T.pack $ tmpQName args)} >>=
49        (\(x, _, _) -> return x))
50       (return)
51       (fmap T.pack (qName args))
52   printparam "queue name" queue
53   if (currentExchange args /= "")
54     then do
55       printparam "exchange" $ currentExchange args
56       bindQueue chan queue (T.pack $ currentExchange args) queue
57     else return ()
58   ctag <-
59     consumeMsgs
60       chan
61       queue
62       (if ack args
63          then Ack
64          else NoAck)
65       (rpcServerCallback tid args addiArgs chan)
66   printparam "consumer tag" ctag
67   printparam "send acks" $ ack args
68   printparam "requeue if rejected" $ (ack args) && (requeuenack args)
69   hr "entering main loop"
70   sleepingBeauty >>= printparam "exception"
71   closeConnection conn
72   hr "connection closed"
73
74 rpcServerCallback ::
75      ThreadId -> Args -> [String] -> Channel -> (Message, Envelope) -> IO ()
76 rpcServerCallback tid a addi c m@(msg, env) = do
77   let numstring = show $ envDeliveryTag env
78   hr $ "BEGIN " ++ numstring
79   now <- getZonedTime
80   (callbackoptions, callbackenv) <-
81     X.catch
82       (printmsg Nothing m (anRiss a) now)
83       (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
84   either (\e -> printparam "ERROR" (e :: X.IOException)) return =<<
85     X.try
86       (optionalFileStuff
87          m
88          callbackoptions
89          addi
90          numstring
91          a
92          tid
93          (Just reply)
94          callbackenv)
95   hr $ "END " ++ numstring
96   where
97     reply e contents = do
98       void $
99         publishMsg
100           c
101           (envExchangeName env)
102           (fromJust $ msgReplyTo msg)
103           newMsg
104             { msgBody = contents
105             , msgCorrelationID = msgCorrelationID msg
106             , msgTimestamp = msgTimestamp msg
107             , msgExpiration = msgExpiration msg
108             , msgHeaders =
109                 Just $
110                 FieldTable $ singleton "exitcode" $ FVString $ BS.pack $ show e
111             }