Sometimes in life, we need to make breaking changes that require us to create a new checkpoint. Some example scenarios:
- You are making a code/application change that alters the processing logic
- A major Spark version upgrade, e.g. from Spark 2.x to Spark 3.x
- The previous deployment was wrong, and you want to reprocess from a certain point
There could be plenty of scenarios where you want to control precisely which data (Kafka offsets) needs to be processed.
Not every scenario requires a new checkpoint; here is a list of changes you can make without one.
This blog helps you understand how to handle a scenario where a new checkpoint is unavoidable.
Kafka Basics: Topics, Partitions & Offsets
A Kafka cluster has topics: a topic is a way to organize messages. Each topic has a name that is unique across the entire Kafka cluster. Messages are sent to and read from specific topics. In other words, producers write data to a topic, and consumers read data from the topic.
Topics have partitions, and data/messages are distributed across partitions. Every message belongs to a single partition.
A partition holds messages, each with a unique sequential identifier within the partition called the offset.
What is the takeaway here?
We must identify what offset has already been processed for each partition, and this information can be found inside the checkpoint.
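For context, the Spark Kafka source already exposes these coordinates as columns on every record it reads, so you can see the topic/partition/offset layout directly from a stream. A minimal sketch (the broker address and topic name below are placeholders, not from the original pipeline):
# Minimal sketch: each record carries its topic, partition and offset.
raw_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")  # placeholder broker
    .option("subscribe", "topic_name_from_kafka")    # placeholder topic
    .load())

display(raw_stream.select("topic", "partition", "offset", "value"))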
What information is inside the checkpoint?
Each micro-batch leaves a trace in the checkpoint as it moves through these steps:
- Fetch the metadata & write it to the WAL (write-ahead log) in the checkpoint. WAL: a roll-forward journal that records transactions that have been committed but not yet applied to the main data
- Fetch the actual data, process it with the state info, and write it to the sink
- Write the stateful information & the commit to the checkpoint
Under the checkpoint folder, there are four subfolders:
- Sources (contains the starting Kafka offsets)
- Offsets (consists of the WAL information)
- Commits (a batch is recorded here after the entire process completes)
- State (only for stateful operations), plus one metadata file
How to fetch information about Offset & Partition from the Checkpoint folder?
List the files at the checkpoint location; we are looking for the offsets and commits folders.
checkpoint_location = "/checkpoint_location/checkpoint_for_kafka_to_delta"
dbutils.fs.ls(checkpoint_location)
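The listing should surface the subfolders described earlier, plus the metadata file; an illustrative result for the path above (the state folder appears only for stateful queries):
/checkpoint_location/checkpoint_for_kafka_to_delta/commits/
/checkpoint_location/checkpoint_for_kafka_to_delta/metadata
/checkpoint_location/checkpoint_for_kafka_to_delta/offsets/
/checkpoint_location/checkpoint_for_kafka_to_delta/sources/
/checkpoint_location/checkpoint_for_kafka_to_delta/state/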
Next, we will list the files under the commits folder and identify the most recent commit.
dbutils.fs.ls(f"{checkpoint_location}/commits")
/checkpoint_location/checkpoint_for_kafka_to_delta/commits/0
/checkpoint_location/checkpoint_for_kafka_to_delta/commits/1
/checkpoint_location/checkpoint_for_kafka_to_delta/commits/2
Once we identify the last commit file number, we will open the equivalent offsets file. In this example, we can see the latest commit is "2".
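If you prefer not to eyeball the listing, the latest batch id can also be derived programmatically. A small sketch, assuming the same checkpoint_location variable and skipping any non-numeric entries:
# Sketch: derive the most recent committed batch id from the commits folder.
commit_files = dbutils.fs.ls(f"{checkpoint_location}/commits")
batch_ids = [int(f.name) for f in commit_files if f.name.isdigit()]
latest_batch = max(batch_ids)
print(latest_batch)  # 2 in this example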
Now let’s view the contents of the offsets file.
#%fs head {FILL_THE_EXACT_PATH_OF_THE_FILE_WHICH_NEEDS_TO_BE_VIEWED}
%fs head /checkpoint_location/checkpoint_for_kafka_to_delta/offsets/2
{"batchWatermarkMs":0,"batchTimestampMs":1674623173851,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"topic_name_from_kafka":{"0":400000, "1":300000}}
The information of interest is at the end: the last line has the topic name and the offset per partition.
{"topic_name_from_kafka":{"0":400000, "1":300000}}
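You can also pull that last line out programmatically and reuse it in the next step. A rough sketch, assuming the same checkpoint_location and the latest batch number "2" found above (the offsets file is plain text: a version header, the batch metadata, then one JSON document per source):
import json

# Sketch: read the offsets file for the latest batch; the last line holds
# {topic: {partition: offset}} for the Kafka source.
offsets_path = f"{checkpoint_location}/offsets/2"
last_line = dbutils.fs.head(offsets_path).strip().split("\n")[-1]
print(json.loads(last_line))  # {'topic_name_from_kafka': {'0': 400000, '1': 300000}}

# last_line can be passed directly as the "startingOffsets" option below.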
Now the easy part: Use Spark to start reading Kafka from a particular Offset
By default, Spark Structured Streaming starts a read stream from the latest offset. However, the Kafka source provides an option, "startingOffsets", to select a custom starting point.
startingOffsets = """{"topic_name_from_kafka":{"0":400000, "1":300000}}"""
kafka_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
    .option("subscribe", topic)
    .option("startingOffsets", startingOffsets)
    .load())

display(kafka_stream)
And we are done! I recommend parameterizing your code so that "startingOffsets" can be passed in as a parameter, as sketched below.
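As a sketch of what that parameterization could look like on Databricks (the widget name, target table path, and new checkpoint path below are illustrative assumptions, not from the original pipeline):
# Hypothetical parameterization: pass startingOffsets in as a notebook/job parameter.
dbutils.widgets.text("starting_offsets", "latest")
starting_offsets = dbutils.widgets.get("starting_offsets")

kafka_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
    .option("subscribe", topic)
    .option("startingOffsets", starting_offsets)
    .load())

# Write to Delta with a NEW checkpoint location so the stream starts from the
# offsets supplied above rather than from the old checkpoint.
(kafka_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoint_location/checkpoint_for_kafka_to_delta_v2")
    .start("/delta/target_table_path"))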
Footnote:
Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.