]> woffs.de Git - fd/haskell-amqp-utils.git/blobdiff - agitprop.hs
WiP rabbit hole
[fd/haskell-amqp-utils.git] / agitprop.hs
index 32369b62d360fe27d3d0fc2c38ab130be6b04670..445f686f80e0aca7bef4a69b5119146c4f49a7f3 100644 (file)
@@ -1,32 +1,41 @@
+-- SPDX-FileCopyrightText: 2022 Frank Doepper
+--
+-- SPDX-License-Identifier: GPL-3.0-only
+
 {-# LANGUAGE CPP               #-}
 {-# LANGUAGE OverloadedStrings #-}
 
 -- generic AMQP publisher
 import           Control.Concurrent
-import qualified Control.Exception             as X
-import qualified Data.ByteString.Lazy.Char8    as BL
-#if MIN_VERSION_hinotify(0,3,10)
-import qualified Data.ByteString.Char8         as BS
-#endif
-import           Data.List                     (isSuffixOf)
+import qualified Control.Exception                as X
+import           Control.Monad                    (forM_)
+import qualified Data.ByteString.Char8            as BS
+import qualified Data.ByteString.Lazy.Char8       as BL
+import qualified Data.Map                         as M
 import           Data.Maybe
-import qualified Data.Text                     as T
+import qualified Data.Text                        as T
 import           Data.Time
 import           Data.Time.Clock.POSIX
-import           Data.Version                  (showVersion)
-import           Data.Word                     (Word64)
+import           Data.Version                     (showVersion)
+import           Data.Word                        (Word64)
 import           Magic
 import           Network.AMQP
 import           Network.AMQP.Types
 import           Network.AMQP.Utils.Connection
 import           Network.AMQP.Utils.Helpers
 import           Network.AMQP.Utils.Options
-import           Paths_amqp_utils              (version)
-import           System.Directory
+import           Paths_amqp_utils                 (version)
+import qualified System.Directory.OsPath          as DO
 import           System.Environment
 import           System.Exit
+#if linux_HOST_OS
 import           System.INotify
-import qualified System.Posix.Files            as F
+#endif
+import           System.Posix.ByteString           (RawFilePath)
+import qualified System.Posix.Files.ByteString    as FB
+import qualified System.Posix.Files.PosixString   as FP
+import qualified System.OsPath                    as OS
+import qualified System.File.OsPath               as FOS
 
 main :: IO ()
 main = do
@@ -36,65 +45,91 @@ main = do
   printparam "client version" ["amqp-utils", showVersion version]
   printparam "routing key" $ rKey args
   printparam "exchange" $ currentExchange args
+  (conn, chan) <- connect args
+  addChannelExceptionHandler chan (X.throwTo tid)
+  printparam "confirm mode" $ confirm args
+  if (confirm args)
+    then do
+      confirmSelect chan False
+      addConfirmationListener chan confirmCallback
+    else return ()
+  let inputFile' = firstInputFile (inputFiles args)
   isDir <-
-    if inputFile args == "-"
+    if inputFile' == "-"
       then return False
-      else F.getFileStatus (inputFile args) >>= return . F.isDirectory
+      else FB.getFileStatus inputFile' >>= return . FB.isDirectory
+  let publishOneMsg =
+        publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir}
   if isDir
-    then
-      printparam "hotfolder"    (inputFile   args) >>
+    then do
+      printparam "hotfolder mode" True
       printparam "initial scan" (initialScan args)
+      if isNothing (moveSentFileTo args)
+        then printparam "remove sent file" (removeSentFile args)
+        else printparam "move sent file to" (moveSentFileTo args)
     else printparam
            "input file"
-           [ inputFile args
+           [ inputFile'
            , if (lineMode args)
                then "(line-by-line)"
                else ""
            ]
-  printparam "remove sent file" (removeSentFile args && isDir)
-  (conn, chan) <- connect args
-  addChannelExceptionHandler chan (X.throwTo tid)
-  printparam "confirm mode" $ confirm args
-  if (confirm args)
-    then do
-      confirmSelect chan False
-      addConfirmationListener chan confirmCallback
-    else return ()
-  let publishOneMsg = publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir}
   X.catch
     (if isDir
        then do
-         setCurrentDirectory (inputFile args)
-         if (initialScan args)
-           then getDirectoryContents "." >>= mapM_ (\fn -> handleFile publishOneMsg (suffix args) fn)
-           else return()
-         inotify <- initINotify
-         wd <-
-           addWatch
-             inotify
-             [CloseWrite, MoveIn]
-             "."
-             (handleEvent publishOneMsg (suffix args))
-         hr $ "BEGIN watching " ++ (inputFile args)
-         sleepingBeauty >>= printparam "exception"
-         removeWatch wd
-         hr $ "END watching " ++ (inputFile args)
+#if linux_HOST_OS
+         wds <- mapM (watchHotfolder args publishOneMsg) (inputFiles args)
+         sleepingBeauty >>= (\x -> do
+           forM_ wds (\(wd,folder) -> do
+             removeWatch wd
+             printparam "END watching" folder
+             )
+           X.throw x)
+#else
+         X.throw (X.ErrorCall "ERROR: watching a directory is only supported in Linux")
+#endif
        else do
          hr $ "BEGIN sending"
          messageFile <-
-           if inputFile args == "-"
+           if inputFile' == "-"
              then BL.getContents
-             else BL.readFile (inputFile args)
+             else OS.encodeUtf (BS.unpack inputFile') >>= FOS.readFile
          if (lineMode args)
-           then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
-           else publishOneMsg (Just (inputFile args)) messageFile
-         hr "END sending")
+           then mapM_ (publishOneMsg (currentExchange args) (rKey args) Nothing) (BL.lines messageFile)
+           else publishOneMsg (currentExchange args) (rKey args) (Just (inputFile')) messageFile
+         hr "END sending"
+         if (confirm args)
+           then waitForConfirms chan >>= printparam "confirmed"
+           else return ()
+         X.catch (closeConnection conn) exceptionHandler
+         )
     exceptionHandler
-  -- all done. wait and close.
-  if (confirm args)
-    then waitForConfirms chan >>= printparam "confirmed"
+
+#if linux_HOST_OS
+-- | watch a hotfolder
+watchHotfolder ::
+     Args
+  -> (String -> String -> Maybe RawFilePath -> BL.ByteString -> IO ())
+  -> (RawFilePath, String, String)
+  -> IO (WatchDescriptor, RawFilePath)
+watchHotfolder args publishOneMsg (folder, exchange, rkey) = do
+  printparam "hotfolder" folder
+  inotify <- initINotify
+  wd <-
+   addWatch
+     inotify
+     [CloseWrite, MoveIn]
+     folder
+     (handleEvent (publishOneMsg exchange rkey) (suffix args) folder)
+  hr "BEGIN watching"
+  if (initialScan args)
+    then do
+      folder' <- OS.encodeUtf (BS.unpack folder)
+      DO.listDirectory folder' >>=
+        mapM_ (\fn -> handleFile (publishOneMsg exchange rkey) (suffix args) (folder' OS.</> fn))
     else return ()
-  X.catch (closeConnection conn) exceptionHandler
+  return (wd,folder)
+#endif
 
 -- | A handler for clean exit
 exceptionHandler :: AMQPException -> IO ()
@@ -116,56 +151,57 @@ confirmCallback (deliveryTag, isAll, ackType) =
     , show ackType
     ]
 
+#if linux_HOST_OS
 -- | Hotfolder event handler
 handleEvent ::
-     (Maybe String -> BL.ByteString -> IO ())
-  -> [String]
-  -> Event
-  -> IO ()
+     (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> Event -> IO ()
 -- just handle closewrite and movedin events
-#if MIN_VERSION_hinotify(0,3,10)
-handleEvent func suffixes (Closed False (Just fileName) True) =
-  handleFile func suffixes (BS.unpack fileName)
-handleEvent func suffixes (MovedIn False fileName _) =
-  handleFile func suffixes (BS.unpack fileName)
-#else
-handleEvent func suffixes (Closed False (Just fileName) True) = handleFile func suffixes fileName
-handleEvent func suffixes (MovedIn False fileName _) = handleFile func suffixes fileName
-#endif
-handleEvent _ _ _ = return ()
+handleEvent func suffixes folder (Closed False (Just fileName) True) =
+  handleFile func suffixes (folder OS.</> fileName)
+handleEvent func suffixes folder (MovedIn False fileName _) =
+  handleFile func suffixes (folder OS.</> fileName)
+handleEvent _ _ _ _ = return ()
 
 -- | Hotfolder file handler
 handleFile ::
-     (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
-handleFile _ _ ('.':_) = return () -- ignore hidden files
+     (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> IO ()
 handleFile func suffixes@(_:_) fileName =
-  if any (flip isSuffixOf fileName) suffixes
+  if (any (flip BS.isSuffixOf fileName) suffixes) && not ("." `BS.isPrefixOf` fileName)
     then handleFile func [] fileName
     else return ()
 handleFile func [] fileName =
   X.catch
-    (BL.readFile fileName >>= func (Just fileName))
-    (\e -> printparam "exception in handleFile" (e :: X.SomeException))
+    (FOS.readFile fileName >>= func (Just fileName))
+    (\e -> printparam "exception while processing" fileName >> printparam "exception in handleFile" (e :: X.IOException))
+#endif
 
 -- | Publish one message with our settings
-publishOneMsg' :: Channel -> Args -> Maybe FilePath -> BL.ByteString -> IO ()
-publishOneMsg' chan a fn content = do
+publishOneMsg' ::
+     Channel
+  -> Args
+  -> String
+  -> String
+  -> Maybe RawFilePath
+  -> BL.ByteString
+  -> IO ()
+publishOneMsg' chan a exchange rkey fn content = do
   printparam "sending" fn
   (mtype, mencoding) <-
-    if (magic a) && isJust fn
+    if (magic a)
       then do
+        let firstchunk = if BL.null content then BS.empty else head $ BL.toChunks content
         m <- magicOpen [MagicMimeType]
         magicLoadDefault m
-        t <- magicFile m (fromJust fn)
+        t <- BS.useAsCStringLen firstchunk (magicCString m)
         magicSetFlags m [MagicMimeEncoding]
-        e <- magicFile m (fromJust fn)
+        e <- BS.useAsCStringLen firstchunk (magicCString m)
         return (Just (T.pack t), Just (T.pack e))
       else return ((contenttype a), (contentencoding a))
   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
   publishMsg
     chan
-    (T.pack $ currentExchange a)
-    (T.pack $ rKey a)
+    (T.pack $ exchange)
+    (T.pack $ rkey)
     newMsg
       { msgBody = content
       , msgDeliveryMode = persistent a
@@ -181,16 +217,25 @@ publishOneMsg' chan a fn content = do
       , msgPriority = prio a
       , msgCorrelationID = corrid a
       , msgExpiration = msgexp a
-      , msgHeaders = substheader (fnheader a) fn $ msgheader a
+      , msgHeaders = substheader (fnheader a) (fmap OS.takeFileName fn) $ msgheader a
       } >>=
     printparam "sent"
-  removeSentFileIfRequested (removeSentFile a) fn
+  removeSentFileIfRequested (removeSentFile a) (moveSentFileTo a) fn
   where
     substheader ::
-         [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
+         [String] -> Maybe BS.ByteString -> Maybe FieldTable -> Maybe FieldTable
     substheader (s:r) (Just fname) old =
-      substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
+      substheader r (Just fname) (addheader' old s fname)
     substheader _ _ old = old
-    removeSentFileIfRequested False _           = return ()
-    removeSentFileIfRequested True Nothing      = return ()
-    removeSentFileIfRequested True (Just fname) = printparam "removing" fname >> removeFile fname
+    removeSentFileIfRequested False _ _ = return ()
+    removeSentFileIfRequested True _ Nothing = return ()
+    removeSentFileIfRequested True Nothing (Just fname) =
+      printparam "removing" fname >> DO.removeFile fname
+    removeSentFileIfRequested True (Just path) (Just fname) =
+      printparam "moving" [fname,"to",path] >>
+      FP.rename fname (OS.replaceDirectory fname ((OS.takeDirectory fname) OS.</> path))
+    addheader' :: Maybe FieldTable -> String -> BS.ByteString -> Maybe FieldTable
+    addheader' Nothing k v =
+      Just $ FieldTable $ M.singleton (T.pack k) (FVString v)
+    addheader' (Just (FieldTable oldheader)) k v =
+      Just $ FieldTable $ M.insert (T.pack k) (FVString v) oldheader
don't click here