1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
3 -- SPDX-License-Identifier: GPL-3.0-only
5 {-# LANGUAGE OverloadedStrings #-}
7 module Network.AMQP.Utils.Options where
9 import qualified Data.ByteString.Char8 as BS
10 import Data.Default.Class
11 import Data.Int (Int64)
12 import qualified Data.Map as M
14 import Data.Text (Text, pack)
15 import Data.Version (showVersion)
16 import Data.Word (Word16)
18 import Network.AMQP.Types
19 import Network.Socket (PortNumber)
20 import Paths_amqp_utils (version)
21 import System.Console.GetOpt
22 import System.FilePath.Posix.ByteString (RawFilePath)
23 import Text.Read (readMaybe)
-- | Resolve the TCP port to connect to.
--
-- An explicitly configured 'port' always wins. Without one, the default
-- depends on 'tls': 5671 when TLS is enabled, 5672 otherwise.
portnumber :: Args -> PortNumber
portnumber a =
  case port a of
    -- Matching on the 'Maybe' directly keeps this total: no 'fromJust',
    -- and no 'Eq' comparison against 'Nothing'.
    Just explicit -> explicit
    Nothing
      | tls a -> 5671
      | otherwise -> 5672
31 -- | A data type for our options
35 , port :: Maybe PortNumber
38 , currentExchange :: String
39 , bindings :: [(String, String)]
41 , anRiss :: Maybe Int64
42 , fileProcess :: Maybe String
43 , qName :: Maybe String
44 , cert :: Maybe String
49 , heartBeat :: Maybe Word16
50 , tempDir :: Maybe String
51 , additionalArgs :: [String]
52 , connectionName :: Maybe String
54 , inputFiles :: [(RawFilePath,String,String)]
55 , outputFile :: String
59 , msgtype :: Maybe Text
60 , userid :: Maybe Text
62 , clusterid :: Maybe Text
63 , contenttype :: Maybe Text
64 , contentencoding :: Maybe Text
65 , replyto :: Maybe Text
67 , corrid :: Maybe Text
68 , msgexp :: Maybe Text
69 , msgheader :: Maybe FieldTable
70 , fnheader :: [String]
71 , suffix :: [BS.ByteString]
73 , persistent :: Maybe DeliveryMode
76 , rpc_timeout :: Double
77 , connect_timeout :: Int
79 , cleanupTmpFile :: Bool
80 , removeSentFile :: Bool
81 , moveSentFileTo :: Maybe RawFilePath
83 , streamoffset :: FieldTable
86 instance Default Args where
141 allOptions :: [(String, OptDescr (Args -> Args))]
148 (\s o -> o {bindings = (currentExchange o, s) : (bindings o)})
150 ("AMQP binding key"))
158 { fileProcess = Just (fromMaybe callback s)
159 , tempDir = Just (fromMaybe "/tmp" (tempDir o))
162 ("Callback Script File (implies -t) (-X without arg: " ++
168 (ReqArg (\s o -> o {additionalArgs = s : (additionalArgs o)}) "ARG")
169 "additional argument for -X callback")
173 ["tempdir", "target"]
174 (OptArg (\s o -> o {tempDir = Just (fromMaybe "/tmp" s)}) "DIR")
175 "tempdir (default: no file creation, -t without arg: /tmp)")
180 (ReqArg (\s o -> o {preFetch = read s}) "INT")
181 ("Prefetch count. (0=unlimited, 1=off, default: " ++
182 show (preFetch def) ++ ")"))
187 (NoArg (\o -> o {ack = not (ack o)}))
188 ("Toggle ack messages (default: " ++ show (ack def) ++ ")"))
193 (NoArg (\o -> o {requeuenack = not (requeuenack o)}))
194 ("Toggle requeue when rejected (default: " ++
195 show (requeuenack def) ++ ")"))
200 (ReqArg (\s o -> o {rKey = s}) "ROUTINGKEY")
205 ["routingkey", "qname"]
206 (ReqArg (\s o -> o {rKey = s}) "ROUTINGKEY")
212 (ReqArg (\s o -> o {inputFiles = (BS.pack s,currentExchange o,rKey o):(inputFiles o)}) "INPUTFILE")
213 ("Message input file (default: <stdin>)"))
218 (ReqArg (\s o -> o {outputFile = s}) "OUTPUTFILE")
219 ("Message output file (default: " ++ (outputFile def) ++ ")"))
224 (NoArg (\o -> o {lineMode = not (lineMode o)}))
225 ("Toggle line-by-line mode (default: " ++ show (lineMode def) ++ ")"))
230 (NoArg (\o -> o {confirm = not (confirm o)}))
231 ("Toggle confirms (default: " ++ show (confirm def) ++ ")"))
236 (ReqArg (\s o -> o {msgid = Just $ pack s}) "ID")
242 (ReqArg (\s o -> o {msgtype = Just $ pack s}) "TYPE")
248 (ReqArg (\s o -> o {userid = Just $ pack s}) "USERID")
254 (ReqArg (\s o -> o {appid = Just $ pack s}) "APPID")
260 (ReqArg (\s o -> o {clusterid = Just $ pack s}) "CLUSTERID")
261 "Message Cluster-ID")
266 (ReqArg (\s o -> o {contenttype = Just $ pack s}) "CONTENTTYPE")
267 "Message Content-Type")
272 (ReqArg (\s o -> o {contentencoding = Just $ pack s}) "CONTENTENCODING")
273 "Message Content-Encoding")
278 (ReqArg (\s o -> o {replyto = Just $ pack s}) "REPLYTO")
284 (ReqArg (\s o -> o {rpc_timeout = read s}) "SECONDS")
285 ("How long to wait for reply (default: " ++
286 show (rpc_timeout def) ++ ")"))
291 (ReqArg (\s o -> o {prio = Just $ read s}) "PRIO")
297 (ReqArg (\s o -> o {corrid = Just $ pack s}) "CORRID")
298 "Message CorrelationID")
303 (ReqArg (\s o -> o {msgexp = Just $ pack s}) "EXP")
304 "Message Expiration")
310 (\s o -> o {msgheader = addheader (msgheader o) s})
318 (\s o -> o {streamoffset = mkStreamOffset s})
320 "x-stream-offset consumer argument")
325 (ReqArg (\s o -> o {fnheader = s : (fnheader o)}) "HEADERNAME")
326 "Put filename into this header")
331 (ReqArg (\s o -> o {suffix = (BS.pack s) : (suffix o)}) "SUFFIX")
332 "Allowed file suffixes in hotfolder mode")
337 (OptArg (\s o -> o {removeSentFile = True, moveSentFileTo = fmap BS.pack s}) "DIR")
338 ("Remove (or move to DIR) sent file in hotfolder mode"))
343 (NoArg (\o -> o {initialScan = not (initialScan o)}))
344 ("Toggle initial directory scan in hotfolder mode (default: " ++
345 show (initialScan def) ++ ")"))
350 (NoArg (\o -> o {magic = not (magic o)}))
351 ("Toggle setting content-type and -encoding from file contents (default: " ++
352 show (magic def) ++ ")"))
357 (NoArg (\o -> o {persistent = Just Persistent}))
358 "Set persistent delivery")
363 (NoArg (\o -> o {persistent = Just NonPersistent}))
364 "Set nonpersistent delivery")
369 (ReqArg (\s o -> o {anRiss = Just (read s)}) "INT")
370 "limit number of shown body chars (default: unlimited)")
375 (ReqArg (\s o -> o {qName = Just s}) "QUEUENAME")
376 "Ignore Exchange and bind to existing Queue")
382 (\o -> o {simple = True, cleanupTmpFile = not (cleanupTmpFile o)}))
383 "call callback with one arg (filename) only")
388 (NoArg (\o -> o {cleanupTmpFile = not (cleanupTmpFile o)}))
389 "Toggle remove tempfile after script call. Default False, but default True if --simple (-i)")
394 (ReqArg (\s o -> o {tmpQName = s}) "TEMPQNAME")
395 "Name for temporary exclusive Queue")
400 (ReqArg (\s o -> o {currentExchange = s}) "EXCHANGE")
401 ("AMQP Exchange (default: \"\")"))
406 (ReqArg (\s o -> o {server = s}) "SERVER")
407 ("AMQP Server (default: " ++ server def ++ ")"))
412 (ReqArg (\s o -> o {vHost = s}) "VHOST")
413 ("AMQP Virtual Host (default: " ++ vHost def ++ ")"))
418 (ReqArg (\s o -> o {port = Just (read s)}) "PORT")
419 ("Server Port Number (default: " ++ show (portnumber def) ++ ")"))
424 (NoArg (\o -> o {tls = not (tls o)}))
425 ("Toggle TLS (default: " ++ show (tls def) ++ ")"))
430 (ReqArg (\s o -> o {cert = Just s}) "CERTFILE")
431 ("TLS Client Certificate File"))
436 (ReqArg (\s o -> o {key = Just s}) "KEYFILE")
437 ("TLS Client Private Key File"))
442 (ReqArg (\s o -> o {user = s}) "USERNAME")
443 ("Username for Auth"))
448 (ReqArg (\s o -> o {pass = s}) "PASSWORD")
449 ("Password for Auth"))
454 (ReqArg (\s o -> o {heartBeat = (Just (read s))}) "INT")
455 "heartbeat interval (0=disable, default: set by server)")
460 (ReqArg (\s o -> o {connectionName = Just s}) "NAME")
461 "connection name, will be shown in RabbitMQ web interface")
466 (ReqArg (\s o -> o {connect_timeout = read s}) "SECONDS")
467 ("timeout for establishing initial connection (default: " ++
468 show (connect_timeout def) ++ ")"))
-- | Select the option descriptors that apply to one executable.
--
-- Every entry of 'allOptions' is tagged with a string of executable
-- letters; an entry is kept when @exename@ occurs in that string. The
-- relative order of 'allOptions' is preserved.
options :: Char -> [OptDescr (Args -> Args)]
options exename =
  [descr | (exes, descr) <- allOptions, exename `elem` exes]
