-
Notifications
You must be signed in to change notification settings - Fork 56
consumer: flexible transaction boundaries #303
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@psFried FYI Some more unit testing and a lot of integration testing I want to do, plus reworking commit structure, but I believe the implementation is all here. |
psFried
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving some thoughts, but overall I feel like this all makes sense and seems right. This is serving as a marker of where I left off in review, since I think you're still adding some commits.
Before, Sequencer expected that runs of acknowledged messages would be processed as a single atomic unit, and only concerned itself with tracking state changes from *uncommitted* messages. This is now inverted: when a run of acknowledged messages is ready for processing, Sequencer is Step()'d through each message in turn, and may further be asked to produce a Checkpoint() at any time. Checkpoints within acknowledged runs are accomplished by iteratively tightening the ProducerState bound to reflect the Clock of the *last* stepped-past message and the offset of the stepped-to *next* one. QueueUncommitted provides a finer-grain QueueOutcome result type which gives detail into the exact queue decision made, and the QueueOutcome is now the mechanism by which the caller discovers the need to start a replay read (rather than receiving a ErrMustStartReplay, which is removed). Finally, Sequencer now takes responsiblity for maintaining read-through offsets of journals. This is a more natural fit because read-through offsets *must not* reflect a message ACK until its acknoweldged run has been fully processed, and Sequencer best understands this detail. Issue gazette#300
There's no valid interpretation where nil makes sense for these, so remove the potential for nil-pointer mistakes.
Formerly, processing acknowledged messages was done in a tight loop within txnConsume. This was pretty problematic for large transactions, because the application was required to handle with potentially far more messages in a single transaction that it might prefer (or even have resources to handle). It can also cause tranaction timers to be wildly ignored, depending on how much work the application does per-message. Now, processing of a run of acknowledged messages is hoisted to txnStep, and includes timer checks. An unfinshed run of messages is able to be deferred to a following transaction, and a new error is introduced to also allow applications to make a determination of when a transaction should close. The updated message.Sequencer is used for tracking and production of fine-grain Checkpoints, including ones which are within an acknowledged message run. Issue gazette#300
SHARD_STOPPED consolidates cancellation vs failure cases into a single code which is returned by Resolve. It applies both while awaiting ready-ness of the shard's store, and also while awaiting read-through offsets of a Stat request. Also update test support functions to use Stat to read-through a set of known, published acknowledgements rather than presuming that a single shard commit means that all published messages have been processed. That's no longer always true with the relaxation of consumer transaction boundaries (try adding a sleep to testApplication.ConsumeMessage). Issue gazette#300 Issue estuary/flow#123
Verify we report readThrough progress on duplicate ACKs.
TestTxnMaxDurWithinDequeueSequence simulates a transaction maximum duration timeout in the middle of processing a transactional sequence. Expectation is that the current transaction closes, and the next transaction picks up where it left off. TestAppDeferWithinDequeueSequence simulates an application which defers processing of a message to a future transaction. Similar expectation: the current transaction closes and the next picks up where it left off. Issue gazette#300
To excercise sequence replay after a shard hand-off.
c5b5c34 to
ae45b8a
Compare
|
I've addressed feedback and re-worked commit messages. I left all but the first two commits (which were merged up WIP) alone. |
Default to 8192 (existing value) if zero, but otherwise take the value from the ShardSpec. Issue gazette#304
psFried
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is looking good. I feel like it would be good to get another set of eyes on this before merging, though, since it's a pretty tricky change, and I'm not 100% clear on to what extent these changes affect portions of the API that people are using today.
saterus
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't say I'm an expert at any of this yet, but this exercise definitely helped connect a lot of dots between abstract concepts and the code that makes it real. I have a few lingering questions that you might be able to help clear up.
| // We could just ask Dequeued for its UUID, but the application | ||
| // may have changed it out from under us. That's especially easy | ||
| // to do if the application published it (which assigns a new UUID). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain this a bit more? I don't think I understand what it means for the "application to have changed out from under us".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Dequeued message is passed to the application's ConsumeMessage callback, and that can do whatever it likes with the message. When it returns, we can't assume that its UUID continues to be the same UUID that we read. And in fact it won't be, if the application immediately published the message to another journal (as that sets a new UUID value on the decoded instance).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we're passing the Envelope to ConsumeMessage by value, rather than by reference, right? The Application shouldn't be able to mutate the message that's sitting in the Sequencer.
Even if we say that it could potentially mutate it and we need to defend against that here, phrasing it as "change the Application" seems a bit misleading to me. If you're republishing the same message to a different journal, it's just a different message now. This seems reflected in the reality of what's happening: the UUID of the message is changing. Is this the right way to think about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Envelope is passed by-value, including its contained Message field which is an interface (always a reference). We're essentially passing a reference by-value, but it's still a reference.
| } | ||
| // We otherwise don't block if we're no longer reading messages. | ||
| if txn.readCh == nil { | ||
| // We don't block if the read channel has drained and no | ||
| // transaction has been started (and now cannot ever start). | ||
| if txn.readCh == nil && txn.consumedCount == 0 { | ||
| return false | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guessing I'm missing something, so I'll ask:
Why doesn't txn.readCh == nil crash when it returns out to txnStep and we immediately try to read from that channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah! This is a neat detail of the Go spec and a fun technique.
A var foo chan Bar is always a reference to an allocated channel structure, or nil. You can swap out the backing channel it points to, or set it to nil to nil out the reference without touching the allocated channel, which may have other references.
Selecting into or out of a nil channel blocks forever. It doesn't crash. That means that nil-ing out foo is a good way to turn it off. So, here we're not messing with the underlying message channel, we're just causing it to not select in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see now. I didn't internalize that channels were always passed by reference. It's inconsistent since the syntax is hiding the pointer type with var foo chan Bar rather than var foo *(chan Bar) (or some syntax like that).
I knew about non-blocking reads from channels with select, but I don't think I put together that inside a select the <-nil would block rather than crash. Outside of select the <-nil crashes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. map is also by-reference FYI.
| for { | ||
| var msgCh = make(chan EnvelopeOrError, messageBufferSize) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a consumer signals they would like to close the transaction via ErrDeferToNextTransaction, we nil out the txn.readCh.
- Do we need to explicitly close this channel?
- Is this the only mechanism needed to resume? Or is there more to the contract around using
ErrDeferToNextTransaction?
On the Flow side, will this cause any Task/Application to restart itself when the channel is closed and we subsequently call StartReadingMessages again? I'm looking at Captures since it's a MessageProducer, but this could matter for non-MessageProducers too.
Specifically, Capture.serverDriverTransactions, which will attempt to send data to this channel that we're setting to nil. I'm not sure how (or how far) this propagates back to the Capture driver, but it seems like we either need to maintain some continuity or deliberately restart things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
No, we don't want to close the channel. Only senders should ever close a channel in Go, and we're a receiver. In fact,
readChisn't a typechan Messagebut rather a type<-chan Message, meaning it can only receive and cannot be closed. This particular channel gets closed by the implementation ofStartReadingMessagesthat pumps into it, when that function decides it's done for whatever reason and isn't going to send anymore. -
The effect of this is to not read any more messages in this transaction. The
readChvariable on the *transaction is a separate reference from thereadChpassed torunTransactions. Eachtransactionstarts with the latter, and sets it to nil when it's not going to read anymore.txnInitinitializes the nexttransactionwith a non-nilreadChand so on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The effect of this is to not read any more messages in this transaction. The readCh variable on the *transaction is a separate reference from the readCh passed to runTransactions. Each transaction starts with the latter, and sets it to nil when it's not going to read anymore. txnInit initializes the next transaction with a non-nil readCh and so on.
Gotcha! This is what I was missing here. The channel continues to fill with new messages, even as the transaction goes through the motions of committing itself. 💯
Only senders should ever close a channel in Go
How does a receiver signal that it's done? Would you open yet another channel to signal that we're hanging up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| How does a receiver signal that it's done?
This is really where Context comes in. You cancel a context that the sender is also selecting over.
Naively, just having the receiver go away is a good way to get leaked goroutnines trying to send into a channel that will never again unblock. So you always want another avenue by which the sender discovers that it's work isn't needed anymore. Check out the StartReadingMessages pump in go/shuffle/reader.go in the Flow codebase; it's send is a select over sending a message OR receiving <-ctx.Done().
Also check out servePrimary in shard.go. When we want to do a clean restart of the message loop, that's done by having the sender close the channel, and when we've read through its contents and the close, then we start up a new message loop.
|
We've been running this in production a few days and it's looking solid. Going ahead and merging. |
Update
message.Sequencerto enable fine-grain tracking within a run of acknowledged messages. The Sequencer is now able to take checkpoints that reflect partial processing of a single run, by incrementally tightening the minimum Clock and offset of the ProducerState as messages are processed.Update
consumertransaction processing to hoist the former tight loop of processing an acknowledged message run, to the top-leveltxnStep. This makes processing of runs incremental, and subject to interruption due to transaction timeouts or a new error return-able by clients, electing to defer processing of a message to a following transaction.Tested using Gazette's soak tests, and also within Flow.
Issue #300
This change is