]> woffs.de Git - fd/haskell-amqp-utils.git/blob - arbeite.hs
formatting
[fd/haskell-amqp-utils.git] / arbeite.hs
1 {-# LANGUAGE OverloadedStrings #-}
2
3 -- generic AMQP rpc server
4 import Control.Concurrent
5 import qualified Control.Exception as X
6 import Control.Monad
7 import qualified Data.ByteString.Char8 as BS
8 import Data.Map (singleton)
9 import Data.Maybe
10 import qualified Data.Text as T
11 import Data.Time
12 import Data.Version (showVersion)
13 import Network.AMQP
14 import Network.AMQP.Types
15 import Network.AMQP.Utils.Connection
16 import Network.AMQP.Utils.Helpers
17 import Network.AMQP.Utils.Options
18 import Paths_amqp_utils (version)
19 import System.Environment
20
21 main :: IO ()
22 main = do
23   hr "starting"
24   tid <- myThreadId
25   args <- getArgs >>= parseargs 'r'
26   X.onException
27     (printparam "worker" $ fromJust $ fileProcess args)
28     (error "-X option required")
29   printparam "cleanup temp file" $ cleanupTmpFile args
30   let addiArgs = reverse $ additionalArgs args
31   printparam "client version" ["amqp-utils", showVersion version]
32   (conn, chan) <- connect args
33   addChannelExceptionHandler chan (X.throwTo tid)
34   queue <-
35     maybe
36       (declareQueue
37          chan
38          newQueue {queueExclusive = True, queueName = (T.pack $ tmpQName args)} >>=
39        (\(x, _, _) -> return x))
40       (return)
41       (fmap T.pack (qName args))
42   printparam "queue name" queue
43   if (currentExchange args /= "")
44     then do
45       printparam "exchange" $ currentExchange args
46       bindQueue chan queue (T.pack $ currentExchange args) queue
47     else return ()
48   ctag <-
49     consumeMsgs
50       chan
51       queue
52       (if ack args
53          then Ack
54          else NoAck)
55       (rpcServerCallback tid args addiArgs chan)
56   printparam "consumer tag" ctag
57   printparam "send acks" $ ack args
58   printparam "requeue if rejected" $
59     if (ack args)
60       then Just (requeuenack args)
61       else Nothing
62   hr "entering main loop"
63   X.catch
64     (forever $ threadDelay 5000000)
65     (\e -> printparam "exception" (e :: X.SomeException))
66   closeConnection conn
67   hr "connection closed"
68
69 rpcServerCallback ::
70      ThreadId -> Args -> [String] -> Channel -> (Message, Envelope) -> IO ()
71 rpcServerCallback tid a addi c m@(msg, env) = do
72   let numstring = show $ envDeliveryTag env
73   hr $ "BEGIN " ++ numstring
74   now <- getZonedTime
75   (callbackoptions, callbackenv) <-
76     X.catch
77       (printmsg Nothing m (anRiss a) now)
78       (\x -> X.throwTo tid (x :: X.SomeException) >> return ([], []))
79   either (\e -> printparam "ERROR" (e :: X.SomeException)) return =<<
80     X.try
81       (optionalFileStuff
82          m
83          callbackoptions
84          addi
85          numstring
86          a
87          tid
88          (Just reply)
89          callbackenv)
90   hr $ "END " ++ numstring
91   where
92     reply e contents = do
93       void $
94         publishMsg
95           c
96           (envExchangeName env)
97           (fromJust $ msgReplyTo msg)
98           newMsg
99             { msgBody = contents
100             , msgCorrelationID = msgCorrelationID msg
101             , msgTimestamp = msgTimestamp msg
102             , msgExpiration = msgExpiration msg
103             , msgHeaders =
104                 Just $
105                 FieldTable $ singleton "exitcode" $ FVString $ BS.pack $ show e
106             }