]> woffs.de Git - fd/haskell-amqp-utils.git/blob - arbeite.hs
release 0.6.2.0
[fd/haskell-amqp-utils.git] / arbeite.hs
1 {-# LANGUAGE OverloadedStrings #-}
2
3 -- generic AMQP rpc server
4 import           Control.Concurrent
5 import qualified Control.Exception             as X
6 import           Control.Monad
7 import qualified Data.ByteString.Char8         as BS
8 import           Data.Map                      (singleton)
9 import           Data.Maybe
10 import qualified Data.Text                     as T
11 import           Data.Time
12 import           Data.Version                  (showVersion)
13 import           Network.AMQP
14 import           Network.AMQP.Types
15 import           Network.AMQP.Utils.Connection
16 import           Network.AMQP.Utils.Helpers
17 import           Network.AMQP.Utils.Options
18 import           Paths_amqp_utils              (version)
19 import           System.Environment
20
21 main :: IO ()
22 main = do
23   hr "starting"
24   tid <- myThreadId
25   args <- getArgs >>= parseargs 'r'
26   X.onException
27     (printparam "worker" $ fromJust $ fileProcess args)
28     (error "-X option required")
29   printparam "cleanup temp file" $ cleanupTmpFile args
30   let addiArgs = reverse $ additionalArgs args
31   printparam "client version" ["amqp-utils", showVersion version]
32   (conn, chan) <- connect args
33   addChannelExceptionHandler chan (X.throwTo tid)
34   -- set prefetch
35   printparam "prefetch" $ preFetch args
36   qos chan 0 (preFetch args) False
37   queue <-
38     maybe
39       (declareQueue
40          chan
41          newQueue {queueExclusive = True, queueName = (T.pack $ tmpQName args)} >>=
42        (\(x, _, _) -> return x))
43       (return)
44       (fmap T.pack (qName args))
45   printparam "queue name" queue
46   if (currentExchange args /= "")
47     then do
48       printparam "exchange" $ currentExchange args
49       bindQueue chan queue (T.pack $ currentExchange args) queue
50     else return ()
51   ctag <-
52     consumeMsgs
53       chan
54       queue
55       (if ack args
56          then Ack
57          else NoAck)
58       (rpcServerCallback tid args addiArgs chan)
59   printparam "consumer tag" ctag
60   printparam "send acks" $ ack args
61   printparam "requeue if rejected" $ (ack args) && (requeuenack args)
62   hr "entering main loop"
63   sleepingBeauty >>= printparam "exception"
64   closeConnection conn
65   hr "connection closed"
66
67 rpcServerCallback ::
68      ThreadId -> Args -> [String] -> Channel -> (Message, Envelope) -> IO ()
69 rpcServerCallback tid a addi c m@(msg, env) = do
70   let numstring = show $ envDeliveryTag env
71   hr $ "BEGIN " ++ numstring
72   now <- getZonedTime
73   (callbackoptions, callbackenv) <-
74     X.catch
75       (printmsg Nothing m (anRiss a) now)
76       (\x -> X.throwTo tid (x :: X.IOException) >> return ([], []))
77   either (\e -> printparam "ERROR" (e :: X.IOException)) return =<<
78     X.try
79       (optionalFileStuff
80          m
81          callbackoptions
82          addi
83          numstring
84          a
85          tid
86          (Just reply)
87          callbackenv)
88   hr $ "END " ++ numstring
89   where
90     reply e contents = do
91       void $
92         publishMsg
93           c
94           (envExchangeName env)
95           (fromJust $ msgReplyTo msg)
96           newMsg
97             { msgBody = contents
98             , msgCorrelationID = msgCorrelationID msg
99             , msgTimestamp = msgTimestamp msg
100             , msgExpiration = msgExpiration msg
101             , msgHeaders =
102                 Just $
103                 FieldTable $ singleton "exitcode" $ FVString $ BS.pack $ show e
104             }