forEachBatch

Blog, Delta, forEachBatch, Spark, Stream

Using Spark Streaming to merge/upsert data into a Delta Lake with working code

This blog will discuss how to read from a Spark Structured Streaming source and merge/upsert data into a Delta Lake. We will also optimize/cluster the data of the Delta table. At the end, we will show how to start a streaming pipeline with the previous target table as the source.

Overall, the process works as follows: we read data from a streaming source and use the special function foreachBatch to call a user-defined function that is responsible for all the processing. That user-defined function encapsulates the merge and optimize against the target Delta table.

First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, S3, etc.; for simplicity, let's generate a stream using the rate source below. Feel free to alter numPartitions and rowsPerSecond; these parameters control how much data you want to generate. In the code below, we generate 10,000 rows per second across 100 partitions.

Generate streaming data at your desired rate

generated_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 100)
    .option("rowsPerSecond", 10 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        ,"value"
        ,"value%1000000 as hash"
    )
)

# display(generated_df)

Parameters / Variables (Feel free to change as per your needs)

target_table_name = "to_be_merged_into_table"
check_point_location = f"/tmp/delta/{target_table_name}/_checkpoints/"
join_column_name = "hash"

Create an empty Delta table so data can be merged into it

spark.sql(f"""DROP TABLE IF EXISTS {target_table_name};""")

(
    generated_df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", check_point_location)
    .toTable(target_table_name)
)

Check if data is populated

display(spark.read.table(target_table_name))

A user-defined function which does the data processing, merge & optimize

def make_changes_using_the_micro_batch(microBatchOutputDF, batchId: int):
    print(f"Processing batchId: {batchId}")
    microBatchOutputDF.createOrReplaceTempView("updates")
    spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession()

    # Deduplicate the micro-batch: keep only the latest row per join key
    spark_session_for_this_micro_batch.sql(f"""
        SELECT *
        FROM (
            SELECT *
                ,rank() OVER (PARTITION BY {join_column_name} ORDER BY value DESC) AS dedupe
            FROM updates
        )
        WHERE dedupe = 1
    """).drop("dedupe").createOrReplaceTempView("updates_which_need_to_be_merged")

    # Upsert the deduplicated rows into the target Delta table
    spark_session_for_this_micro_batch.sql(f"""
        MERGE INTO {target_table_name} target
        USING updates_which_need_to_be_merged u
        ON u.{join_column_name} = target.{join_column_name}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

    optimize_every_n_batches = 20  # How often OPTIMIZE should run; 20 means every 20th micro-batch
    if batchId % optimize_every_n_batches == 0:
        optimize_and_zorder_table(table_name=target_table_name, zorder_by_col_name=join_column_name)

Optimize / Z-order a Delta table

Why do we need to optimize a table? If we keep adding files to our Delta table and never optimize/sort them, then over time we need to read a lot of files during merge time. Thus, it is better to optimize the Delta table after every N merges. N needs to be decided based on your latency requirements; you could start with N as 10 and change it as per your needs. The code below runs an OPTIMIZE and ZORDER command on a given table that is being fed by a stream. The OPTIMIZE command cannot be run in a silo, because it would require us to pause and then resume the stream. Therefore, we call this function as part of the upsert function, which lets us optimize the table before the next micro-batch of streaming data comes through.
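The batch-count check above is the simplest cadence. If your micro-batches arrive at an irregular rate, an alternative (not from the original post) is to trigger OPTIMIZE on elapsed wall-clock time instead; a minimal sketch, where the interval is an assumed value you would tune to your latency needs:

import time

_last_optimize_ts = 0.0
OPTIMIZE_INTERVAL_SECONDS = 15 * 60  # assumed cadence; tune for your workload

def should_optimize_now() -> bool:
    """Return True at most once per OPTIMIZE_INTERVAL_SECONDS (tracked on the driver)."""
    global _last_optimize_ts
    now = time.time()
    if now - _last_optimize_ts >= OPTIMIZE_INTERVAL_SECONDS:
        _last_optimize_ts = now
        return True
    return False

Inside make_changes_using_the_micro_batch you would then call should_optimize_now() instead of checking batchId % optimize_every_n_batches. Because foreachBatch executes the user function on the driver, a module-level variable is enough to track the last run.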
from timeit import default_timer as timer

def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None:
    """
    Parameters:
        table_name: str
            Name of the table to be optimized.
        zorder_by_col_name: str
            Comma-separated list of columns to z-order by, e.g. "col_a, col_b, col_c".
    """
    start = timer()
    print(f"Met condition to optimize table {table_name}")
    sql_query_optimize = f"OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})"
    spark.sql(sql_query_optimize)
    end = timer()
    time_elapsed_seconds = end - start
    print(
        f"Successfully optimized table {table_name}. Total time elapsed: {time_elapsed_seconds} seconds"
    )

Orchestrate from readStream -> Merge -> Optimize

(
    generated_df
    .writeStream.format('delta')
    .trigger(processingTime='30 seconds')
    .option("checkpointLocation", check_point_location)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

If you have made it this far, you should have an end-to-end pipeline that reads streaming data and merges it into a Delta table. As the next step, let's use the previous target table as our new streaming source.

Use the target table as a source for the next streaming pipeline

Change data feed allows Databricks to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.

Reference: https://docs.databricks.com/delta/delta-change-data-feed.html#use-delta-lake-change-data-feed-on-databricks

spark.sql(f'''ALTER TABLE {target_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed=true)''')

Reading change data as a stream

display(
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table(target_table_name)
)

Download this notebook: Spark Streaming Using For Each Batch & Merge.html (hosted on drive.google.com)

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Reference: https://www.youtube.com/watch?v=CLDcdVDupMg
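For completeness, here is a minimal sketch of how a downstream job might consume the change feed enabled above, keeping only inserts and post-update row images. It relies on the metadata columns the change feed adds (_change_type, _commit_version, _commit_timestamp):

cdf_stream_df = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table(target_table_name)
    # Drop pre-update images and deletes; keep the latest state of each changed row
    .filter("_change_type IN ('insert', 'update_postimage')")
)
# cdf_stream_df can now feed its own writeStream / foreachBatch pipeline.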

Best Practices, Blog, forEachBatch, Spark, Stream

Streaming Any File Type with Autoloader in Databricks: A Working Guide

Spark Streaming has emerged as a dominant streaming framework, known for its scalable, high-throughput, and fault-tolerant handling of live data streams. While Spark Streaming and Databricks Autoloader natively support standard file formats like JSON, CSV, PARQUET, AVRO, TEXT, BINARYFILE, and ORC, their versatility extends far beyond these. This blog post delves into using Spark Streaming and Databricks Autoloader to process file types that are not natively supported.

The Process Flow: The example below uses ROS bag files, but the same method can be translated to any other file type.

Setting Up the Environment

First, we need to prepare our Databricks environment:

%pip install bagpy
dbutils.library.restartPython()

We install bagpy, a Python library for ROS bag files, and restart the Python environment to ensure the library is properly loaded.

Importing Necessary Libraries

Next, we import the required Python libraries:

from typing import List, Dict
import boto3
import rosbag
import tempfile
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, LongType, FloatType
from pyspark.sql import SparkSession

These imports include standard data manipulation tools, AWS S3 access (boto3), ROS bag reading capabilities (rosbag), and the necessary PySpark components.

Detect new files and file paths using Autoloader

# Spark streaming setup for ROS bag files
s3_data_path = "s3a://one-env/jiteshsoni/Vehicle/"
table_name = "rosbag_imu"
checkpoint_location = f"/tmp/checkpoint/{table_name}/"

stream_using_autoloader_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "binaryfile")
    .option("cloudFiles.includeExistingFiles", "true")
    .load(s3_data_path)
)

display(stream_using_autoloader_df)

Custom UDF to read & parse any file type

The core function extract_rosbag_data reads data from a ROS bag file in an S3 bucket and returns a list of dictionaries containing the extracted data:

def extract_rosbag_data(s3_rosbag_path: str) -> List[Dict]:
    """
    Extracts data from a ROS bag file stored in S3, converting it into a list of dictionaries.

    Args:
        s3_rosbag_path (str): The S3 path to the ROS bag file.

    Returns:
        List[Dict]: A list of dictionaries with data from the ROS bag.
    """
    interested_topics = ['/ublox_trunk/ublox/esfalg']
    extracted_data = []

    # Extracting the S3 bucket and file key from the provided path
    bucket_name, s3_file_key = s3_rosbag_path.split('/', 3)[2:4]

    # Using boto3 to download the ROS bag file into memory
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, s3_file_key)
    file_stream = obj.get()['Body'].read()

    # Storing the downloaded file temporarily
    with tempfile.NamedTemporaryFile() as temp_file:
        temp_file.write(file_stream)
        temp_file.flush()
        # Reading messages from the ROS bag file
        with rosbag.Bag(temp_file.name, 'r') as bag:
            for topic, msg, timestamp in bag.read_messages(topics=interested_topics):
                message_data = {field: getattr(msg, field) for field in msg.__slots__}
                message_data['timestamp'] = timestamp.to_sec()
                extracted_data.append(message_data)

    return extracted_data

This function uses boto3 to access the S3 bucket, reads the ROS bag file, and extracts the relevant data. At this point, we should test the function directly before we proceed. For your own use case, you would change this function to read your file type (a generic sketch follows below).
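If your files are not ROS bags, the shape of the parser stays the same: download the bytes, parse them with whatever library understands your format, and return a list of dictionaries. As an illustration only (not from the original post), here is what that might look like for gzipped JSON Lines files; swap the parsing block for your own format:

import gzip
import json
from typing import List, Dict

import boto3

def extract_jsonl_gz_data(s3_file_path: str) -> List[Dict]:
    """Illustrative parser: read a gzipped JSON Lines file from S3 into a list of dicts."""
    bucket_name, s3_file_key = s3_file_path.split('/', 3)[2:4]
    raw_bytes = boto3.resource('s3').Object(bucket_name, s3_file_key).get()['Body'].read()

    records: List[Dict] = []
    for line in gzip.decompress(raw_bytes).splitlines():
        if line.strip():
            records.append(json.loads(line))
    return records

The rest of the pipeline (Autoloader, UDF, schema, Delta write) stays the same; only the schema you declare in the next step has to match what your parser returns.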
extract_rosbag_data(s3_rosbag_path="s3a://bucket_name/jiteshsoni/Vehicle/2023-08-04-16-30-24_63.bag")

Things to note here: in this example, I am downloading the file onto the cluster, which could be avoided if your file reader supports reading from the source directly.

Defining the Data Schema

Before ingesting data into Spark, define the schema that aligns with the data structure in the ROS bags. This is important because Spark needs to know what schema to expect.

# Define the schema that matches your ROS bag data structure
rosbag_schema = ArrayType(StructType([
    StructField("Alpha", LongType(), True),
    StructField("Beta", IntegerType(), True),
    StructField("Gamma", IntegerType(), True),
    StructField("Delta", IntegerType(), True),
    StructField("Epsilon", IntegerType(), True),
    StructField("Zeta", IntegerType(), True),
    StructField("Eta", IntegerType(), True),
    StructField("Theta", IntegerType(), True),
    StructField("Iota", FloatType(), True)
]))

# Creating a User Defined Function (UDF) for processing ROS bag files
process_rosbag_udf = udf(extract_rosbag_data, returnType=rosbag_schema)

Now let's test whether Autoloader and the custom parsing UDF are working, using the display command:

rosbag_stream_df = (
    stream_using_autoloader_df
    .withColumn("rosbag_rows", process_rosbag_udf("path"))
    .withColumn("extracted_data", explode("rosbag_rows"))
    .selectExpr("extracted_data.*", "_metadata.*")
)

# Displaying the DataFrame
display(rosbag_stream_df)

Writing the Stream to a Delta Table

Finally, we write the streaming data to a Delta table, enabling further processing or querying:

streaming_write_query = (
    rosbag_stream_df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("queryName", f"IngestFrom_{s3_data_path}_AndWriteTo_{table_name}")
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)
    .toTable(table_name)
)

Best Practices & Considerations

Thank You for Reading! I hope you found this article helpful and informative. If you enjoyed this post, please consider giving it a clap 👏 and sharing it with your network. Your support is greatly appreciated! — CanadianDataGuy
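As a quick sanity check once the availableNow run finishes, you can confirm what landed in the table. This assumes the flattened _metadata fields (such as file_name) were written out as shown above:

# Row counts per source file in the ingested table
display(
    spark.sql(f"""
        SELECT file_name, COUNT(*) AS row_count
        FROM {table_name}
        GROUP BY file_name
        ORDER BY row_count DESC
    """)
)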

Blog, forEachBatch, Spark, Stream

Simplifying Real-time Data Processing with Spark Streaming’s foreachBatch with working code

A comprehensive guide to implementing a fully operational streaming pipeline that can be tailored to your specific needs. In this working example, you will learn how to parameterize the foreachBatch function.

Spark Streaming & foreachBatch

Spark Streaming is a powerful tool for processing streaming data. It allows you to process data as it arrives, without having to wait for the entire dataset to be available. This can be very useful for applications that need to respond to changes in data in real time.

One of the features of Spark Streaming is the foreachBatch() method. This method allows you to apply a custom function to each batch of data as it arrives, which makes it useful for a wide variety of tasks. The foreachBatch() method is a powerful tool that can be used to extend the capabilities of Spark Streaming. In this blog post, we will take a closer look at how to use it.

Introducing foreachBatch: foreachBatch is a method provided by Spark Streaming that allows developers to apply arbitrary operations on the output of a streaming query. It acts as a bridge between the streaming world and the structured world of DataFrames and Datasets. This means that we can leverage the rich functionality of Spark's structured APIs to process real-time data efficiently.

The Power of foreachBatch: The foreachBatch operation enables developers to perform batch-like operations on streaming data. Instead of processing each individual record, which can be inefficient, foreachBatch processes the data in micro-batches, offering better performance and resource utilization. This approach also provides the flexibility to leverage the full power of Spark's DataFrames, including various transformations and aggregations, to perform complex computations on streaming data.

Implementing foreachBatch: To use foreachBatch, you need to define a function that takes two arguments: the DataFrame representing the micro-batch of data and the batch identifier. Inside this function, you can apply any transformations or computations required on the streaming data. You can use Spark's SQL, DataFrame, or Dataset APIs to manipulate the data and write the results to any external systems, such as databases or file systems.

Benefits of foreachBatch:

Code & Setup

Here's how we can use foreachBatch to achieve this:
∘ Define parameters for the job
∘ Create a Streaming source
∘ Define custom processing logic and parameters
∘ Create an instance of the forEachBatchProcessor class with the parameters
∘ Orchestrate the job
∘ Look at the output table
∘ Clean up

Define parameters for the job

target_table_name = "for_each_batch_paramerterize"
check_point_location = f"/tmp/delta/{target_table_name}/_checkpoints/"
dedupe_colum_name = "hash"

Create a Streaming source

We will create a synthetic dataset.
generated_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 4)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        ,"value"
        ,"value%1000000 as hash"
    )
)

Define custom processing logic and parameters

class forEachBatchProcessor:
    def __init__(self, dedupe_column: str, filter_criteria: str, passed_value: int):
        self.dedupe_column = dedupe_column
        self.filter_criteria = filter_criteria
        self.passed_value = passed_value

    def print_attributes(self):
        attributes = vars(self)
        print(
            "\n".join([f"{attr}: {value}" for attr, value in attributes.items()])
        )

    def make_changes_using_the_micro_batch(self, microBatchOutputDF, batchId: int):
        self.print_attributes()
        print(f"Processing batchId: {batchId}")
        # Your processing logic using the parameters
        view_name = f"updates_for_batchId_{batchId}"
        microBatchOutputDF.createOrReplaceTempView(view_name)
        sql_logic = f"""
            SELECT *
                ,{self.passed_value} as passed_value
                ,{batchId} as batch_id
            FROM (
                SELECT *
                    ,rank() over(partition by {self.dedupe_column} order by value desc) as dedupe
                FROM {view_name}
                WHERE {self.filter_criteria}
            )
            WHERE dedupe = 1
        """
        print(f"Processing sql_logic: {sql_logic}")
        to_be_written_df = microBatchOutputDF.sparkSession.sql(sql_logic).drop("dedupe")
        to_be_written_df.write.mode("append").saveAsTable(target_table_name)

Create an instance of the forEachBatchProcessor class with the parameters

instantiateForEachBatchProcessor = forEachBatchProcessor(
    dedupe_column=dedupe_colum_name,
    filter_criteria="1=1",
    passed_value=3,
)

Orchestrate the job

(
    generated_df
    .writeStream
    # .trigger(availableNow=True)
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", check_point_location)
    .option("queryName", "ParameterizeForEachBatch")
    .foreachBatch(instantiateForEachBatchProcessor.make_changes_using_the_micro_batch)
    .start()
)

Look at the output table

display(spark.read.table(target_table_name))

Clean Up

spark.sql(f"""
    DROP TABLE IF EXISTS {target_table_name}
""")
dbutils.fs.rm(check_point_location, True)

Conclusion: Apache Spark Streaming's foreachBatch operation is a powerful tool for simplifying real-time data processing. By bridging the gap between the streaming and structured worlds, it enables developers to perform batch-like operations on streaming data efficiently. Leveraging the rich functionality of Spark's DataFrames, foreachBatch empowers users to process and analyze real-time data with ease. Whether you're performing aggregations, transformations, or writing data to external systems, foreachBatch offers a flexible and scalable solution for real-time streaming applications.

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Download the code

I want to emphasize that my blog posts are designed to be practical resources that you can readily use in your own environments. By providing code examples with careful attention to best practices, I aim to simplify the implementation of real-time data processing solutions. I encourage you to explore the blog, copy the code snippets, and adapt them to your specific needs. With these resources, you'll be equipped to accelerate your development process and unlock the power of Spark Streaming.
Dive in, leverage the code, and start building your real-time data processing pipelines with confidence! Go Build! Canadian Data Guy!
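Because the processing logic lives in a class, the same code can drive several independently parameterized streams. A small sketch reusing the class above; the filter, query name, and checkpoint path here are made-up examples, and note that as written the class still appends to the shared target_table_name:

another_processor = forEachBatchProcessor(
    dedupe_column=dedupe_colum_name,
    filter_criteria="value % 2 = 0",  # hypothetical filter: keep only even values
    passed_value=42,
)

(
    generated_df
    .writeStream
    .trigger(processingTime='10 seconds')
    # A second query needs its own checkpoint location (made-up path)
    .option("checkpointLocation", f"/tmp/delta/{target_table_name}/_checkpoints_even/")
    .option("queryName", "ParameterizeForEachBatch_EvenValues")
    .foreachBatch(another_processor.make_changes_using_the_micro_batch)
    .start()
)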

Blog, Delta, forEachBatch, Spark, Stream

Merge Multiple Spark Streams into a Delta Table with working code

This blog will discuss how to read from multiple Spark streams and merge/upsert data into a single Delta table. We will also optimize/cluster the data of the Delta table. Overall, the process works in the following manner:

Architecture

First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, S3, etc. However, for simplicity we will use .format('rate') to generate a stream. Feel free to alter numPartitions and rowsPerSecond; these parameters control how much data you want to generate. In the code below, we generate 1,000 rows per second across 100 partitions. For the purpose of this blog, we will build two Spark streams, one for each country: Canada and the USA.

USA's stream

generated_streaming_usa_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 100)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        ,"value"
        ,"value%1000000 as hash"
        ,"'USA' AS country"
        ,"current_timestamp() as ingestion_timestamp"
    )
)

# display(generated_streaming_usa_df)

Canada's stream

generated_streaming_canada_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 100)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        ,"value"
        ,"value%1000000 as hash"
        ,"'Canada' AS country"
        ,"current_timestamp() as ingestion_timestamp"
    )
)

# display(generated_streaming_canada_df)

Parameters / Variables (Feel free to change as per your needs)

target_table_name = "to_be_merged_into_table_partitioned_by_country"
check_point_location_for_usa_stream = f"/tmp/delta/{target_table_name}/_checkpoints/_usa/"
check_point_location_for_canada_stream = f"/tmp/delta/{target_table_name}/_checkpoints/_canada/"
join_column_name = "hash"
partition_column = "country"

Create an empty Delta table so data can be merged into it

# spark.sql(f"""DROP TABLE IF EXISTS {target_table_name};""")
(
    generated_streaming_usa_df.writeStream
    .partitionBy(partition_column)
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", check_point_location_for_usa_stream)
    .toTable(target_table_name)
)

Check if data is populated. If you do not see any data, just run the above code snippet once more; sometimes it takes time for the data to show up.

display(spark.read.table(target_table_name))

Now we will build the user-defined function that does all the data processing, merge & optimize:

def make_changes_using_the_micro_batch(microBatchOutputDF, batchId: int):
    print(f"Processing batchId: {batchId}")
    microBatchOutputDF.createOrReplaceTempView("updates")
    spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession()

    # Deduplicate the micro-batch: keep only the latest row per join key
    spark_session_for_this_micro_batch.sql(f"""
        SELECT *
        FROM (
            SELECT *
                ,rank() OVER (PARTITION BY {join_column_name} ORDER BY value DESC) AS dedupe
            FROM updates
        )
        WHERE dedupe = 1
    """).drop("dedupe").createOrReplaceTempView("updates_which_need_to_be_merged")

    # Upsert into the target table; the partition column is part of the merge condition
    spark_session_for_this_micro_batch.sql(f"""
        MERGE INTO {target_table_name} target
        USING updates_which_need_to_be_merged u
        ON u.{partition_column} = target.{partition_column}
            AND u.{join_column_name} = target.{join_column_name}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

    optimize_every_n_batches = 20  # How often OPTIMIZE should run; 20 means every 20th micro-batch
    if batchId % optimize_every_n_batches == 0:
        optimize_and_zorder_table(table_name=target_table_name, zorder_by_col_name=join_column_name)
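Before wiring this function into both streams, it can be worth sanity-checking the dedupe logic on a tiny static DataFrame; a throwaway sketch with made-up rows (two rows share the same hash, so only the one with the larger value should survive):

sample_df = spark.createDataFrame(
    [("a", 10, 1, "USA"), ("b", 20, 1, "USA"), ("c", 30, 2, "Canada")],
    ["md5", "value", "hash", "country"],
)
sample_df.createOrReplaceTempView("updates")

display(
    spark.sql(f"""
        SELECT *
        FROM (
            SELECT *
                ,rank() OVER (PARTITION BY {join_column_name} ORDER BY value DESC) AS dedupe
            FROM updates
        )
        WHERE dedupe = 1
    """).drop("dedupe")
)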
Optimize / Z-order a Delta table

Why do we need to optimize a table? If we keep adding files to our Delta table and never optimize/sort them, then over time we need to read a lot of files during merge time. Thus, it is better to optimize the Delta table after every N merges. N needs to be decided based on your latency requirements; you could start with N as 10 and change it as per your needs. The code below runs an OPTIMIZE and ZORDER command on a given table that is being fed by a stream. The OPTIMIZE command cannot be run in a silo, because it would require us to pause and then resume the stream. Therefore, we call this function as part of the upsert function, which lets us optimize the table before the next micro-batch of streaming data comes through.

from timeit import default_timer as timer

def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None:
    """
    Parameters:
        table_name: str
            Name of the table to be optimized.
        zorder_by_col_name: str
            Comma-separated list of columns to z-order by, e.g. "col_a, col_b, col_c".
    """
    start = timer()
    print(f"Met condition to optimize table {table_name}")
    sql_query_optimize = f"OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})"
    spark.sql(sql_query_optimize)
    end = timer()
    time_elapsed_seconds = end - start
    print(
        f"Successfully optimized table {table_name}. Total time elapsed: {time_elapsed_seconds} seconds"
    )

Orchestrate the streaming pipeline end to end

Read the Canada stream and merge into the Delta table

(
    generated_streaming_canada_df
    .writeStream.format('delta')
    # .trigger(availableNow=True)
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", check_point_location_for_canada_stream)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

Read the USA stream and merge into the Delta table

(
    generated_streaming_usa_df
    .writeStream.format('delta')
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", check_point_location_for_usa_stream)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

Now, let's validate that data is being populated:

display(
    spark.sql(f"""
        SELECT {partition_column} as partition_column
            ,count(1) as row_count
        FROM {target_table_name}
        GROUP BY {partition_column}
    """)
)

If you have made it this far, you should have an end-to-end pipeline that merges multiple streams into a single Delta table.

Download this notebook: Merge Multiple Spark Streams Into A Delta Table.py in the jiteshsoni/material_for_public_consumption repository on GitHub.

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Reference: https://www.youtube.com/watch?v=2Iy5S0Hf4XM
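With two queries upserting into the same table, it also helps to keep an eye on both of them from the notebook. A small sketch using Spark's standard StreamingQueryManager API:

# List the active streaming queries in this session and their latest progress
for query in spark.streams.active:
    print(query.name, query.id, query.status)
    if query.lastProgress:  # None until the first micro-batch completes
        print("  batchId:", query.lastProgress.get("batchId"),
              "| numInputRows:", query.lastProgress.get("numInputRows"))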
