2 {-# LANGUAGE OverloadedStrings #-}
4 -- generic AMQP publisher
5 import Control.Concurrent
6 import qualified Control.Exception as X
7 import Control.Monad (forM_)
8 import qualified Data.ByteString.Lazy.Char8 as BL
10 import qualified Data.ByteString.Char8 as BS
11 import Data.List (isSuffixOf)
14 import qualified Data.Text as T
16 import Data.Time.Clock.POSIX
17 import Data.Version (showVersion)
18 import Data.Word (Word64)
21 import Network.AMQP.Types
22 import Network.AMQP.Utils.Connection
23 import Network.AMQP.Utils.Helpers
24 import Network.AMQP.Utils.Options
25 import Paths_amqp_utils (version)
26 import System.Directory
27 import System.Environment
29 import System.FilePath.Posix
33 import qualified System.Posix.Files as F
-- Body of the program's entry point.
-- NOTE(review): the line opening this do-block and several interior lines
-- (conditions, let-bindings) are not among the visible lines; the comments
-- below describe only what the visible statements show.
--
-- Parse the command line (parseargs is called with the tag 'a') and echo the
-- basic settings.
39 args <- getArgs >>= parseargs 'a'
40 printparam "client version" ["amqp-utils", showVersion version]
41 printparam "routing key" $ rKey args
42 printparam "exchange" $ currentExchange args
-- Open connection and channel. Exceptions reported on the channel are
-- re-thrown to the thread identified by tid (bound outside the visible lines).
43 (conn, chan) <- connect args
44 addChannelExceptionHandler chan (X.throwTo tid)
45 printparam "confirm mode" $ confirm args
-- Publisher confirms: switch the channel to confirm mode and register
-- confirmCallback as listener.
-- NOTE(review): presumably guarded by `confirm args` - the guard is not visible.
48 confirmSelect chan False
49 addConfirmationListener chan confirmCallback
-- Inspect the first input entry; its file status tells whether it is a
-- directory (the result is presumably what is bound to isDir - TODO confirm).
51 let inputFile' = firstInputFile (inputFiles args)
55 else F.getFileStatus inputFile' >>= return . F.isDirectory
-- Publishing function with channel and settings applied. Removing the sent
-- file stays enabled only when isDir holds.
57 publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir}
-- Echo the hotfolder settings: initial scan, and either the remove flag or
-- the move target, depending on whether moveSentFileTo is set.
59 then printparam "initial scan" (initialScan args) >>
60 if isNothing (moveSentFileTo args)
61 then printparam "remove sent file" (removeSentFile args)
62 else printparam "move sent file to" (moveSentFileTo args)
-- Hotfolder mode: set up one watch per entry of inputFiles, then run
-- sleepingBeauty and print its result under the label "exception"
-- (presumably it blocks until an exception arrives - TODO confirm).
73 wds <- mapM (watchHotfolder args publishOneMsg) (inputFiles args)
74 sleepingBeauty >>= printparam "exception"
-- Walk over all (watch descriptor, folder) pairs and report the end of
-- watching for each folder.
75 forM_ wds (\(wd,folder) -> do
77 hr $ "END watching " ++ folder
-- Single-input mode: the content is read as a lazy ByteString and then
-- published either line by line (no file name attached) or as one message
-- that carries the input file name.
-- NOTE(review): the condition choosing between the two is not visible.
84 else BL.readFile (inputFile')
86 then mapM_ (publishOneMsg (currentExchange args) (rKey args) Nothing Nothing) (BL.lines messageFile)
87 else publishOneMsg (currentExchange args) (rKey args) Nothing (Just (inputFile')) messageFile
90 -- all done. wait and close.
-- The result of waitForConfirms is printed under the label "confirmed".
92 then waitForConfirms chan >>= printparam "confirmed"
-- Close the connection; an AMQPException raised while closing is passed to
-- exceptionHandler.
94 X.catch (closeConnection conn) exceptionHandler
96 -- | watch a hotfolder
--
-- Arguments, in order: the parsed settings, the publishing function
-- (called below as @publishOneMsg exchange rkey (Just folder)@, which leaves
-- the file name and the content still to be supplied), and a
-- @(folder, exchange, rkey)@ triple. The result pairs a watch descriptor
-- with a 'String' (the caller treats it as the folder name).
-- NOTE(review): the beginning of the type signature and some body lines are
-- not among the visible lines.
99 -> (String -> String -> Maybe String -> Maybe String -> BL.ByteString -> IO ())
100 -> (String, String, String)
101 -> IO (WatchDescriptor,String)
102 watchHotfolder args publishOneMsg (folder, exchange, rkey) = do
103 printparam "hotfolder" folder
-- The process-wide working directory is switched to the folder, so the
-- directory listing below can use ".".
105 setCurrentDirectory folder
-- Optional initial scan: every entry currently in the folder is passed to
-- handleFile together with the configured suffixes.
106 if (initialScan args)
107 then getDirectoryContents "." >>=
108 mapM_ (\fn -> handleFile (publishOneMsg exchange rkey (Just folder)) (suffix args) (Just folder) fn)
110 inotify <- initINotify
-- handleEvent, with publisher, suffixes and folder applied, is the callback
-- for the watch (the call that registers it is not visible).
116 (handleEvent (publishOneMsg exchange rkey (Just folder)) (suffix args) (Just folder))
117 hr $ "BEGIN watching " ++ folder
-- NOTE(review): presumably the fallback branch for builds without inotify
-- support - the surrounding conditional is not visible.
120 X.throw (X.ErrorCall "ERROR: watching a directory is only supported in Linux")
-- | Exit handler for AMQP exceptions.
--
-- A channel or connection that was closed with reason 'Normal' ends the
-- program with 'ExitSuccess' after printing the accompanying text under the
-- label @exit@. Every other exception is printed under the label
-- @exception@ and the program ends with exit code 1.
exceptionHandler :: AMQPException -> IO ()
exceptionHandler ex =
  case ex of
    ChannelClosedException Normal reason -> cleanExit reason
    ConnectionClosedException Normal reason -> cleanExit reason
    other -> do
      printparam "exception" other
      exitWith (ExitFailure 1)
  where
    -- shared path for both "closed normally" cases
    cleanExit reason = do
      printparam "exit" reason
      exitWith ExitSuccess
131 -- | The handler for publisher confirms
--
-- Receives one confirmation as a triple and binds its parts as
-- @deliveryTag@, @isAll@ and @ackType@.
-- NOTE(review): the right-hand side of the definition is not among the
-- visible lines, so what is done with these values cannot be stated here.
132 confirmCallback :: (Word64, Bool, AckType) -> IO ()
133 confirmCallback (deliveryTag, isAll, ackType) =
-- | React to a single hotfolder event.
--
-- Only two kinds of events lead to any action: a @Closed False (Just name) True@
-- event (close after write) and a @MovedIn False name _@ event (moved in).
-- For those the file name is unpacked to a 'String' and passed on to
-- 'handleFile' along with the publisher, the suffix list and the folder.
-- All other events are ignored.
handleEvent ::
     (Maybe String -> BL.ByteString -> IO ())
  -> [String]
  -> Maybe FilePath
  -> Event
  -> IO ()
handleEvent func suffixes folder event =
  case event of
    Closed False (Just fileName) True -> process fileName
    MovedIn False fileName _ -> process fileName
    _ -> return ()
  where
    -- convert the raw file name and delegate to the file handler
    process = handleFile func suffixes folder . BS.unpack
-- | Handle one file found in (or reported for) a hotfolder.
--
-- * File names starting with a dot are ignored.
-- * With a non-empty suffix list, the file is processed only if its name
--   ends with one of the suffixes; with an empty list every file is
--   processed.
--
-- Processing means: change into the folder (if one is given), read the file
-- as a lazy 'BL.ByteString' and hand the content, together with the file
-- name, to the supplied function. An 'X.IOException' raised on the way is
-- printed under the label @exception in handleFile@ and not propagated.
handleFile ::
     (Maybe String -> BL.ByteString -> IO ())
  -> [String]
  -> Maybe FilePath
  -> FilePath
  -> IO ()
handleFile func suffixes folder fileName
  | isHidden fileName = return ()
  | null suffixes || any (`isSuffixOf` fileName) suffixes =
    X.catch publish report
  | otherwise = return ()
  where
    isHidden ('.':_) = True
    isHidden _ = False
    -- mapM_ over the Maybe: runs setCurrentDirectory only for a Just folder
    publish = do
      mapM_ setCurrentDirectory folder
      content <- BL.readFile fileName
      func (Just fileName) content
    report e = printparam "exception in handleFile" (e :: X.IOException)
166 -- | Publish one message with our settings
--
-- Parameters as bound in the equation below: the channel, the settings @a@,
-- exchange, routing key, an optional folder, an optional file name @fn@ and
-- the message content.
-- NOTE(review): the type signature, the construction of the message value
-- and the publish call itself are not among the visible lines.
176 publishOneMsg' chan a exchange rkey folder fn content = do
177 printparam "sending" [folder, fn]
-- Content type and encoding: when the magic option is on and a file name is
-- known, both are looked up for that file through the magic* calls (first
-- with the MagicMimeType flag, then with MagicMimeEncoding); otherwise the
-- values from the settings are used.
178 (mtype, mencoding) <-
179 if (magic a) && isJust fn
181 m <- magicOpen [MagicMimeType]
-- fromJust is safe in this branch: it is only reached when isJust fn holds.
183 t <- magicFile m (fromJust fn)
184 magicSetFlags m [MagicMimeEncoding]
185 e <- magicFile m (fromJust fn)
186 return (Just (T.pack t), Just (T.pack e))
187 else return ((contenttype a), (contentencoding a))
-- Current time as whole POSIX seconds, used as the message timestamp.
188 now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
-- Message properties; everything except timestamp, content type and content
-- encoding is copied from the settings.
195 , msgDeliveryMode = persistent a
196 , msgTimestamp = Just now
198 , msgType = msgtype a
199 , msgUserID = userid a
200 , msgApplicationID = appid a
201 , msgClusterID = clusterid a
202 , msgContentType = mtype
203 , msgContentEncoding = mencoding
204 , msgReplyTo = replyto a
205 , msgPriority = prio a
206 , msgCorrelationID = corrid a
207 , msgExpiration = msgexp a
-- Configured headers, extended by one entry per fnheader name (see
-- substheader below).
208 , msgHeaders = substheader (fnheader a) fn $ msgheader a
-- Finally remove or move the file if the settings ask for it.
211 removeSentFileIfRequested (removeSentFile a) (moveSentFileTo a) fn
-- substheader: for every header name in the list, and only when a file name
-- is given, add the string "<name>=<file name>" to the header table through
-- addheader. Without a file name, or once the list is used up, the table is
-- returned as it is.
-- NOTE(review): substheader and removeSentFileIfRequested look like local
-- helpers of publishOneMsg' (the line introducing them is not visible).
214 [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
215 substheader (s:r) (Just fname) old =
216 substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
217 substheader _ _ old = old
-- removeSentFileIfRequested <remove flag> <move target> <file name>:
--   flag off                 -> nothing happens (a move target is ignored)
--   flag on, no file name    -> nothing happens
--   flag on, no move target  -> the file is deleted
--   flag on, move target set -> the file is renamed to the same file name
--                               with its directory part replaced by the target
218 removeSentFileIfRequested False _ _ = return ()
219 removeSentFileIfRequested True _ Nothing = return ()
220 removeSentFileIfRequested True Nothing (Just fname) =
221 printparam "removing" fname >> removeFile fname
222 removeSentFileIfRequested True (Just path) (Just fname) =
223 printparam "moving" (fname ++ " to " ++ path) >>
224 renameFile fname (replaceDirectory fname path)