X-Git-Url: https://woffs.de/git/fd/haskell-amqp-utils.git/blobdiff_plain/09d9ebc2743d18cd45a445d48df24f4db9c65714..eada32fb2ddd6837a80d6e958b970234d4f87780:/agitprop.hs diff --git a/agitprop.hs b/agitprop.hs index ef42e71..480307a 100644 --- a/agitprop.hs +++ b/agitprop.hs @@ -1,74 +1,156 @@ -{-# LANGUAGE OverloadedStrings #-} - -import Paths_amqp_utils ( version ) -import Data.Version ( showVersion ) -import System.Environment -import qualified Data.Text as T -import Network.AMQP -import Network.AMQP.Utils.Options -import Network.AMQP.Utils.Helpers -import Network.AMQP.Utils.Connection -import qualified Data.ByteString.Lazy.Char8 as BL -import Data.Word ( Word64 ) -import qualified System.Posix.Files as F -import System.INotify -import Control.Monad ( forever ) -import Control.Concurrent ( threadDelay ) +-- generic AMQP publisher +import Control.Concurrent (threadDelay) +import qualified Control.Exception as X +import Control.Monad (forever) +import qualified Data.ByteString.Lazy.Char8 as BL +import Data.List (isSuffixOf) +import Data.Maybe +import qualified Data.Text as T +import Data.Time +import Data.Time.Clock.POSIX +import Data.Version (showVersion) +import Data.Word (Word64) +import Magic +import Network.AMQP +import Network.AMQP.Types +import Network.AMQP.Utils.Connection +import Network.AMQP.Utils.Helpers +import Network.AMQP.Utils.Options +import Paths_amqp_utils (version) +import System.Environment +import System.INotify +import qualified System.Posix.Files as F main :: IO () main = do - hr "starting" - -- tid <- myThreadId - args <- getArgs >>= parseargs "agitprop" - printparam' "client version" $ "amqp-utils " ++ (showVersion version) - printparam' "routing key" $ rKey args - isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory - if isDir - then printparam' "hotfolder" $ inputFile args - else printparam' "input file" $ (inputFile args) ++ if (lineMode args) then " (line-by-line)" else "" - (conn, chan) <- connect args - printparam' "confirm mode" $ show $ confirm args - if (confirm args) - then do - confirmSelect chan False - addConfirmationListener chan confirmCallback - else return () - let publishOneMsg f = do - r <- publishMsg chan - (T.pack $ currentExchange args) - (T.pack $ rKey args) - newMsg { msgBody = f - , msgDeliveryMode = Just Persistent - } - printparam "sent" $ fmap show r - if isDir - then do - inotify <- initINotify - wd <- addWatch inotify [ Close ] (inputFile args) (handleEvent publishOneMsg) - hr (inputFile args) - _ <- forever $ threadDelay 1000000 - removeWatch wd - else do - hr (inputFile args) - messageFile <- BL.readFile (inputFile args) - if (lineMode args) - then mapM_ publishOneMsg (BL.lines messageFile) - else publishOneMsg messageFile - - -- all done. wait and close. - if (confirm args) - then waitForConfirms chan >>= return . show >> return () - else return () - closeConnection conn + hr "starting" + args <- getArgs >>= parseargs "agitprop" + printparam' "client version" $ "amqp-utils " ++ (showVersion version) + printparam' "routing key" $ rKey args + isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory + if isDir + then printparam' "hotfolder" $ inputFile args + else printparam' "input file" $ + (inputFile args) ++ + if (lineMode args) + then " (line-by-line)" + else "" + (conn, chan) <- connect args + printparam' "confirm mode" $ show $ confirm args + if (confirm args) + then do + confirmSelect chan False + addConfirmationListener chan confirmCallback + else return () + let publishOneMsg = publishOneMsg' chan args + X.catch + (if isDir + then do + inotify <- initINotify + wd <- + addWatch + inotify + [CloseWrite, MoveIn] + (inputFile args) + (handleEvent publishOneMsg (suffix args) (inputFile args)) + hr $ "BEGIN watching " ++ (inputFile args) + _ <- forever $ threadDelay 1000000 + removeWatch wd + hr $ "END watching " ++ (inputFile args) + else do + hr $ "BEGIN sending" + messageFile <- BL.readFile (inputFile args) + if (lineMode args) + then mapM_ (publishOneMsg Nothing) (BL.lines messageFile) + else publishOneMsg (Just (inputFile args)) messageFile + hr "END sending") + (\exception -> printparam' "exception" $ show (exception :: X.SomeException)) + -- all done. wait and close. + if (confirm args) + then waitForConfirms chan >>= (printparam' "confirmed") . show + else return () + closeConnection conn + hr "connection closed" -- | The handler for publisher confirms confirmCallback :: (Word64, Bool, AckType) -> IO () confirmCallback (deliveryTag, isAll, ackType) = - printparam' "confirmed" - ((show deliveryTag) ++ - (if isAll then " all " else " this ") ++ (show ackType)) + printparam' + "confirmed" + ((show deliveryTag) ++ + (if isAll + then " all " + else " this ") ++ + (show ackType)) + +-- | Hotfolder event handler +handleEvent :: + (Maybe String -> BL.ByteString -> IO ()) + -> [String] + -> String + -> Event + -> IO () +-- just handle closewrite and movedin events +handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x) +handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x) +handleEvent _ _ _ _ = return () + +-- | Hotfolder file handler +handleFile :: + (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO () +handleFile _ _ ('.':_) = return () -- ignore hidden files +handleFile f s@(_:_) x = + if any (flip isSuffixOf x) s + then handleFile f [] x + else return () +handleFile f [] x = + X.catch + (BL.readFile x >>= f (Just x)) + (\exception -> + printparam' "exception in handleFile" $ + show (exception :: X.SomeException)) --- | hotfolder event handler -handleEvent :: (BL.ByteString -> IO ()) -> Event -> IO () -handleEvent f (Closed False (Just x) True) = hr x >> BL.readFile x >>= f -handleEvent _ _ = return () +-- | Publish one message with our settings +publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO () +publishOneMsg' c a fn f = do + printparam "sending" fn + (mtype, mencoding) <- + if (magic a) && isJust fn + then do + m <- magicOpen [MagicMimeType] + magicLoadDefault m + t <- magicFile m (fromJust fn) + magicSetFlags m [MagicMimeEncoding] + e <- magicFile m (fromJust fn) + return (Just (T.pack t), Just (T.pack e)) + else return ((contenttype a), (contentencoding a)) + now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds + r <- + publishMsg + c + (T.pack $ currentExchange a) + (T.pack $ rKey a) + newMsg + { msgBody = f + , msgDeliveryMode = persistent a + , msgTimestamp = Just now + , msgID = msgid a + , msgType = msgtype a + , msgUserID = userid a + , msgApplicationID = appid a + , msgClusterID = clusterid a + , msgContentType = mtype + , msgContentEncoding = mencoding + , msgReplyTo = replyto a + , msgPriority = prio a + , msgCorrelationID = corrid a + , msgExpiration = msgexp a + , msgHeaders = substheader (fnheader a) fn $ msgheader a + } + printparam "sent" $ fmap show r + where + substheader :: + [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable + substheader (s:r) (Just fname) old = + substheader r (Just fname) (addheader old (s ++ "=" ++ fname)) + substheader _ _ old = old