]> woffs.de Git - fd/haskell-amqp-utils.git/blob - konsum.hs
formatting
[fd/haskell-amqp-utils.git] / konsum.hs
1 -- generic amqp consumer
2 import Control.Concurrent
3 import qualified Control.Exception as X
4 import Control.Monad
5 import qualified Data.Text as T
6 import Data.Time
7 import Data.Version (showVersion)
8 import Network.AMQP
9 import Network.AMQP.Utils.Connection
10 import Network.AMQP.Utils.Helpers
11 import Network.AMQP.Utils.Options
12 import Paths_amqp_utils (version)
13 import System.Environment
14
15 main :: IO ()
16 main = do
17   hr "starting"
18   tid <- myThreadId
19   args <- getArgs >>= parseargs 'k'
20   let addiArgs = reverse $ additionalArgs args
21   printparam "client version" ["amqp-utils", showVersion version]
22   (conn, chan) <- connect args
23   addChannelExceptionHandler chan (X.throwTo tid)
24   -- set prefetch
25   printparam "prefetch" $ preFetch args
26   qos chan 0 (preFetch args) False
27   -- attach to given queue? or build exclusive queue and bind it?
28   queue <-
29     maybe
30       (tempQueue chan (tmpQName args) (bindings args) (currentExchange args))
31       (return)
32       (fmap T.pack (qName args))
33   printparam "queue name" queue
34   printparam "shown body chars" $ anRiss args
35   printparam "temp dir" $ tempDir args
36   printparam "callback" $ fileProcess args
37   printparam "callback args" $ addiArgs
38   printparam "cleanup temp file" $
39     maybe Nothing (\_ -> Just (cleanupTmpFile args)) (fileProcess args)
40   -- subscribe to the queue
41   ctag <-
42     consumeMsgs
43       chan
44       queue
45       (if ack args
46          then Ack
47          else NoAck)
48       (myCallback args addiArgs tid)
49   printparam "consumer tag" ctag
50   printparam "send acks" $ ack args
51   printparam "requeue if rejected" $
52     if (ack args)
53       then Just (show (requeuenack args))
54       else Nothing
55   hr "entering main loop"
56   X.catch
57     (forever $ threadDelay 5000000)
58     (\e -> printparam "exception" (e :: X.SomeException))
59   closeConnection conn
60   hr "connection closed"
61
62 -- | exclusive temp queue
63 tempQueue :: Channel -> String -> [(String, String)] -> String -> IO T.Text
64 tempQueue chan tmpqname bindlist x = do
65   (q, _, _) <-
66     declareQueue
67       chan
68       newQueue {queueExclusive = True, queueName = T.pack tmpqname}
69   mapM_
70     (\(xchange, bkey) ->
71        bindQueue chan q (T.pack xchange) (T.pack bkey) >>
72        printparam "binding" [xchange, bkey])
73     (if null bindlist
74        then [(x, "#")]
75        else bindlist)
76   return q
77
78 -- | process received message
79 myCallback :: Args -> [String] -> ThreadId -> (Message, Envelope) -> IO ()
80 myCallback a addi tid m@(_, envi) = do
81   let numstring = show $ envDeliveryTag envi
82   hr $ "BEGIN " ++ numstring
83   now <- getZonedTime
84   (callbackoptions, callbackenv) <-
85     X.catch
86       (printmsg Nothing m (anRiss a) now)
87       (\x -> X.throwTo tid (x :: X.SomeException) >> return ([], []))
88   either (\e -> printparam "ERROR" (e :: X.SomeException) >> reje envi a) return =<<
89     X.try
90       (optionalFileStuff
91          m
92          callbackoptions
93          addi
94          numstring
95          a
96          tid
97          Nothing
98          callbackenv)
99   hr $ "END " ++ numstring