PubSub: Fix streaming pull incorrectly handling FlowControl max_messages setting #7948
Merged
plamut merged 5 commits into googleapis:master on May 24, 2019
Conversation
Contributor (Author) commented:
Rebased because of a merge conflict when #7954 was merged.
Contributor (Author) commented:
Putting this on hold because the change is non-trivial and we want to make a PubSub release first. We will unblock the PR once that is done (reviews are still welcome, though). cc: @sduskis
sduskis
suggested changes
May 16, 2019
sduskis (Contributor) left a comment:
Overall looks good, but I want to look over this more. I have a few documentation comments.
pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
Force-pushed from 1b8a212 to 7e5b9d7
Contributor (Author) commented:
@sduskis Made the suggested changes; the diff can be seen in the first force push. (The second force push was just rebasing on top of the latest master.)
sduskis
approved these changes
May 21, 2019
tseaver
reviewed
May 23, 2019
pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py (review comments resolved)
In certain cases automatically leasing Message instances upon creation might not be desired, thus an optional parameter is added to the Message initializer that allows skipping that. The default behavior is not changed: new Message instances *are* automatically leased upon creation.
Leasing messages through a request queue in the dispatcher causes a race condition with the ConsumeBidirectionalStream thread. A request to pause the background consumer can arrive when the Bidi consumer is just about to fetch the next batch of messages, and thus the latter gets paused only *after* fetching those messages. This commit synchronously leases received messages in the streaming pull manager callback. If that hits the lease management load limit, the background consumer is paused synchronously, and will correctly pause *before* pulling another batch of messages.
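A simplified sketch of that idea, assuming hypothetical `leaser` and `consumer` collaborators (this is an illustration of the synchronous-leasing approach, not the library's actual `StreamingPullManager` code):

```python
class SimplifiedManager:
    """Illustration only: lease messages synchronously in the response callback."""

    def __init__(self, leaser, consumer, max_load=1.0):
        self._leaser = leaser      # tracks received-but-unacked messages
        self._consumer = consumer  # background bidi stream consumer
        self._max_load = max_load

    def _on_response(self, received_messages):
        # Runs in the consumer thread itself, so the consumer cannot fetch
        # another batch until this callback returns. Leasing here (instead of
        # via the dispatcher's request queue) closes the window in which extra
        # messages could be pulled before the pause takes effect.
        for message in received_messages:
            self._leaser.add(message)
        if self._leaser.load >= self._max_load:
            self._consumer.pause()  # pause *before* the next pull
```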
If the PubSub backend sends so many messages in a single response that adding all of them to the leaser would overload it, the StreamingPullManager now puts the excess messages into an internal holding buffer. The messages are released from the buffer when the leaser again has enough capacity (as defined by the FlowControl settings), and the message received callback is invoked for them at that point.
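The holding-buffer behavior can be pictured roughly as follows (names such as `HoldingBufferSketch` and `dispatch_callback` are illustrative stand-ins, not the actual implementation):

```python
import collections


class HoldingBufferSketch:
    """Illustration only: hold excess messages until capacity frees up."""

    def __init__(self, max_messages, dispatch_callback):
        self._max_messages = max_messages
        self._dispatch = dispatch_callback
        self._held_messages = collections.deque()
        self._in_flight = 0  # delivered, but not yet acknowledged

    def on_received(self, messages):
        for message in messages:
            if self._in_flight < self._max_messages:
                self._in_flight += 1
                self._dispatch(message)
            else:
                # Over the flow control limit: park the message for later.
                self._held_messages.append(message)

    def on_ack(self):
        # Capacity freed up: release held messages, oldest first.
        self._in_flight -= 1
        while self._held_messages and self._in_flight < self._max_messages:
            self._in_flight += 1
            self._dispatch(self._held_messages.popleft())
```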
With the StreamingPullManager._on_response() callback adding received messages to the leaser synchronously (in the background consumer thread), a race condition can occur with the dispatcher thread, which can asynchronously add (remove) messages to (from) lease management, e.g. on ack() and nack() requests. The same applies to the related operations of maybe pausing/resuming the background consumer. This commit thus adds locks in key places, assuring that these operations are atomic and not subject to race conditions.
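The locking can be sketched like this, again under the assumption of simplified `consumer`/`leaser` objects rather than the actual code:

```python
import threading


class PauseResumeSketch:
    """Illustration only: make pause/resume decisions atomic across threads."""

    def __init__(self, consumer, leaser, max_load=1.0):
        self._consumer = consumer
        self._leaser = leaser
        self._max_load = max_load
        self._pause_resume_lock = threading.Lock()

    def maybe_pause_consumer(self):
        # Called from the consumer thread when received messages are leased.
        with self._pause_resume_lock:
            if self._leaser.load >= self._max_load and not self._consumer.is_paused:
                self._consumer.pause()

    def maybe_resume_consumer(self):
        # Called from the dispatcher thread on ack()/nack() requests.
        with self._pause_resume_lock:
            if self._leaser.load < self._max_load and self._consumer.is_paused:
                self._consumer.resume()
```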
tseaver
approved these changes
May 24, 2019
Closes #7677.
This PR fixes how the subscriber client handles the FlowControl.max_messages setting. It makes sure that the subscriber only delivers (i.e. invokes callbacks for) at most max_messages messages at a time, and resumes invoking callbacks only when the user code acknowledges some of the previously delivered messages.
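For reference, a minimal example of exercising the setting with the pubsub_v1 subscriber client (the project and subscription names are placeholders):

```python
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    print("Received: {}".format(message.data))
    message.ack()

# With this fix, at most 5 messages should be outstanding (delivered to the
# callback but not yet acknowledged) at any given time.
flow_control = pubsub_v1.types.FlowControl(max_messages=5)
future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)
future.result()  # block the main thread while messages are streamed
```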
How to test

Steps to reproduce:

Actual result (before the fix):
Messages are delivered to the callback without respecting FlowControl.max_messages.

Expected result (after the fix):
At any time there are at most max_messages pending messages (received, but not yet acknowledged) - the "pending ACK" figure in the logs. The streaming pull process waits until some of the messages are ACK-ed before delivering new messages.

Footnotes
For max_messages settings greater than 10, one might observe that a maximum of 10 (or fewer) messages are delivered to the client code at once, especially if message callbacks are slow, e.g. because they artificially sleep for prolonged periods of time. This is caused by the default callback thread pool size, which has max_workers set to 10.
This can be mitigated by creating the subscriber client with a custom scheduler that uses a thread pool with a higher cap on the worker thread count, or by using batch callbacks for received messages. The latter feature is almost done, and a PR for it will be submitted soon.
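As a sketch of the custom-scheduler mitigation mentioned above, assuming the `ThreadScheduler` class in `google.cloud.pubsub_v1.subscriber.scheduler` accepts a custom executor (treat the exact constructor parameter name as an assumption):

```python
import concurrent.futures

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import scheduler

# 50 workers is an arbitrary example value; pick something >= max_messages.
# Assumed constructor parameter name: executor.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=50)
custom_scheduler = scheduler.ThreadScheduler(executor=executor)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

flow_control = pubsub_v1.types.FlowControl(max_messages=50)
future = subscriber.subscribe(
    subscription_path,
    callback=lambda message: message.ack(),
    flow_control=flow_control,
    scheduler=custom_scheduler,
)
```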