Stream

Blog, Databricks, Delta, Delta Live Tables, kafka, Spark, Stream

Need for Speed: Benchmarking the Best Tools for Kafka to Delta Ingestion

Introduction

Welcome back to the second installment of our series on data ingestion from Kafka to Delta tables on the Databricks platform, building on our previous discussion about generating and streaming synthetic data to a Kafka topic. This blog post benchmarks three powerful options available on the Databricks platform for ingesting streaming data from Apache Kafka into Delta Lake: Databricks Jobs, Delta Live Tables (DLT), and Delta Live Tables Serverless (DLT Serverless). The primary objective is to evaluate and compare the end-to-end latency of these approaches when ingesting data from Kafka into Delta tables. Latency is a crucial metric, as it directly impacts the freshness and timeliness of data available for downstream analytics and decision-making processes. It's important to note that all three tools leverage Apache Spark's Structured Streaming under the hood.

"Breaking the myth: Ingest from Kafka to Delta at scale in just 1.5 seconds with Delta Live Tables Serverless, up to 80% faster than traditional methods!"

Benchmark Setup

Criteria: The key metric measured was latency, the duration from when a row is produced in Kafka to when it becomes available in Delta Lake. Latency was meticulously measured over an extended period to ensure precision and account for variability.

Input Kafka Feed: For our benchmarks, we utilized a Kafka feed churning out data at a rate of 100 rows per second, each row approximately 1 MB in size, for a total of roughly 100 MB per second. Annually, this sums up to a staggering 3.15 petabytes, making it a rigorous testbed for evaluating the ingestion capabilities of our selected tools. I used Confluent Cloud to set up a Kafka cluster with 6 partitions; it took less than 5 minutes, and Confluent provided $300 of credits for experimentation.

Tools Compared

How was latency measured? Latency is measured by calculating the time difference in milliseconds between the timestamps of consecutive streaming updates to a table. This is done by subtracting the timestamp of the previous update from the timestamp of the current update for each sequential commit, which shows how long each update takes to process relative to the previous one. The analysis is currently limited to the last 300 commits, but this number can be adjusted as needed.
from pyspark.sql import DataFrame

def run_analysis_about_latency(table_name: str) -> DataFrame:
    # SQL command text formatted as a Python multiline string
    sql_code = f"""
    -- Define a virtual view of the table's history
    WITH VW_TABLE_HISTORY AS (
        -- Describe the historical changes of the table
        DESCRIBE HISTORY {table_name}
    ),
    -- Define a view to calculate the timestamp of the previous write operation
    VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP AS (
        SELECT
            -- Calculate the timestamp of the last write operation before the current one
            lag(timestamp) OVER (
                PARTITION BY 1
                ORDER BY version
            ) AS previous_write_timestamp,
            timestamp,
            version
        FROM VW_TABLE_HISTORY
        WHERE operation = 'STREAMING UPDATE'
    ),
    -- Define a view to analyze the time difference between consecutive commits
    VW_BOUND_ANALYSIS_TO_N_COMMITS AS (
        SELECT
            -- Calculate the time difference in milliseconds between the previous and current write timestamps
            TIMESTAMPDIFF(
                MILLISECOND,
                previous_write_timestamp,
                timestamp
            ) AS elapsed_time_ms
        FROM VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP
        ORDER BY version DESC
        LIMIT 300 -- Analyze only the last 300 commits
    )
    -- Calculate various statistics about the write latency
    SELECT
        avg(elapsed_time_ms) AS average_write_latency,
        percentile_approx(elapsed_time_ms, 0.9) AS p90_write_latency,
        percentile_approx(elapsed_time_ms, 0.95) AS p95_write_latency,
        percentile_approx(elapsed_time_ms, 0.99) AS p99_write_latency,
        max(elapsed_time_ms) AS maximum_write_latency
    FROM VW_BOUND_ANALYSIS_TO_N_COMMITS
    """
    # Execute the SQL query using Spark's SQL module
    display(spark.sql(sql_code))

Data Ingestion

This code sets up a streaming data pipeline using Apache Spark to efficiently ingest data from a Kafka topic. It defines a schema tailored to the expected data types and columns in the Kafka messages, including vehicle details, geographic coordinates, and text fields. The read_kafka_stream function initializes the streaming process, configuring secure and reliable connections to Kafka, subscribing to the specified topic, and handling data across multiple partitions for improved processing speed. The stream decodes JSON-formatted messages according to the defined schema and extracts essential metadata.
from pyspark.sql.types import StructType, StringType, FloatType
from pyspark.sql.functions import *

# Define the schema based on the DataFrame structure you are writing to Kafka
schema = StructType() \
    .add("event_id", StringType()) \
    .add("vehicle_year_make_model", StringType()) \
    .add("vehicle_year_make_model_cat", StringType()) \
    .add("vehicle_make_model", StringType()) \
    .add("vehicle_make", StringType()) \
    .add("vehicle_model", StringType()) \
    .add("vehicle_year", StringType()) \
    .add("vehicle_category", StringType()) \
    .add("vehicle_object", StringType()) \
    .add("latitude", StringType()) \
    .add("longitude", StringType()) \
    .add("location_on_land", StringType()) \
    .add("local_latlng", StringType()) \
    .add("zipcode", StringType()) \
    .add("large_text_col_1", StringType()) \
    .add("large_text_col_2", StringType()) \
    .add("large_text_col_3", StringType()) \
    .add("large_text_col_4", StringType()) \
    .add("large_text_col_5", StringType()) \
    .add("large_text_col_6", StringType()) \
    .add("large_text_col_7", StringType()) \
    .add("large_text_col_8", StringType()) \
    .add("large_text_col_9", StringType())

def read_kafka_stream():
    kafka_stream = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
        .option("subscribe", topic)
        .option("failOnDataLoss", "false")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";')
        .option("minPartitions", 12)
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("data"), "topic", "partition", "offset", "timestamp", "timestampType")
        .select("topic", "partition", "offset", "timestamp", "timestampType", "data.*")
    )
    return kafka_stream

Explanation: This setup optimizes data ingestion from Kafka into Spark and prepares the data for further processing or integration into storage systems like Delta Lake.

Additional code for Databricks Jobs

Configuration: This method involves setting up a Databricks job and cluster resources. Although it allows for flexible scheduling and monitoring of ingestion processes, it requires an understanding of how to choose the right compute.

(
    read_kafka_stream()
    .writeStream
    .option("checkpointLocation", checkpoint_location_for_delta)
    .trigger(processingTime='1 second')
    .toTable(target)
)

Additional code for Delta Live Tables

Configuration: Delta Live Tables manages infrastructure automatically, providing a simpler, declarative approach to building data pipelines. This code snippet uses the Delta Live Tables (DLT) API to define a data table that ingests streaming data from Kafka. The @dlt.table decorator specifies the table's name (to be replaced with your desired table name) and sets the pipeline to poll Kafka every second. This rapid polling supports near-real-time data processing needs. The function dlt_kafka_stream() returns read_kafka_stream(), integrating Kafka streaming directly into DLT for streamlined management and operation within the Databricks environment.

@dlt.table(name="REPLACE_DLT_TABLE_NAME_HERE",
           spark_conf={"pipelines.trigger.interval": "1 seconds"})
def dlt_kafka_stream():
    return read_kafka_stream()

Conclusion

Our benchmarks show that Delta Live Tables Serverless stands out in latency performance and operational simplicity, making it highly suitable for scenarios with varying data loads. Meanwhile, Databricks Jobs and Delta Live Tables also offer viable solutions.
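To compare the three approaches side by side, the latency helper defined earlier can simply be pointed at each target table. Below is a minimal sketch; the three table names are placeholders chosen for illustration, not the names used in the actual benchmark.

# Hypothetical table names for the three ingestion pipelines
tables_to_compare = {
    "databricks_jobs": "benchmark.kafka_ingest_jobs",
    "dlt": "benchmark.kafka_ingest_dlt",
    "dlt_serverless": "benchmark.kafka_ingest_dlt_serverless",
}

for approach, table_name in tables_to_compare.items():
    print(f"Latency statistics for {approach} -> {table_name}")
    # Reuses run_analysis_about_latency() defined above; it displays avg/p90/p95/p99/max write latency
    run_analysis_about_latency(table_name)

Running this in a notebook produces one latency summary per approach, which is the basis for the comparison discussed in this post.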
Why Delta Live Tables Serverless Outperforms Standard Delta Live Tables

A key factor contributing to the superior performance of Delta Live Tables Serverless over

Blog, Databricks, Spark, Stream

Synthetic Data Made Simple: Generating and Streaming Custom-Sized Data to Kafka

Introduction In the fast-paced world of data engineering, there often arises a need to generate large volumes of synthetic data for testing and benchmarking purposes. Recently, I was tasked with a crucial project: creating records of a specific size (1 MB each) and streaming them to Kafka for performance benchmarking. This blog post, the first in a two-part series, will walk you through how to generate such data using Python and Apache Spark, and then stream it to Kafka efficiently. Tomorrow, we’ll dive into Part 2, where we’ll benchmark Kafka against Delta ingestion speed on Databricks Jobs and Delta Live Tables. But first, let me share the story behind this endeavor. The Challenge: Preparing for Technology Decisions Imagine you’re part of a data engineering team at a rapidly growing tech startup. Your CTO has tasked you with benchmarking the expected speed of Kafka to Delta ingestion before making critical technology decisions. You quickly realize two things: The solution? Generating custom-sized fake data and using Confluent Cloud for a quick and hassle-free Kafka setup. Why Confluent Cloud? Setting up a Kafka cluster can be cumbersome, especially when dealing with security configurations and access permissions. AWS MSK is robust, but its setup can be daunting. Confluent Cloud, on the other hand, offers a quick setup process and provides $300 in free credits, making it perfect for quick experiments and testing. I had my Kafka instance up and running in just five minutes with Confluent Cloud. https://www.confluent.io/confluent-cloud/ Step-by-Step Guide Let’s dive into the code that helps you create synthetic data and push it to Kafka. Installing Necessary Packages First, install the required packages. Faker is a library that helps generate fake data, and faker_vehicle adds vehicle-specific data generation capabilities. # Databricks notebook source# MAGIC %pip install Faker faker_vehicle Importing Required Libraries Next, import the necessary libraries for data generation, streaming, and logging. from faker import Fakerfrom faker_vehicle import VehicleProviderfrom pyspark.sql import functions as Ffrom pyspark.sql.types import StringTypeimport uuidimport loggingfrom pyspark.sql.streaming import StreamingQueryfrom datetime import datetime Setting Parameters Define the parameters for Kafka configuration and checkpoint location. timestamp = datetime.now().strftime(“%Y%m%d%H%M%S”)checkpoint_location = f”/tmp/confluent_kafka_checkpoint_{timestamp}”# Kafka configurationtopic = “YOUR_TOPIC”kafka_bootstrap_servers_tls = “YOUR_KAFKA_URL.confluent.cloud:9092″kafka_api_key = “YOUR_KAFKA_API_KEY”kafka_api_secret = “YOUR_KAFKA_API_SECRET” Initialization and UDF Initialize Faker and add the vehicle provider. Configure logging for tracking the process. # Initialize Faker for data generation and add vehicle data providerfake = Faker()fake.add_provider(VehicleProvider)# Configure logginglogging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(name)s – %(levelname)s – %(message)s’)logger = logging.getLogger(__name__) Create User-Defined Functions (UDFs) for generating various fake data attributes. 
# User-defined functions (UDFs) for generating fake dataevent_id = F.udf(lambda: str(uuid.uuid4()), StringType())vehicle_year_make_model = F.udf(fake.vehicle_year_make_model)vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat)vehicle_make_model = F.udf(fake.vehicle_make_model)vehicle_make = F.udf(fake.vehicle_make)vehicle_model = F.udf(fake.vehicle_model)vehicle_year = F.udf(fake.vehicle_year)vehicle_category = F.udf(fake.vehicle_category)vehicle_object = F.udf(fake.vehicle_object)latitude = F.udf(fake.latitude)longitude = F.udf(fake.longitude)location_on_land = F.udf(fake.location_on_land)local_latlng = F.udf(fake.local_latlng)zipcode = F.udf(fake.zipcode) Function to Generate 1MB Row of Data Define a function to generate a DataFrame that simulates a row of data approximately 1 MB in size. @F.udf(StringType())def large_text_udf(size: int): “””Generate large text data with a specified size.””” return fake.text(max_nb_chars=size)# Configuration for large text datanum_large_columns = 10 # Number of large text columnssize_per_large_column = (1024 * 1024) // num_large_columns # Distribute 1MB across columns def generate_1mb_row_df(rowsPerSecond=10, numPartitions=2): “””Generate a DataFrame simulating streaming data, including vehicle and geographic data.””” logger.info(“Generating vehicle and geo data frame…”) df = spark.readStream.format(“rate”) \ .option(“numPartitions”, numPartitions) \ .option(“rowsPerSecond”, rowsPerSecond) \ .load() \ .withColumn(“event_id”, event_id()) \ .withColumn(“vehicle_year_make_model”, vehicle_year_make_model()) \ .withColumn(“vehicle_year_make_model_cat”, vehicle_year_make_model_cat()) \ .withColumn(“vehicle_make_model”, vehicle_make_model()) \ .withColumn(“vehicle_make”, vehicle_make()) \ .withColumn(“vehicle_model”, vehicle_model()) \ .withColumn(“vehicle_year”, vehicle_year()) \ .withColumn(“vehicle_category”, vehicle_category()) \ .withColumn(“vehicle_object”, vehicle_object()) \ .withColumn(“latitude”, latitude()) \ .withColumn(“longitude”, longitude()) \ .withColumn(“location_on_land”, location_on_land()) \ .withColumn(“local_latlng”, local_latlng()) \ .withColumn(“zipcode”, zipcode()) \ .withColumn(“large_text_col_1”, large_text_udf(F.lit(size_per_large_column))) \ .withColumn(“large_text_col_2”, large_text_udf(F.lit(size_per_large_column))) \ .withColumn(“large_text_col_3”, large_text_udf(F.lit(size_per_large_column))) \ .withColumn(“large_text_col_4”, large_text_udf(F.lit(size_per_large_column))) \ .withColumn(“large_text_col_5”, large_text_udf(F.lit(size_per_large_column))) \ .withColumn(“large_text_col_6”, large_text_udf(F.lit(size_per_large_column))) \ .withColumn(“large_text_col_7”, large_text_udf(F.lit(size_per_large_column))) \ .withColumn(“large_text_col_8”, large_text_udf(F.lit(size_per_large_column))) \ .withColumn(“large_text_col_9”, large_text_udf(F.lit(size_per_large_column))) \ .withColumn(“large_text_col_10”, large_text_udf(F.lit(size_per_large_column))) return df You can test the above code by running the below command display(generate_1mb_row_df()) Streaming Data to Kafka Start streaming the generated data to Kafka. 
(generate_1mb_row_df(rowsPerSecond=100, numPartitions=12) .selectExpr(“CAST(event_id AS STRING) AS key”, “to_json(struct(*)) AS value”) .writeStream .format(“kafka”) .option(“kafka.bootstrap.servers”, kafka_bootstrap_servers_tls) .option(“kafka.security.protocol”, “SASL_SSL”) .option(“kafka.sasl.mechanism”, “PLAIN”) .option(“kafka.sasl.jaas.config”, f’kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=”{kafka_api_key}” password=”{kafka_api_secret}”;’) .option(“checkpointLocation”, checkpoint_location) .option(“topic”, topic) .option(“queryName”, f”SendDataToKafka-{topic}”) .option(“kafka.max.request.size”, “1100000”) # Setting new max request size to 1.1 MB .start()) The Confluent UI was able to verify that we are able to generate 100 MB/Second Conclusion This approach allows you to create custom-sized synthetic data and stream it to Kafka efficiently. By using Confluent Cloud, you can significantly reduce the setup time and complexity, enabling a more streamlined and efficient data generation and streaming process. Stay tuned for Part 2 of this series, where we’ll benchmark Kafka against Delta ingestion speed on Databricks Jobs and Delta Live Tables. Whether you’re testing, benchmarking, or exploring data streaming, this guide provides a solid foundation to get you started. Happy streaming! Download this notebook References For more details on Spark and Kafka integration, you can refer to the following documentation: These resources provide comprehensive information and examples to help you further understand and implement Spark and Kafka integration. Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.
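As a quick addendum to the sizing logic above, the serialized payload size per row can be inspected before writing to Kafka. This is a minimal sketch that is not part of the original notebook; it assumes the generate_1mb_row_df function from this post is already defined.

from pyspark.sql import functions as F

# Add a column with the character length of the JSON payload that would be sent to Kafka
payload_size_df = (
    generate_1mb_row_df(rowsPerSecond=10, numPartitions=2)
    .withColumn("payload_bytes", F.expr("length(to_json(struct(*)))"))
    .select("timestamp", "payload_bytes")
)

# Each row should report roughly 1,000,000 characters, confirming the ~1 MB target
display(payload_size_df)

If the reported sizes drift far from 1 MB, adjust size_per_large_column (and kafka.max.request.size) accordingly.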

Blog, Delta, forEachBatch, Spark, Stream

Using Spark Streaming to merge/upsert data into a Delta Lake with working code

This blog will discuss how to read from a Spark Streaming and merge/upsert data into a Delta Lake. We will also optimize/cluster data of the delta table. In the end, we will show how to start a streaming pipeline with the previous target table as the source. Overall, the process works in the following manner, we read data from a streaming source and use this special function foreachBatch. Using this we will call any user-defined function responsible for all the processing. This function encapsulates the Merge and Optimize to the target Delta table. First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, s3, etc. for simplicity. Let’s generate a stream using the below. Feel free to alter numPartitions & rowsPerSecond . These parameters help you control how much volume of data you want to generate. In the below code, we generated 10,000 rows per second across 100 partitions. Generate streaming data at your desired rate generated_df = ( spark.readStream .format(“rate”) .option(“numPartitions”, 100) .option(“rowsPerSecond”, 10 * 1000) .load() .selectExpr( “md5( CAST (value AS STRING) ) as md5″ ,”value” ,”value%1000000 as hash” )) #display(generated_df) Parameters / Variables (Feel free to change as per your needs) target_table_name = “to_be_merged_into_table”check_point_location = f”/tmp/delta/{target_table_name}/_checkpoints/”join_column_name =”hash” Create an Empty Delta table so data could be merged into it spark.sql(f””” DROP TABLE IF EXISTS {target_table_name};”””)( generated_df.writeStream .format(“delta”) .outputMode(“append”).trigger(once=True) .option(“checkpointLocation”, check_point_location) .toTable(target_table_name)) Check if data is populated display(spark.read.table(target_table_name)) A user-defined function which does the data processing, Merge & Optimize def make_changes_using_the_micro_batch(microBatchOutputDF, batchId: int): print(f”Processing batchId: {batchId}”) microBatchOutputDF.createOrReplaceTempView(“updates”) spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession() spark_session_for_this_micro_batch.sql(f””” SELECT * FROM ( select * ,rank() over(partition by {join_column_name} order by value desc) as dedupe from updates ) WHERE dedupe =1 “””).drop(“dedupe”).createOrReplaceTempView(“updates_which_need_to_be_merged”) spark_session_for_this_micro_batch.sql(f””” MERGE INTO {target_table_name} target using updates_which_need_to_be_merged u on u.{join_column_name} = target.{join_column_name} WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * “””) optimize_every_n_batches = 20 #Define how often should optimize run? for example: at 50, it means that we will run the optimize command every 50 batches of stream data if batchId % optimize_every_n_batches == 0: optimize_and_zorder_table(table_name = target_table_name, zorder_by_col_name = join_column_name) Optimize/ Z-order a Delta table Why do we need to optimize a table? If we keep adding files to our Delta table and never optimize/sort them then over time we need to read a lot of files during merge time. Thus, optimizing the Delta table after every N merges is better. N needs to be decided on your latency requirements. You could start with N as 10 and change it as per your needs. The below code will run an optimize and zorder command on a given table that is being fed by a stream. Optimize commands can’t run in a silo because it will require us to pause and then resume the stream. Therefore, we need to call this function a part of the upsert function. 
This enables us to optimize before the next batch of streaming data comes through.

from timeit import default_timer as timer

def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None:
    """
    Parameters:
        table_name: str
            name of the table to be optimized
        zorder_by_col_name: str
            comma separated list of columns to zorder by. example "col_a, col_b, col_c"
    """
    start = timer()
    print(f"Met condition to optimize table {table_name}")
    sql_query_optimize = f"OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})"
    spark.sql(sql_query_optimize)
    end = timer()
    time_elapsed_seconds = end - start
    print(
        f"Successfully optimized table {table_name}. Total time elapsed: {time_elapsed_seconds} seconds"
    )

Orchestrate from readStream -> Merge -> Optimize

(
    generated_df
    .writeStream.format('delta')
    .trigger(processingTime='30 seconds')
    .option("checkpointLocation", check_point_location)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

If you have reached this far, you should have an end-to-end pipeline working with streaming data and merging data into a Delta table. As the next step, let's use the previous target table as our new streaming source.

Use the target table as a source for the next streaming pipeline

Change data feed allows Databricks to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated. Reference: https://docs.databricks.com/delta/delta-change-data-feed.html#use-delta-lake-change-data-feed-on-databricks

spark.sql(f'''ALTER TABLE {target_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed=true)''')

Reading change data as a stream

display(
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table(target_table_name)
)

Download this notebook: Spark Streaming Using For Each Batch & Merge

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Reference: https://www.youtube.com/watch?v=CLDcdVDupMg
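For readers who prefer the Python DeltaTable API over a SQL MERGE string, here is a minimal, hedged sketch of an equivalent upsert function. It reuses target_table_name and join_column_name from the post and illustrates the same pattern; it is not the author's original code.

from delta.tables import DeltaTable
from pyspark.sql import functions as F, Window

def merge_micro_batch_with_delta_api(microBatchOutputDF, batchId: int):
    # Keep only the latest row per join key, mirroring the rank-based dedupe above
    dedupe_window = Window.partitionBy(join_column_name).orderBy(F.col("value").desc())
    deduped_df = (
        microBatchOutputDF
        .withColumn("dedupe", F.row_number().over(dedupe_window))
        .filter("dedupe = 1")
        .drop("dedupe")
    )

    # MERGE via the DeltaTable API instead of a SQL string
    target_delta_table = DeltaTable.forName(spark, target_table_name)
    (
        target_delta_table.alias("target")
        .merge(deduped_df.alias("u"), f"u.{join_column_name} = target.{join_column_name}")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

This version can be passed to foreachBatch exactly like make_changes_using_the_micro_batch; the difference from the SQL approach is purely stylistic.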

Best Practices, Blog, Spark, Stream

Learnings from the Field: How to Give Your Spark Streaming Jobs a 15x Speed Boost Using the Lesser-Known Parameter

Introduction:

In the realm of big data processing, where efficiency and speed are paramount, Apache Spark shines as a potent tool. Yet, the true power of Spark often lies in the nuances of its configuration, particularly in a parameter that might not catch the eye at first glance: spark.sql.files.maxPartitionBytes. This blog unveils how a subtle tweak to this parameter can dramatically amplify the performance of your Spark Streaming jobs, offering up to a 15x speed boost.

The Default Behavior: The Large Bucket Dilemma

Imagine you're at a water park, trying to fill a massive pool using several hoses. Each hose fills a large 128 MB bucket before emptying it into the pool. This is akin to Spark's default behavior, where each core (or hose) processes data up to 128 MB before moving it further down the pipeline. While this method works, it's not the most efficient, especially when dealing with numerous smaller files. The large bucket size can lead to slower fill times and underutilized hoses, delaying the pool's completion even if you can acquire more hoses (cores).

Real-World Implications: The Need for More Buckets

Consider a scenario where a business relies on Spark Streaming for real-time data analysis. They notice the data processing isn't as swift as expected, despite having ample computational resources. The issue? The oversized 128 MB buckets. With such large buckets, each core is focused on filling its bucket to the brim before contributing to the pool, creating a bottleneck that hampers overall throughput.

Adjusting for Performance: The Shift to Smaller Buckets

To enhance efficiency, imagine switching to smaller buckets, allowing each hose to fill them more quickly and thus empty more buckets into the pool in the same amount of time. In Spark terms, reducing spark.sql.files.maxPartitionBytes enables the system to create more, smaller data partitions. This adjustment means data can be processed in parallel more effectively, engaging more cores (or hoses) and accelerating the pool-filling process, which is the data processing task at hand.

Understanding the Trade-offs: Finding the Right Bucket Size

Opting for smaller buckets increases the number of trips to the pool, akin to Spark managing more partitions, which can introduce overhead from task scheduling and execution. However, buckets that are too large (or the default setting) might not leverage the full potential of your resources, leading to inefficiencies. The optimal bucket size (partition size) strikes a balance, ensuring each hose (core) contributes effectively without overwhelming the system with overhead.

Best Practices: Tuning Your Spark Application

To identify the ideal spark.sql.files.maxPartitionBytes setting, you'll need to experiment with your specific workload. Monitor the performance impact of different settings, considering factors like data processing speed, resource utilization, and job completion time. The goal is to maximize parallel processing while minimizing overhead, ensuring that your data processing "water park" operates at peak efficiency.

Practical Implications

Adjusting spark.sql.files.maxPartitionBytes can have profound effects on the behavior of Spark Streaming jobs. Note: this parameter only applies to file-based sources such as Auto Loader.

Conclusion

Adjusting spark.sql.files.maxPartitionBytes is akin to optimizing the bucket size in a massive, collaborative effort to fill a pool.
This nuanced configuration can significantly enhance the performance of Spark Streaming jobs, allowing you to fully harness the capabilities of your computational resources. By understanding and fine-tuning this parameter, you can transform your data processing workflow, achieving faster, more efficient results that propel your big data initiatives forward. References and Insights Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.
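To make the tuning advice above concrete, here is a minimal sketch of how the parameter can be set and how its effect can be observed on a file-based read. The file path is a placeholder, and 32 MB is only an example starting point, not a recommendation.

# Default is 128 MB (134217728 bytes); here we try 32 MB as an experiment
spark.conf.set("spark.sql.files.maxPartitionBytes", str(32 * 1024 * 1024))

# For a batch read over the same files, the partition count shows how finely the input is split
sample_df = spark.read.format("parquet").load("/path/to/your/files/")  # placeholder path
print(f"Input split into {sample_df.rdd.getNumPartitions()} partitions")

# The same setting influences how file-based streaming sources such as Auto Loader split
# files into tasks, so re-run the stream with the new value and compare micro-batch durations.

Comparing micro-batch durations in the Spark UI before and after the change is the simplest way to judge whether a smaller value helps for your workload.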

Best Practices, Blog, forEachBatch, Spark, Stream

Streaming Any File Type with Autoloader in Databricks: A Working Guide

Spark Streaming has emerged as a dominant force as a streaming framework, known for its scalable, high-throughput, and fault-tolerant handling of live data streams. While Spark Streaming and Databricks Autoloader inherently support standard file formats like JSON, CSV, PARQUET, AVRO, TEXT, BINARYFILE, and ORC, their versatility extends far beyond these. This blog post delves into the innovative use of Spark Streaming and Databricks Autoloader for processing file types which are not natively supported. The Process Flow: In the below example is for ROS Bag but the same method could be translated for any other file type. Setting Up the Environment Firstly, we need to prepare our Databricks environment: # Databricks notebook source# MAGIC %pip install bagpydbutils.library.restartPython() We install bagpy, a Python library for ROS bag files, and restart the Python environment to ensure the library is properly loaded.Importing Necessary Libraries Next, we import the required Python libraries: from typing import List, Dictimport boto3import rosbagimport tempfilefrom pyspark.sql.functions import udf, explodefrom pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, LongType, FloatTypefrom pyspark.sql import SparkSession These imports include standard data manipulation tools, AWS S3 access (boto3), ROS bag reading capabilities (rosbag), and necessary PySpark components. Detect new files and file path using Autoloader # Spark streaming setup for ROS bag filess3_data_path = “s3a://one-env/jiteshsoni/Vehicle/”table_name = “rosbag_imu”checkpoint_location = f”/tmp/checkpoint/{table_name}/”stream_using_autoloader_df = (spark.readStream .format(“cloudFiles”) .option(“cloudFiles.format”, “binaryfile”) .option(“cloudFiles.includeExistingFiles”, “true”) .load(s3_data_path) )display(stream_using_autoloader_df)Custom UDF to read & parse any file type The core function extract_rosbag_data reads data from a ROS bag file in an S3 bucket and returns a list of dictionaries containing the extracted data: def extract_rosbag_data(s3_rosbag_path: str) -> List[Dict]: “”” Extracts data from a ROS bag file stored in S3, converting it into a list of dictionaries. Args: s3_rosbag_path (str): The S3 path to the ROS bag file. Returns: List[Dict]: A list of dictionaries with data from the ROS bag. “”” interested_topics = [‘/ublox_trunk/ublox/esfalg’] extracted_data = [] # Extracting the S3 bucket and file key from the provided path bucket_name, s3_file_key = s3_rosbag_path.split(‘/’, 3)[2:4] # Using boto3 to download the ROS bag file into memory s3 = boto3.resource(‘s3’) obj = s3.Object(bucket_name, s3_file_key) file_stream = obj.get()[‘Body’].read() # Storing the downloaded file temporarily with tempfile.NamedTemporaryFile() as temp_file: temp_file.write(file_stream) temp_file.flush() # Reading messages from the ROS bag file with rosbag.Bag(temp_file.name, ‘r’) as bag: for topic, msg, timestamp in bag.read_messages(topics=interested_topics): message_data = {field: getattr(msg, field) for field in msg.__slots__} message_data[‘timestamp’] = timestamp.to_sec() extracted_data.append(message_data) return extracted_data This function uses boto3 to access the S3 bucket, reads the ROS bag file, and extracts the relevant data. At this point, we should test the function before we proceed. For your use case, you want to change this function to read your file type. 
extract_rosbag_data(s3_rosbag_path= “s3a://bucket_name/jiteshsoni/Vehicle/2023-08-04-16-30-24_63.bag”) Things to note here: In this example, I am downloading the file on the cluster which could be avoided depending if your file reader supports it. Defining the Data Schema Before ingesting data into Spark, define the schema that aligns with the data structure in ROS bags. This is important because Spark needs to know what schema to expect. # Define the schema that matches your ROS bag data structurerosbag_schema = ArrayType(StructType([ StructField(“Alpha”, LongType(), True), StructField(“Beta”, IntegerType(), True), StructField(“Gamma”, IntegerType(), True), StructField(“Delta”, IntegerType(), True), StructField(“Epsilon”, IntegerType(), True), StructField(“Zeta”, IntegerType(), True), StructField(“Eta”, IntegerType(), True), StructField(“Theta”, IntegerType(), True), StructField(“Iota”, FloatType(), True)]))# Creating a User Defined Function (UDF) for processing ROS bag filesprocess_rosbag_udf = udf(extract_rosbag_data, returnType=rosbag_schema) Now let’s test with if Autoloader & Parsing if custom UDF is working using the display command rosbag_stream_df = (stream_using_autoloader_df .withColumn(“rosbag_rows”, process_rosbag_udf(“path”)) .withColumn(“extracted_data”, explode(“rosbag_rows”)) .selectExpr(“extracted_data.*”, “_metadata.*”) )# Displaying the DataFramedisplay(rosbag_stream_df) Writing the Stream to a Delta Table Finally, we write the streaming data to a Delta table, enabling further processing or querying: streaming_write_query = ( rosbag_stream_df.writeStream .format(“delta”) .option(“mergeSchema”, “true”) .option(“queryName”, f”IngestFrom_{s3_data_path}_AndWriteTo_{table_name}”) .option(“checkpointLocation”, checkpoint_location) .trigger(availableNow=True) .toTable(table_name)) Best Practices & Considerations Thank You for Reading! I hope you found this article helpful and informative. If you enjoyed this post, please consider giving it a clap 👏 and sharing it with your network. Your support is greatly appreciated! — CanadianDataGuy
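As noted above, downloading the file to the cluster can be avoided when your parser accepts in-memory bytes. Auto Loader's binaryfile format already exposes a content column with the raw file bytes, so a UDF can operate on it directly. The sketch below is a hedged illustration: parse_my_file_bytes is a placeholder for your own parser, not something from the original post, and it reuses rosbag_schema purely as an example return type.

from typing import List, Dict
import tempfile
from pyspark.sql.functions import udf, explode

def parse_my_file_bytes(file_bytes: bytes) -> List[Dict]:
    """Placeholder parser: replace the body with logic for your own file format."""
    extracted_rows = []
    # Some libraries only read from a path; writing the bytes to a temp file keeps them compatible
    with tempfile.NamedTemporaryFile() as temp_file:
        temp_file.write(file_bytes)
        temp_file.flush()
        # ... open temp_file.name with your format-specific reader and append dicts to extracted_rows ...
    return extracted_rows

parse_bytes_udf = udf(parse_my_file_bytes, returnType=rosbag_schema)

parsed_stream_df = (
    stream_using_autoloader_df
    .withColumn("rows", parse_bytes_udf("content"))  # "content" is provided by the binaryfile format
    .withColumn("extracted_data", explode("rows"))
    .selectExpr("extracted_data.*", "_metadata.*")
)

This keeps the read inside Spark and avoids a second download per file, at the cost of holding each file's bytes in executor memory.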

Blog, Stream, Streaming Aggregation

SPARK STREAM AGGREGATION: A BEGINNER’S JOURNEY WITH PRACTICAL CODE EXAMPLE

Welcome to the fascinating world of Spark Stream Aggregation! This blog is tailored for beginners, featuring hands-on code examples from a real-world scenario of processing vehicle data. I suggest reading the blog first without the code and then reading the code with the blog.

Setting up Spark Configuration, Imports & Parameters

First, let's understand our setup. We configure the Spark environment to use RocksDB for state management, enhancing the efficiency of our stream processing:

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

!pip install faker_vehicle
!pip install faker

from faker import Faker
from faker_vehicle import VehicleProvider
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import uuid
from pyspark.sql.streaming import StreamingQuery
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# define schema name and where should the table be stored
schema_name = "test_streaming_joins"
schema_storage_location = "/tmp/CHOOSE_A_PERMANENT_LOCATION/"

target_table = f"{schema_name}.streaming_aggregation"
checkpoint_location = f"{schema_storage_location}{target_table}/_checkpoint/"
silver_target_table = f"{schema_name}.silver_streaming"
silver_checkpoint_location = f"{schema_storage_location}{silver_target_table}/_checkpoint/"

column_to_watermark_on = "timestamp"
how_late_can_the_data_be = "3 minutes"

create_schema_sql = f"""
    CREATE SCHEMA IF NOT EXISTS {schema_name}
    COMMENT 'This is {schema_name} schema'
    LOCATION '{schema_storage_location}';
"""
logger.info(f"Executing SQL: {create_schema_sql}")
spark.sql(create_schema_sql)

Simulating Data Streams

Using the Faker library, we simulate a vehicle data stream.
This approach helps us create a realistic data processing environment that is crucial for our learning: # Using Faker to define functions for fake data generation fake = Faker() fake.add_provider(VehicleProvider) event_id = F.udf(lambda: str(uuid.uuid4()), StringType()) vehicle_year_make_model = F.udf(fake.vehicle_year_make_model) vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat) vehicle_make_model = F.udf(fake.vehicle_make_model) vehicle_make = F.udf(fake.vehicle_make) vehicle_model = F.udf(fake.vehicle_model) vehicle_year = F.udf(fake.vehicle_year) vehicle_category = F.udf(fake.vehicle_category) vehicle_object = F.udf(fake.vehicle_object) latitude = F.udf(fake.latitude) longitude = F.udf(fake.longitude) location_on_land = F.udf(fake.location_on_land) local_latlng = F.udf(fake.local_latlng) zipcode = F.udf(fake.zipcode) # COMMAND ———- # MAGIC %md # MAGIC ### Generate Streaming source data at your desired rate # COMMAND ———- # Generate streaming data def generated_vehicle_and_geo_df(rowsPerSecond: int =100, numPartitions: int = 10): logger.info(“Generating vehicle and geo data frame…”) return ( spark.readStream.format(“rate”) .option(“numPartitions”, numPartitions) .option(“rowsPerSecond”, rowsPerSecond) .load() .withColumn(“event_id”, event_id()) .withColumn(“vehicle_year_make_model”, vehicle_year_make_model()) .withColumn(“vehicle_year_make_model_cat”, vehicle_year_make_model_cat()) .withColumn(“vehicle_make_model”, vehicle_make_model()) .withColumn(“vehicle_make”, vehicle_make()) .withColumn(“vehicle_year”, vehicle_year()) .withColumn(“vehicle_category”, vehicle_category()) .withColumn(“vehicle_object”, vehicle_object()) .withColumn(“latitude”, latitude()) .withColumn(“longitude”, longitude()) .withColumn(“location_on_land”, location_on_land()) .withColumn(“local_latlng”, local_latlng()) .withColumn(“zipcode”, zipcode()) ) # You can uncomment the below display command to check if the code in this cell works display(generated_vehicle_and_geo_df()) Aggregation Modes in Spark Streaming Stream Aggregation in Spark lets us process continuous data streams. Our code demonstrates how to generate and aggregate streaming data. Spark Streaming provides three primary modes for data output during stream processing: Complete Mode Update Mode Each mode caters to different requirements, offering flexibility in streaming applications. Applying Aggregation in Practice Our code showcases these modes in action, applying them to our simulated vehicle data stream: def streaming_aggregation(rows_per_second: int = 100, num_partitions: int = 10, how_late_can_the_data_be :str = “30 minutes”, window_duration: str = “1 minutes”, checkpoint_location: str = checkpoint_location, output_table_name: str = target_table) -> StreamingQuery: “”” Aggregate streaming data and write to a Delta table. Parameters: – rows_per_second (int): Number of rows per second for generated data. – num_partitions (int): Number of partitions for the generated data. – window_duration (str): Window duration for aggregation. – checkpoint_location (str): Path for checkpointing. – output_table_name (str): Name of the output Delta table. Returns: – StreamingQuery: Spark StreamingQuery object. 
“”” logger.info(“Starting streaming aggregation…”) raw_stream = generated_vehicle_and_geo_df(rows_per_second, num_partitions) aggregated_data = ( raw_stream .withWatermark(column_to_watermark_on, how_late_can_the_data_be) .groupBy( F.window(column_to_watermark_on, window_duration), “zipcode” ) .agg( F.min(“vehicle_year”).alias(“oldest_vehicle_year”) ) ) query = ( aggregated_data .writeStream .queryName(f”write_stream_to_delta_table: {output_table_name}”) .format(“delta”) .option(“checkpointLocation”, checkpoint_location) .outputMode(“append”) .toTable(output_table_name) # This actually starts the streaming job ) logger.info(f”Streaming query started with ID: {query.id}”) logger.info(f”Current status of the query: {query.status}”) logger.info(f”Recent progress updates: {query.recentProgress}”) # If you want to programmatically stop the query after some condition or time, you can do so. # For the sake of this example, I am NOT stopping it. But if you need to, you can invoke: # query.stop() # logger.info(“Streaming query stopped.”) return query streaming_aggregation( checkpoint_location = checkpoint_location, output_table_name = target_table, how_late_can_the_data_be = “5 minutes” ) What is most critical to understand is the Watermarking column. Best Practices and Considerations Download the code https://github.com/jiteshsoni/material_for_public_consumption/blob/main/notebooks/spark_stream_aggregations.py Conclusion This blog provides a beginner-friendly introduction to Spark Stream Aggregation, supported by practical code examples. Dive in and explore the world of real-time data processing with Spark! Thank You for Reading! I hope you found this article helpful and informative. If you enjoyed this post, please consider giving it a clap 👏 and sharing it with your network. Your support is greatly appreciated! — **CanadianDataGuy**
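The pipeline above writes in append mode, which only emits a window once the watermark closes it. For quick experimentation it can be instructive to watch the aggregation evolve in one of the other output modes Structured Streaming offers (append, update, and complete). Here is a minimal sketch, assuming the generated_vehicle_and_geo_df helper from this post; the memory sink and query name are illustrative choices for a notebook session, not part of the original code.

# Same aggregation, but emitted in complete mode to an in-memory table for inspection
debug_query = (
    generated_vehicle_and_geo_df(rowsPerSecond=100, numPartitions=10)
    .groupBy(F.window("timestamp", "1 minutes"), "zipcode")
    .agg(F.min("vehicle_year").alias("oldest_vehicle_year"))
    .writeStream
    .outputMode("complete")   # re-emits the full aggregation state every trigger
    .format("memory")         # small in-memory sink, for exploration only
    .queryName("aggregation_debug")
    .start()
)

# After letting it run briefly, inspect the evolving results:
display(spark.sql("SELECT * FROM aggregation_debug ORDER BY window DESC"))

# Stop the debug query when done
# debug_query.stop()

Watching how the same windows keep updating in complete mode makes the contrast with the watermark-driven append behaviour of the main pipeline much more tangible.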

Blog, forEachBatch, Spark, Stream

Simplifying Real-time Data Processing with Spark Streaming’s foreachBatch with working code

Comprehensive guide to implementing a fully operational Streaming Pipeline that can be tailored to your specific needs. In this working example, you will learn how to parameterize the ForEachBatch function. Spark Streaming & foreachBatch Spark Streaming is a powerful tool for processing streaming data. It allows you to process data as it arrives, without having to wait for the entire dataset to be available. This can be very useful for applications that need to respond to changes in data in real time. One of the features of Spark Streaming is the foreachBatch() method. This method allows you to apply a custom function to each batch of data as it arrives. This can be useful for a variety of tasks, such as: The foreachBatch() method is a powerful tool that can be used to extend the capabilities of Spark Streaming. In this blog post, we will take a closer look at how to use foreachBatch(). Introducing foreachBatch: foreachBatch is a method provided by Spark Streaming that allows developers to apply arbitrary operations on the output of a streaming query. It acts as a bridge between the streaming world and the structured world of DataFrames and Datasets. This means that we can leverage the rich functionality of Spark’s structured APIs to process real-time data efficiently. The Power of foreachBatch: The foreachBatch operation enables developers to perform batch-like operations on streaming data. Instead of processing each individual record, which can be inefficient, foreachBatch processes the data in micro-batches, offering better performance and resource utilization. This approach also provides the flexibility to leverage the full power of Spark’s DataFrames, including various transformations and aggregations, to perform complex computations on streaming data. Implementing foreachBatch: To use foreachBatch, you need to define a function that takes two arguments: the batch identifier and the DataFrame representing the micro-batch of data. Inside this function, you can apply any transformations or computations required on the streaming data. You can use Spark’s SQL, DataFrame, or Dataset APIs to manipulate the data and write the results to any external systems, such as databases or file systems. Benefits of foreachBatch: Code & Setup Here’s how we can use foreachBatch to achieve this: ∘ Define parameters for the job ∘ Create a Streaming source ∘ Define custom processing logic and parameters ∘ Create an instance of forEachBatchProcessor Class with the parameters ∘ Orchestrate the job ∘ Look at the output table ∘ Clean Up Define parameters for the job target_table_name = “for_each_batch_paramerterize” check_point_location = f”/tmp/delta/{target_table_name}/_checkpoints/” dedupe_colum_name =”hash” Create a Streaming source We will create a synthetic dataset. 
generated_df = ( spark.readStream .format(“rate”) .option(“numPartitions”, 4) .option(“rowsPerSecond”, 1 * 1000) .load() .selectExpr( “md5( CAST (value AS STRING) ) as md5″ ,”value” ,”value%1000000 as hash” ) ) Define custom processing logic and parameters class forEachBatchProcessor: def __init__(self, dedupe_column: str, filter_criteria:str, passed_value: int): self.dedupe_column = dedupe_column self.filter_criteria = filter_criteria self.passed_value = passed_value def print_attributes(self): attributes = vars(self) print( “\n”.join([f”{attr}: {value}” for attr, value in attributes.items()]) ) def make_changes_using_the_micro_batch(self, microBatchOutputDF, batchId: int): self.print_attributes() print(f”Processing batchId: {batchId}”) # Your processing logic using the parameter view_name = f”updates_for_batchId_{batchId}” microBatchOutputDF.createOrReplaceTempView(view_name) sql_logic = f””” SELECT * ,{self.passed_value} as passed_value ,{batchId} as batch_id FROM ( SELECT * ,rank() over(partition by {self.dedupe_column} order by value desc) as dedupe FROM {view_name} WHERE {self.filter_criteria} ) WHERE dedupe =1 “”” print(f”Processing sql_logic: {sql_logic}”) to_be_written_df = microBatchOutputDF.sparkSession.sql(sql_logic).drop(“dedupe”) to_be_written_df.write.mode(“append”).saveAsTable(target_table_name) Create an instance of forEachBatchProcessor Class with the parameters instantiateForEachBatchProcessor = forEachBatchProcessor( dedupe_column = dedupe_colum_name, filter_criteria = “1=1”, passed_value = 3 ) Orchestrate the job ( generated_df .writeStream #.trigger(availableNow=True) .trigger(processingTime=’10 seconds’) .option(“checkpointLocation”, check_point_location) .option(“queryName”, “ParameterizeForEachBatch”) .foreachBatch(instantiateForEachBatchProcessor.make_changes_using_the_micro_batch) .start() ) Look at the output table display(spark.read.table(target_table_name)) Clean Up spark.sql(f””” DROP TABLE IF EXISTS {target_table_name} “””) dbutils.fs.rm(check_point_location,True) Conclusion: Apache Spark Streaming’s foreachBatch operation is a powerful tool for simplifying real-time data processing. By bridging the gap between the streaming and structured worlds, it enables developers to perform batch-like operations on streaming data efficiently. Leveraging the rich functionality of Spark’s DataFrames, foreachBatch empowers users to process and analyze real-time data with ease. Whether you’re performing aggregations, transformations, or writing data to external systems, foreachBatch offers a flexible and scalable solution for real-time streaming applications. Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website **CanadianDataGuy.com** for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work. Download the code I want to emphasize that my blog posts are designed to be practical resources that you can readily use in your own environments. By providing code examples with careful attention to best practices, I aim to simplify the implementation of real-time data processing solutions. I encourage you to explore the blog, copy the code snippets, and adapt them to your specific needs. With these resources, you’ll be equipped to accelerate your development process and unlock the power of Spark Streaming. 
Dive in, leverage the code, and start building your real-time data processing pipelines with confidence! Go Build! Canadian Data Guy!
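The post parameterizes foreachBatch with a small class; a closure achieves the same thing with less ceremony, and some readers may prefer it. Below is a minimal sketch of that alternative, reusing target_table_name and dedupe_colum_name from the post; it is an illustration of the same pattern, not the original code.

def build_for_each_batch_fn(dedupe_column: str, filter_criteria: str, passed_value: int):
    """Returns a function with the (micro_batch_df, batch_id) signature that foreachBatch expects."""
    def process_micro_batch(micro_batch_df, batch_id: int):
        view_name = f"updates_for_batchId_{batch_id}"
        micro_batch_df.createOrReplaceTempView(view_name)
        sql_logic = f"""
            SELECT *, {passed_value} AS passed_value, {batch_id} AS batch_id
            FROM (
                SELECT *,
                       rank() OVER (PARTITION BY {dedupe_column} ORDER BY value DESC) AS dedupe
                FROM {view_name}
                WHERE {filter_criteria}
            )
            WHERE dedupe = 1
        """
        to_be_written_df = micro_batch_df.sparkSession.sql(sql_logic).drop("dedupe")
        to_be_written_df.write.mode("append").saveAsTable(target_table_name)
    return process_micro_batch

# Usage: pass the built function straight to foreachBatch
# .foreachBatch(build_for_each_batch_fn(dedupe_colum_name, "1=1", 3))

Whether to use a class or a closure is purely a style choice; both keep the parameters out of global state.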

Databricks, Delta, Spark, Stream, Stream-Stream

How to write your first Spark application with Stream-Stream Joins with working code.

Have you been waiting to try Streaming but cannot take the plunge? In a single blog, we will teach you whatever needs to be understood about Streaming Joins. We will give you a working code which you can use for your next Streaming Pipeline. The steps involved: Create a fake dataset at scaleSet a baseline using traditional SQLDefine Temporary Streaming ViewsInner Joins with optional WatermarkingLeft Joins with WatermarkingThe cold start edge case: withEventTimeOrderCleanup What is Stream-Stream Join? Stream-stream join is a widely used operation in stream processing where two or more data streams are joined based on some common attributes or keys. It is essential in several use cases, such as real-time analytics, fraud detection, and IoT data processing. Concept of Stream-Stream Join Stream-stream join combines two or more streams based on a common attribute or key. The join operation is performed on an ongoing basis, with each new data item from the stream triggering a join operation. In stream-stream join, each data item in the stream is treated as an event, and it is matched with the corresponding event from the other stream based on matching criteria. This matching criterion could be a common attribute or key in both streams. When it comes to joining data streams, there are a few key challenges that must be addressed to ensure successful results. One of the biggest hurdles is the fact that, at any given moment, neither stream has a complete view of the dataset. This can make it difficult to find matches between inputs and generate accurate join results. To overcome this challenge, it’s important to buffer past input as a streaming state for both input streams. This allows for every future input to be matched with past input, which can help to generate more accurate join results. Additionally, this buffering process can help to automatically handle late or out-of-order data, which can be common in streaming environments. To further optimize the join process, it’s also important to use watermarks to limit the state. This can help to ensure that only the most relevant data is being used to generate join results, which can help to improve accuracy and reduce processing times. Types of Stream-Stream Join Depending on the nature of the join and the matching criteria, there are several types of stream-stream join operations. Some of the popular types of stream-stream join are: Inner Join In inner join, only those events are returned where there is a match in both the input streams. This type of join is useful when combining the data from two streams with a common key or attribute. Outer Join In outer join, all events from both the input streams are included in the joined stream, whether or not there is a match between them. This type of join is useful when we need to combine data from two streams, and there may be missing or incomplete data in either stream. Left Join In left join, all events from the left input stream are included in the joined stream, and only the matching events from the right input stream are included. This type of join is useful when we need to combine data from two streams and keep all the data from the left stream, even if there is no matching data in the right stream. 1. The Setup: Create a fake dataset at scale Most people do not have 2 streams just hanging around for one to experiment with Stream Steam Joins. Thus I used Faker to mock 2 different streams which we will use for this example. 
The name of the library being used is Faker and faker_vehicle to create Datasets. !pip install faker_vehicle !pip install faker Imports from faker import Faker from faker_vehicle import VehicleProvider from pyspark.sql import functions as F import uuid from utils import logger Parameters # define schema name and where should the table be stored schema_name = “test_streaming_joins” schema_storage_location = “/tmp/CHOOSE_A_PERMANENT_LOCATION/” Create the Target Schema/Database Create a Schema and set location. This way, all tables would inherit the base location. create_schema_sql = f””” CREATE SCHEMA IF NOT EXISTS {schema_name} COMMENT ‘This is {schema_name} schema’ LOCATION ‘{schema_storage_location}’ WITH DBPROPERTIES ( Owner=’Jitesh’); “”” print(f”create_schema_sql: {create_schema_sql}”) spark.sql(create_schema_sql) Use Faker to define functions to help generate fake column values fake = Faker() fake.add_provider(VehicleProvider) event_id = F.udf(lambda: str(uuid.uuid4())) vehicle_year_make_model = F.udf(fake.vehicle_year_make_model) vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat) vehicle_make_model = F.udf(fake.vehicle_make_model) vehicle_make = F.udf(fake.vehicle_make) vehicle_model = F.udf(fake.vehicle_model) vehicle_year = F.udf(fake.vehicle_year) vehicle_category = F.udf(fake.vehicle_category) vehicle_object = F.udf(fake.vehicle_object) latitude = F.udf(fake.latitude) longitude = F.udf(fake.longitude) location_on_land = F.udf(fake.location_on_land) local_latlng = F.udf(fake.local_latlng) zipcode = F.udf(fake.zipcode) Generate Streaming source data at your desired rate def generated_vehicle_and_geo_df (rowsPerSecond:int , numPartitions :int ): return ( spark.readStream.format(“rate”) .option(“numPartitions”, numPartitions) .option(“rowsPerSecond”, rowsPerSecond) .load() .withColumn(“event_id”, event_id()) .withColumn(“vehicle_year_make_model”, vehicle_year_make_model()) .withColumn(“vehicle_year_make_model_cat”, vehicle_year_make_model_cat()) .withColumn(“vehicle_make_model”, vehicle_make_model()) .withColumn(“vehicle_make”, vehicle_make()) .withColumn(“vehicle_year”, vehicle_year()) .withColumn(“vehicle_category”, vehicle_category()) .withColumn(“vehicle_object”, vehicle_object()) .withColumn(“latitude”, latitude()) .withColumn(“longitude”, longitude()) .withColumn(“location_on_land”, location_on_land()) .withColumn(“local_latlng”, local_latlng()) .withColumn(“zipcode”, zipcode()) ) # You can uncomment the below display command to check if the code in this cell works #display(generated_vehicle_and_geo_df) # You can uncomment the below display command to check if the code in this cell works #display(generated_vehicle_and_geo_df) Now let’s generate the base source table and let’s call it Vehicle_Geo table_name_vehicle_geo= “vehicle_geo” def stream_write_to_vehicle_geo_table(rowsPerSecond: int = 1000, numPartitions: int = 10): ( generated_vehicle_and_geo_df(rowsPerSecond, numPartitions) .writeStream .queryName(f”write_to_delta_table: {table_name_vehicle_geo}”) .option(“checkpointLocation”, f”{schema_storage_location}/{table_name_vehicle_geo}/_checkpoint”) .format(“delta”) .toTable(f”{schema_name}.{table_name_vehicle_geo}”) ) stream_write_to_vehicle_geo_table(rowsPerSecond = 1000, numPartitions = 10) Let the above code run for a few iterations, and you can play with rowsPerSecond and numPartitions to control how much data you would like to generate. Once you have generated enough data, kill the above stream and get a base line for row count. 
spark.read.table(f”{schema_name}.{table_name_vehicle_geo}”).count() display( spark.sql(f””” SELECT * FROM {schema_name}.{table_name_vehicle_geo} “””) ) Let’s also get a min & max of the timestamp column as we would be leveraging it for watermarking. display( spark.sql(f””” SELECT min(timestamp) ,max(timestamp) ,current_timestamp() FROM {schema_name}.{table_name_vehicle_geo} “””) ) Next, we will break this Delta table into 2 different tables Because for Stream-Stream Joins we need 2 different streams. We will use Delta To Delta Streaming here to create these tables. table_name_vehicle = “vehicle” vehicle_df = ( spark.readStream.format(“delta”) .option(“maxFilesPerTrigger”,
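To illustrate where the setup above is heading, here is a minimal, hedged sketch of the stream-stream inner join itself. It assumes the vehicle_geo table has been split into two Delta tables (vehicle and geo) that share event_id and a timestamp column; the table names, renamed column, and the watermark and interval values are illustrative choices, not the post's exact code.

from pyspark.sql import functions as F

# Two streams read from the split Delta tables (names assumed for illustration)
vehicle_stream_df = (
    spark.readStream.table(f"{schema_name}.vehicle")
    .withWatermark("timestamp", "2 hours")
)
geo_stream_df = (
    spark.readStream.table(f"{schema_name}.geo")
    .withColumnRenamed("timestamp", "geo_timestamp")  # avoid ambiguous column names after the join
    .withWatermark("geo_timestamp", "2 hours")
)

# Inner stream-stream join on the shared event_id, bounded by an event-time range
# so Spark can eventually drop old buffered state
joined_stream_df = (
    vehicle_stream_df.alias("v")
    .join(
        geo_stream_df.alias("g"),
        F.expr("""
            v.event_id = g.event_id
            AND g.geo_timestamp >= v.timestamp - INTERVAL 1 HOUR
            AND g.geo_timestamp <= v.timestamp + INTERVAL 1 HOUR
        """),
        "inner",
    )
)

display(joined_stream_df)

The watermarks plus the time-range condition are what let Spark bound the buffered state for both sides, which is the core idea discussed earlier in this post.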

Blog, Databricks, Spark, Stream

How to write your first Spark Stream Batch Join with working code

When I started learning about Spark Streaming, I could not find enough code/material which could kick-start my journey and build my confidence. I wrote this blog to fill this gap which could help beginners understand how simple streaming is and build their first application. In this blog, I will explain most things by first principles to increase your understanding and confidence and you walk away with code for your first Streaming application. Scenario: Let’s assume we have a streaming source with data arriving all the time. We want to add more attributes from another table( Think lookup table/ dimension table). Thus we will stream the data and join with the lookup table via Stream-Batch join. The result would be written as a Delta table, which could be used downstream for analytics or streaming. Imports & Parameters from pyspark.sql import functions as Ffrom faker import Fakerimport uuid# define schema name and where should the table be storedschema_name = “test_streaming_joins”schema_storage_location = “/tmp/CHOOSE_A_PERMANENT_LOCATION/”# Please download this file from https://simplemaps.com/data/us-zips then download and place it at a location of your choice and then change the value for the variable belowstatic_table_csv_file = “/FileStore/jitesh.soni/data/us_zip_code_and_its_attributes.csv”# Static table specificationstatic_table_name = “static_zip_codes”# Target Stareaming Table specificationtarget_table_name = “joined_datasets”# Recommend you to keep the checkpoint next to the Delta table so that you do have to notion about where the checkpoint ischeckpoint_location = f”{schema_storage_location}/{target_table_name}/_checkpoints/”Create Target Database create_schema_sql = f””” CREATE SCHEMA IF NOT EXISTS {schema_name} COMMENT ‘This is {schema_name} schema’ LOCATION ‘{schema_storage_location}’ WITH DBPROPERTIES ( Owner=’Jitesh’); “””print(f”create_schema_sql: {create_schema_sql}”) Generate Static Or a lookup Dataset We will use a public dataset source with attributes about a zip code. This could be any other static source or a Delta table being updated in parallel. Note: If you pick a static source and start streaming, Spark Streaming will only read it once. If you have a few updates to the static source, you will have to restart the Spark Stream so it rereads the static source. Meanwhile, if you have the Delta table as a source, then Spark Streaming will identify the update automatically, and nothing extra needs to be done. csv_df = ( spark.read.option(“header”, True) .option(“inferSchema”, True) .csv(static_table_csv_file))display(csv_df)csv_df.write.saveAsTable(f”{schema_name}.{static_table_name}”) Next, we will Z-order the table on the key, which would be used in joins. This will help Spark Streaming do efficient joins because the Delta table is sorted by join key with statistics about which file contains which key value. spark.sql( f””” OPTIMIZE {schema_name}.{static_table_name} ZORDER BY (zip); “””) Generate Streaming Dataset We will generate a Streaming dataset using the Faker library. In the below code, we will define a few user-defined functions. fake = Faker()fake_id = F.udf(lambda: str(uuid.uuid4()))fake_firstname = F.udf(fake.first_name)fake_lastname = F.udf(fake.last_name)fake_email = F.udf(fake.ascii_company_email)# fake_date = F.udf(lambda:fake.date_time_this_month().strftime(“%Y-%m-%d %H:%M:%S”))fake_address = F.udf(fake.address)fake_zipcode = F.udf(fake.zipcode) Now, we will use spark.readStream.format(“rate”) to generate data at your desired rate. 
streaming_df = ( spark.readStream.format(“rate”) .option(“numPartitions”, 10) .option(“rowsPerSecond”, 1 * 1000) .load() .withColumn(“fake_id”, fake_id()) .withColumn(“fake_firstname”, fake_firstname()) .withColumn(“fake_lastname”, fake_lastname()) .withColumn(“fake_email”, fake_email()) .withColumn(“fake_address”, fake_address()) .withColumn(“fake_zipcode”, fake_zipcode()))# You can uncomment the below display command to check if the code in this cell works# display(streaming_df) Stream- Static Join or Stream -Delta Join Structured Streaming supports joins (inner join and left join) between a streaming and a static DataFrame or a Delta Table. However, a few types of stream-static outer Joins are not supported yet. lookup_delta_df = spark.read.table(static_table_name)joined_streaming_df = streaming_df.join( lookup_delta_df, streaming_df[“fake_zipcode”] == lookup_delta_df[“zip”], “left_outer”,).drop(“fake_zipcode”)# display(joined_streaming_df) Orchestrate the pipeline and write Spark Stream to Delta Table Some Tips: ( joined_streaming_df.writeStream # .trigger(availableNow=True) .queryName(“do_a_stream_join_with_the_delta_table”) .option(“checkpointLocation”, checkpoint_location) .format(“delta”) .toTable(f”{schema_name}.{target_table_name}”)) Download the code Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.
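Once the stream has been running for a while, it is worth checking how many streamed rows actually found a match in the lookup table, since the fake zip codes are random and a left_outer join quietly passes unmatched rows through with NULL lookup columns. This is a small sketch, reusing schema_name and target_table_name from the post.

joined_table_df = spark.read.table(f"{schema_name}.{target_table_name}")

# Rows with a NULL zip never matched the lookup table
total_rows = joined_table_df.count()
unmatched_rows = joined_table_df.filter("zip IS NULL").count()
print(f"{unmatched_rows} of {total_rows} rows had no matching zip code in the lookup table")

A high unmatched ratio here usually just means the synthetic zip codes fall outside the lookup dataset, but on real data the same check is a quick data-quality signal for a stream-batch join.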

Blog, Delta, Spark, Stream

How to upgrade your Spark Stream application with a new checkpoint With working code

Sometimes in life, we need to make breaking changes which require us to create a new checkpoint. Some example scenarios: There could be plenty of scenarios where you want to control precisely which data(Kafka offsets) need to be processed. Not every scenario requires a new checkpoint. Here is a list of things you can change without requiring a new checkpoint. This blog helps you understand how to handle a scenario where a new checkpoint is unavoidable. Kafka Basics: Topics, partition & offset Kafka Cluster has Topics: Topics are a way to organize messages. Each topic has a name that is unique across the entire Kafka cluster. Messages are sent to and read from specific topics. In other words, producers write data on a topic, and consumers read data from the topic. Topics have Partitions, and data/messages are distributed across partitions. Every message belongs to a single partition. Partition has messages, each with a unique sequential identifier within the partition called the Offset. What is the takeaway here? We must identify what offset has already been processed for each partition, and this information can be found inside the checkpoint. What information is inside the checkpoint? Under the checkpoint folder, there are four subfolders: How to fetch information about Offset & Partition from the Checkpoint folder? List the files at the checkpoint location; we are looking for the offsets folder. checkpoint_location= “/checkpoint_location/checkpoint_for_kafka_to_delta”dbutils.fs.ls(checkpoint_location)dbutils.fs.ls(f”{checkpoint_location}/”) Next, we will list the files under the commits folder and identify the most recent commits. dbutils.fs.ls(checkpoint_location)dbutils.fs.ls(f”{checkpoint_location}/commits”) /checkpoint_location/checkpoint_for_kafka_to_delta/commits/0/checkpoint_location/checkpoint_for_kafka_to_delta/commits/1/checkpoint_location/checkpoint_for_kafka_to_delta/commits/2 Once we identify the last commits file number; we will open the equivalent offsets file. In this example, we can see the latest commits is “2”. Now let’s view the contents of the offsets file. #%fs head {FILL_THE_EXACT_PATH_OF_THE_FILE_WHICH_NEEDS_TO_BE_VIEWED}%fs head /checkpoint_location/checkpoint_for_kafka_to_delta/offsets/2{“batchWatermarkMs”:0,”batchTimestampMs”:1674623173851,”conf”:{“spark.sql.streaming.stateStore.providerClass”:”org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider”,”spark.sql.streaming.join.stateFormatVersion”:”2″,”spark.sql.streaming.stateStore.compression.codec”:”lz4″,”spark.sql.streaming.stateStore.rocksdb.formatVersion”:”5″,”spark.sql.streaming.statefulOperator.useStrictDistribution”:”true”,”spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion”:”2″,”spark.sql.streaming.multipleWatermarkPolicy”:”min”,”spark.sql.streaming.aggregation.stateFormatVersion”:”2″,”spark.sql.shuffle.partitions”:”200″}}{“topic_name_from_kafka”:{“0”:400000, “1”:300000}} The information of interest is in the end. This has the topic name and offset per partition. {“topic_name_from_kafka”:{“0”:400000, “1”:300000}} Now the easy part: Use Spark to start reading Kafka from a particular Offset Spark Streaming starts read stream by default with the latest offset. However, it provides a parameter “startingOffsets” to select a custom starting point. 
startingOffsets = “””{“topic_name_from_kafka”:{“0”:400000, “1”:300000}}”””kafka_stream = (spark.readStream .format(“kafka”) .option(“kafka.bootstrap.servers”, kafka_bootstrap_servers_plaintext ) .option(“subscribe”, topic ) .option(“startingOffsets”, startingOffsets ) .load())display(kafka_stream) And we are Done!!. Recommend parameterizing your code so that “startingOffsets” can be passed as a parameter. Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.
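To avoid copying offsets by hand, the lookup described above can be scripted. This is a minimal sketch, assuming a Databricks notebook (so dbutils is available) and a single Kafka source in the stream; the checkpoint path is a placeholder.

import json

def get_committed_kafka_offsets(checkpoint_location: str) -> str:
    """Returns the source offsets JSON for the most recent committed batch in a checkpoint."""
    # Step 1: find the highest committed batch id (mirrors listing the commits folder above)
    commit_files = [f for f in dbutils.fs.ls(f"{checkpoint_location}/commits") if f.name.isdigit()]
    latest_batch_id = max(int(f.name) for f in commit_files)
    # Step 2: read the matching offsets file; its last line holds the per-partition offsets,
    # e.g. {"topic_name_from_kafka":{"0":400000,"1":300000}}
    content = dbutils.fs.head(f"{checkpoint_location}/offsets/{latest_batch_id}", 1024 * 1024)
    last_line = content.strip().splitlines()[-1]
    json.loads(last_line)  # sanity check that the line parses as JSON
    return last_line

startingOffsets = get_committed_kafka_offsets("/checkpoint_location/checkpoint_for_kafka_to_delta")
print(startingOffsets)  # pass this string as the "startingOffsets" option of the new stream

This pairs naturally with the recommendation above to parameterize "startingOffsets" rather than hard-coding it.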