-- | Add one header, given as a @key=value@ string, to an optional table.
--
-- With no table yet ('Nothing') the result is a table holding just this
-- header; otherwise the header is inserted into the existing table,
-- replacing any previous value stored under the same key. The key and
-- value are derived from the string via 'getkey' and 'getval'.
addheader :: Maybe FieldTable -> String -> Maybe FieldTable
addheader existing string =
  case existing of
    Nothing -> withHeader M.empty
    Just (FieldTable oldheader) -> withHeader oldheader
  where
    -- Insert the parsed header and re-wrap the map as a present table.
    withHeader = Just . FieldTable . M.insert (getkey string) (getval string)
-- | Extract the key part of a @key=value@ argument: everything before the
-- first @=@. If there is no @=@ the whole string is the key.
getkey :: String -> Text
getkey = pack . fst . break (== '=')
-- | Extract the value part of a @key=value@ argument as an 'FVString'.
--
-- The value is everything after the first @=@. An argument that contains
-- no @=@ at all yields an empty value: 'drop' is used instead of the
-- partial 'tail', which would fail on the empty remainder.
getval :: String -> FieldValue
getval s = FVString $ BS.pack $ drop 1 $ dropWhile (/= '=') s
-- | Build the consumer argument table for a stream offset.
--
-- The result contains the single key @x-stream-offset@. When the argument
-- parses (via 'readMaybe') as the numeric type wrapped by 'FVInt32' it is
-- stored as a number; any other text is stored verbatim as an 'FVString'.
mkStreamOffset :: String -> FieldTable
mkStreamOffset s = FieldTable (M.singleton (pack "x-stream-offset") offset)
  where
    offset =
      case readMaybe s of
        Just number -> FVInt32 number
        Nothing -> FVString (BS.pack s)
