feat: option to disable record reordering #661
d38fe8e to f22a7e4
Added a warning when events are received out of order.
@kinolaev thank you for looking into this. This is a very important feature to improve.
We can ask the dev team about this on the Debezium Zulip; AFAIK the events are ordered. In terms of a fix: could we use other metadata fields to guarantee event ordering? For example, an event carries additional information that lets us sort events by their binlog position, even when they happen in the same nanosecond. The issue is that these additional fields differ from database to database.
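A tie-breaking sort of the kind asked about here could look like the sketch below. It assumes a MySQL source, where the event's source metadata includes a binlog file name and position. The `SourceInfo` class, its field names, and the comparator name are hypothetical and not part of the sink. Other connectors would need different fields (an LSN or SCN, for example), which is the portability problem raised in the comment.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class BinlogOrdering {
    /** Hypothetical flattened view of the ordering-related source metadata of one event. */
    static final class SourceInfo {
        final long tsNanos;  // event timestamp; can collide for back-to-back changes
        final String file;   // binlog file name
        final long pos;      // byte offset within that file

        SourceInfo(long tsNanos, String file, long pos) {
            this.tsNanos = tsNanos;
            this.file = file;
            this.pos = pos;
        }
    }

    /** Orders by timestamp first, then breaks ties with the binlog coordinates. */
    static final Comparator<SourceInfo> BY_TS_THEN_BINLOG =
        Comparator.comparingLong((SourceInfo s) -> s.tsNanos)
            // Assumption: file names share a prefix and a fixed-width numeric suffix,
            // so plain string order matches binlog file order.
            .thenComparing(s -> s.file)
            .thenComparingLong(s -> s.pos);

    /** Returns a sorted copy and leaves the input list untouched. */
    static List<SourceInfo> sorted(List<SourceInfo> events) {
        List<SourceInfo> copy = new ArrayList<>(events);
        copy.sort(BY_TS_THEN_BINLOG);
        return copy;
    }

    public static void main(String[] args) {
        List<SourceInfo> out = sorted(List.of(
            new SourceInfo(100L, "mysql-bin.000002", 40L),
            new SourceInfo(100L, "mysql-bin.000001", 900L),
            new SourceInfo(100L, "mysql-bin.000002", 10L)));
        for (SourceInfo s : out) {
            System.out.println(s.file + ":" + s.pos);
        }
    }
}
```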
...ceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
I think two things could be done to improve it.
Edit: we don't need option 2, since Debezium sends events in order, as confirmed by the Debezium dev team.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time. Thank you for your contributions.
Confirmed that Debezium delivers the events in order (see the Debezium Zulip thread "#dev > Consumer Record deduplication (Iceberg Consumer)"). Simply removing this logic and keeping the last row should do it.
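A minimal sketch of "keeping the last row", assuming events arrive in order: for each key, a later event simply overwrites an earlier one, with no sorting step. The `Event` class and the `keepLast` helper are hypothetical illustrations and are not taken from `IcebergTableOperator`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeepLastDedup {
    /** Hypothetical event shape: primary key, Debezium-style op code, payload. */
    static final class Event {
        final int key;
        final String op;
        final String value;

        Event(int key, String op, String value) {
            this.key = key;
            this.op = op;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + ":" + op + ":" + value;
        }
    }

    /**
     * Returns one event per key: the last one seen in the batch.
     * Output order follows the first appearance of each key.
     */
    static List<Event> keepLast(List<Event> batch) {
        Map<Integer, Event> lastByKey = new LinkedHashMap<>();
        for (Event e : batch) {
            // put() replaces the earlier event for the same key; arrival order is trusted.
            lastByKey.put(e.key, e);
        }
        return new ArrayList<>(lastByKey.values());
    }

    public static void main(String[] args) {
        List<Event> batch = List.of(
            new Event(1, "c", "v1"),
            new Event(2, "c", "x"),
            new Event(1, "u", "v2"));
        System.out.println(keepLast(batch));
    }
}
```

The sketch is only correct if the source really delivers events in order; an out-of-order batch would silently keep the wrong row.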
f22a7e4 to 029f680
Thank you @ismailsimsek for the info! Finally I've got some time to update the PR. I've removed reordering during deduplication and flagged rows that were inserted within the batch, so we won't add their primary keys to the equality delete files.
Fixed delta writer behavior: we have to delete a record when upsert-keep-deletes is true, even if the first event for the record in a batch is an insert, because a deleted version of the record could exist in the table.
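One way to read the rule from the two comments above is as a single boolean decision. The sketch below is an interpretation of the comments, not the actual `BaseDeltaTaskWriter` code; the class, method, and parameter names are hypothetical.

```java
final class DeleteDecision {
    /**
     * Decides whether a key must be written to the equality delete file.
     *
     * @param insertedInBatch true if the first event for the key in this batch is an insert
     * @param keepDeletes     value of the upsert-keep-deletes setting
     */
    static boolean needsEqualityDelete(boolean insertedInBatch, boolean keepDeletes) {
        // With keepDeletes on, a deleted version of the row may still be in the table,
        // so the delete cannot be skipped even for a key that starts with an insert.
        return keepDeletes || !insertedInBatch;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean inserted : flags) {
            for (boolean keep : flags) {
                System.out.println("insertedInBatch=" + inserted
                    + " keepDeletes=" + keep
                    + " -> delete=" + needsEqualityDelete(inserted, keep));
            }
        }
    }
}
```

Under this reading, the equality delete is skipped in exactly one case: the key was inserted within the batch and upsert-keep-deletes is false.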
...ceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
37ad251 to 6470820
ismailsimsek
left a comment
@kinolaev thank you for the changes, everything looks good. I commented on a few small points.
...iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java
...iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java
...iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java
...ceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
@@ -96,19 +96,6 @@ public void testSimpleUpsert() throws Exception {
    Assertions.assertEquals(ds.where("id = 5 AND __op= 'r' ").count(), 1);
    Assertions.assertEquals(ds.where("id = 6 AND __op= 'u' AND first_name= 'Updatedname-6-V1'").count(), 1);
Are there any additional or new edge cases to add?
The only thing I've noticed is that the cases with nonempty dedup-column are not covered now. Is that important?
Should be OK. Once the code is formatted and CI passes, we can merge.
spotless:apply should do the formatting.
Thanks for the hint, done in c1ee98a.
Update: it seems spotless:apply is not enough. I'll figure it out a bit later.
Update: fixed. The cause was <ratchetFrom>refs/remotes/origin/master</ratchetFrom>: I had merged several commits into my origin/master branch earlier, and with ratchetFrom Spotless only formats files that have changed relative to that ref.
c1ee98a to 541fd08
Thank you @kinolaev, merged.
I'd like to have an option to prioritize the latest event during deduplication, because it seems that Debezium events are already ordered. For example, for a sequence of operations
I'd like to get a row 1,'hallo' in an Iceberg table. But because deletes have higher priority than inserts, I get 1,'hello'. I'm not sure that events are always ordered, which is why I propose adding an option instead of changing the default behavior.