feat: Add table.maintenance.compact() for full-table data file compaction #3124
qzyu999 wants to merge 7 commits into apache:main
Conversation
This introduces a simplified, whole-table compaction strategy via the MaintenanceTable API (`table.maintenance.compact()`).

Key implementation details:

- Reads the entire table state into memory via `.to_arrow()`.
- Uses `table.overwrite()` to rewrite data, leveraging PyIceberg's target file bin-packing (`write.target-file-size-bytes`) natively.
- Ensures atomicity by executing within a table transaction.
- Explicitly sets `snapshot-type: replace` and `replace-operation: compaction` to ensure correct metadata history for downstream engines.
- Includes a guard to safely ignore compaction requests on empty tables.

Includes full Pytest coverage in `tests/table/test_maintenance.py`. Closes apache#1092
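The bin-packing that `write.target-file-size-bytes` drives happens inside PyIceberg's write path when the Arrow table is written back. As a rough illustration of the idea only (this is not the actual PyIceberg implementation), a greedy pass that groups input file sizes toward a target looks like:

```python
def bin_pack(file_sizes: list[int], target_bytes: int) -> list[list[int]]:
    """Greedily group file sizes into bins of roughly target_bytes.

    Illustrative sketch only: PyIceberg's real bin-packing lives in its
    write path and is configured via write.target-file-size-bytes.
    """
    bins: list[list[int]] = []
    current: list[int] = []
    total = 0
    for size in file_sizes:
        # Start a new bin once adding this file would exceed the target
        if current and total + size > target_bytes:
            bins.append(current)
            current, total = [], 0
        current.append(size)
        total += size
    if current:
        bins.append(current)
    return bins

# Three small files merge toward the target; the large file gets its own bin
print(bin_pack([40, 40, 40, 100], 100))  # → [[40, 40], [40], [100]]
```

This is why compacting many small files tends to yield one near-target-size file per partition, as the tests below check.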
pyiceberg/table/maintenance.py
```python
# Overwrite the table atomically (REPLACE operation)
with self.tbl.transaction() as txn:
    txn.overwrite(arrow_table, snapshot_properties={"snapshot-type": "replace", "replace-operation": "compaction"})
```
I think we should have a replace operation instead:
https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/DataOperations.html#REPLACE
We might want to create `.replace()` first.
Hi @kevinjqliu, thanks for the insight, I agree with what you're saying in terms of building a replace rather than just reusing the overwrite. I've refactored the compaction run to properly use a .replace() API, following the design of the Java Iceberg implementation.
The approach is to create a new `_RewriteFiles` in `pyiceberg/table/update/snapshot.py`, which uses the new `Operation.REPLACE` added in the same module. `_RewriteFiles` effectively mimics the `_OverwriteFiles` operation, except that it records `Operation.REPLACE` instead of `Operation.OVERWRITE`. This allows `MaintenanceTable.compact()` to perform a proper `txn.replace()` rather than reusing `txn.overwrite()`.
I also think it's worth noting that by adding Operation.REPLACE, we make room for the needed rewrite manifests (#270) and delete orphan files functionality (#1200).
```python
after_files = list(table.scan().plan_files())
assert len(after_files) == 3  # Should be 1 optimized data file per partition
assert table.scan().to_arrow().num_rows == 120
```
since its a small result set, we should verify the data is the same too
Hi @kevinjqliu, made a change in 6420027 to check that the columns and the primary keys remain the same before/after compaction.
pyiceberg/table/maintenance.py
```python
    return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True))

def compact(self) -> None:
    """Compact the table's data files by reading and overwriting the entire table.
```
This should be data and delete files, but generally it compacts the entire table.
Hi @kevinjqliu, made the update to the docstring here: 9fd51a8.
…ction in test_maintenance_compact()
Formats the [compact](iceberg-python/pyiceberg/table/maintenance.py) method docstring to ensure the summary line does not wrap and correctly ends with a period, satisfying pydocstyle D205 and D400 rules.
Replaces the use of .overwrite() in MaintenanceTable.compact() with a new .replace() API backed by a _RewriteFiles producer. This ensures compaction now generates an Operation.REPLACE snapshot instead of Operation.OVERWRITE, preserving logical table state for downstream consumers. Fixes apache#1092
```python
    for data_file in data_files:
        append_files.append_data_file(data_file)

def replace(
```
Let's add replace on its own since it's a pretty significant change, and follow up with table compaction.
I think there are a few more things we need to add to the replace operation. It would be a good idea to look into the Java side. For example, how can we ensure that the table's data remains the same? REPLACE means no data change. If we cannot guarantee that the data remains the same, maybe we should not expose a replace function that takes a df as a parameter.
Hi @kevinjqliu, I created an issue (#3130) and a corresponding PR (#3131) to address the need to create a separate PR for replace. When approved, we can use that to build and complete this current PR for compaction. We can move this discussion to there and come back when finished.
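One cheap guard along the lines the reviewer raises is to compare snapshot summary counters before and after the rewrite: Iceberg snapshot summaries carry fields such as `total-records`, which a true REPLACE must leave unchanged. The helper below is a hypothetical sketch, not part of this PR or the PyIceberg API:

```python
def validate_no_data_change(before_summary: dict, after_summary: dict) -> None:
    """Raise if record counts differ between two snapshot summaries.

    Hypothetical guard: this only catches count-level changes. A real
    no-data-change guarantee needs content comparison (or the kind of
    validation the Java RewriteFiles API performs).
    """
    for field in ("total-records",):
        if before_summary.get(field) != after_summary.get(field):
            raise ValueError(
                f"REPLACE must not change data: {field} went from "
                f"{before_summary.get(field)} to {after_summary.get(field)}"
            )


# Unchanged counts pass silently
validate_no_data_change({"total-records": "120"}, {"total-records": "120"})
```

A check like this would argue for exposing replace as an internal producer fed by rewritten files, rather than a public function that accepts an arbitrary dataframe.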
Closes #1092

Rationale for this change

This introduces a simplified, whole-table compaction strategy via the MaintenanceTable API (`table.maintenance.compact()`).

Key implementation details:

- Reads the entire table state into memory via `.to_arrow()`.
- Uses `table.overwrite()` to rewrite data, leveraging PyIceberg's target file bin-packing (`write.target-file-size-bytes`) natively.
- Ensures atomicity by executing within a table transaction.
- Explicitly sets `snapshot-type: replace` and `replace-operation: compaction` to ensure correct metadata history for downstream engines.

Are these changes tested?

Includes full Pytest coverage in `tests/table/test_maintenance.py`.

Are there any user-facing changes?

Yes. This PR adds a new `compact()` method to the TableMaintenance API, allowing users to perform file compaction on existing Iceberg tables.
Example usage:
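A plausible usage sketch follows; the catalog and table names are illustrative, and the call shape follows this PR's description of the API:

```python
from pyiceberg.catalog import load_catalog

# Illustrative names; any configured catalog and existing table work
catalog = load_catalog("default")
table = catalog.load_table("db.events")

# Rewrite the table's data files in a single atomic snapshot,
# bin-packed toward write.target-file-size-bytes
table.maintenance.compact()
```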
Edit: It looks like I'm not able to add the changelog label, hopefully someone with permissions can do so.