]> woffs.de Git - fd/haskell-amqp-utils.git/commitdiff
hindent
authorFrank Doepper <[email protected]>
Mon, 9 Dec 2019 14:21:48 +0000 (15:21 +0100)
committerFrank Doepper <[email protected]>
Mon, 9 Dec 2019 14:21:48 +0000 (15:21 +0100)
Network/AMQP/Utils/Helpers.hs
Network/AMQP/Utils/Options.hs
agitprop.hs
arbeite.hs
konsum.hs
plane.hs

index 85f396cf7c039fbc165c87cf5381aee9b140ba61..4be054b1875944539d24235e92a3403f3b897aed 100644 (file)
@@ -1,4 +1,5 @@
 {-# LANGUAGE FlexibleInstances #-}
+
 module Network.AMQP.Utils.Helpers where
 
 import Control.Concurrent
@@ -122,7 +123,12 @@ printopt (_, Nothing) = []
 printopt (opt, Just s) = [opt, s]
 
 -- | prints header and head on stderr and returns cmdline options to callback
-printmsg :: Maybe Handle -> (Message, Envelope) -> Maybe Int64 -> ZonedTime -> IO [String]
+printmsg ::
+     Maybe Handle
+  -> (Message, Envelope)
+  -> Maybe Int64
+  -> ZonedTime
+  -> IO [String]
 printmsg h (msg, envi) anR now = do
   mapM_
     (uncurry printparam)
@@ -284,7 +290,8 @@ doProc a numstring envi (exe:args) action = do
   (_, h, _, processhandle) <-
     createProcess (proc exe args) {std_out = out, std_err = Inherit}
   sout <- mapM BL.hGetContents h
-  exitcode <- maybe 0 id (fmap BL.length sout) `seq` waitForProcess processhandle
+  exitcode <-
+    maybe 0 id (fmap BL.length sout) `seq` waitForProcess processhandle
   printparam (numstring ++ " call returned") exitcode
   if isJust action && isJust sout
     then ((fromJust action $ exitcode) (fromJust sout)) >> acke envi a
index 0a7b8e56073ff32a7cfe31ffab7a328204afd0d5..0970cca6bcc9d4c3944ae70aa77ddbd099f97441 100644 (file)
@@ -20,53 +20,54 @@ portnumber a
   | otherwise = fromJust (port a)
 
 -- | A data type for our options
-data Args = Args
-  { server :: String
-  , port :: Maybe PortNumber
-  , tls :: Bool
-  , vHost :: String
-  , currentExchange :: String
-  , bindings :: [(String, String)]
-  , rKey :: String
-  , anRiss :: Maybe Int64
-  , fileProcess :: Maybe String
-  , qName :: Maybe String
-  , cert :: Maybe String
-  , key :: Maybe String
-  , user :: String
-  , pass :: String
-  , preFetch :: Word16
-  , heartBeat :: Maybe Word16
-  , tempDir :: Maybe String
-  , additionalArgs :: [String]
-  , connectionName :: Maybe String
-  , tmpQName :: String
-  , inputFile :: String
-  , outputFile :: String
-  , lineMode :: Bool
-  , confirm :: Bool
-  , msgid :: Maybe Text
-  , msgtype :: Maybe Text
-  , userid :: Maybe Text
-  , appid :: Maybe Text
-  , clusterid :: Maybe Text
-  , contenttype :: Maybe Text
-  , contentencoding :: Maybe Text
-  , replyto :: Maybe Text
-  , prio :: Maybe Octet
-  , corrid :: Maybe Text
-  , msgexp :: Maybe Text
-  , msgheader :: Maybe FieldTable
-  , fnheader :: [String]
-  , suffix :: [String]
-  , magic :: Bool
-  , persistent :: Maybe DeliveryMode
-  , ack :: Bool
-  , requeuenack :: Bool
-  , rpc_timeout :: Double
-  , connect_timeout :: Int
-  , simple :: Bool
-  }
+data Args =
+  Args
+    { server :: String
+    , port :: Maybe PortNumber
+    , tls :: Bool
+    , vHost :: String
+    , currentExchange :: String
+    , bindings :: [(String, String)]
+    , rKey :: String
+    , anRiss :: Maybe Int64
+    , fileProcess :: Maybe String
+    , qName :: Maybe String
+    , cert :: Maybe String
+    , key :: Maybe String
+    , user :: String
+    , pass :: String
+    , preFetch :: Word16
+    , heartBeat :: Maybe Word16
+    , tempDir :: Maybe String
+    , additionalArgs :: [String]
+    , connectionName :: Maybe String
+    , tmpQName :: String
+    , inputFile :: String
+    , outputFile :: String
+    , lineMode :: Bool
+    , confirm :: Bool
+    , msgid :: Maybe Text
+    , msgtype :: Maybe Text
+    , userid :: Maybe Text
+    , appid :: Maybe Text
+    , clusterid :: Maybe Text
+    , contenttype :: Maybe Text
+    , contentencoding :: Maybe Text
+    , replyto :: Maybe Text
+    , prio :: Maybe Octet
+    , corrid :: Maybe Text
+    , msgexp :: Maybe Text
+    , msgheader :: Maybe FieldTable
+    , fnheader :: [String]
+    , suffix :: [String]
+    , magic :: Bool
+    , persistent :: Maybe DeliveryMode
+    , ack :: Bool
+    , requeuenack :: Bool
+    , rpc_timeout :: Double
+    , connect_timeout :: Int
+    , simple :: Bool
+    }
 
 instance Default Args where
   def =
@@ -256,7 +257,8 @@ allOptions =
         ['t']
         ["rpc_timeout"]
         (ReqArg (\s o -> o {rpc_timeout = read s}) "SECONDS")
-        ("How long to wait for reply (default: " ++ show (rpc_timeout def) ++ ")"))
+        ("How long to wait for reply (default: " ++
+         show (rpc_timeout def) ++ ")"))
   , ( "a"
     , Option
         []
@@ -409,7 +411,8 @@ allOptions =
         ['w']
         ["connect_timeout"]
         (ReqArg (\s o -> o {connect_timeout = read s}) "SECONDS")
-        ("timeout for establishing initial connection (default: " ++ show (connect_timeout def) ++ ")"))
+        ("timeout for establishing initial connection (default: " ++
+         show (connect_timeout def) ++ ")"))
   ]
 
 -- | Options for the executables
