.stack*
.cabal*
.ghc*
-.todo
dist
result
Setup
-.envrc
# Revision history for haskell-amqp-utils
+## 0.3.4.0 -- 2018-07-21
+
+* bug fix: re-add exception handler
+* plane + arbeite: rpc client + server
+
## 0.3.3.1 -- 2018-07-10
* fix debian builds
DESTDIR=
-pfx=/usr/local
-flavour=$(shell if stack --version >/dev/null 2>&1; then echo stack; elif cabal --version >/dev/null 2>&1; then echo cabal; else echo simple; fi)
-parallel=$(filter -j%, $(MAKEFLAGS))
+pfx = /usr/local
+flavour = $(shell if stack --version >/dev/null 2>&1; then echo stack; elif cabal --version >/dev/null 2>&1; then echo cabal; else echo simple; fi)
+parallel = $(filter -j%, $(MAKEFLAGS))
+nix = $(if $(findstring nix,$(shell bash -c "type -p stack")),--nix,)
build: build-$(flavour)
install: install-$(flavour) install-common
build-stack:
- stack setup
- stack build
+ stack $(nix) setup
+ stack $(nix) build
install-stack: build-stack
install -d -m755 $(DESTDIR)$(pfx)/bin
- install -m755 $$(stack path --local-install-root)/bin/konsum $(DESTDIR)$(pfx)/bin/
- install -m755 $$(stack path --local-install-root)/bin/agitprop $(DESTDIR)$(pfx)/bin/
+ install -m755 $$(stack $(nix) path --local-install-root)/bin/konsum $(DESTDIR)$(pfx)/bin/
+ install -m755 $$(stack $(nix) path --local-install-root)/bin/agitprop $(DESTDIR)$(pfx)/bin/
install-common:
install -d -m755 $(DESTDIR)$(pfx)/lib/haskell-amqp-utils
module Network.AMQP.Utils.Helpers where
+import Control.Concurrent
+import Control.Monad
import qualified Data.ByteString.Lazy.Char8 as BL
+import Data.List
import qualified Data.Map as M
+import Data.Maybe
import qualified Data.Text as T
import Data.Time
import Data.Time.Clock.POSIX
import Network.AMQP
import Network.AMQP.Types
+import Network.AMQP.Utils.Options
+import System.Exit
import System.IO
-import Data.Maybe
-import Data.List
-import Control.Monad
+import System.Process
-- | log cmdline options
listToMaybeUnwords :: [String] -> Maybe String
[ ("routing key", rkey)
, ("message-id", messageid)
, ("headers", headers)
- , ("content-type", contenttype)
- , ("content-encoding", contentencoding)
+ , ("content-type", ctype)
+ , ("content-encoding", cenc)
, ("redelivered", redeliv)
, ("timestamp", timestamp'')
, ("time now", now')
, ("size", size)
- , ("priority", prio)
+ , ("priority", pri)
, ("type", mtype)
, ("user id", muserid)
, ("application id", mappid)
(map
printopt
[ ("-r", rkey)
- , ("-m", contenttype)
- , ("-e", contentencoding)
+ , ("-m", ctype)
+ , ("-e", cenc)
, ("-i", messageid)
, ("-t", timestamp)
- , ("-p", prio)
+ , ("-p", pri)
] ++
maybeToList headers')
where
headers' = fmap (formatheaders fieldshow') $ msgHeaders msg
body = msgBody msg
anriss =
- if isimage contenttype
+ if isimage ctype
then Nothing
else Just (anriss' anR body) :: Maybe BL.ByteString
anriss'' = maybe "" (\a -> "first " ++ (show a) ++ " bytes of ") anR
label = anriss'' ++ "body"
- contenttype = fmap T.unpack $ msgContentType msg
- contentencoding = fmap T.unpack $ msgContentEncoding msg
+ ctype = fmap T.unpack $ msgContentType msg
+ cenc = fmap T.unpack $ msgContentEncoding msg
rkey = Just . T.unpack $ envRoutingKey envi
messageid = fmap T.unpack $ msgID msg
- prio = fmap show $ msgPriority msg
+ pri = fmap show $ msgPriority msg
mtype = fmap show $ msgType msg
muserid = fmap show $ msgUserID msg
mappid = fmap show $ msgApplicationID msg
| otherwise = diff ++ " in the future"
where
diff = show (diffUTCTime now msg)
+
+-- | Optionally store the message body in a file and optionally hand that
+-- file to the callback executable.
+--
+-- The body is written by 'saveFile' (which does nothing without a 'tempDir').
+-- Only when both a callback executable ('fileProcess') and a saved file are
+-- present is 'doProc' started in a forked thread; an exception that ends
+-- that thread is re-thrown to thread @tid@. In every other case the message
+-- is acknowledged right away through 'acke'. The optional @action@ is passed
+-- on to 'doProc' unchanged.
+optionalFileStuff ::
+     (Message, Envelope)
+  -> [String]
+  -> [String]
+  -> String
+  -> Args
+  -> ThreadId
+  -> Maybe (ExitCode -> Handle -> IO ())
+  -> IO ()
+optionalFileStuff (msg, envi) callbackoptions addi numstring a tid action = do
+  savedPath <- saveFile (tempDir a) numstring (msgBody msg)
+  printparam "saved to" savedPath
+  -- Nothing unless an executable is configured AND a file was written
+  let cmdLine = liftM2 buildCmdLine (fileProcess a) savedPath
+  printparam "calling" $ fmap unwords cmdLine
+  case cmdLine of
+    Nothing -> acke envi a
+    Just cmd ->
+      void $ forkFinally (doProc a numstring envi cmd action) reportToParent
+  where
+    buildCmdLine = constructCallbackCmdLine callbackoptions addi numstring
+    -- forward a failure of the worker thread to the thread that owns it
+    reportToParent = either (throwTo tid) return
+
+-- | Write the message body into a fresh temporary file.
+--
+-- Without a directory nothing is written and 'Nothing' is returned.
+-- Otherwise the file is created inside the given directory from the name
+-- template @konsum-\<numstring\>-.tmp@ and its path is returned.
+saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String)
+saveFile maybeDir numstring body =
+  case maybeDir of
+    Nothing -> return Nothing
+    Just dir -> do
+      (tmpPath, tmpHandle) <-
+        openBinaryTempFileWithDefaultPermissions dir template
+      BL.hPut tmpHandle body
+      hClose tmpHandle
+      return (Just tmpPath)
+  where
+    template = "konsum-" ++ numstring ++ "-.tmp"
+
+-- | Assemble the command line for the callback script: the executable,
+-- @-f@ with the file path, @-n@ with the message number, then the message
+-- derived options followed by the user supplied additional arguments.
+constructCallbackCmdLine ::
+     [String] -> [String] -> String -> String -> String -> [String]
+constructCallbackCmdLine opts addi num exe path =
+  [exe, "-f", path, "-n", num] ++ opts ++ addi
+
+-- | Run the callback script and settle the message according to the result.
+--
+-- The first element of the command line is the executable, the rest are its
+-- arguments; an empty command line is a no-op.
+--
+-- * With an @action@ the child's stdout is a pipe. After the child has
+--   exited, the action receives the exit code and the read end of that pipe,
+--   and the message is then acknowledged through 'acke'.
+-- * Without an @action@ stdout is inherited; the message is acknowledged on
+--   'ExitSuccess' and rejected through 'reje' on 'ExitFailure'.
+--
+-- stderr is always inherited.
+--
+-- NOTE(review): the child is waited for before anything reads the stdout
+-- pipe, so a callback that writes more than the pipe can buffer presumably
+-- blocks and never exits - confirm, and consider draining the pipe
+-- concurrently.
+doProc ::
+     Args
+  -> String
+  -> Envelope
+  -> [String]
+  -> Maybe (ExitCode -> Handle -> IO ())
+  -> IO ()
+doProc a numstring envi (exe:args) action = do
+  (_, h, _, processhandle) <-
+    createProcess (proc exe args) {std_out = out, std_err = Inherit}
+  exitcode <- waitForProcess processhandle
+  printparam' (numstring ++ " call returned") $ show exitcode
+  case (action, h) of
+    -- a pipe handle exists exactly when an action asked for one
+    (Just act, Just hdl) -> act exitcode hdl >> acke envi a
+    _ ->
+      case exitcode of
+        ExitSuccess -> acke envi a
+        ExitFailure _ -> reje envi a
+  where
+    -- only capture stdout when somebody is going to consume it
+    out = maybe Inherit (const CreatePipe) action
+doProc _ _ _ _ _ = return ()
+
+-- | Acknowledge the delivery with 'ackEnv' if the 'ack' flag of the
+-- arguments is set; otherwise do nothing.
+acke :: Envelope -> Args -> IO ()
+acke envi a = when (ack a) $ ackEnv envi
+
+-- | Reject the delivery with 'rejectEnv' if the 'ack' flag of the arguments
+-- is set, passing the 'requeuenack' flag along; otherwise do nothing.
+reje :: Envelope -> Args -> IO ()
+reje envi a = when (ack a) $ rejectEnv envi (requeuenack a)
, persistent :: Maybe DeliveryMode
, ack :: Bool
, requeuenack :: Bool
+ , timeout :: Double
}
instance Default Args where
Nothing
True
True
+ 5
--- | Common options
-cOptions :: [OptDescr (Args -> Args)]
-cOptions =
- [ Option
- ['o']
- ["server"]
- (ReqArg (\s o -> o {server = s}) "SERVER")
- ("AMQP Server (default: " ++ server def ++ ")")
- , Option
- ['y']
- ["vhost"]
- (ReqArg (\s o -> o {vHost = s}) "VHOST")
- ("AMQP Virtual Host (default: " ++ vHost def ++ ")")
- , Option
- ['x']
- ["exchange"]
- (ReqArg (\s o -> o {currentExchange = s}) "EXCHANGE")
- ("AMQP Exchange (default: \"\")")
- , Option
- ['Q']
- ["qname"]
- (ReqArg (\s o -> o {tmpQName = s}) "TEMPQNAME")
- "Name for temporary exclusive Queue"
- , Option
- ['p']
- ["port"]
- (ReqArg (\s o -> o {port = read s}) "PORT")
- ("Server Port Number (default: " ++ show (port def) ++ ")")
- , Option
- ['T']
- ["tls"]
- (NoArg (\o -> o {tls = not (tls o)}))
- ("Toggle TLS (default: " ++ show (tls def) ++ ")")
- , Option
- ['q']
- ["queue"]
- (ReqArg (\s o -> o {qName = Just s}) "QUEUENAME")
- "Ignore Exchange and bind to existing Queue"
- , Option
- ['c']
- ["cert"]
- (ReqArg (\s o -> o {cert = Just s}) "CERTFILE")
- ("TLS Client Certificate File")
- , Option
- ['k']
- ["key"]
- (ReqArg (\s o -> o {key = Just s}) "KEYFILE")
- ("TLS Client Private Key File")
- , Option
- ['U']
- ["user"]
- (ReqArg (\s o -> o {user = s}) "USERNAME")
- ("Username for Auth")
- , Option
- ['P']
- ["pass"]
- (ReqArg (\s o -> o {pass = s}) "PASSWORD")
- ("Password for Auth")
- , Option
- ['s']
- ["heartbeats"]
- (ReqArg (\s o -> o {heartBeat = (Just (read s))}) "INT")
- "heartbeat interval (0=disable, default: set by server)"
- , Option
- ['n']
- ["name"]
- (ReqArg (\s o -> o {connectionName = Just s}) "NAME")
- "connection name, will be shown in RabbitMQ web interface"
- ]
-
--- | Options for konsum
-kOptions :: [OptDescr (Args -> Args)]
-kOptions =
- [ Option
- ['r']
- ["bindingkey"]
- (ReqArg
- (\s o -> o {bindings = (currentExchange o, s) : (bindings o)})
- "BINDINGKEY")
- ("AMQP binding key (default: #)")
- , Option
- ['X']
- ["execute"]
- (OptArg
- (\s o ->
- o
- { fileProcess = Just (fromMaybe callback s)
- , tempDir = Just (fromMaybe "/tmp" (tempDir o))
- })
- "EXE")
- ("Callback Script File (implies -t) (-X without arg: " ++ callback ++ ")")
- , Option
- ['a']
- ["args"]
- (ReqArg (\s o -> o {additionalArgs = s : (additionalArgs o)}) "ARG")
- "additional argument for -X callback"
- , Option
- ['l']
- ["charlimit"]
- (ReqArg (\s o -> o {anRiss = Just (read s :: Int)}) "INT")
- "limit number of shown body chars (default: unlimited)"
- , Option
- ['t']
- ["tempdir", "target"]
- (OptArg (\s o -> o {tempDir = Just (fromMaybe "/tmp" s)}) "DIR")
- "tempdir (default: no file creation, -t without arg: /tmp)"
- , Option
- ['f']
- ["prefetch"]
- (ReqArg (\s o -> o {preFetch = read s}) "INT")
- ("Prefetch count. (0=unlimited, 1=off, default: " ++
- show (preFetch def) ++ ")")
- , Option
- ['A']
- ["ack"]
- (NoArg (\o -> o {ack = not (ack o)}))
- ("Toggle ack messages (default: " ++ show (ack def) ++ ")")
- , Option
- ['R']
- ["requeuenack"]
- (NoArg (\o -> o {requeuenack = not (requeuenack o)}))
- ("Toggle requeue when rejected (default: " ++
- show (requeuenack def) ++ ")")
- ]
-
--- | Options for agitprop
-aOptions :: [OptDescr (Args -> Args)]
-aOptions =
- [ Option
- ['r']
- ["routingkey"]
- (ReqArg (\s o -> o {rKey = s}) "ROUTINGKEY")
- "AMQP routing key"
- , Option
- ['f']
- ["inputfile"]
- (ReqArg (\s o -> o {inputFile = s}) "INPUTFILE")
- ("Message input file (default: " ++ (inputFile def) ++ ")")
- , Option
- ['l']
- ["linemode"]
- (NoArg (\o -> o {lineMode = not (lineMode o)}))
- ("Toggle line-by-line mode (default: " ++ show (lineMode def) ++ ")")
- , Option
- ['C']
- ["confirm"]
- (NoArg (\o -> o {confirm = not (confirm o)}))
- ("Toggle confirms (default: " ++ show (confirm def) ++ ")")
- , Option
- []
- ["msgid"]
- (ReqArg (\s o -> o {msgid = Just $ pack s}) "ID")
- "Message ID"
- , Option
- []
- ["type"]
- (ReqArg (\s o -> o {msgtype = Just $ pack s}) "TYPE")
- "Message Type"
- , Option
- []
- ["userid"]
- (ReqArg (\s o -> o {userid = Just $ pack s}) "USERID")
- "Message User-ID"
- , Option
- []
- ["appid"]
- (ReqArg (\s o -> o {appid = Just $ pack s}) "APPID")
- "Message App-ID"
- , Option
- []
- ["clusterid"]
- (ReqArg (\s o -> o {clusterid = Just $ pack s}) "CLUSTERID")
- "Message Cluster-ID"
- , Option
- []
- ["contenttype"]
- (ReqArg (\s o -> o {contenttype = Just $ pack s}) "CONTENTTYPE")
- "Message Content-Type"
- , Option
- []
- ["contentencoding"]
- (ReqArg (\s o -> o {contentencoding = Just $ pack s}) "CONTENTENCODING")
- "Message Content-Encoding"
- , Option
- []
- ["replyto"]
- (ReqArg (\s o -> o {replyto = Just $ pack s}) "REPLYTO")
- "Message Reply-To"
- , Option
- []
- ["prio"]
- (ReqArg (\s o -> o {prio = Just $ read s}) "PRIO")
- "Message Priority"
- , Option
- []
- ["corrid"]
- (ReqArg (\s o -> o {corrid = Just $ pack s}) "CORRID")
- "Message CorrelationID"
- , Option
- []
- ["exp"]
- (ReqArg (\s o -> o {msgexp = Just $ pack s}) "EXP")
- "Message Expiration"
- , Option
- ['h']
- ["header"]
- (ReqArg (\s o -> o {msgheader = addheader (msgheader o) s}) "HEADER=VALUE")
- "Message Headers"
- , Option
- ['F']
- ["fnheader"]
- (ReqArg (\s o -> o {fnheader = s : (fnheader o)}) "HEADERNAME")
- "Put filename into this header"
- , Option
- ['S']
- ["suffix"]
- (ReqArg (\s o -> o {suffix = s : (suffix o)}) "SUFFIX")
- "Allowed file suffixes in hotfolder mode"
- , Option
- ['m']
- ["magic"]
- (NoArg (\o -> o {magic = not (magic o)}))
- ("Toggle setting content-type and -encoding from file contents (default: " ++
- show (magic def) ++ ")")
- , Option
- ['e']
- ["persistent"]
- (NoArg (\o -> o {persistent = Just Persistent}))
- "Set persistent delivery"
- , Option
- ['E']
- ["nonpersistent"]
- (NoArg (\o -> o {persistent = Just NonPersistent}))
- "Set nonpersistent delivery"
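+-- | All command line options of all executables.
+-- Each entry pairs an option description with a tag string; every letter in
+-- the tag names one executable that accepts the option
+-- (@a@ = agitprop, @k@ = konsum, @r@ = arbeite, @p@ = plane, see 'longname').
+-- 'options' picks the entries whose tag string contains a given letter.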
+allOptions :: [(String, OptDescr (Args -> Args))]
+allOptions =
+ [ ( "k"
+ , Option
+ ['r']
+ ["bindingkey"]
+ (ReqArg
+ (\s o -> o {bindings = (currentExchange o, s) : (bindings o)})
+ "BINDINGKEY")
+ ("AMQP binding key (default: #)"))
+ , ( "kr"
+ , Option
+ ['X']
+ ["execute"]
+ (OptArg
+ (\s o ->
+ o
+ { fileProcess = Just (fromMaybe callback s)
+ , tempDir = Just (fromMaybe "/tmp" (tempDir o))
+ })
+ "EXE")
+ ("Callback Script File (implies -t) (-X without arg: " ++
+ callback ++ ")"))
+ , ( "kr"
+ , Option
+ ['a']
+ ["args"]
+ (ReqArg (\s o -> o {additionalArgs = s : (additionalArgs o)}) "ARG")
+ "additional argument for -X callback")
+ , ( "kr"
+ , Option
+ ['t']
+ ["tempdir", "target"]
+ (OptArg (\s o -> o {tempDir = Just (fromMaybe "/tmp" s)}) "DIR")
+ "tempdir (default: no file creation, -t without arg: /tmp)")
+ , ( "k"
+ , Option
+ ['f']
+ ["prefetch"]
+ (ReqArg (\s o -> o {preFetch = read s}) "INT")
+ ("Prefetch count. (0=unlimited, 1=off, default: " ++
+ show (preFetch def) ++ ")"))
+ , ( "kr"
+ , Option
+ ['A']
+ ["ack"]
+ (NoArg (\o -> o {ack = not (ack o)}))
+ ("Toggle ack messages (default: " ++ show (ack def) ++ ")"))
+ , ( "kr"
+ , Option
+ ['R']
+ ["requeuenack"]
+ (NoArg (\o -> o {requeuenack = not (requeuenack o)}))
+ ("Toggle requeue when rejected (default: " ++
+ show (requeuenack def) ++ ")"))
+ , ( "a"
+ , Option
+ ['r']
+ ["routingkey"]
+ (ReqArg (\s o -> o {rKey = s}) "ROUTINGKEY")
+ "AMQP routing key")
+ , ( "a"
+ , Option
+ ['f']
+ ["inputfile"]
+ (ReqArg (\s o -> o {inputFile = s}) "INPUTFILE")
+ ("Message input file (default: " ++ (inputFile def) ++ ")"))
+ , ( "a"
+ , Option
+ ['l']
+ ["linemode"]
+ (NoArg (\o -> o {lineMode = not (lineMode o)}))
+ ("Toggle line-by-line mode (default: " ++ show (lineMode def) ++ ")"))
+ , ( "a"
+ , Option
+ ['C']
+ ["confirm"]
+ (NoArg (\o -> o {confirm = not (confirm o)}))
+ ("Toggle confirms (default: " ++ show (confirm def) ++ ")"))
+ , ( "a"
+ , Option
+ []
+ ["msgid"]
+ (ReqArg (\s o -> o {msgid = Just $ pack s}) "ID")
+ "Message ID")
+ , ( "a"
+ , Option
+ []
+ ["type"]
+ (ReqArg (\s o -> o {msgtype = Just $ pack s}) "TYPE")
+ "Message Type")
+ , ( "a"
+ , Option
+ []
+ ["userid"]
+ (ReqArg (\s o -> o {userid = Just $ pack s}) "USERID")
+ "Message User-ID")
+ , ( "a"
+ , Option
+ []
+ ["appid"]
+ (ReqArg (\s o -> o {appid = Just $ pack s}) "APPID")
+ "Message App-ID")
+ , ( "a"
+ , Option
+ []
+ ["clusterid"]
+ (ReqArg (\s o -> o {clusterid = Just $ pack s}) "CLUSTERID")
+ "Message Cluster-ID")
+ , ( "a"
+ , Option
+ []
+ ["contenttype"]
+ (ReqArg (\s o -> o {contenttype = Just $ pack s}) "CONTENTTYPE")
+ "Message Content-Type")
+ , ( "a"
+ , Option
+ []
+ ["contentencoding"]
+ (ReqArg (\s o -> o {contentencoding = Just $ pack s}) "CONTENTENCODING")
+ "Message Content-Encoding")
+ , ( "a"
+ , Option
+ []
+ ["replyto"]
+ (ReqArg (\s o -> o {replyto = Just $ pack s}) "REPLYTO")
+ "Message Reply-To")
+ , ( "p"
+ , Option
+ ['t']
+ ["timeout"]
+ (ReqArg (\s o -> o {timeout = read s}) "SECONDS")
+ ("How long to wait for reply (default: " ++ show (timeout def) ++ ")"))
+ , ( "a"
+ , Option
+ []
+ ["prio"]
+ (ReqArg (\s o -> o {prio = Just $ read s}) "PRIO")
+ "Message Priority")
+ , ( "ap"
+ , Option
+ []
+ ["corrid"]
+ (ReqArg (\s o -> o {corrid = Just $ pack s}) "CORRID")
+ "Message CorrelationID")
+ , ( "ap"
+ , Option
+ []
+ ["exp"]
+ (ReqArg (\s o -> o {msgexp = Just $ pack s}) "EXP")
+ "Message Expiration")
+ , ( "a"
+ , Option
+ ['h']
+ ["header"]
+ (ReqArg
+ (\s o -> o {msgheader = addheader (msgheader o) s})
+ "HEADER=VALUE")
+ "Message Headers")
+ , ( "a"
+ , Option
+ ['F']
+ ["fnheader"]
+ (ReqArg (\s o -> o {fnheader = s : (fnheader o)}) "HEADERNAME")
+ "Put filename into this header")
+ , ( "a"
+ , Option
+ ['S']
+ ["suffix"]
+ (ReqArg (\s o -> o {suffix = s : (suffix o)}) "SUFFIX")
+ "Allowed file suffixes in hotfolder mode")
+ , ( "a"
+ , Option
+ ['m']
+ ["magic"]
+ (NoArg (\o -> o {magic = not (magic o)}))
+ ("Toggle setting content-type and -encoding from file contents (default: " ++
+ show (magic def) ++ ")"))
+ , ( "a"
+ , Option
+ ['e']
+ ["persistent"]
+ (NoArg (\o -> o {persistent = Just Persistent}))
+ "Set persistent delivery")
+ , ( "a"
+ , Option
+ ['E']
+ ["nonpersistent"]
+ (NoArg (\o -> o {persistent = Just NonPersistent}))
+ "Set nonpersistent delivery")
+ , ( "krp"
+ , Option
+ ['l']
+ ["charlimit"]
+ (ReqArg (\s o -> o {anRiss = Just (read s :: Int)}) "INT")
+ "limit number of shown body chars (default: unlimited)")
+ , ( "akr"
+ , Option
+ ['q']
+ ["queue"]
+ (ReqArg (\s o -> o {qName = Just s}) "QUEUENAME")
+ "Ignore Exchange and bind to existing Queue")
+ , ( "akrp"
+ , Option
+ ['Q']
+ ["qname"]
+ (ReqArg (\s o -> o {tmpQName = s}) "TEMPQNAME")
+ "Name for temporary exclusive Queue")
+ , ( "akp"
+ , Option
+ ['x']
+ ["exchange"]
+ (ReqArg (\s o -> o {currentExchange = s}) "EXCHANGE")
+ ("AMQP Exchange (default: \"\")"))
+ , ( "akrp"
+ , Option
+ ['o']
+ ["server"]
+ (ReqArg (\s o -> o {server = s}) "SERVER")
+ ("AMQP Server (default: " ++ server def ++ ")"))
+ , ( "akrp"
+ , Option
+ ['y']
+ ["vhost"]
+ (ReqArg (\s o -> o {vHost = s}) "VHOST")
+ ("AMQP Virtual Host (default: " ++ vHost def ++ ")"))
+ , ( "akrp"
+ , Option
+ ['p']
+ ["port"]
+ (ReqArg (\s o -> o {port = read s}) "PORT")
+ ("Server Port Number (default: " ++ show (port def) ++ ")"))
+ , ( "akrp"
+ , Option
+ ['T']
+ ["tls"]
+ (NoArg (\o -> o {tls = not (tls o)}))
+ ("Toggle TLS (default: " ++ show (tls def) ++ ")"))
+ , ( "akrp"
+ , Option
+ ['c']
+ ["cert"]
+ (ReqArg (\s o -> o {cert = Just s}) "CERTFILE")
+ ("TLS Client Certificate File"))
+ , ( "akrp"
+ , Option
+ ['k']
+ ["key"]
+ (ReqArg (\s o -> o {key = Just s}) "KEYFILE")
+ ("TLS Client Private Key File"))
+ , ( "akrp"
+ , Option
+ ['U']
+ ["user"]
+ (ReqArg (\s o -> o {user = s}) "USERNAME")
+ ("Username for Auth"))
+ , ( "akrp"
+ , Option
+ ['P']
+ ["pass"]
+ (ReqArg (\s o -> o {pass = s}) "PASSWORD")
+ ("Password for Auth"))
+ , ( "akrp"
+ , Option
+ ['s']
+ ["heartbeats"]
+ (ReqArg (\s o -> o {heartBeat = (Just (read s))}) "INT")
+ "heartbeat interval (0=disable, default: set by server)")
+ , ( "akrp"
+ , Option
+ ['n']
+ ["name"]
+ (ReqArg (\s o -> o {connectionName = Just s}) "NAME")
+ "connection name, will be shown in RabbitMQ web interface")
]
-- | Options for the executables
-options :: String -> [OptDescr (Args -> Args)]
-options "konsum" = kOptions ++ cOptions
-options "agitprop" = aOptions ++ cOptions
-options _ = cOptions
+-- Selects, in table order, every entry of 'allOptions' whose tag string
+-- contains the letter of the executable.
+options :: Char -> [OptDescr (Args -> Args)]
+options exename = [opt | (tags, opt) <- allOptions, exename `elem` tags]
-- | Add a header with a String value
addheader :: Maybe FieldTable -> String -> Maybe FieldTable
-addheader Nothing string = Just $ FieldTable $ M.singleton (k string) (v string)
+addheader Nothing string = Just $ FieldTable $ M.singleton (getkey string) (getval string)
addheader (Just (FieldTable oldheader)) string =
- Just $ FieldTable $ M.insert (k string) (v string) oldheader
+ Just $ FieldTable $ M.insert (getkey string) (getval string) oldheader
-k :: String -> Text
-k s = pack $ takeWhile (/= '=') s
+-- | Header name of a @HEADER=VALUE@ argument: everything before the first
+-- @=@ (the whole string if there is none).
+getkey :: String -> Text
+getkey = pack . takeWhile (/= '=')
-v :: String -> FieldValue
-v s = FVString $ pack $ tail $ dropWhile (/= '=') s
+-- | Header value of a @HEADER=VALUE@ argument: everything after the first
+-- @=@, wrapped as a string field. An argument without any @=@ yields an
+-- empty value; @drop 1@ is used instead of @tail@ so that this case does not
+-- fail on an empty list.
+getval :: String -> FieldValue
+getval s = FVString $ pack $ drop 1 $ dropWhile (/= '=') s
-- | 'parseargs' exename argstring
-- applies options onto argstring
-parseargs :: String -> [String] -> IO Args
+parseargs :: Char -> [String] -> IO Args
parseargs exename argstring =
case getOpt Permute opt argstring of
(o, [], []) -> return $ foldl (flip id) def o
callback :: String
callback = "/usr/lib/haskell-amqp-utils/callback"
-usage :: String -> String
+usage :: Char -> String
usage exename =
"\n\
\amqp-utils " ++
(showVersion version) ++
"\n\n\
\Usage:\n" ++
- exename ++
+ (longname exename) ++
" [options]\n\n\
\Options:"
+
+-- | Executable name for the one-letter tag used in the option table;
+-- any unknown letter gives the generic name @command@.
+longname :: Char -> String
+longname tag =
+  case tag of
+    'a' -> "agitprop"
+    'k' -> "konsum"
+    'r' -> "arbeite"
+    'p' -> "plane"
+    _ -> "command"
generic Haskell AMQP commandline tools for use with RabbitMQ
## Overview
-The package contains 2 binaries for commandline use.
+The package contains 4 binaries for commandline use.
- konsum, a generic consumer
- agitprop, a generic publisher
+- arbeite, an rpc server
+- plane, an rpc client
## konsum
### usage
-r BINDINGKEY --bindingkey=BINDINGKEY AMQP binding key (default: #)
-X[EXE] --execute[=EXE] Callback Script File (implies -t) (-X without arg: /usr/lib/haskell-amqp-utils/callback)
-a ARG --args=ARG additional argument for -X callback
- -l INT --charlimit=INT limit number of shown body chars (default: unlimited)
-t[DIR] --tempdir[=DIR], --target[=DIR] tempdir (default: no file creation, -t without arg: /tmp)
-f INT --prefetch=INT Prefetch count. (0=unlimited, 1=off, default: 1)
-A --ack Toggle ack messages (default: True)
-R --requeuenack Toggle requeue when rejected (default: True)
+ -l INT --charlimit=INT limit number of shown body chars (default: unlimited)
+ -q QUEUENAME --queue=QUEUENAME Ignore Exchange and bind to existing Queue
+ -Q TEMPQNAME --qname=TEMPQNAME Name for temporary exclusive Queue
+ -x EXCHANGE --exchange=EXCHANGE AMQP Exchange (default: "")
-o SERVER --server=SERVER AMQP Server (default: localhost)
-y VHOST --vhost=VHOST AMQP Virtual Host (default: /)
- -x EXCHANGE --exchange=EXCHANGE AMQP Exchange (default: "")
- -Q TEMPQNAME --qname=TEMPQNAME Name for temporary exclusive Queue
-p PORT --port=PORT Server Port Number (default: 5672)
-T --tls Toggle TLS (default: False)
- -q QUEUENAME --queue=QUEUENAME Ignore Exchange and bind to existing Queue
-c CERTFILE --cert=CERTFILE TLS Client Certificate File
-k KEYFILE --key=KEYFILE TLS Client Private Key File
-U USERNAME --user=USERNAME Username for Auth
-m --magic Toggle setting content-type and -encoding from file contents (default: False)
-e --persistent Set persistent delivery
-E --nonpersistent Set nonpersistent delivery
+ -q QUEUENAME --queue=QUEUENAME Ignore Exchange and bind to existing Queue
+ -Q TEMPQNAME --qname=TEMPQNAME Name for temporary exclusive Queue
+ -x EXCHANGE --exchange=EXCHANGE AMQP Exchange (default: "")
-o SERVER --server=SERVER AMQP Server (default: localhost)
-y VHOST --vhost=VHOST AMQP Virtual Host (default: /)
- -x EXCHANGE --exchange=EXCHANGE AMQP Exchange (default: "")
- -Q TEMPQNAME --qname=TEMPQNAME Name for temporary exclusive Queue
-p PORT --port=PORT Server Port Number (default: 5672)
-T --tls Toggle TLS (default: False)
- -q QUEUENAME --queue=QUEUENAME Ignore Exchange and bind to existing Queue
-c CERTFILE --cert=CERTFILE TLS Client Certificate File
-k KEYFILE --key=KEYFILE TLS Client Private Key File
-U USERNAME --user=USERNAME Username for Auth
message header:
agitprop -x amq.topic -r test -F fileName -f agitprop.hs
+
+## plane
+### usage
+
+ plane [options]
+
+ Options:
+ -t SECONDS --timeout=SECONDS How long to wait for reply (default: 5.0)
+ --corrid=CORRID Message CorrelationID
+ --exp=EXP Message Expiration
+ -l INT --charlimit=INT limit number of shown body chars (default: unlimited)
+ -Q TEMPQNAME --qname=TEMPQNAME Name for temporary exclusive Queue
+ -x EXCHANGE --exchange=EXCHANGE AMQP Exchange (default: "")
+ -o SERVER --server=SERVER AMQP Server (default: localhost)
+ -y VHOST --vhost=VHOST AMQP Virtual Host (default: /)
+ -p PORT --port=PORT Server Port Number (default: 5672)
+ -T --tls Toggle TLS (default: False)
+ -c CERTFILE --cert=CERTFILE TLS Client Certificate File
+ -k KEYFILE --key=KEYFILE TLS Client Private Key File
+ -U USERNAME --user=USERNAME Username for Auth
+ -P PASSWORD --pass=PASSWORD Password for Auth
+ -s INT --heartbeats=INT heartbeat interval (0=disable, default: set by server)
+ -n NAME --name=NAME connection name, will be shown in RabbitMQ web interface
+
+## arbeite
+### usage
+
+ arbeite [options]
+
+ Options:
+ -X[EXE] --execute[=EXE] Callback Script File (implies -t) (-X without arg: /usr/lib/haskell-amqp-utils/callback)
+ -a ARG --args=ARG additional argument for -X callback
+ -t[DIR] --tempdir[=DIR], --target[=DIR] tempdir (default: no file creation, -t without arg: /tmp)
+ -A --ack Toggle ack messages (default: True)
+ -R --requeuenack Toggle requeue when rejected (default: True)
+ -l INT --charlimit=INT limit number of shown body chars (default: unlimited)
+ -q QUEUENAME --queue=QUEUENAME Ignore Exchange and bind to existing Queue
+ -Q TEMPQNAME --qname=TEMPQNAME Name for temporary exclusive Queue
+ -o SERVER --server=SERVER AMQP Server (default: localhost)
+ -y VHOST --vhost=VHOST AMQP Virtual Host (default: /)
+ -p PORT --port=PORT Server Port Number (default: 5672)
+ -T --tls Toggle TLS (default: False)
+ -c CERTFILE --cert=CERTFILE TLS Client Certificate File
+ -k KEYFILE --key=KEYFILE TLS Client Private Key File
+ -U USERNAME --user=USERNAME Username for Auth
+ -P PASSWORD --pass=PASSWORD Password for Auth
+ -s INT --heartbeats=INT heartbeat interval (0=disable, default: set by server)
+ -n NAME --name=NAME connection name, will be shown in RabbitMQ web interface
{-# LANGUAGE CPP #-}
-- generic AMQP publisher
-import Control.Concurrent (threadDelay)
+import Control.Concurrent
import qualified Control.Exception as X
import Control.Monad (forever)
import qualified Data.ByteString.Lazy.Char8 as BL
main :: IO ()
main = do
hr "starting"
- args <- getArgs >>= parseargs "agitprop"
+ tid <- myThreadId
+ args <- getArgs >>= parseargs 'a'
printparam' "client version" $ "amqp-utils " ++ (showVersion version)
printparam' "routing key" $ rKey args
isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
then " (line-by-line)"
else ""
(conn, chan) <- connect args
+ addChannelExceptionHandler chan (X.throwTo tid)
printparam' "confirm mode" $ show $ confirm args
if (confirm args)
then do
name: amqp-utils
-version: 0.3.3.1
+version: 0.3.4.0
synopsis: Generic Haskell AMQP Consumer
x509-system,
tls >= 1.3.9,
amqp >=0.17
-
+
ghc-options: -threaded -Wall
-
+
default-language: Haskell98
other-modules: Network.AMQP.Utils.Options,
unix >= 2.7,
hinotify >= 0.3.8,
magic
-
+
ghc-options: -threaded -Wall
-
+
default-language: Haskell98
other-modules: Network.AMQP.Utils.Options,
Network.AMQP.Utils.Connection,
Paths_amqp_utils
+executable plane
+ main-is: plane.hs
+ build-depends: base >=4.6 && <5,
+ containers,
+ text,
+ connection,
+ data-default-class,
+ time,
+ process,
+ bytestring,
+ x509-system,
+ tls >= 1.3.9,
+ amqp >=0.17,
+ unix >= 2.7
+
+ ghc-options: -threaded -Wall
+
+ default-language: Haskell98
+
+ other-modules: Network.AMQP.Utils.Options,
+ Network.AMQP.Utils.Helpers,
+ Network.AMQP.Utils.Connection,
+ Paths_amqp_utils
+
+executable arbeite
+ main-is: arbeite.hs
+ build-depends: base >=4.6 && <5,
+ containers,
+ text,
+ connection,
+ data-default-class,
+ time,
+ process,
+ bytestring,
+ x509-system,
+ tls >= 1.3.9,
+ amqp >=0.17,
+ unix >= 2.7
+
+ ghc-options: -threaded -Wall
+
+ default-language: Haskell98
+
+ other-modules: Network.AMQP.Utils.Options,
+ Network.AMQP.Utils.Helpers,
+ Network.AMQP.Utils.Connection,
+ Paths_amqp_utils
source-repository head
type: git
}:
mkDerivation {
pname = "amqp-utils";
- version = "0.3.3.1";
+ version = "0.3.4.0";
src = ./.;
isLibrary = false;
isExecutable = true;
--- /dev/null
+{-# LANGUAGE OverloadedStrings #-}
+-- generic AMQP rpc server
+import Control.Concurrent
+import qualified Control.Exception as X
+import Control.Monad
+import qualified Data.ByteString.Lazy.Char8 as BL
+import Data.Map (singleton)
+import Data.Maybe
+import qualified Data.Text as T
+import Data.Time
+import Data.Version (showVersion)
+import Network.AMQP
+import Network.AMQP.Types
+import Network.AMQP.Utils.Connection
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Options
+import Paths_amqp_utils (version)
+import System.Environment
+
+-- | Entry point of the rpc server.
+--
+-- Parses the options for executable tag @r@, insists on a worker executable
+-- (@-X@), connects, then consumes from either the named existing queue or a
+-- freshly declared exclusive queue. Every delivery is handled by
+-- 'rpcServerCallback'. The main thread then idles until an exception reaches
+-- it (channel errors and callback failures are thrown to it), prints that
+-- exception and closes the connection.
+main :: IO ()
+main = do
+  hr "starting"
+  tid <- myThreadId
+  args <- getArgs >>= parseargs 'r'
+  -- the worker is mandatory: check it explicitly instead of provoking and
+  -- intercepting a 'fromJust' failure while printing
+  worker <-
+    maybe
+      (X.throwIO $ X.ErrorCall "-X option required")
+      return
+      (fileProcess args)
+  printparam' "worker" worker
+  -- option parsing prepends each additional argument, so restore their order
+  let addiArgs = reverse $ additionalArgs args
+  printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+  (conn, chan) <- connect args
+  -- channel level failures end the main loop below
+  addChannelExceptionHandler chan (X.throwTo tid)
+  -- use the existing queue if one was named, else declare an exclusive one
+  queue <-
+    maybe
+      (declareQueue
+         chan
+         newQueue {queueExclusive = True, queueName = (T.pack $ tmpQName args)} >>=
+       (\(x, _, _) -> return x))
+      (return)
+      (fmap T.pack (qName args))
+  printparam' "queue name" $ T.unpack queue
+  ctag <-
+    consumeMsgs
+      chan
+      queue
+      (if ack args
+         then Ack
+         else NoAck)
+      (rpcServerCallback tid args addiArgs chan)
+  printparam' "consumer tag" $ T.unpack ctag
+  printparam' "send acks" $ show (ack args)
+  printparam "requeue if rejected" $
+    if (ack args)
+      then Just (show (requeuenack args))
+      else Nothing
+  hr "entering main loop"
+  -- sleep until some other thread throws an exception at us
+  X.catch
+    (forever $ threadDelay 5000000)
+    (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
+  closeConnection conn
+  hr "connection closed"
+
+-- | Handle one rpc request.
+--
+-- Prints the message, then lets 'optionalFileStuff' save it and run the
+-- worker. The worker's stdout becomes the body of the reply, which is
+-- published to the request's exchange with the request's reply-to as routing
+-- key. Correlation id, timestamp and expiration are copied from the request
+-- and the worker's exit code is put into the @exitcode@ header.
+--
+-- A failure while printing the message is thrown to thread @tid@; a failure
+-- of 'optionalFileStuff' itself is only printed.
+--
+-- A request without reply-to cannot be answered. It is reported and its
+-- worker output discarded, rather than letting a 'fromJust' failure in the
+-- worker thread propagate to the main thread and stop the server.
+rpcServerCallback ::
+     ThreadId -> Args -> [String] -> Channel -> (Message, Envelope) -> IO ()
+rpcServerCallback tid a addi c m@(msg, env) = do
+  let numstring = show $ envDeliveryTag env
+  hr $ "BEGIN " ++ numstring
+  now <- getZonedTime
+  callbackoptions <-
+    X.catch
+      (printmsg m (anRiss a) now)
+      (\x -> X.throwTo tid (x :: X.SomeException) >> return [])
+  either (\e -> printparam' "ERROR" (show (e :: X.SomeException))) return =<<
+    X.try
+      (optionalFileStuff m callbackoptions addi numstring a tid (Just reply))
+  hr $ "END " ++ numstring
+  where
+    reply e h =
+      case msgReplyTo msg of
+        Nothing -> do
+          printparam' "ERROR" "request has no reply-to, reply dropped"
+          -- read the output to its end so the lazily read pipe gets closed
+          void $ BL.hGetContents h >>= X.evaluate . BL.length
+        Just replyTo -> do
+          contents <- BL.hGetContents h
+          void $
+            publishMsg
+              c
+              (envExchangeName env)
+              replyTo
+              newMsg
+                { msgBody = contents
+                , msgCorrelationID = msgCorrelationID msg
+                , msgTimestamp = msgTimestamp msg
+                , msgExpiration = msgExpiration msg
+                , msgHeaders =
+                    Just $
+                    FieldTable $
+                    singleton "exitcode" $ FVString $ T.pack $ show e
+                }
+haskell-amqp-utils (0.3.4.0) unstable; urgency=medium
+
+ * bug fix: re-add exception handler
+ * plane + arbeite: rpc client + server
+
+ -- Frank Doepper <woffs@lapdoepp> Sat, 21 Jul 2018 13:26:00 +0200
+
haskell-amqp-utils (0.3.3.1) unstable; urgency=medium
* hinotify 0.3.10 and lts-12.0 compat
Section: misc
Depends: ${shlibs:Depends}
, ${misc:Depends}
-Description: Generic Haskell AMQP consumer and publisher
+Description: Generic Haskell AMQP tools
AMQP consumer which can
create a temporary queue and attach it to an exchange, or
attach to an existing queue;
call a callback script.
AMQP publisher with file, line-by-line and
hotfolder capabilities.
+ AMQP rpc client and server.
import Control.Concurrent
import qualified Control.Exception as X
import Control.Monad
-import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.Text as T
import Data.Time
import Data.Version (showVersion)
import Network.AMQP.Utils.Options
import Paths_amqp_utils (version)
import System.Environment
-import System.Exit
-import System.IO
-import System.Process
main :: IO ()
main = do
hr "starting"
tid <- myThreadId
- args <- getArgs >>= parseargs "konsum"
+ args <- getArgs >>= parseargs 'k'
let addiArgs = reverse $ additionalArgs args
printparam' "client version" $ "amqp-utils " ++ (showVersion version)
(conn, chan) <- connect args
+ addChannelExceptionHandler chan (X.throwTo tid)
-- set prefetch
printparam' "prefetch" $ show $ preFetch args
qos chan 0 (fromIntegral $ preFetch args) False
(forever $ threadDelay 5000000)
(\exception -> printparam' "exception" $ show (exception :: X.SomeException))
closeConnection conn
+ hr "connection closed"
-- | exclusive temp queue
tempQueue :: Channel -> String -> [(String, String)] -> String -> IO T.Text
let numstring = show $ envDeliveryTag envi
hr $ "BEGIN " ++ numstring
now <- getZonedTime
- callbackoptions <- printmsg m (anRiss a) now
+ callbackoptions <-
+ X.catch
+ (printmsg m (anRiss a) now)
+ (\x -> X.throwTo tid (x :: X.SomeException) >> return [])
either
(\e -> printparam' "ERROR" (show (e :: X.SomeException)) >> reje envi a)
return =<<
- X.try (optionalFileStuff m callbackoptions addi numstring a tid)
+ X.try (optionalFileStuff m callbackoptions addi numstring a tid Nothing)
hr $ "END " ++ numstring
-
--- | if the message is to be saved
--- and maybe processed further
-optionalFileStuff ::
- (Message, Envelope)
- -> [String]
- -> [String]
- -> String
- -> Args
- -> ThreadId
- -> IO ()
-optionalFileStuff (msg, envi) callbackoptions addi numstring a tid = do
- path <- saveFile (tempDir a) numstring (msgBody msg)
- printparam "saved to" path
- let callbackcmdline =
- liftM2
- (constructCallbackCmdLine callbackoptions addi numstring)
- (fileProcess a)
- path
- printparam "calling" $ fmap unwords callbackcmdline
- maybe
- (acke envi a)
- (\c ->
- forkFinally (doProc a numstring envi c) (either (throwTo tid) return) >>
- return ())
- callbackcmdline
-
--- | save message into temp file
-saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String)
-saveFile Nothing _ _ = return Nothing
-saveFile (Just tempD) numstring body = do
- (p, h) <-
- openBinaryTempFileWithDefaultPermissions
- tempD
- ("konsum-" ++ numstring ++ "-.tmp")
- BL.hPut h body
- hClose h
- return $ Just p
-
--- | construct cmdline for callback script
-constructCallbackCmdLine ::
- [String] -> [String] -> String -> String -> String -> [String]
-constructCallbackCmdLine opts addi num exe path =
- exe : "-f" : path : "-n" : num : opts ++ addi
-
--- | call callback script
-doProc :: Args -> String -> Envelope -> [String] -> IO ()
-doProc a numstring envi (exe:args) = do
- (_, _, _, processhandle) <-
- createProcess (proc exe args) {std_out = Inherit, std_err = Inherit}
- exitcode <- waitForProcess processhandle
- printparam' (numstring ++ " call returned") $ show exitcode
- case exitcode of
- ExitSuccess -> acke envi a
- ExitFailure _ -> reje envi a
-doProc _ _ _ _ = return ()
-
--- | ack
-acke :: Envelope -> Args -> IO ()
-acke envi a
- | (ack a) = ackEnv envi
- | otherwise = return ()
-
--- | reject
-reje :: Envelope -> Args -> IO ()
-reje envi a
- | (ack a) = rejectEnv envi (requeuenack a)
- | otherwise = return ()
--- /dev/null
+{-# LANGUAGE OverloadedStrings #-}
+
+-- generic AMQP rpc client
+import Control.Concurrent
+import qualified Control.Exception as X
+import Control.Monad
+import qualified Data.ByteString.Lazy.Char8 as BL
+import qualified Data.Text as T
+import Data.Time
+import Data.Time.Clock.POSIX
+import Data.Version (showVersion)
+import Network.AMQP
+import Network.AMQP.Utils.Connection
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Options
+import Paths_amqp_utils (version)
+import System.Environment
+import System.Exit
+
+-- | Entry point of the RPC client.
+--
+-- Declares an exclusive reply queue, publishes the contents of the input
+-- file as a request that names this queue in its reply-to field, then
+-- idles until either 'rpcClientCallback' signals a received reply or the
+-- timer thread signals a timeout. The process exits with the code chosen
+-- by 'exceptionHandler' after closing the connection.
+main :: IO ()
+main = do
+  hr "starting"
+  tid <- myThreadId
+  args <- getArgs >>= parseargs 'p'
+  -- NOTE(review): presumably showing the timeout forces the parsed option so
+  -- that a malformed value fails here, before connecting; confirm against
+  -- the option parser.
+  X.onException
+    (printparam' "timeout" $ show $ timeout args)
+    (error "invalid timeout")
+  printparam' "client version" $ "amqp-utils " ++ showVersion version
+  printparam' "destination queue" $ tmpQName args
+  (conn, chan) <- connect args
+  addChannelExceptionHandler chan (X.throwTo tid)
+  (q, _, _) <- declareQueue chan newQueue {queueExclusive = True}
+  ctag <- consumeMsgs chan q NoAck (rpcClientCallback tid args)
+  printparam' "consumer tag" $ T.unpack ctag
+  message <- BL.readFile (inputFile args)
+  now <- floor . utcTimeToPOSIXSeconds <$> getCurrentTime
+  -- The handler is installed before the request goes out: the consumer
+  -- callback and the timer thread both signal this thread via throwTo, and
+  -- a signal that arrives while no handler is active would terminate the
+  -- main thread with an uncaught exception instead of a clean exit.
+  X.catch
+    (do hr "publishing request"
+        _ <-
+          publishMsg
+            chan
+            (T.pack $ currentExchange args)
+            (T.pack $ tmpQName args)
+            newMsg
+              { msgBody = message
+              , msgReplyTo = Just q
+              , msgCorrelationID = corrid args
+              , msgExpiration = msgexp args
+              , msgTimestamp = Just now
+              }
+        hr "waiting for answer"
+        -- timer thread: the timeout is given in seconds, threadDelay wants
+        -- microseconds
+        _ <-
+          forkIO
+            (threadDelay (floor (1000000 * timeout args)) >>
+             throwTo tid TimeoutException)
+        -- nothing left to do here; sleep until one of the signals arrives
+        forever $ threadDelay 200000)
+    (\x -> do
+       ec <- exceptionHandler x
+       hr "closing connection"
+       closeConnection conn
+       printparam' "exiting" $ show ec
+       exitWith ec)
+
+-- | Translate the signal that ended the wait loop into the process exit
+-- code: success for a received reply, failure code 1 for a timeout.
+exceptionHandler :: RpcException -> IO (ExitCode)
+exceptionHandler rpcExc =
+  case rpcExc of
+    ReceivedException -> return ExitSuccess
+    TimeoutException -> return (ExitFailure 1)
+
+-- | Consumer callback for the reply queue.
+--
+-- Prints the incoming message, forwarding any exception raised while
+-- printing to the main thread, and then tells the main thread that a
+-- reply has been received.
+rpcClientCallback :: ThreadId -> Args -> (Message, Envelope) -> IO ()
+rpcClientCallback tid a m@(_, env) = do
+  hr $ "received " ++ show (envDeliveryTag env)
+  now <- getZonedTime
+  void $ printmsg m (anRiss a) now `X.catch` forwardToMain
+  throwTo tid ReceivedException
+  where
+    -- hand any failure over to the main thread and carry on with an
+    -- empty result
+    forwardToMain x = X.throwTo tid (x :: X.SomeException) >> return []
+
+-- | Signals thrown to the main thread to end its wait loop.
+data RpcException
+ = ReceivedException -- ^ thrown by 'rpcClientCallback' once a reply has been handled
+ | TimeoutException -- ^ thrown by the timer thread forked in 'main' after the timeout
+ deriving (Show)
+
+-- | Lets 'RpcException' be thrown with 'throwTo' and caught with 'X.catch'.
+instance X.Exception RpcException