]> woffs.de Git - fd/haskell-amqp-utils.git/commitdiff
0.3.1.1: agitprop option for publisher confirms 0.3.1.1
authorFrank Doepper <[email protected]>
Sun, 24 Jun 2018 21:23:40 +0000 (23:23 +0200)
committerFrank Doepper <[email protected]>
Mon, 25 Jun 2018 15:25:32 +0000 (17:25 +0200)
ChangeLog.md
Network/AMQP/Utils/Options.hs
agitprop.hs
amqp-utils.cabal
amqp-utils.nix
debian/changelog

index fed21adff2a88d4ca33a0ffdccf2a2625239a4c9..1c91438378c71c5af45287f72fa0e1184ffc6f97 100644 (file)
@@ -1,5 +1,9 @@
 # Revision history for haskell-amqp-utils
 
+## 0.3.1.1  -- 2018-06-25
+
+* agitprop with optional publisher confirms
+
 ## 0.3.1.0  -- 2018-06-22
 
 * agitprop, a publisher
index 34c29be090bb88e70497e8bf727bf830b92035e8..6d2d12703ff3955dfceae878492a5f335949a509 100644 (file)
@@ -29,6 +29,7 @@ data Args = Args { server          :: String
                  , tmpQName        :: String
                  , inputFile       :: String
                  , lineMode        :: Bool
+                 , confirm         :: Bool
                  }
 
 instance Default Args where
@@ -54,6 +55,7 @@ instance Default Args where
                ""
                "/dev/stdin"
                False
+               False
 
 -- | Common options
 cOptions :: [OptDescr (Args -> Args)]
@@ -168,6 +170,10 @@ aOptions = [ Option [ 'r' ]
                     [ "linemode" ]
                     (NoArg (\o -> o { lineMode = not (lineMode o) }))
                     ("Toggle line-by-line mode (default: " ++ show (lineMode def) ++ ")")
+           , Option [ 'C' ]
+                    [ "confirm" ]
+                    (NoArg (\o -> o { confirm = not (confirm o) }))
+                    ("Toggle confirms (default: " ++ show (confirm def) ++ ")")
            ]
 
 -- |
index 1beef5f908e15b476b4f17cf156eb50593f5e51d..45e296d01c740777c4a820ac9044791fcbe40586 100644 (file)
@@ -1,14 +1,15 @@
 {-# LANGUAGE OverloadedStrings #-}
 
-import           Paths_amqp_utils           ( version )
-import           Data.Version               ( showVersion )
+import           Paths_amqp_utils              ( version )
+import           Data.Version                  ( showVersion )
 import           System.Environment
-import qualified Data.Text                  as T
+import qualified Data.Text                     as T
 import           Network.AMQP
 import           Network.AMQP.Utils.Options
 import           Network.AMQP.Utils.Helpers
 import           Network.AMQP.Utils.Connection
-import qualified Data.ByteString.Lazy.Char8 as BL
+import qualified Data.ByteString.Lazy.Char8    as BL
+import           Data.Word                     ( Word64 )
 
 main :: IO ()
 main = do
@@ -16,16 +17,37 @@ main = do
     --  tid <- myThreadId
     args <- getArgs >>= parseargs "agitprop"
     printparam' "client version" $ "amqp-utils " ++ (showVersion version)
+    printparam' "routing key" $ rKey args
+    printparam' "input file" $
+        (inputFile args) ++ if (lineMode args) then " (line-by-line)" else ""
     messageFile <- BL.readFile (inputFile args)
     (conn, chan) <- connect args
-    let publishOneMsg f = publishMsg chan
-                                     (T.pack $ currentExchange args)
-                                     (T.pack $ rKey args)
-                                     newMsg { msgBody = f
-                                            , msgDeliveryMode = Just Persistent
-                                            }
-    _ <- if (lineMode args)
-         then mapM_ publishOneMsg (BL.lines messageFile)
-         else publishOneMsg messageFile >> return ()
+    printparam' "confirm mode" $ show $ confirm args
+    if (confirm args)
+        then do
+            confirmSelect chan False
+            addConfirmationListener chan confirmCallback
+        else return ()
+    let publishOneMsg f = do
+            r <- publishMsg chan
+                            (T.pack $ currentExchange args)
+                            (T.pack $ rKey args)
+                            newMsg { msgBody = f
+                                   , msgDeliveryMode = Just Persistent
+                                   }
+            printparam "sent" $ fmap show r
+    if (lineMode args)
+        then mapM_ publishOneMsg (BL.lines messageFile)
+        else publishOneMsg messageFile
 
+    if (confirm args)
+        then waitForConfirms chan >>= return . show >> return ()
+        else return ()
     closeConnection conn
+
+-- | The handler for publisher confirms
+confirmCallback :: (Word64, Bool, AckType) -> IO ()
+confirmCallback (deliveryTag, isAll, ackType) =
+    printparam' "confirmed"
+                ((show deliveryTag) ++
+                     (if isAll then " all " else " this ") ++ (show ackType))
index 63bad54d26ffe17b34ce5548182653d590cfdff4..d1b039d792f010b4acc6ae6824ad05d84ef54fb1 100644 (file)
@@ -1,6 +1,6 @@
 name:                amqp-utils
 
-version:             0.3.1.0
+version:             0.3.1.1
 
 synopsis:            Generic Haskell AMQP Consumer
 
index 7865d0c83171fe53752c530f13e3c1a3bf27ad9d..0e58d1d46e2603fe36fba64df243f6a977a7f0d4 100644 (file)
@@ -3,7 +3,7 @@
 }:
 mkDerivation {
   pname = "amqp-utils";
-  version = "0.3.0.2";
+  version = "0.3.1.1";
   src = ./.;
   isLibrary = false;
   isExecutable = true;
index 60cd025bdb7df54c1e6d229fac176fa5a303c6e9..7d0e6453edf866006aeb54ee2c68b5bbc18b22c1 100644 (file)
@@ -1,3 +1,9 @@
+haskell-amqp-utils (0.3.1.1) unstable; urgency=medium
+
+  * agitprop: publisher confirms
+
+ -- Frank Doepper <[email protected]>  Mon, 25 Jun 2018 17:24:35 +0200
+
 haskell-amqp-utils (0.3.1.0) unstable; urgency=medium
 
   * agitprop, a publisher