]> woffs.de Git - fd/haskell-amqp-utils.git/blob - agitprop.hs
Haskell2010
[fd/haskell-amqp-utils.git] / agitprop.hs
1 {-# LANGUAGE CPP #-}
2 -- generic AMQP publisher
3 import Control.Concurrent
4 import qualified Control.Exception as X
5 import Control.Monad (forever)
6 import qualified Data.ByteString.Lazy.Char8 as BL
7 #if MIN_VERSION_hinotify(0,3,10)
8 import qualified Data.ByteString.Char8 as BS
9 #endif
10 import Data.List (isSuffixOf)
11 import Data.Maybe
12 import qualified Data.Text as T
13 import Data.Time
14 import Data.Time.Clock.POSIX
15 import Data.Version (showVersion)
16 import Data.Word (Word64)
17 import Magic
18 import Network.AMQP
19 import Network.AMQP.Types
20 import Network.AMQP.Utils.Connection
21 import Network.AMQP.Utils.Helpers
22 import Network.AMQP.Utils.Options
23 import Paths_amqp_utils (version)
24 import System.Environment
25 import System.Exit
26 import System.INotify
27 import qualified System.Posix.Files as F
28
29 main :: IO ()
30 main = do
31   hr "starting"
32   tid <- myThreadId
33   args <- getArgs >>= parseargs 'a'
34   printparam "client version" ["amqp-utils", showVersion version]
35   printparam "routing key" $ rKey args
36   printparam "exchange" $ currentExchange args
37   isDir <-
38     if inputFile args == "-"
39       then return False
40       else F.getFileStatus (inputFile args) >>= return . F.isDirectory
41   if isDir
42     then printparam "hotfolder" $ inputFile args
43     else printparam
44            "input file"
45            [ inputFile args
46            , if (lineMode args)
47                then "(line-by-line)"
48                else ""
49            ]
50   (conn, chan) <- connect args
51   addChannelExceptionHandler chan (X.throwTo tid)
52   printparam "confirm mode" $ confirm args
53   if (confirm args)
54     then do
55       confirmSelect chan False
56       addConfirmationListener chan confirmCallback
57     else return ()
58   let publishOneMsg = publishOneMsg' chan args
59   X.catch
60     (if isDir
61        then do
62          inotify <- initINotify
63          wd <-
64            addWatch
65              inotify
66              [CloseWrite, MoveIn]
67 #if MIN_VERSION_hinotify(0,3,10)
68              (BS.pack (inputFile args))
69 #else
70              (inputFile args)
71 #endif
72              (handleEvent publishOneMsg (suffix args) (inputFile args))
73          hr $ "BEGIN watching " ++ (inputFile args)
74          _ <- forever $ threadDelay 1000000
75          removeWatch wd
76          hr $ "END watching " ++ (inputFile args)
77        else do
78          hr $ "BEGIN sending"
79          messageFile <-
80            if inputFile args == "-"
81              then BL.getContents
82              else BL.readFile (inputFile args)
83          if (lineMode args)
84            then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
85            else publishOneMsg (Just (inputFile args)) messageFile
86          hr "END sending")
87     exceptionHandler
88   -- all done. wait and close.
89   if (confirm args)
90     then waitForConfirms chan >>= printparam "confirmed"
91     else return ()
92   X.catch (closeConnection conn) exceptionHandler
93
94 -- | A handler for clean exit
95 exceptionHandler :: AMQPException -> IO ()
96 exceptionHandler (ChannelClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess
97 exceptionHandler (ConnectionClosedException Normal txt) = printparam "exit" txt >> exitWith ExitSuccess
98 exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
99
100 -- | The handler for publisher confirms
101 confirmCallback :: (Word64, Bool, AckType) -> IO ()
102 confirmCallback (deliveryTag, isAll, ackType) =
103   printparam
104     "confirmed"
105     [ show deliveryTag
106     , if isAll
107         then "all"
108         else "this"
109     , show ackType
110     ]
111
112 -- | Hotfolder event handler
113 handleEvent ::
114      (Maybe String -> BL.ByteString -> IO ())
115   -> [String]
116   -> String
117   -> Event
118   -> IO ()
119 -- just handle closewrite and movedin events
120 #if MIN_VERSION_hinotify(0,3,10)
121 handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ (BS.unpack x))
122 handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ (BS.unpack x))
123 #else
124 handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x)
125 handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x)
126 #endif
127 handleEvent _ _ _ _ = return ()
128
129 -- | Hotfolder file handler
130 handleFile ::
131      (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
132 handleFile _ _ ('.':_) = return () -- ignore hidden files
133 handleFile f s@(_:_) x =
134   if any (flip isSuffixOf x) s
135     then handleFile f [] x
136     else return ()
137 handleFile f [] x =
138   X.catch
139     (BL.readFile x >>= f (Just x))
140     (\e -> printparam "exception in handleFile" (e :: X.SomeException))
141
142 -- | Publish one message with our settings
143 publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
144 publishOneMsg' c a fn f = do
145   printparam "sending" fn
146   (mtype, mencoding) <-
147     if (magic a) && isJust fn
148       then do
149         m <- magicOpen [MagicMimeType]
150         magicLoadDefault m
151         t <- magicFile m (fromJust fn)
152         magicSetFlags m [MagicMimeEncoding]
153         e <- magicFile m (fromJust fn)
154         return (Just (T.pack t), Just (T.pack e))
155       else return ((contenttype a), (contentencoding a))
156   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
157   publishMsg
158     c
159     (T.pack $ currentExchange a)
160     (T.pack $ rKey a)
161     newMsg
162       { msgBody = f
163       , msgDeliveryMode = persistent a
164       , msgTimestamp = Just now
165       , msgID = msgid a
166       , msgType = msgtype a
167       , msgUserID = userid a
168       , msgApplicationID = appid a
169       , msgClusterID = clusterid a
170       , msgContentType = mtype
171       , msgContentEncoding = mencoding
172       , msgReplyTo = replyto a
173       , msgPriority = prio a
174       , msgCorrelationID = corrid a
175       , msgExpiration = msgexp a
176       , msgHeaders = substheader (fnheader a) fn $ msgheader a
177       } >>=
178     printparam "sent"
179   where
180     substheader ::
181          [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
182     substheader (s:r) (Just fname) old =
183       substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
184     substheader _ _ old = old