streamly-0.10.1: Streaming, dataflow programming and declarative concurrency
Copyright(c) 2017 Composewell Technologies
LicenseBSD-3-Clause
Maintainer[email protected]
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.Internal.Data.SVar

Description

Deprecated: SVar is replaced by Channel.

Synopsis

Documentation

send :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> ChildEvent a -> IO Int Source #

This function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.

ringDoorBell :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #

decrementYieldLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO Bool Source #

incrementYieldLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #

minThreadDelay :: NanoSecond64 Source #

This is a magic number and it is overloaded, and used at several places to achieve batching:

  1. If we have to sleep to slowdown this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated.
  2. Collected latencies are computed and transferred to measured latency after a minimum of this period.

data Work Source #

Instances

Instances details
Show Work Source # 
Instance details

Defined in Streamly.Internal.Data.SVar.Worker

Methods

showsPrec :: Int -> Work -> ShowS #

show :: Work -> String #

showList :: [Work] -> ShowS #

isBeyondMaxRate :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> YieldRateInfo -> IO Bool Source #

workerRateControl :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool Source #

sendYield :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool Source #

sendStop :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> Maybe WorkerInfo -> IO () Source #

handleChildException :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> SomeException -> IO () Source #

sendStopToProducer :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> m () Source #

decrementBufferLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #

incrementBufferLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #

resetBufferLimit :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #

sendToProducer :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> ChildEvent a -> IO Int Source #

handleFoldException :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> SomeException -> IO () Source #

withDiagMVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> String -> IO () -> IO () Source #

printSVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> String -> IO () Source #

collectLatency :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64) Source #

delThread :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> ThreadId -> m () Source #

modifyThread :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> ThreadId -> m () Source #

allThreadsDone :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> m Bool Source #

This is safe even if we are adding more threads concurrently because if a child thread is adding another thread then anyway workerThreads will not be empty.

recordMaxWorkers :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadIO m => SVar t m a -> m () Source #

dumpSVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO String Source #

pushWorker :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => Count -> SVar t m a -> m () Source #

dispatchWorker :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => Count -> SVar t m a -> m Bool Source #

dispatchWorkerPaced :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m Bool Source #

sendWorkerWait :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => (SVar t m a -> IO ()) -> (SVar t m a -> m Bool) -> SVar t m a -> m () Source #

sendWorkerDelay :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #

sendWorkerDelayPaced :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #

pushWorkerPar :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m () Source #

In contrast to pushWorker which always happens only from the consumer thread, a pushWorkerPar can happen concurrently from multiple threads on the producer side. So we need to use a thread safe modification of workerThreads. Alternatively, we can use a CreateThread event to avoid using a CAS based modification.

sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a) Source #

readOutputQRaw :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO ([ChildEvent a], Int) Source #

cleanupSVar :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #

readOutputQPaced :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m [ChildEvent a] Source #

readOutputQBounded :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m [ChildEvent a] Source #

postProcessPaced :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m Bool Source #

postProcessBounded :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVar t m a -> m Bool Source #

cleanupSVarFromWorker :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. SVar t m a -> IO () Source #

New SVar

getYieldRateInfo :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. State t m a -> IO (Maybe YieldRateInfo) Source #

Parallel

newParallelVar :: forall m (t :: (Type -> Type) -> Type -> Type) a. MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a) Source #

Ahead

enqueueAhead :: forall t (m :: Type -> Type) a. SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO () Source #

reEnqueueAhead :: forall t (m :: Type -> Type) a. SVar t m a -> IORef ([t m a], Int) -> t m a -> IO () Source #

queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool Source #

dequeueAhead :: MonadIO m => IORef ([t m a], Int) -> m (Maybe (t m a, Int)) Source #

data HeapDequeueResult (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a Source #

Constructors

Clearing 
Waiting Int 
Ready (Entry Int (AheadHeapEntry t m a)) 

dequeueFromHeap :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> IO (HeapDequeueResult t m a) Source #

dequeueFromHeapSeq :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO (HeapDequeueResult t m a) Source #

requeueOnHeapTop :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Entry Int (AheadHeapEntry t m a) -> Int -> IO () Source #

updateHeapSeq :: forall (t :: (Type -> Type) -> Type -> Type) (m :: Type -> Type) a. IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO () Source #

withIORef :: IORef a -> (a -> IO b) -> IO b Source #

newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a) Source #