]> woffs.de Git - fd/haskell-amqp-utils.git/blob - Network/AMQP/Utils/Helpers.hs
whitespace
[fd/haskell-amqp-utils.git] / Network / AMQP / Utils / Helpers.hs
1 {-# LANGUAGE FlexibleInstances #-}
2
3 module Network.AMQP.Utils.Helpers where
4
5 import           Control.Concurrent
6 import qualified Control.Exception          as X
7 import           Control.Monad
8 import qualified Data.ByteString.Lazy.Char8 as BL
9 import qualified Data.ByteString.UTF8       as BS
10 import           Data.Int                   (Int64)
11 import           Data.List
12 import qualified Data.Map                   as M
13 import           Data.Maybe
14 import qualified Data.Text                  as T
15 import           Data.Time
16 import           Data.Time.Clock.POSIX
17 import           Data.Word                  (Word16)
18 import           Network.AMQP
19 import           Network.AMQP.Types
20 import           Network.AMQP.Utils.Options
21 import           Network.Socket             (PortNumber)
22 import           System.Directory           (removeFile)
23 import           System.Environment         (getEnvironment)
24 import           System.Exit
25 import           System.IO
26 import           System.Process
27
28 -- | print config parameters
29 class (Show a) =>
30       Flexprint a
31   where
32   flexprint :: a -> IO ()
33   flexprint = (hPutStrLn stderr) . show
34   empty :: a -> Bool
35   empty _ = False
36   printparam :: String -> a -> IO ()
37   printparam label x =
38     if empty x
39       then return ()
40       else do
41         mapM_ (hPutStr stderr) [" --- ", label, ": "]
42         flexprint x
43         hFlush stderr
44
45 instance (Flexprint a) => Flexprint (Maybe a) where
46   empty = isNothing
47   printparam _ Nothing  = return ()
48   printparam x (Just y) = printparam x y
49
50 instance Flexprint String where
51   flexprint = hPutStrLn stderr
52   empty = null
53
54 instance Flexprint [String] where
55   flexprint = flexprint . unwords
56   empty = null
57
58 instance Flexprint [Maybe String] where
59   flexprint = flexprint . catMaybes
60   empty = null . catMaybes
61
62 instance Flexprint T.Text where
63   flexprint = flexprint . T.unpack
64   empty = T.null
65
66 instance Flexprint BL.ByteString where
67   flexprint x = hPutStrLn stderr "" >> BL.hPut stderr x >> hPutStrLn stderr ""
68   empty = BL.null
69
70 instance Flexprint Bool where
71   empty = not
72
73 instance Flexprint Int
74
75 instance Flexprint Int64
76
77 instance Flexprint Word16
78
79 instance Flexprint ExitCode
80
81 instance Flexprint X.SomeException
82
83 instance Flexprint X.IOException
84
85 instance Flexprint AMQPException
86
87 instance Flexprint ConfirmationResult
88
89 instance Flexprint PortNumber
90
91 -- | log marker
92 hr :: String -> IO ()
93 hr x = hPutStrLn stderr hr' >> hFlush stderr
94   where
95     hr' = take 72 $ (take 25 hr'') ++ " " ++ x ++ " " ++ hr''
96     hr'' = repeat '-'
97
98 -- | format headers for printing
99 formatheaders :: ((T.Text, FieldValue) -> [a]) -> FieldTable -> [a]
100 formatheaders f (FieldTable ll) = concat $ map f $ M.toList ll
101
102 -- | format headers for setting environment variables
103 formatheadersEnv ::
104      ((Int, (T.Text, FieldValue)) -> [(String, String)])
105   -> FieldTable
106   -> [(String, String)]
107 formatheadersEnv f (FieldTable ll) = concat $ map f $ zip [0 ..] $ M.toList ll
108
109 -- | log formatting
110 fieldshow :: (T.Text, FieldValue) -> String
111 fieldshow (k, v) = "\n        " ++ T.unpack k ++ ": " ++ valueshow v
112
113 fieldshow' :: (T.Text, FieldValue) -> String
114 fieldshow' (k, v) = "\n           " ++ T.unpack k ++ ": " ++ valueshow v
115
116 -- | callback cmdline formatting
117 fieldshowOpt :: (T.Text, FieldValue) -> [String]
118 fieldshowOpt (k, v) = ["-h", T.unpack k ++ "=" ++ valueshow v]
119
120 -- | environment variable formatting
121 fieldshowEnv :: (Int, (T.Text, FieldValue)) -> [(String, String)]
122 fieldshowEnv (n, (k, v)) =
123   [ ("AMQP_HEADER_KEY_" ++ nn, T.unpack k)
124   , ("AMQP_HEADER_VALUE_" ++ nn, valueshow v)
125   ]
126   where
127     nn = show n
128
129 -- | showing a FieldValue
130 valueshow :: FieldValue -> String
131 valueshow (FVString value)     = BS.toString value
132 valueshow (FVInt8 value)       = show value
133 valueshow (FVInt16 value)      = show value
134 valueshow (FVInt32 value)      = show value
135 valueshow (FVInt64 value)      = show value
136 valueshow (FVFloat value)      = show value
137 valueshow (FVDouble value)     = show value
138 valueshow (FVBool value)       = show value
139 valueshow (FVFieldTable value) = (formatheaders fieldshow') value
140 valueshow value                = show value
141
142 -- | skip showing body head if binary type
143 isimage :: Maybe String -> Bool
144 isimage Nothing = False
145 isimage (Just ctype)
146   | isPrefixOf "application/xml" ctype = False
147   | isPrefixOf "application/json" ctype = False
148   | otherwise = any (flip isPrefixOf ctype) ["application", "image"]
149
150 -- | show the first bytes of message body
151 anriss' :: Maybe Int64 -> BL.ByteString -> BL.ByteString
152 anriss' x =
153   case x of
154     Nothing -> id
155     Just y  -> BL.take y
156
157 -- | callback cmdline with optional parameters
158 printopt :: (String, Maybe String) -> [String]
159 printopt (_, Nothing)  = []
160 printopt (opt, Just s) = [opt, s]
161
162 -- | prints header and head on stderr and returns
163 -- cmdline options and environment variables to callback
164 printmsg ::
165      Maybe Handle
166   -> (Message, Envelope)
167   -> Maybe Int64
168   -> ZonedTime
169   -> IO ([String], [(String, String)])
170 printmsg h (msg, envi) anR now = do
171   mapM_
172     (uncurry printparam)
173     [ ("routing key", rkey)
174     , ("message-id", messageid)
175     , ("headers", headers)
176     , ("content-type", ctype)
177     , ("content-encoding", cenc)
178     , ("redelivered", redeliv)
179     , ("timestamp", timestamp'')
180     , ("time now", now')
181     , ("size", size)
182     , ("priority", pri)
183     , ("type", mtype)
184     , ("user id", muserid)
185     , ("application id", mappid)
186     , ("cluster id", mclusterid)
187     , ("reply to", mreplyto)
188     , ("correlation id", mcorrid)
189     , ("expiration", mexp)
190     , ("delivery mode", mdelivmode)
191     ]
192   printparam label anriss
193   mapM_ (\hdl -> BL.hPut hdl body >> hFlush hdl) h
194   oldenv <- getEnvironment
195   let environment =
196         foldr
197           step
198           oldenv
199           [ ("ROUTINGKEY", rkey)
200           , ("CONTENTTYPE", ctype)
201           , ("ENCODING", cenc)
202           , ("MSGID", messageid)
203           , ("TIMESTAMP", timestamp)
204           , ("PRIORITY", pri)
205           , ("REDELIVERED", redeliv)
206           , ("SIZE", size)
207           , ("TYPE", mtype)
208           , ("USERID", muserid)
209           , ("APPID", mappid)
210           , ("CLUSTERID", mclusterid)
211           , ("REPLYTO", mreplyto)
212           , ("CORRID", mcorrid)
213           , ("EXPIRATION", mexp)
214           , ("DELIVERYMODE", mdelivmode)
215           ] ++
216         headersEnv
217   return (cmdline, environment)
218   where
219     step (_, Nothing) xs = xs
220     step (k, Just v) xs  = ("AMQP_" ++ k, v) : xs
221     cmdline =
222       concat
223         (map
224            printopt
225            [ ("-r", rkey)
226            , ("-m", ctype)
227            , ("-e", cenc)
228            , ("-i", messageid)
229            , ("-t", timestamp)
230            , ("-p", pri)
231            , ("-R", redeliv)
232            ] ++
233          headersOpt)
234     headers = fmap (formatheaders fieldshow) $ msgHeaders msg
235     headersOpt =
236       maybeToList $ fmap (formatheaders fieldshowOpt) $ msgHeaders msg
237     headersEnv =
238       concat . maybeToList $
239       fmap (formatheadersEnv fieldshowEnv) $ msgHeaders msg
240     body = msgBody msg
241     anriss =
242       if isimage ctype
243         then Nothing
244         else Just (anriss' anR body) :: Maybe BL.ByteString
245     anriss'' = maybe "" (\a -> "first " ++ (show a) ++ " bytes of ") anR
246     label = anriss'' ++ "body"
247     ctype = fmap T.unpack $ msgContentType msg
248     cenc = fmap T.unpack $ msgContentEncoding msg
249     rkey = Just . T.unpack $ envRoutingKey envi
250     messageid = fmap T.unpack $ msgID msg
251     pri = fmap show $ msgPriority msg
252     mtype = fmap T.unpack $ msgType msg
253     muserid = fmap T.unpack $ msgUserID msg
254     mappid = fmap T.unpack $ msgApplicationID msg
255     mclusterid = fmap T.unpack $ msgClusterID msg
256     mreplyto = fmap T.unpack $ msgReplyTo msg
257     mcorrid = fmap T.unpack $ msgCorrelationID msg
258     mexp = fmap T.unpack $ msgExpiration msg
259     mdelivmode = fmap show $ msgDeliveryMode msg
260     size = Just . show $ BL.length body
261     redeliv =
262       if envRedelivered envi
263         then Just "YES"
264         else Nothing
265     tz = zonedTimeZone now
266     nowutc = zonedTimeToUTCFLoor now
267     msgtime = msgTimestamp msg
268     msgtimeutc = fmap (posixSecondsToUTCTime . realToFrac) msgtime
269     timestamp = fmap show msgtime
270     timediff = fmap (difftime nowutc) msgtimeutc
271     now' =
272       case timediff of
273         Just "now" -> Nothing
274         _          -> showtime tz $ Just nowutc
275     timestamp' = showtime tz msgtimeutc
276     timestamp'' =
277       liftM3
278         (\a b c -> a ++ " (" ++ b ++ ") (" ++ c ++ ")")
279         timestamp
280         timestamp'
281         timediff
282
283 -- | timestamp conversion
284 zonedTimeToUTCFLoor :: ZonedTime -> UTCTime
285 zonedTimeToUTCFLoor x =
286   posixSecondsToUTCTime $
287   realToFrac ((floor . utcTimeToPOSIXSeconds . zonedTimeToUTC) x :: Timestamp)
288
289 -- | show the timestamp
290 showtime :: TimeZone -> Maybe UTCTime -> Maybe String
291 showtime tz = fmap (show . (utcToZonedTime tz))
292
293 -- | show difference between two timestamps
294 difftime :: UTCTime -> UTCTime -> String
295 difftime now msg
296   | now == msg = "now"
297   | now > msg = diff ++ " ago"
298   | otherwise = diff ++ " in the future"
299   where
300     diff = show (diffUTCTime now msg)
301
302 -- | if the message is to be saved
303 -- and maybe processed further
304 optionalFileStuff ::
305      (Message, Envelope)
306   -> [String]
307   -> [String]
308   -> String
309   -> Args
310   -> ThreadId
311   -> Maybe (ExitCode -> BL.ByteString -> IO ())
312   -> [(String, String)]
313   -> IO ()
314 optionalFileStuff (msg, envi) callbackoptions addi numstring a tid action environment = do
315   path <- saveFile (tempDir a) numstring (msgBody msg)
316   printparam "saved to" path
317   let callbackcmdline =
318         liftM2
319           (constructCallbackCmdLine (simple a) callbackoptions addi numstring)
320           (fileProcess a)
321           path
322   printparam "calling" callbackcmdline
323   maybe
324     (acke envi a)
325     (\c ->
326        forkFinally
327          (doProc a numstring envi c action path environment)
328          (either (throwTo tid) return) >>
329        return ())
330     callbackcmdline
331
332 -- | save message into temp file
333 saveFile :: Maybe String -> String -> BL.ByteString -> IO (Maybe String)
334 saveFile Nothing _ _ = return Nothing
335 saveFile (Just tempD) numstring body = do
336   (p, h) <-
337     openBinaryTempFileWithDefaultPermissions
338       tempD
339       ("amqp-utils-" ++ numstring ++ "-.tmp")
340   BL.hPut h body
341   hClose h
342   return $ Just p
343
344 -- | construct cmdline for callback script
345 constructCallbackCmdLine ::
346      Bool -> [String] -> [String] -> String -> String -> String -> [String]
347 constructCallbackCmdLine True _ addi _ exe path = exe : addi ++ path : []
348 constructCallbackCmdLine False opts addi num exe path =
349   exe : "-f" : path : "-n" : num : opts ++ addi
350
351 -- | call callback script
352 doProc ::
353      Args
354   -> String
355   -> Envelope
356   -> [String]
357   -> Maybe (ExitCode -> BL.ByteString -> IO ())
358   -> Maybe String
359   -> [(String, String)]
360   -> IO ()
361 doProc a numstring envi (exe:args) action path environment = do
362   (_, h, _, processhandle) <-
363     createProcess
364       (proc exe args)
365         {std_out = out, std_err = Inherit, env = Just environment'}
366   sout <- mapM BL.hGetContents h
367   exitcode <-
368     maybe 0 id (fmap BL.length sout) `seq` waitForProcess processhandle
369   printparam (numstring ++ " call returned") exitcode
370   if isJust action && isJust sout
371     then ((fromJust action $ exitcode) (fromJust sout)) >> acke envi a
372     else case exitcode of
373            ExitSuccess   -> acke envi a
374            ExitFailure _ -> reje envi a
375   if (cleanupTmpFile a)
376     then X.catch
377            (maybe (return ()) removeFile path)
378            (\e -> printparam "error removing temp file" (e :: X.IOException))
379     else return ()
380   where
381     out =
382       if isJust action
383         then CreatePipe
384         else Inherit
385     environment' =
386       ("AMQP_NUMBER", numstring) : ("AMQP_FILE", fromJust path) : environment
387 doProc _ _ _ _ _ _ _ = return ()
388
389 -- | ack
390 acke :: Envelope -> Args -> IO ()
391 acke envi a
392   | (ack a) = ackEnv envi
393   | otherwise = return ()
394
395 -- | reject
396 reje :: Envelope -> Args -> IO ()
397 reje envi a
398   | (ack a) = rejectEnv envi (requeuenack a)
399   | otherwise = return ()
400
401 -- | main loop: sleep forever or wait for an exception
402 sleepingBeauty :: IO (X.SomeException)
403 sleepingBeauty =
404   X.catch
405     (forever (threadDelay 600000000) >>
406      return (X.toException $ X.ErrorCall "not reached"))
407     return
408
409 -- | extract first input file in case only one is needed
410 firstInputFile :: [(String,String,String)] -> String
411 firstInputFile [] = "-"
412 firstInputFile ((x,_,_):_) = x