The Task Execution Engine is the core orchestration system responsible for managing the complete lifecycle of task runs from trigger to completion. It coordinates distributed execution across worker nodes, manages state transitions, enforces concurrency limits, handles retries, and provides specialized features like checkpoints (for resuming long-running tasks) and waitpoints (for blocking until external conditions are met).
This document covers the architecture and core systems of the engine.
The Task Execution Engine is implemented by the RunEngine class, which composes 11 specialized subsystems to handle different aspects of task execution.
Sources: internal-packages/run-engine/src/engine/index.ts76-384
The RunEngine constructor accepts a comprehensive configuration object defining all subsystem parameters:
| Configuration Category | Key Options | Purpose |
|---|---|---|
| prisma | PrismaClient, optional readOnlyPrisma | Database persistence and read replicas |
| worker | redis, workers, tasksPerWorker, pollIntervalMs | Background job processing configuration |
| queue | redis, defaultEnvConcurrency, shardCount | Fair queue selection and hierarchical queue management |
| runLock | redis, duration, automaticExtensionThreshold, retryConfig | Distributed locking via Redlock algorithm |
| machines | defaultMachine, machines, baseCostInCents | Machine preset definitions for different execution environments |
| heartbeatTimeoutsMs | PENDING_EXECUTING, EXECUTING, SUSPENDED, etc. | Per-state timeouts for stall detection |
| tracer, meter | OpenTelemetry instances | Distributed tracing and metrics instrumentation |

Sources: internal-packages/run-engine/src/engine/types.ts23-107 internal-packages/run-engine/src/engine/index.ts106-384
The following diagram illustrates the flow from triggering a task through to completion:
Sources: internal-packages/run-engine/src/engine/index.ts389-727 internal-packages/run-engine/src/engine/systems/enqueueSystem.ts25-102 internal-packages/run-engine/src/engine/systems/dequeueSystem.ts105-770 internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts294-628
Task runs progress through execution states tracked by TaskRunExecutionSnapshot records. The executionStatus field tracks fine-grained internal state:
Sources: internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts226-387 internal-packages/run-engine/src/engine/statuses.ts
The ExecutionSnapshotSystem maintains an immutable audit trail of all run state transitions by creating TaskRunExecutionSnapshot records.
Key Responsibilities:
- previousSnapshotId to form an immutable chain

Snapshot Data Model:
The getLatestExecutionSnapshot() helper retrieves the most recent valid snapshot for a run.
Sources: internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts226-387 internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts95-113
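The append-only chain described above can be illustrated with a small in-memory sketch. This is not the engine's implementation: the `InMemorySnapshotStore` class, the reduced field set, and the `isValid` flag are assumptions made for illustration, and the real records are persisted through Prisma.

```typescript
// Hypothetical, reduced shape; the real TaskRunExecutionSnapshot has more fields.
type ExecutionStatus = "RUN_CREATED" | "QUEUED" | "PENDING_EXECUTING" | "EXECUTING" | "FINISHED";

interface Snapshot {
  readonly id: string;
  readonly runId: string;
  readonly executionStatus: ExecutionStatus;
  readonly previousSnapshotId: string | null;
  readonly isValid: boolean; // assumption: invalid snapshots are skipped on lookup
}

class InMemorySnapshotStore {
  private readonly snapshots: Snapshot[] = [];
  private nextId = 1;

  // Appends a new snapshot linked to the current latest one; old records are never mutated.
  create(runId: string, executionStatus: ExecutionStatus, isValid = true): Snapshot {
    const previous = this.latest(runId);
    const snapshot: Snapshot = Object.freeze({
      id: `snap_${this.nextId++}`,
      runId,
      executionStatus,
      previousSnapshotId: previous ? previous.id : null,
      isValid,
    });
    this.snapshots.push(snapshot);
    return snapshot;
  }

  // Returns the most recent valid snapshot for a run, scanning from newest to oldest.
  latest(runId: string): Snapshot | undefined {
    for (let i = this.snapshots.length - 1; i >= 0; i--) {
      const s = this.snapshots[i];
      if (s && s.runId === runId && s.isValid) return s;
    }
    return undefined;
  }
}
```

Because every transition adds a record instead of updating one, walking `previousSnapshotId` backwards from the latest snapshot reproduces the run's history.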
EnqueueSystem
The EnqueueSystem handles placing runs into the queue hierarchy:
- QUEUED execution snapshot
- runQueue.enqueueMessage(orgId, runId, message) to add to queues
- runQueued event via EventBus

Sources: internal-packages/run-engine/src/engine/systems/enqueueSystem.ts25-102
DequeueSystem
The DequeueSystem retrieves runs for execution using fair selection:
- runQueue.dequeueMessageFromWorkerQueue() to get a fairly-selected run
- runLocker.lock()
- (QUEUED, RUN_CREATED)
- BackgroundWorkerTask and version:
  - lockedById to task ID
  - lockedToVersionId to worker version ID
  - taskVersion, sdkVersion, cliVersion from worker metadata
- TaskRun to DEQUEUED status
- PENDING_EXECUTING snapshot
- DequeuedMessage with execution context

The dequeue operation includes comprehensive validation to ensure the run can execute.
Sources: internal-packages/run-engine/src/engine/systems/dequeueSystem.ts88-770 internal-packages/run-engine/src/engine/systems/dequeueSystem.ts105-578
The RunAttemptSystem orchestrates the lifecycle of execution attempts and retry logic.
startRunAttempt()
Creates a new attempt and transitions to executing state:
- runLocker.lock()
- attemptNumber on TaskRun (starts at 1)
- attemptNumber against MAX_TASK_RUN_ATTEMPTS constant
- EXECUTING
- EXECUTING snapshot
- (BackgroundWorkerTask)
- (TaskQueue)
- TaskRunExecution object with all context

The system maintains in-memory caches using @internal/cache with Redis backing to avoid database queries on each attempt start.
Sources: internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts294-628 internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts106-171
completeRunAttempt()
Handles attempt completion with success or failure:
- attemptSucceeded() or attemptFailed() based on completion.ok

For successful attempts (attemptSucceeded()):

- COMPLETED_SUCCESSFULLY
- output and outputType fields
- FINISHED snapshot
- triggerAndWait()
- resumeParentOnCompletion is set
- runCompleted event

For failed attempts (attemptFailed()):

- retryOutcomeFromCompletion() to determine if retryable
- maxAttempts:
  - QUEUED snapshot
- COMPLETED_WITH_ERRORS
- FINISHED snapshot with error

Sources: internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts630-667 internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts669-728 internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts730-1019
The CheckpointSystem enables suspending long-running tasks to free resources while preserving execution state.
createCheckpoint()
Creates a checkpoint and suspends execution:
- runLocker.lock("createCheckpoint")
- QUEUED_EXECUTING state
- EXECUTING
- QUEUED_EXECUTING
- TaskRunCheckpoint record with:
  - type: "DOCKER" or other checkpoint mechanism
  - location: External storage location (e.g., S3 URL)
  - imageRef: Container image reference for restoration
  - reason: Optional explanation
- WAITING_TO_RESUME
- SUSPENDED snapshot linking to checkpoint
- runQueue.releaseAllConcurrency()

Special case for QUEUED_EXECUTING: If run is executing but also queued (rare race condition), the checkpoint causes the run to be re-enqueued rather than suspended.
Sources: internal-packages/run-engine/src/engine/systems/checkpointSystem.ts36-249
continueRunExecution()
Resumes execution from a checkpoint:
- SUSPENDED or QUEUED state
- enqueueSystem.enqueueRun()

The dequeue operation will detect the checkpoint reference and pass it to the worker for restoration.
Sources: internal-packages/run-engine/src/engine/systems/checkpointSystem.ts254-319
The WaitpointSystem enables blocking task execution until external conditions are satisfied.
Waitpoint Types:
| Type | Completion Trigger | Use Case |
|---|---|---|
| DATETIME | Specified timestamp reached | wait.for(duration) delays |
| MANUAL | Explicit API call to complete | Human-in-the-loop workflows, external callbacks |
| BATCH | All runs in batch finish | Parent waiting for all batch children |

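The completion triggers in the table can be expressed as a discriminated union with one check per type. This is only a sketch: the field names (`completedAfter`, `completedByApi`, `runStatuses`) and the `isSatisfied` helper are hypothetical and are not taken from the engine's data model.

```typescript
// Hypothetical waitpoint shapes, one per type listed in the table.
type Waitpoint =
  | { type: "DATETIME"; completedAfter: Date }
  | { type: "MANUAL"; completedByApi: boolean }
  | { type: "BATCH"; runStatuses: Array<"RUNNING" | "FINISHED"> };

// Returns true when the waitpoint's completion trigger has fired.
function isSatisfied(waitpoint: Waitpoint, now: Date): boolean {
  switch (waitpoint.type) {
    case "DATETIME":
      // Completes once the specified timestamp is reached.
      return now.getTime() >= waitpoint.completedAfter.getTime();
    case "MANUAL":
      // Completes only after an explicit API call.
      return waitpoint.completedByApi;
    case "BATCH":
      // Completes when every run in the batch has finished.
      return waitpoint.runStatuses.every((status) => status === "FINISHED");
  }
}
```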
blockRunWithWaitpoint()
Blocks a run on one or more waitpoints:
- runLocker.lock("blockRunWithWaitpoint")
- TaskRunWaitpoint junction records
- _WaitpointRunConnections many-to-many records
- EXECUTING_WITH_WAITPOINTS
- SUSPENDED
- continueRunIfUnblocked job

Sources: internal-packages/run-engine/src/engine/systems/waitpointSystem.ts368-497
completeWaitpoint()
Marks a waitpoint as completed and unblocks affected runs:
- Waitpoint status to COMPLETED via updateMany (idempotent)
- TaskRunWaitpoint records for this waitpoint
- continueRunIfUnblocked job with 50ms delay
- cachedRunCompleted event if spanIdToComplete present

Sources: internal-packages/run-engine/src/engine/systems/waitpointSystem.ts70-172
continueRunIfUnblocked()
Checks if a run is unblocked and continues execution:
- runLocker.lock("continueRunIfUnblocked")
- TaskRunWaitpoint records for the run
- PENDING
- { status: "blocked" } and exits
- EXECUTING_WITH_WAITPOINTS: Creates QUEUED_EXECUTING snapshot and notifies worker
- SUSPENDED: Re-enqueues run via enqueueSystem.enqueueRun()

Sources: internal-packages/run-engine/src/engine/systems/waitpointSystem.ts499-709
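The branching in continueRunIfUnblocked() can be sketched as a pure decision function. The sketch assumes that a run counts as blocked while any of its waitpoints is still PENDING; the `decideContinuation` name and the result shape beyond `{ status: "blocked" }` are hypothetical, and locking and persistence are left out.

```typescript
type WaitpointStatus = "PENDING" | "COMPLETED";
type BlockedRunState = "EXECUTING_WITH_WAITPOINTS" | "SUSPENDED";

// Hypothetical result shape; only { status: "blocked" } appears in the description above.
type ContinueResult =
  | { status: "blocked" }
  | { status: "continued"; action: "notify_worker"; nextSnapshot: "QUEUED_EXECUTING" }
  | { status: "continued"; action: "re_enqueue" };

function decideContinuation(
  state: BlockedRunState,
  blockingWaitpoints: WaitpointStatus[]
): ContinueResult {
  // Assumption: any PENDING waitpoint keeps the run blocked.
  if (blockingWaitpoints.some((status) => status === "PENDING")) {
    return { status: "blocked" };
  }
  if (state === "EXECUTING_WITH_WAITPOINTS") {
    // The worker is still alive, so it is notified through a new snapshot.
    return { status: "continued", action: "notify_worker", nextSnapshot: "QUEUED_EXECUTING" };
  }
  // A SUSPENDED run has no live worker and goes back through the queue.
  return { status: "continued", action: "re_enqueue" };
}
```

Keeping the decision separate from the side effects makes the two resume paths easy to test without Redis or a database.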
The DelayedRunSystem handles runs scheduled for future execution via the delay option.
scheduleDelayedRunEnqueuing()
When a run is triggered with delayUntil:
- TaskRun in DELAYED status
- DELAYED snapshot
- enqueueDelayedRun:${runId}
- delayUntil timestamp

Sources: internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts56-133
enqueueDelayedRun()
Background job handler that fires at scheduled time:
- DELAYED state
- enqueueSystem.enqueueRun() to queue the run
- QUEUED snapshot

Sources: internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts135-191
rescheduleDelayedRun()
Allows changing the delay time before a run executes:
- DELAYED state
- delayUntil timestamp on TaskRun
- worker.reschedule()
- DELAYED snapshot with updated description
- runDelayRescheduled event

Sources: internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts26-95
The RunLocker class provides distributed mutual exclusion using the Redlock algorithm over Redis.
Architecture:
Key Features:
- AsyncLocalStorage to detect if lock already held in current async context

Sources: internal-packages/run-engine/src/engine/locking.ts70-599
lock() Method Signature:
Retry Configuration:
| Parameter | Default | Purpose |
|---|---|---|
| maxAttempts | 10 | Maximum lock acquisition attempts |
| baseDelay | 100ms | Initial retry delay |
| maxDelay | 3000ms | Maximum retry delay cap |
| backoffMultiplier | 1.8 | Exponential backoff factor |
| jitterFactor | 0.15 | Randomization (±15%) to prevent synchronized retries |
| maxTotalWaitTime | 15000ms | Total timeout for all retry attempts |
| duration | 5000ms | Lock TTL before automatic expiration |
| automaticExtensionThreshold | 1000ms | Start extending when <1s remains |

Sources: internal-packages/run-engine/src/engine/locking.ts55-68 internal-packages/run-engine/src/engine/index.ts124-140
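To show how the retry parameters in the table interact, the following sketch computes a delay schedule from the listed defaults. The exact way RunLocker combines them is not shown in this document, so the formula (exponential growth from a zero-based attempt index, a cap, then symmetric jitter) and the function names are assumptions.

```typescript
interface LockRetryConfig {
  maxAttempts: number;
  baseDelay: number; // ms
  maxDelay: number; // ms
  backoffMultiplier: number;
  jitterFactor: number; // 0.15 means plus or minus 15%
  maxTotalWaitTime: number; // ms
}

// Defaults copied from the table above.
const DEFAULT_LOCK_RETRY: LockRetryConfig = {
  maxAttempts: 10,
  baseDelay: 100,
  maxDelay: 3000,
  backoffMultiplier: 1.8,
  jitterFactor: 0.15,
  maxTotalWaitTime: 15000,
};

// Delay before retry number `attempt` (zero-based). `random` is injectable for testing.
function lockRetryDelay(
  attempt: number,
  config: LockRetryConfig = DEFAULT_LOCK_RETRY,
  random: () => number = Math.random
): number {
  const exponential = config.baseDelay * Math.pow(config.backoffMultiplier, attempt);
  const capped = Math.min(exponential, config.maxDelay);
  // Map random() from [0, 1) onto [-jitterFactor, +jitterFactor).
  const jitter = capped * config.jitterFactor * (random() * 2 - 1);
  return Math.max(0, Math.round(capped + jitter));
}

// Waits between attempts, stopping early once the total wait budget would be exceeded.
function lockRetrySchedule(
  config: LockRetryConfig = DEFAULT_LOCK_RETRY,
  random: () => number = Math.random
): number[] {
  const delays: number[] = [];
  let total = 0;
  // maxAttempts acquisition attempts leave at most maxAttempts - 1 waits between them.
  for (let attempt = 0; attempt < config.maxAttempts - 1; attempt++) {
    const delay = lockRetryDelay(attempt, config, random);
    if (total + delay > config.maxTotalWaitTime) break;
    delays.push(delay);
    total += delay;
  }
  return delays;
}
```

Under these assumptions the uncapped delay passes 3000ms at the seventh retry, so the cap and the total wait budget, not the multiplier, govern the later attempts.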
All state-changing operations follow this pattern:
This pattern ensures:
Sources: internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts294-628 internal-packages/run-engine/src/engine/systems/checkpointSystem.ts53-90 internal-packages/run-engine/src/engine/systems/waitpointSystem.ts397-496
The RunQueue implements a three-tier hierarchy for fair task distribution:
Queue Tiers:
Each tier is a Redis sorted set with scores representing queue timestamps for fairness.
Sources: internal-packages/run-engine/src/run-queue/index.ts
The FairQueueSelectionStrategy selects which environment queue to dequeue from using weighted scoring:
Score Formula:
score = (concurrencyLimitBias × concurrencyScore) +
(availableCapacityBias × capacityScore) +
(queueAgeRandomization × random())
Score Components:
| Component | Default Weight | Calculation | Purpose |
|---|---|---|---|
| Concurrency Limit Bias | 0.75 | available / limit | Prioritize environments with more capacity headroom |
| Available Capacity Bias | 0.3 | 1 - (queueSize / limit) | Factor in current queue depth |
| Queue Age Randomization | 0.25 | random() | Prevent starvation via randomness |
The strategy maintains a snapshot of environment states and reuses it for a configurable number of dequeues (default: 0, refresh every time) to reduce Redis queries.
Sources: internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts apps/webapp/app/v3/runEngine.server.ts58-67
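The score formula and default weights above can be worked through in a few lines. This is a sketch under stated assumptions: the `EnvSnapshot` shape, the guard for a zero limit, and the "highest score wins" selection are hypothetical, and the real strategy may select differently (for example by weighted sampling).

```typescript
// Hypothetical per-environment state; field names are illustrative only.
interface EnvSnapshot {
  id: string;
  concurrencyLimit: number;
  currentConcurrency: number;
  queueSize: number;
}

interface SelectionBiases {
  concurrencyLimitBias: number;
  availableCapacityBias: number;
  queueAgeRandomization: number;
}

// Default weights from the table above.
const DEFAULT_BIASES: SelectionBiases = {
  concurrencyLimitBias: 0.75,
  availableCapacityBias: 0.3,
  queueAgeRandomization: 0.25,
};

function scoreEnvironment(
  env: EnvSnapshot,
  biases: SelectionBiases = DEFAULT_BIASES,
  random: () => number = Math.random
): number {
  if (env.concurrencyLimit <= 0) return 0; // assumption: no capacity means lowest score
  const available = Math.max(0, env.concurrencyLimit - env.currentConcurrency);
  const concurrencyScore = available / env.concurrencyLimit;
  const capacityScore = 1 - env.queueSize / env.concurrencyLimit;
  return (
    biases.concurrencyLimitBias * concurrencyScore +
    biases.availableCapacityBias * capacityScore +
    biases.queueAgeRandomization * random()
  );
}

// Assumption: pick the environment with the highest score.
function pickEnvironment(
  envs: EnvSnapshot[],
  biases: SelectionBiases = DEFAULT_BIASES,
  random: () => number = Math.random
): EnvSnapshot | undefined {
  let best: EnvSnapshot | undefined;
  let bestScore = -Infinity;
  for (const env of envs) {
    const score = scoreEnvironment(env, biases, random);
    if (score > bestScore) {
      best = env;
      bestScore = score;
    }
  }
  return best;
}
```

For example, with the random term fixed at 0, an environment using 2 of 10 slots with 5 queued runs scores 0.75 × 0.8 + 0.3 × 0.5 = 0.75, while one using 9 of 10 slots with the same queue depth scores 0.225.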
The EventBus is a typed EventEmitter that broadcasts run lifecycle events for monitoring and UI updates.
Key Events:
| Event Name | When Emitted | Payload Fields |
|---|---|---|
| runCreated | New TaskRun record created | runId, time |
| runQueued | Run added to queue | runId, orgId, envId, queueName, time |
| runLocked | Run locked to worker during dequeue | run, organization, project, environment, time |
| runAttemptStarted | Attempt begins executing | run, organization, project, environment, time |
| runStatusChanged | Run status field changes | run, organization, project, environment, time |
| runCompleted | Run reaches final state | run, result, time |
| incomingCheckpointDiscarded | Checkpoint rejected (invalid state) | run, checkpoint, snapshot, time |

The EventBus enables decoupled real-time features without blocking execution paths. Listeners can subscribe to events for:
Sources: internal-packages/run-engine/src/engine/eventBus.ts internal-packages/run-engine/src/engine/index.ts712-715
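A typed event bus of this kind can be sketched without any dependencies. The `TypedEventBus` class below is a hypothetical stand-in, not the engine's EventBus, and only two events from the table are modelled, with payload field types assumed.

```typescript
// Two events from the table; the field types are assumptions.
type RunEvents = {
  runCreated: { runId: string; time: Date };
  runQueued: { runId: string; orgId: string; envId: string; queueName: string; time: Date };
};

type AnyListener = (payload: unknown) => void;

// Minimal typed emitter: event names and payload shapes are checked at compile time.
class TypedEventBus<Events extends Record<string, unknown>> {
  private readonly listeners = new Map<keyof Events, AnyListener[]>();

  on<K extends keyof Events>(event: K, listener: (payload: Events[K]) => void): void {
    const list: AnyListener[] = this.listeners.get(event) ?? [];
    // Safe in practice: emit() only passes Events[K] payloads to listeners registered for K.
    list.push(listener as AnyListener);
    this.listeners.set(event, list);
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(payload);
    }
  }
}
```

A listener subscribes with `bus.on("runQueued", (e) => ...)` and receives a payload typed with `queueName`, while a misspelled event name or a missing field is rejected by the compiler.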
All critical operations are instrumented with OpenTelemetry tracing using the startSpan() helper:
Traced Operations:
- trigger: Creating new runs
- dequeueFromWorkerQueue: Fair queue selection and dequeue
- startRunAttempt: Starting execution attempts
- completeRunAttempt: Completing attempts
- createCheckpoint: Creating checkpoints
- blockRunWithWaitpoint: Blocking on waitpoints

Semantic Attributes: Spans include semantic attributes following OpenTelemetry conventions:

- run_id: Task run identifier
- snapshot_id: Execution snapshot identifier
- organization_id: Organization identifier
- environment_id: Environment identifier
- run_engine.lock.type: Lock operation type
- run_engine.lock.resources: Locked resource keys

Sources: internal-packages/run-engine/src/engine/index.ts1-5 internal-packages/run-engine/src/engine/locking.ts18-24 apps/webapp/app/v3/tracer.server.ts113-142
The engine monitors for stalled runs using per-state heartbeat timeouts configured via heartbeatTimeoutsMs:
| Execution State | Default Timeout | Detection Purpose |
|---|---|---|
| PENDING_EXECUTING | 60s | Worker crashed after dequeue but before attempt start |
| PENDING_CANCEL | 60s | Cancellation notification not processed |
| EXECUTING | 5 minutes | Worker crashed during normal execution |
| EXECUTING_WITH_WAITPOINTS | 5 minutes | Worker crashed while blocked on waitpoints |
| SUSPENDED | 10 minutes | Checkpoint restoration never requested |

Background Job Handlers:
The Worker processes background jobs for maintenance operations:
| Job Type | Purpose | Handler |
|---|---|---|
| heartbeatSnapshot | Check if snapshot is stalled | #handleStalledSnapshot() |
| repairSnapshot | Attempt to recover stalled run | #handleRepairSnapshot() |
| expireRun | Handle TTL expiration | ttlSystem.expireRun() |
| cancelRun | Process cancellation request | runAttemptSystem.cancelRun() |
| continueRunIfUnblocked | Check if waitpoints unblocked | waitpointSystem.continueRunIfUnblocked() |
| enqueueDelayedRun | Queue delayed run at scheduled time | delayedRunSystem.enqueueDelayedRun() |

Sources: internal-packages/run-engine/src/engine/index.ts199-243 internal-packages/run-engine/src/engine/types.ts109-115
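A per-state stall check built on the default timeouts above might look like the sketch below. The `isStalled` helper and the use of a last-heartbeat timestamp are assumptions; in the engine the check runs as a scheduled heartbeatSnapshot job rather than a direct function call.

```typescript
type HeartbeatState =
  | "PENDING_EXECUTING"
  | "PENDING_CANCEL"
  | "EXECUTING"
  | "EXECUTING_WITH_WAITPOINTS"
  | "SUSPENDED";

// Defaults from the table above, in milliseconds.
const DEFAULT_HEARTBEAT_TIMEOUTS_MS: Record<HeartbeatState, number> = {
  PENDING_EXECUTING: 60 * 1000,
  PENDING_CANCEL: 60 * 1000,
  EXECUTING: 5 * 60 * 1000,
  EXECUTING_WITH_WAITPOINTS: 5 * 60 * 1000,
  SUSPENDED: 10 * 60 * 1000,
};

// Hypothetical helper: a snapshot is stalled once its state's timeout has elapsed
// without a heartbeat.
function isStalled(
  state: HeartbeatState,
  lastHeartbeatAtMs: number,
  nowMs: number,
  timeouts: Record<HeartbeatState, number> = DEFAULT_HEARTBEAT_TIMEOUTS_MS
): boolean {
  return nowMs - lastHeartbeatAtMs > timeouts[state];
}
```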
The retryOutcomeFromCompletion() function determines if a failed attempt should be retried:
Retry Decision Logic:
- attemptNumber < maxAttempts

delay = baseDelay × (factor ^ (attemptNumber - 1))
delay = min(delay, maxDelay)
delay = delay × (1 + jitter × random())
Default Retry Configuration:
- maxAttempts: 3 (configurable per task or run)
- factor: 2 (exponential growth)
- minTimeoutInMs: 1000 (1 second minimum)
- maxTimeoutInMs: 3600000 (1 hour maximum)
- randomize: true (adds jitter)

If retry is determined, the run is re-enqueued with a delayUntil timestamp calculated from the backoff delay.
Sources: internal-packages/run-engine/src/engine/retrying.ts internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts730-1019
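The backoff formula and defaults above can be combined into a small worked example. It assumes that baseDelay corresponds to minTimeoutInMs and maxDelay to maxTimeoutInMs; the jitter magnitude is not stated in this document, so it is passed in as a hypothetical parameter. `sketchRetryOutcome` is an illustrative name, not the engine's retryOutcomeFromCompletion().

```typescript
interface RetryOptions {
  maxAttempts: number;
  factor: number;
  minTimeoutInMs: number;
  maxTimeoutInMs: number;
  randomize: boolean;
}

// Defaults from the list above.
const DEFAULT_RETRY_OPTIONS: RetryOptions = {
  maxAttempts: 3,
  factor: 2,
  minTimeoutInMs: 1000,
  maxTimeoutInMs: 3600000,
  randomize: true,
};

type RetryDecision = { retry: false } | { retry: true; delayMs: number };

// attemptNumber is 1-based; jitter is a hypothetical fraction such as 0.5.
function sketchRetryOutcome(
  attemptNumber: number,
  jitter: number,
  options: RetryOptions = DEFAULT_RETRY_OPTIONS,
  random: () => number = Math.random
): RetryDecision {
  // Retry only while attempts remain.
  if (attemptNumber >= options.maxAttempts) return { retry: false };

  // Exponential growth from the minimum timeout, capped at the maximum.
  let delay = options.minTimeoutInMs * Math.pow(options.factor, attemptNumber - 1);
  delay = Math.min(delay, options.maxTimeoutInMs);

  // Jitter is applied after the cap, following the order of the formula above.
  if (options.randomize) delay = delay * (1 + jitter * random());

  return { retry: true, delayMs: Math.round(delay) };
}
```

With the defaults and the random term at 0, the first failure waits 1000ms, the second 2000ms, and the third is not retried because attemptNumber has reached maxAttempts.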
The RunEngine is instantiated as a singleton in the webapp via the singleton() helper:
Configuration Loading:
Environment variables prefixed with RUN_ENGINE_* control engine behavior:
| Variable Group | Purpose | Examples |
|---|---|---|
| RUN_ENGINE_WORKER_* | Worker pool configuration | WORKER_COUNT, WORKER_CONCURRENCY_LIMIT |
| RUN_ENGINE_RUN_QUEUE_* | Queue and selection strategy | PARENT_QUEUE_LIMIT, CONCURRENCY_LIMIT_BIAS |
| RUN_ENGINE_RUN_LOCK_* | Lock duration and retry | LOCK_DURATION, LOCK_MAX_RETRIES |
| RUN_ENGINE_TIMEOUT_* | Per-state stall detection | TIMEOUT_EXECUTING, TIMEOUT_SUSPENDED |
| RUN_ENGINE_*_REDIS_* | Redis connection per subsystem | WORKER_REDIS_HOST, RUN_QUEUE_REDIS_PORT |

The engine supports separate Redis instances for different subsystems (worker, queue, lock) to enable horizontal scaling and isolation.
Sources: apps/webapp/app/v3/runEngine.server.ts11-192 apps/webapp/app/env.server.ts560-602
The engine includes comprehensive integration tests using the containerTest() helper from @internal/testcontainers:
Test Categories:
| Test File | Coverage |
|---|---|
| attemptFailures.test.ts | Retry logic, error handling, max attempts |
| checkpoints.test.ts | Checkpoint creation, resumption, race conditions |
| waitpoints.test.ts | Waitpoint blocking, completion, timeouts, batch coordination |
| locking.test.ts | Lock acquisition, retry, extension, nested locks |

Tests spin up isolated PostgreSQL and Redis containers for each test case, ensuring complete isolation and reproducibility.
Sources: internal-packages/run-engine/src/engine/tests/waitpoints.test.ts1-13 internal-packages/run-engine/src/engine/tests/checkpoints.test.ts1-13 internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts1-8 internal-packages/run-engine/src/engine/tests/locking.test.ts1-8
This page provides an overview of the Task Execution Engine architecture.