]> woffs.de Git - fd/haskell-amqp-utils.git/blob - agitprop.hs
WiP rabbit hole
[fd/haskell-amqp-utils.git] / agitprop.hs
1 -- SPDX-FileCopyrightText: 2022 Frank Doepper
2 --
3 -- SPDX-License-Identifier: GPL-3.0-only
4
5 {-# LANGUAGE CPP               #-}
6 {-# LANGUAGE OverloadedStrings #-}
7
8 -- generic AMQP publisher
9 import           Control.Concurrent
10 import qualified Control.Exception                as X
11 import           Control.Monad                    (forM_)
12 import qualified Data.ByteString.Char8            as BS
13 import qualified Data.ByteString.Lazy.Char8       as BL
14 import qualified Data.Map                         as M
15 import           Data.Maybe
16 import qualified Data.Text                        as T
17 import           Data.Time
18 import           Data.Time.Clock.POSIX
19 import           Data.Version                     (showVersion)
20 import           Data.Word                        (Word64)
21 import           Magic
22 import           Network.AMQP
23 import           Network.AMQP.Types
24 import           Network.AMQP.Utils.Connection
25 import           Network.AMQP.Utils.Helpers
26 import           Network.AMQP.Utils.Options
27 import           Paths_amqp_utils                 (version)
28 import qualified System.Directory.OsPath          as DO
29 import           System.Environment
30 import           System.Exit
31 #if linux_HOST_OS
32 import           System.INotify
33 #endif
34 import           System.Posix.ByteString           (RawFilePath)
35 import qualified System.Posix.Files.ByteString    as FB
36 import qualified System.Posix.Files.PosixString   as FP
37 import qualified System.OsPath                    as OS
38 import qualified System.File.OsPath               as FOS
39
40 main :: IO ()
41 main = do
42   hr "starting"
43   tid <- myThreadId
44   args <- getArgs >>= parseargs 'a'
45   printparam "client version" ["amqp-utils", showVersion version]
46   printparam "routing key" $ rKey args
47   printparam "exchange" $ currentExchange args
48   (conn, chan) <- connect args
49   addChannelExceptionHandler chan (X.throwTo tid)
50   printparam "confirm mode" $ confirm args
51   if (confirm args)
52     then do
53       confirmSelect chan False
54       addConfirmationListener chan confirmCallback
55     else return ()
56   let inputFile' = firstInputFile (inputFiles args)
57   isDir <-
58     if inputFile' == "-"
59       then return False
60       else FB.getFileStatus inputFile' >>= return . FB.isDirectory
61   let publishOneMsg =
62         publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir}
63   if isDir
64     then do
65       printparam "hotfolder mode" True
66       printparam "initial scan" (initialScan args)
67       if isNothing (moveSentFileTo args)
68         then printparam "remove sent file" (removeSentFile args)
69         else printparam "move sent file to" (moveSentFileTo args)
70     else printparam
71            "input file"
72            [ inputFile'
73            , if (lineMode args)
74                then "(line-by-line)"
75                else ""
76            ]
77   X.catch
78     (if isDir
79        then do
80 #if linux_HOST_OS
81          wds <- mapM (watchHotfolder args publishOneMsg) (inputFiles args)
82          sleepingBeauty >>= (\x -> do
83            forM_ wds (\(wd,folder) -> do
84              removeWatch wd
85              printparam "END watching" folder
86              )
87            X.throw x)
88 #else
89          X.throw (X.ErrorCall "ERROR: watching a directory is only supported in Linux")
90 #endif
91        else do
92          hr $ "BEGIN sending"
93          messageFile <-
94            if inputFile' == "-"
95              then BL.getContents
96              else OS.encodeUtf (BS.unpack inputFile') >>= FOS.readFile
97          if (lineMode args)
98            then mapM_ (publishOneMsg (currentExchange args) (rKey args) Nothing) (BL.lines messageFile)
99            else publishOneMsg (currentExchange args) (rKey args) (Just (inputFile')) messageFile
100          hr "END sending"
101          if (confirm args)
102            then waitForConfirms chan >>= printparam "confirmed"
103            else return ()
104          X.catch (closeConnection conn) exceptionHandler
105          )
106     exceptionHandler
107
108 #if linux_HOST_OS
109 -- | watch a hotfolder
110 watchHotfolder ::
111      Args
112   -> (String -> String -> Maybe RawFilePath -> BL.ByteString -> IO ())
113   -> (RawFilePath, String, String)
114   -> IO (WatchDescriptor, RawFilePath)
115 watchHotfolder args publishOneMsg (folder, exchange, rkey) = do
116   printparam "hotfolder" folder
117   inotify <- initINotify
118   wd <-
119    addWatch
120      inotify
121      [CloseWrite, MoveIn]
122      folder
123      (handleEvent (publishOneMsg exchange rkey) (suffix args) folder)
124   hr "BEGIN watching"
125   if (initialScan args)
126     then do
127       folder' <- OS.encodeUtf (BS.unpack folder)
128       DO.listDirectory folder' >>=
129         mapM_ (\fn -> handleFile (publishOneMsg exchange rkey) (suffix args) (folder' OS.</> fn))
130     else return ()
131   return (wd,folder)
132 #endif
133
134 -- | A handler for clean exit
135 exceptionHandler :: AMQPException -> IO ()
136 exceptionHandler (ChannelClosedException Normal txt) =
137   printparam "exit" txt >> exitWith ExitSuccess
138 exceptionHandler (ConnectionClosedException Normal txt) =
139   printparam "exit" txt >> exitWith ExitSuccess
140 exceptionHandler x = printparam "exception" x >> exitWith (ExitFailure 1)
141
142 -- | The handler for publisher confirms
143 confirmCallback :: (Word64, Bool, AckType) -> IO ()
144 confirmCallback (deliveryTag, isAll, ackType) =
145   printparam
146     "confirmed"
147     [ show deliveryTag
148     , if isAll
149         then "all"
150         else "this"
151     , show ackType
152     ]
153
154 #if linux_HOST_OS
155 -- | Hotfolder event handler
156 handleEvent ::
157      (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> Event -> IO ()
158 -- just handle closewrite and movedin events
159 handleEvent func suffixes folder (Closed False (Just fileName) True) =
160   handleFile func suffixes (folder OS.</> fileName)
161 handleEvent func suffixes folder (MovedIn False fileName _) =
162   handleFile func suffixes (folder OS.</> fileName)
163 handleEvent _ _ _ _ = return ()
164
165 -- | Hotfolder file handler
166 handleFile ::
167      (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> OS.OsPath -> IO ()
168 handleFile func suffixes@(_:_) fileName =
169   if (any (flip BS.isSuffixOf fileName) suffixes) && not ("." `BS.isPrefixOf` fileName)
170     then handleFile func [] fileName
171     else return ()
172 handleFile func [] fileName =
173   X.catch
174     (FOS.readFile fileName >>= func (Just fileName))
175     (\e -> printparam "exception while processing" fileName >> printparam "exception in handleFile" (e :: X.IOException))
176 #endif
177
178 -- | Publish one message with our settings
179 publishOneMsg' ::
180      Channel
181   -> Args
182   -> String
183   -> String
184   -> Maybe RawFilePath
185   -> BL.ByteString
186   -> IO ()
187 publishOneMsg' chan a exchange rkey fn content = do
188   printparam "sending" fn
189   (mtype, mencoding) <-
190     if (magic a)
191       then do
192         let firstchunk = if BL.null content then BS.empty else head $ BL.toChunks content
193         m <- magicOpen [MagicMimeType]
194         magicLoadDefault m
195         t <- BS.useAsCStringLen firstchunk (magicCString m)
196         magicSetFlags m [MagicMimeEncoding]
197         e <- BS.useAsCStringLen firstchunk (magicCString m)
198         return (Just (T.pack t), Just (T.pack e))
199       else return ((contenttype a), (contentencoding a))
200   now <- getCurrentTime >>= return . floor . utcTimeToPOSIXSeconds
201   publishMsg
202     chan
203     (T.pack $ exchange)
204     (T.pack $ rkey)
205     newMsg
206       { msgBody = content
207       , msgDeliveryMode = persistent a
208       , msgTimestamp = Just now
209       , msgID = msgid a
210       , msgType = msgtype a
211       , msgUserID = userid a
212       , msgApplicationID = appid a
213       , msgClusterID = clusterid a
214       , msgContentType = mtype
215       , msgContentEncoding = mencoding
216       , msgReplyTo = replyto a
217       , msgPriority = prio a
218       , msgCorrelationID = corrid a
219       , msgExpiration = msgexp a
220       , msgHeaders = substheader (fnheader a) (fmap OS.takeFileName fn) $ msgheader a
221       } >>=
222     printparam "sent"
223   removeSentFileIfRequested (removeSentFile a) (moveSentFileTo a) fn
224   where
225     substheader ::
226          [String] -> Maybe BS.ByteString -> Maybe FieldTable -> Maybe FieldTable
227     substheader (s:r) (Just fname) old =
228       substheader r (Just fname) (addheader' old s fname)
229     substheader _ _ old = old
230     removeSentFileIfRequested False _ _ = return ()
231     removeSentFileIfRequested True _ Nothing = return ()
232     removeSentFileIfRequested True Nothing (Just fname) =
233       printparam "removing" fname >> DO.removeFile fname
234     removeSentFileIfRequested True (Just path) (Just fname) =
235       printparam "moving" [fname,"to",path] >>
236       FP.rename fname (OS.replaceDirectory fname ((OS.takeDirectory fname) OS.</> path))
237     addheader' :: Maybe FieldTable -> String -> BS.ByteString -> Maybe FieldTable
238     addheader' Nothing k v =
239       Just $ FieldTable $ M.singleton (T.pack k) (FVString v)
240     addheader' (Just (FieldTable oldheader)) k v =
241       Just $ FieldTable $ M.insert (T.pack k) (FVString v) oldheader