[FLINK-39202][table] Add new SimplifyCoalesceWithEquiJoinConditionRule #27733

Open
gustavodemorais wants to merge 4 commits into apache:master from gustavodemorais:FLINK-39202

Conversation

@gustavodemorais
Contributor

What is the purpose of the change

Add a new planner optimization rule that simplifies COALESCE expressions referencing columns from opposite sides of an equi-join condition, replacing them with a direct column reference to the preserved (non-null) side.

Brief change log

  • Added SimplifyCoalesceWithEquiJoinConditionRule that rewrites COALESCE(a.col, b.col) to a direct column reference when the columns form an equi-join pair and the join type guarantees one side is non-null
  • Registered the rule in both FlinkBatchRuleSets and FlinkStreamRuleSets
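
The reasoning behind the rewrite can be checked by brute force on a toy model. The sketch below is not the planner rule itself: `CoalesceJoinCheck`, its `join` helper, and the sample keys are hypothetical, and each table is reduced to a list of nullable join keys. Under those assumptions it suggests that COALESCE(b.k, a.k) always equals the key of the preserved side for LEFT, RIGHT, and INNER joins, while for a FULL OUTER join neither side alone is a valid replacement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Toy model (hypothetical): each table is just a list of nullable join keys. */
final class CoalesceJoinCheck {

    enum JoinType { INNER, LEFT, RIGHT, FULL }

    /**
     * Joins on a.k = b.k with SQL semantics (NULL never equals anything).
     * Each output row is {a.k, b.k}; the non-preserved side is padded with null.
     */
    static List<Long[]> join(List<Long> a, List<Long> b, JoinType type) {
        List<Long[]> out = new ArrayList<>();
        boolean[] bMatched = new boolean[b.size()];
        for (Long ak : a) {
            boolean matched = false;
            for (int j = 0; j < b.size(); j++) {
                Long bk = b.get(j);
                if (ak != null && ak.equals(bk)) {
                    out.add(new Long[] {ak, bk});
                    matched = true;
                    bMatched[j] = true;
                }
            }
            // Unmatched left rows survive only in LEFT and FULL joins.
            if (!matched && (type == JoinType.LEFT || type == JoinType.FULL)) {
                out.add(new Long[] {ak, null});
            }
        }
        // Unmatched right rows survive only in RIGHT and FULL joins.
        if (type == JoinType.RIGHT || type == JoinType.FULL) {
            for (int j = 0; j < b.size(); j++) {
                if (!bMatched[j]) {
                    out.add(new Long[] {null, b.get(j)});
                }
            }
        }
        return out;
    }

    /** SQL-style COALESCE: the first non-null argument, or null if there is none. */
    static Long coalesce(Long... args) {
        for (Long x : args) {
            if (x != null) {
                return x;
            }
        }
        return null;
    }

    /** True if COALESCE(b.k, a.k) equals the key of the given side (0 = a, 1 = b) on every row. */
    static boolean coalesceEqualsSide(List<Long> a, List<Long> b, JoinType type, int side) {
        for (Long[] row : join(a, b, type)) {
            if (!Objects.equals(coalesce(row[1], row[0]), row[side])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Long> a = Arrays.asList(1L, 2L, null);
        List<Long> b = Arrays.asList(2L, 3L, null);
        for (JoinType type : JoinType.values()) {
            System.out.println(type
                    + " a.k=" + coalesceEqualsSide(a, b, type, 0)
                    + " b.k=" + coalesceEqualsSide(a, b, type, 1));
        }
    }
}
```

The check only covers the two-argument form over a single key pair; it says nothing about how the actual rule handles type mismatches or additional arguments.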

Verifying this change

  • Added SimplifyCoalesceWithEquiJoinConditionRuleTest covering LEFT, RIGHT, INNER, and FULL OUTER joins, composite keys, non-equi columns, three-arg COALESCE, and nested expressions

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Mar 4, 2026

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

simplified = true;

// Handle potential type mismatch by adding a CAST if needed
if (call.getType().equals(preservedRef.getType())) {
Contributor

Why is LogicalTypeCasts#supportsImplicitCast not an option here?

Contributor Author

It's a good option 👍

I think that in practice the types in join conditions are always, or almost always, the same, so the behavior will not change. Still, checking for implicit casts is slightly broader and more accurate.

Contributor Author

done

void testCoalesceOnInnerJoinEquiKey() {
util.verifyRelPlan(
"SELECT COALESCE(b.order_id, a.order_id) AS order_id "
+ "FROM orders a INNER JOIN order_details b ON a.order_id = b.order_id");
Contributor

What about cases with COALESCE in the join condition? Are they out of scope?

Contributor Author

Yeah, it's out of scope and doesn't apply here. When the COALESCE is in the join condition, it has to be evaluated for every record, so it runs before the join, where there is no preserved side to rely on.

*/
@Internal
@Value.Enclosing
public class SimplifyCoalesceWithEquiJoinConditionRule
Contributor

Do we need a separate rule, or can we combine it with RemoveUnreachableCoalesceArgumentsRule?

Contributor Author

I think having different rules is cleaner since they work differently:

  1. RemoveUnreachableCoalesceArgumentsRule trims arguments based on type nullability metadata (e.g., COALESCE(nonNull, x) -> nonNull), without any join awareness.
  2. This one is specific to joins and merging both would look less elegant IMO.
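
To make the contrast concrete, here is a minimal sketch of nullability-based trimming as described in point 1. It is a toy model, not the code of RemoveUnreachableCoalesceArgumentsRule: the `Arg` class and the `trim` method are hypothetical, and an expression is reduced to a name plus a static nullability flag. Everything after the first NOT NULL argument is unreachable, and no join information is consulted.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical) of trimming COALESCE arguments by type nullability only. */
final class UnreachableCoalesceArgs {

    /** Stand-in for an expression whose type is statically known to be nullable or not. */
    static final class Arg {
        final String name;
        final boolean nullable;

        Arg(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    /** Keeps arguments up to and including the first NOT NULL one; later ones can never be reached. */
    static List<String> trim(List<Arg> args) {
        List<String> kept = new ArrayList<>();
        for (Arg arg : args) {
            kept.add(arg.name);
            if (!arg.nullable) {
                break;
            }
        }
        return kept;
    }
}
```

If a single argument remains, the COALESCE call itself is redundant, which matches the COALESCE(nonNull, x) -> nonNull example above. In a LEFT JOIN, by contrast, the key from the non-preserved side has a nullable type after the join, so this kind of trimming alone cannot collapse COALESCE(b.order_id, a.order_id).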

Comment on lines +131 to +135
void testNestedCoalesceInExpression() {
util.verifyRelPlan(
"SELECT CAST(COALESCE(b.order_id, a.order_id) AS STRING) AS order_id_str "
+ "FROM orders a LEFT JOIN order_details b ON a.order_id = b.order_id");
}
Contributor

I noticed this test is not really about nesting.

I tried playing with nested types locally and tweaked order_details like this:

        util.tableEnv()
                .executeSql(
                        "CREATE TABLE order_details ("
                                + "  r ROW<order_id BIGINT NOT NULL, order_name STRING NOT NULL> NOT NULL, "
                                + "  detail STRING,"
                                + "  PRIMARY KEY (r) NOT ENFORCED"
                                + ") WITH ('connector' = 'values')");

and then ran this test:

        util.verifyRelPlan(
                "SELECT CAST(COALESCE(b.r, ROW(a.order_id, 'e')) AS STRING) AS order_id_str "
                        + "FROM orders a LEFT JOIN order_details b ON a.order_id = b.r.order_id");

The COALESCE is still present in the optimized rel plan, e.g.:


Calc(select=[CAST(COALESCE(r, ROW(order_id, 'e')) AS VARCHAR(2147483647)) AS order_id_str])
+- Join(joinType=[LeftOuterJoin], where=[=(order_id, $f2)], select=[order_id, r, $f2], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])
   :- Exchange(distribution=[hash[order_id]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, orders, project=[order_id], metadata=[]]], fields=[order_id])
   +- Exchange(distribution=[hash[$f2]])
      +- Calc(select=[r, r.order_id AS $f2])
         +- TableSourceScan(table=[[default_catalog, default_database, order_details, project=[r], metadata=[]]], fields=[r])

Contributor

Since r is declared as ROW(...) NOT NULL, I would expect the COALESCE to be removed.

Contributor

Or did I miss anything?

Contributor Author

The name was misleading 👍 The test is actually about being inside the cast. Renamed it to testCoalesceWrappedInCast.

Contributor Author

Regarding structured type/row support:

  • We support structured type field access. Added a new test testCoalesceOnNestedRowScalarField
  • We don't support the case you mentioned. 'order_name' and the literal 'e' here are not always equal, so we cannot simplify this
  • Additionally, entire rows as join conditions are out of scope - also not a common/recommended practice afaik
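
The second point can be illustrated with a tiny counterexample. The sketch below is a toy model with hypothetical names (`RowCoalesceCounterexample`, the sample value "widget"); rows are plain Object arrays and null stands for SQL NULL. For a matched row the COALESCE returns b.r, whose order_name generally differs from the literal 'e'; for an unmatched row it returns the constructed ROW. Neither argument alone reproduces the result in both cases.

```java
import java.util.Arrays;

/** Toy counterexample (hypothetical): rows are Object arrays, null stands for SQL NULL. */
final class RowCoalesceCounterexample {

    /** Two-argument COALESCE over rows. */
    static Object[] coalesce(Object[] first, Object[] second) {
        return first != null ? first : second;
    }

    /** Models ROW(a.order_id, 'e') from the query discussed above. */
    static Object[] fallbackRow(long orderId) {
        return new Object[] {orderId, "e"};
    }

    public static void main(String[] args) {
        Object[] matchedR = {1L, "widget"}; // assumed value of b.r for a matching row
        Object[] fallback = fallbackRow(1L);

        // Matched row: the result is b.r, which differs from the fallback in order_name.
        System.out.println(Arrays.equals(coalesce(matchedR, fallback), fallback));

        // Unmatched row: b.r is NULL after the LEFT JOIN, so the result is the fallback.
        System.out.println(Arrays.equals(coalesce(null, fallback), fallback));
    }
}
```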

Comment on lines +139 to +146
util.tableEnv()
.executeSql(
"CREATE TABLE order_details_row ("
+ " r ROW<order_id BIGINT NOT NULL, order_name STRING NOT NULL> NOT NULL, "
+ " detail STRING,"
+ " PRIMARY KEY (r) NOT ENFORCED"
+ ") WITH ('connector' = 'values')");

Contributor

I guess we should follow the same approach and move this to the top of the test class.

Comment on lines +117 to +119
util.verifyRelPlan(
"SELECT COALESCE(b.order_id, a.order_id, 0) AS order_id "
+ "FROM orders a LEFT JOIN order_details b ON a.order_id = b.order_id");
Contributor

@snuyanzin snuyanzin Mar 5, 2026

It looks like I found an issue with COALESCE.

The rule works when the first argument is not NULL and none of the arguments is a NULL literal. However, it doesn't seem to work when a NULL literal is in the first position, as in this query:

SELECT COALESCE(NULL, a.order_id, 0) AS order_id
FROM orders a LEFT JOIN order_details b ON a.order_id = b.order_id

The number of arguments (as long as it is > 1) doesn't matter. Even more interesting: it doesn't matter whether the NULL is in the first position or not. For instance, this query doesn't remove the COALESCE either:

SELECT COALESCE(b.order_id, NULL, a.order_id, 0) AS order_id
FROM orders a LEFT JOIN order_details b ON a.order_id = b.order_id

Contributor

By the way, it works if I add the NULL at the end, for example:

SELECT COALESCE(b.order_id, a.order_id, NULL) AS order_id
FROM orders a LEFT JOIN order_details b ON a.order_id = b.order_id

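Since a NULL literal can never be the value COALESCE returns unless every argument is NULL, one conceivable way to make the matching insensitive to its position is to drop NULL literals before looking for the equi-join pair. The sketch below is purely illustrative and is not the fix applied in this PR: `NullLiteralNormalizer` and `dropNullLiterals` are hypothetical names, and arguments are modelled as plain strings rather than planner expressions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-normalisation: remove NULL literals from a COALESCE argument list. */
final class NullLiteralNormalizer {

    static final String NULL_LITERAL = "NULL";

    /**
     * Drops NULL literals, which never change the result of COALESCE.
     * If every argument is a NULL literal, a single NULL is kept so the list stays non-empty.
     */
    static List<String> dropNullLiterals(List<String> args) {
        List<String> kept = new ArrayList<>();
        for (String arg : args) {
            if (!NULL_LITERAL.equalsIgnoreCase(arg.trim())) {
                kept.add(arg);
            }
        }
        if (kept.isEmpty()) {
            kept.add(NULL_LITERAL);
        }
        return kept;
    }
}
```

With this normalisation, COALESCE(b.order_id, NULL, a.order_id, 0) and COALESCE(b.order_id, a.order_id, NULL) would both reduce to argument lists that start with the b.order_id, a.order_id pair, so a rule that matches that pair would see the same shape in each case.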