]> woffs.de Git - fd/haskell-amqp-utils.git/commitdiff
agitprop: hotfolder mode
authorFrank Doepper <[email protected]>
Mon, 25 Jun 2018 22:04:02 +0000 (00:04 +0200)
committerFrank Doepper <[email protected]>
Mon, 25 Jun 2018 22:04:02 +0000 (00:04 +0200)
agitprop.hs
amqp-utils.cabal
amqp-utils.nix

index 45e296d01c740777c4a820ac9044791fcbe40586..ef42e713c0487ce3e58ac59861f0aa9643c13142 100644 (file)
@@ -10,6 +10,10 @@ import           Network.AMQP.Utils.Helpers
 import           Network.AMQP.Utils.Connection
 import qualified Data.ByteString.Lazy.Char8    as BL
 import           Data.Word                     ( Word64 )
+import qualified System.Posix.Files            as F
+import           System.INotify
+import           Control.Monad      ( forever )
+import           Control.Concurrent ( threadDelay )
 
 main :: IO ()
 main = do
@@ -18,9 +22,10 @@ main = do
     args <- getArgs >>= parseargs "agitprop"
     printparam' "client version" $ "amqp-utils " ++ (showVersion version)
     printparam' "routing key" $ rKey args
-    printparam' "input file" $
-        (inputFile args) ++ if (lineMode args) then " (line-by-line)" else ""
-    messageFile <- BL.readFile (inputFile args)
+    isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
+    if isDir
+        then printparam' "hotfolder" $ inputFile args
+        else printparam' "input file" $ (inputFile args) ++ if (lineMode args) then " (line-by-line)" else ""
     (conn, chan) <- connect args
     printparam' "confirm mode" $ show $ confirm args
     if (confirm args)
@@ -36,10 +41,21 @@ main = do
                                    , msgDeliveryMode = Just Persistent
                                    }
             printparam "sent" $ fmap show r
-    if (lineMode args)
-        then mapM_ publishOneMsg (BL.lines messageFile)
-        else publishOneMsg messageFile
+    if isDir
+        then do
+            inotify <- initINotify
+            wd <- addWatch inotify [ Close ] (inputFile args) (handleEvent publishOneMsg)
+            hr (inputFile args)
+            _ <- forever $ threadDelay 1000000
+            removeWatch wd
+        else do
+            hr (inputFile args)
+            messageFile <- BL.readFile (inputFile args)
+            if (lineMode args)
+                then mapM_ publishOneMsg (BL.lines messageFile)
+                else publishOneMsg messageFile
 
+    -- all done. wait and close.
     if (confirm args)
         then waitForConfirms chan >>= return . show >> return ()
         else return ()
@@ -51,3 +67,8 @@ confirmCallback (deliveryTag, isAll, ackType) =
     printparam' "confirmed"
                 ((show deliveryTag) ++
                      (if isAll then " all " else " this ") ++ (show ackType))
+
+-- | hotfolder event handler
+handleEvent :: (BL.ByteString -> IO ()) -> Event -> IO ()
+handleEvent f (Closed False (Just x) True) = hr x >> BL.readFile x >>= f
+handleEvent _ _ = return ()
index d1b039d792f010b4acc6ae6824ad05d84ef54fb1..8e328ab47d64a5b17a09c8fbb9169707abe8ac67 100644 (file)
@@ -64,7 +64,9 @@ executable agitprop
                        bytestring,
                        x509-system,
                        tls,
-                       amqp >=0.17
+                       amqp >=0.17,
+                       unix >= 2.7,
+                       hinotify >= 0.3.8 && < 0.3.10
   
   ghc-options:         -threaded -Wall
   
index 0e58d1d46e2603fe36fba64df243f6a977a7f0d4..353c4521e23a7f0aa5237918d89893e93923038c 100644 (file)
@@ -1,5 +1,6 @@
 { mkDerivation, amqp, base, bytestring, connection, containers
 , data-default-class, process, stdenv, text, time, tls, x509-system
+, unix, hinotify
 }:
 mkDerivation {
   pname = "amqp-utils";
@@ -10,7 +11,7 @@ mkDerivation {
   enableSharedExecutables = false;
   executableHaskellDepends = [
     amqp base bytestring connection containers data-default-class
-    process text time tls x509-system
+    process text time tls x509-system unix hinotify
   ];
   description = "Generic Haskell AMQP Consumer";
   license = stdenv.lib.licenses.gpl3;