-{-# LANGUAGE OverloadedStrings #-}
-
+-- generic AMQP publisher
import Control.Concurrent (threadDelay)
import qualified Control.Exception as X
import Control.Monad (forever)
import qualified Data.ByteString.Lazy.Char8 as BL
+import Data.List (isSuffixOf)
+import Data.Maybe
import qualified Data.Text as T
+import Data.Time
+import Data.Time.Clock.POSIX
import Data.Version (showVersion)
import Data.Word (Word64)
+import Magic
import Network.AMQP
+import Network.AMQP.Types
import Network.AMQP.Utils.Connection
import Network.AMQP.Utils.Helpers
import Network.AMQP.Utils.Options
inotify
[CloseWrite, MoveIn]
(inputFile args)
- (handleEvent publishOneMsg)
- hr $ "watching " ++ (inputFile args)
+ (handleEvent publishOneMsg (suffix args) (inputFile args))
+ hr $ "BEGIN watching " ++ (inputFile args)
_ <- forever $ threadDelay 1000000
removeWatch wd
+ hr $ "END watching " ++ (inputFile args)
else do
- hr $ "sending " ++ (inputFile args)
+ hr $ "BEGIN sending"
messageFile <- BL.readFile (inputFile args)
if (lineMode args)
- then mapM_ publishOneMsg (BL.lines messageFile)
- else publishOneMsg messageFile)
+ then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
+ else publishOneMsg (Just (inputFile args)) messageFile
+ hr "END sending")
(\exception -> printparam' "exception" $ show (exception :: X.SomeException))
-- all done. wait and close.
if (confirm args)
- then waitForConfirms chan >>= return . show >> return ()
+ then waitForConfirms chan >>= (printparam' "confirmed") . show
else return ()
closeConnection conn
+ hr "connection closed"
-- | The handler for publisher confirms
confirmCallback :: (Word64, Bool, AckType) -> IO ()
(show ackType))
-- | Hotfolder event handler
-handleEvent :: (BL.ByteString -> IO ()) -> Event -> IO ()
+handleEvent ::
+     (Maybe String -> BL.ByteString -> IO ()) -- ^ publish action, receives file path and content
+  -> [String] -- ^ file name suffixes, passed through to 'handleFile'
+  -> String -- ^ directory that is prepended to the event's file name
+  -> Event -- ^ inotify event to inspect
+  -> IO ()
-- just handle closewrite and movedin events
-handleEvent f (Closed False (Just x) True) = handleFile f x
-handleEvent f (MovedIn False x _) = handleFile f x
-handleEvent _ _ = return ()
+handleEvent publish suffixes dir event =
+  case event of
+    Closed False (Just name) True -> dispatch name
+    MovedIn False name _ -> dispatch name
+    _ -> return () -- every other event is ignored
+  where
+    -- hand the file over with the directory prepended to its name
+    dispatch name = handleFile publish suffixes (dir ++ "/" ++ name)
-- | Hotfolder file handler
-handleFile :: (BL.ByteString -> IO ()) -> FilePath -> IO ()
-handleFile _ ('.':_) = return () -- ignore hidden files
-handleFile f x =
+--
+-- Skips hidden files (base name starting with a dot) and, when the suffix
+-- list is non-empty, files whose path ends with none of the suffixes.
+-- Everything else is read and handed to the publish action together with
+-- its path. Any exception is reported via @printparam'@ and not rethrown.
+handleFile ::
+  (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
+handleFile f s x
+  -- the hidden check has to look at the base name: x arrives with the
+  -- directory prepended, so its first character belongs to the directory
+  | isHidden = return () -- ignore hidden files
+  | not (null s) && not (any (`isSuffixOf` x) s) = return ()
+  | otherwise =
X.catch
- (hr ("sending " ++ x) >> BL.readFile x >>= f)
+ (BL.readFile x >>= f (Just x))
(\exception ->
printparam' "exception in handleFile" $
show (exception :: X.SomeException))
+  where
+    -- part of the path after the last '/', or the whole path if there is none
+    baseName = reverse (takeWhile (/= '/') (reverse x))
+    isHidden = take 1 baseName == "."
-- | Publish one message with our settings
-publishOneMsg' :: Channel -> Args -> BL.ByteString -> IO ()
-publishOneMsg' c a f = do
- r <-
- publishMsg
- c
- (T.pack $ currentExchange a)
- (T.pack $ rKey a)
- newMsg { msgBody = f
- , msgDeliveryMode = Just Persistent
- , msgTimestamp = msgtimestamp a
- , msgID = msgid a
- , msgType = msgtype a
- , msgUserID = msguserid a
- , msgApplicationID = msgappid a
- , msgClusterID = msgclusterid a
- , msgContentType = msgcontenttype a
- , msgContentEncoding = msgcontentencoding a
- , msgReplyTo = msgreplyto a
- , msgPriority = msgprio a
- , msgCorrelationID = msgcorrid a
- , msgExpiration = msgexp a
- -- , msgHeaders =
- }
- printparam "sent" $ fmap show r
+--
+-- When @magic a@ is set and a file name is known, content type and encoding
+-- are detected from the file with libmagic; otherwise the values configured
+-- in the arguments are used. The message timestamp is the current POSIX
+-- time in whole seconds. For every entry of @fnheader a@ a @name=filename@
+-- pair is passed to @addheader@ on top of the configured headers.
+publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
+publishOneMsg' c a fn f = do
+  printparam "sending" fn
+  (mtype, mencoding) <-
+    case fn of
+      -- detection needs a file on disk, so it only applies when a file
+      -- name was supplied
+      Just path
+        | magic a -> do
+            m <- magicOpen [MagicMimeType]
+            magicLoadDefault m
+            t <- magicFile m path
+            -- same handle, second query for the encoding
+            magicSetFlags m [MagicMimeEncoding]
+            e <- magicFile m path
+            return (Just (T.pack t), Just (T.pack e))
+      _ -> return (contenttype a, contentencoding a)
+  now <- fmap floor getPOSIXTime
+  r <-
+    publishMsg
+      c
+      (T.pack $ currentExchange a)
+      (T.pack $ rKey a)
+      newMsg
+        { msgBody = f
+        , msgDeliveryMode = persistent a
+        , msgTimestamp = Just now
+        , msgID = msgid a
+        , msgType = msgtype a
+        , msgUserID = userid a
+        , msgApplicationID = appid a
+        , msgClusterID = clusterid a
+        , msgContentType = mtype
+        , msgContentEncoding = mencoding
+        , msgReplyTo = replyto a
+        , msgPriority = prio a
+        , msgCorrelationID = corrid a
+        , msgExpiration = msgexp a
+        , msgHeaders = substheader (fnheader a) fn $ msgheader a
+        }
+  printparam "sent" $ fmap show r
+  where
+    -- add one "name=filename" header per requested header name; without a
+    -- file name the header table is returned unchanged
+    substheader ::
+         [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
+    substheader (s:rest) (Just fname) old =
+      substheader rest (Just fname) (addheader old (s ++ "=" ++ fname))
+    substheader _ _ old = old