Introduction
I'm Aki, an AWS Community Builder (@jitepengin).
Apache Iceberg is one of the emerging open table formats that has gained attention recently. It supports features such as schema evolution, ACID transactions, and time travel.
When managing tables based on snapshots, there are many scenarios in development, testing, and production where you want to track or manipulate snapshot histories.
As a learning project, I created a CLI tool called iceberg-navigator that lets you inspect snapshot history and details of Iceberg tables on AWS. In this article, I’ll introduce this tool.
Problems I Wanted to Solve
- Easily check the snapshot history of Iceberg tables stored in S3 via the command line.
- Trace snapshot parent-child relationships to understand version lineage.
Technologies and Libraries Used
- PyIceberg: Python library to work with Iceberg tables
- PyArrow: Dependency library for Iceberg schema handling (used indirectly)
- Click: To build the CLI interface
- NetworkX / Matplotlib: For visualizing snapshot parent-child relationships as a Directed Acyclic Graph (DAG)
Key Implementation Points
- Use PyIceberg to access Iceberg catalog, tables, and snapshot metadata.
- Connect to AWS Glue Iceberg REST endpoint via PyIceberg.
- Build a simple CLI interface with Click.
- Use NetworkX and Matplotlib to generate a snapshot lineage graph.
Source Code
The project is hosted here:
https://github.com/dataPenginPenguin/iceberg_navigator
How to Use the CLI Tool
AWS CLI Setup
Make sure you have configured AWS CLI with the proper credentials and region.
Install Required Libraries
pip install -r requirements.txt
List Snapshots
$ python -m iceberg_navigator list --table <dbname>.<tablename>
Example output:
| Snapshot ID         | Timestamp            | Operation        | Parent Snapshot ID   | Total Size (MB)   | Record Count   |
|---------------------|----------------------|------------------|----------------------|-------------------|----------------|
| 1533347322559466931 | 2025-05-22T02:10:24Z | Operation.APPEND | null                 | 13.48             | 729,732        |
| 1485371543345582290 | 2025-05-22T02:10:54Z | Operation.DELETE | 1533347322559466931  | 0.00              | 0              |
| 67848960317145716   | 2025-05-22T02:15:45Z | Operation.APPEND | 1485371543345582290  | 13.48             | 729,732        |
| 3920289554540444894 | 2025-05-22T02:38:46Z | Operation.DELETE | 67848960317145716    | 0.00              | 0              |
| 6369576239134108166 | 2025-05-22T02:41:51Z | Operation.APPEND | 3920289554540444894  | 13.48             | 729,732        |
| 6216935665394419954 | 2025-05-22T02:41:54Z | Operation.APPEND | 6369576239134108166  | 26.96             | 1,459,464      |
| 9058990433822511495 | 2025-05-22T02:42:28Z | Operation.APPEND | 6216935665394419954  | 40.44             | 2,189,196      |
| 5224576979788468429 | 2025-05-22T02:46:53Z | Operation.DELETE | 9058990433822511495  | 0.00              | 0              |
| 8997131439115911397 | 2025-05-22T02:47:21Z | Operation.APPEND | 5224576979788468429  | 13.48             | 729,732        |
| 4246095293733855575 | 2025-08-02T22:51:16Z | Operation.DELETE | 8997131439115911397  | 0.00              | 0              |
| 8106328257365313720 | 2025-08-04T07:50:14Z | Operation.APPEND | 6369576239134108166  | 13.48             | 729,733        |
...
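The formatting helper in utils/display.py is not shown in this article, so the following is only a minimal sketch of how one row of the table could be produced from the dictionaries returned by list_snapshots. The function names format_timestamp_ms and format_row are hypothetical, and the column order is assumed from the example output.

```python
from datetime import datetime, timezone


def format_timestamp_ms(timestamp_ms: int) -> str:
    """Convert epoch milliseconds to a UTC string such as 2025-05-22T02:10:24Z."""
    dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


def format_row(snapshot: dict) -> str:
    """Render one snapshot dict (hypothetical helper) as a pipe-delimited row."""
    cells = [
        snapshot["snapshot_id"],
        format_timestamp_ms(snapshot["timestamp"]),
        str(snapshot["operation"]),
        snapshot["parent_id"] or "null",       # root snapshots have no parent
        f"{snapshot['total_size_mb']:.2f}",    # two decimals, as in the example
        f"{snapshot['record_count']:,}",       # thousands separators
    ]
    return "| " + " | ".join(cells) + " |"
```

Iceberg stores snapshot timestamps as epoch milliseconds, which is why the sketch divides by 1000 before converting.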
Show Snapshot Details
$ python -m iceberg_navigator show <Snapshot ID> --table <dbname>.<tablename>
Example output:
Table: yellow_tripdata
Snapshot ID: 8106328257365313720
Timestamp: 2025-08-04T07:50:14Z
Operation: Operation.APPEND
Parent Snapshot ID: 6369576239134108166
Manifest List: s3://your-bucket/warehouse/yellow_tripdata/metadata/snap-8106328257365313720-1-a4fb8059-7bf8-4254-b640-bf1fcbf100dd.avro
Schema:
  1: vendorid: optional int
  2: tpep_pickup_datetime: optional timestamp
  3: tpep_dropoff_datetime: optional timestamp
  4: passenger_count: optional long
  5: trip_distance: optional double
  6: ratecodeid: optional long
  7: store_and_fwd_flag: optional string
  8: pulocationid: optional int
  9: dolocationid: optional int
  10: payment_type: optional long
  11: fare_amount: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
Summary:
  added-data-files: 1
  total-equality-deletes: 0
  added-records: 1
  total-position-deletes: 0
  added-files-size: 3046
  total-delete-files: 0
  total-files-size: 14138545
  total-data-files: 2
  total-records: 729733
Compare Snapshot
$ python -m iceberg_navigator compare <snapshot_id> --table <database>.<table>
Example output:
----------------------------------------
Parent Snapshot
----------------------------------------
ID: 6369576239134108166
File Size: 13.48 MB
Records: 729,732
----------------------------------------
Current Snapshot
----------------------------------------
ID: 6216935665394419954
File Size: 26.96 MB
Records: 1,459,464
========================================
Summary
========================================
Added Records: 729,732
Deleted Records: 0
Visualize Snapshot Lineage Graph
$ python -m iceberg_navigator graph --table <dbname>.<tablename>
DiGraph with 11 nodes and 10 edges
Snapshot graph saved to snapshot_graph.png
Example output:
The graph is drawn using NetworkX + Matplotlib to show parent-child relationships as a DAG.
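The graph-building code lives in utils/display.py and is not reproduced here. As a rough illustration of the underlying idea, the sketch below derives the parent-child structure with plain dictionaries instead of NetworkX. The function names are hypothetical, and the sample rows are four entries copied from the example list output, reduced to the two fields the sketch needs.

```python
from collections import defaultdict


def build_children_map(snapshots):
    """Map each parent snapshot ID to the list of its child snapshot IDs."""
    children = defaultdict(list)
    for snap in snapshots:
        if snap["parent_id"] is not None:
            children[snap["parent_id"]].append(snap["snapshot_id"])
    return dict(children)


def trace_ancestors(snapshots, snapshot_id):
    """Follow parent pointers upward from snapshot_id, nearest ancestor first."""
    parent_of = {s["snapshot_id"]: s["parent_id"] for s in snapshots}
    lineage = []
    current = parent_of.get(snapshot_id)
    # Stop at the root, at a parent missing from the list, or on a cycle.
    while current is not None and current not in lineage:
        lineage.append(current)
        current = parent_of.get(current)
    return lineage


# Four rows from the example output (only the fields used here).
SAMPLE = [
    {"snapshot_id": "6369576239134108166", "parent_id": "3920289554540444894"},
    {"snapshot_id": "6216935665394419954", "parent_id": "6369576239134108166"},
    {"snapshot_id": "9058990433822511495", "parent_id": "6216935665394419954"},
    {"snapshot_id": "8106328257365313720", "parent_id": "6369576239134108166"},
]

children = build_children_map(SAMPLE)
# A parent with more than one child is a point where the history forks.
branch_points = sorted(p for p, kids in children.items() if len(kids) > 1)
print(branch_points)
print(trace_ancestors(SAMPLE, "9058990433822511495"))
```

In the example table, snapshot 6369576239134108166 is the parent of two later snapshots, which is the kind of fork a lineage graph makes visible at a glance.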
Project Directory Structure (Excerpt)
iceberg_navigator/
├── cli.py
├── __main__.py
├── aws/
│   ├── auth.py
│   └── glue.py
├── commands/
│   ├── compare.py
│   ├── list.py
│   ├── show.py
│   └── graph.py
├── utils/
│   └── display.py
Implementation Overview
Entry Point (__main__.py)
Defines Click commands:
import click

from iceberg_navigator.commands.list import list_snapshots
from iceberg_navigator.commands.show import show_snapshot
from iceberg_navigator.commands.graph import graph_snapshots
from iceberg_navigator.commands.compare import compare_snapshots


@click.group()
def cli():
    """Iceberg Navigator CLI"""
    pass


# Register each subcommand on the top-level group.
cli.add_command(list_snapshots)
cli.add_command(show_snapshot)
cli.add_command(graph_snapshots)
cli.add_command(compare_snapshots)

if __name__ == "__main__":
    cli()
Connecting to AWS Glue Iceberg Catalog (glue.py)
Uses Glue REST Catalog API:
from pyiceberg.catalog import load_catalog


class GlueCatalog:
    def __init__(self, profile_name=None, region_name=None, catalog_id="AwsDataCatalog"):
        import boto3

        # Fall back to the region configured for the AWS profile.
        if not region_name:
            session = boto3.Session(profile_name=profile_name)
            region_name = session.region_name
            if not region_name:
                raise ValueError("Could not determine the AWS region; pass region_name or configure a default region")
        self.region_name = region_name
        self.catalog_id = catalog_id
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.glue_client = session.client("glue", region_name=region_name)

    def _get_catalog(self):
        # REST catalog pointed at the Glue Iceberg endpoint, signed with SigV4.
        conf = {
            "type": "rest",
            "uri": f"https://glue.{self.region_name}.amazonaws.com/iceberg",
            "s3.region": self.region_name,
            "rest.sigv4-enabled": "true",
            "rest.signing-name": "glue",
            "rest.signing-region": self.region_name,
        }
        return load_catalog(**conf)

    def get_table_location(self, table_identifier: str) -> str:
        database, table = table_identifier.split(".", 1)
        resp = self.glue_client.get_table(DatabaseName=database, Name=table)
        return resp["Table"]["Parameters"]["metadata_location"]

    def list_snapshots(self, table_identifier: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        snapshots = []
        for snap in table.snapshots():
            total_bytes = int(snap.summary.get("total-files-size", 0)) if snap.summary else 0
            total_records = int(snap.summary.get("total-records", 0)) if snap.summary else 0
            snapshots.append({
                "snapshot_id": str(snap.snapshot_id),
                "timestamp": snap.timestamp_ms,
                "operation": snap.summary.get("operation") if snap.summary else None,
                "parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
                "total_size_mb": round(total_bytes / (1024 * 1024), 2),
                "record_count": total_records,
            })
        return snapshots

    def show_snapshot(self, table_identifier: str, snapshot_id: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        snap = table.snapshot_by_id(int(snapshot_id))
        if not snap:
            return {"error": f"snapshot_id {snapshot_id} not found"}

        # One line per column: position, name, requiredness, type.
        schema_columns = []
        for idx, col in enumerate(table.schema().columns, start=1):
            requiredness = "optional" if col.optional else "required"
            schema_columns.append(f"{idx}: {col.name}: {requiredness} {col.field_type}")

        summary_dict = {}
        if snap.summary:
            summary_dict["operation"] = snap.summary.operation
            if hasattr(snap.summary, "additional_properties"):
                summary_dict.update(snap.summary.additional_properties)

        return {
            "table": table_name,
            "snapshot_id": str(snap.snapshot_id),
            "timestamp": snap.timestamp_ms,
            "operation": summary_dict.get("operation"),
            "parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
            "manifest_list": snap.manifest_list,
            "schema": schema_columns,
            "summary": summary_dict,
        }

    def compare_snapshots(self, table_identifier: str, snapshot_id: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        current_snap = table.snapshot_by_id(int(snapshot_id))
        if not current_snap:
            return {"error": f"snapshot_id {snapshot_id} not found"}

        # The first snapshot of a table has no parent to compare against.
        if current_snap.parent_snapshot_id is None:
            return {"error": f"snapshot_id {snapshot_id} has no parent snapshot"}

        parent_snap = table.snapshot_by_id(int(current_snap.parent_snapshot_id))
        if not parent_snap:
            return {"error": "parent_snapshot not found"}

        current_summary_dict = {}
        if current_snap.summary:
            current_summary_dict["operation"] = current_snap.summary.operation
            if hasattr(current_snap.summary, "additional_properties"):
                current_summary_dict.update(current_snap.summary.additional_properties)

        parent_summary_dict = {}
        if parent_snap.summary:
            parent_summary_dict["operation"] = parent_snap.summary.operation
            if hasattr(parent_snap.summary, "additional_properties"):
                parent_summary_dict.update(parent_snap.summary.additional_properties)

        # Guard against a missing summary, as list_snapshots does.
        current_size = int(current_snap.summary.get("total-files-size", 0)) if current_snap.summary else 0
        current_records = int(current_snap.summary.get("total-records", 0)) if current_snap.summary else 0
        parent_size = int(parent_snap.summary.get("total-files-size", 0)) if parent_snap.summary else 0
        parent_records = int(parent_snap.summary.get("total-records", 0)) if parent_snap.summary else 0

        # Net change in total-records between parent and current snapshot.
        added = current_records - parent_records if current_records > parent_records else 0
        deleted = parent_records - current_records if parent_records > current_records else 0

        return {
            "current_snapshot_id": str(current_snap.snapshot_id),
            "current_size": current_size,
            "current_records": current_records,
            "parent_snapshot_id": str(parent_snap.snapshot_id),
            "parent_size": parent_size,
            "parent_records": parent_records,
            "added": added,
            "deleted": deleted,
        }
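Note that compare_snapshots reports the net difference in total-records, so a snapshot that adds and deletes the same number of rows would show zero for both. One possible refinement, sketched below under the assumption that the snapshot summary carries the optional added-records and deleted-records counters from the Iceberg spec, is to prefer those counters and fall back to the net difference when they are absent. The function name record_changes is hypothetical and is not part of the tool.

```python
def record_changes(current_summary: dict, parent_summary: dict) -> dict:
    """Prefer per-snapshot counters; fall back to the net change in total-records."""
    current_total = int(current_summary.get("total-records", 0))
    parent_total = int(parent_summary.get("total-records", 0))
    net = current_total - parent_total

    added = current_summary.get("added-records")
    deleted = current_summary.get("deleted-records")
    if added is None and deleted is None:
        # No counters available: same net-based logic as compare_snapshots.
        return {"added": max(net, 0), "deleted": max(-net, 0), "net": net}
    return {"added": int(added or 0), "deleted": int(deleted or 0), "net": net}
```

Summary values arrive as strings, so the sketch converts them with int() before doing arithmetic.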
Snapshot List Command (list.py)
import click

from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import format_snapshots_table


@click.command("list")
@click.option("--table", required=True, help="Table identifier, e.g. db.table")
def list_snapshots(table):
    glue = GlueCatalog()
    snapshots = glue.list_snapshots(table)
    if not snapshots:
        click.echo("No snapshots found.")
        return

    table_str = format_snapshots_table(snapshots)
    click.echo(table_str)
Snapshot Show Command (show.py)
import click

from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import show_snapshot_details


@click.command(name="show")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def show_snapshot(table, snapshot_id):
    glue_catalog = GlueCatalog()
    snapshot = glue_catalog.show_snapshot(table, snapshot_id)
    if snapshot is None or "error" in snapshot:
        click.echo(f"Snapshot {snapshot_id} not found in table {table}.")
        return

    show_snapshot_details(snapshot)
Compare Snapshot Command (compare.py)
import click

from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import compare_snapshot


@click.command(name="compare")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def compare_snapshots(table, snapshot_id):
    glue_catalog = GlueCatalog()
    comparison_result = glue_catalog.compare_snapshots(table, snapshot_id)
    if comparison_result is None:
        click.echo(f"Snapshot {snapshot_id} not found in table {table}.")
        return
    if "error" in comparison_result:
        # Surface the specific reason (missing snapshot or missing parent).
        click.echo(comparison_result["error"])
        return

    compare_snapshot(comparison_result)
Snapshot Graph Command (graph.py)
import click

from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import build_snapshot_graph, draw_graph


@click.command("graph")
@click.option("--table", required=True, help="Table name (e.g., db.table)")
@click.option("--output", default="snapshot_graph.png", help="Output image filename")
def graph_snapshots(table: str, output: str):
    glue_catalog = GlueCatalog()
    snapshots = glue_catalog.list_snapshots(table)
    if not snapshots:
        click.echo(f"No snapshots found for table {table}")
        return

    G = build_snapshot_graph(snapshots)
    draw_graph(G, output)
    click.echo(f"Snapshot graph saved to {output}")


if __name__ == "__main__":
    graph_snapshots()
Catalog Access Patterns
PyIceberg supports multiple catalog implementations. In AWS environments, two main approaches are used:
- RestCatalog: Access Iceberg metadata via Glue's Iceberg REST API
- GlueCatalog: Use boto3 Glue client to fetch table info
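According to official AWS docs and recent trends, using Glue's REST endpoint via RestCatalog is becoming the mainstream approach. This tool follows it: despite its name, the tool's own GlueCatalog class loads tables through PyIceberg's RestCatalog against Glue's Iceberg REST endpoint, and uses the boto3 Glue client only to look up a table's metadata location. This keeps access standard and lightweight.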
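To make the difference concrete, here is a small sketch of the property dictionaries each approach would pass to PyIceberg's load_catalog. The REST properties are the ones used in glue.py above. The Glue variant, with its "glue.region" key, reflects my understanding of PyIceberg's catalog configuration and should be checked against the version you have installed. Both function names are hypothetical.

```python
def rest_catalog_properties(region: str) -> dict:
    """Properties for PyIceberg's REST catalog against Glue's Iceberg endpoint."""
    return {
        "type": "rest",
        "uri": f"https://glue.{region}.amazonaws.com/iceberg",
        "s3.region": region,
        "rest.sigv4-enabled": "true",   # sign REST requests with AWS SigV4
        "rest.signing-name": "glue",
        "rest.signing-region": region,
    }


def glue_catalog_properties(region: str) -> dict:
    """Properties for PyIceberg's boto3-backed Glue catalog (assumed key names)."""
    return {
        "type": "glue",
        "glue.region": region,
    }


# Either dict would be handed to pyiceberg.catalog.load_catalog(**properties).
print(rest_catalog_properties("us-east-1")["uri"])
```

Only the "type" value and a handful of keys change between the two, so switching access patterns is mostly a configuration decision.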
For more details, check out my article comparing catalog access patterns:
https://zenn.dev/penginpenguin/articles/e44880aaa2d5e3
PyIceberg Limitations
While PyIceberg is a powerful Python tool for working with Iceberg metadata, it currently has some limitations:
- Limited metadata operations like rollback: Cannot restore snapshots or perform rollback directly.
- Partial functionality via REST Catalog: Glue's REST API is still evolving, so some Iceberg features may not be accessible (especially rollback-related).
- Diff and snapshot operations require custom logic: Users must implement logic for diffing or complex history operations themselves.
Iceberg Table Rollback on AWS
As noted above, rollback is not supported with PyIceberg. Athena, often considered for Iceberg querying, does not currently provide snapshot rollback capabilities either.
To perform rollbacks, you need to use Glue or EMR-based tooling.
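On the Spark side, Iceberg exposes a rollback_to_snapshot stored procedure. As a hedged sketch, the helper below only builds the SQL text; running it would require a Glue or EMR Spark session configured with the Iceberg extensions, for example via spark.sql(statement). The helper name and the catalog name glue_catalog are assumptions for illustration.

```python
import re

# Conservative pattern for "database.table" identifiers (an assumption).
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*\.[A-Za-z_][A-Za-z0-9_]*$")


def rollback_statement(catalog: str, table_identifier: str, snapshot_id: int) -> str:
    """Build a Spark SQL CALL for Iceberg's rollback_to_snapshot procedure."""
    if not catalog.isidentifier():
        raise ValueError(f"invalid catalog name: {catalog!r}")
    if not _IDENTIFIER.match(table_identifier):
        raise ValueError(f"expected db.table, got: {table_identifier!r}")
    return (
        f"CALL {catalog}.system.rollback_to_snapshot("
        f"'{table_identifier}', {int(snapshot_id)})"
    )


# Hypothetical usage: catalog and table names are placeholders.
print(rollback_statement("glue_catalog", "db.yellow_tripdata", 6369576239134108166))
```

Validating the identifiers before formatting keeps the statement safe to build from CLI arguments.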
This CLI tool focuses on snapshot viewing via Glue REST Catalog but has potential to be extended in the future into a full metadata management tool including rollback.
Conclusion
I introduced the iceberg-navigator CLI tool that allows you to inspect snapshot history and details of Apache Iceberg tables on AWS.
Snapshot history is crucial for understanding how data changed over time and for keeping known-good states you can roll back to.
With this tool, you can easily retrieve and inspect snapshot information to assist development and debugging.
This is a personal learning project, and the tool is still evolving, but I hope it serves as a useful example of AWS Iceberg usage and PyIceberg application.
If you're interested, please try it out and feel free to share your feedback!