-- | opens a connection and a channel
connect :: Args -> IO (Connection, Channel)
connect args = do
- printparam' "server" $ server args
- printparam' "port" $ show $ port args
- printparam' "vhost" $ vHost args
+ printparam "server" $ server args
+ printparam "port" $ show $ port args
+ printparam "vhost" $ vHost args
printparam "connection_name" $ connectionName args
- printparam' "connect timeout" $ (show (connect_timeout args)) ++ "s"
+ printparam "connect timeout" $ (show (connect_timeout args)) ++ "s"
globalCertificateStore <- getSystemCertificateStore
let myTLS =
N.TLSSettings
-- addChannelExceptionHandler chan
-- (\exception -> closeConnection conn >>
--- printparam' "exiting" (show exception) >>
+-- printparam "exiting" (show exception) >>
-- killThread tid)
--
-- -- noop sharedValidationCache, handy when debugging
myCert (Just cert') (Just key') _ = do
result <- credentialLoadX509 cert' key'
case result of
- Left x -> printparam' "ERROR" x >> return Nothing
+ Left x -> printparam "ERROR" x >> return Nothing
Right x -> return $ Just x
myCert _ _ _ = return Nothing
+{-# LANGUAGE FlexibleInstances #-}
module Network.AMQP.Utils.Helpers where
import Control.Concurrent
i
hFlush stderr
--- | optional parameters
-printparam :: String -> Maybe String -> IO ()
-printparam labl ms = printwithlabel labl $ fmap (hPutStrLn stderr) ms
+class Flexprint a where
+ flexprint :: a -> Maybe (IO ())
--- | required parameters
-printparam' :: String -> String -> IO ()
-printparam' d s = printparam d (Just s)
+instance Flexprint (Maybe String) where
+ flexprint = fmap (hPutStrLn stderr)
+
+instance Flexprint String where
+ flexprint x = flexprint (Just x)
+
+-- | optional or required parameters
+printparam :: Flexprint a => String -> a -> IO ()
+printparam labl x = printwithlabel labl $ flexprint x
-- | head chars of body
printbody :: String -> Maybe BL.ByteString -> IO ()
createProcess (proc exe args) {std_out = out, std_err = Inherit}
sout <- mapM BL.hGetContents h
exitcode <- maybe 0 id (fmap BL.length sout) `seq` waitForProcess processhandle
- printparam' (numstring ++ " call returned") $ show exitcode
+ printparam (numstring ++ " call returned") $ show exitcode
if isJust action && isJust sout
then ((fromJust action $ exitcode) (fromJust sout)) >> acke envi a
else case exitcode of
hr "starting"
tid <- myThreadId
args <- getArgs >>= parseargs 'a'
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- printparam' "routing key" $ rKey args
- printparam' "exchange" $ currentExchange args
+ printparam "client version" $ "amqp-utils " ++ (showVersion version)
+ printparam "routing key" $ rKey args
+ printparam "exchange" $ currentExchange args
isDir <-
if inputFile args == "-"
then return False
else F.getFileStatus (inputFile args) >>= return . F.isDirectory
if isDir
- then printparam' "hotfolder" $ inputFile args
- else printparam' "input file" $
+ then printparam "hotfolder" $ inputFile args
+ else printparam "input file" $
(inputFile args) ++
if (lineMode args)
then " (line-by-line)"
else ""
(conn, chan) <- connect args
addChannelExceptionHandler chan (X.throwTo tid)
- printparam' "confirm mode" $ show $ confirm args
+ printparam "confirm mode" $ show $ confirm args
if (confirm args)
then do
confirmSelect chan False
exceptionHandler
-- all done. wait and close.
if (confirm args)
- then waitForConfirms chan >>= (printparam' "confirmed") . show
+ then waitForConfirms chan >>= (printparam "confirmed") . show
else return ()
X.catch (closeConnection conn) exceptionHandler
-- | A handler for clean exit
exceptionHandler :: AMQPException -> IO ()
-exceptionHandler (ChannelClosedException Normal txt) = printparam' "exit" txt >> exitWith ExitSuccess
-exceptionHandler (ConnectionClosedException Normal txt) = printparam' "exit" txt >> exitWith ExitSuccess
-exceptionHandler x = printparam' "exception" (show x) >> exitWith (ExitFailure 1)
+exceptionHandler (ChannelClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler (ConnectionClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler x = printparam "exception" (show x) >> exitWith (ExitFailure 1)
-- | The handler for publisher confirms
confirmCallback :: (Word64, Bool, AckType) -> IO ()
confirmCallback (deliveryTag, isAll, ackType) =
- printparam'
+ printparam
"confirmed"
((show deliveryTag) ++
(if isAll
X.catch
(BL.readFile x >>= f (Just x))
(\exception ->
- printparam' "exception in handleFile" $
+ printparam "exception in handleFile" $
show (exception :: X.SomeException))
-- | Publish one message with our settings
tid <- myThreadId
args <- getArgs >>= parseargs 'r'
X.onException
- (printparam' "worker" $ fromJust $ fileProcess args)
+ (printparam "worker" $ fromJust $ fileProcess args)
(error "-X option required")
let addiArgs = reverse $ additionalArgs args
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+ printparam "client version" $ "amqp-utils " ++ (showVersion version)
(conn, chan) <- connect args
addChannelExceptionHandler chan (X.throwTo tid)
queue <-
(\(x, _, _) -> return x))
(return)
(fmap T.pack (qName args))
- printparam' "queue name" $ T.unpack queue
+ printparam "queue name" $ T.unpack queue
if (currentExchange args /= "")
then do
- printparam' "exchange" $ currentExchange args
+ printparam "exchange" $ currentExchange args
bindQueue chan queue (T.pack $ currentExchange args) queue
else return ()
ctag <-
then Ack
else NoAck)
(rpcServerCallback tid args addiArgs chan)
- printparam' "consumer tag" $ T.unpack ctag
- printparam' "send acks" $ show (ack args)
+ printparam "consumer tag" $ T.unpack ctag
+ printparam "send acks" $ show (ack args)
printparam "requeue if rejected" $
if (ack args)
then Just (show (requeuenack args))
hr "entering main loop"
X.catch
(forever $ threadDelay 5000000)
- (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
+ (\exception -> printparam "exception" $ show (exception :: X.SomeException))
closeConnection conn
hr "connection closed"
X.catch
(printmsg Nothing m (anRiss a) now)
(\x -> X.throwTo tid (x :: X.SomeException) >> return [])
- either (\e -> printparam' "ERROR" (show (e :: X.SomeException))) return =<<
+ either (\e -> printparam "ERROR" (show (e :: X.SomeException))) return =<<
X.try
(optionalFileStuff m callbackoptions addi numstring a tid (Just reply))
hr $ "END " ++ numstring
tid <- myThreadId
args <- getArgs >>= parseargs 'k'
let addiArgs = reverse $ additionalArgs args
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+ printparam "client version" $ "amqp-utils " ++ (showVersion version)
(conn, chan) <- connect args
addChannelExceptionHandler chan (X.throwTo tid)
-- set prefetch
- printparam' "prefetch" $ show $ preFetch args
+ printparam "prefetch" $ show $ preFetch args
qos chan 0 (fromIntegral $ preFetch args) False
-- attach to given queue? or build exclusive queue and bind it?
queue <-
(tempQueue chan (tmpQName args) (bindings args) (currentExchange args))
(return)
(fmap T.pack (qName args))
- printparam' "queue name" $ T.unpack queue
+ printparam "queue name" $ T.unpack queue
printparam "shown body chars" $ fmap show $ anRiss args
printparam "temp dir" $ tempDir args
printparam "callback" $ fileProcess args
then Ack
else NoAck)
(myCallback args addiArgs tid)
- printparam' "consumer tag" $ T.unpack ctag
- printparam' "send acks" $ show (ack args)
+ printparam "consumer tag" $ T.unpack ctag
+ printparam "send acks" $ show (ack args)
printparam "requeue if rejected" $
if (ack args)
then Just (show (requeuenack args))
hr "entering main loop"
X.catch
(forever $ threadDelay 5000000)
- (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
+ (\exception -> printparam "exception" $ show (exception :: X.SomeException))
closeConnection conn
hr "connection closed"
mapM_
(\(xchange, bkey) ->
bindQueue chan q (T.pack xchange) (T.pack bkey) >>
- printparam' "binding" (xchange ++ ":" ++ bkey))
+ printparam "binding" (xchange ++ ":" ++ bkey))
(if null bindlist
then [(x, "#")]
else bindlist)
(printmsg Nothing m (anRiss a) now)
(\x -> X.throwTo tid (x :: X.SomeException) >> return [])
either
- (\e -> printparam' "ERROR" (show (e :: X.SomeException)) >> reje envi a)
+ (\e -> printparam "ERROR" (show (e :: X.SomeException)) >> reje envi a)
return =<<
X.try (optionalFileStuff m callbackoptions addi numstring a tid Nothing)
hr $ "END " ++ numstring
tid <- myThreadId
args <- getArgs >>= parseargs 'p'
X.onException
- (printparam' "rpc_timeout" $ show (rpc_timeout args) ++ "s")
+ (printparam "rpc_timeout" $ show (rpc_timeout args) ++ "s")
(error $ "invalid rpc_timeout")
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- printparam' "destination queue" $ tmpQName args
+ printparam "client version" $ "amqp-utils " ++ (showVersion version)
+ printparam "destination queue" $ tmpQName args
(conn, chan) <- connect args
addChannelExceptionHandler chan (X.throwTo tid)
(q, _, _) <- declareQueue chan newQueue {queueExclusive = True}
if (currentExchange args /= "")
then do
- printparam' "exchange" $ currentExchange args
+ printparam "exchange" $ currentExchange args
bindQueue chan q (T.pack $ currentExchange args) q
else return ()
- printparam' "input file" $ inputFile args
+ printparam "input file" $ inputFile args
message <-
if inputFile args == "-"
then BL.getContents
else BL.readFile (inputFile args)
- printparam' "output file" $ outputFile args
+ printparam "output file" $ outputFile args
h <- if outputFile args == "-" then return stdout else openBinaryFile (outputFile args) WriteMode
ctag <- consumeMsgs chan q NoAck (rpcClientCallback h tid args)
- printparam' "consumer tag" $ T.unpack ctag
+ printparam "consumer tag" $ T.unpack ctag
now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
hr "publishing request"
_ <- publishMsg
ec <- exceptionHandler x
hr "closing connection"
closeConnection conn
- printparam' "exiting" $ show ec
+ printparam "exiting" $ show ec
exitWith ec)
exceptionHandler :: RpcException -> IO (ExitCode)