]> woffs.de Git - fd/haskell-amqp-utils.git/blobdiff - agitprop.hs
formatting
[fd/haskell-amqp-utils.git] / agitprop.hs
index 480307a9e5bcd11df2e0cd62bff2a4cc5c9e9d0a..80585b61772cccf4c2366a6e6be140bfa643b8d3 100644 (file)
@@ -1,8 +1,13 @@
+{-# LANGUAGE CPP #-}
+
 -- generic AMQP publisher
-import Control.Concurrent (threadDelay)
+import Control.Concurrent
 import qualified Control.Exception as X
 import Control.Monad (forever)
 import qualified Data.ByteString.Lazy.Char8 as BL
+#if MIN_VERSION_hinotify(0,3,10)
+import qualified Data.ByteString.Char8 as BS
+#endif
 import Data.List (isSuffixOf)
 import Data.Maybe
 import qualified Data.Text as T
@@ -18,25 +23,34 @@ import Network.AMQP.Utils.Helpers
 import Network.AMQP.Utils.Options
 import Paths_amqp_utils (version)
 import System.Environment
+import System.Exit
 import System.INotify
 import qualified System.Posix.Files as F
 
 main :: IO ()
 main = do
   hr "starting"
-  args <- getArgs >>= parseargs "agitprop"
-  printparam' "client version" $ "amqp-utils " ++ (showVersion version)
-  printparam' "routing key" $ rKey args
-  isDir <- F.getFileStatus (inputFile args) >>= return . F.isDirectory
+  tid <- myThreadId
+  args <- getArgs >>= parseargs 'a'
+  printparam "client version" ["amqp-utils", showVersion version]
+  printparam "routing key" $ rKey args
+  printparam "exchange" $ currentExchange args
+  isDir <-
+    if inputFile args == "-"
+      then return False
+      else F.getFileStatus (inputFile args) >>= return . F.isDirectory
   if isDir
-    then printparam' "hotfolder" $ inputFile args
-    else printparam' "input file" $
-         (inputFile args) ++
-         if (lineMode args)
-           then " (line-by-line)"
-           else ""
+    then printparam "hotfolder" $ inputFile args
+    else printparam
+           "input file"
+           [ inputFile args
+           , if (lineMode args)
+               then "(line-by-line)"
+               else ""
+           ]
   (conn, chan) <- connect args
-  printparam' "confirm mode" $ show $ confirm args
+  addChannelExceptionHandler chan (X.throwTo tid)
+  printparam "confirm mode" $ confirm args
   if (confirm args)
     then do
       confirmSelect chan False
@@ -51,7 +65,11 @@ main = do
            addWatch
              inotify
              [CloseWrite, MoveIn]
+#if MIN_VERSION_hinotify(0,3,10)
+             (BS.pack (inputFile args))
+#else
              (inputFile args)
+#endif
              (handleEvent publishOneMsg (suffix args) (inputFile args))
          hr $ "BEGIN watching " ++ (inputFile args)
          _ <- forever $ threadDelay 1000000
@@ -59,29 +77,40 @@ main = do
          hr $ "END watching " ++ (inputFile args)
        else do
          hr $ "BEGIN sending"
-         messageFile <- BL.readFile (inputFile args)
+         messageFile <-
+           if inputFile args == "-"
+             then BL.getContents
+             else BL.readFile (inputFile args)
          if (lineMode args)
            then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
            else publishOneMsg (Just (inputFile args)) messageFile
          hr "END sending")
-    (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
+    exceptionHandler
   -- all done. wait and close.
   if (confirm args)
-    then waitForConfirms chan >>= (printparam' "confirmed") . show
+    then waitForConfirms chan >>= printparam "confirmed"
     else return ()
-  closeConnection conn
-  hr "connection closed"
+  X.catch (closeConnection conn) exceptionHandler
+
+-- | A handler for clean exit
+exceptionHandler :: AMQPException -> IO ()
+exceptionHandler (ChannelClosedException Normal txt) =
+  printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler (ConnectionClosedException Normal txt) =
+  printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
 
 -- | The handler for publisher confirms
 confirmCallback :: (Word64, Bool, AckType) -> IO ()
 confirmCallback (deliveryTag, isAll, ackType) =
-  printparam'
+  printparam
     "confirmed"
-    ((show deliveryTag) ++
-     (if isAll
-        then " all "
-        else " this ") ++
-     (show ackType))
+    [ show deliveryTag
+    , if isAll
+        then "all"
+        else "this"
+    , show ackType
+    ]
 
 -- | Hotfolder event handler
 handleEvent ::
@@ -91,8 +120,15 @@ handleEvent ::
   -> Event
   -> IO ()
 -- just handle closewrite and movedin events
+#if MIN_VERSION_hinotify(0,3,10)
+handleEvent f s p (Closed False (Just x) True) =
+  handleFile f s (p ++ "/" ++ (BS.unpack x))
+handleEvent f s p (MovedIn False x _) =
+  handleFile f s (p ++ "/" ++ (BS.unpack x))
+#else
 handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x)
 handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x)
+#endif
 handleEvent _ _ _ _ = return ()
 
 -- | Hotfolder file handler
@@ -106,9 +142,7 @@ handleFile f s@(_:_) x =
 handleFile f [] x =
   X.catch
     (BL.readFile x >>= f (Just x))
-    (\exception ->
-       printparam' "exception in handleFile" $
-       show (exception :: X.SomeException))
+    (\e -> printparam "exception in handleFile" (e :: X.SomeException))
 
 -- | Publish one message with our settings
 publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
@@ -125,29 +159,28 @@ publishOneMsg' c a fn f = do
         return (Just (T.pack t), Just (T.pack e))
       else return ((contenttype a), (contentencoding a))
   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
-  r <-
-    publishMsg
-      c
-      (T.pack $ currentExchange a)
-      (T.pack $ rKey a)
-      newMsg
-        { msgBody = f
-        , msgDeliveryMode = persistent a
-        , msgTimestamp = Just now
-        , msgID = msgid a
-        , msgType = msgtype a
-        , msgUserID = userid a
-        , msgApplicationID = appid a
-        , msgClusterID = clusterid a
-        , msgContentType = mtype
-        , msgContentEncoding = mencoding
-        , msgReplyTo = replyto a
-        , msgPriority = prio a
-        , msgCorrelationID = corrid a
-        , msgExpiration = msgexp a
-        , msgHeaders = substheader (fnheader a) fn $ msgheader a
-        }
-  printparam "sent" $ fmap show r
+  publishMsg
+    c
+    (T.pack $ currentExchange a)
+    (T.pack $ rKey a)
+    newMsg
+      { msgBody = f
+      , msgDeliveryMode = persistent a
+      , msgTimestamp = Just now
+      , msgID = msgid a
+      , msgType = msgtype a
+      , msgUserID = userid a
+      , msgApplicationID = appid a
+      , msgClusterID = clusterid a
+      , msgContentType = mtype
+      , msgContentEncoding = mencoding
+      , msgReplyTo = replyto a
+      , msgPriority = prio a
+      , msgCorrelationID = corrid a
+      , msgExpiration = msgexp a
+      , msgHeaders = substheader (fnheader a) fn $ msgheader a
+      } >>=
+    printparam "sent"
   where
     substheader ::
          [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable