1 {-# LANGUAGE FlexibleInstances #-}
3 module Network.AMQP.Utils.Helpers where
5 import Control.Concurrent
6 import qualified Control.Exception as X
8 import qualified Data.ByteString.Lazy.Char8 as BL
9 import qualified Data.ByteString.UTF8 as BS
10 import Data.Int (Int64)
12 import qualified Data.Map as M
14 import qualified Data.Text as T
16 import Data.Time.Clock.POSIX
17 import Data.Word (Word16)
19 import Network.AMQP.Types
20 import Network.AMQP.Utils.Options
21 import Network.Socket (PortNumber)
22 import System.Directory (removeFile)
23 import System.Environment (getEnvironment)
28 -- | print config parameters
32 flexprint :: a -> IO ()
33 flexprint = (hPutStrLn stderr) . show
36 printparam :: String -> a -> IO ()
41 mapM_ (hPutStr stderr) [" --- ", label, ": "]
45 instance (Flexprint a) => Flexprint (Maybe a) where
47 printparam _ Nothing = return ()
48 printparam x (Just y) = printparam x y
50 instance Flexprint String where
51 flexprint = hPutStrLn stderr
54 instance Flexprint [String] where
55 flexprint = flexprint . unwords
58 instance Flexprint T.Text where
59 flexprint = flexprint . T.unpack
62 instance Flexprint BL.ByteString where
63 flexprint x = hPutStrLn stderr "" >> BL.hPut stderr x >> hPutStrLn stderr ""
66 instance Flexprint Bool
68 instance Flexprint Int
70 instance Flexprint Int64
72 instance Flexprint Word16
74 instance Flexprint ExitCode
76 instance Flexprint X.SomeException
78 instance Flexprint AMQPException
80 instance Flexprint ConfirmationResult
82 instance Flexprint PortNumber
86 hr x = hPutStrLn stderr hr' >> hFlush stderr
88 hr' = take 72 $ (take 25 hr'') ++ " " ++ x ++ " " ++ hr''
91 -- | format headers for printing
92 formatheaders :: ((T.Text, FieldValue) -> [a]) -> FieldTable -> [a]
93 formatheaders f (FieldTable ll) = concat $ map f $ M.toList ll
95 -- | format headers for setting environment variables
97 ((Int, (T.Text, FieldValue)) -> [(String, String)]) -> FieldTable -> [(String,String)]
98 formatheaders' f (FieldTable ll) = concat $ map f $ zip [0 ..] $ M.toList ll
101 fieldshow :: (T.Text, FieldValue) -> String
102 fieldshow (k, v) = "\n " ++ T.unpack k ++ ": " ++ valueshow v
104 -- | callback cmdline formatting
105 fieldshow' :: (T.Text, FieldValue) -> [String]
106 fieldshow' (k, v) = ["-h", T.unpack k ++ "=" ++ valueshow v]
108 -- | environment variable formatting
109 fieldshow'' :: (Int, (T.Text, FieldValue)) -> [(String, String)]
110 fieldshow'' (n, (k, v)) =
111 [("AMQP_HEADER_KEY_" ++ nn, T.unpack k), ("AMQP_HEADER_VALUE_" ++ nn, valueshow v)]
115 -- | showing a FieldValue
116 valueshow :: FieldValue -> String
117 valueshow (FVString value) = BS.toString value
118 valueshow (FVInt8 value) = show value
119 valueshow (FVInt16 value) = show value
120 valueshow (FVInt32 value) = show value
121 valueshow (FVInt64 value) = show value
122 valueshow (FVFloat value) = show value
123 valueshow (FVDouble value) = show value
124 valueshow value = show value
126 -- | skip showing body head if binary type
127 isimage :: Maybe String -> Bool
128 isimage Nothing = False
130 | isPrefixOf "application/xml" ctype = False
131 | isPrefixOf "application/json" ctype = False
132 | otherwise = any (flip isPrefixOf ctype) ["application", "image"]
134 -- | show the first bytes of message body
135 anriss' :: Maybe Int64 -> BL.ByteString -> BL.ByteString
141 -- | callback cmdline with optional parameters
142 printopt :: (String, Maybe String) -> [String]
143 printopt (_, Nothing) = []
144 printopt (opt, Just s) = [opt, s]
146 -- | prints header and head on stderr and returns
147 -- cmdline options and environment variables to callback
150 -> (Message, Envelope)
153 -> IO ([String], [(String, String)])
154 printmsg h (msg, envi) anR now = do
157 [ ("routing key", rkey)
158 , ("message-id", messageid)
159 , ("headers", headers)
160 , ("content-type", ctype)
161 , ("content-encoding", cenc)
162 , ("redelivered", redeliv)
163 , ("timestamp", timestamp'')
168 , ("user id", muserid)
169 , ("application id", mappid)
170 , ("cluster id", mclusterid)
171 , ("reply to", mreplyto)
172 , ("correlation id", mcorrid)
173 , ("expiration", mexp)
174 , ("delivery mode", mdelivmode)
176 printparam label anriss
177 mapM_ (\hdl -> BL.hPut hdl body >> hFlush hdl) h
178 oldenv <- getEnvironment
183 [ ("ROUTINGKEY", rkey)
184 , ("CONTENTTYPE", ctype)
186 , ("MSGID", messageid)
187 , ("TIMESTAMP", timestamp)
189 , ("REDELIVERED", redeliv)
192 return (cmdline, environment)
194 step (_, Nothing) xs = xs
195 step (k, Just v) xs = ("AMQP_" ++ k, v) : xs
209 headers = fmap (formatheaders fieldshow) $ msgHeaders msg
210 headersOpt = maybeToList $ fmap (formatheaders fieldshow') $ msgHeaders msg
212 concat . maybeToList $ fmap (formatheaders' fieldshow'') $ msgHeaders msg
217 else Just (anriss' anR body) :: Maybe BL.ByteString
218 anriss'' = maybe "" (\a -> "first " ++ (show a) ++ " bytes of ") anR
219 label = anriss'' ++ "body"
220 ctype = fmap T.unpack $ msgContentType msg
221 cenc = fmap T.unpack $ msgContentEncoding msg
222 rkey = Just . T.unpack $ envRoutingKey envi
223 messageid = fmap T.unpack $ msgID msg
224 pri = fmap show $ msgPriority msg
225 mtype = fmap show $ msgType msg
226 muserid = fmap show $ msgUserID msg
227 mappid = fmap show $ msgApplicationID msg
228 mclusterid = fmap show $ msgClusterID msg
229 mreplyto = fmap show $ msgReplyTo msg
230 mcorrid = fmap show $ msgCorrelationID msg
231 mexp = fmap show $ msgExpiration msg
232 mdelivmode = fmap show $ msgDeliveryMode msg
233 size = Just . show $ BL.length body
235 if envRedelivered envi
238 tz = zonedTimeZone now
239 nowutc = zonedTimeToUTCFLoor now
240 msgtime = msgTimestamp msg
241 msgtimeutc = fmap (posixSecondsToUTCTime . realToFrac) msgtime
242 timestamp = fmap show msgtime
243 timediff = fmap (difftime nowutc) msgtimeutc
246 Just "now" -> Nothing
247 _ -> showtime tz $ Just nowutc
248 timestamp' = showtime tz msgtimeutc
251 (\a b c -> a ++ " (" ++ b ++ ") (" ++ c ++ ")")
256 -- | timestamp conversion
257 zonedTimeToUTCFLoor :: ZonedTime -> UTCTime
258 zonedTimeToUTCFLoor x =
259 posixSecondsToUTCTime $
260 realToFrac ((floor . utcTimeToPOSIXSeconds . zonedTimeToUTC) x :: Timestamp)
262 -- | show the timestamp
263 showtime :: TimeZone -> Maybe UTCTime -> Maybe String
264 showtime tz = fmap (show . (utcToZonedTime tz))
266 -- | show difference between two timestamps
267 difftime :: UTCTime -> UTCTime -> String
270 | now > msg = diff ++ " ago"
271 | otherwise = diff ++ " in the future"
273 diff = show (diffUTCTime now msg)
275 -- | if the message is to be saved
276 -- and maybe processed further
284 -> Maybe (ExitCode -> BL.ByteString -> IO ())
287 optionalFileStuff (msg, envi) callbackoptions addi numstring a tid action environment = do
288 path <- saveFile (tempDir a) numstring (msgBody msg)
289 printparam "saved to" path
290 let callbackcmdline =
292 (constructCallbackCmdLine (simple a) callbackoptions addi numstring)
295 printparam "calling" callbackcmdline
300 (doProc a numstring envi c action path environment)
301 (either (throwTo tid) return) >>
305 -- | save message into temp file
306 saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String)
307 saveFile Nothing _ _ = return Nothing
308 saveFile (Just tempD) numstring body = do
310 openBinaryTempFileWithDefaultPermissions
312 ("amqp-utils-" ++ numstring ++ "-.tmp")
317 -- | construct cmdline for callback script
318 constructCallbackCmdLine ::
319 Bool -> [String] -> [String] -> String -> String -> String -> [String]
320 constructCallbackCmdLine True _ addi _ exe path = exe : addi ++ path : []
321 constructCallbackCmdLine False opts addi num exe path =
322 exe : "-f" : path : "-n" : num : opts ++ addi
324 -- | call callback script
330 -> Maybe (ExitCode -> BL.ByteString -> IO ())
334 doProc a numstring envi (exe:args) action path environment = do
335 (_, h, _, processhandle) <-
338 {std_out = out, std_err = Inherit, env = Just environment'}
339 sout <- mapM BL.hGetContents h
341 maybe 0 id (fmap BL.length sout) `seq` waitForProcess processhandle
342 printparam (numstring ++ " call returned") exitcode
343 if isJust action && isJust sout
344 then ((fromJust action $ exitcode) (fromJust sout)) >> acke envi a
345 else case exitcode of
346 ExitSuccess -> acke envi a
347 ExitFailure _ -> reje envi a
348 if (cleanupTmpFile a)
350 (maybe (return ()) removeFile path)
351 (\e -> printparam "error removing temp file" (e :: X.SomeException))
359 ("AMQP_NUMBER",numstring):("AMQP_FILE",fromJust path):environment
360 doProc _ _ _ _ _ _ _ = return ()
363 acke :: Envelope -> Args -> IO ()
365 | (ack a) = ackEnv envi
366 | otherwise = return ()
369 reje :: Envelope -> Args -> IO ()
371 | (ack a) = rejectEnv envi (requeuenack a)
372 | otherwise = return ()