]> woffs.de Git - fd/haskell-amqp-utils.git/blobdiff - agitprop.hs
WiP rabbit hole
[fd/haskell-amqp-utils.git] / agitprop.hs
index 1beef5f908e15b476b4f17cf156eb50593f5e51d..445f686f80e0aca7bef4a69b5119146c4f49a7f3 100644 (file)
+-- SPDX-FileCopyrightText: 2022 Frank Doepper
+--
+-- SPDX-License-Identifier: GPL-3.0-only
+
+{-# LANGUAGE CPP               #-}
 {-# LANGUAGE OverloadedStrings #-}
 
-import           Paths_amqp_utils           ( version )
-import           Data.Version               ( showVersion )
-import           System.Environment
-import qualified Data.Text                  as T
+-- generic AMQP publisher
+import           Control.Concurrent
+import qualified Control.Exception                as X
+import           Control.Monad                    (forM_)
+import qualified Data.ByteString.Char8            as BS
+import qualified Data.ByteString.Lazy.Char8       as BL
+import qualified Data.Map                         as M
+import           Data.Maybe
+import qualified Data.Text                        as T
+import           Data.Time
+import           Data.Time.Clock.POSIX
+import           Data.Version                     (showVersion)
+import           Data.Word                        (Word64)
+import           Magic
 import           Network.AMQP
-import           Network.AMQP.Utils.Options
-import           Network.AMQP.Utils.Helpers
+import           Network.AMQP.Types
 import           Network.AMQP.Utils.Connection
-import qualified Data.ByteString.Lazy.Char8 as BL
+import           Network.AMQP.Utils.Helpers
+import           Network.AMQP.Utils.Options
+import           Paths_amqp_utils                 (version)
+import qualified System.Directory.OsPath          as DO
+import           System.Environment
+import           System.Exit
+#if linux_HOST_OS
+import           System.INotify
+#endif
+import           System.Posix.ByteString           (RawFilePath)
+import qualified System.Posix.Files.ByteString    as FB
+import qualified System.Posix.Files.PosixString   as FP
+import qualified System.OsPath                    as OS
+import qualified System.File.OsPath               as FOS
 
 main :: IO ()
 main = do
-    hr "starting"
-    --  tid <- myThreadId
-    args <- getArgs >>= parseargs "agitprop"
-    printparam' "client version" $ "amqp-utils " ++ (showVersion version)
-    messageFile <- BL.readFile (inputFile args)
-    (conn, chan) <- connect args
-    let publishOneMsg f = publishMsg chan
-                                     (T.pack $ currentExchange args)
-                                     (T.pack $ rKey args)
-                                     newMsg { msgBody = f
-                                            , msgDeliveryMode = Just Persistent
-                                            }
-    _ <- if (lineMode args)
-         then mapM_ publishOneMsg (BL.lines messageFile)
-         else publishOneMsg messageFile >> return ()
-
-    closeConnection conn
+  hr "starting"
+  tid <- myThreadId
+  args <- getArgs >>= parseargs 'a'
+  printparam "client version" ["amqp-utils", showVersion version]
+  printparam "routing key" $ rKey args
+  printparam "exchange" $ currentExchange args
+  (conn, chan) <- connect args
+  addChannelExceptionHandler chan (X.throwTo tid)
+  printparam "confirm mode" $ confirm args
+  if (confirm args)
+    then do
+      confirmSelect chan False
+      addConfirmationListener chan confirmCallback
+    else return ()
+  let inputFile' = firstInputFile (inputFiles args)
+  isDir <-
+    if inputFile' == "-"
+      then return False
+      else FB.getFileStatus inputFile' >>= return . FB.isDirectory
+  let publishOneMsg =
+        publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir}
+  if isDir
+    then do
+      printparam "hotfolder mode" True
+      printparam "initial scan" (initialScan args)
+      if isNothing (moveSentFileTo args)
+        then printparam "remove sent file" (removeSentFile args)
+        else printparam "move sent file to" (moveSentFileTo args)
+    else printparam
+           "input file"
+           [ inputFile'
+           , if (lineMode args)
+               then "(line-by-line)"
+               else ""
+           ]
+  X.catch
+    (if isDir
+       then do
+#if linux_HOST_OS
+         wds <- mapM (watchHotfolder args publishOneMsg) (inputFiles args)
+         sleepingBeauty >>= (\x -> do
+           forM_ wds (\(wd,folder) -> do
+             removeWatch wd
+             printparam "END watching" folder
+             )
+           X.throw x)
+#else
+         X.throw (X.ErrorCall "ERROR: watching a directory is only supported in Linux")
+#endif
+       else do
+         hr $ "BEGIN sending"
+         messageFile <-
+           if inputFile' == "-"
+             then BL.getContents
+             else OS.encodeUtf (BS.unpack inputFile') >>= FOS.readFile
+         if (lineMode args)
+           then mapM_ (publishOneMsg (currentExchange args) (rKey args) Nothing) (BL.lines messageFile)
+           else publishOneMsg (currentExchange args) (rKey args) (Just (inputFile')) messageFile
+         hr "END sending"
+         if (confirm args)
+           then waitForConfirms chan >>= printparam "confirmed"
+           else return ()
+         X.catch (closeConnection conn) exceptionHandler
+         )
+    exceptionHandler
+
+#if linux_HOST_OS
+-- | watch a hotfolder
+watchHotfolder ::
+     Args
+  -> (String -> String -> Maybe RawFilePath -> BL.ByteString -> IO ())
+  -> (RawFilePath, String, String)
+  -> IO (WatchDescriptor, RawFilePath)
+watchHotfolder args publishOneMsg (folder, exchange, rkey) = do
+  printparam "hotfolder" folder
+  inotify <- initINotify
+  wd <-
+   addWatch
+     inotify
+     [CloseWrite, MoveIn]
+     folder
+     (handleEvent (publishOneMsg exchange rkey) (suffix args) folder)
+  hr "BEGIN watching"
+  if (initialScan args)
+    then do
+      folder' <- OS.encodeUtf (BS.unpack folder)
+      DO.listDirectory folder' >>=
+        mapM_ (\fn -> handleFile (publishOneMsg exchange rkey) (suffix args) (folder' OS.</> fn))
+    else return ()
+  return (wd,folder)
+#endif
+
+-- | A handler for clean exit
+exceptionHandler :: AMQPException -> IO ()
+exceptionHandler (ChannelClosedException Normal txt) =
+  printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler (ConnectionClosedException Normal txt) =
+  printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
+
+-- | The handler for publisher confirms
+confirmCallback :: (Word64, Bool, AckType) -> IO ()
+confirmCallback (deliveryTag, isAll, ackType) =
+  printparam
+    "confirmed"
+    [ show deliveryTag
+    , if isAll
+        then "all"
+        else "this"
+    , show ackType
+    ]
+
+#if linux_HOST_OS
+-- | Hotfolder event handler
+handleEvent ::
+     (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> Event -> IO ()
+-- just handle closewrite and movedin events
+handleEvent func suffixes folder (Closed False (Just fileName) True) =
+  handleFile func suffixes (folder OS.</> fileName)
+handleEvent func suffixes folder (MovedIn False fileName _) =
+  handleFile func suffixes (folder OS.</> fileName)
+handleEvent _ _ _ _ = return ()
+
+-- | Hotfolder file handler
+handleFile ::
+     (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> IO ()
+handleFile func suffixes@(_:_) fileName =
+  if (any (flip BS.isSuffixOf fileName) suffixes) && not ("." `BS.isPrefixOf` fileName)
+    then handleFile func [] fileName
+    else return ()
+handleFile func [] fileName =
+  X.catch
+    (FOS.readFile fileName >>= func (Just fileName))
+    (\e -> printparam "exception while processing" fileName >> printparam "exception in handleFile" (e :: X.IOException))
+#endif
+
+-- | Publish one message with our settings
+publishOneMsg' ::
+     Channel
+  -> Args
+  -> String
+  -> String
+  -> Maybe RawFilePath
+  -> BL.ByteString
+  -> IO ()
+publishOneMsg' chan a exchange rkey fn content = do
+  printparam "sending" fn
+  (mtype, mencoding) <-
+    if (magic a)
+      then do
+        let firstchunk = if BL.null content then BS.empty else head $ BL.toChunks content
+        m <- magicOpen [MagicMimeType]
+        magicLoadDefault m
+        t <- BS.useAsCStringLen firstchunk (magicCString m)
+        magicSetFlags m [MagicMimeEncoding]
+        e <- BS.useAsCStringLen firstchunk (magicCString m)
+        return (Just (T.pack t), Just (T.pack e))
+      else return ((contenttype a), (contentencoding a))
+  now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
+  publishMsg
+    chan
+    (T.pack $ exchange)
+    (T.pack $ rkey)
+    newMsg
+      { msgBody = content
+      , msgDeliveryMode = persistent a
+      , msgTimestamp = Just now
+      , msgID = msgid a
+      , msgType = msgtype a
+      , msgUserID = userid a
+      , msgApplicationID = appid a
+      , msgClusterID = clusterid a
+      , msgContentType = mtype
+      , msgContentEncoding = mencoding
+      , msgReplyTo = replyto a
+      , msgPriority = prio a
+      , msgCorrelationID = corrid a
+      , msgExpiration = msgexp a
+      , msgHeaders = substheader (fnheader a) (fmap OS.takeFileName fn) $ msgheader a
+      } >>=
+    printparam "sent"
+  removeSentFileIfRequested (removeSentFile a) (moveSentFileTo a) fn
+  where
+    substheader ::
+         [String] -> Maybe BS.ByteString -> Maybe FieldTable -> Maybe FieldTable
+    substheader (s:r) (Just fname) old =
+      substheader r (Just fname) (addheader' old s fname)
+    substheader _ _ old = old
+    removeSentFileIfRequested False _ _ = return ()
+    removeSentFileIfRequested True _ Nothing = return ()
+    removeSentFileIfRequested True Nothing (Just fname) =
+      printparam "removing" fname >> DO.removeFile fname
+    removeSentFileIfRequested True (Just path) (Just fname) =
+      printparam "moving" [fname,"to",path] >>
+      FP.rename fname (OS.replaceDirectory fname ((OS.takeDirectory fname) OS.</> path))
+    addheader' :: Maybe FieldTable -> String -> BS.ByteString -> Maybe FieldTable
+    addheader' Nothing k v =
+      Just $ FieldTable $ M.singleton (T.pack k) (FVString v)
+    addheader' (Just (FieldTable oldheader)) k v =
+      Just $ FieldTable $ M.insert (T.pack k) (FVString v) oldheader
don't click here