]> woffs.de Git - fd/haskell-amqp-utils.git/blob - agitprop.hs
formatting
[fd/haskell-amqp-utils.git] / agitprop.hs
1 {-# LANGUAGE CPP #-}
2
3 -- generic AMQP publisher
4 import Control.Concurrent
5 import qualified Control.Exception as X
6 import Control.Monad (forever)
7 import qualified Data.ByteString.Lazy.Char8 as BL
8 #if MIN_VERSION_hinotify(0,3,10)
9 import qualified Data.ByteString.Char8 as BS
10 #endif
11 import Data.List (isSuffixOf)
12 import Data.Maybe
13 import qualified Data.Text as T
14 import Data.Time
15 import Data.Time.Clock.POSIX
16 import Data.Version (showVersion)
17 import Data.Word (Word64)
18 import Magic
19 import Network.AMQP
20 import Network.AMQP.Types
21 import Network.AMQP.Utils.Connection
22 import Network.AMQP.Utils.Helpers
23 import Network.AMQP.Utils.Options
24 import Paths_amqp_utils (version)
25 import System.Environment
26 import System.Exit
27 import System.INotify
28 import qualified System.Posix.Files as F
29
30 main :: IO ()
31 main = do
32   hr "starting"
33   tid <- myThreadId
34   args <- getArgs >>= parseargs 'a'
35   printparam "client version" ["amqp-utils", showVersion version]
36   printparam "routing key" $ rKey args
37   printparam "exchange" $ currentExchange args
38   isDir <-
39     if inputFile args == "-"
40       then return False
41       else F.getFileStatus (inputFile args) >>= return . F.isDirectory
42   if isDir
43     then printparam "hotfolder" $ inputFile args
44     else printparam
45            "input file"
46            [ inputFile args
47            , if (lineMode args)
48                then "(line-by-line)"
49                else ""
50            ]
51   (conn, chan) <- connect args
52   addChannelExceptionHandler chan (X.throwTo tid)
53   printparam "confirm mode" $ confirm args
54   if (confirm args)
55     then do
56       confirmSelect chan False
57       addConfirmationListener chan confirmCallback
58     else return ()
59   let publishOneMsg = publishOneMsg' chan args
60   X.catch
61     (if isDir
62        then do
63          inotify <- initINotify
64          wd <-
65            addWatch
66              inotify
67              [CloseWrite, MoveIn]
68 #if MIN_VERSION_hinotify(0,3,10)
69              (BS.pack (inputFile args))
70 #else
71              (inputFile args)
72 #endif
73              (handleEvent publishOneMsg (suffix args) (inputFile args))
74          hr $ "BEGIN watching " ++ (inputFile args)
75          _ <- forever $ threadDelay 1000000
76          removeWatch wd
77          hr $ "END watching " ++ (inputFile args)
78        else do
79          hr $ "BEGIN sending"
80          messageFile <-
81            if inputFile args == "-"
82              then BL.getContents
83              else BL.readFile (inputFile args)
84          if (lineMode args)
85            then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
86            else publishOneMsg (Just (inputFile args)) messageFile
87          hr "END sending")
88     exceptionHandler
89   -- all done. wait and close.
90   if (confirm args)
91     then waitForConfirms chan >>= printparam "confirmed"
92     else return ()
93   X.catch (closeConnection conn) exceptionHandler
94
95 -- | A handler for clean exit
96 exceptionHandler :: AMQPException -> IO ()
97 exceptionHandler (ChannelClosedException Normal txt) =
98   printparam "exit" txt >> exitWith ExitSuccess
99 exceptionHandler (ConnectionClosedException Normal txt) =
100   printparam "exit" txt >> exitWith ExitSuccess
101 exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
102
103 -- | The handler for publisher confirms
104 confirmCallback :: (Word64, Bool, AckType) -> IO ()
105 confirmCallback (deliveryTag, isAll, ackType) =
106   printparam
107     "confirmed"
108     [ show deliveryTag
109     , if isAll
110         then "all"
111         else "this"
112     , show ackType
113     ]
114
115 -- | Hotfolder event handler
116 handleEvent ::
117      (Maybe String -> BL.ByteString -> IO ())
118   -> [String]
119   -> String
120   -> Event
121   -> IO ()
122 -- just handle closewrite and movedin events
123 #if MIN_VERSION_hinotify(0,3,10)
124 handleEvent f s p (Closed False (Just x) True) =
125   handleFile f s (p ++ "/" ++ (BS.unpack x))
126 handleEvent f s p (MovedIn False x _) =
127   handleFile f s (p ++ "/" ++ (BS.unpack x))
128 #else
129 handleEvent f s p (Closed False (Just x) True) = handleFile f s (p ++ "/" ++ x)
130 handleEvent f s p (MovedIn False x _) = handleFile f s (p ++ "/" ++ x)
131 #endif
132 handleEvent _ _ _ _ = return ()
133
134 -- | Hotfolder file handler
135 handleFile ::
136      (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
137 handleFile _ _ ('.':_) = return () -- ignore hidden files
138 handleFile f s@(_:_) x =
139   if any (flip isSuffixOf x) s
140     then handleFile f [] x
141     else return ()
142 handleFile f [] x =
143   X.catch
144     (BL.readFile x >>= f (Just x))
145     (\e -> printparam "exception in handleFile" (e :: X.SomeException))
146
147 -- | Publish one message with our settings
148 publishOneMsg' :: Channel -> Args -> Maybe String -> BL.ByteString -> IO ()
149 publishOneMsg' c a fn f = do
150   printparam "sending" fn
151   (mtype, mencoding) <-
152     if (magic a) && isJust fn
153       then do
154         m <- magicOpen [MagicMimeType]
155         magicLoadDefault m
156         t <- magicFile m (fromJust fn)
157         magicSetFlags m [MagicMimeEncoding]
158         e <- magicFile m (fromJust fn)
159         return (Just (T.pack t), Just (T.pack e))
160       else return ((contenttype a), (contentencoding a))
161   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
162   publishMsg
163     c
164     (T.pack $ currentExchange a)
165     (T.pack $ rKey a)
166     newMsg
167       { msgBody = f
168       , msgDeliveryMode = persistent a
169       , msgTimestamp = Just now
170       , msgID = msgid a
171       , msgType = msgtype a
172       , msgUserID = userid a
173       , msgApplicationID = appid a
174       , msgClusterID = clusterid a
175       , msgContentType = mtype
176       , msgContentEncoding = mencoding
177       , msgReplyTo = replyto a
178       , msgPriority = prio a
179       , msgCorrelationID = corrid a
180       , msgExpiration = msgexp a
181       , msgHeaders = substheader (fnheader a) fn $ msgheader a
182       } >>=
183     printparam "sent"
184   where
185     substheader ::
186          [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
187     substheader (s:r) (Just fname) old =
188       substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
189     substheader _ _ old = old