[SPARK-55848][SQL][4.1] Fix incorrect dedup results with SPJ partial clustering#54751
[SPARK-55848][SQL][4.1] Fix incorrect dedup results with SPJ partial clustering#54751naveenp2708 wants to merge 1 commit intoapache:branch-4.1from
Conversation
|
@peter-toth Fix + tests PR for branch-4.1 as discussed. |
| // Checkpoint the items scan, then join with purchases under partial clustering, | ||
| // and finally dedup. The isPartiallyClustered flag must cause satisfies0()=false | ||
| // for ClusteredDistribution, so the dedup still produces correct distinct ids. | ||
| val itemsDf = spark.read.table(s"testcat.ns.$items").checkpoint() |
There was a problem hiding this comment.
@naveenp2708, actually I wanted to have a test case where we checkpoint a partially clustered KeyGroupedPartitioning and assert that an exchange is inserted when there is a dedup (or anything else that requires ClusteredDistribution) on the top of the checkpoint.
I believe the only way to checkpoint a partially clustered KeyGroupedPartitioning is to checkpoint a join whose output is partially clustered. (items can't be partially clustered here.)
That's what I was asking in https://github.com/apache/spark/pull/54714/changes#r2912671678 as well.
The reason why IMO this test case is important, is because this PR fixes the problem by inserting an exchange if we have partially clustered KeyGroupedPartitioning and we need to satisfy a ClusteredDistribution.
That is a correct fix, but technically there is another way to fix the problem, which is to disable partial clustering in scans, so that the join doesn't produce partially clustered KeyGroupedPartitioning. This way we wouldn't need to add any exhanges on the top of the join before the dedup and the fix could be more efficient. But there is a problem with this alternative fix, which is that we can't disable partial clustering in scans if we don't have access to them due to the checkpoint...
So to sum up, I like this PR because it is a simple fix and deals with all cases, but I think we should test the checkpointed partially clustered KeyGroupedPartitioning case as well.
|
@peter-toth Makes sense — I'll update the checkpointed test to checkpoint the join result (which has partially-clustered KeyGroupedPartitioning) instead of the scan, then apply dedup on top of the checkpoint. This tests the case where partial clustering can't be disabled because scans are behind the checkpoint. Will push shortly. |
852c60a to
a588bf8
Compare
|
@peter-toth Updated the checkpointed test per your feedback — now checkpoints the join result (not the scan) and applies dedup on top. This tests the case where partial clustering can't be disabled because scans are behind the checkpoint. |
| checkAnswer(df, Seq(Row(1, 41.0f), Row(2, 10.0f), Row(3, 15.5f))) | ||
|
|
||
| val allShuffles = collectAllShuffles(df.queryExecution.executedPlan) | ||
| assert(allShuffles.nonEmpty, |
There was a problem hiding this comment.
Besides checking the presence of shuffle, can you please also check if we have partially clustered KeyGroupedPartitioning from the scans?
| checkAnswer(df, Seq(Row(1), Row(2), Row(3))) | ||
|
|
||
| val allShuffles = collectAllShuffles(df.queryExecution.executedPlan) | ||
| assert(allShuffles.nonEmpty, |
There was a problem hiding this comment.
Besides checking the presence of shuffle, can you please also check if we have partially clustered KeyGroupedPartitioning from the checkpoint?
| checkAnswer(df, Seq(Row(1), Row(2), Row(3))) | ||
|
|
||
| val allShuffles = collectAllShuffles(df.queryExecution.executedPlan) | ||
| assert(allShuffles.nonEmpty, |
There was a problem hiding this comment.
Besides checking the presence of shuffle, can you please also check if we have partially clustered KeyGroupedPartitioning from the scans?
peter-toth
left a comment
There was a problem hiding this comment.
Looks good to me, I have just minor requests.
There are more efficient alternatives in some cases, but I like the simplicity of this fix and IMO it deals with all cases: #54751 (comment)
cc @szehon-ho, @cloud-fan
szehon-ho
left a comment
There was a problem hiding this comment.
Fix looks good to me, agree with @peter-toth 's comment to improve the test
a588bf8 to
c09d62b
Compare
|
@peter-toth @szehon-ho Added isPartiallyClustered assertions to all 3 tests:
|
Can you please disable AQE for this test? |
…clustering When SPJ partial clustering splits a partition across multiple tasks, post-join dedup operators (dropDuplicates, Window row_number) produce incorrect results because KeyGroupedPartitioning.satisfies0() incorrectly reports satisfaction of ClusteredDistribution via super.satisfies0() short-circuiting the isPartiallyClustered guard. This fix adds an isPartiallyClustered flag to KeyGroupedPartitioning and restructures satisfies0() to check ClusteredDistribution first, returning false when partially clustered. EnsureRequirements then inserts the necessary Exchange. Plain SPJ joins without dedup are unaffected. Closes apache#54378
c09d62b to
baaf958
Compare
|
@peter-toth Disabled AQE in the checkpointed test. Now the RDDScanExec directly carries the KeyGroupedPartitioning, so the assertion checks the checkpoint's outputPartitioning directly. All 3 tests pass. |
What changes were proposed in this pull request?
Backport fix for SPARK-55848 to branch-4.1. This branch does not have the KeyGroupedPartitioning refactor (#54330) from master.
The fix adds an
isPartiallyClusteredflag toKeyGroupedPartitioningand restructuressatisfies0()to checkClusteredDistributionfirst, returningfalsewhen partially clustered.EnsureRequirementsthen inserts the necessary Exchange.Why are the changes needed?
SPJ with partial clustering produces incorrect results for post-join dedup operations (dropDuplicates, Window row_number). The partially-clustered partitioning is incorrectly treated as satisfying
ClusteredDistribution, so no Exchange is inserted before dedup operators.Does this PR introduce any user-facing change?
Yes. Queries using SPJ with partial clustering followed by dedup operations will now return correct results.
How was this patch tested?
Three regression tests added to KeyGroupedPartitioningSuite with data correctness checks and plan assertions verifying shuffle Exchange presence. All 95 tests pass.
Was this patch authored or co-authored using generative AI tooling?
No.