-{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE CPP #-}
-import Paths_amqp_utils ( version )
-import Data.Version ( showVersion )
-import System.Environment
-import qualified Data.Text as T
-import Network.AMQP
-import Network.AMQP.Utils.Options
-import Network.AMQP.Utils.Helpers
-import Network.AMQP.Utils.Connection
-import qualified Data.ByteString.Lazy.Char8 as BL
-import Data.Word ( Word64 )
-import qualified System.Posix.Files as F
-import System.INotify
-import Control.Monad ( forever )
-import Control.Concurrent ( threadDelay )
+-- generic AMQP publisher
+import Control.Concurrent
+import qualified Control.Exception as X
+import Control.Monad (forever)
+import qualified Data.ByteString.Lazy.Char8 as BL
+#if MIN_VERSION_hinotify(0,3,10)
+import qualified Data.ByteString.Char8 as BS
+#endif
+import Data.List (isSuffixOf)
+import Data.Maybe
+import qualified Data.Text as T
+import Data.Time
+import Data.Time.Clock.POSIX
+import Data.Version (showVersion)
+import Data.Word (Word64)
+import Magic
+import Network.AMQP
+import Network.AMQP.Types
+import Network.AMQP.Utils.Connection
+import Network.AMQP.Utils.Helpers
+import Network.AMQP.Utils.Options
+import Paths_amqp_utils (version)
+import System.Environment
+import System.Exit
+import System.INotify
+import qualified System.Posix.Files as F
main :: IO ()
main = do
- hr "starting"
- -- tid <- myThreadId
- args <- getArgs >>= parseargs "agitprop"
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- printparam' "routing key" $ rKey args
- isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
- if isDir
- then printparam' "hotfolder" $ inputFile args
- else printparam' "input file" $ (inputFile args) ++ if (lineMode args) then " (line-by-line)" else ""
- (conn, chan) <- connect args
- printparam' "confirm mode" $ show $ confirm args
- if (confirm args)
- then do
- confirmSelect chan False
- addConfirmationListener chan confirmCallback
- else return ()
- let publishOneMsg f = do
- r <- publishMsg chan
- (T.pack $ currentExchange args)
- (T.pack $ rKey args)
- newMsg { msgBody = f
- , msgDeliveryMode = Just Persistent
- }
- printparam "sent" $ fmap show r
- if isDir
- then do
- inotify <- initINotify
- wd <- addWatch inotify [ Close ] (inputFile args) (handleEvent publishOneMsg)
- hr (inputFile args)
- _ <- forever $ threadDelay 1000000
- removeWatch wd
- else do
- hr (inputFile args)
- messageFile <- BL.readFile (inputFile args)
- if (lineMode args)
- then mapM_ publishOneMsg (BL.lines messageFile)
- else publishOneMsg messageFile
+ hr "starting"
+ tid <- myThreadId
+ args <- getArgs >>= parseargs 'a'
+ printparam "client version" ["amqp-utils", showVersion version]
+ printparam "routing key" $ rKey args
+ printparam "exchange" $ currentExchange args
+ isDir <-
+ if inputFile args == "-"
+ then return False
+ else F.getFileStatus (inputFile args) >>= return . F.isDirectory
+ if isDir
+ then printparam "hotfolder" $ inputFile args
+ else printparam
+ "input file"
+ [ inputFile args
+ , if (lineMode args)
+ then "(line-by-line)"
+ else ""
+ ]
+ (conn, chan) <- connect args
+ addChannelExceptionHandler chan (X.throwTo tid)
+ printparam "confirm mode" $ confirm args
+ if (confirm args)
+ then do
+ confirmSelect chan False
+ addConfirmationListener chan confirmCallback
+ else return ()
+ let publishOneMsg = publishOneMsg' chan args
+ X.catch
+ (if isDir
+ then do
+ inotify <- initINotify
+ wd <-
+ addWatch
+ inotify
+ [CloseWrite, MoveIn]
+#if MIN_VERSION_hinotify(0,3,10)
+ (BS.pack (inputFile args))
+#else
+ (inputFile args)
+#endif
+ (handleEvent publishOneMsg (suffix args) (inputFile args))
+ hr $ "BEGIN watching " ++ (inputFile args)
+ _ <- forever $ threadDelay 1000000
+ removeWatch wd
+ hr $ "END watching " ++ (inputFile args)
+ else do
+ hr $ "BEGIN sending"
+ messageFile <-
+ if inputFile args == "-"
+ then BL.getContents
+ else BL.readFile (inputFile args)
+ if (lineMode args)
+ then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
+ else publishOneMsg (Just (inputFile args)) messageFile
+ hr "END sending")
+ exceptionHandler
+ -- all done. wait and close.
+ if (confirm args)
+ then waitForConfirms chan >>= printparam "confirmed"
+ else return ()
+ X.catch (closeConnection conn) exceptionHandler
- -- all done. wait and close.
- if (confirm args)
- then waitForConfirms chan >>= return . show >> return ()
- else return ()
- closeConnection conn
+-- | A handler for clean exit
+exceptionHandler :: AMQPException -> IO ()
+exceptionHandler (ChannelClosedException Normal txt) =
+ printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler (ConnectionClosedException Normal txt) =
+ printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
-- | The handler for publisher confirms
confirmCallback :: (Word64, Bool, AckType) -> IO ()
confirmCallback (deliveryTag, isAll, ackType) =
- printparam' "confirmed"
- ((show deliveryTag) ++
- (if isAll then " all " else " this ") ++ (show ackType))
+ printparam
+ "confirmed"
+ [ show deliveryTag
+ , if isAll
+ then "all"
+ else "this"
+ , show ackType
+ ]
--- | hotfolder event handler
-handleEvent :: (BL.ByteString -> IO ()) -> Event -> IO ()
-handleEvent f (Closed False (Just x) True) = hr x >> BL.readFile x >>= f
-handleEvent _ _ = return ()
+-- | Hotfolder event handler
+handleEvent ::
+ (Maybe String -> BL.ByteString -> IO ())
+ -> [String]
+ -> String
+ -> Event
+ -> IO ()
+-- just handle closewrite and movedin events
+#if MIN_VERSION_hinotify(0,3,10)
+handleEvent f s p (Closed False (Just x) True) =
+ handleFile f s (p ++ "/" ++ (BS.unpack x))
+handleEvent f s p (MovedIn False x _) =
+ handleFile f s (p ++ "/" ++ (BS.unpack x))
+#else
+handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x)
+handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x)
+#endif
+handleEvent _ _ _ _ = return ()
+
+-- | Hotfolder file handler
+handleFile ::
+ (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
+handleFile _ _ ('.':_) = return () -- ignore hidden files
+handleFile f s@(_:_) x =
+ if any (flip isSuffixOf x) s
+ then handleFile f [] x
+ else return ()
+handleFile f [] x =
+ X.catch
+ (BL.readFile x >>= f (Just x))
+ (\e -> printparam "exception in handleFile" (e :: X.SomeException))
+
+-- | Publish one message with our settings
+publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
+publishOneMsg' c a fn f = do
+ printparam "sending" fn
+ (mtype, mencoding) <-
+ if (magic a) && isJust fn
+ then do
+ m <- magicOpen [MagicMimeType]
+ magicLoadDefault m
+ t <- magicFile m (fromJust fn)
+ magicSetFlags m [MagicMimeEncoding]
+ e <- magicFile m (fromJust fn)
+ return (Just (T.pack t), Just (T.pack e))
+ else return ((contenttype a), (contentencoding a))
+ now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
+ publishMsg
+ c
+ (T.pack $ currentExchange a)
+ (T.pack $ rKey a)
+ newMsg
+ { msgBody = f
+ , msgDeliveryMode = persistent a
+ , msgTimestamp = Just now
+ , msgID = msgid a
+ , msgType = msgtype a
+ , msgUserID = userid a
+ , msgApplicationID = appid a
+ , msgClusterID = clusterid a
+ , msgContentType = mtype
+ , msgContentEncoding = mencoding
+ , msgReplyTo = replyto a
+ , msgPriority = prio a
+ , msgCorrelationID = corrid a
+ , msgExpiration = msgexp a
+ , msgHeaders = substheader (fnheader a) fn $ msgheader a
+ } >>=
+ printparam "sent"
+ where
+ substheader ::
+ [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
+ substheader (s:r) (Just fname) old =
+ substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
+ substheader _ _ old = old