-- | @parseargs exename argstring@ parses the command line of one
-- executable.
--
-- The option set is chosen by @exename@ (see 'options'). On success the
-- parsed option setters are applied to 'def' from left to right and the
-- resulting 'Args' is returned. If the parser reports errors, or any
-- non-option arguments are left over, an 'IOError' is raised whose message
-- is the concatenated parser errors followed by the usage text.
parseargs :: Char -> [String] -> IO Args
parseargs exename argstring
  | null leftover && null errs =
    return (foldl (\acc apply -> apply acc) def setters)
  | otherwise =
    ioError (userError (concat errs ++ usageInfo (usage exename) opt))
  where
    opt = options exename
    (setters, leftover, errs) = getOpt Permute opt argstring
-- | Path of the default callback script. It is the fallback value for
-- 'fileProcess' when the -X option is given without an argument.
callback = "/usr/lib/haskell-amqp-utils/callback"
509 usage :: Char -> String
513 (showVersion version) ++
516 (longname exename) ++
-- | Map the one-letter executable tag to the executable's full name.
-- Unknown tags fall back to the generic name @command@.
longname :: Char -> String
longname exename =
  case exename of
    'a' -> "agitprop"
    'k' -> "konsum"
    'r' -> "arbeite"
    'p' -> "plane"
    _ -> "command"