]> woffs.de Git - fd/haskell-amqp-utils.git/blob - arbeite.hs
WiP rabbit hole
[fd/haskell-amqp-utils.git] / arbeite.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 {-# LANGUAGE OverloadedStrings #-}
6
7 -- generic AMQP rpc server
8 import           Control.Concurrent
9 import qualified Control.Exception             as X
10 import           Control.Monad
11 import qualified Data.ByteString.Char8         as BS
12 import           Data.Map                      (singleton)
13 import           Data.Maybe
14 import qualified Data.Text                     as T
15 import           Data.Time
16 import           Data.Version                  (showVersion)
17 import           Network.AMQP
18 import           Network.AMQP.Types
19 import           Network.AMQP.Utils.Connection
20 import           Network.AMQP.Utils.Helpers
21 import           Network.AMQP.Utils.Options
22 import           Paths_amqp_utils              (version)
23 import           System.Environment
24
25 main :: IO ()
26 main = do
27   hr "starting"
28   tid <- myThreadId
29   args <- getArgs >>= parseargs 'r'
30   X.onException
31     (printparam "worker" $ fromJust $ fileProcess args)
32     (error "-X option required")
33   printparam "cleanup temp file" $ cleanupTmpFile args
34   let addiArgs = reverse $ additionalArgs args
35   printparam "client version" ["amqp-utils", showVersion version]
36   (conn, chan) <- connect args
37   addChannelExceptionHandler chan (X.throwTo tid)
38   -- set prefetch
39   printparam "prefetch" $ preFetch args
40   qos chan 0 (preFetch args) False
41   queue <-
42     maybe
43       (declareQueue
44          chan
45          newQueue {queueExclusive = True, queueName = (T.pack $ tmpQName args)} >>=
46        (\(x, _, _) -> return x))
47       (return)
48       (fmap T.pack (qName args))
49   printparam "queue name" queue
50   if (currentExchange args /= "")
51     then do
52       printparam "exchange" $ currentExchange args
53       bindQueue chan queue (T.pack $ currentExchange args) queue
54     else return ()
55   ctag <-
56     consumeMsgs
57       chan
58       queue
59       (if ack args
60          then Ack
61          else NoAck)
62       (rpcServerCallback tid args addiArgs chan)
63   printparam "consumer tag" ctag
64   printparam "send acks" $ ack args
65   printparam "requeue if rejected" $ (ack args) && (requeuenack args)
66   hr "entering main loop"
67   sleepingBeauty >>= printparam "exception"
68   closeConnection conn
69   hr "connection closed"
70
71 rpcServerCallback ::
72      ThreadId -> Args -> [String] -> Channel -> (Message, Envelope) -> IO ()
73 rpcServerCallback tid a addi c m@(msg, env) = do
74   let numstring = show $ envDeliveryTag env
75   hr $ "BEGIN " ++ numstring
76   now <- getZonedTime
77   (callbackoptions, callbackenv) <-
78     X.catch
79       (printmsg Nothing m (anRiss a) now)
80       (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
81   either (\e -> printparam "ERROR" (e :: X.IOException)) return =<<
82     X.try
83       (optionalFileStuff
84          m
85          callbackoptions
86          addi
87          numstring
88          a
89          tid
90          (Just reply)
91          callbackenv)
92   hr $ "END " ++ numstring
93   where
94     reply e contents = do
95       void $
96         publishMsg
97           c
98           (envExchangeName env)
99           (fromJust $ msgReplyTo msg)
100           newMsg
101             { msgBody = contents
102             , msgCorrelationID = msgCorrelationID msg
103             , msgTimestamp = msgTimestamp msg
104             , msgExpiration = msgExpiration msg
105             , msgHeaders =
106                 Just $
107                 FieldTable $ singleton "exitcode" $ FVString $ BS.pack $ show e
108             }