X-Git-Url: https://woffs.de/git/fd/haskell-amqp-utils.git/blobdiff_plain/98a2d1e6c69161f7ea0e5418157385568826fee3..refs/heads/ospath:/agitprop.hs diff --git a/agitprop.hs b/agitprop.hs index bd03adc..445f686 100644 --- a/agitprop.hs +++ b/agitprop.hs @@ -1,169 +1,241 @@ -{-# LANGUAGE CPP #-} +-- SPDX-FileCopyrightText: 2022 Frank Doepper +-- +-- SPDX-License-Identifier: GPL-3.0-only + +{-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} + -- generic AMQP publisher -import Control.Concurrent (threadDelay) -import qualified Control.Exception as X -import Control.Monad (forever) -import qualified Data.ByteString.Lazy.Char8 as BL -#if MIN_VERSION_hinotify(0,3,10) -import qualified Data.ByteString.Char8 as BS +import Control.Concurrent +import qualified Control.Exception as X +import Control.Monad (forM_) +import qualified Data.ByteString.Char8 as BS +import qualified Data.ByteString.Lazy.Char8 as BL +import qualified Data.Map as M +import Data.Maybe +import qualified Data.Text as T +import Data.Time +import Data.Time.Clock.POSIX +import Data.Version (showVersion) +import Data.Word (Word64) +import Magic +import Network.AMQP +import Network.AMQP.Types +import Network.AMQP.Utils.Connection +import Network.AMQP.Utils.Helpers +import Network.AMQP.Utils.Options +import Paths_amqp_utils (version) +import qualified System.Directory.OsPath as DO +import System.Environment +import System.Exit +#if linux_HOST_OS +import System.INotify #endif -import Data.List (isSuffixOf) -import Data.Maybe -import qualified Data.Text as T -import Data.Time -import Data.Time.Clock.POSIX -import Data.Version (showVersion) -import Data.Word (Word64) -import Magic -import Network.AMQP -import Network.AMQP.Types -import Network.AMQP.Utils.Connection -import Network.AMQP.Utils.Helpers -import Network.AMQP.Utils.Options -import Paths_amqp_utils (version) -import System.Environment -import System.INotify -import qualified System.Posix.Files as F +import System.Posix.ByteString (RawFilePath) +import qualified System.Posix.Files.ByteString as FB +import qualified System.Posix.Files.PosixString as FP +import qualified System.OsPath as OS +import qualified System.File.OsPath as FOS main :: IO () main = do hr "starting" - args <- getArgs >>= parseargs "agitprop" - printparam' "client version" $ "amqp-utils " ++ (showVersion version) - printparam' "routing key" $ rKey args - isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory - if isDir - then printparam' "hotfolder" $ inputFile args - else printparam' "input file" $ - (inputFile args) ++ - if (lineMode args) - then " (line-by-line)" - else "" + tid <- myThreadId + args <- getArgs >>= parseargs 'a' + printparam "client version" ["amqp-utils", showVersion version] + printparam "routing key" $ rKey args + printparam "exchange" $ currentExchange args (conn, chan) <- connect args - printparam' "confirm mode" $ show $ confirm args + addChannelExceptionHandler chan (X.throwTo tid) + printparam "confirm mode" $ confirm args if (confirm args) then do confirmSelect chan False addConfirmationListener chan confirmCallback else return () - let publishOneMsg = publishOneMsg' chan args + let inputFile' = firstInputFile (inputFiles args) + isDir <- + if inputFile' == "-" + then return False + else FB.getFileStatus inputFile' >>= return . FB.isDirectory + let publishOneMsg = + publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir} + if isDir + then do + printparam "hotfolder mode" True + printparam "initial scan" (initialScan args) + if isNothing (moveSentFileTo args) + then printparam "remove sent file" (removeSentFile args) + else printparam "move sent file to" (moveSentFileTo args) + else printparam + "input file" + [ inputFile' + , if (lineMode args) + then "(line-by-line)" + else "" + ] X.catch (if isDir then do - inotify <- initINotify - wd <- - addWatch - inotify - [CloseWrite, MoveIn] -#if MIN_VERSION_hinotify(0,3,10) - (BS.pack (inputFile args)) +#if linux_HOST_OS + wds <- mapM (watchHotfolder args publishOneMsg) (inputFiles args) + sleepingBeauty >>= (\x -> do + forM_ wds (\(wd,folder) -> do + removeWatch wd + printparam "END watching" folder + ) + X.throw x) #else - (inputFile args) + X.throw (X.ErrorCall "ERROR: watching a directory is only supported in Linux") #endif - (handleEvent publishOneMsg (suffix args) (inputFile args)) - hr $ "BEGIN watching " ++ (inputFile args) - _ <- forever $ threadDelay 1000000 - removeWatch wd - hr $ "END watching " ++ (inputFile args) else do hr $ "BEGIN sending" - messageFile <- BL.readFile (inputFile args) + messageFile <- + if inputFile' == "-" + then BL.getContents + else OS.encodeUtf (BS.unpack inputFile') >>= FOS.readFile if (lineMode args) - then mapM_ (publishOneMsg Nothing) (BL.lines messageFile) - else publishOneMsg (Just (inputFile args)) messageFile - hr "END sending") - (\exception -> printparam' "exception" $ show (exception :: X.SomeException)) - -- all done. wait and close. - if (confirm args) - then waitForConfirms chan >>= (printparam' "confirmed") . show + then mapM_ (publishOneMsg (currentExchange args) (rKey args) Nothing) (BL.lines messageFile) + else publishOneMsg (currentExchange args) (rKey args) (Just (inputFile')) messageFile + hr "END sending" + if (confirm args) + then waitForConfirms chan >>= printparam "confirmed" + else return () + X.catch (closeConnection conn) exceptionHandler + ) + exceptionHandler + +#if linux_HOST_OS +-- | watch a hotfolder +watchHotfolder :: + Args + -> (String -> String -> Maybe RawFilePath -> BL.ByteString -> IO ()) + -> (RawFilePath, String, String) + -> IO (WatchDescriptor, RawFilePath) +watchHotfolder args publishOneMsg (folder, exchange, rkey) = do + printparam "hotfolder" folder + inotify <- initINotify + wd <- + addWatch + inotify + [CloseWrite, MoveIn] + folder + (handleEvent (publishOneMsg exchange rkey) (suffix args) folder) + hr "BEGIN watching" + if (initialScan args) + then do + folder' <- OS.encodeUtf (BS.unpack folder) + DO.listDirectory folder' >>= + mapM_ (\fn -> handleFile (publishOneMsg exchange rkey) (suffix args) (folder' OS. fn)) else return () - closeConnection conn - hr "connection closed" + return (wd,folder) +#endif + +-- | A handler for clean exit +exceptionHandler :: AMQPException -> IO () +exceptionHandler (ChannelClosedException Normal txt) = + printparam "exit" txt >> exitWith ExitSuccess +exceptionHandler (ConnectionClosedException Normal txt) = + printparam "exit" txt >> exitWith ExitSuccess +exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1) -- | The handler for publisher confirms confirmCallback :: (Word64, Bool, AckType) -> IO () confirmCallback (deliveryTag, isAll, ackType) = - printparam' + printparam "confirmed" - ((show deliveryTag) ++ - (if isAll - then " all " - else " this ") ++ - (show ackType)) + [ show deliveryTag + , if isAll + then "all" + else "this" + , show ackType + ] +#if linux_HOST_OS -- | Hotfolder event handler handleEvent :: - (Maybe String -> BL.ByteString -> IO ()) - -> [String] - -> String - -> Event - -> IO () + (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> Event -> IO () -- just handle closewrite and movedin events -#if MIN_VERSION_hinotify(0,3,10) -handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ (BS.unpack x)) -handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ (BS.unpack x)) -#else -handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x) -handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x) -#endif +handleEvent func suffixes folder (Closed False (Just fileName) True) = + handleFile func suffixes (folder OS. fileName) +handleEvent func suffixes folder (MovedIn False fileName _) = + handleFile func suffixes (folder OS. fileName) handleEvent _ _ _ _ = return () -- | Hotfolder file handler handleFile :: - (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO () -handleFile _ _ ('.':_) = return () -- ignore hidden files -handleFile f s@(_:_) x = - if any (flip isSuffixOf x) s - then handleFile f [] x + (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> IO () +handleFile func suffixes@(_:_) fileName = + if (any (flip BS.isSuffixOf fileName) suffixes) && not ("." `BS.isPrefixOf` fileName) + then handleFile func [] fileName else return () -handleFile f [] x = +handleFile func [] fileName = X.catch - (BL.readFile x >>= f (Just x)) - (\exception -> - printparam' "exception in handleFile" $ - show (exception :: X.SomeException)) + (FOS.readFile fileName >>= func (Just fileName)) + (\e -> printparam "exception while processing" fileName >> printparam "exception in handleFile" (e :: X.IOException)) +#endif -- | Publish one message with our settings -publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO () -publishOneMsg' c a fn f = do +publishOneMsg' :: + Channel + -> Args + -> String + -> String + -> Maybe RawFilePath + -> BL.ByteString + -> IO () +publishOneMsg' chan a exchange rkey fn content = do printparam "sending" fn (mtype, mencoding) <- - if (magic a) && isJust fn + if (magic a) then do + let firstchunk = if BL.null content then BS.empty else head $ BL.toChunks content m <- magicOpen [MagicMimeType] magicLoadDefault m - t <- magicFile m (fromJust fn) + t <- BS.useAsCStringLen firstchunk (magicCString m) magicSetFlags m [MagicMimeEncoding] - e <- magicFile m (fromJust fn) + e <- BS.useAsCStringLen firstchunk (magicCString m) return (Just (T.pack t), Just (T.pack e)) else return ((contenttype a), (contentencoding a)) now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds - r <- - publishMsg - c - (T.pack $ currentExchange a) - (T.pack $ rKey a) - newMsg - { msgBody = f - , msgDeliveryMode = persistent a - , msgTimestamp = Just now - , msgID = msgid a - , msgType = msgtype a - , msgUserID = userid a - , msgApplicationID = appid a - , msgClusterID = clusterid a - , msgContentType = mtype - , msgContentEncoding = mencoding - , msgReplyTo = replyto a - , msgPriority = prio a - , msgCorrelationID = corrid a - , msgExpiration = msgexp a - , msgHeaders = substheader (fnheader a) fn $ msgheader a - } - printparam "sent" $ fmap show r + publishMsg + chan + (T.pack $ exchange) + (T.pack $ rkey) + newMsg + { msgBody = content + , msgDeliveryMode = persistent a + , msgTimestamp = Just now + , msgID = msgid a + , msgType = msgtype a + , msgUserID = userid a + , msgApplicationID = appid a + , msgClusterID = clusterid a + , msgContentType = mtype + , msgContentEncoding = mencoding + , msgReplyTo = replyto a + , msgPriority = prio a + , msgCorrelationID = corrid a + , msgExpiration = msgexp a + , msgHeaders = substheader (fnheader a) (fmap OS.takeFileName fn) $ msgheader a + } >>= + printparam "sent" + removeSentFileIfRequested (removeSentFile a) (moveSentFileTo a) fn where substheader :: - [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable + [String] -> Maybe BS.ByteString -> Maybe FieldTable -> Maybe FieldTable substheader (s:r) (Just fname) old = - substheader r (Just fname) (addheader old (s ++ "=" ++ fname)) + substheader r (Just fname) (addheader' old s fname) substheader _ _ old = old + removeSentFileIfRequested False _ _ = return () + removeSentFileIfRequested True _ Nothing = return () + removeSentFileIfRequested True Nothing (Just fname) = + printparam "removing" fname >> DO.removeFile fname + removeSentFileIfRequested True (Just path) (Just fname) = + printparam "moving" [fname,"to",path] >> + FP.rename fname (OS.replaceDirectory fname ((OS.takeDirectory fname) OS. path)) + addheader' :: Maybe FieldTable -> String -> BS.ByteString -> Maybe FieldTable + addheader' Nothing k v = + Just $ FieldTable $ M.singleton (T.pack k) (FVString v) + addheader' (Just (FieldTable oldheader)) k v = + Just $ FieldTable $ M.insert (T.pack k) (FVString v) oldheader