module Network.AMQP.Utils.Connection where
-import qualified Data.Text as T
-import qualified Data.ByteString as B
-import Network.AMQP
-import Network.AMQP.Utils.Options
-import Network.AMQP.Utils.Helpers
-import Network.TLS
-import Network.TLS.Extra
-import qualified Network.Connection as N
-import System.X509
-import Data.Default.Class
+import qualified Data.ByteString as B
+import Data.Default.Class
+import qualified Data.Text as T
+import Network.AMQP
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Options
+import qualified Network.Connection as N
+import Network.TLS
+import Network.TLS.Extra
+import System.X509
-- | opens a connection and a channel
-connect :: Args -> IO (Connection,Channel)
+connect :: Args -> IO (Connection, Channel)
connect args = do
- printparam' "server" $ server args
- printparam' "port" $ show $ port args
- printparam' "vhost" $ vHost args
- printparam "connection_name" $ connectionName args
- globalCertificateStore <- getSystemCertificateStore
- let myTLS = N.TLSSettings (defaultParamsClient "" B.empty)
- { clientShared = def
- { sharedValidationCache = def
- , sharedCAStore = globalCertificateStore
- }
- , clientSupported = def
- { supportedCiphers = ciphersuite_default
- }
- , clientHooks = def
- { onCertificateRequest = myCert (cert args) (key args)
- }
+ printparam' "server" $ server args
+ printparam' "port" $ show $ port args
+ printparam' "vhost" $ vHost args
+ printparam "connection_name" $ connectionName args
+ globalCertificateStore <- getSystemCertificateStore
+ let myTLS =
+ N.TLSSettings
+ (defaultParamsClient "" B.empty)
+ { clientShared =
+ def
+ { sharedValidationCache = def
+ , sharedCAStore = globalCertificateStore
}
- conn <- openConnection'' defaultConnectionOpts
- { coAuth = [ SASLMechanism "EXTERNAL" B.empty Nothing
- , plain (T.pack (user args)) (T.pack (pass args))
- ]
- , coVHost = T.pack $ vHost args
- , coTLSSettings = if (tls args ) then Just ( TLSCustom myTLS ) else Nothing
- , coServers = [ (server args, fromIntegral $ port args) ]
- , coHeartbeatDelay = fmap fromIntegral $ heartBeat args
- , coName = fmap T.pack $ connectionName args
- }
- chan <- openChannel conn
+ , clientSupported = def {supportedCiphers = ciphersuite_default}
+ , clientHooks =
+ def {onCertificateRequest = myCert (cert args) (key args)}
+ }
+ conn <-
+ openConnection''
+ defaultConnectionOpts
+ { coAuth =
+ [ SASLMechanism "EXTERNAL" B.empty Nothing
+ , plain (T.pack (user args)) (T.pack (pass args))
+ ]
+ , coVHost = T.pack $ vHost args
+ , coTLSSettings =
+ if (tls args)
+ then Just (TLSCustom myTLS)
+ else Nothing
+ , coServers = [(server args, fromIntegral $ port args)]
+ , coHeartbeatDelay = fmap fromIntegral $ heartBeat args
+ , coName = fmap T.pack $ connectionName args
+ }
+ chan <- openChannel conn
+ return (conn, chan)
+
-- addChannelExceptionHandler chan
-- (\exception -> closeConnection conn >>
-- printparam' "exiting" (show exception) >>
-- killThread tid)
- return (conn,chan)
--
-- -- noop sharedValidationCache, handy when debugging
-- noValidation :: ValidationCache
-- noValidation = ValidationCache
-- (\_ _ _ -> return ValidationCachePass)
-- (\_ _ _ -> return ())
-
-- | provides the TLS client certificate
myCert :: Maybe FilePath -> Maybe FilePath -> t -> IO (Maybe Credential)
myCert (Just cert') (Just key') _ = do
- result <- credentialLoadX509 cert' key'
- case result of
- Left x -> printparam' "ERROR" x >> return Nothing
- Right x -> return $ Just x
+ result <- credentialLoadX509 cert' key'
+ case result of
+ Left x -> printparam' "ERROR" x >> return Nothing
+ Right x -> return $ Just x
myCert _ _ _ = return Nothing
-
module Network.AMQP.Utils.Helpers where
import qualified Data.ByteString.Lazy.Char8 as BL
-import System.IO
+import System.IO
-- | log cmdline options
listToMaybeUnwords :: [String] -> Maybe String
-- | Strings or ByteStrings with label, oder nothing at all
printwithlabel :: String -> Maybe (IO ()) -> IO ()
-printwithlabel _ Nothing =
- return ()
+printwithlabel _ Nothing = return ()
printwithlabel labl (Just i) = do
- mapM_ putStr [ " --- ", labl, ": " ]
- i
- hFlush stdout
+ mapM_ putStr [" --- ", labl, ": "]
+ i
+ hFlush stdout
-- | optional parameters
printparam :: String -> Maybe String -> IO ()
-printparam labl ms = printwithlabel labl $
- fmap putStrLn ms
+printparam labl ms = printwithlabel labl $ fmap putStrLn ms
-- | required parameters
printparam' :: String -> String -> IO ()
-- | head chars of body
printbody :: (String, Maybe BL.ByteString) -> IO ()
-printbody (labl, ms) = printwithlabel labl $
- fmap (\s -> putStrLn "" >> BL.putStrLn s) ms
+printbody (labl, ms) =
+ printwithlabel labl $ fmap (\s -> putStrLn "" >> BL.putStrLn s) ms
-- | log marker
hr :: String -> IO ()
module Network.AMQP.Utils.Options where
-import Paths_amqp_utils ( version )
-import Data.Version ( showVersion )
-import Data.Maybe
-import Data.Default.Class
-import System.Console.GetOpt
+import Data.Default.Class
+import Data.Maybe
+import Data.Version (showVersion)
+import Paths_amqp_utils (version)
+import System.Console.GetOpt
-- | A data type for our options
-data Args = Args { server :: String
- , port :: Int
- , tls :: Bool
- , vHost :: String
- , currentExchange :: String
- , bindings :: [(String, String)]
- , rKey :: String
- , anRiss :: Maybe Int
- , fileProcess :: Maybe String
- , qName :: Maybe String
- , cert :: Maybe String
- , key :: Maybe String
- , user :: String
- , pass :: String
- , preFetch :: Int
- , heartBeat :: Maybe Int
- , tempDir :: Maybe String
- , additionalArgs :: [String]
- , connectionName :: Maybe String
- , tmpQName :: String
- , inputFile :: String
- , lineMode :: Bool
- , confirm :: Bool
- }
+data Args = Args
+ { server :: String
+ , port :: Int
+ , tls :: Bool
+ , vHost :: String
+ , currentExchange :: String
+ , bindings :: [(String, String)]
+ , rKey :: String
+ , anRiss :: Maybe Int
+ , fileProcess :: Maybe String
+ , qName :: Maybe String
+ , cert :: Maybe String
+ , key :: Maybe String
+ , user :: String
+ , pass :: String
+ , preFetch :: Int
+ , heartBeat :: Maybe Int
+ , tempDir :: Maybe String
+ , additionalArgs :: [String]
+ , connectionName :: Maybe String
+ , tmpQName :: String
+ , inputFile :: String
+ , lineMode :: Bool
+ , confirm :: Bool
+ }
instance Default Args where
- def = Args "localhost"
- 5672
- False
- "/"
- "default"
- []
- ""
- Nothing
- Nothing
- Nothing
- Nothing
- Nothing
- "guest"
- "guest"
- 1
- Nothing
- Nothing
- []
- Nothing
- ""
- "/dev/stdin"
- False
- False
+ def =
+ Args
+ "localhost"
+ 5672
+ False
+ "/"
+ "default"
+ []
+ ""
+ Nothing
+ Nothing
+ Nothing
+ Nothing
+ Nothing
+ "guest"
+ "guest"
+ 1
+ Nothing
+ Nothing
+ []
+ Nothing
+ ""
+ "/dev/stdin"
+ False
+ False
-- | Common options
cOptions :: [OptDescr (Args -> Args)]
-cOptions = [ Option [ 'o' ]
- [ "server" ]
- (ReqArg (\s o -> o { server = s }) "SERVER")
- ("AMQP Server (default: " ++ server def ++ ")")
- , Option [ 'y' ]
- [ "vhost" ]
- (ReqArg (\s o -> o { vHost = s }) "VHOST")
- ("AMQP Virtual Host (default: " ++ vHost def ++ ")")
- , Option [ 'x' ]
- [ "exchange" ]
- (ReqArg (\s o -> o { currentExchange = s }) "EXCHANGE")
- ("AMQP Exchange (default: default)")
- , Option [ 'Q' ]
- [ "qname" ]
- (ReqArg (\s o -> o { tmpQName = s }) "TEMPQNAME")
- "Name for temporary exclusive Queue"
- , Option [ 'p' ]
- [ "port" ]
- (ReqArg (\s o -> o { port = read s }) "PORT")
- ("Server Port Number (default: " ++
- show (port def) ++ ")")
- , Option [ 'T' ]
- [ "tls" ]
- (NoArg (\o -> o { tls = not (tls o) }))
- ("Toggle TLS (default: " ++ show (tls def) ++ ")")
- , Option [ 'q' ]
- [ "queue" ]
- (ReqArg (\s o -> o { qName = Just s }) "QUEUENAME")
- "Ignore Exchange and bind to existing Queue"
- , Option [ 'c' ]
- [ "cert" ]
- (ReqArg (\s o -> o { cert = Just s }) "CERTFILE")
- ("TLS Client Certificate File")
- , Option [ 'k' ]
- [ "key" ]
- (ReqArg (\s o -> o { key = Just s }) "KEYFILE")
- ("TLS Client Private Key File")
- , Option [ 'U' ]
- [ "user" ]
- (ReqArg (\s o -> o { user = s }) "USERNAME")
- ("Username for Auth")
- , Option [ 'P' ]
- [ "pass" ]
- (ReqArg (\s o -> o { pass = s }) "PASSWORD")
- ("Password for Auth")
- , Option [ 's' ]
- [ "heartbeats" ]
- (ReqArg (\s o -> o { heartBeat = (Just (read s)) }) "INT")
- "heartbeat interval (0=disable, default: set by server)"
- , Option [ 'n' ]
- [ "name" ]
- (ReqArg (\s o -> o { connectionName = Just s }) "NAME")
- "connection name, will be shown in RabbitMQ web interface"
- ]
+cOptions =
+ [ Option
+ ['o']
+ ["server"]
+ (ReqArg (\s o -> o {server = s}) "SERVER")
+ ("AMQP Server (default: " ++ server def ++ ")")
+ , Option
+ ['y']
+ ["vhost"]
+ (ReqArg (\s o -> o {vHost = s}) "VHOST")
+ ("AMQP Virtual Host (default: " ++ vHost def ++ ")")
+ , Option
+ ['x']
+ ["exchange"]
+ (ReqArg (\s o -> o {currentExchange = s}) "EXCHANGE")
+ ("AMQP Exchange (default: default)")
+ , Option
+ ['Q']
+ ["qname"]
+ (ReqArg (\s o -> o {tmpQName = s}) "TEMPQNAME")
+ "Name for temporary exclusive Queue"
+ , Option
+ ['p']
+ ["port"]
+ (ReqArg (\s o -> o {port = read s}) "PORT")
+ ("Server Port Number (default: " ++ show (port def) ++ ")")
+ , Option
+ ['T']
+ ["tls"]
+ (NoArg (\o -> o {tls = not (tls o)}))
+ ("Toggle TLS (default: " ++ show (tls def) ++ ")")
+ , Option
+ ['q']
+ ["queue"]
+ (ReqArg (\s o -> o {qName = Just s}) "QUEUENAME")
+ "Ignore Exchange and bind to existing Queue"
+ , Option
+ ['c']
+ ["cert"]
+ (ReqArg (\s o -> o {cert = Just s}) "CERTFILE")
+ ("TLS Client Certificate File")
+ , Option
+ ['k']
+ ["key"]
+ (ReqArg (\s o -> o {key = Just s}) "KEYFILE")
+ ("TLS Client Private Key File")
+ , Option
+ ['U']
+ ["user"]
+ (ReqArg (\s o -> o {user = s}) "USERNAME")
+ ("Username for Auth")
+ , Option
+ ['P']
+ ["pass"]
+ (ReqArg (\s o -> o {pass = s}) "PASSWORD")
+ ("Password for Auth")
+ , Option
+ ['s']
+ ["heartbeats"]
+ (ReqArg (\s o -> o {heartBeat = (Just (read s))}) "INT")
+ "heartbeat interval (0=disable, default: set by server)"
+ , Option
+ ['n']
+ ["name"]
+ (ReqArg (\s o -> o {connectionName = Just s}) "NAME")
+ "connection name, will be shown in RabbitMQ web interface"
+ ]
-- | Options for konsum
kOptions :: [OptDescr (Args -> Args)]
-kOptions = [ Option [ 'r' ]
- [ "bindingkey" ]
- (ReqArg (\s o -> o { bindings = (currentExchange o, s) :
- (bindings o)
- })
- "BINDINGKEY")
- ("AMQP binding key (default: #)")
- , Option [ 'X' ]
- [ "execute" ]
- (OptArg (\s o -> o { fileProcess = Just (fromMaybe callback
- s)
- , tempDir = Just (fromMaybe "/tmp"
- (tempDir o))
- })
- "EXE")
- ("Callback Script File (implies -t) (-X without arg: " ++
- callback ++ ")")
- , Option [ 'a' ]
- [ "args" ]
- (ReqArg (\s o -> o { additionalArgs = s :
- (additionalArgs o)
- })
- "ARG")
- "additional argument for -X callback"
- , Option [ 'l' ]
- [ "charlimit" ]
- (ReqArg (\s o -> o { anRiss = Just (read s :: Int) }) "INT")
- "limit number of shown body chars (default: unlimited)"
- , Option [ 't' ]
- [ "tempdir", "target" ]
- (OptArg (\s o -> o { tempDir = Just (fromMaybe "/tmp" s) })
- "DIR")
- "tempdir (default: no file creation, -t without arg: /tmp)"
- , Option [ 'f' ]
- [ "prefetch" ]
- (ReqArg (\s o -> o { preFetch = read s }) "INT")
- ("Prefetch count. (0=unlimited, 1=off, default: " ++
- show (preFetch def) ++ ")")
- ]
+kOptions =
+ [ Option
+ ['r']
+ ["bindingkey"]
+ (ReqArg
+ (\s o -> o {bindings = (currentExchange o, s) : (bindings o)})
+ "BINDINGKEY")
+ ("AMQP binding key (default: #)")
+ , Option
+ ['X']
+ ["execute"]
+ (OptArg
+ (\s o ->
+ o
+ { fileProcess = Just (fromMaybe callback s)
+ , tempDir = Just (fromMaybe "/tmp" (tempDir o))
+ })
+ "EXE")
+ ("Callback Script File (implies -t) (-X without arg: " ++ callback ++ ")")
+ , Option
+ ['a']
+ ["args"]
+ (ReqArg (\s o -> o {additionalArgs = s : (additionalArgs o)}) "ARG")
+ "additional argument for -X callback"
+ , Option
+ ['l']
+ ["charlimit"]
+ (ReqArg (\s o -> o {anRiss = Just (read s :: Int)}) "INT")
+ "limit number of shown body chars (default: unlimited)"
+ , Option
+ ['t']
+ ["tempdir", "target"]
+ (OptArg (\s o -> o {tempDir = Just (fromMaybe "/tmp" s)}) "DIR")
+ "tempdir (default: no file creation, -t without arg: /tmp)"
+ , Option
+ ['f']
+ ["prefetch"]
+ (ReqArg (\s o -> o {preFetch = read s}) "INT")
+ ("Prefetch count. (0=unlimited, 1=off, default: " ++
+ show (preFetch def) ++ ")")
+ ]
-- | Options for agitprop
aOptions :: [OptDescr (Args -> Args)]
-aOptions = [ Option [ 'r' ]
- [ "routingkey" ]
- (ReqArg (\s o -> o { rKey = s }) "ROUTINGKEY")
- ("AMQP routing key (default: " ++ (rKey def) ++ ")")
- , Option [ 'f' ]
- [ "inputfile" ]
- (ReqArg (\s o -> o { inputFile = s }) "INPUTFILE")
- ("Message input file (default: " ++ (inputFile def) ++ ")")
- , Option [ 'l' ]
- [ "linemode" ]
- (NoArg (\o -> o { lineMode = not (lineMode o) }))
- ("Toggle line-by-line mode (default: " ++ show (lineMode def) ++ ")")
- , Option [ 'C' ]
- [ "confirm" ]
- (NoArg (\o -> o { confirm = not (confirm o) }))
- ("Toggle confirms (default: " ++ show (confirm def) ++ ")")
- ]
+aOptions =
+ [ Option
+ ['r']
+ ["routingkey"]
+ (ReqArg (\s o -> o {rKey = s}) "ROUTINGKEY")
+ ("AMQP routing key (default: " ++ (rKey def) ++ ")")
+ , Option
+ ['f']
+ ["inputfile"]
+ (ReqArg (\s o -> o {inputFile = s}) "INPUTFILE")
+ ("Message input file (default: " ++ (inputFile def) ++ ")")
+ , Option
+ ['l']
+ ["linemode"]
+ (NoArg (\o -> o {lineMode = not (lineMode o)}))
+ ("Toggle line-by-line mode (default: " ++ show (lineMode def) ++ ")")
+ , Option
+ ['C']
+ ["confirm"]
+ (NoArg (\o -> o {confirm = not (confirm o)}))
+ ("Toggle confirms (default: " ++ show (confirm def) ++ ")")
+ ]
-- |
options :: String -> [OptDescr (Args -> Args)]
-- applies options onto argstring
parseargs :: String -> [String] -> IO Args
parseargs exename argstring =
- case getOpt Permute opt argstring of
- (o, [], []) -> return $ foldl (flip id) def o
- (_, _, errs) -> ioError $
- userError $ concat errs ++ usageInfo (usage exename) opt
+ case getOpt Permute opt argstring of
+ (o, [], []) -> return $ foldl (flip id) def o
+ (_, _, errs) ->
+ ioError $ userError $ concat errs ++ usageInfo (usage exename) opt
where
opt = options exename
callback = "/usr/lib/haskell-amqp-utils/callback"
usage :: String -> String
-usage exename = "\n\
+usage exename =
+ "\n\
\amqp-utils " ++
- (showVersion version) ++
- "\n\n\
+ (showVersion version) ++
+ "\n\n\
\Usage:\n" ++
- exename ++
- " [options]\n\n\
+ exename ++
+ " [options]\n\n\
\Options:"
{-# LANGUAGE OverloadedStrings #-}
-import Paths_amqp_utils ( version )
-import Data.Version ( showVersion )
-import System.Environment
-import qualified Data.Text as T
-import Network.AMQP
-import Network.AMQP.Utils.Options
-import Network.AMQP.Utils.Helpers
-import Network.AMQP.Utils.Connection
-import qualified Data.ByteString.Lazy.Char8 as BL
-import Data.Word ( Word64 )
-import qualified System.Posix.Files as F
-import System.INotify
-import Control.Monad ( forever )
-import Control.Concurrent ( threadDelay )
+import Control.Concurrent (threadDelay)
+import Control.Monad (forever)
+import qualified Data.ByteString.Lazy.Char8 as BL
+import qualified Data.Text as T
+import Data.Version (showVersion)
+import Data.Word (Word64)
+import Network.AMQP
+import Network.AMQP.Utils.Connection
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Options
+import Paths_amqp_utils (version)
+import System.Environment
+import System.INotify
+import qualified System.Posix.Files as F
main :: IO ()
main = do
- hr "starting"
+ hr "starting"
-- tid <- myThreadId
- args <- getArgs >>= parseargs "agitprop"
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- printparam' "routing key" $ rKey args
- isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
- if isDir
- then printparam' "hotfolder" $ inputFile args
- else printparam' "input file" $ (inputFile args) ++ if (lineMode args) then " (line-by-line)" else ""
- (conn, chan) <- connect args
- printparam' "confirm mode" $ show $ confirm args
- if (confirm args)
- then do
- confirmSelect chan False
- addConfirmationListener chan confirmCallback
- else return ()
- let publishOneMsg f = do
- r <- publishMsg chan
- (T.pack $ currentExchange args)
- (T.pack $ rKey args)
- newMsg { msgBody = f
- , msgDeliveryMode = Just Persistent
- }
- printparam "sent" $ fmap show r
- if isDir
- then do
- inotify <- initINotify
- wd <- addWatch inotify [ Close ] (inputFile args) (handleEvent publishOneMsg)
- hr (inputFile args)
- _ <- forever $ threadDelay 1000000
- removeWatch wd
- else do
- hr (inputFile args)
- messageFile <- BL.readFile (inputFile args)
- if (lineMode args)
- then mapM_ publishOneMsg (BL.lines messageFile)
- else publishOneMsg messageFile
-
+ args <- getArgs >>= parseargs "agitprop"
+ printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+ printparam' "routing key" $ rKey args
+ isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
+ if isDir
+ then printparam' "hotfolder" $ inputFile args
+ else printparam' "input file" $
+ (inputFile args) ++
+ if (lineMode args)
+ then " (line-by-line)"
+ else ""
+ (conn, chan) <- connect args
+ printparam' "confirm mode" $ show $ confirm args
+ if (confirm args)
+ then do
+ confirmSelect chan False
+ addConfirmationListener chan confirmCallback
+ else return ()
+ let publishOneMsg f = do
+ r <-
+ publishMsg
+ chan
+ (T.pack $ currentExchange args)
+ (T.pack $ rKey args)
+ newMsg {msgBody = f, msgDeliveryMode = Just Persistent}
+ printparam "sent" $ fmap show r
+ if isDir
+ then do
+ inotify <- initINotify
+ wd <-
+ addWatch inotify [Close] (inputFile args) (handleEvent publishOneMsg)
+ hr (inputFile args)
+ _ <- forever $ threadDelay 1000000
+ removeWatch wd
+ else do
+ hr (inputFile args)
+ messageFile <- BL.readFile (inputFile args)
+ if (lineMode args)
+ then mapM_ publishOneMsg (BL.lines messageFile)
+ else publishOneMsg messageFile
-- all done. wait and close.
- if (confirm args)
- then waitForConfirms chan >>= return . show >> return ()
- else return ()
- closeConnection conn
+ if (confirm args)
+ then waitForConfirms chan >>= return . show >> return ()
+ else return ()
+ closeConnection conn
-- | The handler for publisher confirms
confirmCallback :: (Word64, Bool, AckType) -> IO ()
confirmCallback (deliveryTag, isAll, ackType) =
- printparam' "confirmed"
- ((show deliveryTag) ++
- (if isAll then " all " else " this ") ++ (show ackType))
+ printparam'
+ "confirmed"
+ ((show deliveryTag) ++
+ (if isAll
+ then " all "
+ else " this ") ++
+ (show ackType))
-- | hotfolder event handler
handleEvent :: (BL.ByteString -> IO ()) -> Event -> IO ()
{-# LANGUAGE OverloadedStrings #-}
+
-- generic amqp consumer
--
-- compile:
--
-- Stop with ^C
-import Paths_amqp_utils ( version )
-import Data.Version ( showVersion )
-import System.Environment
-import System.IO
-import System.Process
-import System.Exit
-import Control.Monad
-import Control.Concurrent
-import qualified Control.Exception as X
-import Data.List
-import Data.Maybe
-import qualified Data.Text as T
-import qualified Data.Map as M
-import Network.AMQP
-import Network.AMQP.Types
-import Network.AMQP.Utils.Options
-import Network.AMQP.Utils.Helpers
-import Network.AMQP.Utils.Connection
+import Control.Concurrent
+import qualified Control.Exception as X
+import Control.Monad
import qualified Data.ByteString.Lazy.Char8 as BL
-import Data.Time
-import Data.Time.Clock.POSIX
+import Data.List
+import qualified Data.Map as M
+import Data.Maybe
+import qualified Data.Text as T
+import Data.Time
+import Data.Time.Clock.POSIX
+import Data.Version (showVersion)
+import Network.AMQP
+import Network.AMQP.Types
+import Network.AMQP.Utils.Connection
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Options
+import Paths_amqp_utils (version)
+import System.Environment
+import System.Exit
+import System.IO
+import System.Process
main :: IO ()
main = do
- hr "starting"
- tid <- myThreadId
- args <- getArgs >>= parseargs "konsum"
- let addiArgs = reverse $ additionalArgs args
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- (conn,chan) <- connect args
-
- -- set prefetch
- printparam' "prefetch" $
- show $ preFetch args
- qos chan 0 (fromIntegral $ preFetch args) False
-
- -- attach to given queue? or build exclusive queue and bind it?
- queue <- maybe (tempQueue chan (tmpQName args) (bindings args) (currentExchange args))
- (return)
- (fmap T.pack (qName args))
- printparam' "queue name" $ T.unpack queue
-
- printparam "shown body chars" $ fmap show $ anRiss args
- printparam "temp dir" $ tempDir args
- printparam "callback" $ fileProcess args
- printparam "callback args" $ listToMaybeUnwords addiArgs
-
- -- subscribe to the queue
- ctag <- consumeMsgs chan
- queue
- Ack
- (myCallback (anRiss args)
- (fileProcess args)
- (tempDir args)
- addiArgs
- tid)
- printparam' "consumer tag" $ T.unpack ctag
- hr "entering main loop"
-
- X.catch (forever $ threadDelay 5000000)
- (\exception -> printparam' "exception" $
- show (exception :: X.SomeException))
- closeConnection conn
+ hr "starting"
+ tid <- myThreadId
+ args <- getArgs >>= parseargs "konsum"
+ let addiArgs = reverse $ additionalArgs args
+ printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+ (conn, chan) <- connect args
+ -- set prefetch
+ printparam' "prefetch" $ show $ preFetch args
+ qos chan 0 (fromIntegral $ preFetch args) False
+ -- attach to given queue? or build exclusive queue and bind it?
+ queue <-
+ maybe
+ (tempQueue chan (tmpQName args) (bindings args) (currentExchange args))
+ (return)
+ (fmap T.pack (qName args))
+ printparam' "queue name" $ T.unpack queue
+ printparam "shown body chars" $ fmap show $ anRiss args
+ printparam "temp dir" $ tempDir args
+ printparam "callback" $ fileProcess args
+ printparam "callback args" $ listToMaybeUnwords addiArgs
+ -- subscribe to the queue
+ ctag <-
+ consumeMsgs
+ chan
+ queue
+ Ack
+ (myCallback (anRiss args) (fileProcess args) (tempDir args) addiArgs tid)
+ printparam' "consumer tag" $ T.unpack ctag
+ hr "entering main loop"
+ X.catch
+ (forever $ threadDelay 5000000)
+ (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
+ closeConnection conn
-- | exclusive temp queue
tempQueue :: Channel -> String -> [(String, String)] -> String -> IO T.Text
tempQueue chan tmpqname bindlist x = do
- (q, _, _) <- declareQueue chan
- newQueue { queueExclusive = True
- , queueName = T.pack tmpqname
- }
- mapM_ (\(xchange, bkey) -> bindQueue chan q (T.pack xchange) (T.pack bkey) >>
- printparam' "binding" (xchange ++ ":" ++ bkey))
- (if null bindlist then [ (x, "#") ] else bindlist)
- return q
+ (q, _, _) <-
+ declareQueue
+ chan
+ newQueue {queueExclusive = True, queueName = T.pack tmpqname}
+ mapM_
+ (\(xchange, bkey) ->
+ bindQueue chan q (T.pack xchange) (T.pack bkey) >>
+ printparam' "binding" (xchange ++ ":" ++ bkey))
+ (if null bindlist
+ then [(x, "#")]
+ else bindlist)
+ return q
-- | process received message
-myCallback :: Maybe Int
- -> Maybe String
- -> Maybe String
- -> [String]
- -> ThreadId
- -> (Message, Envelope)
- -> IO ()
+myCallback ::
+ Maybe Int
+ -> Maybe String
+ -> Maybe String
+ -> [String]
+ -> ThreadId
+ -> (Message, Envelope)
+ -> IO ()
myCallback anR filePr tempD addi tid m@(_, envi) = do
- let numstring = show $ envDeliveryTag envi
- hr $ "BEGIN " ++ numstring
- now <- getZonedTime
- callbackoptions <- printmsg m anR now
- either (\e -> printparam' "ERROR" (show (e :: X.SomeException)) >>
- rejectEnv envi True)
- return
- =<< X.try (optionalFileStuff m
- callbackoptions
- addi
- numstring
- tempD
- filePr
- tid)
- hr $ "END " ++ numstring
+ let numstring = show $ envDeliveryTag envi
+ hr $ "BEGIN " ++ numstring
+ now <- getZonedTime
+ callbackoptions <- printmsg m anR now
+ either
+ (\e ->
+ printparam' "ERROR" (show (e :: X.SomeException)) >> rejectEnv envi True)
+ return =<<
+ X.try (optionalFileStuff m callbackoptions addi numstring tempD filePr tid)
+ hr $ "END " ++ numstring
-- | if the message is to be saved
-- and maybe processed further
-optionalFileStuff :: (Message, Envelope)
- -> [String]
- -> [String]
- -> String
- -> Maybe String
- -> Maybe String
- -> ThreadId
- -> IO ()
+optionalFileStuff ::
+ (Message, Envelope)
+ -> [String]
+ -> [String]
+ -> String
+ -> Maybe String
+ -> Maybe String
+ -> ThreadId
+ -> IO ()
optionalFileStuff (msg, envi) callbackoptions addi numstring tempD filePr tid = do
- path <- saveFile tempD numstring (msgBody msg)
- printparam "saved to" path
- let callbackcmdline = liftM2 (constructCallbackCmdLine callbackoptions
- addi
- numstring)
- filePr
- path
- printparam "calling" $ fmap unwords callbackcmdline
- maybe (ackEnv envi)
- (\c -> forkFinally (doProc numstring envi c)
- (either (throwTo tid) return) >>
- return ())
- callbackcmdline
+ path <- saveFile tempD numstring (msgBody msg)
+ printparam "saved to" path
+ let callbackcmdline =
+ liftM2
+ (constructCallbackCmdLine callbackoptions addi numstring)
+ filePr
+ path
+ printparam "calling" $ fmap unwords callbackcmdline
+ maybe
+ (ackEnv envi)
+ (\c ->
+ forkFinally (doProc numstring envi c) (either (throwTo tid) return) >>
+ return ())
+ callbackcmdline
-- | save message into temp file
saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String)
saveFile Nothing _ _ = return Nothing
saveFile (Just tempD) numstring body = do
- (p, h) <- openBinaryTempFileWithDefaultPermissions tempD
- ("konsum-" ++
- numstring ++ "-.tmp")
- BL.hPut h body
- hClose h
- return $ Just p
+ (p, h) <-
+ openBinaryTempFileWithDefaultPermissions
+ tempD
+ ("konsum-" ++ numstring ++ "-.tmp")
+ BL.hPut h body
+ hClose h
+ return $ Just p
-- | construct cmdline for callback script
-constructCallbackCmdLine :: [String]
- -> [String]
- -> String
- -> String
- -> String
- -> [String]
+constructCallbackCmdLine ::
+ [String] -> [String] -> String -> String -> String -> [String]
constructCallbackCmdLine opts addi num exe path =
- exe : "-f" : path : "-n" : num : opts ++ addi
+ exe : "-f" : path : "-n" : num : opts ++ addi
-- | call callback script
doProc :: String -> Envelope -> [String] -> IO ()
-doProc numstring envi (exe : args) = do
- (_, _, _, processhandle) <- createProcess (proc exe args) { std_out = Inherit
- , std_err = Inherit
- }
- exitcode <- waitForProcess processhandle
- printparam' (numstring ++ " call returned") $ show exitcode
- case exitcode of
- ExitSuccess -> ackEnv envi
- ExitFailure _ -> rejectEnv envi True
+doProc numstring envi (exe:args) = do
+ (_, _, _, processhandle) <-
+ createProcess (proc exe args) {std_out = Inherit, std_err = Inherit}
+ exitcode <- waitForProcess processhandle
+ printparam' (numstring ++ " call returned") $ show exitcode
+ case exitcode of
+ ExitSuccess -> ackEnv envi
+ ExitFailure _ -> rejectEnv envi True
doProc _ _ _ = return ()
formatheaders :: ((T.Text, FieldValue) -> [a]) -> FieldTable -> [a]
-formatheaders f (FieldTable ll) =
- concat $ map f $ M.toList ll
+formatheaders f (FieldTable ll) = concat $ map f $ M.toList ll
-- | log formatting
fieldshow :: (T.Text, FieldValue) -> String
-- | callback cmdline formatting
fieldshow' :: (T.Text, FieldValue) -> [String]
-fieldshow' (k, v) = [ "-h", T.unpack k ++ "=" ++ valueshow v ]
+fieldshow' (k, v) = ["-h", T.unpack k ++ "=" ++ valueshow v]
-- | showing a FieldValue
valueshow :: FieldValue -> String
-valueshow (FVString value) =
- T.unpack value
-valueshow (FVInt32 value) =
- show value
+valueshow (FVString value) = T.unpack value
+valueshow (FVInt32 value) = show value
valueshow value = show value
-- | skip showing body head if binary type
isimage :: Maybe String -> Bool
isimage Nothing = False
isimage (Just ctype)
- | isPrefixOf "application/xml" ctype =
- False
- | isPrefixOf "application/json" ctype =
- False
- | otherwise = any (flip isPrefixOf ctype) [ "application", "image" ]
+ | isPrefixOf "application/xml" ctype = False
+ | isPrefixOf "application/json" ctype = False
+ | otherwise = any (flip isPrefixOf ctype) ["application", "image"]
-- | show the first bytes of message body
anriss' :: Maybe Int -> BL.ByteString -> BL.ByteString
-anriss' x = case x of
+anriss' x =
+ case x of
Nothing -> id
Just y -> BL.take (fromIntegral y)
-- | callback cmdline with optional parameters
printopt :: (String, Maybe String) -> [String]
printopt (_, Nothing) = []
-printopt (opt, Just s) =
- [ opt, s ]
+printopt (opt, Just s) = [opt, s]
-- | prints header and head on STDOUT and returns cmdline options to callback
printmsg :: (Message, Envelope) -> Maybe Int -> ZonedTime -> IO [String]
printmsg (msg, envi) anR now = do
- mapM_ (uncurry printparam)
- [ ("routing key", rkey)
- , ("message-id", messageid)
- , ("headers", headers)
- , ("content-type", contenttype)
- , ("content-encoding", contentencoding)
- , ("redelivered", redeliv)
- , ("timestamp", timestamp'')
- , ("time now", now')
- , ("size", size)
- , ("priority", prio)
- , ("type", mtype)
- , ("user id", muserid)
- , ("application id", mappid)
- , ("cluster id", mclusterid)
- , ("reply to", mreplyto)
- , ("correlation id", mcorrid)
- , ("expiration", mexp)
- , ("delivery mode", mdelivmode)
- ]
- printbody (label, anriss)
- return $
- concat (map printopt
- [ ("-r", rkey)
- , ("-m", contenttype)
- , ("-e", contentencoding)
- , ("-i", messageid)
- , ("-t", timestamp)
- , ("-p", prio)
- ] ++
- maybeToList headers')
+ mapM_
+ (uncurry printparam)
+ [ ("routing key", rkey)
+ , ("message-id", messageid)
+ , ("headers", headers)
+ , ("content-type", contenttype)
+ , ("content-encoding", contentencoding)
+ , ("redelivered", redeliv)
+ , ("timestamp", timestamp'')
+ , ("time now", now')
+ , ("size", size)
+ , ("priority", prio)
+ , ("type", mtype)
+ , ("user id", muserid)
+ , ("application id", mappid)
+ , ("cluster id", mclusterid)
+ , ("reply to", mreplyto)
+ , ("correlation id", mcorrid)
+ , ("expiration", mexp)
+ , ("delivery mode", mdelivmode)
+ ]
+ printbody (label, anriss)
+ return $
+ concat
+ (map
+ printopt
+ [ ("-r", rkey)
+ , ("-m", contenttype)
+ , ("-e", contentencoding)
+ , ("-i", messageid)
+ , ("-t", timestamp)
+ , ("-p", prio)
+ ] ++
+ maybeToList headers')
where
headers = fmap (formatheaders fieldshow) $ msgHeaders msg
headers' = fmap (formatheaders fieldshow') $ msgHeaders msg
body = msgBody msg
- anriss = if isimage contenttype
- then Nothing
- else Just (anriss' anR body) :: Maybe BL.ByteString
+ anriss =
+ if isimage contenttype
+ then Nothing
+ else Just (anriss' anR body) :: Maybe BL.ByteString
anriss'' = maybe "" (\a -> "first " ++ (show a) ++ " bytes of ") anR
label = anriss'' ++ "body"
contenttype = fmap T.unpack $ msgContentType msg
mexp = fmap show $ msgExpiration msg
mdelivmode = fmap show $ msgDeliveryMode msg
size = Just . show $ BL.length body
- redeliv = if envRedelivered envi then Just "YES" else Nothing
+ redeliv =
+ if envRedelivered envi
+ then Just "YES"
+ else Nothing
tz = zonedTimeZone now
nowutc = zonedTimeToUTCFLoor now
msgtime = msgTimestamp msg
msgtimeutc = fmap (posixSecondsToUTCTime . realToFrac) msgtime
timestamp = fmap show msgtime
timediff = fmap (difftime nowutc) msgtimeutc
- now' = case timediff of
+ now' =
+ case timediff of
Just "now" -> Nothing
_ -> showtime tz $ Just nowutc
timestamp' = showtime tz msgtimeutc
- timestamp'' = liftM3 (\a b c -> a ++ " (" ++ b ++ ") (" ++ c ++ ")")
- timestamp
- timestamp'
- timediff
+ timestamp'' =
+ liftM3
+ (\a b c -> a ++ " (" ++ b ++ ") (" ++ c ++ ")")
+ timestamp
+ timestamp'
+ timediff
-- | timestamp conversion
zonedTimeToUTCFLoor :: ZonedTime -> UTCTime
-zonedTimeToUTCFLoor x = posixSecondsToUTCTime $
- realToFrac ((floor .
- utcTimeToPOSIXSeconds .
- zonedTimeToUTC) x :: Timestamp)
+zonedTimeToUTCFLoor x =
+ posixSecondsToUTCTime $
+ realToFrac ((floor . utcTimeToPOSIXSeconds . zonedTimeToUTC) x :: Timestamp)
-- | show the timestamp
showtime :: TimeZone -> Maybe UTCTime -> Maybe String
-- | show difference between two timestamps
difftime :: UTCTime -> UTCTime -> String
difftime now msg
- | now == msg = "now"
- | now > msg = diff ++ " ago"
- | otherwise = diff ++ " in the future"
+ | now == msg = "now"
+ | now > msg = diff ++ " ago"
+ | otherwise = diff ++ " in the future"
where
diff = show (diffUTCTime now msg)