X-Git-Url: https://woffs.de/git/fd/haskell-amqp-utils.git/blobdiff_plain/ecda45ae058424d1a556fbb36c1833046a4f07bb..eada32fb2ddd6837a80d6e958b970234d4f87780:/agitprop.hs diff --git a/agitprop.hs b/agitprop.hs index 2a15291..480307a 100644 --- a/agitprop.hs +++ b/agitprop.hs @@ -1,13 +1,18 @@ -{-# LANGUAGE OverloadedStrings #-} - +-- generic AMQP publisher import Control.Concurrent (threadDelay) import qualified Control.Exception as X import Control.Monad (forever) import qualified Data.ByteString.Lazy.Char8 as BL +import Data.List (isSuffixOf) +import Data.Maybe import qualified Data.Text as T +import Data.Time +import Data.Time.Clock.POSIX import Data.Version (showVersion) import Data.Word (Word64) +import Magic import Network.AMQP +import Network.AMQP.Types import Network.AMQP.Utils.Connection import Network.AMQP.Utils.Helpers import Network.AMQP.Utils.Options @@ -47,22 +52,25 @@ main = do inotify [CloseWrite, MoveIn] (inputFile args) - (handleEvent publishOneMsg) - hr $ "watching " ++ (inputFile args) + (handleEvent publishOneMsg (suffix args) (inputFile args)) + hr $ "BEGIN watching " ++ (inputFile args) _ <- forever $ threadDelay 1000000 removeWatch wd + hr $ "END watching " ++ (inputFile args) else do - hr $ "sending " ++ (inputFile args) + hr $ "BEGIN sending" messageFile <- BL.readFile (inputFile args) if (lineMode args) - then mapM_ publishOneMsg (BL.lines messageFile) - else publishOneMsg messageFile) + then mapM_ (publishOneMsg Nothing) (BL.lines messageFile) + else publishOneMsg (Just (inputFile args)) messageFile + hr "END sending") (\exception -> printparam' "exception" $ show (exception :: X.SomeException)) -- all done. wait and close. if (confirm args) - then waitForConfirms chan >>= return . show >> return () + then waitForConfirms chan >>= (printparam' "confirmed") . show else return () closeConnection conn + hr "connection closed" -- | The handler for publisher confirms confirmCallback :: (Word64, Bool, AckType) -> IO () @@ -76,44 +84,73 @@ confirmCallback (deliveryTag, isAll, ackType) = (show ackType)) -- | Hotfolder event handler -handleEvent :: (BL.ByteString -> IO ()) -> Event -> IO () +handleEvent :: + (Maybe String -> BL.ByteString -> IO ()) + -> [String] + -> String + -> Event + -> IO () -- just handle closewrite and movedin events -handleEvent f (Closed False (Just x) True) = handleFile f x -handleEvent f (MovedIn False x _) = handleFile f x -handleEvent _ _ = return () +handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x) +handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x) +handleEvent _ _ _ _ = return () -- | Hotfolder file handler -handleFile :: (BL.ByteString -> IO ()) -> FilePath -> IO () -handleFile _ ('.':_) = return () -- ignore hidden files -handleFile f x = +handleFile :: + (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO () +handleFile _ _ ('.':_) = return () -- ignore hidden files +handleFile f s@(_:_) x = + if any (flip isSuffixOf x) s + then handleFile f [] x + else return () +handleFile f [] x = X.catch - (hr ("sending " ++ x) >> BL.readFile x >>= f) + (BL.readFile x >>= f (Just x)) (\exception -> printparam' "exception in handleFile" $ show (exception :: X.SomeException)) -- | Publish one message with our settings -publishOneMsg' :: Channel -> Args -> BL.ByteString -> IO () -publishOneMsg' c a f = do - r <- - publishMsg - c - (T.pack $ currentExchange a) - (T.pack $ rKey a) - newMsg { msgBody = f - , msgDeliveryMode = Just Persistent - , msgTimestamp = msgtimestamp a - , msgID = msgid a - , msgType = msgtype a - , msgUserID = msguserid a - , msgApplicationID = msgappid a - , msgClusterID = msgclusterid a - , msgContentType = msgcontenttype a - , msgContentEncoding = msgcontentencoding a - , msgReplyTo = msgreplyto a - , msgPriority = msgprio a - , msgCorrelationID = msgcorrid a - , msgExpiration = msgexp a - -- , msgHeaders = - } - printparam "sent" $ fmap show r +publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO () +publishOneMsg' c a fn f = do + printparam "sending" fn + (mtype, mencoding) <- + if (magic a) && isJust fn + then do + m <- magicOpen [MagicMimeType] + magicLoadDefault m + t <- magicFile m (fromJust fn) + magicSetFlags m [MagicMimeEncoding] + e <- magicFile m (fromJust fn) + return (Just (T.pack t), Just (T.pack e)) + else return ((contenttype a), (contentencoding a)) + now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds + r <- + publishMsg + c + (T.pack $ currentExchange a) + (T.pack $ rKey a) + newMsg + { msgBody = f + , msgDeliveryMode = persistent a + , msgTimestamp = Just now + , msgID = msgid a + , msgType = msgtype a + , msgUserID = userid a + , msgApplicationID = appid a + , msgClusterID = clusterid a + , msgContentType = mtype + , msgContentEncoding = mencoding + , msgReplyTo = replyto a + , msgPriority = prio a + , msgCorrelationID = corrid a + , msgExpiration = msgexp a + , msgHeaders = substheader (fnheader a) fn $ msgheader a + } + printparam "sent" $ fmap show r + where + substheader :: + [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable + substheader (s:r) (Just fname) old = + substheader r (Just fname) (addheader old (s ++ "=" ++ fname)) + substheader _ _ old = old