]> woffs.de Git - fd/haskell-amqp-utils.git/blob - agitprop.hs
update reuse compat
[fd/haskell-amqp-utils.git] / agitprop.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 {-# LANGUAGE CPP               #-}
6 {-# LANGUAGE OverloadedStrings #-}
7
8 -- generic AMQP publisher
9 import           Control.Concurrent
10 import qualified Control.Exception                as X
11 import           Control.Monad                    (forM_)
12 import qualified Data.ByteString.Char8            as BS
13 import qualified Data.ByteString.Lazy.Char8       as BL
14 import qualified Data.Map                         as M
15 import           Data.Maybe
16 import qualified Data.Text                        as T
17 import           Data.Time
18 import           Data.Time.Clock.POSIX
19 import           Data.Version                     (showVersion)
20 import           Data.Word                        (Word64)
21 import           Magic
22 import           Network.AMQP
23 import           Network.AMQP.Types
24 import           Network.AMQP.Utils.Connection
25 import           Network.AMQP.Utils.Helpers
26 import           Network.AMQP.Utils.Options
27 import           Paths_amqp_utils                 (version)
28 import qualified RawFilePath.Directory            as RD
29 import           System.Environment
30 import           System.Exit
31 import           System.FilePath.Posix.ByteString
32 #if linux_HOST_OS
33 import           System.INotify
34 #endif
35 import qualified System.Posix.Files.ByteString    as F
36 import           System.IO
37
38 main :: IO ()
39 main = do
40   hr "starting"
41   tid <- myThreadId
42   args <- getArgs >>= parseargs 'a'
43   hSetBuffering stdout LineBuffering
44   hSetBuffering stderr LineBuffering
45   printparam "client version" ["amqp-utils", showVersion version]
46   printparam "routing key" $ rKey args
47   printparam "exchange" $ currentExchange args
48   (conn, chan) <- connect args
49   addChannelExceptionHandler chan (X.throwTo tid)
50   printparam "confirm mode" $ confirm args
51   if (confirm args)
52     then do
53       confirmSelect chan False
54       addConfirmationListener chan confirmCallback
55     else return ()
56   let inputFile' = firstInputFile (inputFiles args)
57   isDir <-
58     if inputFile' == "-"
59       then return False
60       else F.getFileStatus inputFile' >>= return . F.isDirectory
61   let publishOneMsg =
62         publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir}
63   if isDir
64     then do
65       printparam "hotfolder mode" True
66       printparam "initial scan" (initialScan args)
67       if isNothing (moveSentFileTo args)
68         then printparam "remove sent file" (removeSentFile args)
69         else printparam "move sent file to" (moveSentFileTo args)
70     else printparam
71            "input file"
72            [ inputFile'
73            , if (lineMode args)
74                then "(line-by-line)"
75                else ""
76            ]
77   X.catch
78     (if isDir
79        then do
80 #if linux_HOST_OS
81          wds <- mapM (watchHotfolder args publishOneMsg) (inputFiles args)
82          sleepingBeauty >>= (\x -> do
83            forM_ wds (\(wd,folder) -> do
84              removeWatch wd
85              printparam "END watching" folder
86              )
87            X.throw x)
88 #else
89          X.throw (X.ErrorCall "ERROR: watching a directory is only supported in Linux")
90 #endif
91        else do
92          hr $ "BEGIN sending"
93          messageFile <-
94            if inputFile' == "-"
95              then BL.getContents
96              else readFileRawLazy inputFile'
97          if (lineMode args)
98            then mapM_ (publishOneMsg (currentExchange args) (rKey args) Nothing) (BL.lines messageFile)
99            else publishOneMsg (currentExchange args) (rKey args) (Just (inputFile')) messageFile
100          hr "END sending"
101          if (confirm args)
102            then waitForConfirms chan >>= printparam "confirmed"
103            else return ()
104          X.catch (closeConnection conn) exceptionHandler
105          )
106     exceptionHandler
107
108 #if linux_HOST_OS
109 -- | watch a hotfolder
110 watchHotfolder ::
111      Args
112   -> (String -> String -> Maybe RawFilePath -> BL.ByteString -> IO ())
113   -> (RawFilePath, String, String)
114   -> IO (WatchDescriptor, RawFilePath)
115 watchHotfolder args publishOneMsg (folder, exchange, rkey) = do
116   printparam "hotfolder" folder
117   inotify <- initINotify
118   wd <-
119    addWatch
120      inotify
121      [CloseWrite, MoveIn]
122      folder
123      (handleEvent (publishOneMsg exchange rkey) (suffix args) folder)
124   hr "BEGIN watching"
125   if (initialScan args)
126    then RD.listDirectory folder >>=
127         mapM_ (\fn -> handleFile (publishOneMsg exchange rkey) (suffix args) (folder </> fn))
128    else return ()
129   return (wd,folder)
130 #endif
131
132 -- | A handler for clean exit
133 exceptionHandler :: AMQPException -> IO ()
134 exceptionHandler (ChannelClosedException Normal txt) =
135   printparam "exit" txt >> exitWith ExitSuccess
136 exceptionHandler (ConnectionClosedException Normal txt) =
137   printparam "exit" txt >> exitWith ExitSuccess
138 exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
139
140 -- | The handler for publisher confirms
141 confirmCallback :: (Word64, Bool, AckType) -> IO ()
142 confirmCallback (deliveryTag, isAll, ackType) =
143   printparam
144     "confirmed"
145     [ show deliveryTag
146     , if isAll
147         then "all"
148         else "this"
149     , show ackType
150     ]
151
152 #if linux_HOST_OS
153 -- | Hotfolder event handler
154 handleEvent ::
155      (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> RawFilePath -> Event -> IO ()
156 -- just handle closewrite and movedin events
157 handleEvent func suffixes folder (Closed False (Just fileName) True) =
158   handleFile func suffixes (folder </> fileName)
159 handleEvent func suffixes folder (MovedIn False fileName _) =
160   handleFile func suffixes (folder </> fileName)
161 handleEvent _ _ _ _ = return ()
162
163 -- | Hotfolder file handler
164 handleFile ::
165      (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> RawFilePath -> IO ()
166 handleFile func suffixes@(_:_) fileName =
167   if (any (flip BS.isSuffixOf fileName) suffixes) && not ("." `BS.isPrefixOf` fileName)
168     then handleFile func [] fileName
169     else return ()
170 handleFile func [] fileName =
171   X.catch
172     (readFileRawLazy fileName >>= func (Just fileName))
173     (\e -> printparam "exception while processing" fileName >> printparam "exception in handleFile" (e :: X.IOException))
174 #endif
175
176 -- | Publish one message with our settings
177 publishOneMsg' ::
178      Channel
179   -> Args
180   -> String
181   -> String
182   -> Maybe RawFilePath
183   -> BL.ByteString
184   -> IO ()
185 publishOneMsg' chan a exchange rkey fn content = do
186   printparam "sending" fn
187   (mtype, mencoding) <-
188     if (magic a)
189       then do
190         let firstchunk = if BL.null content then BS.empty else head $ BL.toChunks content
191         m <- magicOpen [MagicMimeType]
192         magicLoadDefault m
193         t <- BS.useAsCStringLen firstchunk (magicCString m)
194         magicSetFlags m [MagicMimeEncoding]
195         e <- BS.useAsCStringLen firstchunk (magicCString m)
196         return (Just (T.pack t), Just (T.pack e))
197       else return ((contenttype a), (contentencoding a))
198   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
199   publishMsg
200     chan
201     (T.pack $ exchange)
202     (T.pack $ rkey)
203     newMsg
204       { msgBody = content
205       , msgDeliveryMode = persistent a
206       , msgTimestamp = Just now
207       , msgID = msgid a
208       , msgType = msgtype a
209       , msgUserID = userid a
210       , msgApplicationID = appid a
211       , msgClusterID = clusterid a
212       , msgContentType = mtype
213       , msgContentEncoding = mencoding
214       , msgReplyTo = replyto a
215       , msgPriority = prio a
216       , msgCorrelationID = corrid a
217       , msgExpiration = msgexp a
218       , msgHeaders = substheader (fnheader a) (fmap takeFileName fn) $ msgheader a
219       } >>=
220     printparam "sent"
221   removeSentFileIfRequested (removeSentFile a) (moveSentFileTo a) fn
222   where
223     substheader ::
224          [String] -> Maybe BS.ByteString -> Maybe FieldTable -> Maybe FieldTable
225     substheader (s:r) (Just fname) old =
226       substheader r (Just fname) (addheader' old s fname)
227     substheader _ _ old = old
228     removeSentFileIfRequested False _ _ = return ()
229     removeSentFileIfRequested True _ Nothing = return ()
230     removeSentFileIfRequested True Nothing (Just fname) =
231       printparam "removing" fname >> RD.removeFile fname
232     removeSentFileIfRequested True (Just path) (Just fname) =
233       printparam "moving" [fname,"to",path] >>
234       F.rename fname (replaceDirectory fname ((takeDirectory fname) </> path))
235     addheader' :: Maybe FieldTable -> String -> BS.ByteString -> Maybe FieldTable
236     addheader' Nothing k v =
237       Just $ FieldTable $ M.singleton (T.pack k) (FVString v)
238     addheader' (Just (FieldTable oldheader)) k v =
239       Just $ FieldTable $ M.insert (T.pack k) (FVString v) oldheader