]> woffs.de Git - fd/haskell-amqp-utils.git/blob - konsum.hs
843757fc6eace3cf67c073d6bd2317ffccbff881
[fd/haskell-amqp-utils.git] / konsum.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 -- generic amqp consumer
6 import Control.Concurrent
7 import qualified Control.Exception as X
8 import qualified Data.Text as T
9 import Data.Time
10 import Data.Version (showVersion)
11 import Network.AMQP
12 import Network.AMQP.Utils.Connection
13 import Network.AMQP.Utils.Helpers
14 import Network.AMQP.Utils.Options
15 import Paths_amqp_utils (version)
16 import System.Environment
17
18 main :: IO ()
19 main = do
20   hr "starting"
21   tid <- myThreadId
22   args <- getArgs >>= parseargs 'k'
23   let addiArgs = reverse $ additionalArgs args
24   printparam "client version" ["amqp-utils", showVersion version]
25   (conn, chan) <- connect args
26   addChannelExceptionHandler chan (X.throwTo tid)
27   -- set prefetch
28   printparam "prefetch" $ preFetch args
29   qos chan 0 (preFetch args) False
30   -- attach to given queue? or build exclusive queue and bind it?
31   queue <-
32     maybe
33       (tempQueue chan (tmpQName args) (bindings args))
34       (return)
35       (fmap T.pack (qName args))
36   printparam "queue name" queue
37   printparam "consumer args" $ formatheaders fieldshow $ streamoffset args
38   printparam "shown body chars" $ anRiss args
39   printparam "temp dir" $ tempDir args
40   printparam "callback" $ fileProcess args
41   printparam "callback args" $ addiArgs
42   printparam "cleanup temp file" $
43     maybe Nothing (\_ -> Just (cleanupTmpFile args)) (fileProcess args)
44   -- subscribe to the queue
45   ctag <-
46     consumeMsgs'
47       chan
48       queue
49       (if ack args
50          then Ack
51          else NoAck)
52       (myCallback args addiArgs tid)
53       (\_ -> return ())
54       (streamoffset args)
55   printparam "consumer tag" ctag
56   printparam "send acks" $ ack args
57   printparam "requeue if rejected" $ (ack args) && (requeuenack args)
58   hr "entering main loop"
59   sleepingBeauty >>=
60     (\x -> do
61        closeConnection conn
62        hr "connection closed"
63        X.throw x)
64
65 -- | exclusive temp queue
66 tempQueue :: Channel -> String -> [(String, String)] -> IO T.Text
67 tempQueue chan tmpqname bindlist = do
68   (q, _, _) <-
69     declareQueue
70       chan
71       newQueue {queueExclusive = True, queueName = T.pack tmpqname}
72   mapM_
73     (\(xchange, bkey) ->
74        bindQueue chan q (T.pack xchange) (T.pack bkey) >>
75        printparam "binding" [xchange, bkey])
76     bindlist
77   return q
78
79 -- | process received message
80 myCallback :: Args -> [String] -> ThreadId -> (Message, Envelope) -> IO ()
81 myCallback a addi tid m@(_, envi) = do
82   let numstring = show $ envDeliveryTag envi
83   hr $ "BEGIN " ++ numstring
84   now <- getZonedTime
85   (callbackoptions, callbackenv) <-
86     X.catch
87       (printmsg Nothing m (anRiss a) now)
88       (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
89   either (\e -> printparam "ERROR" (e :: X.IOException) >> reje envi a) return =<<
90     X.try
91       (optionalFileStuff
92          m
93          callbackoptions
94          addi
95          numstring
96          a
97          tid
98          Nothing
99          callbackenv)
100   hr $ "END " ++ numstring