@@ -418,7 +421,8 @@ options exename = map snd $ filter ((elem exename) . fst) allOptions
 
 -- | Add a header with a String value
 addheader :: Maybe FieldTable -> String -> Maybe FieldTable
-addheader Nothing string = Just $ FieldTable $ M.singleton (getkey string) (getval string)
+addheader Nothing string =
+  Just $ FieldTable $ M.singleton (getkey string) (getval string)
 addheader (Just (FieldTable oldheader)) string =
   Just $ FieldTable $ M.insert (getkey string) (getval string) oldheader
 
index 4b2aa43727f31f740962e84a7f54e4b55b9a3c7a..80585b61772cccf4c2366a6e6be140bfa643b8d3 100644 (file)
@@ -1,4 +1,5 @@
 {-# LANGUAGE CPP #-}
+
 -- generic AMQP publisher
 import Control.Concurrent
 import qualified Control.Exception as X
@@ -93,8 +94,10 @@ main = do
 
 -- | A handler for clean exit
 exceptionHandler :: AMQPException -> IO ()
-exceptionHandler (ChannelClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess
-exceptionHandler (ConnectionClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler (ChannelClosedException Normal txt) =
+  printparam "exit" txt >> exitWith ExitSuccess
+exceptionHandler (ConnectionClosedException Normal txt) =
+  printparam "exit" txt >> exitWith ExitSuccess
 exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
 
 -- | The handler for publisher confirms
@@ -118,8 +121,10 @@ handleEvent ::
   -> IO ()
 -- just handle closewrite and movedin events
 #if MIN_VERSION_hinotify(0,3,10)
-handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ (BS.unpack x))
-handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ (BS.unpack x))
+handleEvent f s p (Closed False (Just x) True) =
+  handleFile f s (p ++ "/" ++ (BS.unpack x))
+handleEvent f s p (MovedIn False x _) =
+  handleFile f s (p ++ "/" ++ (BS.unpack x))
 #else
 handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x)
 handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x)
