1 {-# LANGUAGE FlexibleInstances #-}
3 module Network.AMQP.Utils.Helpers where
5 import Control.Concurrent
6 import qualified Control.Exception as X
8 import qualified Data.ByteString.Lazy.Char8 as BL
9 import qualified Data.ByteString.UTF8 as BS
10 import Data.Int (Int64)
12 import qualified Data.Map as M
14 import qualified Data.Text as T
16 import Data.Time.Clock.POSIX
17 import Data.Word (Word16)
19 import Network.AMQP.Types
20 import Network.AMQP.Utils.Options
21 import Network.Socket (PortNumber)
22 import System.Directory (removeFile)
23 import System.Environment (getEnvironment)
28 -- | print config parameters
32 flexprint :: a -> IO ()
33 flexprint = (hPutStrLn stderr) . show
36 printparam :: String -> a -> IO ()
41 mapM_ (hPutStr stderr) [" --- ", label, ": "]
-- | Optional parameters: an absent value prints nothing at all, a present
-- value is printed under the same label as the value it wraps.
instance (Flexprint a) => Flexprint (Maybe a) where
  printparam label = maybe (return ()) (printparam label)
-- | Strings are written to stderr as they are, followed by a newline.
instance Flexprint String where
  flexprint str = hPutStrLn stderr str
-- | A list of strings is joined with 'unwords' and printed as one string.
instance Flexprint [String] where
  flexprint strs = flexprint (unwords strs)
-- | Text is unpacked and printed the same way as a 'String'.
instance Flexprint T.Text where
  flexprint txt = flexprint (T.unpack txt)
-- | Lazy bytestrings are written raw to stderr, with a newline written
-- before and after the bytes.
instance Flexprint BL.ByteString where
  flexprint bytes = do
    hPutStrLn stderr ""
    BL.hPut stderr bytes
    hPutStrLn stderr ""
-- The instances below declare no methods of their own, so each of them
-- uses whatever default implementations the class provides.

-- Basic values
instance Flexprint Bool

instance Flexprint Int

instance Flexprint Int64

instance Flexprint Word16

-- Process and network values
instance Flexprint ExitCode

instance Flexprint PortNumber

-- Exceptions and broker results
instance Flexprint X.SomeException

instance Flexprint AMQPException

instance Flexprint ConfirmationResult
86 hr x = hPutStrLn stderr hr' >> hFlush stderr
88 hr' = take 72 $ (take 25 hr'') ++ " " ++ x ++ " " ++ hr''
-- | Format a header table for printing: the formatter is applied to every
-- (key, value) pair in the order 'M.toList' yields them, and the
-- per-header results are concatenated into one list.
formatheaders :: ((T.Text, FieldValue) -> [a]) -> FieldTable -> [a]
formatheaders format (FieldTable table) = concatMap format (M.toList table)
95 -- | format headers for setting environment variables
97 ((Int, (T.Text, FieldValue)) -> [(String, String)])
100 formatheadersEnv f (FieldTable ll) = concat $ map f $ zip [0 ..] $ M.toList ll
-- | Render one header for the log: a line break and a space, then
-- @key: value@, with the value rendered by 'valueshow'.
fieldshow :: (T.Text, FieldValue) -> String
fieldshow (key, val) = concat ["\n ", T.unpack key, ": ", valueshow val]
-- | Render one header as a pair of callback command-line arguments:
-- the flag @-h@ followed by @key=value@.
fieldshowOpt :: (T.Text, FieldValue) -> [String]
fieldshowOpt (key, val) = ["-h", assignment]
  where
    assignment = T.unpack key ++ "=" ++ valueshow val
110 -- | environment variable formatting
111 fieldshowEnv :: (Int, (T.Text, FieldValue)) -> [(String, String)]
112 fieldshowEnv (n, (k, v)) =
113 [ ("AMQP_HEADER_KEY_" ++ nn, T.unpack k)
114 , ("AMQP_HEADER_VALUE_" ++ nn, valueshow v)
-- | Render a 'FieldValue' as a string.
--
-- String values are decoded with 'BS.toString'; the listed integer and
-- floating-point constructors show only the wrapped number. Every other
-- constructor falls back to the 'Show' instance of the 'FieldValue'
-- itself.
valueshow :: FieldValue -> String
valueshow fv =
  case fv of
    FVString str -> BS.toString str
    FVInt8 n -> show n
    FVInt16 n -> show n
    FVInt32 n -> show n
    FVInt64 n -> show n
    FVFloat x -> show x
    FVDouble x -> show x
    other -> show other
130 -- | skip showing body head if binary type
131 isimage :: Maybe String -> Bool
132 isimage Nothing = False
134 | isPrefixOf "application/xml" ctype = False
135 | isPrefixOf "application/json" ctype = False
136 | otherwise = any (flip isPrefixOf ctype) ["application", "image"]
138 -- | show the first bytes of message body
139 anriss' :: Maybe Int64 -> BL.ByteString -> BL.ByteString
-- | Turn an optional callback parameter into command-line arguments:
-- the option name followed by its value when a value is present,
-- and no arguments at all when it is absent.
printopt :: (String, Maybe String) -> [String]
printopt (opt, mval) = maybe [] (\val -> [opt, val]) mval
150 -- | prints header and head on stderr and returns
151 -- cmdline options and environment variables to callback
154 -> (Message, Envelope)
157 -> IO ([String], [(String, String)])
158 printmsg h (msg, envi) anR now = do
161 [ ("routing key", rkey)
162 , ("message-id", messageid)
163 , ("headers", headers)
164 , ("content-type", ctype)
165 , ("content-encoding", cenc)
166 , ("redelivered", redeliv)
167 , ("timestamp", timestamp'')
172 , ("user id", muserid)
173 , ("application id", mappid)
174 , ("cluster id", mclusterid)
175 , ("reply to", mreplyto)
176 , ("correlation id", mcorrid)
177 , ("expiration", mexp)
178 , ("delivery mode", mdelivmode)
180 printparam label anriss
181 mapM_ (\hdl -> BL.hPut hdl body >> hFlush hdl) h
182 oldenv <- getEnvironment
187 [ ("ROUTINGKEY", rkey)
188 , ("CONTENTTYPE", ctype)
190 , ("MSGID", messageid)
191 , ("TIMESTAMP", timestamp)
193 , ("REDELIVERED", redeliv)
196 return (cmdline, environment)
198 step (_, Nothing) xs = xs
199 step (k, Just v) xs = ("AMQP_" ++ k, v) : xs
213 headers = fmap (formatheaders fieldshow) $ msgHeaders msg
215 maybeToList $ fmap (formatheaders fieldshowOpt) $ msgHeaders msg
217 concat . maybeToList $
218 fmap (formatheadersEnv fieldshowEnv) $ msgHeaders msg
223 else Just (anriss' anR body) :: Maybe BL.ByteString
224 anriss'' = maybe "" (\a -> "first " ++ (show a) ++ " bytes of ") anR
225 label = anriss'' ++ "body"
226 ctype = fmap T.unpack $ msgContentType msg
227 cenc = fmap T.unpack $ msgContentEncoding msg
228 rkey = Just . T.unpack $ envRoutingKey envi
229 messageid = fmap T.unpack $ msgID msg
230 pri = fmap show $ msgPriority msg
231 mtype = fmap show $ msgType msg
232 muserid = fmap show $ msgUserID msg
233 mappid = fmap show $ msgApplicationID msg
234 mclusterid = fmap show $ msgClusterID msg
235 mreplyto = fmap show $ msgReplyTo msg
236 mcorrid = fmap show $ msgCorrelationID msg
237 mexp = fmap show $ msgExpiration msg
238 mdelivmode = fmap show $ msgDeliveryMode msg
239 size = Just . show $ BL.length body
241 if envRedelivered envi
244 tz = zonedTimeZone now
245 nowutc = zonedTimeToUTCFLoor now
246 msgtime = msgTimestamp msg
247 msgtimeutc = fmap (posixSecondsToUTCTime . realToFrac) msgtime
248 timestamp = fmap show msgtime
249 timediff = fmap (difftime nowutc) msgtimeutc
252 Just "now" -> Nothing
253 _ -> showtime tz $ Just nowutc
254 timestamp' = showtime tz msgtimeutc
257 (\a b c -> a ++ " (" ++ b ++ ") (" ++ c ++ ")")
-- | Convert a zoned time to UTC, rounded down to whole seconds.
--
-- The time is turned into POSIX seconds, floored into an integral
-- 'Timestamp', and converted back to a 'UTCTime'.
zonedTimeToUTCFLoor :: ZonedTime -> UTCTime
zonedTimeToUTCFLoor zoned = posixSecondsToUTCTime (realToFrac wholeSeconds)
  where
    wholeSeconds :: Timestamp
    wholeSeconds = floor (utcTimeToPOSIXSeconds (zonedTimeToUTC zoned))
-- | Render an optional UTC time as a string in the given time zone.
-- 'Nothing' stays 'Nothing'.
showtime :: TimeZone -> Maybe UTCTime -> Maybe String
showtime tz mtime = do
  t <- mtime
  return (show (utcToZonedTime tz t))
272 -- | show difference between two timestamps
273 difftime :: UTCTime -> UTCTime -> String
276 | now > msg = diff ++ " ago"
277 | otherwise = diff ++ " in the future"
279 diff = show (diffUTCTime now msg)
281 -- | if the message is to be saved
282 -- and maybe processed further
290 -> Maybe (ExitCode -> BL.ByteString -> IO ())
291 -> [(String, String)]
293 optionalFileStuff (msg, envi) callbackoptions addi numstring a tid action environment = do
294 path <- saveFile (tempDir a) numstring (msgBody msg)
295 printparam "saved to" path
296 let callbackcmdline =
298 (constructCallbackCmdLine (simple a) callbackoptions addi numstring)
301 printparam "calling" callbackcmdline
306 (doProc a numstring envi c action path environment)
307 (either (throwTo tid) return) >>
311 -- | save message into temp file
312 saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String)
313 saveFile Nothing _ _ = return Nothing
314 saveFile (Just tempD) numstring body = do
316 openBinaryTempFileWithDefaultPermissions
318 ("amqp-utils-" ++ numstring ++ "-.tmp")
-- | Build the command line for the callback script.
--
-- Arguments, in order: simple-mode flag, header options, additional
-- arguments, message number, executable, path of the saved file.
--
-- With the flag set the result is the executable, the additional
-- arguments and finally the file path; header options and message number
-- are not used. Otherwise the result is the executable, @-f path@,
-- @-n number@, the header options and then the additional arguments.
constructCallbackCmdLine ::
     Bool -> [String] -> [String] -> String -> String -> String -> [String]
constructCallbackCmdLine simpleMode opts addi num exe path
  | simpleMode = [exe] ++ addi ++ [path]
  | otherwise = [exe, "-f", path, "-n", num] ++ opts ++ addi
330 -- | call callback script
336 -> Maybe (ExitCode -> BL.ByteString -> IO ())
338 -> [(String, String)]
340 doProc a numstring envi (exe:args) action path environment = do
341 (_, h, _, processhandle) <-
344 {std_out = out, std_err = Inherit, env = Just environment'}
345 sout <- mapM BL.hGetContents h
347 maybe 0 id (fmap BL.length sout) `seq` waitForProcess processhandle
348 printparam (numstring ++ " call returned") exitcode
349 if isJust action && isJust sout
350 then ((fromJust action $ exitcode) (fromJust sout)) >> acke envi a
351 else case exitcode of
352 ExitSuccess -> acke envi a
353 ExitFailure _ -> reje envi a
354 if (cleanupTmpFile a)
356 (maybe (return ()) removeFile path)
357 (\e -> printparam "error removing temp file" (e :: X.SomeException))
365 ("AMQP_NUMBER", numstring) : ("AMQP_FILE", fromJust path) : environment
366 doProc _ _ _ _ _ _ _ = return ()
369 acke :: Envelope -> Args -> IO ()
371 | (ack a) = ackEnv envi
372 | otherwise = return ()
375 reje :: Envelope -> Args -> IO ()
377 | (ack a) = rejectEnv envi (requeuenack a)
378 | otherwise = return ()