From 5c8dc67e1ec2ab0dc68d036df2bab42fa2755ffe Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Thu, 5 Mar 2026 21:32:27 -0800 Subject: [PATCH 1/7] Add table.maintenance.compact() for full-table data file compaction This introduces a simplified, whole-table compaction strategy via the MaintenanceTable API (`table.maintenance.compact()`). Key implementation details: - Reads the entire table state into memory via `.to_arrow()`. - Uses `table.overwrite()` to rewrite data, leveraging PyIceberg's target file bin-packing (`write.target-file-size-bytes`) natively. - Ensures atomicity by executing within a table transaction. - Explicitly sets `snapshot-type: replace` and `replace-operation: compaction` to ensure correct metadata history for downstream engines. - Includes a guard to safely ignore compaction requests on empty tables. Includes full Pytest coverage in `tests/table/test_maintenance.py`. Closes #1092 --- pyiceberg/table/maintenance.py | 23 ++++++++ tests/table/test_maintenance.py | 101 ++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 tests/table/test_maintenance.py diff --git a/pyiceberg/table/maintenance.py b/pyiceberg/table/maintenance.py index 0fcda35ae9..37a243ec87 100644 --- a/pyiceberg/table/maintenance.py +++ b/pyiceberg/table/maintenance.py @@ -43,3 +43,26 @@ def expire_snapshots(self) -> ExpireSnapshots: from pyiceberg.table.update.snapshot import ExpireSnapshots return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True)) + + def compact(self) -> None: + """Compact the table's data files by reading and overwriting the entire table. + + Note: This is a full-table compaction that leverages Arrow for binpacking. + It currently reads the entire table into memory via `.to_arrow()`. + + This reads all existing data into memory and writes it back out using the + target file size settings (write.target-file-size-bytes), atomically + dropping the old files and replacing them with fewer, larger files. + """ + # Read the current table state into memory + arrow_table = self.tbl.scan().to_arrow() + + # Guard: if the table is completely empty, there's nothing to compact. + # Doing an overwrite with an empty table would result in deleting everything. + if arrow_table.num_rows == 0: + logger.info("Table contains no rows, skipping compaction.") + return + + # Overwrite the table atomically (REPLACE operation) + with self.tbl.transaction() as txn: + txn.overwrite(arrow_table, snapshot_properties={"snapshot-type": "replace", "replace-operation": "compaction"}) diff --git a/tests/table/test_maintenance.py b/tests/table/test_maintenance.py new file mode 100644 index 0000000000..a28b389aa4 --- /dev/null +++ b/tests/table/test_maintenance.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import random +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.schema import Schema +from pyiceberg.partitioning import PartitionSpec, PartitionField +from pyiceberg.transforms import IdentityTransform +from pyiceberg.exceptions import NoSuchNamespaceError + + +def test_maintenance_compact(catalog: Catalog) -> None: + # Setup Schema and specs + from pyiceberg.types import NestedField, StringType, LongType + schema = Schema( + NestedField(1, "id", LongType()), + NestedField(2, "category", StringType()), + NestedField(3, "value", LongType()), + ) + spec = PartitionSpec( + PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="category") + ) + + # Create the namespace and table + try: + catalog.create_namespace("default") + except NoSuchNamespaceError: + pass + table = catalog.create_table( + "default.test_compaction", + schema=schema, + partition_spec=spec, + ) + + # Append many small data files + categories = ["cat1", "cat2", "cat3"] + for i in range(12): + table.append(pa.table({ + "id": list(range(i * 10, (i + 1) * 10)), + "category": [categories[i % 3]] * 10, + "value": [random.randint(1, 100) for _ in range(10)], + })) + + # Verify state before compaction + before_files = list(table.scan().plan_files()) + assert len(before_files) == 12 + assert table.scan().to_arrow().num_rows == 120 + + # Execute Compaction + table.maintenance.compact() + + # Verify state after compaction + table.refresh() + after_files = list(table.scan().plan_files()) + assert len(after_files) == 3 # Should be 1 optimized data file per partition + assert table.scan().to_arrow().num_rows == 120 + + # Ensure snapshot properties specify the replace-operation + new_snapshot = table.current_snapshot() + assert new_snapshot is not None + assert new_snapshot.summary.get("snapshot-type") == "replace" + assert new_snapshot.summary.get("replace-operation") == "compaction" + + +def test_maintenance_compact_empty_table(catalog: Catalog) -> None: + from pyiceberg.types import NestedField, StringType, LongType + schema = Schema( + NestedField(1, "id", LongType()), + NestedField(2, "category", StringType()), + ) + + try: + catalog.create_namespace("default") + except NoSuchNamespaceError: + pass + + table = catalog.create_table("default.test_compaction_empty", schema=schema) + before_snapshots = len(table.history()) + + # Should safely return doing nothing + table.maintenance.compact() + + table.refresh() + after_snapshots = len(table.history()) + assert before_snapshots == after_snapshots # No new snapshot should be made From 2774bd3db2536f6c7ba555ba16963b7d8c76d4d2 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Thu, 5 Mar 2026 22:24:36 -0800 Subject: [PATCH 2/7] fix: address linting and mypy type errors in maintenance tests --- tests/table/test_maintenance.py | 41 ++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/tests/table/test_maintenance.py b/tests/table/test_maintenance.py index a28b389aa4..d68dbf6907 100644 --- a/tests/table/test_maintenance.py +++ b/tests/table/test_maintenance.py @@ -15,28 +15,27 @@ # specific language governing permissions and limitations # under the License. import random + import pyarrow as pa -import pytest from pyiceberg.catalog import Catalog +from pyiceberg.exceptions import NoSuchNamespaceError +from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import IdentityTransform -from pyiceberg.exceptions import NoSuchNamespaceError def test_maintenance_compact(catalog: Catalog) -> None: # Setup Schema and specs - from pyiceberg.types import NestedField, StringType, LongType + from pyiceberg.types import LongType, NestedField, StringType + schema = Schema( NestedField(1, "id", LongType()), NestedField(2, "category", StringType()), NestedField(3, "value", LongType()), ) - spec = PartitionSpec( - PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="category") - ) - + spec = PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="category")) + # Create the namespace and table try: catalog.create_namespace("default") @@ -51,11 +50,15 @@ def test_maintenance_compact(catalog: Catalog) -> None: # Append many small data files categories = ["cat1", "cat2", "cat3"] for i in range(12): - table.append(pa.table({ - "id": list(range(i * 10, (i + 1) * 10)), - "category": [categories[i % 3]] * 10, - "value": [random.randint(1, 100) for _ in range(10)], - })) + table.append( + pa.table( + { + "id": list(range(i * 10, (i + 1) * 10)), + "category": [categories[i % 3]] * 10, + "value": [random.randint(1, 100) for _ in range(10)], + } + ) + ) # Verify state before compaction before_files = list(table.scan().plan_files()) @@ -74,28 +77,30 @@ def test_maintenance_compact(catalog: Catalog) -> None: # Ensure snapshot properties specify the replace-operation new_snapshot = table.current_snapshot() assert new_snapshot is not None + assert new_snapshot.summary is not None assert new_snapshot.summary.get("snapshot-type") == "replace" assert new_snapshot.summary.get("replace-operation") == "compaction" def test_maintenance_compact_empty_table(catalog: Catalog) -> None: - from pyiceberg.types import NestedField, StringType, LongType + from pyiceberg.types import LongType, NestedField, StringType + schema = Schema( NestedField(1, "id", LongType()), NestedField(2, "category", StringType()), ) - + try: catalog.create_namespace("default") except NoSuchNamespaceError: pass - + table = catalog.create_table("default.test_compaction_empty", schema=schema) before_snapshots = len(table.history()) - + # Should safely return doing nothing table.maintenance.compact() - + table.refresh() after_snapshots = len(table.history()) assert before_snapshots == after_snapshots # No new snapshot should be made From 64200276bfe3be9ef6af60e1c5a99c60bde1ccea Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 6 Mar 2026 12:22:52 -0800 Subject: [PATCH 3/7] fix: verify that the table itself remains the same before/after compaction in test_maintenance_compact() --- tests/table/test_maintenance.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/table/test_maintenance.py b/tests/table/test_maintenance.py index d68dbf6907..6b969cde04 100644 --- a/tests/table/test_maintenance.py +++ b/tests/table/test_maintenance.py @@ -63,7 +63,8 @@ def test_maintenance_compact(catalog: Catalog) -> None: # Verify state before compaction before_files = list(table.scan().plan_files()) assert len(before_files) == 12 - assert table.scan().to_arrow().num_rows == 120 + arrow_table_before = table.scan().to_arrow() + assert arrow_table_before.num_rows == 120 # Execute Compaction table.maintenance.compact() @@ -72,7 +73,13 @@ def test_maintenance_compact(catalog: Catalog) -> None: table.refresh() after_files = list(table.scan().plan_files()) assert len(after_files) == 3 # Should be 1 optimized data file per partition - assert table.scan().to_arrow().num_rows == 120 + + arrow_table_after = table.scan().to_arrow() + assert arrow_table_after.num_rows == 120 + assert arrow_table_before.column_names == arrow_table_after.column_names + assert sorted(arrow_table_before.to_pylist(), key=lambda x: x["id"]) == sorted( + arrow_table_after.to_pylist(), key=lambda x: x["id"] + ) # Ensure snapshot properties specify the replace-operation new_snapshot = table.current_snapshot() From dfbde71e0d126ad5ba6fa013611d419c8cb7dbba Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 6 Mar 2026 12:27:59 -0800 Subject: [PATCH 4/7] style: fix trailing whitespace in test_maintenance.py --- tests/table/test_maintenance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/table/test_maintenance.py b/tests/table/test_maintenance.py index 6b969cde04..601f8abd50 100644 --- a/tests/table/test_maintenance.py +++ b/tests/table/test_maintenance.py @@ -73,7 +73,7 @@ def test_maintenance_compact(catalog: Catalog) -> None: table.refresh() after_files = list(table.scan().plan_files()) assert len(after_files) == 3 # Should be 1 optimized data file per partition - + arrow_table_after = table.scan().to_arrow() assert arrow_table_after.num_rows == 120 assert arrow_table_before.column_names == arrow_table_after.column_names From 9fd51a8ec82142cdab3600552b2b68e776105dca Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 6 Mar 2026 12:30:05 -0800 Subject: [PATCH 5/7] docs: update compact() docstring to include delete files --- pyiceberg/table/maintenance.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/maintenance.py b/pyiceberg/table/maintenance.py index 37a243ec87..5274f598f3 100644 --- a/pyiceberg/table/maintenance.py +++ b/pyiceberg/table/maintenance.py @@ -45,7 +45,8 @@ def expire_snapshots(self) -> ExpireSnapshots: return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True)) def compact(self) -> None: - """Compact the table's data files by reading and overwriting the entire table. + """Compact the table's data and delete files by reading and overwriting + the entire table. Note: This is a full-table compaction that leverages Arrow for binpacking. It currently reads the entire table into memory via `.to_arrow()`. From 93df231528833f869e667b726967aae323e1dd71 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 6 Mar 2026 12:44:48 -0800 Subject: [PATCH 6/7] chore: fix pydocstyle warnings in maintenance.py Formats the [compact](iceberg-python/pyiceberg/table/maintenance.py) method docstring to ensure the summary line does not wrap and correctly ends with a period, satisfying pydocstyle D205 and D400 rules. --- pyiceberg/table/maintenance.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/table/maintenance.py b/pyiceberg/table/maintenance.py index 5274f598f3..55e45450fe 100644 --- a/pyiceberg/table/maintenance.py +++ b/pyiceberg/table/maintenance.py @@ -45,8 +45,7 @@ def expire_snapshots(self) -> ExpireSnapshots: return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True)) def compact(self) -> None: - """Compact the table's data and delete files by reading and overwriting - the entire table. + """Compact the table's data and delete files by reading and overwriting the entire table. Note: This is a full-table compaction that leverages Arrow for binpacking. It currently reads the entire table into memory via `.to_arrow()`. From edf449e53a184eab91d3fe868b8a93a896ef3a80 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 6 Mar 2026 16:30:28 -0800 Subject: [PATCH 7/7] Table: Implement replace() API for data compaction Replaces the use of .overwrite() in MaintenanceTable.compact() with a new .replace() API backed by a _RewriteFiles producer. This ensures compaction now generates an Operation.REPLACE snapshot instead of Operation.OVERWRITE, preserving logical table state for downstream consumers. Fixes #1092 --- pyiceberg/table/__init__.py | 75 +++++++++++++++++++++++ pyiceberg/table/maintenance.py | 9 ++- pyiceberg/table/snapshots.py | 2 +- pyiceberg/table/update/snapshot.py | 95 ++++++++++++++++++++++++++++++ tests/table/test_snapshots.py | 4 +- 5 files changed, 180 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 68089beb54..f53e21da6e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -614,6 +614,54 @@ def overwrite( for data_file in data_files: append_files.append_data_file(data_file) + def replace( + self, + df: pa.Table, + files_to_delete: Iterable[DataFile], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand for replacing existing files. + + A replace will produce a REPLACE snapshot that will ignore existing + files and replace them with the new files. + + Args: + df: The Arrow dataframe that will be used to generate the new data files + files_to_delete: The files to delete + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the replace operation + """ + try: + import pyarrow as pa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e + + from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files + + if not isinstance(df, pa.Table): + raise ValueError(f"Expected PyArrow table, got: {df}") + + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False + _check_pyarrow_schema_compatible( + self.table_metadata.schema(), + provided_schema=df.schema, + downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, + format_version=self.table_metadata.format_version, + ) + + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).replace() as replace_snapshot: + for file_to_delete in files_to_delete: + replace_snapshot.delete_data_file(file_to_delete) + + if df.shape[0] > 0: + data_files = _dataframe_to_data_files( + table_metadata=self.table_metadata, write_uuid=replace_snapshot.commit_uuid, df=df, io=self._table.io + ) + for data_file in data_files: + replace_snapshot.append_data_file(data_file) + def delete( self, delete_filter: str | BooleanExpression, @@ -1432,6 +1480,33 @@ def overwrite( branch=branch, ) + def replace( + self, + df: pa.Table, + files_to_delete: Iterable[DataFile], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand for replacing existing files. + + A replace will produce a REPLACE snapshot that will ignore existing + files and replace them with the new files. + + Args: + df: The Arrow dataframe that will be used to generate the new data files + files_to_delete: The files to delete + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the replace operation + """ + with self.transaction() as tx: + tx.replace( + df=df, + files_to_delete=files_to_delete, + snapshot_properties=snapshot_properties, + branch=branch, + ) + def delete( self, delete_filter: BooleanExpression | str = ALWAYS_TRUE, diff --git a/pyiceberg/table/maintenance.py b/pyiceberg/table/maintenance.py index 55e45450fe..9e8697c798 100644 --- a/pyiceberg/table/maintenance.py +++ b/pyiceberg/table/maintenance.py @@ -63,6 +63,11 @@ def compact(self) -> None: logger.info("Table contains no rows, skipping compaction.") return - # Overwrite the table atomically (REPLACE operation) + # Replace existing files with new compacted files with self.tbl.transaction() as txn: - txn.overwrite(arrow_table, snapshot_properties={"snapshot-type": "replace", "replace-operation": "compaction"}) + files_to_delete = [task.file for task in self.tbl.scan().plan_files()] + txn.replace( + df=arrow_table, + files_to_delete=files_to_delete, + snapshot_properties={"snapshot-type": "replace", "replace-operation": "compaction"}, + ) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 7e4c6eb1ec..7bd4597399 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -344,7 +344,7 @@ def _partition_summary(self, update_metrics: UpdateMetrics) -> str: def update_snapshot_summaries(summary: Summary, previous_summary: Mapping[str, str] | None = None) -> Summary: - if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}: + if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}: raise ValueError(f"Operation not implemented: {summary.operation}") if not previous_summary: diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 37d120969a..e2781de3b1 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -667,6 +667,91 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: return [] +class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]): + """Rewrites data in the table. This will produce a REPLACE snapshot. + + Data files were logically rearranged, but no new logical records were + added or removed (e.g. compaction). + """ + + def _existing_manifests(self) -> list[ManifestFile]: + """Determine if there are any existing manifest files.""" + existing_files = [] + + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch): + for manifest_file in snapshot.manifests(io=self._io): + # Manifest does not contain rows that match the files to delete partitions + if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): + existing_files.append(manifest_file) + continue + + entries_to_write: set[ManifestEntry] = set() + found_deleted_entries: set[ManifestEntry] = set() + + for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + if entry.data_file in self._deleted_data_files: + found_deleted_entries.add(entry) + else: + entries_to_write.add(entry) + + # Is the intercept the empty set? + if len(found_deleted_entries) == 0: + existing_files.append(manifest_file) + continue + + # Delete all files from manifest + if len(entries_to_write) == 0: + continue + + # We have to rewrite the manifest file without the deleted data files + with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer: + for entry in entries_to_write: + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + ) + existing_files.append(writer.to_manifest_file()) + + return existing_files + + def _deleted_entries(self) -> list[ManifestEntry]: + """To determine if we need to record any deleted entries.""" + if self._parent_snapshot_id is not None: + previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if previous_snapshot is None: + raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + + executor = ExecutorFactory.get_or_create() + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + + def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: + if not manifest_evaluators[manifest.partition_spec_id](manifest): + return [] + + return [ + ManifestEntry.from_args( + status=ManifestEntryStatus.DELETED, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True) + if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files + ] + + list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io)) + return list(itertools.chain(*list_of_entries)) + else: + return [] + + class UpdateSnapshot: _transaction: Transaction _io: FileIO @@ -715,6 +800,16 @@ def overwrite(self, commit_uuid: uuid.UUID | None = None) -> _OverwriteFiles: snapshot_properties=self._snapshot_properties, ) + def replace(self, commit_uuid: uuid.UUID | None = None) -> _RewriteFiles: + return _RewriteFiles( + commit_uuid=commit_uuid, + operation=Operation.REPLACE, + transaction=self._transaction, + io=self._io, + branch=self._branch, + snapshot_properties=self._snapshot_properties, + ) + def delete(self) -> _DeleteFiles: return _DeleteFiles( operation=Operation.DELETE, diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index cfdc516227..d078891ca1 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -398,8 +398,8 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: def test_invalid_operation() -> None: with pytest.raises(ValueError) as e: - update_snapshot_summaries(summary=Summary(Operation.REPLACE)) - assert "Operation not implemented: Operation.REPLACE" in str(e.value) + update_snapshot_summaries(summary=Summary(Operation("invalid"))) + assert "'invalid' is not a valid Operation" in str(e.value) def test_invalid_type() -> None: