]> woffs.de Git - fd/haskell-amqp-utils.git/blobdiff - agitprop.hs
konsum: options nack and requeuenack
[fd/haskell-amqp-utils.git] / agitprop.hs
index fd4922d298b621a7094b9e95db20168f8febe555..480307a9e5bcd11df2e0cd62bff2a4cc5c9e9d0a 100644 (file)
@@ -1,15 +1,16 @@
-{-# LANGUAGE OverloadedStrings #-}
-
+-- generic AMQP publisher
 import Control.Concurrent (threadDelay)
 import qualified Control.Exception as X
 import Control.Monad (forever)
 import qualified Data.ByteString.Lazy.Char8 as BL
-import qualified Data.Text as T
 import Data.List (isSuffixOf)
+import Data.Maybe
+import qualified Data.Text as T
 import Data.Time
 import Data.Time.Clock.POSIX
 import Data.Version (showVersion)
 import Data.Word (Word64)
+import Magic
 import Network.AMQP
 import Network.AMQP.Types
 import Network.AMQP.Utils.Connection
@@ -51,22 +52,25 @@ main = do
              inotify
              [CloseWrite, MoveIn]
              (inputFile args)
-             (handleEvent publishOneMsg (suffix args))
-         hr $ "watching " ++ (inputFile args)
+             (handleEvent publishOneMsg (suffix args) (inputFile args))
+         hr $ "BEGIN watching " ++ (inputFile args)
          _ <- forever $ threadDelay 1000000
          removeWatch wd
+         hr $ "END watching " ++ (inputFile args)
        else do
-         hr $ "sending " ++ (inputFile args)
+         hr $ "BEGIN sending"
          messageFile <- BL.readFile (inputFile args)
          if (lineMode args)
            then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
-           else publishOneMsg (Just (inputFile args)) messageFile)
+           else publishOneMsg (Just (inputFile args)) messageFile
+         hr "END sending")
     (\exception -> printparam' "exception" $ show (exception :: X.SomeException))
   -- all done. wait and close.
   if (confirm args)
-    then waitForConfirms chan >>= return . show >> return ()
+    then waitForConfirms chan >>= (printparam' "confirmed") . show
     else return ()
   closeConnection conn
+  hr "connection closed"
 
 -- | The handler for publisher confirms
 confirmCallback :: (Word64, Bool, AckType) -> IO ()
@@ -80,19 +84,28 @@ confirmCallback (deliveryTag, isAll, ackType) =
      (show ackType))
 
 -- | Hotfolder event handler
-handleEvent :: (Maybe String -> BL.ByteString -> IO ()) -> [ String ] -> Event -> IO ()
+handleEvent ::
+     (Maybe String -> BL.ByteString -> IO ())
+  -> [String]
+  -> String
+  -> Event
+  -> IO ()
 -- just handle closewrite and movedin events
-handleEvent f s (Closed False (Just x) True) = handleFile f s x
-handleEvent f s (MovedIn False x _) = handleFile f s x
-handleEvent _ _ _ = return ()
+handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x)
+handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x)
+handleEvent _ _ _ = return ()
 
 -- | Hotfolder file handler
-handleFile :: (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
+handleFile ::
+     (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
 handleFile _ _ ('.':_) = return () -- ignore hidden files
-handleFile f s@(_:_) x = if any (flip isSuffixOf x) s then handleFile f [] x else return ()
+handleFile f s@(_:_) x =
+  if any (flip isSuffixOf x) s
+    then handleFile f [] x
+    else return ()
 handleFile f [] x =
   X.catch
-    (hr ("sending " ++ x) >> BL.readFile x >>= f (Just x))
+    (BL.readFile x >>= f (Just x))
     (\exception ->
        printparam' "exception in handleFile" $
        show (exception :: X.SomeException))
@@ -100,6 +113,17 @@ handleFile f [] x =
 -- | Publish one message with our settings
 publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
 publishOneMsg' c a fn f = do
+  printparam "sending" fn
+  (mtype, mencoding) <-
+    if (magic a) && isJust fn
+      then do
+        m <- magicOpen [MagicMimeType]
+        magicLoadDefault m
+        t <- magicFile m (fromJust fn)
+        magicSetFlags m [MagicMimeEncoding]
+        e <- magicFile m (fromJust fn)
+        return (Just (T.pack t), Just (T.pack e))
+      else return ((contenttype a), (contentencoding a))
   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
   r <-
     publishMsg
@@ -108,15 +132,15 @@ publishOneMsg' c a fn f = do
       (T.pack $ rKey a)
       newMsg
         { msgBody = f
-        , msgDeliveryMode = Just Persistent
+        , msgDeliveryMode = persistent a
         , msgTimestamp = Just now
         , msgID = msgid a
         , msgType = msgtype a
         , msgUserID = userid a
         , msgApplicationID = appid a
         , msgClusterID = clusterid a
-        , msgContentType = contenttype a
-        , msgContentEncoding = contentencoding a
+        , msgContentType = mtype
+        , msgContentEncoding = mencoding
         , msgReplyTo = replyto a
         , msgPriority = prio a
         , msgCorrelationID = corrid a
@@ -125,7 +149,8 @@ publishOneMsg' c a fn f = do
         }
   printparam "sent" $ fmap show r
   where
-    substheader :: [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
+    substheader ::
+         [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
     substheader (s:r) (Just fname) old =
       substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
     substheader _ _ old = old
don't click here