]> woffs.de Git - fd/haskell-amqp-utils.git/blob - konsum.hs
update reuse compat
[fd/haskell-amqp-utils.git] / konsum.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 -- generic amqp consumer
6 import Control.Concurrent
7 import qualified Control.Exception as X
8 import qualified Data.Text as T
9 import Data.Time
10 import Data.Version (showVersion)
11 import Network.AMQP
12 import Network.AMQP.Utils.Connection
13 import Network.AMQP.Utils.Helpers
14 import Network.AMQP.Utils.Options
15 import Paths_amqp_utils (version)
16 import System.Environment
17 import System.IO
18
19 main :: IO ()
20 main = do
21   hr "starting"
22   tid <- myThreadId
23   args <- getArgs >>= parseargs 'k'
24   hSetBuffering stdout LineBuffering
25   hSetBuffering stderr LineBuffering
26   let addiArgs = reverse $ additionalArgs args
27   printparam "client version" ["amqp-utils", showVersion version]
28   (conn, chan) <- connect args
29   addChannelExceptionHandler chan (X.throwTo tid)
30   -- set prefetch
31   printparam "prefetch" $ preFetch args
32   qos chan 0 (preFetch args) False
33   -- attach to given queue? or build exclusive queue and bind it?
34   queue <-
35     maybe
36       (tempQueue chan (tmpQName args) (bindings args))
37       (return)
38       (fmap T.pack (qName args))
39   printparam "queue name" queue
40   printparam "consumer args" $ formatheaders fieldshow $ streamoffset args
41   printparam "shown body chars" $ anRiss args
42   printparam "temp dir" $ tempDir args
43   printparam "callback" $ fileProcess args
44   printparam "callback args" $ addiArgs
45   printparam "cleanup temp file" $
46     maybe Nothing (\_ -> Just (cleanupTmpFile args)) (fileProcess args)
47   -- subscribe to the queue
48   ctag <-
49     consumeMsgs'
50       chan
51       queue
52       (if ack args
53          then Ack
54          else NoAck)
55       (myCallback args addiArgs tid)
56       (\_ -> return ())
57       (streamoffset args)
58   printparam "consumer tag" ctag
59   printparam "send acks" $ ack args
60   printparam "requeue if rejected" $ (ack args) && (requeuenack args)
61   printparam "delay negative acknowledgements for" $ if delaynack args == 0 then Nothing else Just [(show (delaynack args)),"s"]
62
63   hr "entering main loop"
64   sleepingBeauty >>=
65     (\x -> do
66        closeConnection conn
67        hr "connection closed"
68        X.throw x)
69
70 -- | exclusive temp queue
71 tempQueue :: Channel -> String -> [(String, String)] -> IO T.Text
72 tempQueue chan tmpqname bindlist = do
73   (q, _, _) <-
74     declareQueue
75       chan
76       newQueue {queueExclusive = True, queueName = T.pack tmpqname}
77   mapM_
78     (\(xchange, bkey) ->
79        bindQueue chan q (T.pack xchange) (T.pack bkey) >>
80        printparam "binding" [xchange, bkey])
81     bindlist
82   return q
83
84 -- | process received message
85 myCallback :: Args -> [String] -> ThreadId -> (Message, Envelope) -> IO ()
86 myCallback a addi tid m@(_, envi) = do
87   let numstring = show $ envDeliveryTag envi
88   hr $ "BEGIN " ++ numstring
89   now <- getZonedTime
90   (callbackoptions, callbackenv) <-
91     X.catch
92       (printmsg Nothing m (anRiss a) now)
93       (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
94   either (\e -> printparam "ERROR" (e :: X.IOException) >> reje envi a) return =<<
95     X.try
96       (optionalFileStuff
97          m
98          callbackoptions
99          addi
100          numstring
101          a
102          tid
103          Nothing
104          callbackenv)
105   hr $ "END " ++ numstring