
Blog, Delta, forEachBatch, Spark, Stream

Using Spark Streaming to merge/upsert data into a Delta Lake with working code

This blog will discuss how to read from a Spark Structured Streaming source and merge/upsert the data into a Delta Lake. We will also optimize/cluster the data of the Delta table. At the end, we will show how to start a streaming pipeline with the previous target table as the source.

Overall, the process works in the following manner: we read data from a streaming source and use the foreachBatch function to call a user-defined function responsible for all the processing. This function encapsulates the Merge and Optimize against the target Delta table.

First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, S3, etc.; for simplicity, let's generate a stream using the rate source below. Feel free to alter numPartitions and rowsPerSecond. These parameters control how much data you want to generate. In the code below, we generate 10,000 rows per second across 100 partitions.

Generate streaming data at your desired rate

generated_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 100)
    .option("rowsPerSecond", 10 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        , "value"
        , "value%1000000 as hash"
    )
)

# display(generated_df)

Parameters / Variables (feel free to change as per your needs)

target_table_name = "to_be_merged_into_table"
check_point_location = f"/tmp/delta/{target_table_name}/_checkpoints/"
join_column_name = "hash"

Create an empty Delta table so data can be merged into it

spark.sql(f"""DROP TABLE IF EXISTS {target_table_name};""")

(
    generated_df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", check_point_location)
    .toTable(target_table_name)
)

Check if data is populated

display(spark.read.table(target_table_name))

A user-defined function which does the data processing, Merge & Optimize

def make_changes_using_the_micro_batch(microBatchOutputDF, batchId: int):
    print(f"Processing batchId: {batchId}")
    microBatchOutputDF.createOrReplaceTempView("updates")
    spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession()
    spark_session_for_this_micro_batch.sql(f"""
        SELECT *
        FROM (
            SELECT *
                ,rank() OVER (PARTITION BY {join_column_name} ORDER BY value DESC) AS dedupe
            FROM updates
        )
        WHERE dedupe = 1
    """).drop("dedupe").createOrReplaceTempView("updates_which_need_to_be_merged")
    spark_session_for_this_micro_batch.sql(f"""
        MERGE INTO {target_table_name} target
        USING updates_which_need_to_be_merged u
        ON u.{join_column_name} = target.{join_column_name}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    # Define how often optimize should run; for example, at 50 it would run every 50 micro-batches of stream data
    optimize_every_n_batches = 20
    if batchId % optimize_every_n_batches == 0:
        optimize_and_zorder_table(
            table_name=target_table_name,
            zorder_by_col_name=join_column_name,
        )

Optimize / Z-order a Delta table

Why do we need to optimize a table? If we keep adding files to our Delta table and never optimize or sort them, then over time we need to read more and more files at merge time. Thus, it is better to optimize the Delta table after every N merges. N should be decided based on your latency requirements; you could start with N as 10 and change it as per your needs.

The code below runs an OPTIMIZE and ZORDER command on a given table that is being fed by a stream. The optimize command can't run in a silo, because that would require us to pause and then resume the stream. Therefore, we call this function as part of the upsert function. This enables us to optimize before the next batch of streaming data comes through.

from timeit import default_timer as timer


def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None:
    """
    Parameters:
        table_name: str
            name of the table to be optimized
        zorder_by_col_name: str
            comma separated list of columns to zorder by, e.g. "col_a, col_b, col_c"
    """
    start = timer()
    print(f"Met condition to optimize table {table_name}")
    sql_query_optimize = f"OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})"
    spark.sql(sql_query_optimize)
    end = timer()
    time_elapsed_seconds = end - start
    print(
        f"Successfully optimized table {table_name}. Total time elapsed: {time_elapsed_seconds} seconds"
    )

Orchestrate from readStream -> Merge -> Optimize

(
    generated_df
    .writeStream.format("delta")
    .trigger(processingTime="30 seconds")
    .option("checkpointLocation", check_point_location)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

If you have made it this far, you should have an end-to-end pipeline that processes streaming data and merges it into a Delta table. As the next step, let's use the previous target table as our new streaming source.

Use the target table as a source for the next streaming pipeline

Change data feed allows Databricks to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.

Reference: https://docs.databricks.com/delta/delta-change-data-feed.html#use-delta-lake-change-data-feed-on-databricks

spark.sql(f"""ALTER TABLE {target_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed=true)""")

Reading change data as a stream

display(
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table(target_table_name)
)
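If you want the downstream consumer to start from a particular point in the table's history rather than only from new commits, the Delta streaming reader also accepts a startingVersion option alongside readChangeFeed. A minimal sketch; the version number 0 is only an illustration, so pick a version that actually exists in your table's history (DESCRIBE HISTORY shows them):

display(
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)  # illustrative; choose a real version from DESCRIBE HISTORY
    .table(target_table_name)
)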
Download this notebook: Spark Streaming Using For Each Batch & Merge.html (drive.google.com)

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Reference: https://www.youtube.com/watch?v=CLDcdVDupMg


Best Practices, Blog, Spark, Stream

Learnings from the Field: How to Give Your Spark Streaming Jobs a 15x Speed Boost Using the Lesser-Known Parameter

Introduction: In the realm of big data processing, where efficiency and speed are paramount, Apache Spark shines as a potent tool. Yet the true power of Spark often lies in the nuances of its configuration, particularly in a parameter that might not catch the eye at first glance: spark.sql.files.maxPartitionBytes. This blog shows how a subtle tweak to this parameter can dramatically amplify the performance of your Spark Streaming jobs, offering up to a 15x speed boost.

The Default Behavior: The Large Bucket Dilemma

Imagine you're at a water park, trying to fill a massive pool using several hoses. Each hose fills a large 128 MB bucket before emptying it into the pool. This is akin to Spark's default behavior, where each core (or hose) processes data up to 128 MB before moving it further down the pipeline. While this method works, it is not the most efficient, especially when dealing with numerous smaller files. The large bucket size can lead to slower fill times, underutilizing the hoses and delaying the pool's completion even if you can acquire more hoses (cores).

Real-World Implications: The Need for More Buckets

Consider a scenario where a business relies on Spark Streaming for real-time data analysis. They notice the data processing isn't as swift as expected, despite ample computational resources. The issue? The oversized 128 MB buckets. With such large buckets, each core is focused on filling its bucket to the brim before contributing to the pool, creating a bottleneck that hampers overall throughput.

Adjusting for Performance: The Shift to Smaller Buckets

To enhance efficiency, imagine switching to smaller buckets, allowing each hose to fill them more quickly and thus empty more buckets into the pool in the same amount of time. In Spark terms, reducing spark.sql.files.maxPartitionBytes enables the system to create more, smaller data partitions. This adjustment means data can be processed in parallel more effectively, engaging more cores (or hoses) and accelerating the pool-filling process, i.e., the data processing task at hand.

Understanding the Trade-offs: Finding the Right Bucket Size

Opting for smaller buckets increases the number of trips to the pool, akin to Spark managing more partitions, which can introduce overhead from task scheduling and execution. However, buckets that are too large (or left at the default) might not leverage the full potential of your resources, leading to inefficiencies. The optimal bucket size (partition size) strikes a balance, ensuring each hose (core) contributes effectively without overwhelming the system with overhead.

Best Practices: Tuning Your Spark Application

To identify the ideal spark.sql.files.maxPartitionBytes setting, you will need to experiment with your specific workload. Monitor the performance impact of different settings, considering factors like data processing speed, resource utilization, and job completion time. The goal is to maximize parallel processing while minimizing overhead, ensuring that your data processing "water park" operates at peak efficiency.

Practical Implications

Adjusting spark.sql.files.maxPartitionBytes can have profound effects on the behavior of Spark Streaming jobs. Note: this parameter only applies to file-based sources, such as Auto Loader.
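To make the tuning step concrete, here is a minimal sketch of how the parameter can be set in a PySpark session before the file-based streaming read is defined. The 16 MB value is purely illustrative, not a recommendation; benchmark against your own file sizes and cluster.

# Sketch only: 16 MB is an illustrative starting point, not a recommendation.
# Set the value before defining the file-based streaming read so the planner picks it up.
spark.conf.set("spark.sql.files.maxPartitionBytes", str(16 * 1024 * 1024))  # 16 MB per partition

# Inspect the currently effective value
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))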
Conclusion

Adjusting spark.sql.files.maxPartitionBytes is akin to optimizing the bucket size in a massive, collaborative effort to fill a pool. This nuanced configuration can significantly enhance the performance of Spark Streaming jobs, allowing you to fully harness the capabilities of your computational resources. By understanding and fine-tuning this parameter, you can transform your data processing workflow, achieving faster, more efficient results that propel your big data initiatives forward.

References and Insights

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Blog, Delta

Understanding Delta Lake: A Technical Deep Dive

Delta Lake is a powerful open-source storage layer that brings ACID transactions, scalable metadata handling, and unified batch and streaming data processing to big data workloads. It is designed to improve data reliability and enable complex data processing workflows. This technical blog blends the key features of Delta Lake with resources for a deeper understanding of how these features are achieved. The resources in this guide, from essential whitepapers to insightful video tutorials, were key to my mastery of Delta Lake, offering a deep dive into its architecture and practical applications and equipping me with the knowledge to effectively utilize its features in real-world data scenarios.

Key Features of Delta Lake

ACID Transactions: Delta Lake provides serializable isolation levels, ensuring that readers always see consistent data, even in the presence of concurrent writes. This is achieved through a transaction log that records details about every change made to the data.

Scalable Metadata Handling: With the help of Spark's distributed processing power, Delta Lake can handle metadata for petabyte-scale tables, which may include billions of files and partitions. This scalability is crucial for managing large datasets efficiently.

Unified Batch and Streaming Data Processing: Delta Lake tables serve as both batch tables and streaming sources/sinks, offering exactly-once semantics for data ingestion, backfill, and interactive queries. This unification simplifies the data pipeline and reduces the complexity of data processing.

Schema Evolution and Enforcement: Delta Lake prevents the insertion of bad records during ingestion by enforcing schemas automatically. It also supports schema evolution, allowing new columns to be added to data tables without disrupting existing operations.

Time Travel (Data Versioning): Data versioning in Delta Lake enables rollbacks, full historical audit trails, and reproducible machine learning experiments. Users can access and revert to earlier versions of data for various purposes.

DML Operations: Delta Lake supports merge, update, and delete operations, which are essential for use cases like change data capture (CDC) and slowly changing dimension (SCD) operations.
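As a small illustration of the time travel and DML features described above, the snippet below shows the general shape of querying an older version of a table and merging changes into it. The table and column names (customers, customer_updates, id) are placeholders for this sketch, not from the original post.

# Placeholder table and column names; adjust to your own schema.

# Time travel: query the table as it existed at an earlier version
previous_snapshot_df = spark.sql("SELECT * FROM customers VERSION AS OF 5")

# Inspect the table history to find the version you want
spark.sql("DESCRIBE HISTORY customers").show(truncate=False)

# DML: upsert a batch of changes into the table
spark.sql("""
    MERGE INTO customers AS target
    USING customer_updates AS source
      ON target.id = source.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")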
Deep Dive Resources

To understand how Delta Lake achieves these features, the following resources provide in-depth technical knowledge:

Lakehouse Storage Systems Whitepaper: For a comprehensive technical understanding of Delta Lake's internals, the Lakehouse Storage Systems whitepaper is invaluable. It explains the architecture and mechanisms that enable Delta Lake's features, such as ACID transactions and scalable metadata handling. Read the whitepaper here.

Educational Videos / Quick Overviews

Real-World Use Cases: To see Delta Lake in action, refer to The Delta Lake Series Complete Collection. This guide helps you understand various use cases and how Delta Lake addresses complex data challenges. Access it here.

Conclusion

Delta Lake is a sophisticated tool that addresses many of the challenges associated with big data processing and storage. By leveraging the resources provided, you can gain a deeper technical understanding of how Delta Lake ensures data reliability, consistency, and scalability. Whether you are a data engineer, architect, or analyst, these insights will help you effectively implement and utilize Delta Lake in your data solutions.

Thank You for Reading! I hope you found this article helpful and informative. If you enjoyed this post, please consider giving it a clap 👏 and sharing it with your network. Your support is greatly appreciated! - CanadianDataGuy

Best Practices, Blog, forEachBatch, Spark, Stream

Streaming Any File Type with Autoloader in Databricks: A Working Guide

Spark Streaming has emerged as a dominant force as a streaming framework, known for its scalable, high-throughput, and fault-tolerant handling of live data streams. While Spark Streaming and Databricks Autoloader inherently support standard file formats like JSON, CSV, PARQUET, AVRO, TEXT, BINARYFILE, and ORC, their versatility extends far beyond these. This blog post delves into the innovative use of Spark Streaming and Databricks Autoloader for processing file types which are not natively supported.

The Process Flow: the example below is for ROS bag files, but the same method can be translated to any other file type.

Setting Up the Environment

Firstly, we need to prepare our Databricks environment:

# Databricks notebook source
# MAGIC %pip install bagpy
dbutils.library.restartPython()

We install bagpy, a Python library for ROS bag files, and restart the Python environment to ensure the library is properly loaded.

Importing Necessary Libraries

Next, we import the required Python libraries:

from typing import List, Dict
import boto3
import rosbag
import tempfile
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, LongType, FloatType
from pyspark.sql import SparkSession

These imports include standard data manipulation tools, AWS S3 access (boto3), ROS bag reading capabilities (rosbag), and the necessary PySpark components.

Detect new files and file paths using Autoloader

# Spark streaming setup for ROS bag files
s3_data_path = "s3a://one-env/jiteshsoni/Vehicle/"
table_name = "rosbag_imu"
checkpoint_location = f"/tmp/checkpoint/{table_name}/"

stream_using_autoloader_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "binaryfile")
    .option("cloudFiles.includeExistingFiles", "true")
    .load(s3_data_path)
)

display(stream_using_autoloader_df)

Custom UDF to read & parse any file type

The core function extract_rosbag_data reads data from a ROS bag file in an S3 bucket and returns a list of dictionaries containing the extracted data:

def extract_rosbag_data(s3_rosbag_path: str) -> List[Dict]:
    """
    Extracts data from a ROS bag file stored in S3, converting it into a list of dictionaries.

    Args:
        s3_rosbag_path (str): The S3 path to the ROS bag file.

    Returns:
        List[Dict]: A list of dictionaries with data from the ROS bag.
    """
    interested_topics = ['/ublox_trunk/ublox/esfalg']
    extracted_data = []

    # Extracting the S3 bucket and file key from the provided path
    bucket_name, s3_file_key = s3_rosbag_path.split('/', 3)[2:4]

    # Using boto3 to download the ROS bag file into memory
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, s3_file_key)
    file_stream = obj.get()['Body'].read()

    # Storing the downloaded file temporarily
    with tempfile.NamedTemporaryFile() as temp_file:
        temp_file.write(file_stream)
        temp_file.flush()

        # Reading messages from the ROS bag file
        with rosbag.Bag(temp_file.name, 'r') as bag:
            for topic, msg, timestamp in bag.read_messages(topics=interested_topics):
                message_data = {field: getattr(msg, field) for field in msg.__slots__}
                message_data['timestamp'] = timestamp.to_sec()
                extracted_data.append(message_data)

    return extracted_data

This function uses boto3 to access the S3 bucket, reads the ROS bag file, and extracts the relevant data. At this point, we should test the function before we proceed:

extract_rosbag_data(s3_rosbag_path="s3a://bucket_name/jiteshsoni/Vehicle/2023-08-04-16-30-24_63.bag")

Things to note here: in this example the file is downloaded onto the cluster, which could be avoided if your file reader supports reading from a stream directly. For your own use case, you will want to replace this function with one that reads your file type, as sketched below.
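For illustration, here is a hypothetical skeleton of what such a replacement parser might look like. The function name extract_my_custom_file, the commented-out my_lib import, and the field names are placeholders, not part of the original post; the only contract that matters is that the function accepts a path and returns a List[Dict] matching the schema you define in the next step.

from typing import List, Dict
# import my_lib  # hypothetical parser library for your file type


def extract_my_custom_file(s3_file_path: str) -> List[Dict]:
    """Hypothetical template: parse one file and return its rows as dictionaries."""
    extracted_rows: List[Dict] = []
    # 1. Fetch the file (boto3, as above) or open it directly if your parser supports remote reads.
    # 2. Parse it with your library of choice, e.g. records = my_lib.read(local_path)
    # 3. Convert each record into a flat dictionary whose keys match your Spark schema, e.g.:
    # for record in records:
    #     extracted_rows.append({"field_a": record.a, "field_b": record.b})
    return extracted_rows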
Defining the Data Schema

Before ingesting data into Spark, define the schema that aligns with the data structure in the ROS bags. This is important because Spark needs to know what schema to expect.

# Define the schema that matches your ROS bag data structure
rosbag_schema = ArrayType(StructType([
    StructField("Alpha", LongType(), True),
    StructField("Beta", IntegerType(), True),
    StructField("Gamma", IntegerType(), True),
    StructField("Delta", IntegerType(), True),
    StructField("Epsilon", IntegerType(), True),
    StructField("Zeta", IntegerType(), True),
    StructField("Eta", IntegerType(), True),
    StructField("Theta", IntegerType(), True),
    StructField("Iota", FloatType(), True),
]))

# Creating a User Defined Function (UDF) for processing ROS bag files
process_rosbag_udf = udf(extract_rosbag_data, returnType=rosbag_schema)

Now let's test whether Autoloader and the custom parsing UDF work together, using the display command:

rosbag_stream_df = (
    stream_using_autoloader_df
    .withColumn("rosbag_rows", process_rosbag_udf("path"))
    .withColumn("extracted_data", explode("rosbag_rows"))
    .selectExpr("extracted_data.*", "_metadata.*")
)

# Displaying the DataFrame
display(rosbag_stream_df)

Writing the Stream to a Delta Table

Finally, we write the streaming data to a Delta table, enabling further processing or querying:

streaming_write_query = (
    rosbag_stream_df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("queryName", f"IngestFrom_{s3_data_path}_AndWriteTo_{table_name}")
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)
    .toTable(table_name)
)

Best Practices & Considerations

Thank You for Reading! I hope you found this article helpful and informative. If you enjoyed this post, please consider giving it a clap 👏 and sharing it with your network. Your support is greatly appreciated! - CanadianDataGuy

Blog, Stream, Streaming Aggregation

Spark Stream Aggregation: A Beginner's Journey with Practical Code Example

Welcome to the fascinating world of Spark Stream Aggregation! This blog is tailored for beginners and features hands-on code examples from a real-world scenario of processing vehicle data. I suggest reading the blog first without the code, and then reading the code along with the blog.

Setting up Spark Configuration, Imports & Parameters

First, let's understand our setup. We configure the Spark environment to use RocksDB for state management, which improves the efficiency of our stream processing:

spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
)

!pip install faker_vehicle
!pip install faker

from faker import Faker
from faker_vehicle import VehicleProvider
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import uuid
from pyspark.sql.streaming import StreamingQuery
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Define the schema name and where the table should be stored
schema_name = "test_streaming_joins"
schema_storage_location = "/tmp/CHOOSE_A_PERMANENT_LOCATION/"

target_table = f"{schema_name}.streaming_aggregation"
checkpoint_location = f"{schema_storage_location}{target_table}/_checkpoint/"
silver_target_table = f"{schema_name}.silver_streaming"
silver_checkpoint_location = f"{schema_storage_location}{silver_target_table}/_checkpoint/"

column_to_watermark_on = "timestamp"
how_late_can_the_data_be = "3 minutes"

create_schema_sql = f"""
    CREATE SCHEMA IF NOT EXISTS {schema_name}
    COMMENT 'This is {schema_name} schema'
    LOCATION '{schema_storage_location}';
"""
logger.info(f"Executing SQL: {create_schema_sql}")
spark.sql(create_schema_sql)

Simulating Data Streams

Using the Faker library, we simulate a vehicle data stream.
This approach helps us create a realistic data processing environment that is crucial for our learning:

# Using Faker to define functions for fake data generation
fake = Faker()
fake.add_provider(VehicleProvider)

event_id = F.udf(lambda: str(uuid.uuid4()), StringType())
vehicle_year_make_model = F.udf(fake.vehicle_year_make_model)
vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat)
vehicle_make_model = F.udf(fake.vehicle_make_model)
vehicle_make = F.udf(fake.vehicle_make)
vehicle_model = F.udf(fake.vehicle_model)
vehicle_year = F.udf(fake.vehicle_year)
vehicle_category = F.udf(fake.vehicle_category)
vehicle_object = F.udf(fake.vehicle_object)
latitude = F.udf(fake.latitude)
longitude = F.udf(fake.longitude)
location_on_land = F.udf(fake.location_on_land)
local_latlng = F.udf(fake.local_latlng)
zipcode = F.udf(fake.zipcode)

Generate streaming source data at your desired rate:

# Generate streaming data
def generated_vehicle_and_geo_df(rowsPerSecond: int = 100, numPartitions: int = 10):
    logger.info("Generating vehicle and geo data frame...")
    return (
        spark.readStream.format("rate")
        .option("numPartitions", numPartitions)
        .option("rowsPerSecond", rowsPerSecond)
        .load()
        .withColumn("event_id", event_id())
        .withColumn("vehicle_year_make_model", vehicle_year_make_model())
        .withColumn("vehicle_year_make_model_cat", vehicle_year_make_model_cat())
        .withColumn("vehicle_make_model", vehicle_make_model())
        .withColumn("vehicle_make", vehicle_make())
        .withColumn("vehicle_year", vehicle_year())
        .withColumn("vehicle_category", vehicle_category())
        .withColumn("vehicle_object", vehicle_object())
        .withColumn("latitude", latitude())
        .withColumn("longitude", longitude())
        .withColumn("location_on_land", location_on_land())
        .withColumn("local_latlng", local_latlng())
        .withColumn("zipcode", zipcode())
    )

# You can uncomment the below display command to check if the code in this cell works
# display(generated_vehicle_and_geo_df())

Aggregation Modes in Spark Streaming

Stream aggregation in Spark lets us process continuous data streams. Our code demonstrates how to generate and aggregate streaming data. Spark Streaming provides three primary modes for data output during stream processing: Append Mode, Complete Mode, and Update Mode. Each mode caters to different requirements, offering flexibility in streaming applications.

Applying Aggregation in Practice

Our code showcases these modes in action, applying them to our simulated vehicle data stream:

def streaming_aggregation(
    rows_per_second: int = 100,
    num_partitions: int = 10,
    how_late_can_the_data_be: str = "30 minutes",
    window_duration: str = "1 minutes",
    checkpoint_location: str = checkpoint_location,
    output_table_name: str = target_table,
) -> StreamingQuery:
    """
    Aggregate streaming data and write to a Delta table.

    Parameters:
    - rows_per_second (int): Number of rows per second for generated data.
    - num_partitions (int): Number of partitions for the generated data.
    - window_duration (str): Window duration for aggregation.
    - checkpoint_location (str): Path for checkpointing.
    - output_table_name (str): Name of the output Delta table.

    Returns:
    - StreamingQuery: Spark StreamingQuery object.
    """
    logger.info("Starting streaming aggregation...")

    raw_stream = generated_vehicle_and_geo_df(rows_per_second, num_partitions)

    aggregated_data = (
        raw_stream
        .withWatermark(column_to_watermark_on, how_late_can_the_data_be)
        .groupBy(
            F.window(column_to_watermark_on, window_duration),
            "zipcode",
        )
        .agg(
            F.min("vehicle_year").alias("oldest_vehicle_year")
        )
    )

    query = (
        aggregated_data
        .writeStream
        .queryName(f"write_stream_to_delta_table: {output_table_name}")
        .format("delta")
        .option("checkpointLocation", checkpoint_location)
        .outputMode("append")
        .toTable(output_table_name)  # This actually starts the streaming job
    )

    logger.info(f"Streaming query started with ID: {query.id}")
    logger.info(f"Current status of the query: {query.status}")
    logger.info(f"Recent progress updates: {query.recentProgress}")

    # If you want to programmatically stop the query after some condition or time, you can do so.
    # For the sake of this example, I am NOT stopping it. But if you need to, you can invoke:
    # query.stop()
    # logger.info("Streaming query stopped.")

    return query


streaming_aggregation(
    checkpoint_location=checkpoint_location,
    output_table_name=target_table,
    how_late_can_the_data_be="5 minutes",
)

What is most critical to understand here is the watermark column: the withWatermark call on the timestamp column tells Spark how long to keep aggregation state around and wait for late-arriving data before a window is finalized.
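If you want to watch the watermark advance while the job runs, the StreamingQuery handle exposes it through its progress reports. A minimal sketch, assuming you capture the return value of streaming_aggregation() above in a variable named query:

# `query` is assumed to be the StreamingQuery returned by streaming_aggregation() above.
# lastProgress is None until the first micro-batch completes, so guard for it.
progress = query.lastProgress
if progress and "eventTime" in progress:
    # The reported watermark trails the maximum observed event time by the allowed lateness.
    print(progress["eventTime"].get("watermark"))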
Best Practices and Considerations

Download the code: https://github.com/jiteshsoni/material_for_public_consumption/blob/main/notebooks/spark_stream_aggregations.py

Conclusion

This blog provides a beginner-friendly introduction to Spark Stream Aggregation, supported by practical code examples. Dive in and explore the world of real-time data processing with Spark!

Thank You for Reading! I hope you found this article helpful and informative. If you enjoyed this post, please consider giving it a clap 👏 and sharing it with your network. Your support is greatly appreciated! - CanadianDataGuy

Blog, Delta

Solving Delta Table Concurrency Issues

Delta Lake is a powerful technology for bringing ACID transactions to your data lakes. It allows multiple operations to be performed on a dataset concurrently. However, dealing with concurrent operations can sometimes be tricky and may lead to issues such as ConcurrentAppendException, ConcurrentDeleteReadException, and ConcurrentDeleteDeleteException. In this blog post, we will explore why these issues occur, how to handle them effectively using a Python function, and how to avoid them with table design, isolation levels, and an understanding of write conflicts.

Why Do These Issues Happen?

Understanding Isolation Levels: Serializable vs. WriteSerializable

Isolation levels in a database control how much transactions are protected from each other's changes. Delta Lake on Databricks offers two such levels: Serializable and WriteSerializable.

1. Serializable: This is the highest level of isolation. It ensures that all write and read operations are done in a specific order, just like how they appear in the table's history. This means operations are carried out one by one, maintaining the order and ensuring the final result is as expected.

2. WriteSerializable (default): This level is a bit more relaxed than Serializable. It guarantees order only for write operations, not for reads. Even though it is more relaxed, it is still stricter than the Snapshot isolation level. It is used by default, as it offers a good mix of data consistency and availability for most operations.

Solution 1: Setting the Isolation Level
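The original post does not include a snippet here, so as a minimal sketch (assuming Delta Lake on Databricks, with my_delta_table as a placeholder name): the isolation level is set as a table property, where WriteSerializable is the default and Serializable is the stricter option described above.

# Minimal sketch; "my_delta_table" is a placeholder table name.
# 'WriteSerializable' is the default; 'Serializable' is the stricter option.
spark.sql("""
    ALTER TABLE my_delta_table
    SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
""")

# Verify the current setting
spark.sql("SHOW TBLPROPERTIES my_delta_table").show(truncate=False)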
Solution 2: Avoiding Conflicts Using Partitioning and Disjoint Command Conditions

When working with tables, sometimes two operations can clash or conflict, especially if they are working on the same set of files. This can cause problems and errors. But there is a way to avoid this: you can organize or partition your table based on columns that are often used in operations. That way, different operations work on different parts of the table, preventing them from clashing.

For example, imagine two commands: one is updating the table for dates after January 1, 2010, and another is deleting from the table for dates before January 1, 2010. These two can clash if the table is not organized by date, as both might try to change the same files. But if you partition the table by date, these operations won't conflict, making things smooth and error-free.

However, be careful while choosing the column for partitioning. If you choose a column that has a lot of unique values, it can create a large number of subdirectories. This can lead to other issues, affecting the performance of operations on the table.

By using these strategies and understanding the insights from Databricks regarding isolation levels, row-level concurrency, and write conflicts, you can make your Delta operations more robust, reliable, and efficient.

Solution 3: Code Block with Exponential Retry

The Python code below offers a robust solution to address this challenge. It is designed to manage concurrent write operations to a Delta table or path by intelligently retrying the operation in the event of specific concurrent exceptions. streaming_write_with_concurrent_retry takes parameters such as the data stream, maximum attempts, and others to provide flexibility and control. It employs a while loop to attempt the write operation continuously and wait for its completion. In case of concurrent exceptions, it increments the attempt counter and calculates the sleep time using an exponential backoff strategy before retrying the operation. This approach ensures that the write operation is eventually successful, providing reliability and efficiency in handling concurrent operations on Delta tables. Explore the code below to understand how it works, and integrate it into your projects to enhance the handling of concurrent operations.

from datetime import datetime
from time import sleep
import math

from delta.exceptions import (
    ConcurrentAppendException,
    ConcurrentDeleteReadException,
    ConcurrentDeleteDeleteException,
)


def streaming_write_with_concurrent_retry(
    stream, max_attempts=3, indefinite=False, table=None, path=None
):
    """
    Handles concurrent write operations to a Delta table or path by retrying the
    operation in case of specific concurrent exceptions.

    :param stream: The data stream to be written.
    :param max_attempts: The maximum number of retry attempts. Default is 3.
    :param indefinite: If True, will keep retrying indefinitely. Default is False.
    :param table: The Delta table to write to.
    :param path: The path to write to.
    :return: The result of writer.awaitTermination().
    """
    attempt = 0  # Initialize attempt counter

    while True:
        try:
            # Choose the writer based on whether table or path is provided
            if table:
                writer = stream.table(table)
            elif path:
                writer = stream.start(path)
            else:
                writer = stream.start()

            # Attempt to write and wait for termination
            return writer.awaitTermination()

        # Handle concurrent exceptions
        except (
            ConcurrentAppendException,
            ConcurrentDeleteReadException,
            ConcurrentDeleteDeleteException,
        ) as e:
            # Increment attempt counter
            attempt += 1

            # If indefinite is False and attempts have reached max_attempts, raise the exception
            if not indefinite and attempt >= max_attempts:
                raise e from None

            # Calculate sleep time using exponential backoff strategy
            sleep_time = min(120, math.pow(2, attempt))

            # Sleep for the calculated time before retrying
            sleep(sleep_time)

Solution 4: Row-Level Concurrency (Advanced Feature)

Row-level concurrency is available only on Delta tables with deletion vectors enabled and on Photon-enabled compute running Databricks Runtime 14.0 and above.

Reference: Isolation levels and write conflicts on Databricks (docs.databricks.com), which covers the isolation levels and potential conflicts when performing concurrent transactions on tables.

Thank You for Reading! I hope you found this article helpful and informative. If you enjoyed this post, please consider giving it a clap 👏 and sharing it with your network. Your support is greatly appreciated! - CanadianDataGuy

Best Practices, Blog

Databricks SQL Dashboards Guide: Tips and Tricks to Master Them

Welcome to the world of Databricks SQL dashboards! You're in the right place if you want to learn how to go beyond just building visualizations and add some tricks to your arsenal. This guide will walk you through creating, managing, and optimizing your Databricks SQL dashboards.

1. Getting Started with Viewing and Organizing Dashboards
2. Tags Are Your Friend
3. Cloning: Replicating Success
4. Harnessing the Power of Query Parameters (see the example query at the end of this post)
5. Editing and Customizing Your Dashboard
6. Keeping Your Data Fresh with Refreshes
7. Stay Updated with Dashboard Subscriptions
8. Managing and Optimizing Dashboards

In conclusion, Databricks SQL dashboards offer a versatile data visualization and analysis platform. With this step-by-step guide, you are well on your way to mastering the art of creating, managing, and optimizing your dashboards. Dive in and explore the world of data with Databricks!
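To make point 4 concrete: in the Databricks SQL editor, query parameters are referenced with double curly braces, and each parameter then surfaces as a widget on the dashboard. A minimal sketch; the table and parameter names below are illustrative only, not from the original post.

-- Hypothetical table and parameter names, shown only to illustrate the {{ }} syntax.
-- Each {{ ... }} placeholder becomes a filter widget on the dashboard.
SELECT
  order_date,
  count(*) AS total_orders
FROM sales.orders
WHERE order_date >= '{{ start_date }}'
  AND region = '{{ region }}'
GROUP BY order_date
ORDER BY order_date;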

Best Practices, Blog

Optimizing Databricks SQL: Achieving Blazing-Fast Query Speeds at Scale

In this data age, delivering a seamless user experience is paramount. While there are numerous ways to measure this experience, one metric stands tall when evaluating the responsiveness of applications and databases: the P99 latency. Especially vital for SQL queries, this seemingly esoteric number is, in reality, a powerful gauge of the experience we provide to our customers. Why is it so crucial? And how can we optimize it to ensure our databases are not just fast, but consistently reliable for 99% of our users? Join us as we demystify P99 latency and delve into strategies to fine-tune it in Databricks SQL.

What is P99 Latency?

The P99 latency (also known as the 99th-percentile latency) for SQL queries is a metric used to measure the response time of SQL queries in a database system. It represents the latency at which 99% of the queries have a response time less than or equal to the P99 value, and 1% have a response time greater than it.

In other words, P99 latency helps you understand the worst-case response time for most of your SQL queries. It is often used to evaluate the performance of a database system and ensure that the vast majority of queries respond quickly, even under heavy load. For example, if the P99 latency for a particular SQL query is 100 milliseconds, it means that 99% of the time that query will execute in 100 milliseconds or less; in 1% of cases, it may take longer.

To achieve a P99 latency of 5 seconds in Databricks SQL, you can follow these steps:

If you need to power an application with minimum latency, it is possible to pre-cache data using specific commands. However, take caution while using these commands, as misconfiguration can cause more harm than good. It is recommended to reach out to me or your Databricks representative for the command, and to have the scenario validated with Databricks before implementing it. I have not included the command in the blog to avoid any mishaps.

Reference: https://www.youtube.com/watch?v=rJDkfRPUebw&t=629s
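As a companion to the definition above, here is a small sketch of how P99 latency can be computed once you have query durations available. The query_history table name and its duration_ms and statement_type columns are hypothetical placeholders; substitute whatever source of query timings you actually collect.

# Hypothetical: `query_history` is a table you maintain with one row per query
# and a `duration_ms` column holding each query's wall-clock duration.
p99_df = spark.sql("""
    SELECT
      approx_percentile(duration_ms, 0.99) AS p99_latency_ms,
      approx_percentile(duration_ms, 0.50) AS p50_latency_ms,
      count(*)                             AS total_queries
    FROM query_history
    WHERE statement_type = 'SELECT'  -- hypothetical column; filter as appropriate
""")
display(p99_df)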

Blog, forEachBatch, Spark, Stream

Simplifying Real-time Data Processing with Spark Streamingā€™s foreachBatch with working code

A comprehensive guide to implementing a fully operational streaming pipeline that can be tailored to your specific needs. In this working example, you will learn how to parameterize the foreachBatch function.

Spark Streaming & foreachBatch

Spark Streaming is a powerful tool for processing streaming data. It allows you to process data as it arrives, without having to wait for the entire dataset to be available. This can be very useful for applications that need to respond to changes in data in real time.

One of the features of Spark Streaming is the foreachBatch() method. This method allows you to apply a custom function to each batch of data as it arrives, which is useful for a variety of tasks. The foreachBatch() method is a powerful tool that can be used to extend the capabilities of Spark Streaming. In this blog post, we will take a closer look at how to use it.

Introducing foreachBatch

foreachBatch is a method provided by Spark Streaming that allows developers to apply arbitrary operations on the output of a streaming query. It acts as a bridge between the streaming world and the structured world of DataFrames and Datasets. This means we can leverage the rich functionality of Spark's structured APIs to process real-time data efficiently.

The Power of foreachBatch

The foreachBatch operation enables developers to perform batch-like operations on streaming data. Instead of processing each individual record, which can be inefficient, foreachBatch processes the data in micro-batches, offering better performance and resource utilization. This approach also provides the flexibility to leverage the full power of Spark's DataFrames, including various transformations and aggregations, to perform complex computations on streaming data.

Implementing foreachBatch

To use foreachBatch, you need to define a function that takes two arguments: the DataFrame representing the micro-batch of data and the batch identifier. Inside this function, you can apply any transformations or computations required on the streaming data. You can use Spark's SQL, DataFrame, or Dataset APIs to manipulate the data and write the results to any external system, such as a database or file system.

Benefits of foreachBatch

Code & Setup

Here is how we can use foreachBatch to achieve this:
∘ Define parameters for the job
∘ Create a streaming source
∘ Define custom processing logic and parameters
∘ Create an instance of the forEachBatchProcessor class with the parameters
∘ Orchestrate the job
∘ Look at the output table
∘ Clean up

Define parameters for the job

target_table_name = "for_each_batch_parameterize"
check_point_location = f"/tmp/delta/{target_table_name}/_checkpoints/"
dedupe_column_name = "hash"

Create a streaming source

We will create a synthetic dataset.
generated_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 4)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        , "value"
        , "value%1000000 as hash"
    )
)

Define custom processing logic and parameters

class forEachBatchProcessor:
    def __init__(self, dedupe_column: str, filter_criteria: str, passed_value: int):
        self.dedupe_column = dedupe_column
        self.filter_criteria = filter_criteria
        self.passed_value = passed_value

    def print_attributes(self):
        attributes = vars(self)
        print(
            "\n".join([f"{attr}: {value}" for attr, value in attributes.items()])
        )

    def make_changes_using_the_micro_batch(self, microBatchOutputDF, batchId: int):
        self.print_attributes()
        print(f"Processing batchId: {batchId}")
        # Your processing logic using the parameter
        view_name = f"updates_for_batchId_{batchId}"
        microBatchOutputDF.createOrReplaceTempView(view_name)
        sql_logic = f"""
            SELECT
                *
                ,{self.passed_value} as passed_value
                ,{batchId} as batch_id
            FROM (
                SELECT
                    *
                    ,rank() over(partition by {self.dedupe_column} order by value desc) as dedupe
                FROM {view_name}
                WHERE {self.filter_criteria}
            )
            WHERE dedupe = 1
        """
        print(f"Processing sql_logic: {sql_logic}")
        to_be_written_df = microBatchOutputDF.sparkSession.sql(sql_logic).drop("dedupe")
        to_be_written_df.write.mode("append").saveAsTable(target_table_name)

Create an instance of the forEachBatchProcessor class with the parameters

instantiateForEachBatchProcessor = forEachBatchProcessor(
    dedupe_column=dedupe_column_name,
    filter_criteria="1=1",
    passed_value=3,
)

Orchestrate the job

(
    generated_df
    .writeStream
    # .trigger(availableNow=True)
    .trigger(processingTime="10 seconds")
    .option("checkpointLocation", check_point_location)
    .option("queryName", "ParameterizeForEachBatch")
    .foreachBatch(instantiateForEachBatchProcessor.make_changes_using_the_micro_batch)
    .start()
)

Look at the output table

display(spark.read.table(target_table_name))

Clean up

spark.sql(f"""
    DROP TABLE IF EXISTS {target_table_name}
""")
dbutils.fs.rm(check_point_location, True)

Conclusion

Apache Spark Streaming's foreachBatch operation is a powerful tool for simplifying real-time data processing. By bridging the gap between the streaming and structured worlds, it enables developers to perform batch-like operations on streaming data efficiently. Leveraging the rich functionality of Spark's DataFrames, foreachBatch empowers users to process and analyze real-time data with ease. Whether you are performing aggregations, transformations, or writing data to external systems, foreachBatch offers a flexible and scalable solution for real-time streaming applications.

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Download the code

I want to emphasize that my blog posts are designed to be practical resources that you can readily use in your own environments. By providing code examples with careful attention to best practices, I aim to simplify the implementation of real-time data processing solutions. I encourage you to explore the blog, copy the code snippets, and adapt them to your specific needs.
With these resources, you'll be equipped to accelerate your development process and unlock the power of Spark Streaming. Dive in, leverage the code, and start building your real-time data processing pipelines with confidence!

Go Build!

Canadian Data Guy
