-
Notifications
You must be signed in to change notification settings - Fork 14
feat: Reset Consumer upon out-of-band seek #172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java
Show resolved
Hide resolved
src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java
Show resolved
Hide resolved
src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java
Outdated
Show resolved
Hide resolved
src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java
Show resolved
Hide resolved
src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java
Outdated
Show resolved
Hide resolved
I think the majority of your code is correct for the PR. The main thing I'm getting stuck on is what the seek reset behavior should be in the presence of client-initiated seeks. I think its reasonable to say that this is unsupported with a comment in the section here https://github.com/googleapis/java-pubsublite-kafka#about-pubsub-lite-kafka-shim and wherever the equivalent section is in the docs. |
I'll note in the Devsite docs for admin seek that mixing use of client-initiated and admin seeks is not recommended, as they would clobber each other. Also, I amended this PR to disable handling admin seeks when the Consumer auto-commit setting is false, since this could also lead to races:
|
Resets the Kafka Consumer state to handle admin seeks pushed from the server.
SubscriberState
was refactored out intoSinglePartitionSubscriber
. The main motivation was to give each subscriber state its own mutex and update its internal state (for auto-commits) atomically when pulling messages.SinglePartitionSubscriber::onSubscriberReset()
to reset subscriber state and wait for commits when theRESET
signal is received from the server.