Skip to content

Conversation

@pan-x-c
Copy link
Collaborator

@pan-x-c pan-x-c commented Jul 22, 2025

Description

As the title says

Checklist

Please check the following items before code is ready to be reviewed.

  • Code has passed all tests
  • Docstrings have been added/updated in Google Style
  • Documentation has been updated
  • Code is ready for review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @pan-x-c, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request overhauls the internal queue management system, particularly focusing on the priority queue implementation and its integration with Ray actors. The changes introduce a more modular and robust queue architecture, improve how queue capacity is handled for priority queues, and refine the experience data model's serialization. These foundational improvements are supported by enhanced testing to ensure correctness and reliability.

Highlights

  • Queue Architecture Refactor: The core queue implementation has been significantly refactored. An abstract QueueBuffer interface was introduced, with concrete implementations AsyncQueue (for standard FIFO queues) and AsyncPriorityQueue (for priority-based queues). The AsyncPriorityQueue class was moved from its dedicated file (trinity/buffer/priority_queue.py, which is now removed) into trinity/buffer/queue.py for better consolidation.
  • Ray Actor Wrapper Redesign: The QueueActor class, previously responsible for wrapping queue logic as a Ray actor, has been replaced by a new QueueWrapper class in trinity/buffer/ray_wrapper.py. This new wrapper now manages the underlying QueueBuffer implementations, handles reference counting, integrates with experience writers (SQL/JSON), and provides robust batch retrieval with an internal experience pool (exp_pool) and a stopped() mechanism for graceful queue exhaustion.
  • Priority Queue Capacity Logic: The AsyncPriorityQueue's effective capacity is now dynamically determined as the minimum of the configured capacity and 2 * read_batch_size. This ensures the priority queue's size is appropriately scaled relative to the consumer's read rate, as reflected in the updated test cases.
  • Improved Queue Exhaustion Handling: The get_batch method in the new QueueWrapper now uses a stopped() method on the underlying queue and an internal exp_pool to manage experience retrieval. This changes the behavior for an empty queue from raising a TimeoutError to raising StopAsyncIteration when the queue is explicitly closed and empty, providing a more Pythonic signal for exhaustion.
  • Experience Data Model Update: Minor adjustments were made to the Experience data model's serialization. Specifically, chosen and rejected fields are now serialized as chosen_text and rejected_text respectively, removing the .tolist() calls, which suggests a change in their underlying data type or representation (likely from tensors to direct string/list of string values).
  • Enhanced Priority Queue Testing: The test_priority_queue_capacity in tests/buffer/queue_test.py has been significantly expanded to cover the new capacity logic and exhaustion behavior, including writing more experiences and asserting the queue length and retrieved experience details more thoroughly.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a refactoring of the queue implementation, replacing the QueueActor with a QueueWrapper and QueueBuffer abstraction. The introduction of AsyncPriorityQueue and AsyncQueue inheriting from a common base class improves modularity, and tests have been enhanced. However, there are critical issues related to potential deadlocks and data loss, as well as minor correctness issues.

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Jul 22, 2025

/unittest-diff

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Jul 22, 2025

/unittest-all

@github-actions
Copy link

Summary

Tests 📝 Passed ✅ Failed ❌ Skipped ⏭️ Other ❓ Flaky 🍂 Duration ⏱️
64 64 0 0 0 0 1.4s

Tests

