In modern data architectures, Apache Iceberg has emerged as a popular table format for data lakes, offering key features including ACID transactions and concurrent write support. Although these capabilities are powerful, implementing them effectively in production environments presents unique challenges that require careful consideration.
Consider a typical scenario: a streaming pipeline continuously writes data to an Iceberg table while scheduled maintenance jobs perform compaction operations. Although Iceberg provides built-in mechanisms to handle concurrent writes, certain conflict scenarios, such as those between streaming updates and compaction operations, can lead to transaction failures that require specific handling patterns.
This post demonstrates how to implement reliable concurrent write handling in Iceberg tables. We explore Iceberg's concurrency model, examine common conflict scenarios, and provide practical implementation patterns for both automatic retry mechanisms and situations that require custom conflict resolution logic, so you can build resilient data pipelines. We also cover how these patterns apply to automatic compaction through AWS Glue Data Catalog table optimization.
Common conflict scenarios
The most frequent data conflicts occur in a few specific operational scenarios that many organizations encounter in their data pipelines, which we discuss in this section.
Concurrent UPDATE/DELETE on overlapping partitions
When multiple processes attempt to modify the same partition simultaneously, data conflicts can arise. For example, imagine a data quality process updating customer records with corrected addresses while another process is deleting outdated customer records. Both operations target the same partition based on customer_id, leading to potential conflicts because they modify an overlapping dataset. These conflicts are particularly common in large-scale data cleanup operations.
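As a minimal illustration, the following two Spark SQL statements could run from separate jobs; the db.customers table and its columns are hypothetical names used only for this sketch. Because both statements modify overlapping records in the same partition, one of them can fail its data conflict check at commit time.
# Writer A: hypothetical data quality job correcting addresses
spark.sql("""
    UPDATE db.customers
    SET address = 'corrected address'
    WHERE customer_id BETWEEN 1000 AND 1999
""")

# Writer B (running concurrently): hypothetical cleanup job deleting outdated records in the same key range
spark.sql("""
    DELETE FROM db.customers
    WHERE customer_id BETWEEN 1000 AND 1999
      AND last_active < DATE '2020-01-01'
""")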
Compaction vs. streaming writes
A classic conflict scenario occurs during table maintenance operations. Consider a streaming pipeline ingesting real-time event data while a scheduled compaction job runs to optimize file sizes. The streaming process might be writing new records to a partition while the compaction job is attempting to combine existing files in the same partition. This scenario is especially common with Data Catalog table optimization, where automatic compaction can run concurrently with continuous data ingestion.
Concurrent MERGE operations
MERGE operations are particularly susceptible to conflicts because they involve both reading and writing data. For instance, an hourly job might be merging customer profile updates from a source system while a separate job is merging preference updates from another system. If both jobs attempt to modify the same customer records, they can conflict because each operation bases its changes on a different view of the current data state.
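The following sketch shows what such a collision can look like; the table, source views, and columns (db.customers, profile_updates, preference_updates) are hypothetical. Each MERGE bases its changes on the snapshot it read, so if both touch the same customer rows, the second one to commit can fail with a data conflict.
# Hourly job: merge profile updates from one source system (hypothetical names)
spark.sql("""
    MERGE INTO db.customers AS t
    USING profile_updates AS s
    ON t.customer_id = s.customer_id
    WHEN MATCHED THEN UPDATE SET t.profile = s.profile
""")

# Separate job (running concurrently): merge preference updates for the same customers
spark.sql("""
    MERGE INTO db.customers AS t
    USING preference_updates AS s
    ON t.customer_id = s.customer_id
    WHEN MATCHED THEN UPDATE SET t.preferences = s.preferences
""")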
General concurrent table updates
When multiple transactions occur simultaneously, some transactions might fail to commit to the catalog because of interference from other transactions. Iceberg has mechanisms to handle this situation, so it can adapt to concurrent transactions in many cases. However, commits can still fail if the latest metadata is updated after the base metadata version is established. This scenario applies to any type of update on an Iceberg table.
Iceberg's concurrency model and conflict types
Before diving into specific implementation patterns, it's essential to understand how Iceberg manages concurrent writes through its table architecture and transaction model. Iceberg uses a layered architecture to manage table state and data:
Catalog layer – Maintains a pointer to the current table metadata file, serving as the single source of truth for table state. The Data Catalog provides this functionality as the Iceberg catalog.
Metadata layer – Contains metadata files that track table history, schema evolution, and snapshot information. These files are stored on Amazon Simple Storage Service (Amazon S3).
Data layer – Stores the actual data files and delete files (for Merge-on-Read operations). These files are also stored on Amazon S3.
The following diagram illustrates this architecture.
This architecture is fundamental to Iceberg's optimistic concurrency control, where multiple writers can proceed with their operations simultaneously and conflicts are detected at commit time.
Write transaction flow
A typical write transaction in Iceberg follows these key steps:
1. Read the current state. In many operations (like OVERWRITE, MERGE, and DELETE), the query engine needs to know which files or rows are relevant, so it reads the current table snapshot. This is optional for operations like INSERT.
2. Determine the changes in the transaction, and write new data files.
3. Load the table's latest metadata, and determine which metadata version is used as the base for the update.
4. Check whether the changes prepared in Step 2 are compatible with the latest table data from Step 3. If the check fails, the transaction must stop.
5. Generate new metadata files.
6. Commit the metadata files to the catalog. If the commit fails, retry from Step 3. The number of retries depends on the configuration.
The following diagram illustrates this workflow.
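The following pseudocode summarizes how these steps fit together under optimistic concurrency control. It is an illustrative sketch only; the helper names (write_data_files, is_compatible, and so on) are not part of the Iceberg API.
# Illustrative pseudocode of Iceberg's optimistic write flow (not the actual API)
def write_transaction(table, catalog, changes, max_commit_retries):
    snapshot = table.current_snapshot()             # Step 1: read the current state (optional for INSERT)
    new_data_files = write_data_files(changes)      # Step 2: determine changes and write new data files
    for _ in range(max_commit_retries + 1):
        base_metadata = table.refresh_metadata()    # Step 3: load the latest metadata as the base
        if not is_compatible(new_data_files, base_metadata):
            raise ValidationException()             # Step 4: data conflict check fails; no automatic retry
        new_metadata = build_metadata(base_metadata, new_data_files)  # Step 5: generate new metadata files
        if catalog.compare_and_swap(table, base_metadata, new_metadata):
            return                                  # Step 6: catalog commit succeeded
        # Another writer updated the catalog first; retry from Step 3
    raise CommitFailedException()                   # commit retries exhausted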
Conflicts can occur at two critical points:
Data update conflicts – During validation, when checking for data conflicts (Step 4)
Catalog commit conflicts – During the commit, when attempting to update the catalog pointer (Step 6)
When working with Iceberg tables, understanding the types of conflicts that can occur and how they are handled is crucial for building reliable data pipelines. Let's examine the two primary types of conflicts and their characteristics.
Catalog commit conflicts
Catalog commit conflicts occur when multiple writers attempt to update the table metadata simultaneously. When a commit conflict occurs, Iceberg automatically retries the operation based on the table's write properties. The retry process only repeats the metadata commit, not the entire transaction, making it both safe and efficient. When the retries are exhausted, the transaction fails with a CommitFailedException.
In the following diagram, two transactions run concurrently. Transaction 1 successfully updates the table's latest snapshot in the Iceberg catalog from 0 to 1. Meanwhile, transaction 2 also attempts to update from snapshot 0 to 1, but when it tries to commit its changes to the catalog, it finds that the latest snapshot has already been changed to 1 by transaction 1. As a result, transaction 2 needs to retry from Step 3.
These conflicts are typically transient and can be automatically resolved through retries. You can optionally configure write properties that control commit retry behavior. For more detailed configuration, refer to Write properties in the Iceberg documentation.
The metadata used when reading the current state (Step 1) and the snapshot used as the base metadata for the update (Step 3) can be different. Even if another transaction updates the latest snapshot between Steps 1 and 3, the current transaction can still commit changes to the catalog as long as it passes the data conflict check (Step 4). This means that even if computing changes and writing data files (Steps 1 and 2) take a long time, and other transactions make changes during this period, the transaction can still attempt to commit to the catalog. This demonstrates Iceberg's intelligent concurrency control mechanism.
The following diagram illustrates this workflow.
Data update conflicts
Data update conflicts are more complex and occur when concurrent transactions attempt to modify overlapping data. During a write transaction, the query engine checks consistency between the snapshot being written and the latest snapshot according to the transaction isolation rules. When an incompatibility is detected, the transaction fails with a ValidationException.
In the following diagram, two transactions run concurrently on an employee table containing id, name, and salary columns. Transaction 1 attempts to update a record based on snapshot 0 and successfully commits this change, making the latest snapshot version 1. Meanwhile, transaction 2 also attempts to update the same record based on snapshot 0. When transaction 2 initially scanned the data, the latest snapshot was 0, but it has since been updated to 1 by transaction 1. During the data conflict check, transaction 2 discovers that its changes conflict with snapshot 1, so the transaction fails.
These conflicts cannot be automatically retried by Iceberg's library because when data conflicts occur, the table's state has changed, making it uncertain whether retrying the transaction would maintain overall data consistency. You need to handle this type of conflict based on your specific use case and requirements.
The following table summarizes how different write patterns have varying likelihoods of conflicts.
Write pattern | Catalog commit conflict (automatically retryable) | Data conflict (non-retryable)
INSERT (AppendFiles) | Yes | Never
UPDATE/DELETE with Copy-on-Write or Merge-on-Read (OverwriteFiles) | Yes | Yes
Compaction (RewriteFiles) | Yes | Yes
Iceberg table isolation levels
Iceberg tables support two isolation levels: serializable and snapshot isolation. Both provide a read-consistent view of the table and ensure readers see only committed data. Serializable isolation guarantees that concurrent operations run as if they were performed in some sequential order. Snapshot isolation provides weaker guarantees but offers better performance in environments with many concurrent writers. Under snapshot isolation, data conflict checks can pass even when concurrent transactions add new files with records that potentially match the operation's conditions.
By default, Iceberg tables use serializable isolation. You can configure isolation levels for specific operations using table properties:
tbl_properties = {
    'write.delete.isolation-level': 'serializable',
    'write.update.isolation-level': 'serializable',
    'write.merge.isolation-level': 'serializable'
}
You should choose the appropriate isolation level based on your use case. Note that for conflicts between streaming ingestion and compaction operations, which is one of the most common scenarios, snapshot isolation does not provide any additional benefit over the default serializable isolation. For more detailed configuration, see IsolationLevel.
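For example, the following Spark SQL statement sets snapshot isolation for MERGE operations on an existing table. This is a minimal sketch; glue_catalog.db.table_name is a placeholder for your own table.
# Example: switch MERGE operations to snapshot isolation on an existing table
# (glue_catalog.db.table_name is a placeholder)
spark.sql("""
    ALTER TABLE glue_catalog.db.table_name
    SET TBLPROPERTIES (
        'write.merge.isolation-level' = 'snapshot'
    )
""")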
Implementation patterns
Implementing robust concurrent write handling in Iceberg requires different strategies depending on the conflict type and use case. In this section, we share proven patterns for handling common scenarios.
Handle catalog commit conflicts
Catalog commit conflicts are relatively straightforward to handle through table properties. The following configurations serve as initial baseline settings that you can adjust based on your specific workload patterns and requirements.
For frequent concurrent writes (for example, streaming ingestion):
tbl_properties = {
    'commit.retry.num-retries': '10',
    'commit.retry.min-wait-ms': '100',
    'commit.retry.max-wait-ms': '10000',
    'commit.retry.total-timeout-ms': '1800000'
}
For maintenance operations (for example, compaction):
tbl_properties = {
    'commit.retry.num-retries': '4',
    'commit.retry.min-wait-ms': '1000',
    'commit.retry.max-wait-ms': '60000',
    'commit.retry.total-timeout-ms': '1800000'
}
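One way to apply these settings to an existing table is with a standard ALTER TABLE statement, as in the following sketch (glue_catalog.db.table_name is a placeholder); the same keys can also be passed as TBLPROPERTIES at table creation time.
# Example: apply the streaming-oriented commit retry settings to an existing table
# (glue_catalog.db.table_name is a placeholder)
spark.sql("""
    ALTER TABLE glue_catalog.db.table_name
    SET TBLPROPERTIES (
        'commit.retry.num-retries' = '10',
        'commit.retry.min-wait-ms' = '100',
        'commit.retry.max-wait-ms' = '10000',
        'commit.retry.total-timeout-ms' = '1800000'
    )
""")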
Handle data update conflicts
For data update conflicts, which cannot be automatically retried, you need to implement a custom retry mechanism with proper error handling. A typical scenario is when streaming UPSERT ingestion conflicts with concurrent compaction operations. In such cases, the streaming ingestion job should typically implement retries to handle incoming data. Without proper error handling, the job will fail with a ValidationException.
We show two example scripts demonstrating a practical implementation of error handling for data conflicts in Iceberg streaming jobs. The code specifically catches ValidationException through Py4JJavaError handling, which is necessary for proper Java-Python interaction. It includes an exponential backoff and jitter strategy, adding a random delay of 0–25% to each retry interval. For example, if the base exponential backoff time is 4 seconds, the actual retry delay will be between 4–5 seconds, helping prevent immediate retry storms while maintaining reasonable latency.
In this example, we create a scenario with frequent MERGE operations on the same records by using 'value' as a unique identifier and artificially limiting its range. By applying a modulo operation (value % 20), we constrain all values to fall within 0–19, which means multiple updates will target the same records. For instance, if the original stream contains values 0, 20, 40, and 60, they will all be mapped to 0, resulting in multiple MERGE operations targeting the same record. We then use groupBy and max aggregation to simulate a typical UPSERT pattern where we keep the latest record for each value. The transformed data is stored in a temporary view that serves as the source table in the MERGE statement, allowing us to perform UPDATE operations using 'value' as the matching condition. This setup helps demonstrate how our retry mechanism handles ValidationExceptions that occur when concurrent transactions attempt to modify the same records.
The first example uses Spark Structured Streaming with a rate source and a 20-second trigger interval to demonstrate the retry mechanism's behavior when concurrent operations cause data conflicts. Replace the empty DATABASE and TABLE values with your database and table names, and amzn-s3-demo-bucket with your S3 bucket name.
import time
import random
from pyspark.sql import SparkSession
from py4j.protocol import Py4JJavaError
from pyspark.sql.functions import max as max_

CATALOG = "glue_catalog"
DATABASE = ""   # replace with your database name
TABLE = ""      # replace with your table name
BUCKET = "amzn-s3-demo-bucket"   # replace with your S3 bucket name

spark = SparkSession.builder \
    .appName("IcebergUpsertExample") \
    .config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config(f"spark.sql.catalog.{CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.defaultCatalog", CATALOG) \
    .config(f"spark.sql.catalog.{CATALOG}.type", "glue") \
    .getOrCreate()

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE}.{TABLE} (
        timestamp TIMESTAMP,
        value LONG
    )
    USING iceberg
    LOCATION 's3://{BUCKET}/warehouse'
""")

def backoff(attempt):
    """Exponential backoff with jitter"""
    exp_backoff = min(2 ** attempt, 60)
    jitter = random.uniform(0, 0.25 * exp_backoff)
    return exp_backoff + jitter

def is_validation_exception(java_exception):
    """Check whether the exception chain contains a ValidationException"""
    cause = java_exception
    while cause is not None:
        if "org.apache.iceberg.exceptions.ValidationException" in str(cause.getClass().getName()):
            return True
        cause = cause.getCause()
    return False

def upsert_with_retry(microBatchDF, batchId):
    max_retries = 5
    attempt = 0
    # Use a narrower key range to deliberately increase updates to the same value in MERGE
    transformedDF = microBatchDF \
        .selectExpr("timestamp", "value % 20 AS value") \
        .groupBy("value") \
        .agg(max_("timestamp").alias("timestamp"))
    view_name = f"incoming_data_{batchId}"
    transformedDF.createOrReplaceGlobalTempView(view_name)
    while attempt < max_retries:
        try:
            spark.sql(f"""
                MERGE INTO {DATABASE}.{TABLE} AS t
                USING global_temp.{view_name} AS i
                ON t.value = i.value
                WHEN MATCHED THEN
                    UPDATE SET
                        t.timestamp = i.timestamp,
                        t.value = i.value
                WHEN NOT MATCHED THEN
                    INSERT (timestamp, value)
                    VALUES (i.timestamp, i.value)
            """)
            print(f"(SUCCESS) Batch {batchId} processed successfully")
            return
        except Py4JJavaError as e:
            if is_validation_exception(e.java_exception):
                attempt += 1
                if attempt < max_retries:
                    delay = backoff(attempt)
                    print(f"(RETRY) Batch {batchId} failed with ValidationException. "
                          f"Retrying in {delay} seconds. Attempt {attempt}/{max_retries}")
                    time.sleep(delay)
                else:
                    print(f"(FAILED) Batch {batchId} failed after {max_retries} attempts")
                    raise
            else:
                raise

# Sample streaming query setup
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Start the streaming query
query = df.writeStream \
    .trigger(processingTime="20 seconds") \
    .option("checkpointLocation", f"s3://{BUCKET}/checkpointLocation") \
    .foreachBatch(upsert_with_retry) \
    .start()

query.awaitTermination()
The second example uses GlueContext.forEachBatch, which is available in AWS Glue streaming jobs. The implementation pattern for the retry mechanism stays the same; the main differences are the initial setup using GlueContext and how the streaming DataFrame is created. Although our example uses spark.readStream with a rate source for demonstration, in actual AWS Glue streaming jobs you would typically create your streaming DataFrame using glueContext.create_data_frame.from_catalog to read from sources like Amazon Kinesis or Kafka. For more details, see AWS Glue Streaming connections. Replace the empty DATABASE and TABLE values with your database and table names, and amzn-s3-demo-bucket with your S3 bucket name.
import time
import random
from py4j.protocol import Py4JJavaError
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_

CATALOG = "glue_catalog"
DATABASE = ""   # replace with your database name
TABLE = ""      # replace with your table name
BUCKET = "amzn-s3-demo-bucket"   # replace with your S3 bucket name

spark = SparkSession.builder \
    .appName("IcebergUpsertExample") \
    .config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config(f"spark.sql.catalog.{CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.defaultCatalog", CATALOG) \
    .config(f"spark.sql.catalog.{CATALOG}.type", "glue") \
    .getOrCreate()

sc = spark.sparkContext
glueContext = GlueContext(sc)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE}.{TABLE} (
        timestamp TIMESTAMP,
        value LONG
    )
    USING iceberg
    LOCATION 's3://{BUCKET}/warehouse'
""")

def backoff(attempt):
    """Exponential backoff with jitter"""
    exp_backoff = min(2 ** attempt, 60)
    jitter = random.uniform(0, 0.25 * exp_backoff)
    return exp_backoff + jitter

def is_validation_exception(java_exception):
    """Check whether the exception chain contains a ValidationException"""
    cause = java_exception
    while cause is not None:
        if "org.apache.iceberg.exceptions.ValidationException" in str(cause.getClass().getName()):
            return True
        cause = cause.getCause()
    return False

def upsert_with_retry(batch_df, batchId):
    max_retries = 5
    attempt = 0
    # Use a narrower key range to deliberately increase updates to the same value in MERGE
    transformedDF = batch_df.selectExpr("timestamp", "value % 20 AS value") \
        .groupBy("value") \
        .agg(max_("timestamp").alias("timestamp"))
    view_name = f"incoming_data_{batchId}"
    transformedDF.createOrReplaceGlobalTempView(view_name)
    while attempt < max_retries:
        try:
            spark.sql(f"""
                MERGE INTO {DATABASE}.{TABLE} AS t
                USING global_temp.{view_name} AS i
                ON t.value = i.value
                WHEN MATCHED THEN
                    UPDATE SET
                        t.timestamp = i.timestamp,
                        t.value = i.value
                WHEN NOT MATCHED THEN
                    INSERT (timestamp, value)
                    VALUES (i.timestamp, i.value)
            """)
            print(f"(SUCCESS) Batch {batchId} processed successfully")
            return
        except Py4JJavaError as e:
            if is_validation_exception(e.java_exception):
                attempt += 1
                if attempt < max_retries:
                    delay = backoff(attempt)
                    print(f"(RETRY) Batch {batchId} failed with ValidationException. "
                          f"Retrying in {delay} seconds. Attempt {attempt}/{max_retries}")
                    time.sleep(delay)
                else:
                    print(f"(FAILED) Batch {batchId} failed after {max_retries} attempts")
                    raise
            else:
                raise

# Sample streaming query setup
streaming_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# In actual Glue streaming jobs, you would typically create a streaming DataFrame like this:
"""
streaming_df = glueContext.create_data_frame.from_catalog(
    database = "database",
    table_name = "table_name",
    transformation_ctx = "streaming_df",
    additional_options = {
        "startingPosition": "TRIM_HORIZON",
        "inferSchema": "false"
    }
)
"""

glueContext.forEachBatch(
    frame=streaming_df,
    batch_function=upsert_with_retry,
    options={
        "windowSize": "20 seconds",
        "checkpointLocation": f"s3://{BUCKET}/checkpointLocation"
    }
)
Minimize conflict potential by scoping your operations
When performing maintenance operations like compaction or updates, it's recommended to narrow down the scope to minimize overlap with other operations. For example, consider a table partitioned by date where a streaming job continuously upserts data for the latest date. The following example script runs the rewrite_data_files procedure to compact the entire table:
# Example of broad-scope compaction
spark.sql("""
    CALL catalog_name.system.rewrite_data_files(
        table => 'db.table_name'
    )
""")
By narrowing the compaction scope with a date partition filter in the where clause, you can avoid conflicts between streaming ingestion and compaction operations. The streaming job can continue to work with the latest partition while compaction processes historical data.
# Narrow down the scope by partition
spark.sql("""
    CALL catalog_name.system.rewrite_data_files(
        table => 'db.table_name',
        where => 'date_partition < current_date'
    )
""")
Conclusion
Successfully managing concurrent writes in Iceberg requires understanding both the table architecture and the various conflict scenarios. In this post, we explored how to implement reliable conflict handling mechanisms in production environments.
The most critical concept to remember is the distinction between catalog commit conflicts and data conflicts. Although catalog commit conflicts can be handled through automatic retries and table properties configuration, data conflicts require careful implementation of custom handling logic. This becomes particularly important when implementing maintenance operations like compaction, where using the where clause in rewrite_data_files can significantly lower conflict potential by reducing the scope of the operation.
For streaming pipelines, the key to success lies in implementing proper error handling that can differentiate between conflict types and respond appropriately. This includes configuring suitable retry settings through table properties and implementing backoff strategies that align with your workload characteristics. When combined with well-timed maintenance operations, these patterns help build resilient data pipelines that can handle concurrent writes reliably.
By applying these patterns and understanding the underlying mechanisms of Iceberg's concurrency model, you can build robust data pipelines that effectively handle concurrent write scenarios while maintaining data consistency and reliability.
About the Authors
Sotaro Hikita is an Analytics Solutions Architect. He supports customers across a wide range of industries in building and operating analytics platforms more effectively. He is particularly passionate about big data technologies and open source software.
Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team, based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.