Conversation
…encies The requeue_interrupted_tasks monitor was false-positiving on pending request tasks that haven't been dispatched to Celery yet because their upstream dependencies aren't complete. These tasks legitimately have no cache key (cache_task_tracking_key is only called at dispatch time), so the monitor was incorrectly treating them as stuck and canceling the entire privacy request. Fix: before canceling, check if the task is pending and its upstream tasks are incomplete — if so, skip it. Only cancel if upstream is done but the task was never dispatched (truly stuck). Fixes ENG-2756. Made-with: Cursor
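In outline, the commit's decision logic reduces to something like the following (a minimal sketch; `should_cancel` and its arguments are illustrative stand-ins, not the actual fides API):

```python
def should_cancel(task_status, upstream_complete, has_cache_key):
    """Watchdog decision for a request task, hypothetically simplified.

    A pending task whose upstream dependencies are incomplete has never
    been dispatched to Celery (cache keys are written only at dispatch
    time), so it is legitimately waiting and must not be canceled.
    """
    if has_cache_key:
        # A cached Celery task ID exists; the normal requeue path handles it.
        return False
    if task_status == "pending" and not upstream_complete:
        # Never dispatched because upstream isn't done: skip, not stuck.
        return False
    # No cache key even though upstream is complete: it should have been
    # dispatched already, so treat it as truly stuck.
    return True
```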
Vercel for GitHub: 2 Skipped Deployments
Made-with: Cursor
Greptile Summary: This PR fixes a false-positive in the `requeue_interrupted_tasks` watchdog that canceled privacy requests whose pending tasks were still waiting on upstream dependencies.
Confidence Score: 4/5
Important Files Changed
Last reviewed commit: 7f09aff
@greptile please review
can we name the file 7525-descriptive-slug ?
Done — renamed to 7525-fix-watchdog-false-positive-pending-tasks.yaml.
    should_requeue = False
    break

    # A pending task that hasn't been dispatched to Celery yet will
nit: this method is already huge, should we maybe wrap the new logic in a method that we call here?
Addressed both this and the query optimization comment below — reworked _get_request_task_ids_in_progress to load full RequestTask objects and pre-compute an awaiting_upstream flag. This moves the pending-task logic out of the main method and eliminates the per-iteration re-query of RequestTask by ID. The upstream_tasks_complete() call still happens per pending task within the helper, but the extra object lookup is gone.
Follow-up in 51c48a6: went further and replaced the per-task upstream_tasks_complete() DB calls with a single column-projection query + in-memory lookup dict. Also switched to a generator to avoid building the full result list, and queries only the 5 columns needed (avoids loading large JSON blobs on RequestTask).
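The pattern described here can be sketched in plain Python (illustrative only: the tuple layout, `tasks_in_progress` name, and status values are assumptions rather than the actual fides code, and the real version projects columns via SQLAlchemy):

```python
# Statuses that count as a completed upstream task (illustrative values).
COMPLETED_EXECUTION_LOG_STATUSES = {"complete", "skipped"}

def tasks_in_progress(rows):
    """Yield (task_id, status, awaiting_upstream) from a single result set.

    `rows` stands in for the one column-projection query: tuples of
    (task_id, status, collection_address, action_type, upstream_addresses).
    Upstream completion is computed entirely in memory, so no per-task DB
    queries are needed, and a generator avoids building an output list.
    """
    rows = list(rows)
    # (collection_address, action_type) -> status lookup, built once.
    status_by_address = {
        (address, action): status
        for (_task_id, status, address, action, _upstream) in rows
    }
    for task_id, status, _address, action, upstream in rows:
        awaiting_upstream = any(
            status_by_address.get((addr, action))
            not in COMPLETED_EXECUTION_LOG_STATUSES
            for addr in (upstream or [])
        )
        yield task_id, status, awaiting_upstream
```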
    request_task_obj = (
        db.query(RequestTask)
        .filter(RequestTask.id == request_task_id)
        .first()
    )
to avoid the extra query (or more than one if upstream_tasks_complete also runs its own query) in each iteration, don't we want to rework _get_request_task_ids_in_progress -- or write a separate method -- that returns something like task_id, status, has_incomplete_upstream_tasks instead?
Done — see reply above. _get_request_task_ids_in_progress now returns (task_id, status, awaiting_upstream) tuples with upstream completion pre-computed, eliminating the per-iteration RequestTask lookup.
Follow-up in 51c48a6: fully addressed — _get_request_task_ids_in_progress now does a single db.query() with column projection (id, status, collection_address, action_type, upstream_tasks), builds a (collection_address, action_type) → status lookup, and computes upstream completion in Python. Zero per-task DB queries regardless of how many pending tasks.
- Rename changelog to 7525-fix-watchdog-false-positive-pending-tasks.yaml
- Move pending-task upstream check into _get_request_task_ids_in_progress
- Return (task_id, status, awaiting_upstream) tuples to eliminate per-iteration DB query
- Update test mocks to match new 3-tuple signature

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace per-task upstream_tasks_complete() DB calls with a single column-projection query and in-memory lookup. Uses a generator to avoid building the full result list and queries only the 5 columns needed (avoiding large JSON blobs on RequestTask). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…f github.com:ethyca/fides into ENG-2756-fix-watchdog-false-positive-pending-tasks-2
- Move pending+awaiting check before async task DB query to skip an unnecessary hit for tasks already known to be legitimately waiting
- Parametrize stuck-task cancellation test to cover both scenarios: complete upstream (existing) and root task with no upstream (new)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Documents that the in-memory status_by_address lookup mirrors RequestTask.upstream_tasks_complete() — missing upstream records are treated as incomplete (same safe default). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
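That safe default falls out of `dict.get`: a missing upstream record yields `None`, which is never in the completed-status set, so it reads as incomplete. A minimal sketch (names and status values are illustrative, not the actual fides implementation):

```python
COMPLETED_EXECUTION_LOG_STATUSES = {"complete", "skipped"}

# Hypothetical lookup built from a single column-projection query.
status_by_address = {("dataset:users", "access"): "complete"}

def upstream_incomplete(address, action_type):
    # dict.get returns None for a missing record; None is not a completed
    # status, so missing upstream tasks are treated as incomplete.
    return (
        status_by_address.get((address, action_type))
        not in COMPLETED_EXECUTION_LOG_STATUSES
    )
```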
@greptile please review
Additional Comments (2)
The loop variable `t` in the `status_by_address` dict comprehension is a single character; a descriptive name like `task` would be clearer. Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Both new tests manually delete database records in try/finally blocks. The standard approach in this test file is to create records in pytest fixtures, as the existing tests do, so that cleanup is handled automatically.
- Rename single-char `t` to `task` in status_by_address dict comprehension
- Remove manual try/finally cleanup in tests (db is cleared between runs)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
erosselli
left a comment
approved with one more suggestion
    awaiting_upstream = not all(
        status_by_address.get((addr, task.action_type))
        in COMPLETED_EXECUTION_LOG_STATUSES
        for addr in upstream_addrs
    )
can't we use `any` so we short-circuit when we find one, rather than iterating over all and then negating with `not`?
Suggested change:

    -awaiting_upstream = not all(
    -    status_by_address.get((addr, task.action_type))
    -    in COMPLETED_EXECUTION_LOG_STATUSES
    -    for addr in upstream_addrs
    -)
    +awaiting_upstream = any(
    +    status_by_address.get((addr, task.action_type))
    +    not in COMPLETED_EXECUTION_LOG_STATUSES
    +    for addr in upstream_addrs
    +)
Clearer intent: "any upstream is incomplete" reads more directly than "not all upstreams are complete". Same short-circuit behavior. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
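The equivalence is De Morgan's law: `not all(x is complete)` equals `any(x is not complete)`. Both forms short-circuit on the first incomplete upstream; only the reading order changes. A quick illustration with made-up statuses (not the fides code):

```python
COMPLETED = {"complete", "skipped"}
statuses = {"a": "complete", "b": "pending"}

def via_not_all(upstream_addrs):
    # Original form: "not all upstreams are complete".
    return not all(statuses.get(addr) in COMPLETED for addr in upstream_addrs)

def via_any(upstream_addrs):
    # Suggested form: "any upstream is incomplete".
    return any(statuses.get(addr) not in COMPLETED for addr in upstream_addrs)
```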
Ticket ENG-2756
Description Of Changes
Fixes a false-positive in the `requeue_interrupted_tasks` watchdog that was incorrectly canceling privacy requests during normal DSR execution.

Root cause: The watchdog queries for request tasks in `pending` or `in_processing` status and checks for a cached Celery task ID in Redis. Tasks only get a cache key when they're dispatched via `queue_request_task()`, which only happens once all upstream tasks are complete. Tasks that are legitimately `pending` and waiting for upstream dependencies have never been dispatched and therefore have no cache key — the watchdog was treating these as "stuck" and canceling the entire privacy request.

This was most visible on the erasure phase of multi-connector DSRs: the access terminator completes and re-queues the privacy request for erasure, but the new erasure-phase tasks start as `pending` with no cache keys, and the watchdog fires during that transition window.

Fix: Before canceling, check if the task is `pending` with incomplete upstream dependencies. If so, skip it — it's legitimately waiting. Only cancel if upstream tasks are complete but the task was never dispatched (truly stuck).

Code Changes
- `src/fides/api/service/privacy_request/request_service.py` - Added upstream dependency check in `requeue_interrupted_tasks` before the cancel path for tasks with no cached subtask ID
- `tests/task/test_requeue_interrupted_tasks.py` - Added two new tests covering the pending-awaiting-upstream (skip) and pending-upstream-complete-no-cache-key (cancel) cases

Steps to Confirm
Run the existing watchdog test suite — no regressions in `tests/task/test_requeue_interrupted_tasks.py` is the primary verification for this fix.

The two new tests added in this PR directly cover the fixed behavior:
- `test_pending_task_awaiting_upstream_is_not_canceled` — verifies the watchdog skips legitimately waiting tasks
- `test_pending_task_with_complete_upstream_and_no_cache_key_is_canceled` — verifies truly stuck tasks are still canceled

Pre-Merge Checklist
- `CHANGELOG.md` updated

Made with Cursor