Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | [email protected] |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Streamly.Internal.Data.SVar
Description
Deprecated: SVar is replaced by Channel.
Synopsis
- module Streamly.Internal.Data.SVar.Type
- decrementBufferLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ()
- decrementYieldLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO Bool
- estimateWorkers :: Limit -> Count -> Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> LatencyRange -> Work
- handleChildException :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> SomeException -> IO ()
- handleFoldException :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> SomeException -> IO ()
- incrementBufferLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ()
- incrementYieldLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ()
- isBeyondMaxRate :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> YieldRateInfo -> IO Bool
- minThreadDelay :: NanoSecond64
- resetBufferLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ()
- ringDoorBell :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ()
- send :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> ChildEvent a -> IO Int
- sendStop :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> Maybe WorkerInfo -> IO ()
- sendStopToProducer :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> m ()
- sendToProducer :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> ChildEvent a -> IO Int
- sendYield :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
- updateYieldCount :: WorkerInfo -> IO Count
- workerRateControl :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
- workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
- data Work
- allThreadsDone :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> m Bool
- collectLatency :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
- delThread :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> ThreadId -> m ()
- dispatchWorker :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => Count -> SVar t m a -> m Bool
- dispatchWorkerPaced :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m Bool
- dumpSVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO String
- modifyThread :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> ThreadId -> m ()
- printSVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> String -> IO ()
- pushWorker :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => Count -> SVar t m a -> m ()
- pushWorkerPar :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
- recordMaxWorkers :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> m ()
- sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
- sendWorkerDelay :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ()
- sendWorkerDelayPaced :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ()
- sendWorkerWait :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => (SVar t m a -> IO ()) -> (SVar t m a -> m Bool) -> SVar t m a -> m ()
- withDiagMVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> String -> IO () -> IO ()
- cleanupSVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ()
- cleanupSVarFromWorker :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ()
- postProcessBounded :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m Bool
- postProcessPaced :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m Bool
- readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
- readOutputQBounded :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m [ChildEvent a]
- readOutputQPaced :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m [ChildEvent a]
- readOutputQRaw :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ([ChildEvent a], Int)
- getYieldRateInfo :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. State t m a -> IO (Maybe YieldRateInfo)
- newSVarStats :: IO SVarStats
- newParallelVar :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a)
- enqueueAhead :: forall t (m :: Type -> Type) a. SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
- reEnqueueAhead :: forall t (m :: Type -> Type) a. SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
- queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
- dequeueAhead :: MonadIO m => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
- data HeapDequeueResult (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a
- dequeueFromHeap :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> IO (HeapDequeueResult t m a)
- dequeueFromHeapSeq :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO (HeapDequeueResult t m a)
- requeueOnHeapTop :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
- updateHeapSeq :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO ()
- withIORef :: IORef a -> (a -> IO b) -> IO b
- heapIsSane :: Maybe Int -> Int -> Bool
- newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a)
Documentation
decrementBufferLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #
decrementYieldLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO Bool Source #
estimateWorkers :: Limit -> Count -> Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> LatencyRange -> Work Source #
handleChildException :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> SomeException -> IO () Source #
handleFoldException :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> SomeException -> IO () Source #
incrementBufferLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #
incrementYieldLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #
isBeyondMaxRate :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> YieldRateInfo -> IO Bool Source #
minThreadDelay :: NanoSecond64 Source #
This is an overloaded magic number, used in several places to achieve batching:
- If we have to sleep to slow down, this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time has accumulated.
- Collected latencies are computed and transferred to measured latency after a minimum of this period.
resetBufferLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #
ringDoorBell :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #
send :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> ChildEvent a -> IO Int Source #
This function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.
sendStop :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> Maybe WorkerInfo -> IO () Source #
sendStopToProducer :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> m () Source #
sendToProducer :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> ChildEvent a -> IO Int Source #
sendYield :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool Source #
updateYieldCount :: WorkerInfo -> IO Count Source #
workerRateControl :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool Source #
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO () Source #
data Work Source #
Constructors
BlockWait NanoSecond64 | |
PartialWorker Count | |
ManyWorkers Int Count | |
allThreadsDone :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> m Bool Source #
This is safe even if we are adding more threads concurrently, because if a child thread is adding another thread then workerThreads will not be empty anyway.
collectLatency :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64) Source #
delThread :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> ThreadId -> m () Source #
dispatchWorker :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => Count -> SVar t m a -> m Bool Source #
dispatchWorkerPaced :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m Bool Source #
dumpSVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO String Source #
modifyThread :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> ThreadId -> m () Source #
printSVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> String -> IO () Source #
pushWorker :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => Count -> SVar t m a -> m () Source #
pushWorkerPar :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m () Source #
In contrast to pushWorker, which always happens only from the consumer thread, a pushWorkerPar can happen concurrently from multiple threads on the producer side. So we need to use a thread-safe modification of workerThreads. Alternatively, we can use a CreateThread event to avoid using a CAS-based modification.
recordMaxWorkers :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> m () Source #
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a) Source #
sendWorkerDelay :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #
sendWorkerDelayPaced :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #
sendWorkerWait :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => (SVar t m a -> IO ()) -> (SVar t m a -> m Bool) -> SVar t m a -> m () Source #
withDiagMVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> String -> IO () -> IO () Source #
cleanupSVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #
cleanupSVarFromWorker :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #
postProcessBounded :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m Bool Source #
postProcessPaced :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m Bool Source #
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int) Source #
readOutputQBounded :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m [ChildEvent a] Source #
readOutputQPaced :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m [ChildEvent a] Source #
readOutputQRaw :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ([ChildEvent a], Int) Source #
New SVar
getYieldRateInfo :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. State t m a -> IO (Maybe YieldRateInfo) Source #
Parallel
newParallelVar :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a) Source #
Ahead
enqueueAhead :: forall t (m :: Type -> Type) a. SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO () Source #
reEnqueueAhead :: forall t (m :: Type -> Type) a. SVar t m a -> IORef ([t m a], Int) -> t m a -> IO () Source #
dequeueFromHeap :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> IO (HeapDequeueResult t m a) Source #
dequeueFromHeapSeq :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO (HeapDequeueResult t m a) Source #
requeueOnHeapTop :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Entry Int (AheadHeapEntry t m a) -> Int -> IO () Source #
updateHeapSeq :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO () Source #
newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a) Source #