index 361a961809caab6d07bcef280cc61bc91e5289a6..8f2248125991af7eaebffef2d32d2fac90e09b27 100644 (file)
@@ -1,4 +1,5 @@
 {-# LANGUAGE OverloadedStrings #-}
+
 -- generic AMQP rpc server
 import Control.Concurrent
 import qualified Control.Exception as X
@@ -89,5 +90,7 @@ rpcServerCallback tid a addi c m@(msg, env) = do
             , msgCorrelationID = msgCorrelationID msg
             , msgTimestamp = msgTimestamp msg
             , msgExpiration = msgExpiration msg
-            , msgHeaders = Just $ FieldTable $ singleton "exitcode" $ FVString $ T.pack $ show e
+            , msgHeaders =
+                Just $
+                FieldTable $ singleton "exitcode" $ FVString $ T.pack $ show e
             }
index 178ad18d85c7d76f63823aca6ee78062da290770..bdbf065fc6365a8002a263f523cf7f3612d6d43c 100644 (file)
--- a/konsum.hs
+++ b/konsum.hs
@@ -83,8 +83,6 @@ myCallback a addi tid m@(_, envi) = do
     X.catch
       (printmsg Nothing m (anRiss a) now)
       (\x -> X.throwTo tid (x :: X.SomeException) >> return [])
-  either
-    (\e -> printparam "ERROR" (e :: X.SomeException) >> reje envi a)
-    return =<<
+  either (\e -> printparam "ERROR" (e :: X.SomeException) >> reje envi a) return =<<
     X.try (optionalFileStuff m callbackoptions addi numstring a tid Nothing)
   hr $ "END " ++ numstring
index 045bd0ae763bbfe58763bd9c7176ac8bc2b28548..a919d2dcfd79bcd6b3665f6d718acb5c6156d4ae 100644 (file)
--- a/plane.hs
+++ b/plane.hs
@@ -42,27 +42,32 @@ main = do
       then BL.getContents
       else BL.readFile (inputFile args)
   printparam "output file" $ outputFile args
-  h <- if outputFile args == "-" then return stdout else openBinaryFile (outputFile args) WriteMode
+  h <-
+    if outputFile args == "-"
+      then return stdout
+      else openBinaryFile (outputFile args) WriteMode
   ctag <- consumeMsgs chan q NoAck (rpcClientCallback h tid args)
   printparam "consumer tag" ctag
   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
   hr "publishing request"
-  _ <- publishMsg
-    chan
-    (T.pack $ currentExchange args)
-    (T.pack $ tmpQName args)
-    newMsg
-      { msgBody = message
-      , msgReplyTo = Just q
-      , msgCorrelationID = corrid args
-      , msgExpiration = msgexp args
-      , msgTimestamp = Just now
-      , msgHeaders = msgheader args
-      }
+  _ <-
+    publishMsg
+      chan
+      (T.pack $ currentExchange args)
+      (T.pack $ tmpQName args)
+      newMsg
+        { msgBody = message
+        , msgReplyTo = Just q
+        , msgCorrelationID = corrid args
+        , msgExpiration = msgexp args
+        , msgTimestamp = Just now
+        , msgHeaders = msgheader args
+        }
   hr "waiting for answer"
-  _ <- forkIO
-    (threadDelay (floor (1000000 * rpc_timeout args)) >>
-     throwTo tid TimeoutException)
+  _ <-
+    forkIO
+      (threadDelay (floor (1000000 * rpc_timeout args)) >>
+       throwTo tid TimeoutException)
   X.catch
     (forever $ threadDelay 200000)
     (\x -> do