from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Jupyter").getOrCreate()
spark
25/06/21 04:34:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
SparkSession - in-memory
To be able to rerun the notebook several times, let's drop the `permits` table if it exists to start fresh.
%%sql
CREATE DATABASE IF NOT EXISTS nyc
%%sql
DROP TABLE IF EXISTS nyc.permits
Load NYC Film Permits Data¶
For this demo, we will use the New York City Film Permits dataset available as part of the NYC Open Data initiative. We're using a locally saved copy of a 1,000-record sample, but feel free to download the entire dataset to use in this notebook!
We'll save the sample dataset into an Iceberg table called `permits`.
df = spark.read \
    .option("inferSchema", "true") \
    .option("multiline", "true") \
    .json("/home/iceberg/data/nyc_film_permits.json")
df.write.saveAsTable("nyc.permits")
Taking a quick peek at the data, you can see that there are a number of permits for different boroughs in New York.
spark.read \
.format("iceberg") \
.load("nyc.permits") \
.groupBy("borough") \
.count() \
.show()
+-------------+-----+
|      borough|count|
+-------------+-----+
|       Queens|   96|
|     Brooklyn|  378|
|Staten Island|    1|
|    Manhattan|  518|
|        Bronx|    7|
+-------------+-----+
Generate an ID for an Integrated Audit Session¶
An integrated audit session is a single cadence of:
- Staging changes to a table
- Auditing the staged changes
- Committing the changes (optional)
Each of these sessions must be represented with an ID. You can use any convention that makes sense in your environment, but in this demo we'll simply use a UUID.
import uuid
ia_session_id = uuid.uuid4().hex
ia_session_id
'92d9d9eb680740468822d1ea99068c8a'
The Setup¶
Tables are not configured to allow integrated audits by default, so the first step is to enable this by setting the `write.wap.enabled` table metadata property to `true`.
%%sql
ALTER TABLE nyc.permits
SET TBLPROPERTIES (
'write.wap.enabled'='true'
)
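If you want to double-check that the property took effect, one optional sanity check (not required for the workflow) is to read it back:
# Optional: confirm the WAP property is now set on the table.
spark.sql("SHOW TBLPROPERTIES nyc.permits ('write.wap.enabled')").show(truncate=False)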
Next, the `spark.wap.id` property of your Spark session configuration must be set to the integrated audit session ID.
spark.conf.set('spark.wap.id', ia_session_id)
With a `spark.wap.id` value set, you can now safely write directly to the `permits` table. Don't worry: these changes will only be staged, not committed!
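As a quick, purely optional sanity check, you can read the property back from the session configuration:
# Optional: confirm the session is tagged with our integrated audit session ID.
spark.conf.get('spark.wap.id')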
Staging The Changes¶
To stage the changes, you simply write directly to the `permits` table. This is awesome in situations where you're working with a large and complex data ingestion pipeline. Instead of including hard-coded logic in your pipeline to switch between a sort of "audit mode" and "production mode", with integrated audits you simply run your production code!
For this demo, let's use a simple query that deletes all records for film permits in the Manhattan borough.
%%sql
DELETE FROM nyc.permits
WHERE borough='Manhattan'
As described, even though the query was executed against the production table, these changes are only staged and not committed since we are within an integrated audit session. Let's confirm this by verifying that a count by borough still includes the Manhattan records.
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Manhattan | 518 |
| Bronx | 7 |
The Audit¶
Once the changes for this session are staged, you can perform all of your audits to validate the data. The first step is to retrieve the snapshot ID generated by the changes and tagged with this integrated audit session ID.
%%sql
SELECT *
FROM nyc.permits.history
| made_current_at | snapshot_id | parent_id | is_current_ancestor |
|---|---|---|---|
| 2025-06-21 04:36:21.362000 | 2044767572491260316 | None | True |
query = f"""
SELECT
snapshot_id
FROM nyc.permits.snapshots
WHERE summary['wap.id'] = '{ia_session_id}'
"""
ia_session_snapshot = spark.sql(query).head().snapshot_id
ia_session_snapshot
2689398741546157644
This snapshot includes the staged (but not committed) changes to your production table. Once you have this snapshot ID, you can use Iceberg's Time Travel feature to query it!
spark.read \
.option("snapshot-id", ia_session_snapshot) \
.format("iceberg") \
.load("nyc.permits") \
.groupBy("borough") \
.count() \
.show()
+-------------+-----+
|      borough|count|
+-------------+-----+
|       Queens|   96|
|     Brooklyn|  378|
|Staten Island|    1|
|        Bronx|    7|
+-------------+-----+
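Equivalently, if you prefer SQL over the DataFrame reader, Spark's time travel syntax reads the same staged snapshot (a sketch, assuming a Spark/Iceberg combination that supports VERSION AS OF, as the Spark 3.5 / Iceberg 1.8 environment here does):
# The same time-travel read expressed in SQL rather than the DataFrame API.
spark.sql(f"""
    SELECT borough, count(*) AS permit_cnt
    FROM nyc.permits VERSION AS OF {ia_session_snapshot}
    GROUP BY borough
""").show()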
At this point, you can use any auditing tool or technique to validate your changes. For this demo, we'll do a simple audit that confirms that the only remaining boroughs are Queens, Brooklyn, Bronx, and Staten Island. If any of these boroughs is missing, or any additional boroughs are found, we'll raise an exception.
expected_boroughs = {"Queens", "Brooklyn", "Bronx", "Staten Island"}
distinct_boroughs = spark.read \
.option("snapshot-id", ia_session_snapshot) \
.format("iceberg") \
.load("nyc.permits") \
.select("borough") \
.distinct() \
.toLocalIterator()
boroughs = {row[0] for row in distinct_boroughs}
# Since `boroughs` and `expected_boroughs` are both sets (collections of distinct items),
# we can confirm that they match by checking that their lengths are equal to each other
# as well as to the length of the union of both sets.
if not (len(boroughs) == len(expected_boroughs) == len(set.union(boroughs, expected_boroughs))):
    raise ValueError(f"Audit failed, borough set does not match expected boroughs: {boroughs} != {expected_boroughs}")
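Since both values are Python sets, a direct equality comparison expresses the same audit a bit more directly; the length-based check above is mainly there to spell out the reasoning:
# Equivalent audit using direct set equality.
if boroughs != expected_boroughs:
    raise ValueError(f"Audit failed, borough set does not match expected boroughs: {boroughs} != {expected_boroughs}")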
If the above check does not fail, we can go ahead and commit our staged data to publish our changes!
The Publish¶
After the audits are completed, publishing the data is as simple as running the `cherrypick_snapshot` stored procedure.
publish_query = f"CALL system.cherrypick_snapshot('nyc.permits', {ia_session_snapshot})"
%sql $publish_query
| source_snapshot_id | current_snapshot_id |
|---|---|
| 2689398741546157644 | 2689398741546157644 |
That's it! Publishing the changes from this integrated audit session is a simple metadata-only operation that instantly makes the changes live for all downstream consumers querying the `permits` table! Query results will now include the commit that removed all Manhattan records.
spark.read \
.format("iceberg") \
.load("nyc.permits") \
.groupBy("borough") \
.count() \
.show()
+-------------+-----+
|      borough|count|
+-------------+-----+
|       Queens|   96|
|     Brooklyn|  378|
|Staten Island|    1|
|        Bronx|    7|
+-------------+-----+
%%sql
SELECT *
FROM nyc.permits.history
| made_current_at | snapshot_id | parent_id | is_current_ancestor |
|---|---|---|---|
| 2025-06-21 04:36:21.362000 | 2044767572491260316 | None | True |
| 2025-06-21 05:08:54.301000 | 2689398741546157644 | 2044767572491260316 | True |
What Happens When The Audits Fail?¶
What about when your audits fail? What happens to the snapshots generated? How about the data and metadata files?
One of the best parts of Iceberg's integrated audits is that cleaning up staged-but-not-committed data is part of the normal snapshot cleanup process of a typical Iceberg warehouse. To be more specific, let's say a daily snapshot expiration is performed on the data warehouse (using the `expire_snapshots` procedure) and all snapshots older than 7 days are expired. That means once your staged snapshot reaches 7 days in age, it will be expired.
Additionally, because the changes were never committed, the underlying data files for that snapshot will be removed as well, since they're not referenced by any other snapshots in the table's linear history.
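As a rough sketch of what such a daily maintenance call could look like (the 7-day window and the retain_last value are illustrative assumptions, not something this demo runs):
from datetime import datetime, timedelta

# Hypothetical daily maintenance job: expire snapshots older than 7 days,
# keeping at least the last 5 snapshots. Values are illustrative.
cutoff = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S')
spark.sql(f"""
    CALL system.expire_snapshots(
        table => 'nyc.permits',
        older_than => TIMESTAMP '{cutoff}',
        retain_last => 5
    )
""").show()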
Let's see this in action. First, start a new integrated audit session and stage a commit by inserting a single record.
ia_session_id = uuid.uuid4().hex
ia_session_id
'1782105f3a254b4d9858087ec76f5258'
spark.conf.set('spark.wap.id', ia_session_id)
%%sql
INSERT INTO nyc.permits
VALUES (
'Hoboken',
'Television',
'1',
'United States of America',
'2021-11-24T23:00:00.000',
'2021-11-23T09:38:17.000',
'Mayor\'s Office of Film, Theatre & Broadcasting',
'613322',
'Shooting Permit',
'WASHINGTON STREET',
'100',
'2021-11-24T07:00:00.000',
'Episodic series',
'07030'
)
Next, let's identify the snapshot that was tagged with the integrated audit session ID.
%%sql
SELECT snapshot_id
FROM nyc.permits.snapshots
| snapshot_id |
|---|
| 2044767572491260316 |
| 2689398741546157644 |
| 7408523516967726811 |
query = f"""
SELECT snapshot_id
FROM nyc.permits.snapshots
WHERE summary['wap.id'] = '{ia_session_id}'
"""
ia_session_snapshot = spark.sql(query).head().snapshot_id
ia_session_snapshot
7408523516967726811
A quick check of the history table shows that this snapshot is not included as part of the current history of the table since it has not been published yet.
%%sql
SELECT *
FROM nyc.permits.history
| made_current_at | snapshot_id | parent_id | is_current_ancestor |
|---|---|---|---|
| 2025-06-21 04:36:21.362000 | 2044767572491260316 | None | True |
| 2025-06-21 05:08:54.301000 | 2689398741546157644 | 2044767572491260316 | True |
In a scenario where the audits fail and this change is not published, the `expire_snapshots` procedure will clean up the snapshot and the data files. Let's demonstrate this by calling the `expire_snapshots` procedure for all snapshots older than the current timestamp.
import time
%sql CALL system.expire_snapshots('nyc.permits', {round(time.time() * 1000)}, 100)
| deleted_data_files_count | deleted_position_delete_files_count | deleted_equality_delete_files_count | deleted_manifest_files_count | deleted_manifest_lists_count | deleted_statistics_files_count |
|---|---|---|---|---|---|
| 1 | 0 | 0 | 1 | 1 | 0 |
The output from the `expire_snapshots` procedure shows that a data file, a manifest file, and a manifest list file were deleted. Furthermore, the snapshot no longer appears in the `permits` table's snapshots metadata table.
%%sql
SELECT *
FROM nyc.permits.snapshots
committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
---|---|---|---|---|---|
2025-06-21 04:36:21.362000 | 2044767572491260316 | None | append | s3://warehouse/nyc/permits/metadata/snap-2044767572491260316-1-4cc87a9f-3242-46b1-a03a-7c45dc885f7f.avro | {'engine-version': '3.5.5', 'added-data-files': '1', 'total-equality-deletes': '0', 'app-id': 'local-1750480469176', 'added-records': '1000', 'total-records': '1000', 'spark.app.id': 'local-1750480469176', 'changed-partition-count': '1', 'engine-name': 'spark', 'total-position-deletes': '0', 'added-files-size': '49719', 'total-delete-files': '0', 'iceberg-version': 'Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)', 'total-files-size': '49719', 'total-data-files': '1'} |
2025-06-21 04:46:00.881000 | 2689398741546157644 | 2044767572491260316 | overwrite | s3://warehouse/nyc/permits/metadata/snap-2689398741546157644-1-01b64de9-8295-4443-af60-b7dcb5ff1908.avro | {'engine-version': '3.5.5', 'added-data-files': '1', 'total-equality-deletes': '0', 'app-id': 'local-1750480469176', 'added-records': '482', 'deleted-data-files': '1', 'deleted-records': '1000', 'total-records': '482', 'spark.app.id': 'local-1750480469176', 'removed-files-size': '49719', 'changed-partition-count': '1', 'engine-name': 'spark', 'wap.id': '92d9d9eb680740468822d1ea99068c8a', 'total-position-deletes': '0', 'added-files-size': '26860', 'total-delete-files': '0', 'iceberg-version': 'Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)', 'total-files-size': '26860', 'total-data-files': '1'} |