diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 68089beb54..f53e21da6e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -614,6 +614,54 @@ def overwrite( for data_file in data_files: append_files.append_data_file(data_file) + def replace( + self, + df: pa.Table, + files_to_delete: Iterable[DataFile], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand for replacing existing files. + + A replace will produce a REPLACE snapshot that will ignore existing + files and replace them with the new files. + + Args: + df: The Arrow dataframe that will be used to generate the new data files + files_to_delete: The files to delete + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the replace operation + """ + try: + import pyarrow as pa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e + + from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files + + if not isinstance(df, pa.Table): + raise ValueError(f"Expected PyArrow table, got: {df}") + + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False + _check_pyarrow_schema_compatible( + self.table_metadata.schema(), + provided_schema=df.schema, + downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, + format_version=self.table_metadata.format_version, + ) + + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).replace() as replace_snapshot: + for file_to_delete in files_to_delete: + replace_snapshot.delete_data_file(file_to_delete) + + if df.shape[0] > 0: + data_files = _dataframe_to_data_files( + table_metadata=self.table_metadata, write_uuid=replace_snapshot.commit_uuid, df=df, io=self._table.io + ) + for data_file in data_files: + replace_snapshot.append_data_file(data_file) + def delete( self, delete_filter: str | BooleanExpression, @@ -1432,6 +1480,33 @@ def overwrite( branch=branch, ) + def replace( + self, + df: pa.Table, + files_to_delete: Iterable[DataFile], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand for replacing existing files. + + A replace will produce a REPLACE snapshot that will ignore existing + files and replace them with the new files. + + Args: + df: The Arrow dataframe that will be used to generate the new data files + files_to_delete: The files to delete + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the replace operation + """ + with self.transaction() as tx: + tx.replace( + df=df, + files_to_delete=files_to_delete, + snapshot_properties=snapshot_properties, + branch=branch, + ) + def delete( self, delete_filter: BooleanExpression | str = ALWAYS_TRUE, diff --git a/pyiceberg/table/maintenance.py b/pyiceberg/table/maintenance.py index 0fcda35ae9..9e8697c798 100644 --- a/pyiceberg/table/maintenance.py +++ b/pyiceberg/table/maintenance.py @@ -43,3 +43,31 @@ def expire_snapshots(self) -> ExpireSnapshots: from pyiceberg.table.update.snapshot import ExpireSnapshots return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True)) + + def compact(self) -> None: + """Compact the table's data and delete files by reading and overwriting the entire table. + + Note: This is a full-table compaction that leverages Arrow for binpacking. + It currently reads the entire table into memory via `.to_arrow()`. + + This reads all existing data into memory and writes it back out using the + target file size settings (write.target-file-size-bytes), atomically + dropping the old files and replacing them with fewer, larger files. + """ + # Read the current table state into memory + arrow_table = self.tbl.scan().to_arrow() + + # Guard: if the table is completely empty, there's nothing to compact. + # Doing an overwrite with an empty table would result in deleting everything. + if arrow_table.num_rows == 0: + logger.info("Table contains no rows, skipping compaction.") + return + + # Replace existing files with new compacted files + with self.tbl.transaction() as txn: + files_to_delete = [task.file for task in self.tbl.scan().plan_files()] + txn.replace( + df=arrow_table, + files_to_delete=files_to_delete, + snapshot_properties={"snapshot-type": "replace", "replace-operation": "compaction"}, + ) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 7e4c6eb1ec..7bd4597399 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -344,7 +344,7 @@ def _partition_summary(self, update_metrics: UpdateMetrics) -> str: def update_snapshot_summaries(summary: Summary, previous_summary: Mapping[str, str] | None = None) -> Summary: - if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}: + if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}: raise ValueError(f"Operation not implemented: {summary.operation}") if not previous_summary: diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 37d120969a..e2781de3b1 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -667,6 +667,91 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: return [] +class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]): + """Rewrites data in the table. This will produce a REPLACE snapshot. + + Data files were logically rearranged, but no new logical records were + added or removed (e.g. compaction). + """ + + def _existing_manifests(self) -> list[ManifestFile]: + """Determine if there are any existing manifest files.""" + existing_files = [] + + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch): + for manifest_file in snapshot.manifests(io=self._io): + # Manifest does not contain rows that match the files to delete partitions + if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): + existing_files.append(manifest_file) + continue + + entries_to_write: set[ManifestEntry] = set() + found_deleted_entries: set[ManifestEntry] = set() + + for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + if entry.data_file in self._deleted_data_files: + found_deleted_entries.add(entry) + else: + entries_to_write.add(entry) + + # Is the intercept the empty set? + if len(found_deleted_entries) == 0: + existing_files.append(manifest_file) + continue + + # Delete all files from manifest + if len(entries_to_write) == 0: + continue + + # We have to rewrite the manifest file without the deleted data files + with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer: + for entry in entries_to_write: + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + ) + existing_files.append(writer.to_manifest_file()) + + return existing_files + + def _deleted_entries(self) -> list[ManifestEntry]: + """To determine if we need to record any deleted entries.""" + if self._parent_snapshot_id is not None: + previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if previous_snapshot is None: + raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + + executor = ExecutorFactory.get_or_create() + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + + def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: + if not manifest_evaluators[manifest.partition_spec_id](manifest): + return [] + + return [ + ManifestEntry.from_args( + status=ManifestEntryStatus.DELETED, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True) + if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files + ] + + list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io)) + return list(itertools.chain(*list_of_entries)) + else: + return [] + + class UpdateSnapshot: _transaction: Transaction _io: FileIO @@ -715,6 +800,16 @@ def overwrite(self, commit_uuid: uuid.UUID | None = None) -> _OverwriteFiles: snapshot_properties=self._snapshot_properties, ) + def replace(self, commit_uuid: uuid.UUID | None = None) -> _RewriteFiles: + return _RewriteFiles( + commit_uuid=commit_uuid, + operation=Operation.REPLACE, + transaction=self._transaction, + io=self._io, + branch=self._branch, + snapshot_properties=self._snapshot_properties, + ) + def delete(self) -> _DeleteFiles: return _DeleteFiles( operation=Operation.DELETE, diff --git a/tests/table/test_maintenance.py b/tests/table/test_maintenance.py new file mode 100644 index 0000000000..601f8abd50 --- /dev/null +++ b/tests/table/test_maintenance.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import random + +import pyarrow as pa + +from pyiceberg.catalog import Catalog +from pyiceberg.exceptions import NoSuchNamespaceError +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.transforms import IdentityTransform + + +def test_maintenance_compact(catalog: Catalog) -> None: + # Setup Schema and specs + from pyiceberg.types import LongType, NestedField, StringType + + schema = Schema( + NestedField(1, "id", LongType()), + NestedField(2, "category", StringType()), + NestedField(3, "value", LongType()), + ) + spec = PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="category")) + + # Create the namespace and table + try: + catalog.create_namespace("default") + except NoSuchNamespaceError: + pass + table = catalog.create_table( + "default.test_compaction", + schema=schema, + partition_spec=spec, + ) + + # Append many small data files + categories = ["cat1", "cat2", "cat3"] + for i in range(12): + table.append( + pa.table( + { + "id": list(range(i * 10, (i + 1) * 10)), + "category": [categories[i % 3]] * 10, + "value": [random.randint(1, 100) for _ in range(10)], + } + ) + ) + + # Verify state before compaction + before_files = list(table.scan().plan_files()) + assert len(before_files) == 12 + arrow_table_before = table.scan().to_arrow() + assert arrow_table_before.num_rows == 120 + + # Execute Compaction + table.maintenance.compact() + + # Verify state after compaction + table.refresh() + after_files = list(table.scan().plan_files()) + assert len(after_files) == 3 # Should be 1 optimized data file per partition + + arrow_table_after = table.scan().to_arrow() + assert arrow_table_after.num_rows == 120 + assert arrow_table_before.column_names == arrow_table_after.column_names + assert sorted(arrow_table_before.to_pylist(), key=lambda x: x["id"]) == sorted( + arrow_table_after.to_pylist(), key=lambda x: x["id"] + ) + + # Ensure snapshot properties specify the replace-operation + new_snapshot = table.current_snapshot() + assert new_snapshot is not None + assert new_snapshot.summary is not None + assert new_snapshot.summary.get("snapshot-type") == "replace" + assert new_snapshot.summary.get("replace-operation") == "compaction" + + +def test_maintenance_compact_empty_table(catalog: Catalog) -> None: + from pyiceberg.types import LongType, NestedField, StringType + + schema = Schema( + NestedField(1, "id", LongType()), + NestedField(2, "category", StringType()), + ) + + try: + catalog.create_namespace("default") + except NoSuchNamespaceError: + pass + + table = catalog.create_table("default.test_compaction_empty", schema=schema) + before_snapshots = len(table.history()) + + # Should safely return doing nothing + table.maintenance.compact() + + table.refresh() + after_snapshots = len(table.history()) + assert before_snapshots == after_snapshots # No new snapshot should be made diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index cfdc516227..d078891ca1 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -398,8 +398,8 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: def test_invalid_operation() -> None: with pytest.raises(ValueError) as e: - update_snapshot_summaries(summary=Summary(Operation.REPLACE)) - assert "Operation not implemented: Operation.REPLACE" in str(e.value) + update_snapshot_summaries(summary=Summary(Operation("invalid"))) + assert "'invalid' is not a valid Operation" in str(e.value) def test_invalid_type() -> None: