# Revision history for haskell-amqp-utils
+## 0.3.1.0 -- 2018-06-22
+
+* agitprop, a publisher
+
## 0.3.0.2 -- 2018-04-24
* use ciphersuite\_default
--- /dev/null
+{-# LANGUAGE OverloadedStrings #-}
+
+module Network.AMQP.Utils.Connection where
+
+import qualified Data.Text as T
+import qualified Data.ByteString as B
+import Network.AMQP
+import Network.AMQP.Utils.Options
+import Network.AMQP.Utils.Helpers
+import Network.TLS
+import Network.TLS.Extra
+import qualified Network.Connection as N
+import System.X509
+import Data.Default.Class
+
+-- | Open an AMQP connection and one channel on it, configured from the
+-- command line 'Args'. Server, port, vhost and (if set) the connection
+-- name are logged first.
+--
+-- Two SASL mechanisms are offered: EXTERNAL and PLAIN with the
+-- configured user and password. TLS is only enabled when the @tls@ flag
+-- is set; it then uses the system certificate store as CA store and
+-- 'myCert' to answer certificate requests.
+connect :: Args -> IO (Connection,Channel)
+connect args = do
+    printparam' "server" (server args)
+    printparam' "port" (show (port args))
+    printparam' "vhost" (vHost args)
+    printparam "connection_name" (connectionName args)
+    globalCertificateStore <- getSystemCertificateStore
+    let sharedCfg = def { sharedValidationCache = def
+                        , sharedCAStore = globalCertificateStore
+                        }
+        supportedCfg = def { supportedCiphers = ciphersuite_default }
+        hooksCfg = def { onCertificateRequest = myCert (cert args) (key args) }
+        tlsParams = (defaultParamsClient "" B.empty)
+            { clientShared = sharedCfg
+            , clientSupported = supportedCfg
+            , clientHooks = hooksCfg
+            }
+        myTLS = N.TLSSettings tlsParams
+        tlsSetting = if tls args then Just (TLSCustom myTLS) else Nothing
+        connOpts = defaultConnectionOpts
+            { coAuth = [ SASLMechanism "EXTERNAL" B.empty Nothing
+                       , plain (T.pack (user args)) (T.pack (pass args))
+                       ]
+            , coVHost = T.pack (vHost args)
+            , coTLSSettings = tlsSetting
+            , coServers = [ (server args, fromIntegral (port args)) ]
+            , coHeartbeatDelay = fmap fromIntegral (heartBeat args)
+            , coName = fmap T.pack (connectionName args)
+            }
+    conn <- openConnection'' connOpts
+    chan <- openChannel conn
+--  addChannelExceptionHandler chan
+--      (\exception -> closeConnection conn >>
+--      printparam' "exiting" (show exception) >>
+--      killThread tid)
+    return (conn,chan)
+--
+-- -- noop sharedValidationCache, handy when debugging
+-- noValidation :: ValidationCache
+-- noValidation = ValidationCache
+-- (\_ _ _ -> return ValidationCachePass)
+-- (\_ _ _ -> return ())
+
+-- | Client certificate hook: load the X.509 credential from the given
+-- certificate and key files. Yields 'Nothing' when either path is
+-- missing or when loading fails (the load error is logged with the
+-- label "ERROR"). The third argument is ignored.
+myCert :: Maybe FilePath -> Maybe FilePath -> t -> IO (Maybe Credential)
+myCert (Just cert') (Just key') _ =
+    credentialLoadX509 cert' key' >>= either complain (return . Just)
+  where
+    complain msg = printparam' "ERROR" msg >> return Nothing
+myCert _ _ _ = return Nothing
+
--- /dev/null
+module Network.AMQP.Utils.Helpers where
+
+import qualified Data.ByteString.Lazy.Char8 as BL
+import System.IO
+
+-- | Join a list of words with single spaces; the empty list yields
+-- 'Nothing'. Helper for logging command line options.
+listToMaybeUnwords :: [String] -> Maybe String
+listToMaybeUnwords ws
+    | null ws = Nothing
+    | otherwise = Just (unwords ws)
+
+-- | Print @ --- label: @, run the given output action (for a String or
+-- a ByteString) and flush stdout. With 'Nothing' nothing at all is
+-- printed.
+printwithlabel :: String -> Maybe (IO ()) -> IO ()
+printwithlabel labl = maybe (return ()) labelled
+  where
+    labelled action = do
+        mapM_ putStr [ " --- ", labl, ": " ]
+        action
+        hFlush stdout
+
+-- | Log an optional parameter under the given label; 'Nothing' prints
+-- nothing.
+printparam :: String -> Maybe String -> IO ()
+printparam labl = printwithlabel labl . fmap putStrLn
+
+-- | Log a required parameter under the given label.
+printparam' :: String -> String -> IO ()
+printparam' d = printparam d . Just
+
+-- | Log a message body (or its leading characters) under the given
+-- label. The body starts on a fresh line after the label; 'Nothing'
+-- prints nothing.
+printbody :: (String, Maybe BL.ByteString) -> IO ()
+printbody (labl, ms) = printwithlabel labl (fmap showBody ms)
+  where
+    showBody body = putStrLn "" >> BL.putStrLn body
+
+-- | Print a log marker line and flush stdout: 25 dashes, the given text
+-- surrounded by spaces, then dashes, all cut off at 72 characters.
+hr :: String -> IO ()
+hr x = do
+    putStrLn (take 72 (replicate 25 '-' ++ " " ++ x ++ " " ++ repeat '-'))
+    hFlush stdout
--- /dev/null
+module Network.AMQP.Utils.Options where
+
+import Paths_amqp_utils ( version )
+import Data.Version ( showVersion )
+import Data.Maybe
+import Data.Default.Class
+import System.Console.GetOpt
+
+-- | A data type for our options.
+--
+-- One record holds the settings of all executables; each executable
+-- only exposes the options that apply to it.
+data Args = Args { server :: String -- ^ AMQP server host
+ , port :: Int -- ^ server port number
+ , tls :: Bool -- ^ whether to connect with TLS
+ , vHost :: String -- ^ AMQP virtual host
+ , currentExchange :: String -- ^ AMQP exchange; also the exchange paired with subsequent binding keys
+ , bindings :: [(String, String)] -- ^ (exchange, binding key) pairs, most recent first
+ , rKey :: String -- ^ routing key for published messages
+ , anRiss :: Maybe Int -- ^ limit of shown body chars; 'Nothing' means unlimited
+ , fileProcess :: Maybe String -- ^ callback script file
+ , qName :: Maybe String -- ^ existing queue to use instead of an exchange
+ , cert :: Maybe String -- ^ TLS client certificate file
+ , key :: Maybe String -- ^ TLS client private key file
+ , user :: String -- ^ username for auth
+ , pass :: String -- ^ password for auth
+ , preFetch :: Int -- ^ prefetch count
+ , heartBeat :: Maybe Int -- ^ heartbeat interval; 'Nothing' leaves it to the server
+ , tempDir :: Maybe String -- ^ directory for file creation; 'Nothing' means no files
+ , additionalArgs :: [String] -- ^ additional callback arguments, most recent first
+ , connectionName :: Maybe String -- ^ connection name shown in the RabbitMQ web interface
+ , tmpQName :: String -- ^ name for the temporary exclusive queue
+ , inputFile :: String -- ^ message input file
+ , lineMode :: Bool -- ^ line-by-line mode
+ }
+
+-- | Defaults in effect before any command line option is applied.
+instance Default Args where
+    def = Args
+        { server = "localhost"
+        , port = 5672
+        , tls = False
+        , vHost = "/"
+        , currentExchange = "default"
+        , bindings = []
+        , rKey = ""
+        , anRiss = Nothing
+        , fileProcess = Nothing
+        , qName = Nothing
+        , cert = Nothing
+        , key = Nothing
+        , user = "guest"
+        , pass = "guest"
+        , preFetch = 1
+        , heartBeat = Nothing
+        , tempDir = Nothing
+        , additionalArgs = []
+        , connectionName = Nothing
+        , tmpQName = ""
+        , inputFile = ""
+        , lineMode = False
+        }
+
+-- | Common options, shared by all executables.
+--
+-- Numeric arguments (@--port@, @--heartbeats@) are parsed with 'reads'
+-- rather than the partial 'read', so a malformed value is reported with
+-- the name of the offending option instead of "Prelude.read: no parse".
+-- The option functions are pure, so the error is still raised when the
+-- field is first evaluated.
+cOptions :: [OptDescr (Args -> Args)]
+cOptions =
+    [ Option [ 'o' ]
+             [ "server" ]
+             (ReqArg (\s o -> o { server = s }) "SERVER")
+             ("AMQP Server (default: " ++ server def ++ ")")
+    , Option [ 'y' ]
+             [ "vhost" ]
+             (ReqArg (\s o -> o { vHost = s }) "VHOST")
+             ("AMQP Virtual Host (default: " ++ vHost def ++ ")")
+    , Option [ 'x' ]
+             [ "exchange" ]
+             (ReqArg (\s o -> o { currentExchange = s }) "EXCHANGE")
+             -- take the default from 'def' like the other help texts do
+             ("AMQP Exchange (default: " ++ currentExchange def ++ ")")
+    , Option [ 'Q' ]
+             [ "qname" ]
+             (ReqArg (\s o -> o { tmpQName = s }) "TEMPQNAME")
+             "Name for temporary exclusive Queue"
+    , Option [ 'p' ]
+             [ "port" ]
+             (ReqArg (\s o -> o { port = intArg "port" s }) "PORT")
+             ("Server Port Number (default: " ++
+                  show (port def) ++ ")")
+    , Option [ 'T' ]
+             [ "tls" ]
+             (NoArg (\o -> o { tls = not (tls o) }))
+             ("Toggle TLS (default: " ++ show (tls def) ++ ")")
+    , Option [ 'q' ]
+             [ "queue" ]
+             (ReqArg (\s o -> o { qName = Just s }) "QUEUENAME")
+             "Ignore Exchange and bind to existing Queue"
+    , Option [ 'c' ]
+             [ "cert" ]
+             (ReqArg (\s o -> o { cert = Just s }) "CERTFILE")
+             ("TLS Client Certificate File")
+    , Option [ 'k' ]
+             [ "key" ]
+             (ReqArg (\s o -> o { key = Just s }) "KEYFILE")
+             ("TLS Client Private Key File")
+    , Option [ 'U' ]
+             [ "user" ]
+             (ReqArg (\s o -> o { user = s }) "USERNAME")
+             ("Username for Auth")
+    , Option [ 'P' ]
+             [ "pass" ]
+             (ReqArg (\s o -> o { pass = s }) "PASSWORD")
+             ("Password for Auth")
+    , Option [ 's' ]
+             [ "heartbeats" ]
+             (ReqArg (\s o -> o { heartBeat = Just (intArg "heartbeats" s) })
+                     "INT")
+             "heartbeat interval (0=disable, default: set by server)"
+    , Option [ 'n' ]
+             [ "name" ]
+             (ReqArg (\s o -> o { connectionName = Just s }) "NAME")
+             "connection name, will be shown in RabbitMQ web interface"
+    ]
+  where
+    -- Parse the whole argument of the named option as an integer;
+    -- trailing whitespace is tolerated, exactly as 'read' does.
+    intArg :: String -> String -> Int
+    intArg optName s =
+        case reads s of
+            [(n, rest)] | null (words rest) -> n
+            _ -> error ("invalid integer for --" ++ optName ++ ": " ++ show s)
+
+-- | Options for konsum.
+--
+-- Numeric arguments (@--charlimit@, @--prefetch@) are parsed with
+-- 'reads' rather than the partial 'read', so a malformed value is
+-- reported with the name of the offending option instead of
+-- "Prelude.read: no parse". The option functions are pure, so the error
+-- is still raised when the field is first evaluated.
+kOptions :: [OptDescr (Args -> Args)]
+kOptions =
+    [ Option [ 'r' ]
+             [ "bindingkey" ]
+             -- each key is paired with the exchange selected so far
+             (ReqArg (\s o -> o { bindings = (currentExchange o, s) :
+                                      (bindings o)
+                                })
+                     "BINDINGKEY")
+             ("AMQP binding key (default: #)")
+    , Option [ 'X' ]
+             [ "execute" ]
+             -- keeps an already chosen tempdir, otherwise falls back to /tmp
+             (OptArg (\s o -> o { fileProcess = Just (fromMaybe callback
+                                                                s)
+                                , tempDir = Just (fromMaybe "/tmp"
+                                                            (tempDir o))
+                                })
+                     "EXE")
+             ("Callback Script File (implies -t) (-X without arg: " ++
+                  callback ++ ")")
+    , Option [ 'a' ]
+             [ "args" ]
+             (ReqArg (\s o -> o { additionalArgs = s :
+                                      (additionalArgs o)
+                                })
+                     "ARG")
+             "additional argument for -X callback"
+    , Option [ 'l' ]
+             [ "charlimit" ]
+             (ReqArg (\s o -> o { anRiss = Just (intArg "charlimit" s) })
+                     "INT")
+             "limit number of shown body chars (default: unlimited)"
+    , Option [ 't' ]
+             [ "tempdir", "target" ]
+             (OptArg (\s o -> o { tempDir = Just (fromMaybe "/tmp" s) })
+                     "DIR")
+             "tempdir (default: no file creation, -t without arg: /tmp)"
+    , Option [ 'f' ]
+             [ "prefetch" ]
+             (ReqArg (\s o -> o { preFetch = intArg "prefetch" s }) "INT")
+             ("Prefetch count. (0=unlimited, 1=off, default: " ++
+                  show (preFetch def) ++ ")")
+    ]
+  where
+    -- Parse the whole argument of the named option as an integer;
+    -- trailing whitespace is tolerated, exactly as 'read' does.
+    intArg :: String -> String -> Int
+    intArg optName s =
+        case reads s of
+            [(n, rest)] | null (words rest) -> n
+            _ -> error ("invalid integer for --" ++ optName ++ ": " ++ show s)
+
+-- | Options for agitprop.
+--
+-- @-r@ sets the routing key used for publishing. Its help text used to
+-- be a copy of konsum's binding key option, including a default of @#@
+-- that does not apply here; the default is now taken from 'def'.
+aOptions :: [OptDescr (Args -> Args)]
+aOptions =
+    [ Option [ 'r' ]
+             [ "routingkey" ]
+             (ReqArg (\s o -> o { rKey = s }) "ROUTINGKEY")
+             ("AMQP routing key (default: " ++ show (rKey def) ++ ")")
+    , Option [ 'f' ]
+             [ "inputfile" ]
+             (ReqArg (\s o -> o { inputFile = s }) "INPUTFILE")
+             "Message input file"
+    , Option [ 'l' ]
+             [ "linemode" ]
+             (NoArg (\o -> o { lineMode = not (lineMode o) }))
+             "Toggle line-by-line mode"
+    ]
+
+-- | Option table for the named executable: its specific options
+-- followed by the common ones. Any other name gets the common options
+-- only.
+options :: String -> [OptDescr (Args -> Args)]
+options exename = specific ++ cOptions
+  where
+    specific = case exename of
+        "konsum" -> kOptions
+        "agitprop" -> aOptions
+        _ -> []
+
+-- | 'parseargs' exename argstring
+-- applies the options of @exename@ onto @argstring@, starting from 'def'.
+--
+-- Fails with a user error that carries the parser messages and the
+-- usage text when an option is invalid or when non-option arguments are
+-- left over.
+parseargs :: String -> [String] -> IO Args
+parseargs exename argstring =
+    case getOpt Permute opt argstring of
+        (o, [], []) -> return $ foldl (flip id) def o
+        (_, extra, errs) -> ioError $ userError $
+            concat (errs ++ map unexpected extra) ++
+            usageInfo (usage exename) opt
+  where
+    opt = options exename
+    -- getOpt reports no error for stray positional arguments, so name
+    -- them here; otherwise the user would only get the usage text.
+    unexpected a = "unexpected argument `" ++ a ++ "'\n"
+
+-- | Path of the default callback executable, used when the @-X@ option
+-- is given without an argument.
+callback :: String
+callback = "/usr/lib/haskell-amqp-utils/callback"
+
+-- | Header for the usage message of the named executable: the package
+-- version, the command synopsis and the heading of the option list.
+usage :: String -> String
+usage exename = concat
+    [ "\namqp-utils "
+    , showVersion version
+    , "\n\nUsage:\n"
+    , exename
+    , " [options]\n\nOptions:"
+    ]
--- /dev/null
+{-# LANGUAGE OverloadedStrings #-}
+
+import Paths_amqp_utils ( version )
+import Data.Version ( showVersion )
+import System.Environment
+import qualified Data.Text as T
+import Network.AMQP
+import Network.AMQP.Utils.Options
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Connection
+import qualified Data.ByteString.Lazy.Char8 as BL
+-- | Publish the contents of the input file to the configured exchange
+-- with the configured routing key: as a single message, or in line mode
+-- as one message per line. Messages are marked persistent.
+main :: IO ()
+main = do
+    hr "starting"
+    -- tid <- myThreadId
+    args <- getArgs >>= parseargs "agitprop"
+    printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+    -- inputFile defaults to the empty string; opening that path would
+    -- fail with an unhelpful "does not exist" error, so say what is
+    -- actually missing.
+    messageFile <- if null (inputFile args)
+        then ioError (userError "no input file given (use -f INPUTFILE)")
+        else BL.readFile (inputFile args)
+    (conn, chan) <- connect args
+    let publishOneMsg f = publishMsg chan
+            (T.pack $ currentExchange args)
+            (T.pack $ rKey args)
+            newMsg { msgBody = f
+                   , msgDeliveryMode = Just Persistent
+                   }
+    if lineMode args
+        then mapM_ publishOneMsg (BL.lines messageFile)
+        else publishOneMsg messageFile >> return ()
+    closeConnection conn
name: amqp-utils
-version: 0.3.0.2
+version: 0.3.1.0
synopsis: Generic Haskell AMQP Consumer
ghc-options: -threaded -Wall
default-language: Haskell98
+
+ other-modules: Network.AMQP.Utils.Options,
+ Network.AMQP.Utils.Helpers,
+ Network.AMQP.Utils.Connection,
+ Paths_amqp_utils
+
+executable agitprop
+ main-is: agitprop.hs
+ build-depends: base >=4.6 && <5,
+ containers,
+ text,
+ connection,
+ data-default-class,
+ time,
+ process,
+ bytestring,
+ x509-system,
+ tls,
+ amqp >=0.17
+
+ ghc-options: -threaded -Wall
+ default-language: Haskell98
+
+ other-modules: Network.AMQP.Utils.Options,
+ Network.AMQP.Utils.Helpers,
+ Network.AMQP.Utils.Connection,
+ Paths_amqp_utils
+
+
source-repository head
type: git
location: git://github.com/woffs/haskell-amqp-utils
src = ./.;
isLibrary = false;
isExecutable = true;
+ enableSharedExecutables = false;
executableHaskellDepends = [
amqp base bytestring connection containers data-default-class
process text time tls x509-system
+haskell-amqp-utils (0.3.1.0) unstable; urgency=medium
+
+ * agitprop, a publisher
+
+
haskell-amqp-utils (0.3.0.2) unstable; urgency=medium
* use ciphersuite_default
import Paths_amqp_utils ( version )
import Data.Version ( showVersion )
import System.Environment
-import System.Console.GetOpt
import System.IO
import System.Process
import System.Exit
import Data.List
import Data.Maybe
import qualified Data.Text as T
-import qualified Data.ByteString as B
import qualified Data.Map as M
import Network.AMQP
import Network.AMQP.Types
-import Network.Connection
-import Network.TLS
-import Network.TLS.Extra
-import System.X509
+import Network.AMQP.Utils.Options
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Connection
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.Time
import Data.Time.Clock.POSIX
-import Data.Default.Class
main :: IO ()
main = do
hr "starting"
tid <- myThreadId
- args <- getArgs >>= parseargs
+ args <- getArgs >>= parseargs "konsum"
let addiArgs = reverse $ additionalArgs args
printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- printparam' "server" $ server args
- printparam' "port" $ show $ port args
- printparam' "vhost" $ vHost args
- printparam "connection_name" $ connectionName args
- globalCertificateStore <- getSystemCertificateStore
- let myTLS = TLSSettings (defaultParamsClient "" B.empty)
- { clientShared = def
- { sharedValidationCache = def
- , sharedCAStore = globalCertificateStore
- }
- , clientSupported = def
- { supportedCiphers = ciphersuite_default
- }
- , clientHooks = def
- { onCertificateRequest = myCert (cert args) (key args)
- }
- }
- conn <- openConnection'' defaultConnectionOpts
- { coAuth = [ SASLMechanism "EXTERNAL" B.empty Nothing
- , plain (T.pack (user args)) (T.pack (pass args))
- ]
- , coVHost = T.pack $ vHost args
- , coTLSSettings = if (tls args ) then Just ( TLSCustom myTLS ) else Nothing
- , coServers = [ (server args, fromIntegral $ port args) ]
- , coHeartbeatDelay = fmap fromIntegral $ heartBeat args
- , coName = fmap T.pack $ connectionName args
- }
- chan <- openChannel conn
- addChannelExceptionHandler chan
- (\exception -> closeConnection conn >>
- printparam' "exiting" (show exception) >>
- killThread tid)
+ (conn,chan) <- connect args
+ -- connect does not install a channel exception handler, so keep the
+ -- one konsum had before the connection setup was moved out: on a
+ -- channel exception close the connection, log the reason and stop
+ -- the main thread (this is what tid is for).
+ addChannelExceptionHandler chan
+   (\exception -> closeConnection conn >>
+   printparam' "exiting" (show exception) >>
+   killThread tid)
-- set prefetch
printparam' "prefetch" $
show (exception :: X.SomeException))
closeConnection conn
--- -- noop sharedValidationCache, handy when debugging
--- noValidation :: ValidationCache
--- noValidation = ValidationCache
--- (\_ _ _ -> return ValidationCachePass)
--- (\_ _ _ -> return ())
-
--- client certificate
-myCert :: Maybe FilePath -> Maybe FilePath -> t -> IO (Maybe Credential)
-myCert (Just cert') (Just key') _ = do
- result <- credentialLoadX509 cert' key'
- case result of
- Left x -> printparam' "ERROR" x >> return Nothing
- Right x -> return $ Just x
-myCert _ _ _ = return Nothing
-
-- exclusive temp queue
tempQueue :: Channel -> String -> [(String, String)] -> String -> IO T.Text
tempQueue chan tmpqname bindlist x = do
(if null bindlist then [ (x, "#") ] else bindlist)
return q
--- log cmdline options
-listToMaybeUnwords :: [String] -> Maybe String
-listToMaybeUnwords [] = Nothing
-listToMaybeUnwords x = Just $ unwords x
-
--- Strings or ByteStrings with label, oder nothing at all
-printwithlabel :: String -> Maybe (IO ()) -> IO ()
-printwithlabel _ Nothing =
- return ()
-printwithlabel labl (Just i) = do
- mapM_ putStr [ " --- ", labl, ": " ]
- i
- hFlush stdout
-
--- optional parameters
-printparam :: String -> Maybe String -> IO ()
-printparam labl ms = printwithlabel labl $
- fmap putStrLn ms
-
--- required parameters
-printparam' :: String -> String -> IO ()
-printparam' d s = printparam d (Just s)
-
--- head chars of body
-printbody :: (String, Maybe BL.ByteString) -> IO ()
-printbody (labl, ms) = printwithlabel labl $
- fmap (\s -> putStrLn "" >> BL.putStrLn s) ms
-
--- options for everybody
-data Args = Args { server :: String
- , port :: Int
- , tls :: Bool
- , vHost :: String
- , currentExchange :: String
- , bindings :: [(String, String)]
- , anRiss :: Maybe Int
- , fileProcess :: Maybe String
- , qName :: Maybe String
- , cert :: Maybe String
- , key :: Maybe String
- , user :: String
- , pass :: String
- , preFetch :: Int
- , heartBeat :: Maybe Int
- , tempDir :: Maybe String
- , additionalArgs :: [String]
- , connectionName :: Maybe String
- , tmpQName :: String
- }
-
-instance Default Args where
- def = Args "localhost"
- 5672
- False
- "/"
- "default"
- []
- Nothing
- Nothing
- Nothing
- Nothing
- Nothing
- "guest"
- "guest"
- 1
- Nothing
- Nothing
- []
- Nothing
- ""
-
-callback :: String
-callback = "/usr/lib/haskell-amqp-utils/callback"
-
-options :: [OptDescr (Args -> Args)]
-options = [ Option [ 'o' ]
- [ "server" ]
- (ReqArg (\s o -> o { server = s }) "SERVER")
- ("AMQP Server (default: " ++ server def ++ ")")
- , Option [ 'y' ]
- [ "vhost" ]
- (ReqArg (\s o -> o { vHost = s }) "VHOST")
- ("AMQP Virtual Host (default: " ++ vHost def ++ ")")
- , Option [ 'x' ]
- [ "exchange" ]
- (ReqArg (\s o -> o { currentExchange = s }) "EXCHANGE")
- ("AMQP Exchange (default: default)")
- , Option [ 'r' ]
- [ "bindingkey" ]
- (ReqArg (\s o -> o { bindings = (currentExchange o, s) :
- (bindings o)
- })
- "BINDINGKEY")
- ("AMQP binding key (default: #)")
- , Option [ 'Q' ]
- [ "qname" ]
- (ReqArg (\s o -> o { tmpQName = s }) "TEMPQNAME")
- "Name for temporary exclusive Queue"
- , Option [ 'X' ]
- [ "execute" ]
- (OptArg (\s o -> o { fileProcess = Just (fromMaybe callback s)
- , tempDir = Just (fromMaybe "/tmp"
- (tempDir o))
- })
- "EXE")
- ("Callback Script File (implies -t) (-X without arg: " ++
- callback ++ ")")
- , Option [ 'p' ]
- [ "port" ]
- (ReqArg (\s o -> o { port = read s }) "PORT")
- ("Server Port Number (default: " ++ show (port def) ++ ")")
- , Option [ 'T' ]
- [ "tls" ]
- (NoArg (\o -> o { tls = not (tls o) }))
- ("Toggle TLS (default: " ++ show (tls def) ++ ")")
- , Option [ 'q' ]
- [ "queue" ]
- (ReqArg (\s o -> o { qName = Just s }) "QUEUENAME")
- "Ignore Exchange and bind to existing Queue"
- , Option [ 'c' ]
- [ "cert" ]
- (ReqArg (\s o -> o { cert = Just s }) "CERTFILE")
- ("TLS Client Certificate File")
- , Option [ 'k' ]
- [ "key" ]
- (ReqArg (\s o -> o { key = Just s }) "KEYFILE")
- ("TLS Client Private Key File")
- , Option [ 'U' ]
- [ "user" ]
- (ReqArg (\s o -> o { user = s }) "USERNAME")
- ("Username for Auth")
- , Option [ 'P' ]
- [ "pass" ]
- (ReqArg (\s o -> o { pass = s }) "PASSWORD")
- ("Password for Auth")
- , Option [ 'f' ]
- [ "prefetch" ]
- (ReqArg (\s o -> o { preFetch = read s }) "INT")
- ("Prefetch count. (0=unlimited, 1=off, default: " ++
- show (preFetch def) ++ ")")
- , Option [ 's' ]
- [ "heartbeats" ]
- (ReqArg (\s o -> o { heartBeat = (Just (read s)) }) "INT")
- "heartbeat interval (0=disable, default: set by server)"
- , Option [ 't' ]
- [ "tempdir", "target" ]
- (OptArg (\s o -> o { tempDir = Just (fromMaybe "/tmp" s) })
- "DIR")
- "tempdir (default: no file creation, -t without arg: /tmp)"
- , Option [ 'a' ]
- [ "args" ]
- (ReqArg (\s o -> o { additionalArgs = s : (additionalArgs o)
- })
- "ARG")
- "additional argument for -X callback"
- , Option [ 'n' ]
- [ "name" ]
- (ReqArg (\s o -> o { connectionName = Just s }) "NAME")
- "connection name, will be shown in RabbitMQ web interface"
- , Option [ 'l' ]
- [ "charlimit" ]
- (ReqArg (\s o -> o { anRiss = Just (read s :: Int) }) "INT")
- "limit number of shown body chars (default: unlimited)"
- ]
-
-usage :: String
-usage = "\n\
- \amqp-utils " ++
- (showVersion version) ++
- "\n\n\
- \Usage:\n\
- \konsum [options]\n\n\
- \Options:"
-
--- apply options onto argstring
-parseargs :: [String] -> IO Args
-parseargs argstring = case getOpt Permute options argstring of
- (o, [], []) -> return $ foldl (flip id) def o
- (_, _, errs) -> ioError $ userError $ concat errs ++ usageInfo usage options
-
-- process received message
myCallback :: Maybe Int
-> Maybe String
ExitFailure _ -> rejectEnv envi True
doProc _ _ _ = return ()
--- log marker
-hr :: String -> IO ()
-hr x = putStrLn hr' >> hFlush stdout
- where
- hr' = take 72 $ (take 25 hr'') ++ " " ++ x ++ " " ++ hr''
- hr'' = repeat '-'
-
formatheaders :: ((T.Text, FieldValue) -> [a]) -> FieldTable -> [a]
formatheaders f (FieldTable ll) =
concat $ map f $ M.toList ll