Author name: Devteam

Blog

Merging Multiple Data Streams with Delta Live Tables: Kafka, Kinesis, and Delta

Introduction In today’s data landscape, organizations often deal with multiple input datasets from various sources like Kafka, Kinesis, and Delta tables. These datasets may have different schemas but sometime to be combined into a single, unified table. Delta Live Tables (DLT) in Databricks simplifies this process by handling schema evolution and managing Slowly Changing Dimensions (SCD) efficiently. Why Delta Live Tables? Delta Live Tables offers several benefits: Real-time Data Processing: Ensures data is processed and available in real-time. Schema Evolution: Automatically propagates new columns from source to target. SCD Type 1 and 2: Easily build SCD Type 1 and Type 2. Inserts, Updates, Deletes: Handles all data operations efficiently. Use Case We aim to merge multiple data streams (human_name, human_email, and human_date_of_birth) into a single Delta table (DIM_SSN_SCD_TYP_2) using the common join key ssn. The input data can come from Kafka, Kinesis, or Delta, or a combination of these sources. We have 3 datasets with different schemas and combine attributes of all of them into a single table. The Solution: CHANGE FLOWs API Delta Live Tables is currently previewing a CHANGE FLOWs API, which allows multiple APPLY CHANGES streams to write to a streaming table within a single pipeline. This functionality enables: Performing data backfills to existing APPLY CHANGES streaming table targets. Ingesting data from multiple independent APPLY CHANGES processes into the same target table. Performing initial hydration (using once=true API) of a streaming table to support Change Data Capture (CDC) use cases. The apply_changes method uses the existing APPLY CHANGE API with an additional once property to specify whether the operation should run only once. apply_changes( once=True, # optional, only run this code once; ignore new files added to this location target=”<existing-target-table>”, source=”<data-source>”, keys=[“key1”, “key2”, “keyN”], # must match <existing-target-table> sequence_by=”<sequence-column>”, # must match <existing-target-table> ignore_null_updates=False, # must match <existing-target-table> apply_as_deletes=None, apply_as_truncates=None, column_list=None, except_column_list=None, stored_as_scd_type=<type>, # must match <existing-target-table> track_history_column_list=None, # must match <existing-target-table> track_history_except_column_list=None ) Implementation Imports and Parameters Define the parameters for our catalog, database names, and the target table name. import dlt from pyspark.sql.functions import * catalog_name = “soni” database_name = “2024_06_25” tables = [“human_name”, “human_email”, “human_date_of_birth”] target_name = “DIM_SSN_SCD_TYP_2” Creating DLT Views Create DLT views for each source table dynamically. for table in tables: @dlt.view(name=f”dlt_view_{table}”) def create_view(table=table): “”” Creates a DLT view for the given table from the Delta table. Args: table (str): Name of the table to create the view for. Returns: DataFrame: Streaming DataFrame containing the data from the specified table. “”” table_path = f”{catalog_name}.`{database_name}`.{table}” return spark.readStream.format(“delta”).table(table_path) Creating the Streaming Target Table Create a streaming target table to merge the data streams. dlt.create_streaming_table( name=target_name, table_properties={ “pipelines.autoOptimize.zOrderCols”: “ssn”, }, ) Note: The pipelines.autoOptimize.zOrderCols property is set to ssn because all merges are happening on the ssn column, and clustering the table on this column optimizes the performance of these merge operations. Merging Data Streams into the Delta Table Merge each stream into the target table using apply_changes. dlt.apply_changes( flow_name=f”streaming_data_from_dlt_view_human_name_to_merge_into_{target_name}”, target=target_name, source=”dlt_view_human_name”, keys=[“ssn”], ignore_null_updates=True, stored_as_scd_type=”2″, sequence_by=”timestamp”, ) dlt.apply_changes( flow_name=f”streaming_data_from_dlt_view_human_email_to_merge_into_{target_name}”, target=target_name, source=”dlt_view_human_email”, keys=[“ssn”], ignore_null_updates=True, stored_as_scd_type=”2″, sequence_by=”timestamp”, ) dlt.apply_changes( flow_name=f”streaming_data_from_dlt_view_human_date_of_birth_to_merge_into_{target_name}”, target=target_name, source=”dlt_view_human_date_of_birth”, keys=[“ssn”], ignore_null_updates=True, stored_as_scd_type=”2″, sequence_by=”timestamp”, ) Output table with SCD Type 2 Explanation Creating Views: The create_view function generates a streaming DataFrame for each table, reading data from the respective Delta table. Streaming Table: The create_streaming_table function initializes the target Delta table with optimization properties. Merging Data: The apply_changes function merges data from each view into the target table, ensuring data consistency and handling Slowly Changing Dimensions (SCD) efficiently. Usage Notes Concurrency: Only one APPLY CHANGES flow for each target table can run concurrently. Consistency: All APPLY CHANGES flows targeting a table must have the same keys, sequencing, and TRACK HISTORY columns. Preview Channel: This feature is currently available in the preview channel of Delta Live Tables. How to stream out of this target table With DBR 15.3, one can read the change feed outside of DLT and then stream changes for futher processing. display( spark.readStream.format(“delta”) .option(“readChangeFeed”, “true”).table(“soni.dlt_2024.dim_ssn_scd_typ_2”) ) Conclusion Delta Live Tables provides a powerful and flexible way to merge multiple data streams into a single Delta table. By leveraging DLT’s capabilities, data engineers can ensure real-time data consistency and handle complex data integration tasks with ease. Start using Delta Live Tables today to streamline your data processing pipelines and ensure your data is always up-to-date. Go Build!!! Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work. Note: All opinions are my own.

Blog

Need for Speed: Benchmarking the Best Tools for Kafka to Delta Ingestion

Introduction Welcome back to the second installment of our series on data ingestion from Kafka to Delta tables on the Databricks platform. Building on our previous discussion about generating and streaming synthetic data to a Kafka topic. This blog post benchmarks three powerful options available on the Databricks platform for ingesting streaming data from Apache Kafka into Delta Lake: Databricks Jobs, Delta Live Tables (DLT), and Delta Live Tables Serverless (DLT Serverless). The primary objective is to evaluate and compare the end-to-end latency of these approaches when ingesting data from Kafka into Delta tables. Latency is a crucial metric, as it directly impacts the freshness and timeliness of data available for downstream analytics and decision-making processes. It’s important to note that all three tools leverage Apache Spark’s Structured Streaming under the hood. “Breaking the myth: Ingest from Kafka to Delta at scale in just 1.5 seconds with Delta Live Tables Serverless — up to 80% faster than traditional methods!” Benchmark Setup Criteria The key metric measured was latency — the duration from when a row is produced in Kafka to when it becomes available in Delta Lake. Latency was meticulously measured over an extended period to ensure precision and account for variability. Input Kafka Feed For our benchmarks, we utilized a Kafka feed churning out data at a rate of 100 rows per second, each approximately 1MB in size which is 100 MB/Second . Annually, this sums up to a staggering 3.15 petabytes, making it a rigorous testbed for evaluating the ingestion capabilities of our selected tools. I used Confluent Cloud to setup Kafka cluster with 6 partitions and it took less than 5 minutes and they gave me 300$ of credits for experimentation. Tools Compared Databricks Jobs: Utilizes Apache Spark Structured Streaming for reading from Kafka and writing to Delta Lake tables. It provides flexibility in job configuration and scheduling but requires manual management of cluster resources. Delta Live Tables (DLT): Employs a declarative approach for data ingestion from Kafka into Delta Lake, automatically managing infrastructure and simplifying pipeline development. Delta Live Tables Serverless (DLT Serverless): Leverages a serverless model to further simplify infrastructure management while performing the same ingestion tasks as DLT. It offers automatic scaling and resource optimization. How was latency measured? Latency is measured by calculating the time difference in milliseconds between the timestamps of consecutive streaming updates to a table. This is done by subtracting the timestamp of a previous update from the timestamp of the current update for each sequential commit, allowing an analysis of how long each update takes to process relative to the previous one. The analysis is currently limited to the last 300 commits, but this number can be adjusted as needed. from pyspark.sql import DataFrame def run_analysis_about_latency(table_name: str) -> DataFrame: # SQL command text formatted as a Python multiline string sql_code = f””” — Define a virtual view of the table’s history WITH VW_TABLE_HISTORY AS ( — Describe the historical changes of the table DESCRIBE HISTORY {table_name} ), — Define a view to calculate the timestamp of the previous write operation VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP AS ( SELECT — Calculate the timestamp of the last write operation before the current one lag(timestamp) OVER ( PARTITION BY 1 ORDER BY version ) AS previous_write_timestamp, timestamp, version FROM VW_TABLE_HISTORY WHERE operation = ‘STREAMING UPDATE’ ), — Define a view to analyze the time difference between consecutive commits VW_BOUND_ANALYSIS_TO_N_COMMITS AS ( SELECT — Calculate the time difference in milliseconds between the previous and current write timestamps TIMESTAMPDIFF( MILLISECOND, previous_write_timestamp, timestamp ) AS elapsed_time_ms FROM VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP ORDER BY version DESC LIMIT 300 — Analyze only the last 300 commits ) — Calculate various statistics about the write latency SELECT avg(elapsed_time_ms) AS average_write_latency, percentile_approx(elapsed_time_ms, 0.9) AS p90_write_latency, percentile_approx(elapsed_time_ms, 0.95) AS p95_write_latency, percentile_approx(elapsed_time_ms, 0.99) AS p99_write_latency, max(elapsed_time_ms) AS maximum_write_latency FROM VW_BOUND_ANALYSIS_TO_N_COMMITS “”” # Execute the SQL query using Spark’s SQL module display(spark.sql(sql_code)) Data Ingestion This code sets up a streaming data pipeline using Apache Spark to efficiently ingest data from a Kafka topic. It defines a schema tailored to the expected data types and columns in the Kafka messages, including vehicle details, geographic coordinates, and text fields.The read_kafka_stream function initializes the streaming process, configuring secure and reliable connections to Kafka, subscribing to the specified topic, and handling data across multiple partitions for improved processing speed. The stream decodes JSON-formatted messages according to the defined schema and extracts essential metadata. from pyspark.sql.types import StructType, StringType, FloatType from pyspark.sql.functions import * # Define the schema based on the DataFrame structure you are writing to Kafka schema = StructType() .add(“event_id”, StringType()) .add(“vehicle_year_make_model”, StringType()) .add(“vehicle_year_make_model_cat”, StringType()) .add(“vehicle_make_model”, StringType()) .add(“vehicle_make”, StringType()) .add(“vehicle_model”, StringType()) .add(“vehicle_year”, StringType()) .add(“vehicle_category”, StringType()) .add(“vehicle_object”, StringType()) .add(“latitude”, StringType()) .add(“longitude”, StringType()) .add(“location_on_land”, StringType()) .add(“local_latlng”, StringType()) .add(“zipcode”, StringType()) .add(“large_text_col_1”, StringType()) .add(“large_text_col_2”, StringType()) .add(“large_text_col_3”, StringType()) .add(“large_text_col_4”, StringType()) .add(“large_text_col_5”, StringType()) .add(“large_text_col_6”, StringType()) .add(“large_text_col_7”, StringType()) .add(“large_text_col_8”, StringType()) .add(“large_text_col_9”, StringType()) def read_kafka_stream(): kafka_stream = (spark.readStream .format(“kafka”) .option(“kafka.bootstrap.servers”, kafka_bootstrap_servers_tls ) .option(“subscribe”, topic ) .option(“failOnDataLoss”,”false”) .option(“kafka.security.protocol”, “SASL_SSL”) .option(“kafka.sasl.mechanism”, “PLAIN”) .option(“kafka.sasl.jaas.config”, f’kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=”{kafka_api_key}” password=”{kafka_api_secret}”;’) .option(“minPartitions”,12) .load() .select(from_json(col(“value”).cast(“string”), schema).alias(“data”), “topic”, “partition”, “offset”, “timestamp”, “timestampType” ) .select(“topic”, “partition”, “offset”, “timestamp”, “timestampType”, “data.*”) ) return kafka_stream Explanation: Connection Setup: Connects to Kafka using specific bootstrap servers and includes security settings like SASL_SSL for encrypted and authenticated data transfer. Topic Subscription: Subscribes to a specified Kafka topic to continuously receive new data. Stream Configuration: Robustly configured to handle potential data loss and manage data across multiple partitions to boost processing speed. Data Transformation: Uses from_json to decode JSON messages according to the established schema, transforming them into a structured format within Spark. Metadata Extraction: Extracts essential metadata like the Kafka topic, partition, and message timestamps. This setup optimizes data ingestion from Kafka into Spark and prepares the data for further processing or integration into storage systems like Delta Lake. Additional code for Databricks Jobs Configuration: This method involves setting up a Databricks job and cluster resources, although it allows for flexible scheduling and monitoring of ingestion processes it but requires understanding of choosing the right compute. ( read_kafka_stream() .writeStream .option(“checkpointLocation”,checkpoint_location_for_delta) .trigger(processingTime=’1 second’) .toTable(target) ) Additional code for Delta Live Tables

Blog

Synthetic Data Made Simple: Generating and Streaming Custom-Sized Data to Kafka

Introduction In the fast-paced world of data engineering, there often arises a need to generate large volumes of synthetic data for testing and benchmarking purposes. Recently, I was tasked with a crucial project: creating records of a specific size (1 MB each) and streaming them to Kafka for performance benchmarking. This blog post, the first in a two-part series, will walk you through how to generate such data using Python and Apache Spark, and then stream it to Kafka efficiently. Tomorrow, we’ll dive into Part 2, where we’ll benchmark Kafka against Delta ingestion speed on Databricks Jobs and Delta Live Tables. But first, let me share the story behind this endeavor. The Challenge: Preparing for Technology Decisions Imagine you’re part of a data engineering team at a rapidly growing tech startup. Your CTO has tasked you with benchmarking the expected speed of Kafka to Delta ingestion before making critical technology decisions. You quickly realize two things: No suitable public Kafka feed: You need a Kafka feed that matches your specific requirements, especially in terms of record size. Complex setup with AWS MSK: Setting up AWS Managed Streaming for Apache Kafka (MSK) for external access is time-consuming and complex. The solution? Generating custom-sized fake data and using Confluent Cloud for a quick and hassle-free Kafka setup. Why Confluent Cloud? Setting up a Kafka cluster can be cumbersome, especially when dealing with security configurations and access permissions. AWS MSK is robust, but its setup can be daunting. Confluent Cloud, on the other hand, offers a quick setup process and provides $300 in free credits, making it perfect for quick experiments and testing. I had my Kafka instance up and running in just five minutes with Confluent Cloud. https://www.confluent.io/confluent-cloud/ Step-by-Step Guide Let’s dive into the code that helps you create synthetic data and push it to Kafka. Installing Necessary Packages First, install the required packages. Faker is a library that helps generate fake data, and faker_vehicle adds vehicle-specific data generation capabilities. # Databricks notebook source # MAGIC %pip install Faker faker_vehicle Importing Required Libraries Next, import the necessary libraries for data generation, streaming, and logging. from faker import Faker from faker_vehicle import VehicleProvider from pyspark.sql import functions as F from pyspark.sql.types import StringType import uuid import logging from pyspark.sql.streaming import StreamingQuery from datetime import datetime Setting Parameters Define the parameters for Kafka configuration and checkpoint location. timestamp = datetime.now().strftime(“%Y%m%d%H%M%S”) checkpoint_location = f”/tmp/confluent_kafka_checkpoint_{timestamp}” # Kafka configuration topic = “YOUR_TOPIC” kafka_bootstrap_servers_tls = “YOUR_KAFKA_URL.confluent.cloud:9092” kafka_api_key = “YOUR_KAFKA_API_KEY” kafka_api_secret = “YOUR_KAFKA_API_SECRET” Initialization and UDF Initialize Faker and add the vehicle provider. Configure logging for tracking the process. # Initialize Faker for data generation and add vehicle data provider fake = Faker() fake.add_provider(VehicleProvider) # Configure logging logging.basicConfig(level=logging.INFO, format=’%(asctime)s – %(name)s – %(levelname)s – %(message)s’) logger = logging.getLogger(__name__) Create User-Defined Functions (UDFs) for generating various fake data attributes. # User-defined functions (UDFs) for generating fake data event_id = F.udf(lambda: str(uuid.uuid4()), StringType()) vehicle_year_make_model = F.udf(fake.vehicle_year_make_model) vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat) vehicle_make_model = F.udf(fake.vehicle_make_model) vehicle_make = F.udf(fake.vehicle_make) vehicle_model = F.udf(fake.vehicle_model) vehicle_year = F.udf(fake.vehicle_year) vehicle_category = F.udf(fake.vehicle_category) vehicle_object = F.udf(fake.vehicle_object) latitude = F.udf(fake.latitude) longitude = F.udf(fake.longitude) location_on_land = F.udf(fake.location_on_land) local_latlng = F.udf(fake.local_latlng) zipcode = F.udf(fake.zipcode) Function to Generate 1MB Row of Data Define a function to generate a DataFrame that simulates a row of data approximately 1 MB in size. @F.udf(StringType()) def large_text_udf(size: int): “””Generate large text data with a specified size.””” return fake.text(max_nb_chars=size) # Configuration for large text data num_large_columns = 10 # Number of large text columns size_per_large_column = (1024 * 1024) // num_large_columns # Distribute 1MB across columnsdef generate_1mb_row_df(rowsPerSecond=10, numPartitions=2): “””Generate a DataFrame simulating streaming data, including vehicle and geographic data.””” logger.info(“Generating vehicle and geo data frame…”) df = spark.readStream.format(“rate”) .option(“numPartitions”, numPartitions) .option(“rowsPerSecond”, rowsPerSecond) .load() .withColumn(“event_id”, event_id()) .withColumn(“vehicle_year_make_model”, vehicle_year_make_model()) .withColumn(“vehicle_year_make_model_cat”, vehicle_year_make_model_cat()) .withColumn(“vehicle_make_model”, vehicle_make_model()) .withColumn(“vehicle_make”, vehicle_make()) .withColumn(“vehicle_model”, vehicle_model()) .withColumn(“vehicle_year”, vehicle_year()) .withColumn(“vehicle_category”, vehicle_category()) .withColumn(“vehicle_object”, vehicle_object()) .withColumn(“latitude”, latitude()) .withColumn(“longitude”, longitude()) .withColumn(“location_on_land”, location_on_land()) .withColumn(“local_latlng”, local_latlng()) .withColumn(“zipcode”, zipcode()) .withColumn(“large_text_col_1”, large_text_udf(F.lit(size_per_large_column))) .withColumn(“large_text_col_2”, large_text_udf(F.lit(size_per_large_column))) .withColumn(“large_text_col_3”, large_text_udf(F.lit(size_per_large_column))) .withColumn(“large_text_col_4”, large_text_udf(F.lit(size_per_large_column))) .withColumn(“large_text_col_5”, large_text_udf(F.lit(size_per_large_column))) .withColumn(“large_text_col_6”, large_text_udf(F.lit(size_per_large_column))) .withColumn(“large_text_col_7”, large_text_udf(F.lit(size_per_large_column))) .withColumn(“large_text_col_8”, large_text_udf(F.lit(size_per_large_column))) .withColumn(“large_text_col_9”, large_text_udf(F.lit(size_per_large_column))) .withColumn(“large_text_col_10”, large_text_udf(F.lit(size_per_large_column))) return df You can test the above code by running the below command display(generate_1mb_row_df()) Streaming Data to Kafka Start streaming the generated data to Kafka. (generate_1mb_row_df(rowsPerSecond=100, numPartitions=12) .selectExpr(“CAST(event_id AS STRING) AS key”, “to_json(struct(*)) AS value”) .writeStream .format(“kafka”) .option(“kafka.bootstrap.servers”, kafka_bootstrap_servers_tls) .option(“kafka.security.protocol”, “SASL_SSL”) .option(“kafka.sasl.mechanism”, “PLAIN”) .option(“kafka.sasl.jaas.config”, f’kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=”{kafka_api_key}” password=”{kafka_api_secret}”;’) .option(“checkpointLocation”, checkpoint_location) .option(“topic”, topic) .option(“queryName”, f”SendDataToKafka-{topic}”) .option(“kafka.max.request.size”, “1100000”) # Setting new max request size to 1.1 MB .start() ) The Confluent UI was able to verify that we are able to generate 100 MB/Second Conclusion This approach allows you to create custom-sized synthetic data and stream it to Kafka efficiently. By using Confluent Cloud, you can significantly reduce the setup time and complexity, enabling a more streamlined and efficient data generation and streaming process. Stay tuned for Part 2 of this series, where we’ll benchmark Kafka against Delta ingestion speed on Databricks Jobs and Delta Live Tables. Whether you’re testing, benchmarking, or exploring data streaming, this guide provides a solid foundation to get you started. Happy streaming! Download this notebook References For more details on Spark and Kafka integration, you can refer to the following documentation: Apache Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) Faker Documentation Apache Kafka Documentation These resources provide comprehensive information and examples to help you further understand and implement Spark and Kafka integration. Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Blog

Using Spark Streaming to merge/upsert data into a Delta Lake with working code

This blog will discuss how to read from a Spark Streaming and merge/upsert data into a Delta Lake. We will also optimize/cluster data of the delta table. In the end, we will show how to start a streaming pipeline with the previous target table as the source. Overall, the process works in the following manner, we read data from a streaming source and use this special function foreachBatch. Using this we will call any user-defined function responsible for all the processing. This function encapsulates the Merge and Optimize to the target Delta table. First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, s3, etc. for simplicity. Let’s generate a stream using the below. Feel free to alter numPartitions & rowsPerSecond . These parameters help you control how much volume of data you want to generate. In the below code, we generated 10,000 rows per second across 100 partitions. Generate streaming data at your desired rate generated_df = ( spark.readStream .format(“rate”) .option(“numPartitions”, 100) .option(“rowsPerSecond”, 10 * 1000) .load() .selectExpr( “md5( CAST (value AS STRING) ) as md5” ,”value” ,”value%1000000 as hash” ) ) #display(generated_df) Parameters / Variables (Feel free to change as per your needs) target_table_name = “to_be_merged_into_table” check_point_location = f”/tmp/delta/{target_table_name}/_checkpoints/” join_column_name =”hash” Create an Empty Delta table so data could be merged into it spark.sql(f””” DROP TABLE IF EXISTS {target_table_name};”””) ( generated_df.writeStream .format(“delta”) .outputMode(“append”).trigger(once=True) .option(“checkpointLocation”, check_point_location) .toTable(target_table_name) ) Check if data is populated display(spark.read.table(target_table_name)) A user-defined function which does the data processing, Merge & Optimize def make_changes_using_the_micro_batch(microBatchOutputDF, batchId: int): print(f”Processing batchId: {batchId}”) microBatchOutputDF.createOrReplaceTempView(“updates”) spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession() spark_session_for_this_micro_batch.sql(f””” SELECT * FROM ( select * ,rank() over(partition by {join_column_name} order by value desc) as dedupe from updates ) WHERE dedupe =1 “””).drop(“dedupe”).createOrReplaceTempView(“updates_which_need_to_be_merged”) spark_session_for_this_micro_batch.sql(f””” MERGE INTO {target_table_name} target using updates_which_need_to_be_merged u on u.{join_column_name} = target.{join_column_name} WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * “””) optimize_every_n_batches = 20 #Define how often should optimize run? for example: at 50, it means that we will run the optimize command every 50 batches of stream data if batchId % optimize_every_n_batches == 0: optimize_and_zorder_table(table_name = target_table_name, zorder_by_col_name = join_column_name) Optimize/ Z-order a Delta table Why do we need to optimize a table? If we keep adding files to our Delta table and never optimize/sort them then over time we need to read a lot of files during merge time. Thus, optimizing the Delta table after every N merges is better. N needs to be decided on your latency requirements. You could start with N as 10 and change it as per your needs. The below code will run an optimize and zorder command on a given table that is being fed by a stream. Optimize commands can’t run in a silo because it will require us to pause and then resume the stream. Therefore, we need to call this function a part of the upsert function. This enables us to optimize before the next batch of streaming data comes through. from timeit import default_timer as timer def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None: “”” Parameters: table_name: str name of the table to be optimized zorder_by_col_name: str comma separated list of columns to zorder by. example “col_a, col_b, col_c” “”” start = timer() print(f”Met condition to optimize table {table_name}”) sql_query_optimize = f”OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})” spark.sql(sql_query_optimize) end = timer() time_elapsed_seconds = end – start print( f”Successfully optimized table {table_name} . Total time elapsed: {time_elapsed_seconds} seconds” ) Orchestrate from readStream -> Merge -> Optimize ( generated_df .writeStream.format(‘delta’) .trigger(processingTime=’30 seconds’) .option(“checkpointLocation”, check_point_location) .foreachBatch(make_changes_using_the_micro_batch) .start() ) If you have reached so far, you should have an end-to-end pipeline working with streaming data and merging data into a Delta table. As the next step, let’s use the previous target table as our new streaming source. Use the target table as a source for the next streaming pipeline Change data feed allows Databricks to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated. Reference: https://docs.databricks.com/delta/delta-change-data-feed.html#use-delta-lake-change-data-feed-on-databricks spark.sql(f”’ ALTER TABLE {target_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed=true) ”’) Reading change data as a stream display( spark.readStream.format(“delta”) .option(“readChangeFeed”, “true”) .table(target_table_name) ) Download this notebook Spark Streaming Using For Each Batch & Merge.html Edit description drive.google.com Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work. Reference https://www.youtube.com/watch?v=CLDcdVDupMg

Scroll to Top