]> woffs.de Git - fd/haskell-amqp-utils.git/blob - Network/AMQP/Utils/Options.hs
optionally delay negative acknowledgements
[fd/haskell-amqp-utils.git] / Network / AMQP / Utils / Options.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 {-# LANGUAGE OverloadedStrings #-}
6
7 module Network.AMQP.Utils.Options where
8
9 import qualified Data.ByteString.Char8            as BS
10 import           Data.Default.Class
11 import           Data.Int                         (Int64)
12 import qualified Data.Map                         as M
13 import           Data.Maybe
14 import           Data.Text                        (Text, pack)
15 import           Data.Version                     (showVersion)
16 import           Data.Word                        (Word16)
17 import           Network.AMQP
18 import           Network.AMQP.Types
19 import           Network.Socket                   (PortNumber)
20 import           Paths_amqp_utils                 (version)
21 import           System.Console.GetOpt
22 import           System.FilePath.Posix.ByteString (RawFilePath)
23 import           Text.Read                        (readMaybe)
24
25 portnumber :: Args -> PortNumber
26 portnumber a
27   | (port a) == Nothing && (tls a) = 5671
28   | (port a) == Nothing = 5672
29   | otherwise = fromJust (port a)
30
31 -- | A data type for our options
32 data Args =
33   Args
34     { server          :: String
35     , port            :: Maybe PortNumber
36     , tls             :: Bool
37     , vHost           :: String
38     , currentExchange :: String
39     , bindings        :: [(String, String)]
40     , rKey            :: String
41     , anRiss          :: Maybe Int64
42     , fileProcess     :: Maybe String
43     , qName           :: Maybe String
44     , cert            :: Maybe String
45     , key             :: Maybe String
46     , user            :: String
47     , pass            :: String
48     , preFetch        :: Word16
49     , heartBeat       :: Maybe Word16
50     , tempDir         :: Maybe String
51     , additionalArgs  :: [String]
52     , connectionName  :: Maybe String
53     , tmpQName        :: String
54     , inputFiles      :: [(RawFilePath,String,String)]
55     , outputFile      :: String
56     , lineMode        :: Bool
57     , confirm         :: Bool
58     , msgid           :: Maybe Text
59     , msgtype         :: Maybe Text
60     , userid          :: Maybe Text
61     , appid           :: Maybe Text
62     , clusterid       :: Maybe Text
63     , contenttype     :: Maybe Text
64     , contentencoding :: Maybe Text
65     , replyto         :: Maybe Text
66     , prio            :: Maybe Octet
67     , corrid          :: Maybe Text
68     , msgexp          :: Maybe Text
69     , msgheader       :: Maybe FieldTable
70     , fnheader        :: [String]
71     , suffix          :: [BS.ByteString]
72     , magic           :: Bool
73     , persistent      :: Maybe DeliveryMode
74     , ack             :: Bool
75     , requeuenack     :: Bool
76     , rpc_timeout     :: Double
77     , connect_timeout :: Int
78     , simple          :: Bool
79     , cleanupTmpFile  :: Bool
80     , removeSentFile  :: Bool
81     , moveSentFileTo  :: Maybe RawFilePath
82     , initialScan     :: Bool
83     , streamoffset    :: FieldTable
84     , delaynack       :: Int
85     }
86
87 instance Default Args where
88   def =
89     Args
90       "localhost"
91       Nothing
92       False
93       "/"
94       ""
95       []
96       ""
97       Nothing
98       Nothing
99       Nothing
100       Nothing
101       Nothing
102       "guest"
103       "guest"
104       1
105       Nothing
106       Nothing
107       []
108       Nothing
109       ""
110       []
111       "-"
112       False
113       False
114       Nothing
115       Nothing
116       Nothing
117       Nothing
118       Nothing
119       Nothing
120       Nothing
121       Nothing
122       Nothing
123       Nothing
124       Nothing
125       Nothing
126       []
127       []
128       False
129       Nothing
130       True
131       True
132       5
133       600
134       False
135       False
136       False
137       Nothing
138       False
139       (FieldTable M.empty)
140       0
141
142 -- | all options
143 allOptions :: [(String, OptDescr (Args -> Args))]
144 allOptions =
145   [ ( "k"
146     , Option
147         ['r']
148         ["bindingkey"]
149         (ReqArg
150            (\s o -> o {bindings = (currentExchange o, s) : (bindings o)})
151            "BINDINGKEY")
152         ("AMQP binding key"))
153   , ( "kr"
154     , Option
155         ['X']
156         ["execute"]
157         (OptArg
158            (\s o ->
159               o
160                 { fileProcess = Just (fromMaybe callback s)
161                 , tempDir = Just (fromMaybe "/tmp" (tempDir o))
162                 })
163            "EXE")
164         ("Callback Script File (implies -t) (-X without arg: " ++
165          callback ++ ")"))
166   , ( "kr"
167     , Option
168         ['a']
169         ["args", "arg"]
170         (ReqArg (\s o -> o {additionalArgs = s : (additionalArgs o)}) "ARG")
171         "additional argument for -X callback")
172   , ( "kr"
173     , Option
174         ['t']
175         ["tempdir", "target"]
176         (OptArg (\s o -> o {tempDir = Just (fromMaybe "/tmp" s)}) "DIR")
177         "tempdir (default: no file creation, -t without arg: /tmp)")
178   , ( "kr"
179     , Option
180         ['f']
181         ["prefetch"]
182         (ReqArg (\s o -> o {preFetch = read s}) "INT")
183         ("Prefetch count. (0=unlimited, 1=off, default: " ++
184          show (preFetch def) ++ ")"))
185   , ( "kr"
186     , Option
187         ['A']
188         ["ack"]
189         (NoArg (\o -> o {ack = not (ack o)}))
190         ("Toggle ack messages (default: " ++ show (ack def) ++ ")"))
191   , ( "kr"
192     , Option
193         ['R']
194         ["requeuenack"]
195         (NoArg (\o -> o {requeuenack = not (requeuenack o)}))
196         ("Toggle requeue when rejected (default: " ++
197          show (requeuenack def) ++ ")"))
198   , ( "a"
199     , Option
200         ['r']
201         ["routingkey"]
202         (ReqArg (\s o -> o {rKey = s}) "ROUTINGKEY")
203         "AMQP routing key")
204   , ( "p"
205     , Option
206         ['r', 'Q']
207         ["routingkey", "qname"]
208         (ReqArg (\s o -> o {rKey = s}) "ROUTINGKEY")
209         "AMQP routing key")
210   , ( "ap"
211     , Option
212         ['f']
213         ["inputfile"]
214         (ReqArg (\s o -> o {inputFiles = (BS.pack s,currentExchange o,rKey o):(inputFiles o)}) "INPUTFILE")
215         ("Message input file (default: <stdin>)"))
216   , ( "p"
217     , Option
218         ['O']
219         ["outputfile"]
220         (ReqArg (\s o -> o {outputFile = s}) "OUTPUTFILE")
221         ("Message output file (default: " ++ (outputFile def) ++ ")"))
222   , ( "a"
223     , Option
224         ['l']
225         ["linemode"]
226         (NoArg (\o -> o {lineMode = not (lineMode o)}))
227         ("Toggle line-by-line mode (default: " ++ show (lineMode def) ++ ")"))
228   , ( "a"
229     , Option
230         ['C']
231         ["confirm"]
232         (NoArg (\o -> o {confirm = not (confirm o)}))
233         ("Toggle confirms (default: " ++ show (confirm def) ++ ")"))
234   , ( "a"
235     , Option
236         []
237         ["msgid"]
238         (ReqArg (\s o -> o {msgid = Just $ pack s}) "ID")
239         "Message ID")
240   , ( "a"
241     , Option
242         []
243         ["type"]
244         (ReqArg (\s o -> o {msgtype = Just $ pack s}) "TYPE")
245         "Message Type")
246   , ( "a"
247     , Option
248         []
249         ["userid"]
250         (ReqArg (\s o -> o {userid = Just $ pack s}) "USERID")
251         "Message User-ID")
252   , ( "a"
253     , Option
254         []
255         ["appid"]
256         (ReqArg (\s o -> o {appid = Just $ pack s}) "APPID")
257         "Message App-ID")
258   , ( "a"
259     , Option
260         []
261         ["clusterid"]
262         (ReqArg (\s o -> o {clusterid = Just $ pack s}) "CLUSTERID")
263         "Message Cluster-ID")
264   , ( "a"
265     , Option
266         []
267         ["contenttype"]
268         (ReqArg (\s o -> o {contenttype = Just $ pack s}) "CONTENTTYPE")
269         "Message Content-Type")
270   , ( "a"
271     , Option
272         []
273         ["contentencoding"]
274         (ReqArg (\s o -> o {contentencoding = Just $ pack s}) "CONTENTENCODING")
275         "Message Content-Encoding")
276   , ( "a"
277     , Option
278         []
279         ["replyto"]
280         (ReqArg (\s o -> o {replyto = Just $ pack s}) "REPLYTO")
281         "Message Reply-To")
282   , ( "p"
283     , Option
284         ['t']
285         ["rpc_timeout"]
286         (ReqArg (\s o -> o {rpc_timeout = read s}) "SECONDS")
287         ("How long to wait for reply (default: " ++
288          show (rpc_timeout def) ++ ")"))
289   , ( "a"
290     , Option
291         []
292         ["prio"]
293         (ReqArg (\s o -> o {prio = Just $ read s}) "PRIO")
294         "Message Priority")
295   , ( "ap"
296     , Option
297         []
298         ["corrid"]
299         (ReqArg (\s o -> o {corrid = Just $ pack s}) "CORRID")
300         "Message CorrelationID")
301   , ( "ap"
302     , Option
303         []
304         ["exp"]
305         (ReqArg (\s o -> o {msgexp = Just $ pack s}) "EXP")
306         "Message Expiration")
307   , ( "ap"
308     , Option
309         ['h']
310         ["header"]
311         (ReqArg
312            (\s o -> o {msgheader = addheader (msgheader o) s})
313            "HEADER=VALUE")
314         "Message Headers")
315   , ( "k"
316     , Option
317         []
318         ["stream_offset"]
319         (ReqArg
320            (\s o -> o {streamoffset = mkStreamOffset s})
321            "OFFSET")
322         "x-stream-offset consumer argument")
323   , ( "a"
324     , Option
325         ['F']
326         ["fnheader"]
327         (ReqArg (\s o -> o {fnheader = s : (fnheader o)}) "HEADERNAME")
328         "Put filename into this header")
329   , ( "a"
330     , Option
331         ['S']
332         ["suffix"]
333         (ReqArg (\s o -> o {suffix = (BS.pack s) : (suffix o)}) "SUFFIX")
334         "Allowed file suffixes in hotfolder mode")
335   , ( "a"
336     , Option
337         ['u']
338         ["remove", "move"]
339         (OptArg (\s o -> o {removeSentFile = True, moveSentFileTo = fmap BS.pack s}) "DIR")
340         ("Remove (or move to DIR) sent file in hotfolder mode"))
341   , ( "a"
342     , Option
343         ['d']
344         ["dirscan"]
345         (NoArg (\o -> o {initialScan = not (initialScan o)}))
346         ("Toggle initial directory scan in hotfolder mode (default: " ++
347          show (initialScan def) ++ ")"))
348   , ( "a"
349     , Option
350         ['m']
351         ["magic"]
352         (NoArg (\o -> o {magic = not (magic o)}))
353         ("Toggle setting content-type and -encoding from file contents (default: " ++
354          show (magic def) ++ ")"))
355   , ( "a"
356     , Option
357         ['e']
358         ["persistent"]
359         (NoArg (\o -> o {persistent = Just Persistent}))
360         "Set persistent delivery")
361   , ( "a"
362     , Option
363         ['E']
364         ["nonpersistent"]
365         (NoArg (\o -> o {persistent = Just NonPersistent}))
366         "Set nonpersistent delivery")
367   , ( "krp"
368     , Option
369         ['l']
370         ["charlimit"]
371         (ReqArg (\s o -> o {anRiss = Just (read s)}) "INT")
372         "limit number of shown body chars (default: unlimited)")
373   , ( "kr"
374     , Option
375         ['q']
376         ["queue"]
377         (ReqArg (\s o -> o {qName = Just s}) "QUEUENAME")
378         "Ignore Exchange and bind to existing Queue")
379   , ( "kr"
380     , Option
381         ['i']
382         ["simple"]
383         (NoArg
384            (\o -> o {simple = True, cleanupTmpFile = not (cleanupTmpFile o)}))
385         "call callback with one arg (filename) only")
386   , ( "kr"
387     , Option
388         ['j']
389         ["cleanup"]
390         (NoArg (\o -> o {cleanupTmpFile = not (cleanupTmpFile o)}))
391         "Toggle remove tempfile after script call. Default False, but default True if --simple (-i)")
392   , ( "kr"
393     , Option
394         ['Q']
395         ["qname"]
396         (ReqArg (\s o -> o {tmpQName = s}) "TEMPQNAME")
397         "Name for temporary exclusive Queue")
398   , ( "akrp"
399     , Option
400         ['x']
401         ["exchange"]
402         (ReqArg (\s o -> o {currentExchange = s}) "EXCHANGE")
403         ("AMQP Exchange (default: \"\")"))
404   , ( "akrp"
405     , Option
406         ['o']
407         ["server"]
408         (ReqArg (\s o -> o {server = s}) "SERVER")
409         ("AMQP Server (default: " ++ server def ++ ")"))
410   , ( "akrp"
411     , Option
412         ['y']
413         ["vhost"]
414         (ReqArg (\s o -> o {vHost = s}) "VHOST")
415         ("AMQP Virtual Host (default: " ++ vHost def ++ ")"))
416   , ( "akrp"
417     , Option
418         ['p']
419         ["port"]
420         (ReqArg (\s o -> o {port = Just (read s)}) "PORT")
421         ("Server Port Number (default: " ++ show (portnumber def) ++ ")"))
422   , ( "akrp"
423     , Option
424         ['T']
425         ["tls"]
426         (NoArg (\o -> o {tls = not (tls o)}))
427         ("Toggle TLS (default: " ++ show (tls def) ++ ")"))
428   , ( "akrp"
429     , Option
430         ['c']
431         ["cert"]
432         (ReqArg (\s o -> o {cert = Just s}) "CERTFILE")
433         ("TLS Client Certificate File"))
434   , ( "akrp"
435     , Option
436         ['k']
437         ["key"]
438         (ReqArg (\s o -> o {key = Just s}) "KEYFILE")
439         ("TLS Client Private Key File"))
440   , ( "akrp"
441     , Option
442         ['U']
443         ["user"]
444         (ReqArg (\s o -> o {user = s}) "USERNAME")
445         ("Username for Auth"))
446   , ( "akrp"
447     , Option
448         ['P']
449         ["pass"]
450         (ReqArg (\s o -> o {pass = s}) "PASSWORD")
451         ("Password for Auth"))
452   , ( "akrp"
453     , Option
454         ['s']
455         ["heartbeats"]
456         (ReqArg (\s o -> o {heartBeat = (Just (read s))}) "INT")
457         "heartbeat interval (0=disable, default: set by server)")
458   , ( "akrp"
459     , Option
460         ['n']
461         ["name"]
462         (ReqArg (\s o -> o {connectionName = Just s}) "NAME")
463         "connection name, will be shown in RabbitMQ web interface")
464   , ( "akrp"
465     , Option
466         ['w']
467         ["connect_timeout"]
468         (ReqArg (\s o -> o {connect_timeout = read s}) "SECONDS")
469         ("timeout for establishing initial connection (default: " ++
470          show (connect_timeout def) ++ ")"))
471   , ( "k"
472     , Option
473         ['D']
474         ["delaynack"]
475         (ReqArg (\s o -> o {delaynack = read s}) "SECONDS")
476         ("delay negative acknowledgements (default: " ++
477          show (delaynack def) ++ ")"))
478   ]
479
480 -- | Options for the executables
481 options :: Char -> [OptDescr (Args -> Args)]
482 options exename = map snd $ filter ((elem exename) . fst) allOptions
483
484 -- | Add a header with a String value
485 addheader :: Maybe FieldTable -> String -> Maybe FieldTable
486 addheader Nothing string =
487   Just $ FieldTable $ M.singleton (getkey string) (getval string)
488 addheader (Just (FieldTable oldheader)) string =
489   Just $ FieldTable $ M.insert (getkey string) (getval string) oldheader
490
491 getkey :: String -> Text
492 getkey s = pack $ takeWhile (/= '=') s
493
494 getval :: String -> FieldValue
495 getval s = FVString $ BS.pack $ tail $ dropWhile (/= '=') s
496
497 -- | Parse streamoffset argument as number or string
498 mkStreamOffset :: String -> FieldTable
499 mkStreamOffset s = FieldTable $ M.singleton (pack "x-stream-offset") value
500   where
501     value = maybe (FVString $ BS.pack s) FVInt64 $ readMaybe s
502
503 -- | 'parseargs' exename argstring
504 -- applies options onto argstring
505 parseargs :: Char -> [String] -> IO Args
506 parseargs exename argstring =
507   case getOpt Permute opt argstring of
508     (o, [], []) -> return $ foldl (flip id) def o
509     (_, _, errs) ->
510       ioError $ userError $ concat errs ++ usageInfo (usage exename) opt
511   where
512     opt = options exename
513
514 -- | the default callback for the -X option
515 callback :: String
516 callback = "/usr/lib/haskell-amqp-utils/callback"
517
518 usage :: Char -> String
519 usage exename =
520   "\n\
521   \amqp-utils " ++
522   (showVersion version) ++
523   "\n\n\
524   \Usage:\n" ++
525   (longname exename) ++
526   " [options]\n\n\
527   \Options:"
528
529 longname :: Char -> String
530 longname 'a' = "agitprop"
531 longname 'k' = "konsum"
532 longname 'r' = "arbeite"
533 longname 'p' = "plane"
534 longname _   = "command"