]> woffs.de Git - fd/haskell-amqp-utils.git/blobdiff - agitprop.hs
option cleanup
[fd/haskell-amqp-utils.git] / agitprop.hs
index ef42e713c0487ce3e58ac59861f0aa9643c13142..fd4922d298b621a7094b9e95db20168f8febe555 100644 (file)
 {-# LANGUAGE OverloadedStrings #-}
 
-import           Paths_amqp_utils              ( version )
-import           Data.Version                  ( showVersion )
-import           System.Environment
-import qualified Data.Text                     as T
-import           Network.AMQP
-import           Network.AMQP.Utils.Options
-import           Network.AMQP.Utils.Helpers
-import           Network.AMQP.Utils.Connection
-import qualified Data.ByteString.Lazy.Char8    as BL
-import           Data.Word                     ( Word64 )
-import qualified System.Posix.Files            as F
-import           System.INotify
-import           Control.Monad      ( forever )
-import           Control.Concurrent ( threadDelay )
+import Control.Concurrent (threadDelay)
+import qualified Control.Exception as X
+import Control.Monad (forever)
+import qualified Data.ByteString.Lazy.Char8 as BL
+import qualified Data.Text as T
+import Data.List (isSuffixOf)
+import Data.Time
+import Data.Time.Clock.POSIX
+import Data.Version (showVersion)
+import Data.Word (Word64)
+import Network.AMQP
+import Network.AMQP.Types
+import Network.AMQP.Utils.Connection
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Options
+import Paths_amqp_utils (version)
+import System.Environment
+import System.INotify
+import qualified System.Posix.Files as F
 
 main :: IO ()
 main = do
-    hr "starting"
-    --  tid <- myThreadId
-    args <- getArgs >>= parseargs "agitprop"
-    printparam' "client version" $ "amqp-utils " ++ (showVersion version)
-    printparam' "routing key" $ rKey args
-    isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
-    if isDir
-        then printparam' "hotfolder" $ inputFile args
-        else printparam' "input file" $ (inputFile args) ++ if (lineMode args) then " (line-by-line)" else ""
-    (conn, chan) <- connect args
-    printparam' "confirm mode" $ show $ confirm args
-    if (confirm args)
-        then do
-            confirmSelect chan False
-            addConfirmationListener chan confirmCallback
-        else return ()
-    let publishOneMsg f = do
-            r <- publishMsg chan
-                            (T.pack $ currentExchange args)
-                            (T.pack $ rKey args)
-                            newMsg { msgBody = f
-                                   , msgDeliveryMode = Just Persistent
-                                   }
-            printparam "sent" $ fmap show r
-    if isDir
-        then do
-            inotify <- initINotify
-            wd <- addWatch inotify [ Close ] (inputFile args) (handleEvent publishOneMsg)
-            hr (inputFile args)
-            _ <- forever $ threadDelay 1000000
-            removeWatch wd
-        else do
-            hr (inputFile args)
-            messageFile <- BL.readFile (inputFile args)
-            if (lineMode args)
-                then mapM_ publishOneMsg (BL.lines messageFile)
-                else publishOneMsg messageFile
-
-    -- all done. wait and close.
-    if (confirm args)
-        then waitForConfirms chan >>= return . show >> return ()
-        else return ()
-    closeConnection conn
+  hr "starting"
+  args <- getArgs >>= parseargs "agitprop"
+  printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+  printparam' "routing key" $ rKey args
+  isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
+  if isDir
+    then printparam' "hotfolder" $ inputFile args
+    else printparam' "input file" $
+         (inputFile args) ++
+         if (lineMode args)
+           then " (line-by-line)"
+           else ""
+  (conn, chan) <- connect args
+  printparam' "confirm mode" $ show $ confirm args
+  if (confirm args)
+    then do
+      confirmSelect chan False
+      addConfirmationListener chan confirmCallback
+    else return ()
+  let publishOneMsg = publishOneMsg' chan args
+  X.catch
+    (if isDir
+       then do
+         inotify <- initINotify
+         wd <-
+           addWatch
+             inotify
+             [CloseWrite, MoveIn]
+             (inputFile args)
+             (handleEvent publishOneMsg (suffix args))
+         hr $ "watching " ++ (inputFile args)
+         _ <- forever $ threadDelay 1000000
+         removeWatch wd
+       else do
+         hr $ "sending " ++ (inputFile args)
+         messageFile <- BL.readFile (inputFile args)
+         if (lineMode args)
+           then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
+           else publishOneMsg (Just (inputFile args)) messageFile)
+    (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
+  -- all done. wait and close.
+  if (confirm args)
+    then waitForConfirms chan >>= return . show >> return ()
+    else return ()
+  closeConnection conn
 
 -- | The handler for publisher confirms
 confirmCallback :: (Word64, Bool, AckType) -> IO ()
 confirmCallback (deliveryTag, isAll, ackType) =
-    printparam' "confirmed"
-                ((show deliveryTag) ++
-                     (if isAll then " all " else " this ") ++ (show ackType))
+  printparam'
+    "confirmed"
+    ((show deliveryTag) ++
+     (if isAll
+        then " all "
+        else " this ") ++
+     (show ackType))
+
+-- | Hotfolder event handler
+handleEvent :: (Maybe String -> BL.ByteString -> IO ()) -> [ String ] -> Event -> IO ()
+-- just handle closewrite and movedin events
+handleEvent f s (Closed False (Just x) True) = handleFile f s x
+handleEvent f s (MovedIn False x _) = handleFile f s x
+handleEvent _ _ _ = return ()
+
+-- | Hotfolder file handler
+handleFile :: (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
+handleFile _ _ ('.':_) = return () -- ignore hidden files
+handleFile f s@(_:_) x = if any (flip isSuffixOf x) s then handleFile f [] x else return ()
+handleFile f [] x =
+  X.catch
+    (hr ("sending " ++ x) >> BL.readFile x >>= f (Just x))
+    (\exception ->
+       printparam' "exception in handleFile" $
+       show (exception :: X.SomeException))
 
--- | hotfolder event handler
-handleEvent :: (BL.ByteString -> IO ()) -> Event -> IO ()
-handleEvent f (Closed False (Just x) True) = hr x >> BL.readFile x >>= f
-handleEvent _ _ = return ()
+-- | Publish one message with our settings
+publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
+publishOneMsg' c a fn f = do
+  now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
+  r <-
+    publishMsg
+      c
+      (T.pack $ currentExchange a)
+      (T.pack $ rKey a)
+      newMsg
+        { msgBody = f
+        , msgDeliveryMode = Just Persistent
+        , msgTimestamp = Just now
+        , msgID = msgid a
+        , msgType = msgtype a
+        , msgUserID = userid a
+        , msgApplicationID = appid a
+        , msgClusterID = clusterid a
+        , msgContentType = contenttype a
+        , msgContentEncoding = contentencoding a
+        , msgReplyTo = replyto a
+        , msgPriority = prio a
+        , msgCorrelationID = corrid a
+        , msgExpiration = msgexp a
+        , msgHeaders = substheader (fnheader a) fn $ msgheader a
+        }
+  printparam "sent" $ fmap show r
+  where
+    substheader :: [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
+    substheader (s:r) (Just fname) old =
+      substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
+    substheader _ _ old = old
don't click here