]> woffs.de Git - fd/haskell-amqp-utils.git/blobdiff - agitprop.hs
formatting
[fd/haskell-amqp-utils.git] / agitprop.hs
index 1beef5f908e15b476b4f17cf156eb50593f5e51d..80585b61772cccf4c2366a6e6be140bfa643b8d3 100644 (file)
-{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE CPP #-}
 
-import           Paths_amqp_utils           ( version )
-import           Data.Version               ( showVersion )
-import           System.Environment
-import qualified Data.Text                  as T
-import           Network.AMQP
-import           Network.AMQP.Utils.Options
-import           Network.AMQP.Utils.Helpers
-import           Network.AMQP.Utils.Connection
+-- generic AMQP publisher
+import Control.Concurrent
+import qualified Control.Exception as X
+import Control.Monad (forever)
 import qualified Data.ByteString.Lazy.Char8 as BL
+#if MIN_VERSION_hinotify(0,3,10)
+import qualified Data.ByteString.Char8 as BS
+#endif
+import Data.List (isSuffixOf)
+import Data.Maybe
+import qualified Data.Text as T
+import Data.Time
+import Data.Time.Clock.POSIX
+import Data.Version (showVersion)
+import Data.Word (Word64)
+import Magic
+import Network.AMQP
+import Network.AMQP.Types
+import Network.AMQP.Utils.Connection
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Options
+import Paths_amqp_utils (version)
+import System.Environment
+import System.Exit
+import System.INotify
+import qualified System.Posix.Files as F
 
 main :: IO ()
 main = do
-    hr "starting"
-    --  tid <- myThreadId
-    args <- getArgs >>= parseargs "agitprop"
-    printparam' "client version" $ "amqp-utils " ++ (showVersion version)
-    messageFile <- BL.readFile (inputFile args)
-    (conn, chan) <- connect args
-    let publishOneMsg f = publishMsg chan
-                                     (T.pack $ currentExchange args)
-                                     (T.pack $ rKey args)
-                                     newMsg { msgBody = f
-                                            , msgDeliveryMode = Just Persistent
-                                            }
-    _ <- if (lineMode args)
-         then mapM_ publishOneMsg (BL.lines messageFile)
-         else publishOneMsg messageFile >> return ()
+  hr "starting"
+  tid <- myThreadId
+  args <- getArgs >>= parseargs 'a'
+  printparam "client version" ["amqp-utils", showVersion version]
+  printparam "routing key" $ rKey args
+  printparam "exchange" $ currentExchange args
+  isDir <-
+    if inputFile args == "-"
+      then return False
+      else F.getFileStatus (inputFile args) >>= return . F.isDirectory
+  if isDir
+    then printparam "hotfolder" $ inputFile args
+    else printparam
+           "input file"
+           [ inputFile args
+           , if (lineMode args)
+               then "(line-by-line)"
+               else ""
+           ]
+  (conn, chan) <- connect args
+  addChannelExceptionHandler chan (X.throwTo tid)
+  printparam "confirm mode" $ confirm args
+  if (confirm args)
+    then do
+      confirmSelect chan False
+      addConfirmationListener chan confirmCallback
+    else return ()
+  let publishOneMsg = publishOneMsg' chan args
+  X.catch
+    (if isDir
+       then do
+         inotify <- initINotify
+         wd <-
+           addWatch
+             inotify
+             [CloseWrite, MoveIn]
+#if MIN_VERSION_hinotify(0,3,10)
+             (BS.pack (inputFile args))
+#else
+             (inputFile args)
+#endif
+             (handleEvent publishOneMsg (suffix args) (inputFile args))
+         hr $ "BEGIN watching " ++ (inputFile args)
+         _ <- forever $ threadDelay 1000000
+         removeWatch wd
+         hr $ "END watching " ++ (inputFile args)
+       else do
+         hr $ "BEGIN sending"
+         messageFile <-
+           if inputFile args == "-"
+             then BL.getContents
+             else BL.readFile (inputFile args)
+         if (lineMode args)
+           then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
+           else publishOneMsg (Just (inputFile args)) messageFile
+         hr "END sending")
+    exceptionHandler
+  -- all done. wait and close.
+  if (confirm args)
+    then waitForConfirms chan >>= printparam "confirmed"
+    else return ()
+  X.catch (closeConnection conn) exceptionHandler
 
-    closeConnection conn
+-- | A handler for clean exit
+exceptionHandler :: AMQPException -> IO ()
+exceptionHandler (ChannelClosedException Normal txt) =
+  printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler (ConnectionClosedException Normal txt) =
+  printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
+
+-- | The handler for publisher confirms
+confirmCallback :: (Word64, Bool, AckType) -> IO ()
+confirmCallback (deliveryTag, isAll, ackType) =
+  printparam
+    "confirmed"
+    [ show deliveryTag
+    , if isAll
+        then "all"
+        else "this"
+    , show ackType
+    ]
+
+-- | Hotfolder event handler
+handleEvent ::
+     (Maybe String -> BL.ByteString -> IO ())
+  -> [String]
+  -> String
+  -> Event
+  -> IO ()
+-- just handle closewrite and movedin events
+#if MIN_VERSION_hinotify(0,3,10)
+handleEvent f s p (Closed False (Just x) True) =
+  handleFile f s (p ++ "/" ++ (BS.unpack x))
+handleEvent f s p (MovedIn False x _) =
+  handleFile f s (p ++ "/" ++ (BS.unpack x))
+#else
+handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x)
+handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x)
+#endif
+handleEvent _ _ _ _ = return ()
+
+-- | Hotfolder file handler
+handleFile ::
+     (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
+handleFile _ _ ('.':_) = return () -- ignore hidden files
+handleFile f s@(_:_) x =
+  if any (flip isSuffixOf x) s
+    then handleFile f [] x
+    else return ()
+handleFile f [] x =
+  X.catch
+    (BL.readFile x >>= f (Just x))
+    (\e -> printparam "exception in handleFile" (e :: X.SomeException))
+
+-- | Publish one message with our settings
+publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
+publishOneMsg' c a fn f = do
+  printparam "sending" fn
+  (mtype, mencoding) <-
+    if (magic a) && isJust fn
+      then do
+        m <- magicOpen [MagicMimeType]
+        magicLoadDefault m
+        t <- magicFile m (fromJust fn)
+        magicSetFlags m [MagicMimeEncoding]
+        e <- magicFile m (fromJust fn)
+        return (Just (T.pack t), Just (T.pack e))
+      else return ((contenttype a), (contentencoding a))
+  now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
+  publishMsg
+    c
+    (T.pack $ currentExchange a)
+    (T.pack $ rKey a)
+    newMsg
+      { msgBody = f
+      , msgDeliveryMode = persistent a
+      , msgTimestamp = Just now
+      , msgID = msgid a
+      , msgType = msgtype a
+      , msgUserID = userid a
+      , msgApplicationID = appid a
+      , msgClusterID = clusterid a
+      , msgContentType = mtype
+      , msgContentEncoding = mencoding
+      , msgReplyTo = replyto a
+      , msgPriority = prio a
+      , msgCorrelationID = corrid a
+      , msgExpiration = msgexp a
+      , msgHeaders = substheader (fnheader a) fn $ msgheader a
+      } >>=
+    printparam "sent"
+  where
+    substheader ::
+         [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
+    substheader (s:r) (Just fname) old =
+      substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
+    substheader _ _ old = old