-{-# LANGUAGE CPP #-}
+-- SPDX-FileCopyrightText: 2022 Frank Doepper
+--
+-- SPDX-License-Identifier: GPL-3.0-only
+
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE OverloadedStrings #-}
+
-- generic AMQP publisher
-import Control.Concurrent
-import qualified Control.Exception as X
-import Control.Monad (forever)
-import qualified Data.ByteString.Lazy.Char8 as BL
-#if MIN_VERSION_hinotify(0,3,10)
-import qualified Data.ByteString.Char8 as BS
+import Control.Concurrent
+import qualified Control.Exception as X
+import Control.Monad (forM_)
+import qualified Data.ByteString.Char8 as BS
+import qualified Data.ByteString.Lazy.Char8 as BL
+import qualified Data.Map as M
+import Data.Maybe
+import qualified Data.Text as T
+import Data.Time
+import Data.Time.Clock.POSIX
+import Data.Version (showVersion)
+import Data.Word (Word64)
+import Magic
+import Network.AMQP
+import Network.AMQP.Types
+import Network.AMQP.Utils.Connection
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Options
+import Paths_amqp_utils (version)
+import qualified System.Directory.OsPath as DO
+import System.Environment
+import System.Exit
+#if linux_HOST_OS
+import System.INotify
#endif
-import Data.List (isSuffixOf)
-import Data.Maybe
-import qualified Data.Text as T
-import Data.Time
-import Data.Time.Clock.POSIX
-import Data.Version (showVersion)
-import Data.Word (Word64)
-import Magic
-import Network.AMQP
-import Network.AMQP.Types
-import Network.AMQP.Utils.Connection
-import Network.AMQP.Utils.Helpers
-import Network.AMQP.Utils.Options
-import Paths_amqp_utils (version)
-import System.Environment
-import System.INotify
-import qualified System.Posix.Files as F
+import System.Posix.ByteString (RawFilePath)
+import qualified System.Posix.Files.ByteString as FB
+import qualified System.Posix.Files.PosixString as FP
+import qualified System.OsPath as OS
+import qualified System.File.OsPath as FOS
main :: IO ()
main = do
hr "starting"
tid <- myThreadId
args <- getArgs >>= parseargs 'a'
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- printparam' "routing key" $ rKey args
- isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
- if isDir
- then printparam' "hotfolder" $ inputFile args
- else printparam' "input file" $
- (inputFile args) ++
- if (lineMode args)
- then " (line-by-line)"
- else ""
+ printparam "client version" ["amqp-utils", showVersion version]
+ printparam "routing key" $ rKey args
+ printparam "exchange" $ currentExchange args
(conn, chan) <- connect args
addChannelExceptionHandler chan (X.throwTo tid)
- printparam' "confirm mode" $ show $ confirm args
+ printparam "confirm mode" $ confirm args
if (confirm args)
then do
confirmSelect chan False
addConfirmationListener chan confirmCallback
else return ()
- let publishOneMsg = publishOneMsg' chan args
+ let inputFile' = firstInputFile (inputFiles args)
+ isDir <-
+ if inputFile' == "-"
+ then return False
+ else FB.getFileStatus inputFile' >>= return . FB.isDirectory
+ let publishOneMsg =
+ publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir}
+ if isDir
+ then do
+ printparam "hotfolder mode" True
+ printparam "initial scan" (initialScan args)
+ if isNothing (moveSentFileTo args)
+ then printparam "remove sent file" (removeSentFile args)
+ else printparam "move sent file to" (moveSentFileTo args)
+ else printparam
+ "input file"
+ [ inputFile'
+ , if (lineMode args)
+ then "(line-by-line)"
+ else ""
+ ]
X.catch
(if isDir
then do
- inotify <- initINotify
- wd <-
- addWatch
- inotify
- [CloseWrite, MoveIn]
-#if MIN_VERSION_hinotify(0,3,10)
- (BS.pack (inputFile args))
+#if linux_HOST_OS
+ wds <- mapM (watchHotfolder args publishOneMsg) (inputFiles args)
+ sleepingBeauty >>= (\x -> do
+ forM_ wds (\(wd,folder) -> do
+ removeWatch wd
+ printparam "END watching" folder
+ )
+ X.throw x)
#else
- (inputFile args)
+ X.throw (X.ErrorCall "ERROR: watching a directory is only supported in Linux")
#endif
- (handleEvent publishOneMsg (suffix args) (inputFile args))
- hr $ "BEGIN watching " ++ (inputFile args)
- _ <- forever $ threadDelay 1000000
- removeWatch wd
- hr $ "END watching " ++ (inputFile args)
else do
hr $ "BEGIN sending"
- messageFile <- BL.readFile (inputFile args)
+ messageFile <-
+ if inputFile' == "-"
+ then BL.getContents
+ else OS.encodeUtf (BS.unpack inputFile') >>= FOS.readFile
if (lineMode args)
- then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
- else publishOneMsg (Just (inputFile args)) messageFile
- hr "END sending")
- (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
- -- all done. wait and close.
- if (confirm args)
- then waitForConfirms chan >>= (printparam' "confirmed") . show
+ then mapM_ (publishOneMsg (currentExchange args) (rKey args) Nothing) (BL.lines messageFile)
+ else publishOneMsg (currentExchange args) (rKey args) (Just (inputFile')) messageFile
+ hr "END sending"
+ if (confirm args)
+ then waitForConfirms chan >>= printparam "confirmed"
+ else return ()
+ X.catch (closeConnection conn) exceptionHandler
+ )
+ exceptionHandler
+
+#if linux_HOST_OS
+-- | watch a hotfolder
+watchHotfolder ::
+ Args
+ -> (String -> String -> Maybe RawFilePath -> BL.ByteString -> IO ())
+ -> (RawFilePath, String, String)
+ -> IO (WatchDescriptor, RawFilePath)
+watchHotfolder args publishOneMsg (folder, exchange, rkey) = do
+ printparam "hotfolder" folder
+ inotify <- initINotify
+ wd <-
+ addWatch
+ inotify
+ [CloseWrite, MoveIn]
+ folder
+ (handleEvent (publishOneMsg exchange rkey) (suffix args) folder)
+ hr "BEGIN watching"
+ if (initialScan args)
+ then do
+ folder' <- OS.encodeUtf (BS.unpack folder)
+ DO.listDirectory folder' >>=
+ mapM_ (\fn -> handleFile (publishOneMsg exchange rkey) (suffix args) (folder' OS.</> fn))
else return ()
- closeConnection conn
- hr "connection closed"
+ return (wd,folder)
+#endif
+
+-- | A handler for clean exit
+exceptionHandler :: AMQPException -> IO ()
+exceptionHandler (ChannelClosedException Normal txt) =
+ printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler (ConnectionClosedException Normal txt) =
+ printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
-- | The handler for publisher confirms
confirmCallback :: (Word64, Bool, AckType) -> IO ()
confirmCallback (deliveryTag, isAll, ackType) =
- printparam'
+ printparam
"confirmed"
- ((show deliveryTag) ++
- (if isAll
- then " all "
- else " this ") ++
- (show ackType))
+ [ show deliveryTag
+ , if isAll
+ then "all"
+ else "this"
+ , show ackType
+ ]
+#if linux_HOST_OS
-- | Hotfolder event handler
handleEvent ::
- (Maybe String -> BL.ByteString -> IO ())
- -> [String]
- -> String
- -> Event
- -> IO ()
+ (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> Event -> IO ()
-- just handle closewrite and movedin events
-#if MIN_VERSION_hinotify(0,3,10)
-handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ (BS.unpack x))
-handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ (BS.unpack x))
-#else
-handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x)
-handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x)
-#endif
+handleEvent func suffixes folder (Closed False (Just fileName) True) =
+ handleFile func suffixes (folder OS.</> fileName)
+handleEvent func suffixes folder (MovedIn False fileName _) =
+ handleFile func suffixes (folder OS.</> fileName)
handleEvent _ _ _ _ = return ()
-- | Hotfolder file handler
handleFile ::
- (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
-handleFile _ _ ('.':_) = return () -- ignore hidden files
-handleFile f s@(_:_) x =
- if any (flip isSuffixOf x) s
- then handleFile f [] x
+ (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> IO ()
+handleFile func suffixes@(_:_) fileName =
+ if (any (flip BS.isSuffixOf fileName) suffixes) && not ("." `BS.isPrefixOf` fileName)
+ then handleFile func [] fileName
else return ()
-handleFile f [] x =
+handleFile func [] fileName =
X.catch
- (BL.readFile x >>= f (Just x))
- (\exception ->
- printparam' "exception in handleFile" $
- show (exception :: X.SomeException))
+ (FOS.readFile fileName >>= func (Just fileName))
+ (\e -> printparam "exception while processing" fileName >> printparam "exception in handleFile" (e :: X.IOException))
+#endif
-- | Publish one message with our settings
-publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
-publishOneMsg' c a fn f = do
+publishOneMsg' ::
+ Channel
+ -> Args
+ -> String
+ -> String
+ -> Maybe RawFilePath
+ -> BL.ByteString
+ -> IO ()
+publishOneMsg' chan a exchange rkey fn content = do
printparam "sending" fn
(mtype, mencoding) <-
- if (magic a) && isJust fn
+ if (magic a)
then do
+ let firstchunk = if BL.null content then BS.empty else head $ BL.toChunks content
m <- magicOpen [MagicMimeType]
magicLoadDefault m
- t <- magicFile m (fromJust fn)
+ t <- BS.useAsCStringLen firstchunk (magicCString m)
magicSetFlags m [MagicMimeEncoding]
- e <- magicFile m (fromJust fn)
+ e <- BS.useAsCStringLen firstchunk (magicCString m)
return (Just (T.pack t), Just (T.pack e))
else return ((contenttype a), (contentencoding a))
now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
- r <-
- publishMsg
- c
- (T.pack $ currentExchange a)
- (T.pack $ rKey a)
- newMsg
- { msgBody = f
- , msgDeliveryMode = persistent a
- , msgTimestamp = Just now
- , msgID = msgid a
- , msgType = msgtype a
- , msgUserID = userid a
- , msgApplicationID = appid a
- , msgClusterID = clusterid a
- , msgContentType = mtype
- , msgContentEncoding = mencoding
- , msgReplyTo = replyto a
- , msgPriority = prio a
- , msgCorrelationID = corrid a
- , msgExpiration = msgexp a
- , msgHeaders = substheader (fnheader a) fn $ msgheader a
- }
- printparam "sent" $ fmap show r
+ publishMsg
+ chan
+ (T.pack $ exchange)
+ (T.pack $ rkey)
+ newMsg
+ { msgBody = content
+ , msgDeliveryMode = persistent a
+ , msgTimestamp = Just now
+ , msgID = msgid a
+ , msgType = msgtype a
+ , msgUserID = userid a
+ , msgApplicationID = appid a
+ , msgClusterID = clusterid a
+ , msgContentType = mtype
+ , msgContentEncoding = mencoding
+ , msgReplyTo = replyto a
+ , msgPriority = prio a
+ , msgCorrelationID = corrid a
+ , msgExpiration = msgexp a
+ , msgHeaders = substheader (fnheader a) (fmap OS.takeFileName fn) $ msgheader a
+ } >>=
+ printparam "sent"
+ removeSentFileIfRequested (removeSentFile a) (moveSentFileTo a) fn
where
substheader ::
- [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
+ [String] -> Maybe BS.ByteString -> Maybe FieldTable -> Maybe FieldTable
substheader (s:r) (Just fname) old =
- substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
+ substheader r (Just fname) (addheader' old s fname)
substheader _ _ old = old
+ removeSentFileIfRequested False _ _ = return ()
+ removeSentFileIfRequested True _ Nothing = return ()
+ removeSentFileIfRequested True Nothing (Just fname) =
+ printparam "removing" fname >> DO.removeFile fname
+ removeSentFileIfRequested True (Just path) (Just fname) =
+ printparam "moving" [fname,"to",path] >>
+ FP.rename fname (OS.replaceDirectory fname ((OS.takeDirectory fname) OS.</> path))
+ addheader' :: Maybe FieldTable -> String -> BS.ByteString -> Maybe FieldTable
+ addheader' Nothing k v =
+ Just $ FieldTable $ M.singleton (T.pack k) (FVString v)
+ addheader' (Just (FieldTable oldheader)) k v =
+ Just $ FieldTable $ M.insert (T.pack k) (FVString v) oldheader