-- | opens a connection and a channel
connect :: Args -> IO (Connection, Channel)
connect args = do
- printparam' "server" $ server args
- printparam' "port" $ show $ port args
- printparam' "vhost" $ vHost args
+ printparam "server" $ server args
+ printparam "port" $ port args
+ printparam "vhost" $ vHost args
printparam "connection_name" $ connectionName args
- printparam' "connect timeout" $ (show (connect_timeout args)) ++ "s"
+ printparam "connect timeout" $ [show (connect_timeout args), "s"]
globalCertificateStore <- getSystemCertificateStore
let myTLS =
N.TLSSettings
-- addChannelExceptionHandler chan
-- (\exception -> closeConnection conn >>
--- printparam' "exiting" (show exception) >>
+-- printparam "exiting" (show exception) >>
-- killThread tid)
--
-- -- noop sharedValidationCache, handy when debugging
myCert (Just cert') (Just key') _ = do
result <- credentialLoadX509 cert' key'
case result of
- Left x -> printparam' "ERROR" x >> return Nothing
+ Left x -> printparam "ERROR" x >> return Nothing
Right x -> return $ Just x
myCert _ _ _ = return Nothing
+{-# LANGUAGE FlexibleInstances #-}
module Network.AMQP.Utils.Helpers where
import Control.Concurrent
+import qualified Control.Exception as X
import Control.Monad
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.List
import System.IO
import System.Process
--- | log cmdline options
-listToMaybeUnwords :: [String] -> Maybe String
-listToMaybeUnwords [] = Nothing
-listToMaybeUnwords x = Just $ unwords x
+-- | Values that can be logged to stderr as a labelled parameter line.
+--
+-- Every method has a default, so an instance with an empty body renders its
+-- value with 'show' and is never treated as empty.
+class (Show a) => Flexprint a where
+  -- | Write the bare value, followed by a newline, to stderr.
+  flexprint :: a -> IO ()
+  flexprint = hPutStrLn stderr . show
+
+  -- | Whether the value counts as absent; 'printparam' skips such values.
+  empty :: a -> Bool
+  empty _ = False
+
+  -- | Write @ --- label: value@ to stderr and flush the handle.
+  -- Nothing at all is written when the value is 'empty'.
+  printparam :: String -> a -> IO ()
+  printparam label x =
+    unless (empty x) $ do
+      mapM_ (hPutStr stderr) [" --- ", label, ": "]
+      flexprint x
+      hFlush stderr
+
+-- | Optional values: 'Nothing' prints nothing, @Just y@ is logged exactly
+-- like @y@. 'flexprint' keeps the 'show'-based class default.
+instance (Flexprint a) => Flexprint (Maybe a) where
+  -- A 'Just' wrapping an empty payload is empty as well, which matches what
+  -- 'printparam' below actually prints for it.
+  empty = maybe True empty
+  printparam label = maybe (return ()) (printparam label)
+
+-- | Plain strings are written verbatim (no 'show' quoting); @""@ is empty.
+instance Flexprint String where
+  flexprint str = hPutStrLn stderr str
+  empty str = null str
+
+-- | Word lists are logged space-separated through the 'String' instance.
+--
+-- Blank words are dropped first, so an optional element passed as @""@
+-- leaves no stray separator; a list with no non-blank word is empty.
+instance Flexprint [String] where
+  flexprint = flexprint . unwords . filter (not . null)
+  empty = all null
+
+-- | Text is unpacked and logged through the 'String' instance.
+instance Flexprint T.Text where
+  flexprint txt = flexprint (T.unpack txt)
+  empty txt = T.null txt
+
+-- | Lazy ByteStrings are written raw on lines of their own: a line break
+-- is emitted before and after the bytes.
+instance Flexprint BL.ByteString where
+  flexprint body = do
+    hPutStrLn stderr ""
+    BL.hPut stderr body
+    hPutStrLn stderr ""
+  empty body = BL.null body
+
+-- Bool and Int use the class defaults: rendered with 'show', never empty.
+instance Flexprint Bool
+
+instance Flexprint Int
--- | Strings or ByteStrings with label, oder nothing at all
-printwithlabel :: String -> Maybe (IO ()) -> IO ()
-printwithlabel _ Nothing = return ()
-printwithlabel labl (Just i) = do
- mapM_ (hPutStr stderr) [" --- ", labl, ": "]
- i
- hFlush stderr
+-- Exit codes use the class defaults: rendered with 'show', never empty.
+instance Flexprint ExitCode
--- | optional parameters
-printparam :: String -> Maybe String -> IO ()
-printparam labl ms = printwithlabel labl $ fmap (hPutStrLn stderr) ms
+-- Exceptions use the class defaults: rendered with 'show', never empty.
+instance Flexprint X.SomeException
--- | required parameters
-printparam' :: String -> String -> IO ()
-printparam' d s = printparam d (Just s)
+-- AMQP exceptions use the class defaults: rendered with 'show', never empty.
+instance Flexprint AMQPException
--- | head chars of body
-printbody :: String -> Maybe BL.ByteString -> IO ()
-printbody labl ms = do
- printwithlabel labl $
- fmap
- (\s -> hPutStrLn stderr "" >> BL.hPut stderr s >> hPutStrLn stderr "")
- ms
- hFlush stderr
+-- Confirmation results use the class defaults: rendered with 'show', never empty.
+instance Flexprint ConfirmationResult
-- | log marker
hr :: String -> IO ()
, ("expiration", mexp)
, ("delivery mode", mdelivmode)
]
- printbody label anriss
+ printparam label anriss
mapM_ (\hdl -> BL.hPut hdl body >> hFlush hdl) h
return $
concat
(constructCallbackCmdLine callbackoptions addi numstring)
(fileProcess a)
path
- printparam "calling" $ fmap unwords callbackcmdline
+ printparam "calling" callbackcmdline
maybe
(acke envi a)
(\c ->
createProcess (proc exe args) {std_out = out, std_err = Inherit}
sout <- mapM BL.hGetContents h
exitcode <- maybe 0 id (fmap BL.length sout) `seq` waitForProcess processhandle
- printparam' (numstring ++ " call returned") $ show exitcode
+ printparam (numstring ++ " call returned") exitcode
if isJust action && isJust sout
then ((fromJust action $ exitcode) (fromJust sout)) >> acke envi a
else case exitcode of
hr "starting"
tid <- myThreadId
args <- getArgs >>= parseargs 'a'
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- printparam' "routing key" $ rKey args
- printparam' "exchange" $ currentExchange args
+ printparam "client version" ["amqp-utils", showVersion version]
+ printparam "routing key" $ rKey args
+ printparam "exchange" $ currentExchange args
isDir <-
if inputFile args == "-"
then return False
else F.getFileStatus (inputFile args) >>= return . F.isDirectory
if isDir
- then printparam' "hotfolder" $ inputFile args
- else printparam' "input file" $
- (inputFile args) ++
- if (lineMode args)
- then " (line-by-line)"
- else ""
+ then printparam "hotfolder" $ inputFile args
+ else printparam
+ "input file"
+ [ inputFile args
+ , if (lineMode args)
+ then "(line-by-line)"
+ else ""
+ ]
(conn, chan) <- connect args
addChannelExceptionHandler chan (X.throwTo tid)
- printparam' "confirm mode" $ show $ confirm args
+ printparam "confirm mode" $ confirm args
if (confirm args)
then do
confirmSelect chan False
exceptionHandler
-- all done. wait and close.
if (confirm args)
- then waitForConfirms chan >>= (printparam' "confirmed") . show
+ then waitForConfirms chan >>= printparam "confirmed"
else return ()
X.catch (closeConnection conn) exceptionHandler
-- | A handler for clean exit
+--
+-- A 'Normal' close of the channel or of the connection logs the close text
+-- under the label @exit@ and terminates with 'ExitSuccess'. Any other
+-- 'AMQPException' is logged under @exception@ and terminates with
+-- @ExitFailure 1@.
exceptionHandler :: AMQPException -> IO ()
-exceptionHandler (ChannelClosedException Normal txt) = printparam' "exit" txt >> exitWith ExitSuccess
-exceptionHandler (ConnectionClosedException Normal txt) = printparam' "exit" txt >> exitWith ExitSuccess
-exceptionHandler x = printparam' "exception" (show x) >> exitWith (ExitFailure 1)
+exceptionHandler (ChannelClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler (ConnectionClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
-- | The handler for publisher confirms
+--
+-- Logs, under the label @confirmed@, a word list made of the shown delivery
+-- tag, @all@ or @this@ (chosen by the 'Bool' in the tuple) and the shown
+-- 'AckType'.
confirmCallback :: (Word64, Bool, AckType) -> IO ()
confirmCallback (deliveryTag, isAll, ackType) =
- printparam'
+ printparam
"confirmed"
- ((show deliveryTag) ++
- (if isAll
- then " all "
- else " this ") ++
- (show ackType))
+ [ show deliveryTag
+ , if isAll
+ then "all"
+ else "this"
+ , show ackType
+ ]
-- | Hotfolder event handler
handleEvent ::
handleFile f [] x =
X.catch
(BL.readFile x >>= f (Just x))
- (\exception ->
- printparam' "exception in handleFile" $
- show (exception :: X.SomeException))
+ (\e -> printparam "exception in handleFile" (e :: X.SomeException))
-- | Publish one message with our settings
publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
return (Just (T.pack t), Just (T.pack e))
else return ((contenttype a), (contentencoding a))
now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
- r <-
- publishMsg
- c
- (T.pack $ currentExchange a)
- (T.pack $ rKey a)
- newMsg
- { msgBody = f
- , msgDeliveryMode = persistent a
- , msgTimestamp = Just now
- , msgID = msgid a
- , msgType = msgtype a
- , msgUserID = userid a
- , msgApplicationID = appid a
- , msgClusterID = clusterid a
- , msgContentType = mtype
- , msgContentEncoding = mencoding
- , msgReplyTo = replyto a
- , msgPriority = prio a
- , msgCorrelationID = corrid a
- , msgExpiration = msgexp a
- , msgHeaders = substheader (fnheader a) fn $ msgheader a
- }
- printparam "sent" $ fmap show r
+ publishMsg
+ c
+ (T.pack $ currentExchange a)
+ (T.pack $ rKey a)
+ newMsg
+ { msgBody = f
+ , msgDeliveryMode = persistent a
+ , msgTimestamp = Just now
+ , msgID = msgid a
+ , msgType = msgtype a
+ , msgUserID = userid a
+ , msgApplicationID = appid a
+ , msgClusterID = clusterid a
+ , msgContentType = mtype
+ , msgContentEncoding = mencoding
+ , msgReplyTo = replyto a
+ , msgPriority = prio a
+ , msgCorrelationID = corrid a
+ , msgExpiration = msgexp a
+ , msgHeaders = substheader (fnheader a) fn $ msgheader a
+ } >>=
+ printparam "sent"
where
substheader ::
[String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
tid <- myThreadId
args <- getArgs >>= parseargs 'r'
X.onException
- (printparam' "worker" $ fromJust $ fileProcess args)
+ (printparam "worker" $ fromJust $ fileProcess args)
(error "-X option required")
let addiArgs = reverse $ additionalArgs args
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+ printparam "client version" ["amqp-utils", showVersion version]
(conn, chan) <- connect args
addChannelExceptionHandler chan (X.throwTo tid)
queue <-
(\(x, _, _) -> return x))
(return)
(fmap T.pack (qName args))
- printparam' "queue name" $ T.unpack queue
+ printparam "queue name" queue
if (currentExchange args /= "")
then do
- printparam' "exchange" $ currentExchange args
+ printparam "exchange" $ currentExchange args
bindQueue chan queue (T.pack $ currentExchange args) queue
else return ()
ctag <-
then Ack
else NoAck)
(rpcServerCallback tid args addiArgs chan)
- printparam' "consumer tag" $ T.unpack ctag
- printparam' "send acks" $ show (ack args)
+ printparam "consumer tag" ctag
+ printparam "send acks" $ ack args
printparam "requeue if rejected" $
if (ack args)
- then Just (show (requeuenack args))
+ then Just (requeuenack args)
else Nothing
hr "entering main loop"
X.catch
(forever $ threadDelay 5000000)
- (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
+ (\e -> printparam "exception" (e :: X.SomeException))
closeConnection conn
hr "connection closed"
X.catch
(printmsg Nothing m (anRiss a) now)
(\x -> X.throwTo tid (x :: X.SomeException) >> return [])
- either (\e -> printparam' "ERROR" (show (e :: X.SomeException))) return =<<
+ either (\e -> printparam "ERROR" (e :: X.SomeException)) return =<<
X.try
(optionalFileStuff m callbackoptions addi numstring a tid (Just reply))
hr $ "END " ++ numstring
tid <- myThreadId
args <- getArgs >>= parseargs 'k'
let addiArgs = reverse $ additionalArgs args
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+ printparam "client version" ["amqp-utils", showVersion version]
(conn, chan) <- connect args
addChannelExceptionHandler chan (X.throwTo tid)
-- set prefetch
- printparam' "prefetch" $ show $ preFetch args
+ printparam "prefetch" $ preFetch args
qos chan 0 (fromIntegral $ preFetch args) False
-- attach to given queue? or build exclusive queue and bind it?
queue <-
(tempQueue chan (tmpQName args) (bindings args) (currentExchange args))
(return)
(fmap T.pack (qName args))
- printparam' "queue name" $ T.unpack queue
- printparam "shown body chars" $ fmap show $ anRiss args
+ printparam "queue name" queue
+ printparam "shown body chars" $ anRiss args
printparam "temp dir" $ tempDir args
printparam "callback" $ fileProcess args
- printparam "callback args" $ listToMaybeUnwords addiArgs
+ printparam "callback args" $ addiArgs
-- subscribe to the queue
ctag <-
consumeMsgs
then Ack
else NoAck)
(myCallback args addiArgs tid)
- printparam' "consumer tag" $ T.unpack ctag
- printparam' "send acks" $ show (ack args)
+ printparam "consumer tag" ctag
+ printparam "send acks" $ ack args
printparam "requeue if rejected" $
if (ack args)
then Just (show (requeuenack args))
hr "entering main loop"
X.catch
(forever $ threadDelay 5000000)
- (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
+ (\e -> printparam "exception" (e :: X.SomeException))
closeConnection conn
hr "connection closed"
mapM_
(\(xchange, bkey) ->
bindQueue chan q (T.pack xchange) (T.pack bkey) >>
- printparam' "binding" (xchange ++ ":" ++ bkey))
+ printparam "binding" [xchange, bkey])
(if null bindlist
then [(x, "#")]
else bindlist)
(printmsg Nothing m (anRiss a) now)
(\x -> X.throwTo tid (x :: X.SomeException) >> return [])
either
- (\e -> printparam' "ERROR" (show (e :: X.SomeException)) >> reje envi a)
+ (\e -> printparam "ERROR" (e :: X.SomeException) >> reje envi a)
return =<<
X.try (optionalFileStuff m callbackoptions addi numstring a tid Nothing)
hr $ "END " ++ numstring
tid <- myThreadId
args <- getArgs >>= parseargs 'p'
X.onException
- (printparam' "rpc_timeout" $ show (rpc_timeout args) ++ "s")
+ (printparam "rpc_timeout" [show (rpc_timeout args), "s"])
(error $ "invalid rpc_timeout")
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- printparam' "destination queue" $ tmpQName args
+ printparam "client version" ["amqp-utils", showVersion version]
+ printparam "destination queue" $ tmpQName args
(conn, chan) <- connect args
addChannelExceptionHandler chan (X.throwTo tid)
(q, _, _) <- declareQueue chan newQueue {queueExclusive = True}
if (currentExchange args /= "")
then do
- printparam' "exchange" $ currentExchange args
+ printparam "exchange" $ currentExchange args
bindQueue chan q (T.pack $ currentExchange args) q
else return ()
- printparam' "input file" $ inputFile args
+ printparam "input file" $ inputFile args
message <-
if inputFile args == "-"
then BL.getContents
else BL.readFile (inputFile args)
- printparam' "output file" $ outputFile args
+ printparam "output file" $ outputFile args
h <- if outputFile args == "-" then return stdout else openBinaryFile (outputFile args) WriteMode
ctag <- consumeMsgs chan q NoAck (rpcClientCallback h tid args)
- printparam' "consumer tag" $ T.unpack ctag
+ printparam "consumer tag" ctag
now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
hr "publishing request"
_ <- publishMsg
ec <- exceptionHandler x
hr "closing connection"
closeConnection conn
- printparam' "exiting" $ show ec
+ printparam "exiting" ec
exitWith ec)
exceptionHandler :: RpcException -> IO (ExitCode)