]> woffs.de Git - fd/haskell-amqp-utils.git/blob - agitprop.hs
fbc8efc796932a2dfe79840c8df75d9829328798
[fd/haskell-amqp-utils.git] / agitprop.hs
1 {-# LANGUAGE CPP #-}
2
3 -- generic AMQP publisher
4 import           Control.Concurrent
5 import qualified Control.Exception             as X
6 import           Control.Monad                 (forever)
7 import qualified Data.ByteString.Lazy.Char8    as BL
8 #if MIN_VERSION_hinotify(0,3,10)
9 import qualified Data.ByteString.Char8         as BS
10 #endif
11 import           Data.List                     (isSuffixOf)
12 import           Data.Maybe
13 import qualified Data.Text                     as T
14 import           Data.Time
15 import           Data.Time.Clock.POSIX
16 import           Data.Version                  (showVersion)
17 import           Data.Word                     (Word64)
18 import           Magic
19 import           Network.AMQP
20 import           Network.AMQP.Types
21 import           Network.AMQP.Utils.Connection
22 import           Network.AMQP.Utils.Helpers
23 import           Network.AMQP.Utils.Options
24 import           Paths_amqp_utils              (version)
25 import           System.Directory
26 import           System.Environment
27 import           System.Exit
28 import           System.INotify
29 import qualified System.Posix.Files            as F
30
31 main :: IO ()
32 main = do
33   hr "starting"
34   tid <- myThreadId
35   args <- getArgs >>= parseargs 'a'
36   printparam "client version" ["amqp-utils", showVersion version]
37   printparam "routing key" $ rKey args
38   printparam "exchange" $ currentExchange args
39   isDir <-
40     if inputFile args == "-"
41       then return False
42       else F.getFileStatus (inputFile args) >>= return . F.isDirectory
43   if isDir
44     then printparam "hotfolder" $ inputFile args
45     else printparam
46            "input file"
47            [ inputFile args
48            , if (lineMode args)
49                then "(line-by-line)"
50                else ""
51            ]
52   (conn, chan) <- connect args
53   addChannelExceptionHandler chan (X.throwTo tid)
54   printparam "confirm mode" $ confirm args
55   if (confirm args)
56     then do
57       confirmSelect chan False
58       addConfirmationListener chan confirmCallback
59     else return ()
60   let publishOneMsg = publishOneMsg' chan args
61   X.catch
62     (if isDir
63        then do
64          if (initialScan args)
65            then getDirectoryContents (inputFile args) >>= mapM_ (\fn -> handleFile publishOneMsg (suffix args) ((inputFile args) ++ "/" ++ fn))
66            else return()
67          inotify <- initINotify
68          wd <-
69            addWatch
70              inotify
71              [CloseWrite, MoveIn]
72 #if MIN_VERSION_hinotify(0,3,10)
73              (BS.pack (inputFile args))
74 #else
75              (inputFile args)
76 #endif
77              (handleEvent publishOneMsg (suffix args) (inputFile args))
78          hr $ "BEGIN watching " ++ (inputFile args)
79          _ <- forever $ threadDelay 1000000
80          removeWatch wd
81          hr $ "END watching " ++ (inputFile args)
82        else do
83          hr $ "BEGIN sending"
84          messageFile <-
85            if inputFile args == "-"
86              then BL.getContents
87              else BL.readFile (inputFile args)
88          if (lineMode args)
89            then mapM_ (publishOneMsg Nothing) (BL.lines messageFile)
90            else publishOneMsg (Just (inputFile args)) messageFile
91          hr "END sending")
92     exceptionHandler
93   -- all done. wait and close.
94   if (confirm args)
95     then waitForConfirms chan >>= printparam "confirmed"
96     else return ()
97   X.catch (closeConnection conn) exceptionHandler
98
99 -- | A handler for clean exit
100 exceptionHandler :: AMQPException -> IO ()
101 exceptionHandler (ChannelClosedException Normal txt) =
102   printparam "exit" txt >> exitWith ExitSuccess
103 exceptionHandler (ConnectionClosedException Normal txt) =
104   printparam "exit" txt >> exitWith ExitSuccess
105 exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
106
107 -- | The handler for publisher confirms
108 confirmCallback :: (Word64, Bool, AckType) -> IO ()
109 confirmCallback (deliveryTag, isAll, ackType) =
110   printparam
111     "confirmed"
112     [ show deliveryTag
113     , if isAll
114         then "all"
115         else "this"
116     , show ackType
117     ]
118
119 -- | Hotfolder event handler
120 handleEvent ::
121      (Maybe String -> BL.ByteString -> IO ())
122   -> [String]
123   -> String
124   -> Event
125   -> IO ()
126 -- just handle closewrite and movedin events
127 #if MIN_VERSION_hinotify(0,3,10)
128 handleEvent func suffixes path (Closed False (Just fileName) True) =
129   handleFile func suffixes (path ++ "/" ++ (BS.unpack fileName))
130 handleEvent func suffixes path (MovedIn False fileName _) =
131   handleFile func suffixes (path ++ "/" ++ (BS.unpack fileName))
132 #else
133 handleEvent func suffixes path (Closed False (Just fileName) True) = handleFile func suffixes (path ++ "/" ++ fileName)
134 handleEvent func suffixes path (MovedIn False fileName _) = handleFile func suffixes (path ++ "/" ++ fileName)
135 #endif
136 handleEvent _ _ _ _ = return ()
137
138 -- | Hotfolder file handler
139 handleFile ::
140      (Maybe String -> BL.ByteString -> IO ()) -> [String] -> FilePath -> IO ()
141 handleFile _ _ ('.':_) = return () -- ignore hidden files
142 handleFile func suffixes@(_:_) fileName =
143   if any (flip isSuffixOf fileName) suffixes
144     then handleFile func [] fileName
145     else return ()
146 handleFile func [] fileName =
147   X.catch
148     (BL.readFile fileName >>= func (Just fileName))
149     (\e -> printparam "exception in handleFile" (e :: X.SomeException))
150
151 -- | Publish one message with our settings
152 publishOneMsg' :: Channel -> Args -> Maybe FilePath -> BL.ByteString -> IO ()
153 publishOneMsg' chan a fn content = do
154   printparam "sending" fn
155   (mtype, mencoding) <-
156     if (magic a) && isJust fn
157       then do
158         m <- magicOpen [MagicMimeType]
159         magicLoadDefault m
160         t <- magicFile m (fromJust fn)
161         magicSetFlags m [MagicMimeEncoding]
162         e <- magicFile m (fromJust fn)
163         return (Just (T.pack t), Just (T.pack e))
164       else return ((contenttype a), (contentencoding a))
165   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
166   publishMsg
167     chan
168     (T.pack $ currentExchange a)
169     (T.pack $ rKey a)
170     newMsg
171       { msgBody = content
172       , msgDeliveryMode = persistent a
173       , msgTimestamp = Just now
174       , msgID = msgid a
175       , msgType = msgtype a
176       , msgUserID = userid a
177       , msgApplicationID = appid a
178       , msgClusterID = clusterid a
179       , msgContentType = mtype
180       , msgContentEncoding = mencoding
181       , msgReplyTo = replyto a
182       , msgPriority = prio a
183       , msgCorrelationID = corrid a
184       , msgExpiration = msgexp a
185       , msgHeaders = substheader (fnheader a) fn $ msgheader a
186       } >>=
187     printparam "sent"
188   removeSentFileIfRequested (removeSentFile a) fn
189   where
190     substheader ::
191          [String] -> Maybe String -> Maybe FieldTable -> Maybe FieldTable
192     substheader (s:r) (Just fname) old =
193       substheader r (Just fname) (addheader old (s ++ "=" ++ fname))
194     substheader _ _ old = old
195     removeSentFileIfRequested False _           = return ()
196     removeSentFileIfRequested True Nothing      = return ()
197     removeSentFileIfRequested True (Just fname) = printparam "removing" fname >> removeFile fname
don't click here