Safe Haskell | None |
---|---|
Language | Haskell2010 |
Core.Program.Threads
Description
Utility functions for running Program actions concurrently.
Haskell uses green threads: small lines of work that are scheduled down onto actual execution contexts (set by default by this library to be one per core). Haskell threads are incredibly lightweight, and you are encouraged to use them freely. Haskell provides a rich ecosystem of tools to do work concurrently and to communicate safely between threads.
This module provides wrappers around some of these primitives so you can use them easily from the Program monad.
Note that when you fire off a new thread the top-level application state is shared; it's the same τ inherited from the parent Program.
Synopsis
- forkThread :: Program τ α -> Program τ (Thread α)
- waitThread :: Thread α -> Program τ α
- waitThread_ :: Thread α -> Program τ ()
- waitThread' :: Thread α -> Program τ (Either SomeException α)
- waitThreads' :: [Thread α] -> Program τ [Either SomeException α]
- linkThread :: Thread α -> Program τ ()
- cancelThread :: Thread α -> Program τ ()
- concurrentThreads :: Program τ α -> Program τ β -> Program τ (α, β)
- concurrentThreads_ :: Program τ α -> Program τ β -> Program τ ()
- raceThreads :: Program τ α -> Program τ β -> Program τ (Either α β)
- raceThreads_ :: Program τ α -> Program τ β -> Program τ ()
- data Thread α
Concurrency
forkThread :: Program τ α -> Program τ (Thread α)
Fork a thread. The child thread will run in the same Context as the calling Program, including sharing the user-defined application state value.
Threads that are launched off as children are on their own! If the code in the child thread throws an exception that is not caught within that thread, the exception will kill the thread. Threads dying without telling anyone is a bit of an anti-pattern, so this library logs a warning-level log message if this happens.
If you additionally want the exception to propagate back to the parent thread (say, for example, you want your whole program to die if any of its worker threads fail), then call linkThread after forking. If you want the other direction, that is, if you want the forked thread to be cancelled when its parent is cancelled, then you need to be waiting on it using waitThread.
(this wraps async's async which in turn wraps base's forkIO)
Since: 0.2.7
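As a rough illustration of the fork-then-wait pattern described above, here is a minimal sketch. It assumes that Core.Program exports execute, write, writeS and sleepThread; the worker action is hypothetical and exists only for this example.

```haskell
{-# LANGUAGE OverloadedStrings #-}

module Main (main) where

import Core.Program
import Core.Program.Threads

-- Hypothetical worker (not part of the library): pauses briefly,
-- then returns a number.
worker :: Program t Int
worker = do
    sleepThread 0.1
    pure 42

main :: IO ()
main = execute $ do
    -- forkThread hands back a Thread handle; the worker runs concurrently.
    thread <- forkThread worker
    write "Worker forked, carrying on in the parent"
    -- waitThread blocks until the worker finishes and yields its result.
    answer <- waitThread thread
    writeS answer
```

Because the parent waits on the handle, cancelling the parent would also cancel the worker, as the waitThread documentation explains.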
waitThread :: Thread α -> Program τ α
Wait for the completion of a thread, returning the result. This is a blocking operation.
If the thread you are waiting on throws an exception it will be rethrown by waitThread.
If the current thread making this call is cancelled (as a result of being on the losing side of concurrentThreads or raceThreads for example, or due to an explicit call to cancelThread), then the thread you are waiting on will be cancelled. This is necessary to ensure that child threads are not leaked if you nest forkThreads.
(this wraps async's wait, taking care to ensure the behaviour described above)
Since: 0.2.7
waitThread_ :: Thread α -> Program τ ()
Wait for the completion of a thread, discarding its result. This is particularly useful at the end of a do-block if you're waiting on a worker thread to finish but don't need its return value, if any; otherwise you have to explicitly deal with the unused return value:
    _ <- waitThread t1
    return ()
which is a bit tedious. Instead, you can just use this convenience function:
    waitThread_ t1
The trailing underscore in the name of this function follows the same convention as found in Control.Monad, which has mapM_ which does the same as mapM but which likewise discards the return value.
Since: 0.2.7
waitThread' :: Thread α -> Program τ (Either SomeException α)
Wait for a thread to complete, returning the result if the computation was successful or the exception if one was thrown by the child thread.
This is essentially a convenience for calling waitThread and putting catch around it, but as with all the other wait* functions this ensures that if the thread waiting is cancelled the cancellation is propagated to the thread being watched as well.
(this wraps async's waitCatch)
Since: 0.4.5
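The following sketch shows one way the Either returned by waitThread' might be handled. It assumes Core.Program exports execute, write and writeS, and that intoRope is available from Core.Text.Rope; the failing worker is hypothetical and simply throws an IOError.

```haskell
{-# LANGUAGE OverloadedStrings #-}

module Main (main) where

import qualified Control.Exception as Exception
import Control.Monad.IO.Class (liftIO)
import Core.Program
import Core.Program.Threads
import Core.Text.Rope (intoRope)

-- Hypothetical worker that always fails with an IOError.
failing :: Program t Int
failing = liftIO (Exception.throwIO (userError "worker failed"))

main :: IO ()
main = execute $ do
    thread <- forkThread failing
    -- The exception is returned as a Left value instead of being rethrown.
    outcome <- waitThread' thread
    case outcome of
        Left e -> write ("Worker failed: " <> intoRope (Exception.displayException e))
        Right n -> writeS n
```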
waitThreads' :: [Thread α] -> Program τ [Either SomeException α]
Wait for many threads to complete. This function is intended for the scenario where you fire off a number of worker threads with forkThread but rather than leaving them to run independently, you need to wait for them all to complete.
The results of the threads that complete successfully will be returned as Right values. Should any of the threads being waited upon throw an exception, those exceptions will be returned as Left values.
If you don't need to analyse the failures individually, then you can just collect the successes using Data.Either's rights:
    -- workers is the list of Threads returned by your calls to forkThread
    responses <- waitThreads' workers
    info "Aggregating results..."
    combineResults (rights responses)
Likewise, if you do want to do something with all the failures, you might find lefts useful:
    mapM_ (warn . intoRope . displayException) (lefts responses)
If the thread calling waitThreads' is cancelled, then all the threads being waited upon will also be cancelled. This often occurs within a timeout or similar control measure implemented using raceThreads_. Should the thread that spawned all the workers and is waiting for their results be told to cancel because it lost the "race", the child threads need to be told in turn to cancel so as to avoid those threads being leaked and continuing to run as zombies. This function takes care of that.
(this extends async's waitCatch to work across a list of Threads, taking care to ensure the cancellation behaviour described throughout this module)
Since: 0.4.5
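To make the worker-pool scenario concrete, here is a small sketch that forks several jobs and separates successes from failures. It assumes Core.Program exports execute and writeS; the job function and its deliberate failure for input 3 are hypothetical.

```haskell
module Main (main) where

import qualified Control.Exception as Exception
import Control.Monad.IO.Class (liftIO)
import Core.Program
import Core.Program.Threads
import Data.Either (lefts, rights)

-- Hypothetical job: squares its input, but fails for the input 3.
job :: Int -> Program t Int
job n
    | n == 3 = liftIO (Exception.throwIO (userError "job 3 failed"))
    | otherwise = pure (n * n)

main :: IO ()
main = execute $ do
    -- Fork one worker per input.
    workers <- mapM (forkThread . job) [1, 2, 3, 4]
    -- Block until every worker has either finished or failed.
    results <- waitThreads' workers
    -- Sum the successful results and count the failures.
    writeS (sum (rights results))
    writeS (length (lefts results))
```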
linkThread :: Thread α -> Program τ ()
Ordinarily if an exception is thrown in a forked thread, that exception ends the forked thread without being propagated to the thread that forked it. If you instead need the exception to propagate back to the parent thread, you can "link" the two together using this function.
(this wraps async's link)
Since: 0.4.2
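A minimal sketch of linking a worker to its parent follows. It assumes Core.Program exports execute, write and sleepThread; the failing worker is hypothetical. How the propagated exception is reported when it reaches the top of the program is not shown here and depends on the library.

```haskell
{-# LANGUAGE OverloadedStrings #-}

module Main (main) where

import qualified Control.Exception as Exception
import Control.Monad.IO.Class (liftIO)
import Core.Program
import Core.Program.Threads

-- Hypothetical worker that fails shortly after starting.
failingWorker :: Program t ()
failingWorker = do
    sleepThread 0.1
    liftIO (Exception.throwIO (userError "worker failed"))

main :: IO ()
main = execute $ do
    thread <- forkThread failingWorker
    -- Ask for the worker's exception to be propagated to this thread.
    linkThread thread
    -- The parent is busy elsewhere; the worker fails during this pause.
    sleepThread 2
    write "Only reached if the worker's exception did not arrive first"
```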
cancelThread :: Thread α -> Program τ ()
Cancel a thread.
(this wraps async's cancel. The underlying mechanism used is to throw the AsyncCancelled exception to the other thread. That exception is asynchronous, so will not be trapped by a catch block and will indeed cause the thread receiving the exception to come to an end)
Since: 0.4.5
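As an illustration, the sketch below starts a worker that would otherwise loop forever and then cancels it. It assumes Core.Program exports execute, write and sleepThread; the ticker action is hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}

module Main (main) where

import Control.Monad (forever)
import Core.Program
import Core.Program.Threads

-- Hypothetical worker that never finishes on its own.
ticker :: Program t ()
ticker = forever $ do
    write "tick"
    sleepThread 0.1

main :: IO ()
main = execute $ do
    thread <- forkThread ticker
    -- Let the worker run for a little while.
    sleepThread 0.5
    -- Stop the worker from the parent.
    cancelThread thread
    write "Ticker cancelled"
```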
Helper functions
concurrentThreads :: Program τ α -> Program τ β -> Program τ (α, β)
Fork two threads and wait for both to finish. The return value is the pair of each action's return values.
This is the same as calling forkThread and waitThread twice, except that if either sub-program fails with an exception the other program which is still running will be cancelled and the original exception is then re-thrown.
    (a, b) <- concurrentThreads one two
    -- continue, doing something with both results.
For a variant that ignores the return values and just waits for both see concurrentThreads_ below.
(this wraps async's concurrently)
Since: 0.4.0
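A complete program built around the snippet above might look like the following sketch. It assumes Core.Program exports execute, writeS and sleepThread; the actions one and two are hypothetical stand-ins for real work.

```haskell
module Main (main) where

import Core.Program
import Core.Program.Threads

-- Hypothetical first action: the slower of the two.
one :: Program t Int
one = do
    sleepThread 0.2
    pure 1

-- Hypothetical second action, returning a different type.
two :: Program t String
two = do
    sleepThread 0.1
    pure "two"

main :: IO ()
main = execute $ do
    -- Both actions run concurrently; this returns once both have finished.
    (a, b) <- concurrentThreads one two
    writeS (a, b)
```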
concurrentThreads_ :: Program τ α -> Program τ β -> Program τ ()
Fork two threads and wait for both to finish.
This is the same as calling forkThread and waitThread_ twice, except that if either sub-program fails with an exception the other program which is still running will be cancelled and the original exception is then re-thrown.
(this wraps async's concurrently_)
Since: 0.4.0
raceThreads :: Program τ α -> Program τ β -> Program τ (Either α β)
Fork two threads and race them against each other. This blocks until one or the other of the threads finishes. The return value will be Left α if the first program (one) completes first, and Right β if it is the second program (two) which finishes first. The sub-program which is still running will be cancelled with an exception.
    result <- raceThreads one two
    case result of
        Left a -> do
            -- one finished first
        Right b -> do
            -- two finished first
For a variant that ignores the return value and just races the threads see raceThreads_ below.
(this wraps async's race)
Since: 0.4.0
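One use of the Either result is a timeout that still returns the action's value when it finishes in time. The sketch below assumes Core.Program exports execute, write, writeS and sleepThread; slowAction is hypothetical and is deliberately slower than the timer.

```haskell
{-# LANGUAGE OverloadedStrings #-}

module Main (main) where

import Core.Program
import Core.Program.Threads

-- Hypothetical action that takes longer than the timeout allows.
slowAction :: Program t Int
slowAction = do
    sleepThread 5
    pure 7

main :: IO ()
main = execute $ do
    -- Race a half-second timer against the action.
    result <- raceThreads (sleepThread 0.5) slowAction
    case result of
        -- The timer finished first, so slowAction was cancelled.
        Left () -> write "Timed out"
        -- The action finished first, so the timer was cancelled.
        Right n -> writeS n
```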
raceThreads_ :: Program τ α -> Program τ β -> Program τ ()
Fork two threads and race them against each other. When one action completes the other will be cancelled with an exception. This is useful for enforcing timeouts:
    raceThreads_
        (sleepThread 300)
        ( do
            -- We expect this to complete within 5 minutes.
            performAction
        )
(this wraps async's race_)
Since: 0.4.0