[DSIP-95][API] Complete the functionality of using dependencies in the complement data#18003
det101 wants to merge 5 commits into apache:dev
Conversation
Pull request overview
Implements downstream workflow triggering for complement/backfill runs in the API layer, adding support for “trigger dependent workflows” behavior and accompanying unit tests.
Changes:
- Implemented `doBackfillDependentWorkflow` to fetch downstream workflow definitions and trigger backfill runs for them.
- Added visited-code tracking intended to prevent self/cyclic triggering and duplicate downstream triggers.
- Added `BackfillWorkflowExecutorDelegateTest` with basic scenarios for downstream triggering and filtering.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java | Adds dependent workflow backfill triggering logic and wiring for lineage + workflow definition lookup. |
| dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegateTest.java | Adds unit tests for the new dependent backfill triggering logic (single-hop scenarios). |
SbloodyS left a comment:
You didn't fill in the content according to the PR template, please fix it.
```java
BackfillWorkflowDTO.BackfillParamsDTO params = BackfillWorkflowDTO.BackfillParamsDTO.builder()
        .runMode(RunMode.RUN_MODE_SERIAL)
        .backfillDateList(Collections.<ZonedDateTime>emptyList())
        .backfillDependentMode(ComplementDependentMode.ALL_DEPENDENT)
        .allLevelDependent(true)
        .executionOrder(ExecutionOrder.ASC_ORDER)
        .build();
```
The new dependent-triggering behavior isn’t covered for RunMode.RUN_MODE_PARALLEL. All tests build params with RUN_MODE_SERIAL, so regressions around splitting dates / triggering dependencies per chunk (and interaction with visitedCodes) won’t be caught. Adding a test that uses RUN_MODE_PARALLEL with multiple backfill dates and asserts downstream workflows are triggered for each split would improve coverage.
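To make the missing coverage concrete, here is a minimal, self-contained sketch of the chunking a `RUN_MODE_PARALLEL` test would need to exercise. The real splitting lives in the delegate's `splitDateTime`; the `split` method below is a hypothetical stand-in that only illustrates the shape of the behavior, not the project's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelSplitSketch {

    // Contiguous split of the backfill date list into at most `parallelism`
    // chunks, standing in for the delegate's splitDateTime (hypothetical).
    static <T> List<List<T>> split(List<T> dates, int parallelism) {
        List<List<T>> chunks = new ArrayList<>();
        int n = dates.size();
        int size = (int) Math.ceil((double) n / parallelism);
        for (int i = 0; i < n; i += size) {
            chunks.add(new ArrayList<>(dates.subList(i, Math.min(n, i + size))));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> dates = List.of("2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04");
        // A RUN_MODE_PARALLEL test should assert that downstream workflows are
        // triggered once per chunk, not only for the first one.
        System.out.println(split(dates, 2));
    }
}
```

A parallel-mode test would build params with `RunMode.RUN_MODE_PARALLEL`, multiple backfill dates, and verify the downstream trigger once per resulting chunk.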
```diff
 for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
     final Integer workflowInstanceId = doBackfillWorkflow(
             backfillWorkflowDTO,
-            stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+            stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()),
+            visitedCodes);
     workflowInstanceIdList.add(workflowInstanceId);
```
In parallel backfill mode, the same mutable `visitedCodes` instance is passed into every `doBackfillWorkflow(...)` call. Since `visitedCodes` is also used to prevent duplicate/cyclic dependent triggers, the first chunk will add downstream codes and later chunks will skip triggering downstream workflows, causing dependent backfill to only run for the first subset of dates. Consider scoping `visitedCodes` to a single dependency traversal per backfill chunk (e.g., pass `null` or a fresh copy like `new HashSet<>(visitedCodes)` for each split) so downstream workflows are triggered for each chunk's `backfillTimeList`.
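The failure mode described above can be reproduced with a small, self-contained sketch. `triggerChunk` is a hypothetical stand-in for one chunk's dependency traversal; DolphinScheduler types are replaced with plain collections.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal model of the chunked backfill loop: each chunk should trigger the
// same downstream workflow codes, but a shared mutable visited set makes
// later chunks skip them.
public class VisitedScopeDemo {

    // Returns the downstream codes actually triggered for one chunk.
    static List<Long> triggerChunk(List<Long> downstreamCodes, Set<Long> visited) {
        List<Long> triggered = new ArrayList<>();
        for (long code : downstreamCodes) {
            if (!visited.add(code)) { // already visited -> skipped as "cyclic"
                continue;
            }
            triggered.add(code);
        }
        return triggered;
    }

    public static void main(String[] args) {
        List<Long> downstream = List.of(101L, 102L);

        // Buggy: one shared set across all chunks -> second chunk triggers nothing.
        Set<Long> shared = new HashSet<>();
        System.out.println(triggerChunk(downstream, shared));
        System.out.println(triggerChunk(downstream, shared));

        // Suggested fix: a fresh copy of the caller's set per chunk,
        // so every chunk triggers its downstream workflows.
        Set<Long> base = new HashSet<>();
        System.out.println(triggerChunk(downstream, new HashSet<>(base)));
        System.out.println(triggerChunk(downstream, new HashSet<>(base)));
    }
}
```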
```java
List<DependentWorkflowDefinition> downstreamDefinitions =
        workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);

if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
    log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
    return;
}

// 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream
// backfill
final List<ZonedDateTime> upstreamBackfillDates = backfillTimeList.stream()
        .map(DateUtils::stringToZoneDateTime)
        .collect(Collectors.toList());

// 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
    long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();

    // Prevent self-dependency and circular dependency chains
    if (visitedCodes.contains(downstreamCode)) {
        log.warn("Skip circular dependent workflow {}", downstreamCode);
        continue;
    }

    WorkflowDefinition downstreamWorkflow =
            workflowDefinitionDao.queryByCode(downstreamCode).orElse(null);
    if (downstreamWorkflow == null) {
        log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
        continue;
    }
```
`doBackfillDependentWorkflow` queries each downstream workflow definition one-by-one via `workflowDefinitionDao.queryByCode(downstreamCode)`, which results in an N+1 query pattern for workflows with many downstream dependencies. Since `WorkflowDefinitionDao#queryByCodes(Collection<Long>)` exists, consider collecting downstream codes first and fetching definitions in a single batch, then iterating in-memory (still applying the ONLINE filter and visited/cycle checks).
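The suggested batch lookup can be sketched as follows. `queryByCodes` is stated by the review comment to exist on `WorkflowDefinitionDao`, but everything else here is a hypothetical stand-in: a plain `Map` plays the role of the DAO, and a small record stands in for the workflow definition entity.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of replacing per-code lookups with one batch fetch: collect the
// downstream codes, fetch all definitions at once, then iterate in memory
// applying the ONLINE / not-found filters.
public class BatchLookupSketch {

    // Hypothetical stand-in for the WorkflowDefinition entity.
    record WorkflowDef(long code, String state) {}

    // Stand-in for workflowDefinitionDao.queryByCodes(Collection<Long>).
    static Map<Long, WorkflowDef> queryByCodes(Map<Long, WorkflowDef> dao, List<Long> codes) {
        return codes.stream()
                .filter(dao::containsKey)
                .collect(Collectors.toMap(c -> c, dao::get));
    }

    static List<Long> onlineDownstream(Map<Long, WorkflowDef> dao, List<Long> downstreamCodes) {
        Map<Long, WorkflowDef> defs = queryByCodes(dao, downstreamCodes); // single batch query
        return downstreamCodes.stream()
                .map(defs::get)
                .filter(d -> d != null && "ONLINE".equals(d.state())) // skip not-found / offline
                .map(WorkflowDef::code)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Long, WorkflowDef> dao = new HashMap<>();
        dao.put(1L, new WorkflowDef(1L, "ONLINE"));
        dao.put(2L, new WorkflowDef(2L, "OFFLINE"));
        System.out.println(onlineDownstream(dao, List.of(1L, 2L, 3L)));
    }
}
```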
…endent workflow
- Problem 1: Replace single-level self-dep check with ThreadLocal visited set to detect and skip indirect circular dependencies (A→B→A), preventing StackOverflowError when allLevelDependent=true
- Problem 2: Set startNodes=null for downstream workflows; upstream task node codes are not valid in a different workflow definition
- Add tests for OFFLINE skip, not-found skip, and startNodes null assertion

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…tion
ThreadLocal is unnecessary here since the call chain is synchronous and private. Passing visitedCodes as a parameter is simpler, clearer, and avoids ThreadLocal lifecycle management.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Align Java source and test formatting with project spotless rules to keep style checks consistent. Made-with: Cursor
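The visited-set trade-off discussed in the commits above can be illustrated with a small self-contained sketch: passing the set as a parameter ties its lifetime to one traversal, whereas a ThreadLocal would need explicit cleanup. The `visit` method is a hypothetical stand-in for the dependency traversal, not the PR's actual code.

```java
import java.util.HashSet;
import java.util.Set;

// Parameter-passed visited set: its lifetime is exactly one traversal,
// so no ThreadLocal remove() bookkeeping is needed.
public class VisitedParamSketch {

    // Returns true if `code` was newly visited on this traversal (not a cycle).
    static boolean visit(long code, Set<Long> visited, int depth) {
        if (!visited.add(code)) {
            return false; // A -> B -> A style cycle: code already on this path
        }
        if (depth > 0) {
            // Recurse into a hypothetical downstream workflow.
            visit(code + 1, visited, depth - 1);
        }
        return true;
    }

    public static void main(String[] args) {
        Set<Long> visited = new HashSet<>();
        System.out.println(visit(1L, visited, 2));           // fresh traversal
        System.out.println(visit(1L, visited, 0));           // cycle detected
        System.out.println(visit(1L, new HashSet<>(), 0));   // new traversal, new set
    }
}
```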


Was this PR generated or assisted by AI?
close #17748
Purpose of the pull request
Brief change log
Verify this pull request
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(or)
Pull Request Notice
If your pull request contains an incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md