Test Name Status Flaky Duration
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_dpo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_mix_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_opmd_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_ppo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_sft_policy_loss 1ms
tests/buffer/file_test.py::TestFileBuffer::test_file_buffer 3ms
tests/buffer/file_test.py::TestFileBuffer::test_file_reader 1ms
tests/buffer/file_test.py::TestFileBuffer::test_file_writer 2ms
tests/buffer/queue_test.py::TestQueueBuffer::test_priority_queue_buffer_reuse 7ms
tests/buffer/queue_test.py::TestQueueBuffer::test_priority_queue_capacity 2ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_0_queue 1ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_1_priority_queue 1ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_capacity 4ms
tests/buffer/sql_test.py::TestSQLBuffer::test_create_sql_buffer 5ms
tests/common/config_test.py::TestConfig::test_all_examples_are_valid 1ms
tests/common/config_test.py::TestConfig::test_continue_from_checkpoint_is_valid 1ms
tests/common/config_test.py::TestConfig::test_load_default_config 4ms
tests/common/experience_test.py::TestEID::test_eid_properties 1ms
tests/common/experience_test.py::TestExperience::test_action_mask_and_logprobs_type 1ms
tests/common/experience_test.py::TestExperience::test_assertions 1ms
tests/common/experience_test.py::TestExperience::test_dpo_experience 1ms
tests/common/experience_test.py::TestExperience::test_gather 1ms
tests/common/experience_test.py::TestExperience::test_multi_turn_experience 1ms
tests/common/experience_test.py::TestExperience::test_serialize_deserialize 1ms
tests/common/experience_test.py::TestExperience::test_single_turn_experience 1ms
tests/common/experience_test.py::TestExperience::test_to_dict 1ms
tests/common/experience_test.py::TestExperienceConversion::test_batch_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_dpo_experience_batch_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_experience_model_experience_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_multiturn_experience_batch_converstion 1ms
tests/common/vllm_test.py::ModelWrapperTest_0::test_generate 44ms
tests/common/vllm_test.py::ModelWrapperTest_1::test_generate 54ms
tests/common/vllm_test.py::ModelWrapperTest_2::test_generate 56ms
tests/common/vllm_test.py::ModelWrapperTest_3::test_generate 42ms
tests/common/vllm_test.py::ModelWrapperTest_4::test_generate 54ms
tests/common/vllm_test.py::TestAPIServer::test_api 32ms
tests/common/vllm_test.py::TestTokenizer::test_assistant_token_mask 1ms
tests/explorer/explorer_test.py::BaseExplorerCase::test_explorer 1ms
tests/explorer/explorer_test.py::TestExplorerCountdownEval::test_explorer 99ms
tests/explorer/explorer_test.py::TestExplorerCountdownNoEval::test_explorer 95ms
tests/explorer/explorer_test.py::TestExplorerWithAddStrategy::test_explorer 57ms
tests/explorer/scheduler_test.py::SchedulerTest::test_concurrent_operations 5ms
tests/explorer/scheduler_test.py::SchedulerTest::test_get_results 20ms
tests/explorer/scheduler_test.py::SchedulerTest::test_scheduler_all_methods 15ms
tests/explorer/scheduler_test.py::SchedulerTest::test_scheduler_restart_after_stop 9ms
tests/explorer/scheduler_test.py::SchedulerTest::test_split_tasks 8ms
tests/explorer/scheduler_test.py::SchedulerTest::test_wait_all 8ms
tests/explorer/scheduler_test.py::SchedulerTest::test_wait_all_timeout_with_multi_batch 13ms
tests/explorer/workflow_test.py::WorkflowTest::test_gsm8k_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_boxed_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_complex_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_fraction_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_rm_gallery_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_workflow_resettable 1ms
tests/trainer/trainer_test.py::BaseTrainerCase::test_trainer 1ms
tests/trainer/trainer_test.py::TestTrainerCountdown::test_trainer 252ms
tests/trainer/trainer_test.py::TestStepAheadAsyncRL::test_trainer 96ms
tests/trainer/trainer_test.py::TestTrainerGSM8K::test_trainer 72ms
tests/trainer/trainer_test.py::TestTrainerSFTWarmupGSM8K::test_trainer 90ms
tests/trainer/trainer_test.py::TestTrainerDPO::test_trainer 44ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_0_queue 101ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_1_priority_queue 98ms
tests/utils/plugin_test.py::TestPluginLoader::test_load_plugins 5ms

Github Test Reporter by CTRF 💚

@hiyuchang hiyuchang merged commit cfa7f85 into modelscope:main Jul 24, 2025
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants