-- generic AMQP publisher
import Control.Concurrent (threadDelay)
import qualified Control.Exception as X
import Control.Monad (forever, when)
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.List (foldl', isSuffixOf)
import Data.Maybe
import qualified Data.Text as T
import Data.Time
import Data.Time.Clock.POSIX
import Data.Version (showVersion)
import Data.Word (Word64)
import Magic
import Network.AMQP
import Network.AMQP.Types
import Network.AMQP.Utils.Connection
import Network.AMQP.Utils.Helpers
import Network.AMQP.Utils.Options
import Paths_amqp_utils (version)
import System.Environment
import System.INotify
import qualified System.Posix.Files as F

-- | Program entry point.
--
-- Parses the command line, connects to the broker and then either watches
-- the input path as a hotfolder (when it is a directory) or publishes the
-- input file once, whole or line by line. Any exception raised while
-- publishing is logged; afterwards outstanding publisher confirms are
-- awaited (if confirm mode is enabled) and the connection is closed.
main :: IO ()
main = do
  hr "starting"
  args <- getArgs >>= parseargs "agitprop"
  printparam' "client version" $ "amqp-utils " ++ showVersion version
  printparam' "routing key" $ rKey args
  isDir <- F.isDirectory <$> F.getFileStatus (inputFile args)
  if isDir
    then printparam' "hotfolder" $ inputFile args
    else printparam' "input file" $
         inputFile args ++
         (if lineMode args
            then " (line-by-line)"
            else "")
  (conn, chan) <- connect args
  printparam' "confirm mode" $ show $ confirm args
  when (confirm args) $ do
    confirmSelect chan False
    addConfirmationListener chan confirmCallback
  let publishOneMsg = publishOneMsg' chan args
  -- The hotfolder loop never returns normally, so an exception is the only
  -- way out of it; catching everything here lets the shutdown steps below
  -- still run.
  X.catch
    (if isDir
       then watchHotfolder publishOneMsg args
       else publishInputFile publishOneMsg args)
    (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
  -- all done. wait and close.
  when (confirm args) $
    waitForConfirms chan >>= printparam' "confirmed" . show
  closeConnection conn
  hr "connection closed"

-- | Watch the input directory and publish every file that is closed after
-- writing or moved into it.
--
-- The watch is held inside 'X.bracket' so that it is removed, and the
-- "END watching" line is logged, when the endless wait loop is left through
-- an exception.
watchHotfolder :: (Maybe String -> BL.ByteString -> IO ()) -> Args -> IO ()
watchHotfolder publish args = do
  inotify <- initINotify
  X.bracket
    (addWatch
       inotify
       [CloseWrite, MoveIn]
       dir
       (handleEvent publish (suffix args) dir))
    (\wd -> removeWatch wd >> hr ("END watching " ++ dir))
    (\_ -> do
       hr $ "BEGIN watching " ++ dir
       -- events are handled by the callback registered above; this thread
       -- only has to stay alive
       forever $ threadDelay 1000000)
  where
    dir = inputFile args

-- | Publish the input file once: one message per line in line mode,
-- otherwise the whole file as a single message tagged with its file name.
publishInputFile :: (Maybe String -> BL.ByteString -> IO ()) -> Args -> IO ()
publishInputFile publish args = do
  hr "BEGIN sending"
  messageFile <- BL.readFile (inputFile args)
  if lineMode args
    then mapM_ (publish Nothing) (BL.lines messageFile)
    else publish (Just (inputFile args)) messageFile
  hr "END sending"

-- | The handler for publisher confirms: logs the delivery tag, whether the
-- confirmation covers all messages or just this one, and the ack type.
confirmCallback :: (Word64, Bool, AckType) -> IO ()
confirmCallback (deliveryTag, isAll, ackType) =
  printparam' "confirmed" $ show deliveryTag ++ scope ++ show ackType
  where
    scope
      | isAll = " all "
      | otherwise = " this "

-- | Hotfolder event handler.
--
-- Arguments: the publish action, the list of accepted file name suffixes,
-- the watched directory and the inotify event.
handleEvent ::
     (Maybe String -> BL.ByteString -> IO ())
  -> [String]
  -> String
  -> Event
  -> IO ()
-- just handle closewrite and movedin events
handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x)
handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x)
handleEvent _ _ _ _ = return ()

-- | Hotfolder file handler.
--
-- Skips hidden files (last path component starts with a dot) and, when the
-- suffix list is non-empty, files whose path ends with none of the
-- suffixes. Every other file is read and handed to the publish action
-- together with its path. Exceptions are logged, not propagated, so one
-- bad file does not stop the watcher.
handleFile ::
     (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
handleFile f suffixes path
  | isHidden = return () -- ignore hidden files
  | not (null suffixes) && not (any (`isSuffixOf` path) suffixes) = return ()
  | otherwise =
    X.catch
      (BL.readFile path >>= f (Just path))
      (\exception ->
         printparam' "exception in handleFile" $
         show (exception :: X.SomeException))
  where
    -- The hidden-file test must look at the file name only: the path
    -- passed in is prefixed with the watched directory.
    fileName = reverse . takeWhile (/= '/') . reverse $ path
    isHidden =
      case fileName of
        ('.':_) -> True
        _ -> False

-- | Publish one message with our settings.
--
-- @fn@ is the name of the file the body came from, if any. When the magic
-- option is set and a file name is present, content type and encoding are
-- detected from the file via libmagic; otherwise the values from the
-- command line are used. The file name is also substituted into the
-- headers listed in @fnheader@.
publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
publishOneMsg' c a fn f = do
  printparam "sending" fn
  (mtype, mencoding) <-
    case fn of
      Just path
        | magic a -> do
            m <- magicOpen [MagicMimeType]
            magicLoadDefault m
            t <- magicFile m path
            -- same handle, second pass for the encoding
            magicSetFlags m [MagicMimeEncoding]
            e <- magicFile m path
            return (Just (T.pack t), Just (T.pack e))
      _ -> return (contenttype a, contentencoding a)
  now <- floor . utcTimeToPOSIXSeconds <$> getCurrentTime
  r <-
    publishMsg
      c
      (T.pack $ currentExchange a)
      (T.pack $ rKey a)
      newMsg
        { msgBody = f
        , msgDeliveryMode = persistent a
        , msgTimestamp = Just now
        , msgID = msgid a
        , msgType = msgtype a
        , msgUserID = userid a
        , msgApplicationID = appid a
        , msgClusterID = clusterid a
        , msgContentType = mtype
        , msgContentEncoding = mencoding
        , msgReplyTo = replyto a
        , msgPriority = prio a
        , msgCorrelationID = corrid a
        , msgExpiration = msgexp a
        , msgHeaders = substheader (fnheader a) fn $ msgheader a
        }
  printparam "sent" $ fmap show r
  where
    -- Add one "name=filename" header per requested header name; without a
    -- file name the header table is returned unchanged.
    substheader ::
         [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
    substheader names (Just fname) old =
      foldl' addheader old [name ++ "=" ++ fname | name <- names]
    substheader _ Nothing old = old