Skip to content

[SPARK-55848][SQL][4.1] Fix incorrect dedup results with SPJ partial clustering#54751

Open
naveenp2708 wants to merge 1 commit intoapache:branch-4.1from
naveenp2708:spark-55848-fix-branch-4.1
Open

[SPARK-55848][SQL][4.1] Fix incorrect dedup results with SPJ partial clustering#54751
naveenp2708 wants to merge 1 commit intoapache:branch-4.1from
naveenp2708:spark-55848-fix-branch-4.1

Conversation

@naveenp2708
Copy link

What changes were proposed in this pull request?

Backport fix for SPARK-55848 to branch-4.1. This branch does not have the KeyGroupedPartitioning refactor (#54330) from master.

The fix adds an isPartiallyClustered flag to KeyGroupedPartitioning and restructures satisfies0() to check ClusteredDistribution first, returning false when partially clustered. EnsureRequirements then inserts the necessary Exchange.

Why are the changes needed?

SPJ with partial clustering produces incorrect results for post-join dedup operations (dropDuplicates, Window row_number). The partially-clustered partitioning is incorrectly treated as satisfying ClusteredDistribution, so no Exchange is inserted before dedup operators.

Does this PR introduce any user-facing change?

Yes. Queries using SPJ with partial clustering followed by dedup operations will now return correct results.

How was this patch tested?

Three regression tests added to KeyGroupedPartitioningSuite with data correctness checks and plan assertions verifying shuffle Exchange presence. All 95 tests pass.

Was this patch authored or co-authored using generative AI tooling?

No.

@naveenp2708
Copy link
Author

@peter-toth Fix + tests PR for branch-4.1 as discussed.

// Checkpoint the items scan, then join with purchases under partial clustering,
// and finally dedup. The isPartiallyClustered flag must cause satisfies0()=false
// for ClusteredDistribution, so the dedup still produces correct distinct ids.
val itemsDf = spark.read.table(s"testcat.ns.$items").checkpoint()
Copy link
Contributor

@peter-toth peter-toth Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@naveenp2708, actually I wanted to have a test case where we checkpoint a partially clustered KeyGroupedPartitioning and assert that an exchange is inserted when there is a dedup (or anything else that requires ClusteredDistribution) on the top of the checkpoint.

I believe the only way to checkpoint a partially clustered KeyGroupedPartitioning is to checkpoint a join whose output is partially clustered. (items can't be partially clustered here.)
That's what I was asking in https://github.com/apache/spark/pull/54714/changes#r2912671678 as well.

The reason why IMO this test case is important, is because this PR fixes the problem by inserting an exchange if we have partially clustered KeyGroupedPartitioning and we need to satisfy a ClusteredDistribution.
That is a correct fix, but technically there is another way to fix the problem, which is to disable partial clustering in scans, so that the join doesn't produce partially clustered KeyGroupedPartitioning. This way we wouldn't need to add any exhanges on the top of the join before the dedup and the fix could be more efficient. But there is a problem with this alternative fix, which is that we can't disable partial clustering in scans if we don't have access to them due to the checkpoint...

So to sum up, I like this PR because it is a simple fix and deals with all cases, but I think we should test the checkpointed partially clustered KeyGroupedPartitioning case as well.

@naveenp2708
Copy link
Author

@peter-toth Makes sense — I'll update the checkpointed test to checkpoint the join result (which has partially-clustered KeyGroupedPartitioning) instead of the scan, then apply dedup on top of the checkpoint. This tests the case where partial clustering can't be disabled because scans are behind the checkpoint. Will push shortly.

@naveenp2708 naveenp2708 force-pushed the spark-55848-fix-branch-4.1 branch from 852c60a to a588bf8 Compare March 11, 2026 23:22
@naveenp2708
Copy link
Author

@peter-toth Updated the checkpointed test per your feedback — now checkpoints the join result (not the scan) and applies dedup on top. This tests the case where partial clustering can't be disabled because scans are behind the checkpoint.

checkAnswer(df, Seq(Row(1, 41.0f), Row(2, 10.0f), Row(3, 15.5f)))

val allShuffles = collectAllShuffles(df.queryExecution.executedPlan)
assert(allShuffles.nonEmpty,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides checking the presence of shuffle, can you please also check if we have partially clustered KeyGroupedPartitioning from the scans?

checkAnswer(df, Seq(Row(1), Row(2), Row(3)))

val allShuffles = collectAllShuffles(df.queryExecution.executedPlan)
assert(allShuffles.nonEmpty,
Copy link
Contributor

@peter-toth peter-toth Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides checking the presence of shuffle, can you please also check if we have partially clustered KeyGroupedPartitioning from the checkpoint?

checkAnswer(df, Seq(Row(1), Row(2), Row(3)))

val allShuffles = collectAllShuffles(df.queryExecution.executedPlan)
assert(allShuffles.nonEmpty,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides checking the presence of shuffle, can you please also check if we have partially clustered KeyGroupedPartitioning from the scans?

Copy link
Contributor

@peter-toth peter-toth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, I have just minor requests.

There are more efficient alternatives in some cases, but I like the simplicity of this fix and IMO it deals with all cases: #54751 (comment)

cc @szehon-ho, @cloud-fan

Copy link
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix looks good to me, agree with @peter-toth 's comment to improve the test

@naveenp2708 naveenp2708 force-pushed the spark-55848-fix-branch-4.1 branch from a588bf8 to c09d62b Compare March 12, 2026 21:48
@naveenp2708
Copy link
Author

naveenp2708 commented Mar 12, 2026

@peter-toth @szehon-ho Added isPartiallyClustered assertions to all 3 tests:

  • Tests 1 & 2: Verified BatchScanExec has KeyGroupedPartitioning with isPartiallyClustered=true
  • Test 3: The checkpoint node doesn't carry KeyGroupedPartitioning because AQE's AdaptiveSparkPlanExec reports UnknownPartitioning. Instead, verified the scans in the pre-checkpoint join plan have isPartiallyClustered=true. Let me know if you'd like a different approach.

@peter-toth
Copy link
Contributor

  • Test 3: The checkpoint node doesn't carry KeyGroupedPartitioning because AQE's AdaptiveSparkPlanExec reports UnknownPartitioning. Instead, verified the scans in the pre-checkpoint join plan have isPartiallyClustered=true. Let me know if you'd like a different approach.

Can you please disable AQE for this test?

…clustering

When SPJ partial clustering splits a partition across multiple tasks,
post-join dedup operators (dropDuplicates, Window row_number) produce
incorrect results because KeyGroupedPartitioning.satisfies0() incorrectly
reports satisfaction of ClusteredDistribution via super.satisfies0()
short-circuiting the isPartiallyClustered guard.

This fix adds an isPartiallyClustered flag to KeyGroupedPartitioning and
restructures satisfies0() to check ClusteredDistribution first, returning
false when partially clustered. EnsureRequirements then inserts the
necessary Exchange. Plain SPJ joins without dedup are unaffected.

Closes apache#54378
@naveenp2708 naveenp2708 force-pushed the spark-55848-fix-branch-4.1 branch from c09d62b to baaf958 Compare March 13, 2026 14:21
@naveenp2708
Copy link
Author

@peter-toth Disabled AQE in the checkpointed test. Now the RDDScanExec directly carries the KeyGroupedPartitioning, so the assertion checks the checkpoint's outputPartitioning directly. All 3 tests pass.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants