- hr "starting"
- -- tid <- myThreadId
- args <- getArgs >>= parseargs "agitprop"
- printparam' "client version" $ "amqp-utils " ++ (showVersion version)
- messageFile <- BL.readFile (inputFile args)
- (conn, chan) <- connect args
- let publishOneMsg f = publishMsg chan
- (T.pack $ currentExchange args)
- (T.pack $ rKey args)
- newMsg { msgBody = f
- , msgDeliveryMode = Just Persistent
- }
- _ <- if (lineMode args)
- then mapM_ publishOneMsg (BL.lines messageFile)
- else publishOneMsg messageFile >> return ()
+ hr "starting"
+ args <- getArgs >>= parseargs "agitprop"
+ printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+ printparam' "routing key" $ rKey args
+ isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
+ if isDir
+ then printparam' "hotfolder" $ inputFile args
+ else printparam' "input file" $
+ (inputFile args) ++
+ if (lineMode args)
+ then " (line-by-line)"
+ else ""
+ (conn, chan) <- connect args
+ printparam' "confirm mode" $ show $ confirm args
+ if (confirm args)
+ then do
+ confirmSelect chan False
+ addConfirmationListener chan confirmCallback
+ else return ()
+ let publishOneMsg = publishOneMsg' chan args
+ X.catch
+ (if isDir
+ then do
+ inotify <- initINotify
+ wd <-
+ addWatch
+ inotify
+ [CloseWrite, MoveIn]
+ (inputFile args)
+ (handleEvent publishOneMsg (suffix args) (inputFile args))
+ hr $ "BEGIN watching " ++ (inputFile args)
+ _ <- forever $ threadDelay 1000000
+ removeWatch wd
+ hr $ "END watching " ++ (inputFile args)
+ else do
+ hr $ "BEGIN sending"
+ messageFile <- BL.readFile (inputFile args)
+ if (lineMode args)
+ then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
+ else publishOneMsg (Just (inputFile args)) messageFile
+ hr "END sending")
+ (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
+ -- all done. wait and close.
+ if (confirm args)
+ then waitForConfirms chan >>= (printparam' "confirmed") . show
+ else return ()
+ closeConnection conn
+ hr "connection closed"
+
+-- | Listener for publisher confirms.
+--
+-- Prints one @confirmed@ parameter line made of the shown delivery tag,
+-- the word @all@ or @this@ (selected by the boolean in the tuple) and the
+-- shown ack type, in that order.
+confirmCallback :: (Word64, Bool, AckType) -> IO ()
+confirmCallback (tag, coversAll, kind) =
+  printparam' "confirmed" $ concat [show tag, scope, show kind]
+  where
+    -- the surrounding blanks are part of the printed text
+    scope
+      | coversAll = " all "
+      | otherwise = " this "
+
+-- | Hotfolder event handler.
+--
+-- Takes the publish action, the suffix list, the watched directory and one
+-- inotify 'Event'. Only two event shapes lead to any work: a @Closed@
+-- event that carries a file name (close-after-write) and a @MovedIn@
+-- event. In both cases the name is appended to the directory with a @/@
+-- and the resulting path is handed to 'handleFile'. Every other event is
+-- dropped.
+--
+-- NOTE(review): the literal @False@ in both patterns is presumably
+-- hinotify's @isDirectory@ flag and the @True@ its @wasWriteable@ flag;
+-- confirm against the hinotify version in use.
+handleEvent ::
+     (Maybe String -> BL.ByteString -> IO ())
+  -> [String]
+  -> String
+  -> Event
+  -> IO ()
+handleEvent publish suffixes dir event =
+  case event of
+    Closed False (Just name) True -> dispatch name
+    MovedIn False name _ -> dispatch name
+    _ -> return ()
+  where
+    -- build the path of the affected file and pass it on
+    dispatch name = handleFile publish suffixes (dir ++ "/" ++ name)
+
+-- | Hotfolder file handler.
+--
+-- Arguments: the publish action, the list of accepted suffixes and the
+-- path of the file to send.
+--
+-- * Files whose /name/ (last path component) starts with a dot are
+--   ignored. The check is made on the last component and not on the whole
+--   path, because callers pass @directory ++ "/" ++ name@: looking at the
+--   first character of the path would drop every file of a hotfolder
+--   given as @./dir@ while letting real dot-files through.
+-- * With a non-empty suffix list the file is only sent when the path ends
+--   with one of the suffixes; an empty list accepts every file.
+-- * An accepted file is read and passed, together with @Just path@, to
+--   the publish action. Any exception raised while doing so is printed
+--   and not propagated.
+handleFile ::
+     (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
+handleFile f s x
+  | hidden = return () -- ignore hidden files
+  | accepted =
+    X.catch
+      (BL.readFile x >>= f (Just x))
+      (\exception ->
+         printparam' "exception in handleFile" $
+         show (exception :: X.SomeException))
+  | otherwise = return ()
+  where
+    -- last path component, i.e. everything after the final '/'
+    baseName = reverse (takeWhile (/= '/') (reverse x))
+    hidden = take 1 baseName == "."
+    accepted = null s || any (`isSuffixOf` x) s