Write-Audit-Publish with Branches in Apache Iceberg
This notebook runs using the Docker Compose at https://github.com/tabular-io/docker-spark-iceberg. It's based on the Iceberg - Integrated Audits Demo.ipynb notebook.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Jupyter").getOrCreate()
spark
To be able to rerun the notebook several times, let's drop the permits table if it exists so we start fresh.
%%sql
CREATE DATABASE IF NOT EXISTS nyc
%%sql
DROP TABLE IF EXISTS nyc.permits
Load NYC Film Permits Data
For this demo, we'll use the New York City Film Permits dataset, available as part of the NYC Open Data initiative. We're using a locally saved copy of a 1,000-record sample, but feel free to download the entire dataset to use in this notebook!
We'll save the sample dataset into an Iceberg table called `permits`.
df = (spark.read
      .option("inferSchema", "true")
      .option("multiline", "true")
      .json("/home/iceberg/data/nyc_film_permits.json"))
df.write.saveAsTable("nyc.permits")
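Since we let Spark infer the schema, it's worth a quick check of the field names and types that were derived from the JSON before going further:
df.printSchema()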
Taking a quick peek at the data, you can see that there are a number of permits for different boroughs in New York.
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Manhattan | 518 |
| Bronx | 7 |
The Setup
By default, tables are not configured to allow integrated audits, so the first step is to enable this by setting the `write.wap.enabled` table property to `true`:
%%sql
ALTER TABLE nyc.permits
SET TBLPROPERTIES (
'write.wap.enabled'='true'
)
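As a quick sanity check, you can confirm the property took effect with Spark's SHOW TBLPROPERTIES:
%%sql
SHOW TBLPROPERTIES nyc.permits ('write.wap.enabled')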
We create a branch for the work we want to do. This is a copy-on-write branch, so it's "free" until we start making changes (and "cheap" thereafter), since only data that changes needs to be written.
%%sql
ALTER TABLE nyc.permits
CREATE BRANCH etl_job_42
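Creating the branch is just a metadata operation; the `refs` metadata table (which we'll use again at publish time) shows `etl_job_42` starting out pointing at the same snapshot as `main`:
%%sql
SELECT * FROM nyc.permits.refs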
Write
Before writing to the table, we set `spark.wap.branch` so that writes (and reads) are against the specified branch of the table.
spark.conf.set('spark.wap.branch', 'etl_job_42')
Now make the change to the table:
%%sql
DELETE FROM nyc.permits
WHERE borough='Manhattan'
Inspecting the staged/unpublished data
Staged/unpublished data
The changes are reflected in the table:
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Bronx | 7 |
Note that because `spark.wap.branch` is set, the above query is effectively the same as this one, using `VERSION AS OF` with the branch name:
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits VERSION AS OF 'etl_job_42'
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Bronx | 7 |
Another syntax for `VERSION AS OF` (albeit less clear, IMHO) is a `branch_<branch_name>` suffix on the table name:
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits.branch_etl_job_42
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Bronx | 7 |
Published data
We can also inspect the unmodified `main` version of the table with `VERSION AS OF`:
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits VERSION AS OF 'main'
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Manhattan | 518 |
| Bronx | 7 |
The same `branch_` suffix works here too:
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits.branch_main
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Manhattan | 518 |
| Bronx | 7 |
Any other user of the table will see the full set of data. We can reassure ourselves of this by unsetting `spark.wap.branch` for the session and querying the table without any `VERSION AS OF` modifier:
spark.conf.unset('spark.wap.branch')
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Manhattan | 518 |
| Bronx | 7 |
Audit
How you audit the data is up to you. The nice thing about the data being staged is that you can run the checks within the same ETL job, or have another tool do it entirely.
Here's a very simple example of doing it in Python. We're going to programmatically check that only the four expected boroughs remain in the data.
First, we define the boroughs we expect:
expected_boroughs = {"Queens", "Brooklyn", "Bronx", "Staten Island"}
Then we get the set of boroughs actually present in the staged data:
distinct_boroughs = spark.read \
.option("branch", "etl_job_42") \
.format("iceberg") \
.load("nyc.permits") \
.select("borough") \
.distinct() \
.toLocalIterator()
boroughs = {row[0] for row in distinct_boroughs}
Now we do two checks:
- Compare the length of the expected set against the actual set
- Check that the union of the two sets is still the same length. This is necessary because the first test alone isn't sufficient: two sets can be the same size yet contain different members.
if ((len(boroughs) != len(expected_boroughs))
        or (len(boroughs) != len(set.union(boroughs, expected_boroughs)))
        or (len(expected_boroughs) != len(set.union(boroughs, expected_boroughs)))):
    raise ValueError(f"Audit failed, borough set does not match expected boroughs: {boroughs} != {expected_boroughs}")
else:
    print("Audit has passed 🙌🏻")
Audit has passed 🙌🏻
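The same check could also be written in SQL against the branch. Here's a minimal sketch that counts rows in any unexpected borough (note that, unlike the Python version above, it won't catch an expected borough that's gone missing):
%%sql
SELECT count(*) AS unexpected_rows
FROM nyc.permits VERSION AS OF 'etl_job_42'
WHERE borough NOT IN ('Queens', 'Brooklyn', 'Bronx', 'Staten Island')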
Publish
Iceberg supports fast-forward merging of branches back to `main`, using the `manageSnapshots().fastForwardBranch` API.
This isn't yet exposed in Spark, so the existing `cherrypick` procedure can be used as a slightly less elegant option.
ℹ️ Note that `cherrypick` only works for a single commit.
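ℹ️ In Iceberg releases newer than the one used here, fast-forward is also exposed directly as a Spark procedure. A hedged sketch, assuming Iceberg 1.4 or later:
%%sql
CALL system.fast_forward('nyc.permits', 'main', 'etl_job_42')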
First, we need the snapshot ID of our branch, which we can get from the `refs` metadata table:
%%sql
SELECT * FROM nyc.permits.refs
| name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms |
|---|---|---|---|---|---|
| etl_job_42 | BRANCH | 8720005413734770031 | None | None | None |
| main | BRANCH | 8397442454150540251 | None | None | None |
query = f"""
SELECT snapshot_id
FROM nyc.permits.refs
WHERE name = 'etl_job_42'
"""
wap_snapshot_id = spark.sql(query).head().snapshot_id
Now we do the publish, using the `cherrypick_snapshot` procedure and the snapshot ID:
publish_query = f"CALL system.cherrypick_snapshot('nyc.permits', {wap_snapshot_id})"
%sql $publish_query
| source_snapshot_id | current_snapshot_id |
|---|---|
| 8720005413734770031 | 8720005413734770031 |
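You can confirm the publish by querying the `refs` metadata table again; `main` should now show the same snapshot_id as `etl_job_42`:
%%sql
SELECT * FROM nyc.permits.refs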
Finally, we look at the table and revel in the glory that is our published changes 🎉
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits.branch_etl_job_42
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Bronx | 7 |
The published changes are now reflected in `main` too, as `VERSION AS OF` confirms:
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits VERSION AS OF 'main'
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Bronx | 7 |
What if You Don't Want to Publish Changes?
If you don't want to merge the branch, you can simply `DROP` it.
Create a new branch
%%sql
ALTER TABLE nyc.permits
CREATE BRANCH new_etl_job
Set `spark.wap.branch`
spark.conf.set('spark.wap.branch', 'new_etl_job')
Write
%%sql
DELETE FROM nyc.permits WHERE borough LIKE '%'
Audit
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits
GROUP BY borough
| borough | permit_cnt |
|---|---|
Whoops 🤭
We deleted all the data.
Reassure ourselves that the data is still there in `main`
%%sql
SELECT borough, count(*) permit_cnt
FROM nyc.permits VERSION AS OF 'main'
GROUP BY borough
| borough | permit_cnt |
|---|---|
| Queens | 96 |
| Brooklyn | 378 |
| Staten Island | 1 |
| Bronx | 7 |
Abandon changes
%%sql
ALTER TABLE nyc.permits
DROP BRANCH new_etl_job
Where Next?
For more information about write-audit-publish, see this talk from Michelle Winters and this talk from Sam Redai.