]> woffs.de Git - fd/haskell-amqp-utils.git/blob - konsum.hs
fix exception handling
[fd/haskell-amqp-utils.git] / konsum.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 -- generic amqp consumer
6 import Control.Concurrent
7 import qualified Control.Exception as X
8 import qualified Data.Text as T
9 import Data.Time
10 import Data.Version (showVersion)
11 import Network.AMQP
12 import Network.AMQP.Utils.Connection
13 import Network.AMQP.Utils.Helpers
14 import Network.AMQP.Utils.Options
15 import Paths_amqp_utils (version)
16 import System.Environment
17
18 main :: IO ()
19 main = do
20   hr "starting"
21   tid <- myThreadId
22   args <- getArgs >>= parseargs 'k'
23   let addiArgs = reverse $ additionalArgs args
24   printparam "client version" ["amqp-utils", showVersion version]
25   (conn, chan) <- connect args
26   addChannelExceptionHandler chan (X.throwTo tid)
27   -- set prefetch
28   printparam "prefetch" $ preFetch args
29   qos chan 0 (preFetch args) False
30   -- attach to given queue? or build exclusive queue and bind it?
31   queue <-
32     maybe
33       (tempQueue chan (tmpQName args) (bindings args))
34       (return)
35       (fmap T.pack (qName args))
36   printparam "queue name" queue
37   printparam "shown body chars" $ anRiss args
38   printparam "temp dir" $ tempDir args
39   printparam "callback" $ fileProcess args
40   printparam "callback args" $ addiArgs
41   printparam "cleanup temp file" $
42     maybe Nothing (\_ -> Just (cleanupTmpFile args)) (fileProcess args)
43   -- subscribe to the queue
44   ctag <-
45     consumeMsgs
46       chan
47       queue
48       (if ack args
49          then Ack
50          else NoAck)
51       (myCallback args addiArgs tid)
52   printparam "consumer tag" ctag
53   printparam "send acks" $ ack args
54   printparam "requeue if rejected" $ (ack args) && (requeuenack args)
55   hr "entering main loop"
56   sleepingBeauty >>=
57     (\x -> do
58        closeConnection conn
59        hr "connection closed"
60        X.throw x)
61
62 -- | exclusive temp queue
63 tempQueue :: Channel -> String -> [(String, String)] -> IO T.Text
64 tempQueue chan tmpqname bindlist = do
65   (q, _, _) <-
66     declareQueue
67       chan
68       newQueue {queueExclusive = True, queueName = T.pack tmpqname}
69   mapM_
70     (\(xchange, bkey) ->
71        bindQueue chan q (T.pack xchange) (T.pack bkey) >>
72        printparam "binding" [xchange, bkey])
73     bindlist
74   return q
75
76 -- | process received message
77 myCallback :: Args -> [String] -> ThreadId -> (Message, Envelope) -> IO ()
78 myCallback a addi tid m@(_, envi) = do
79   let numstring = show $ envDeliveryTag envi
80   hr $ "BEGIN " ++ numstring
81   now <- getZonedTime
82   (callbackoptions, callbackenv) <-
83     X.catch
84       (printmsg Nothing m (anRiss a) now)
85       (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
86   either (\e -> printparam "ERROR" (e :: X.IOException) >> reje envi a) return =<<
87     X.try
88       (optionalFileStuff
89          m
90          callbackoptions
91          addi
92          numstring
93          a
94          tid
95          Nothing
96          callbackenv)
97   hr $ "END " ++ numstring
don't click here