Iceberg Table Maintenance: Spark Procedures
In [1]:
spark
Initializing Scala interpreter ...
Spark Web UI available at http://1ac96ad2acf7:4040
SparkContext available as 'sc' (version = 3.5.5, master = local[*], app id = local-1750487975219)
SparkSession available as 'spark'
Out[1]:
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@74b3152b
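This notebook runs against the docker-spark-iceberg quickstart environment, where the session above is already wired to an Iceberg catalog named demo. For reference, here is a minimal sketch of the equivalent session configuration (the catalog name, REST URI, and warehouse path are assumptions based on this environment, not something the notebook sets itself):

import org.apache.spark.sql.SparkSession

// Sketch only: requires the iceberg-spark-runtime jar on the classpath;
// adjust the catalog name, URI, and warehouse to your deployment.
val sparkSession = SparkSession.builder()
  .appName("iceberg-table-maintenance")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.demo.type", "rest")                 // assumed REST catalog
  .config("spark.sql.catalog.demo.uri", "http://rest:8181")      // assumed endpoint
  .config("spark.sql.catalog.demo.warehouse", "s3://warehouse/") // assumed warehouse root
  .getOrCreate()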
In [2]:
spark.sql("DROP TABLE IF EXISTS demo.nyc.taxis_sample")
spark.sql("DROP TABLE IF EXISTS demo.nyc.taxis_sample")
Out[2]:
res1: org.apache.spark.sql.DataFrame = []
In [3]:
spark.sql("""
CREATE TABLE demo.nyc.taxis_sample (
`VendorID` BIGINT,
`tpep_pickup_datetime` TIMESTAMP,
`tpep_dropoff_datetime` TIMESTAMP,
`passenger_count` DOUBLE,
`trip_distance` DOUBLE,
`RatecodeID` DOUBLE,
`store_and_fwd_flag` STRING,
`PULocationID` BIGINT,
`DOLocationID` BIGINT,
`payment_type` BIGINT,
`fare_amount` DOUBLE,
`extra` DOUBLE,
`mta_tax` DOUBLE,
`tip_amount` DOUBLE,
`tolls_amount` DOUBLE,
`improvement_surcharge` DOUBLE,
`total_amount` DOUBLE,
`congestion_surcharge` DOUBLE,
`airport_fee` DOUBLE)
USING iceberg
TBLPROPERTIES(
'write.target-file-size-bytes'='5242880'
)
""")
spark.sql("""
CREATE TABLE demo.nyc.taxis_sample (
`VendorID` BIGINT,
`tpep_pickup_datetime` TIMESTAMP,
`tpep_dropoff_datetime` TIMESTAMP,
`passenger_count` DOUBLE,
`trip_distance` DOUBLE,
`RatecodeID` DOUBLE,
`store_and_fwd_flag` STRING,
`PULocationID` BIGINT,
`DOLocationID` BIGINT,
`payment_type` BIGINT,
`fare_amount` DOUBLE,
`extra` DOUBLE,
`mta_tax` DOUBLE,
`tip_amount` DOUBLE,
`tolls_amount` DOUBLE,
`improvement_surcharge` DOUBLE,
`total_amount` DOUBLE,
`congestion_surcharge` DOUBLE,
`airport_fee` DOUBLE)
USING iceberg
TBLPROPERTIES(
'write.target-file-size-bytes'='5242880'
)
""")
Out[3]:
res2: org.apache.spark.sql.DataFrame = []
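Note that write.target-file-size-bytes is set to 5242880 bytes (5 MiB), deliberately small so that the inserts below produce a pile of undersized data files for the compaction demo to clean up. To verify what the table carries, listing its properties is a quick check (a verification sketch, not part of the original run):

spark.sql("SHOW TBLPROPERTIES nyc.taxis_sample").show(truncate=false)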
In [4]:
val df_202201 = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2022-01.parquet")
val df_202202 = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2022-02.parquet")
val df_202203 = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2022-03.parquet")
val df_q1 = df_202201.union(df_202202).union(df_202203)
df_q1.write.insertInto("nyc.taxis_sample")
Out[4]:
df_202201: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp_ntz ... 17 more fields]
df_202202: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp_ntz ... 17 more fields]
df_202203: org.apache.spark.sql.DataFrame = [VendorID: bigint, tpep_pickup_datetime: timestamp_ntz ... 17 more fields]
df_q1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [VendorID: bigint, tpep_pickup_datetime: timestamp_ntz ... 17 more fields]
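Before running any maintenance procedures, it can be useful to record the row count, so you can confirm afterwards that compaction and snapshot expiration rewrite files and metadata but never change the data itself. A sketch (the exact count depends on your copy of the trip data):

spark.sql("SELECT count(*) FROM nyc.taxis_sample").show()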
Rewriting Data Files
In [5]:
spark.sql("SELECT file_path, file_size_in_bytes FROM nyc.taxis_sample.files").show(100)
spark.sql("SELECT file_path, file_size_in_bytes FROM nyc.taxis_sample.files").show(100)
+--------------------+------------------+
|           file_path|file_size_in_bytes|
+--------------------+------------------+
|s3://warehouse/ny...|           4098378|
|s3://warehouse/ny...|           3951238|
|s3://warehouse/ny...|           3990037|
|s3://warehouse/ny...|           3894699|
|s3://warehouse/ny...|           3915456|
|s3://warehouse/ny...|           3895987|
|s3://warehouse/ny...|           3806277|
|s3://warehouse/ny...|           3899172|
|s3://warehouse/ny...|           3822840|
|s3://warehouse/ny...|           3963021|
|s3://warehouse/ny...|           1242601|
|s3://warehouse/ny...|           3887960|
|s3://warehouse/ny...|           3718812|
|s3://warehouse/ny...|           3893136|
|s3://warehouse/ny...|           3705416|
|s3://warehouse/ny...|           3719417|
|s3://warehouse/ny...|           3823555|
|s3://warehouse/ny...|           3711923|
|s3://warehouse/ny...|           3749498|
|s3://warehouse/ny...|           3859935|
|s3://warehouse/ny...|           3743970|
|s3://warehouse/ny...|           3753909|
|s3://warehouse/ny...|           3770138|
|s3://warehouse/ny...|           2129775|
|s3://warehouse/ny...|           3752993|
|s3://warehouse/ny...|           3612792|
|s3://warehouse/ny...|           3834524|
|s3://warehouse/ny...|           3740475|
|s3://warehouse/ny...|           3730257|
|s3://warehouse/ny...|           3730578|
|s3://warehouse/ny...|           3846061|
|s3://warehouse/ny...|           3785702|
|s3://warehouse/ny...|           3735734|
|s3://warehouse/ny...|           3891194|
|s3://warehouse/ny...|           3715606|
|s3://warehouse/ny...|           3744550|
|s3://warehouse/ny...|           3754543|
|s3://warehouse/ny...|           3690781|
|s3://warehouse/ny...|           4814844|
+--------------------+------------------+
In [6]:
spark.sql("ALTER TABLE nyc.taxis_sample UNSET TBLPROPERTIES ('write.target-file-size-bytes')")
spark.sql("ALTER TABLE nyc.taxis_sample UNSET TBLPROPERTIES ('write.target-file-size-bytes')")
Out[6]:
res5: org.apache.spark.sql.DataFrame = []
In [7]:
spark.sql("CALL demo.system.rewrite_data_files(table => 'nyc.taxis_sample', options => map('target-file-size-bytes','52428800'))").show()
spark.sql("CALL demo.system.rewrite_data_files(table => 'nyc.taxis_sample', options => map('target-file-size-bytes','52428800'))").show()
+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|                        39|                     3|            145327784|                      0|
+--------------------------+----------------------+---------------------+-----------------------+
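39 small files went in and 3 right-sized files came out. The call above uses the default binpack strategy, which only coalesces files; rewrite_data_files also accepts a sort strategy and a where filter to restrict the rewrite to matching rows. A sketch with illustrative values (the sort column and predicate are not from this notebook):

spark.sql("""
CALL demo.system.rewrite_data_files(
  table => 'nyc.taxis_sample',
  strategy => 'sort',
  sort_order => 'tpep_pickup_datetime',
  where => 'passenger_count > 4'
)
""").show()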
In [8]:
spark.sql("SELECT file_path, file_size_in_bytes FROM nyc.taxis_sample.files").show(100)
spark.sql("SELECT file_path, file_size_in_bytes FROM nyc.taxis_sample.files").show(100)
+--------------------+------------------+
|           file_path|file_size_in_bytes|
+--------------------+------------------+
|s3://warehouse/ny...|          49243858|
|s3://warehouse/ny...|          48534830|
|s3://warehouse/ny...|          40600811|
+--------------------+------------------+
Expiring Snapshots
In [9]:
spark.sql("SELECT committed_at, snapshot_id, operation FROM nyc.taxis_sample.snapshots").show(truncate=false)
spark.sql("SELECT committed_at, snapshot_id, operation FROM nyc.taxis_sample.snapshots").show(truncate=false)
+-----------------------+-------------------+---------+
|committed_at           |snapshot_id        |operation|
+-----------------------+-------------------+---------+
|2025-06-21 06:49:14.548|616558229842152742 |append   |
|2025-06-21 06:49:40.198|1836423127434635778|replace  |
+-----------------------+-------------------+---------+
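Both snapshots are still queryable at this point, which is exactly what expiration gives up in exchange for reclaiming storage. For instance, the append snapshot still reflects the table before compaction and can be read with time travel (a sketch using the snapshot ID from the output above):

spark.sql("SELECT count(*) FROM nyc.taxis_sample VERSION AS OF 616558229842152742").show()

After the expiration below, this query would fail, since the snapshot no longer exists.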
In [10]:
val now = java.util.Calendar.getInstance().getTime()
val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
val now_str = format.format(now)
spark.sql(s"CALL demo.system.expire_snapshots(table => 'nyc.taxis_sample', older_than => TIMESTAMP '$now_str', retain_last => 1)").show()
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|deleted_data_files_count|deleted_position_delete_files_count|deleted_equality_delete_files_count|deleted_manifest_files_count|deleted_manifest_lists_count|deleted_statistics_files_count|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|                      39|                                  0|                                  0|                           1|                           1|                             0|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
Out[10]:
now: java.util.Date = Sat Jun 21 06:49:57 UTC 2025
format: java.text.SimpleDateFormat = java.text.SimpleDateFormat@f17b4ca5
now_str: String = 2025-06-21 06:49:57.220
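Expiration removed the 39 pre-compaction data files along with the manifest and manifest list that referenced them. Note that expire_snapshots only deletes files still tracked by table metadata; files abandoned by failed or uncommitted writes need a separate pass with the remove_orphan_files procedure. A sketch (dry_run => true only lists candidates without deleting; by default the procedure also ignores files newer than three days):

spark.sql("CALL demo.system.remove_orphan_files(table => 'nyc.taxis_sample', dry_run => true)").show(truncate=false)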
In [11]:
spark.sql("SELECT committed_at, snapshot_id, operation FROM nyc.taxis_sample.snapshots").show(truncate=false)
spark.sql("SELECT committed_at, snapshot_id, operation FROM nyc.taxis_sample.snapshots").show(truncate=false)
+-----------------------+-------------------+---------+
|committed_at           |snapshot_id        |operation|
+-----------------------+-------------------+---------+
|2025-06-21 06:49:40.198|1836423127434635778|replace  |
+-----------------------+-------------------+---------+
Rewriting Manifest Files
In [12]:
spark.sql("CALL demo.system.rewrite_manifests('nyc.taxis_sample')").show()
spark.sql("CALL demo.system.rewrite_manifests('nyc.taxis_sample')").show()
+-------------------------+---------------------+
|rewritten_manifests_count|added_manifests_count|
+-------------------------+---------------------+
|                        2|                    1|
+-------------------------+---------------------+
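The table's two manifests were consolidated into one. To confirm, the manifests metadata table lists the manifest files behind the current snapshot, just as the files table did for data files (a verification sketch):

spark.sql("SELECT path, length, added_data_files_count FROM nyc.taxis_sample.manifests").show(truncate=false)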