Delta

Blog, Databricks, Delta, Delta Live Tables

Merging Multiple Data Streams with Delta Live Tables: Kafka, Kinesis, and Delta

Introduction

In today's data landscape, organizations often deal with multiple input datasets from various sources like Kafka, Kinesis, and Delta tables. These datasets may have different schemas but sometimes need to be combined into a single, unified table. Delta Live Tables (DLT) in Databricks simplifies this process by handling schema evolution and managing Slowly Changing Dimensions (SCD) efficiently.

Why Delta Live Tables?

Delta Live Tables offers several benefits that make it a good fit for this kind of workload.

Use Case

We aim to merge multiple data streams (human_name, human_email, and human_date_of_birth) into a single Delta table (DIM_SSN_SCD_TYP_2) using the common join key ssn. The input data can come from Kafka, Kinesis, or Delta, or a combination of these sources. We have three datasets with different schemas and want to combine the attributes of all of them into a single table.

The Solution: CHANGE FLOWs API

Delta Live Tables is currently previewing a CHANGE FLOWs API, which allows multiple APPLY CHANGES streams to write to a streaming table within a single pipeline. The apply_changes method uses the existing APPLY CHANGES API with an additional once property to specify whether the operation should run only once.

apply_changes(
    once=True,  # optional, only run this code once; ignore new files added to this location
    target="<existing-target-table>",
    source="<data-source>",
    keys=["key1", "key2", "keyN"],  # must match <existing-target-table>
    sequence_by="<sequence-column>",  # must match <existing-target-table>
    ignore_null_updates=False,  # must match <existing-target-table>
    apply_as_deletes=None,
    apply_as_truncates=None,
    column_list=None,
    except_column_list=None,
    stored_as_scd_type=<type>,  # must match <existing-target-table>
    track_history_column_list=None,  # must match <existing-target-table>
    track_history_except_column_list=None,
)

Implementation

Imports and Parameters

Define the parameters for our catalog, database name, and the target table name.

import dlt
from pyspark.sql.functions import *

catalog_name = "soni"
database_name = "2024_06_25"
tables = ["human_name", "human_email", "human_date_of_birth"]
target_name = "DIM_SSN_SCD_TYP_2"

Creating DLT Views

Create DLT views for each source table dynamically.

for table in tables:

    @dlt.view(name=f"dlt_view_{table}")
    def create_view(table=table):
        """
        Creates a DLT view for the given table from the Delta table.

        Args:
            table (str): Name of the table to create the view for.

        Returns:
            DataFrame: Streaming DataFrame containing the data from the specified table.
        """
        table_path = f"{catalog_name}.`{database_name}`.{table}"
        return spark.readStream.format("delta").table(table_path)

Creating the Streaming Target Table

Create a streaming target table to merge the data streams.

dlt.create_streaming_table(
    name=target_name,
    table_properties={
        "pipelines.autoOptimize.zOrderCols": "ssn",
    },
)

Note: The pipelines.autoOptimize.zOrderCols property is set to ssn because all merges are happening on the ssn column, and clustering the table on this column optimizes the performance of these merge operations.
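The views above read from Delta tables, but, as noted in the use case, the sources could just as well be Kafka or Kinesis. As a minimal sketch (not part of the original pipeline), a DLT view over a Kafka topic might look like the following; the bootstrap servers, topic name, and message schema are assumptions for illustration, and the payload must expose the ssn key and the timestamp column used for sequencing:

import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, TimestampType

# Hypothetical schema for the Kafka message value; adjust to your actual payload
human_name_kafka_schema = (
    StructType()
    .add("ssn", StringType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("timestamp", TimestampType())
)

@dlt.view(name="dlt_view_human_name_from_kafka")
def dlt_view_human_name_from_kafka():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "<bootstrap-servers>")  # assumption
        .option("subscribe", "human_name")                         # assumed topic name
        .load()
        # Kafka delivers the payload as bytes; parse the JSON value into columns
        .select(from_json(col("value").cast("string"), human_name_kafka_schema).alias("data"))
        .select("data.*")
    )

Such a view can then be handed to apply_changes in exactly the same way as the Delta-backed views below.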
Merging Data Streams into the Delta Table

Merge each stream into the target table using apply_changes.

dlt.apply_changes(
    flow_name=f"streaming_data_from_dlt_view_human_name_to_merge_into_{target_name}",
    target=target_name,
    source="dlt_view_human_name",
    keys=["ssn"],
    ignore_null_updates=True,
    stored_as_scd_type="2",
    sequence_by="timestamp",
)

dlt.apply_changes(
    flow_name=f"streaming_data_from_dlt_view_human_email_to_merge_into_{target_name}",
    target=target_name,
    source="dlt_view_human_email",
    keys=["ssn"],
    ignore_null_updates=True,
    stored_as_scd_type="2",
    sequence_by="timestamp",
)

dlt.apply_changes(
    flow_name=f"streaming_data_from_dlt_view_human_date_of_birth_to_merge_into_{target_name}",
    target=target_name,
    source="dlt_view_human_date_of_birth",
    keys=["ssn"],
    ignore_null_updates=True,
    stored_as_scd_type="2",
    sequence_by="timestamp",
)

Output table with SCD Type 2

Explanation

Usage Notes

How to stream out of this target table

With DBR 15.3, one can read the change feed outside of DLT and then stream the changes for further processing.

display(
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table("soni.dlt_2024.dim_ssn_scd_typ_2")
)

Conclusion

Delta Live Tables provides a powerful and flexible way to merge multiple data streams into a single Delta table. By leveraging DLT's capabilities, data engineers can ensure real-time data consistency and handle complex data integration tasks with ease. Start using Delta Live Tables today to streamline your data processing pipelines and ensure your data is always up-to-date. Go Build!!!

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Note: All opinions are my own.
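Because the target is stored as SCD Type 2, DLT maintains __START_AT and __END_AT columns that bound the validity window of each record version. As a small sketch run outside the pipeline (for example in a notebook), using the same fully qualified table name as the change-feed example above, the current version of each ssn can be pulled by filtering for open-ended records:

# A record whose __END_AT is still NULL is the current version for that ssn
display(
    spark.sql(
        """
        SELECT *
        FROM soni.dlt_2024.dim_ssn_scd_typ_2
        WHERE __END_AT IS NULL
        """
    )
)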

Blog, Databricks, Delta, Delta Live Tables, kafka, Spark, Stream

Need for Speed: Benchmarking the Best Tools for Kafka to Delta Ingestion

Introduction

Welcome back to the second installment of our series on data ingestion from Kafka to Delta tables on the Databricks platform, building on our previous discussion about generating and streaming synthetic data to a Kafka topic.

This blog post benchmarks three powerful options available on the Databricks platform for ingesting streaming data from Apache Kafka into Delta Lake: Databricks Jobs, Delta Live Tables (DLT), and Delta Live Tables Serverless (DLT Serverless). The primary objective is to evaluate and compare the end-to-end latency of these approaches when ingesting data from Kafka into Delta tables. Latency is a crucial metric, as it directly impacts the freshness and timeliness of data available for downstream analytics and decision-making processes. It's important to note that all three tools leverage Apache Spark's Structured Streaming under the hood.

"Breaking the myth: Ingest from Kafka to Delta at scale in just 1.5 seconds with Delta Live Tables Serverless — up to 80% faster than traditional methods!"

Benchmark Setup

Criteria

The key metric measured was latency: the duration from when a row is produced in Kafka to when it becomes available in Delta Lake. Latency was measured over an extended period to ensure precision and account for variability.

Input Kafka Feed

For our benchmarks, we utilized a Kafka feed churning out data at a rate of 100 rows per second, each row approximately 1 MB in size, which works out to roughly 100 MB per second. Annually, this sums up to a staggering 3.15 petabytes, making it a rigorous testbed for evaluating the ingestion capabilities of our selected tools. I used Confluent Cloud to set up a Kafka cluster with 6 partitions; it took less than 5 minutes, and they provided $300 of credits for experimentation.

Tools Compared

How was latency measured?

Latency is measured by calculating the time difference in milliseconds between the timestamps of consecutive streaming updates to a table. This is done by subtracting the timestamp of a previous update from the timestamp of the current update for each sequential commit, allowing an analysis of how long each update takes to process relative to the previous one. The analysis is currently limited to the last 300 commits, but this number can be adjusted as needed.
from pyspark.sql import DataFrame

def run_analysis_about_latency(table_name: str) -> DataFrame:
    # SQL command text formatted as a Python multiline string
    sql_code = f"""
    -- Define a virtual view of the table's history
    WITH VW_TABLE_HISTORY AS (
        -- Describe the historical changes of the table
        DESCRIBE HISTORY {table_name}
    ),
    -- Define a view to calculate the timestamp of the previous write operation
    VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP AS (
        SELECT
            -- Calculate the timestamp of the last write operation before the current one
            lag(timestamp) OVER (
                PARTITION BY 1
                ORDER BY version
            ) AS previous_write_timestamp,
            timestamp,
            version
        FROM VW_TABLE_HISTORY
        WHERE operation = 'STREAMING UPDATE'
    ),
    -- Define a view to analyze the time difference between consecutive commits
    VW_BOUND_ANALYSIS_TO_N_COMMITS AS (
        SELECT
            -- Calculate the time difference in milliseconds between the previous and current write timestamps
            TIMESTAMPDIFF(
                MILLISECOND,
                previous_write_timestamp,
                timestamp
            ) AS elapsed_time_ms
        FROM VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP
        ORDER BY version DESC
        LIMIT 300 -- Analyze only the last 300 commits
    )
    -- Calculate various statistics about the write latency
    SELECT
        avg(elapsed_time_ms) AS average_write_latency,
        percentile_approx(elapsed_time_ms, 0.9) AS p90_write_latency,
        percentile_approx(elapsed_time_ms, 0.95) AS p95_write_latency,
        percentile_approx(elapsed_time_ms, 0.99) AS p99_write_latency,
        max(elapsed_time_ms) AS maximum_write_latency
    FROM VW_BOUND_ANALYSIS_TO_N_COMMITS
    """
    # Execute the SQL query using Spark's SQL module
    display(spark.sql(sql_code))
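Once the function is defined, it can be pointed at whichever Delta table a given tool wrote to; the fully qualified table name below is only a placeholder:

# Example invocation; replace with the target table written by Jobs, DLT, or DLT Serverless
run_analysis_about_latency("soni.default.kafka_to_delta_benchmark_target")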
Data Ingestion

This code sets up a streaming data pipeline using Apache Spark to efficiently ingest data from a Kafka topic. It defines a schema tailored to the expected data types and columns in the Kafka messages, including vehicle details, geographic coordinates, and text fields. The read_kafka_stream function initializes the streaming process, configuring secure and reliable connections to Kafka, subscribing to the specified topic, and handling data across multiple partitions for improved processing speed. The stream decodes JSON-formatted messages according to the defined schema and extracts essential metadata.

from pyspark.sql.types import StructType, StringType, FloatType
from pyspark.sql.functions import *

# Define the schema based on the DataFrame structure you are writing to Kafka
schema = StructType() \
    .add("event_id", StringType()) \
    .add("vehicle_year_make_model", StringType()) \
    .add("vehicle_year_make_model_cat", StringType()) \
    .add("vehicle_make_model", StringType()) \
    .add("vehicle_make", StringType()) \
    .add("vehicle_model", StringType()) \
    .add("vehicle_year", StringType()) \
    .add("vehicle_category", StringType()) \
    .add("vehicle_object", StringType()) \
    .add("latitude", StringType()) \
    .add("longitude", StringType()) \
    .add("location_on_land", StringType()) \
    .add("local_latlng", StringType()) \
    .add("zipcode", StringType()) \
    .add("large_text_col_1", StringType()) \
    .add("large_text_col_2", StringType()) \
    .add("large_text_col_3", StringType()) \
    .add("large_text_col_4", StringType()) \
    .add("large_text_col_5", StringType()) \
    .add("large_text_col_6", StringType()) \
    .add("large_text_col_7", StringType()) \
    .add("large_text_col_8", StringType()) \
    .add("large_text_col_9", StringType())

def read_kafka_stream():
    kafka_stream = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
        .option("subscribe", topic)
        .option("failOnDataLoss", "false")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
                f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";')
        .option("minPartitions", 12)
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("data"),
                "topic", "partition", "offset", "timestamp", "timestampType")
        .select("topic", "partition", "offset", "timestamp", "timestampType", "data.*")
    )
    return kafka_stream

Explanation: This setup optimizes data ingestion from Kafka into Spark and prepares the data for further processing or integration into storage systems like Delta Lake.

Additional code for Databricks Jobs

Configuration: This method involves setting up a Databricks job and cluster resources. It allows for flexible scheduling and monitoring of ingestion processes, but it requires an understanding of how to choose the right compute.

(
    read_kafka_stream()
    .writeStream
    .option("checkpointLocation", checkpoint_location_for_delta)
    .trigger(processingTime='1 second')
    .toTable(target)
)

Additional code for Delta Live Tables

Configuration: Delta Live Tables manages infrastructure automatically, providing a simpler, declarative approach to building data pipelines. This code snippet uses the Delta Live Tables (DLT) API to define a data table that ingests streaming data from Kafka. The @dlt.table decorator specifies the table's name (to be replaced with your desired table name) and sets the pipeline to poll Kafka every second. This rapid polling supports near-real-time data processing needs. The function dlt_kafka_stream() returns the stream from read_kafka_stream(), integrating Kafka streaming directly into DLT for streamlined management and operation within the Databricks environment.

@dlt.table(name="REPLACE_DLT_TABLE_NAME_HERE",
           spark_conf={"pipelines.trigger.interval": "1 seconds"})
def dlt_kafka_stream():
    return read_kafka_stream()

Conclusion

Our benchmarks show that Delta Live Tables Serverless stands out in latency performance and operational simplicity, making it highly suitable for scenarios with varying data loads. Meanwhile, Databricks Jobs and Delta Live Tables also offer viable solutions.
Why Delta Live Tables Serverless Outperforms Standard Delta Live Tables

A key factor contributing to the superior performance of Delta Live Tables Serverless over

Blog, Delta, forEachBatch, Spark, Stream

Using Spark Streaming to merge/upsert data into a Delta Lake with working code

This blog will discuss how to read from a Spark Streaming source and merge/upsert data into a Delta Lake table. We will also optimize/cluster the data of the Delta table. In the end, we will show how to start a streaming pipeline with the previous target table as the source.

Overall, the process works in the following manner: we read data from a streaming source and use the special function foreachBatch to call a user-defined function responsible for all the processing. This function encapsulates the Merge and Optimize operations on the target Delta table.

First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, S3, etc.; for simplicity, let's generate a stream using the code below. Feel free to alter numPartitions & rowsPerSecond. These parameters help you control how much volume of data you want to generate. In the below code, we generate 10,000 rows per second across 100 partitions.

Generate streaming data at your desired rate

generated_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 100)
    .option("rowsPerSecond", 10 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5",
        "value",
        "value%1000000 as hash"
    )
)
# display(generated_df)

Parameters / Variables (feel free to change as per your needs)

target_table_name = "to_be_merged_into_table"
check_point_location = f"/tmp/delta/{target_table_name}/_checkpoints/"
join_column_name = "hash"

Create an empty Delta table so data can be merged into it

spark.sql(f"""
    DROP TABLE IF EXISTS {target_table_name};
""")

(
    generated_df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", check_point_location)
    .toTable(target_table_name)
)

Check if data is populated

display(spark.read.table(target_table_name))

A user-defined function which does the data processing, Merge & Optimize

def make_changes_using_the_micro_batch(microBatchOutputDF, batchId: int):
    print(f"Processing batchId: {batchId}")
    microBatchOutputDF.createOrReplaceTempView("updates")
    spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession()
    spark_session_for_this_micro_batch.sql(f"""
      SELECT *
      FROM (
        SELECT *
          ,rank() OVER (PARTITION BY {join_column_name} ORDER BY value DESC) AS dedupe
        FROM updates
      )
      WHERE dedupe = 1
    """).drop("dedupe").createOrReplaceTempView("updates_which_need_to_be_merged")
    spark_session_for_this_micro_batch.sql(f"""
      MERGE INTO {target_table_name} target
      USING updates_which_need_to_be_merged u
      ON u.{join_column_name} = target.{join_column_name}
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)
    # Define how often optimize should run; for example, at 50 it means we will run
    # the optimize command every 50 batches of stream data
    optimize_every_n_batches = 20
    if batchId % optimize_every_n_batches == 0:
        optimize_and_zorder_table(table_name=target_table_name, zorder_by_col_name=join_column_name)

Optimize / Z-order a Delta table

Why do we need to optimize a table? If we keep adding files to our Delta table and never optimize/sort them, then over time we need to read a lot of files during merge time. Thus, optimizing the Delta table after every N merges is better. N needs to be decided based on your latency requirements. You could start with N as 10 and change it as per your needs.

The below code will run an OPTIMIZE and ZORDER command on a given table that is being fed by a stream. The optimize command can't run in a silo, because it would require us to pause and then resume the stream. Therefore, we need to call this function as part of the upsert function.
This enables us to optimize before the next batch of streaming data comes through.

from timeit import default_timer as timer

def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None:
    """
    Parameters:
        table_name: str
            name of the table to be optimized
        zorder_by_col_name: str
            comma separated list of columns to zorder by.
            example "col_a, col_b, col_c"
    """
    start = timer()
    print(f"Met condition to optimize table {table_name}")
    sql_query_optimize = f"OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})"
    spark.sql(sql_query_optimize)
    end = timer()
    time_elapsed_seconds = end - start
    print(
        f"Successfully optimized table {table_name}. Total time elapsed: {time_elapsed_seconds} seconds"
    )

Orchestrate from readStream -> Merge -> Optimize

(
    generated_df
    .writeStream.format('delta')
    .trigger(processingTime='30 seconds')
    .option("checkpointLocation", check_point_location)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

If you have reached this far, you should have an end-to-end pipeline working with streaming data and merging data into a Delta table. As the next step, let's use the previous target table as our new streaming source.

Use the target table as a source for the next streaming pipeline

Change data feed allows Databricks to track row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records change events for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated. Reference: https://docs.databricks.com/delta/delta-change-data-feed.html#use-delta-lake-change-data-feed-on-databricks

spark.sql(f"""ALTER TABLE {target_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed=true)""")

Reading change data as a stream

display(
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table(target_table_name)
)

Download this notebook: Spark Streaming Using For Each Batch & Merge.html (drive.google.com)

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Reference: https://www.youtube.com/watch?v=CLDcdVDupMg

Blog, Delta

Understanding Delta Lake: A Technical Deep Dive

Delta Lake is a powerful open-source storage layer that brings ACID transactions, scalable metadata handling, and unified batch and streaming data processing to big data workloads. It's designed to improve data reliability and enable complex data processing workflows. This technical blog will blend the key features of Delta Lake with resources for a deeper understanding of how these features are achieved. The resources in this guide, from essential whitepapers to insightful video tutorials, were key to my mastery of Delta Lake, offering a deep dive into its architecture and practical applications, and equipping me with the knowledge to effectively utilize its features in real-world data scenarios.

Key Features of Delta Lake

ACID Transactions: Delta Lake provides serializable isolation levels, ensuring that readers always see consistent data, even in the presence of concurrent writes. This is achieved through a transaction log that records details about every change made to the data.

Scalable Metadata Handling: With the help of Spark's distributed processing power, Delta Lake can handle metadata for petabyte-scale tables, which may include billions of files and partitions. This scalability is crucial for managing large datasets efficiently.

Unified Batch and Streaming Data Processing: Delta Lake tables serve as both batch tables and streaming sources/sinks, offering exactly-once semantics for data ingestion, backfill, and interactive queries. This unification simplifies the data pipeline and reduces the complexity of data processing.

Schema Evolution and Enforcement: Delta Lake prevents the insertion of bad records during ingestion by enforcing schemas automatically. It also supports schema evolution, allowing for the addition of new columns to data tables without disrupting existing operations.

Time Travel (Data Versioning): Data versioning in Delta Lake enables rollbacks, full historical audit trails, and reproducible machine learning experiments. Users can access and revert to earlier versions of data for various purposes.

DML Operations: Delta Lake supports merge, update, and delete operations, which are essential for use cases like change-data-capture (CDC) and slowly-changing-dimension (SCD) operations.

Deep Dive Resources

To understand how Delta Lake achieves these features, the following resources provide in-depth technical knowledge:

Lakehouse Storage Systems Whitepaper: For a comprehensive technical understanding of Delta Lake's internals, the Lakehouse Storage Systems whitepaper is invaluable. It explains the architecture and mechanisms that enable Delta Lake's features, such as ACID transactions and scalable metadata handling. Read the whitepaper here.

Educational Videos

Quick Overviews

Real-World Use Cases: To see Delta Lake in action, refer to The Delta Lake Series Complete Collection. This guide helps you understand various use cases and how Delta Lake addresses complex data challenges. Access it here.

Conclusion

Delta Lake is a sophisticated tool that addresses many of the challenges associated with big data processing and storage. By leveraging the resources provided, you can gain a deeper technical understanding of how Delta Lake ensures data reliability, consistency, and scalability. Whether you're a data engineer, architect, or analyst, these insights will help you to effectively implement and utilize Delta Lake in your data solutions.
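To make the time travel and DML features described above concrete, here is a small sketch of what they look like in Spark SQL; the table, column names, versions, and values are purely illustrative:

# Peek at an earlier version of a table (version number and timestamp are illustrative)
display(spark.sql("SELECT * FROM my_schema.orders VERSION AS OF 5"))
display(spark.sql("SELECT * FROM my_schema.orders TIMESTAMP AS OF '2024-01-01'"))

# Roll the table back to a known-good version
spark.sql("RESTORE TABLE my_schema.orders TO VERSION AS OF 5")

# Row-level DML, the building blocks for CDC and SCD patterns
spark.sql("UPDATE my_schema.orders SET status = 'shipped' WHERE order_id = 42")
spark.sql("DELETE FROM my_schema.orders WHERE status = 'cancelled'")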
Thank You for Reading! I hope you found this article helpful and informative. If you enjoyed this post, please consider giving it a clap 👏 and sharing it with your network. Your support is greatly appreciated! — CanadianDataGuy

Blog, Delta

SOLVING DELTA TABLE CONCURRENCY ISSUES

Delta Lake is a powerful technology for bringing ACID transactions to your data lakes. It allows multiple operations to be performed on a dataset concurrently. However, dealing with concurrent operations can sometimes be tricky and may lead to issues such as ConcurrentAppendException, ConcurrentDeleteReadException, and ConcurrentDeleteDeleteException. In this blog post, we will explore why these issues occur and how to handle them effectively using a Python function, and how to avoid them with table design, isolation levels, and an understanding of write conflicts.

Why Do These Issues Happen?

Understanding Isolation Levels: Serializable vs. WriteSerializable

Isolation levels in a database control the degree to which transactions are protected from each other's changes. Delta Lake on Databricks offers two such levels: Serializable and WriteSerializable.

1. Serializable: This is the highest level of isolation. It ensures that all write and read operations are done in a specific order, just like how they appear in the table's history. This means operations are carried out one by one, maintaining the order and ensuring the final result is as expected.

2. WriteSerializable (Default): This level is a bit more relaxed compared to Serializable. It guarantees order only for write operations, not for reads. Even though it's more relaxed, it's still stricter than the Snapshot isolation level. This level is used by default as it offers a good mix of data consistency and availability for most operations.

Solution 1: Setting the Isolation Level
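On Databricks, the isolation level is set as a Delta table property; a minimal sketch, where the table name is illustrative:

# WriteSerializable is the default; Serializable is stricter
spark.sql("""
    ALTER TABLE my_schema.my_delta_table
    SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
""")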
Solution 2: Avoiding Conflicts Using Partitioning and Disjoint Command Conditions

When working with tables, sometimes two operations can clash or conflict, especially if they are working on the same set of files. This can cause problems and errors. But there's a way to avoid this! You can organize or partition your table based on certain columns that are often used in operations. This way, different operations work on different parts of the table, preventing them from clashing.

For example, imagine two commands: one is updating the table for dates after January 1, 2010, and another is deleting from the table for dates before January 1, 2010. These two can clash if the table is not organized by date, as both might try to change the same files. But if you partition the table by date, these operations won't conflict, making things smooth and error-free.

However, be careful while choosing the column for partitioning. If you choose a column that has a lot of unique values, it can create a large number of subdirectories. This can lead to other issues, affecting the performance of operations on the table.

By using these strategies and understanding the insights from Databricks regarding isolation levels, row-level concurrency, and write conflicts, you can make your Delta operations more robust, reliable, and efficient.

Solution 3: Code block with exponential retry

The Python code below offers a robust solution to address this challenge. It is designed to manage concurrent write operations to a Delta table or path by intelligently retrying the operation in the event of specific concurrent exceptions. streaming_write_with_concurrent_retry takes parameters such as the data stream, maximum attempts, and others to provide flexibility and control. It employs a while loop to attempt the write operation continuously and waits for its completion. In case of concurrent exceptions, it increments the attempt counter and calculates the sleep time using an exponential backoff strategy before retrying the operation. This approach ensures that the write operation is eventually successful, providing reliability and efficiency in handling concurrent operations on Delta tables. Explore the code below to understand its workings and integrate it into your projects to enhance the handling of concurrent operations.

from datetime import datetime
from time import sleep
from delta.exceptions import (
    ConcurrentAppendException,
    ConcurrentDeleteReadException,
    ConcurrentDeleteDeleteException,
)
import math

def streaming_write_with_concurrent_retry(
    stream, max_attempts=3, indefinite=False, table=None, path=None
):
    """
    Handles concurrent write operations to a Delta table or path
    by retrying the operation in case of specific concurrent exceptions.

    :param stream: The data stream to be written.
    :param max_attempts: The maximum number of retry attempts. Default is 3.
    :param indefinite: If True, will keep retrying indefinitely. Default is False.
    :param table: The Delta table to write to.
    :param path: The path to write to.
    :return: The result of writer.awaitTermination().
    """
    attempt = 0  # Initialize attempt counter
    while True:
        try:
            # Choose the writer based on whether table or path is provided
            if table:
                writer = stream.table(table)
            elif path:
                writer = stream.start(path)
            else:
                writer = stream.start()
            # Attempt to write and wait for termination
            return writer.awaitTermination()
        # Handle concurrent exceptions
        except (
            ConcurrentAppendException,
            ConcurrentDeleteReadException,
            ConcurrentDeleteDeleteException,
        ) as e:
            # Increment attempt counter
            attempt += 1
            # If indefinite is False and attempts have reached max_attempts, raise the exception
            if not indefinite and attempt >= max_attempts:
                raise e from None
            # Calculate sleep time using exponential backoff strategy
            sleep_time = min(120, math.pow(2, attempt))
            # Sleep for the calculated time before retrying
            sleep(sleep_time)

Solution 4: Row-Level Concurrency (Advanced Feature)

This is available only on Delta tables with deletion vectors enabled and on Photon-enabled compute running Databricks Runtime 14.0 and above.

Reference: Isolation levels and write conflicts on Databricks (docs.databricks.com) — "Learn about the isolation levels and potential conflicts when performing concurrent transactions on tables on…"

Thank You for Reading! I hope you found this article helpful and informative. If you enjoyed this post, please consider giving it a clap 👏 and sharing it with your network. Your support is greatly appreciated! — CanadianDataGuy

Blog, Delta

Delta vs. Parquet: A Deep Dive into Big Data Storage Solutions

Unlocking the intricacies of big data storage solutions is pivotal in today's data-driven landscape. As organizations grapple with vast amounts of data, choosing between storage formats like Delta and Parquet becomes crucial. Diving deep into their technical nuances, this article highlights why Delta is emerging as the preferred choice for many. From ACID transactions to schema evolution, discover the game-changing features that set Delta apart in the competitive world of data storage.

1. Introduction to Delta and Parquet

Parquet: An open-source columnar storage format developed under the Apache Software Foundation. It is designed to be compatible with a wide variety of data processing tools in the Hadoop ecosystem.

Delta: Delta Lake is more than just a file format; it's a storage layer that brings ACID transactions to big data workloads on top of Spark.

2. Technical Differences

a. ACID Transactions
b. Schema Evolution
c. Time Travel
d. Storage Efficiency
e. Merge, Update, and Delete

3. Performance Insights

4. Compatibility and Ecosystem

5. The Verdict

While Parquet is a robust columnar storage format that has served the big data community well, Delta brings in features that cater to the evolving needs of data engineering and data science teams. Its emphasis on transactional integrity, combined with advanced optimizations, positions Delta as a formidable player in the big data storage arena.
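Because Delta stores its data as Parquet files under the hood, an existing Parquet dataset can be upgraded to Delta in place. A minimal sketch, where the paths and partition column are illustrative:

# Upgrade an unpartitioned Parquet directory to Delta in place
spark.sql("CONVERT TO DELTA parquet.`/mnt/data/events`")

# For partitioned Parquet data, the partition schema must be supplied
spark.sql("CONVERT TO DELTA parquet.`/mnt/data/events_partitioned` PARTITIONED BY (event_date DATE)")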

Databricks, Delta, Spark, Stream, Stream-Stream

How to write your first Spark application with Stream-Stream Joins with working code.

Have you been waiting to try Streaming but cannot take the plunge? In a single blog, we will teach you everything that needs to be understood about Streaming Joins, and we will give you working code which you can use for your next Streaming Pipeline.

The steps involved:

1. Create a fake dataset at scale
2. Set a baseline using traditional SQL
3. Define Temporary Streaming Views
4. Inner Joins with optional Watermarking
5. Left Joins with Watermarking
6. The cold start edge case: withEventTimeOrder
7. Cleanup

What is Stream-Stream Join?

Stream-stream join is a widely used operation in stream processing where two or more data streams are joined based on some common attributes or keys. It is essential in several use cases, such as real-time analytics, fraud detection, and IoT data processing.

Concept of Stream-Stream Join

Stream-stream join combines two or more streams based on a common attribute or key. The join operation is performed on an ongoing basis, with each new data item from the stream triggering a join operation. In stream-stream join, each data item in the stream is treated as an event, and it is matched with the corresponding event from the other stream based on matching criteria. This matching criterion could be a common attribute or key in both streams.

When it comes to joining data streams, there are a few key challenges that must be addressed to ensure successful results. One of the biggest hurdles is the fact that, at any given moment, neither stream has a complete view of the dataset. This can make it difficult to find matches between inputs and generate accurate join results.

To overcome this challenge, it's important to buffer past input as a streaming state for both input streams. This allows every future input to be matched with past input, which can help to generate more accurate join results. Additionally, this buffering process can help to automatically handle late or out-of-order data, which can be common in streaming environments. To further optimize the join process, it's also important to use watermarks to limit the state. This helps ensure that only the most relevant data is being used to generate join results, which can improve accuracy and reduce processing times.

Types of Stream-Stream Join

Depending on the nature of the join and the matching criteria, there are several types of stream-stream join operations. Some of the popular types are:

Inner Join: In an inner join, only those events are returned where there is a match in both input streams. This type of join is useful when combining the data from two streams with a common key or attribute.

Outer Join: In an outer join, all events from both input streams are included in the joined stream, whether or not there is a match between them. This type of join is useful when we need to combine data from two streams, and there may be missing or incomplete data in either stream.

Left Join: In a left join, all events from the left input stream are included in the joined stream, and only the matching events from the right input stream are included. This type of join is useful when we need to combine data from two streams and keep all the data from the left stream, even if there is no matching data in the right stream.

1. The Setup: Create a fake dataset at scale

Most people do not have 2 streams just hanging around to experiment with Stream-Stream Joins. Thus I used Faker to mock 2 different streams which we will use for this example.
The libraries being used to create the datasets are Faker and faker_vehicle.

!pip install faker_vehicle
!pip install faker

Imports

from faker import Faker
from faker_vehicle import VehicleProvider
from pyspark.sql import functions as F
import uuid
from utils import logger

Parameters

# define schema name and where the table should be stored
schema_name = "test_streaming_joins"
schema_storage_location = "/tmp/CHOOSE_A_PERMANENT_LOCATION/"

Create the Target Schema/Database

Create a schema and set its location. This way, all tables inherit the base location.

create_schema_sql = f"""
    CREATE SCHEMA IF NOT EXISTS {schema_name}
    COMMENT 'This is {schema_name} schema'
    LOCATION '{schema_storage_location}'
    WITH DBPROPERTIES ( Owner='Jitesh');
    """
print(f"create_schema_sql: {create_schema_sql}")
spark.sql(create_schema_sql)

Use Faker to define functions to help generate fake column values

fake = Faker()
fake.add_provider(VehicleProvider)

event_id = F.udf(lambda: str(uuid.uuid4()))
vehicle_year_make_model = F.udf(fake.vehicle_year_make_model)
vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat)
vehicle_make_model = F.udf(fake.vehicle_make_model)
vehicle_make = F.udf(fake.vehicle_make)
vehicle_model = F.udf(fake.vehicle_model)
vehicle_year = F.udf(fake.vehicle_year)
vehicle_category = F.udf(fake.vehicle_category)
vehicle_object = F.udf(fake.vehicle_object)
latitude = F.udf(fake.latitude)
longitude = F.udf(fake.longitude)
location_on_land = F.udf(fake.location_on_land)
local_latlng = F.udf(fake.local_latlng)
zipcode = F.udf(fake.zipcode)

Generate streaming source data at your desired rate

def generated_vehicle_and_geo_df(rowsPerSecond: int, numPartitions: int):
    return (
        spark.readStream.format("rate")
        .option("numPartitions", numPartitions)
        .option("rowsPerSecond", rowsPerSecond)
        .load()
        .withColumn("event_id", event_id())
        .withColumn("vehicle_year_make_model", vehicle_year_make_model())
        .withColumn("vehicle_year_make_model_cat", vehicle_year_make_model_cat())
        .withColumn("vehicle_make_model", vehicle_make_model())
        .withColumn("vehicle_make", vehicle_make())
        .withColumn("vehicle_year", vehicle_year())
        .withColumn("vehicle_category", vehicle_category())
        .withColumn("vehicle_object", vehicle_object())
        .withColumn("latitude", latitude())
        .withColumn("longitude", longitude())
        .withColumn("location_on_land", location_on_land())
        .withColumn("local_latlng", local_latlng())
        .withColumn("zipcode", zipcode())
    )

# You can uncomment the below display command to check if the code in this cell works
# display(generated_vehicle_and_geo_df)

Now let's generate the base source table and call it vehicle_geo

table_name_vehicle_geo = "vehicle_geo"

def stream_write_to_vehicle_geo_table(rowsPerSecond: int = 1000, numPartitions: int = 10):
    (
        generated_vehicle_and_geo_df(rowsPerSecond, numPartitions)
        .writeStream
        .queryName(f"write_to_delta_table: {table_name_vehicle_geo}")
        .option("checkpointLocation", f"{schema_storage_location}/{table_name_vehicle_geo}/_checkpoint")
        .format("delta")
        .toTable(f"{schema_name}.{table_name_vehicle_geo}")
    )

stream_write_to_vehicle_geo_table(rowsPerSecond=1000, numPartitions=10)

Let the above code run for a few iterations; you can play with rowsPerSecond and numPartitions to control how much data you would like to generate. Once you have generated enough data, kill the above stream and get a baseline for the row count.
spark.read.table(f"{schema_name}.{table_name_vehicle_geo}").count()

display(
    spark.sql(f"""
        SELECT *
        FROM {schema_name}.{table_name_vehicle_geo}
    """)
)

Let's also get the min & max of the timestamp column, as we will be leveraging it for watermarking.

display(
    spark.sql(f"""
        SELECT min(timestamp)
            ,max(timestamp)
            ,current_timestamp()
        FROM {schema_name}.{table_name_vehicle_geo}
    """)
)

Next, we will break this Delta table into 2 different tables

Because for Stream-Stream Joins we need 2 different streams, we will use Delta-to-Delta streaming here to create these tables.

table_name_vehicle = "vehicle"
vehicle_df = (
    spark.readStream.format("delta")
    .option("maxFilesPerTrigger",

Blog, Delta, Spark

What is inside a Spark Streaming Checkpoint

Spark is a distributed computing framework that allows for processing large datasets in parallel across a cluster of computers. When running a Spark job, it is not uncommon to encounter failures due to various issues such as network or hardware failures, software bugs, or even insufficient memory. One way to address these issues is to re-run the entire job from the beginning, which can be time-consuming and inefficient. To mitigate this problem, Spark provides a mechanism called checkpointing.

Why do we even need a checkpoint?

Someone needs to remember what was done before, what was processed before, and what we know so far. All this information needs to be stored somewhere. The place where this is stored is called a checkpoint.

How does a checkpoint work?

Think of it as a 3-step process. Checkpoints store the current offsets and state values (e.g. aggregate values) for your stream. Checkpoints are stream specific, so each should be set to its own location.

This is an advanced blog and should be read with the goal of familiarizing yourself with the concepts rather than mastering every detail. Read this and bookmark it; once you come across a situation where you need to dig into the checkpoint, this blog will come in handy.

What is inside a checkpoint folder?

It will have 3 folders inside it and a metadata file. The metadata file stores the stream id, which is generated when the stream starts and remains the same throughout the life of the checkpoint.

What is inside the Offsets file?

The easiest way to think about it is that once we start processing a micro-batch of data, we need to store an upper bound mark and a lower bound mark of the data. This mark could be called an offset. Think of it as if you are measuring something with a scale and you need to log the reading. This reading, aka the offset, is what we store in the offsets file. Different sources like Kafka, Kinesis, Delta, etc. all have different ways of defining offsets, but conceptually they are the same. For this blog, let's concentrate on Delta as a streaming source.

Commits

These files are generated only when the micro-batch succeeds. Offsets are generated at the start of the micro-batch. If an offset does not have a corresponding commit, a failure happened while processing that offset. In an ideal scenario, the number of commit files equals the number of offset files. However, when they are not equal, the next run of the Spark Streaming query knows where to start, because that information is stored in the offset file which did not have a corresponding commit. Furthermore, watermarking information would be found here.

State Store

This folder only has data in the case of Stateful Streaming, where the state is stored on disk for resiliency purposes. Thus when failures happen, the state can be recovered from here.

References

Please spare some time to look at the below to help absorb the above content further.
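As a quick, illustrative way to see this layout for yourself on Databricks (the checkpoint path below is only a placeholder), you can list the checkpoint folder and peek at one of the offsets files:

# Illustrative checkpoint path; point this at one of your own streaming checkpoints
checkpoint_location = "/tmp/checkpoints/my_streaming_query"

# The checkpoint root holds the offsets, commits and state folders plus a metadata file
display(dbutils.fs.ls(checkpoint_location))

# Offsets are written at the start of each micro-batch, commits only on success
display(dbutils.fs.ls(f"{checkpoint_location}/offsets"))
display(dbutils.fs.ls(f"{checkpoint_location}/commits"))

# Inspect an offsets file to see what the stream recorded for that micro-batch
print(dbutils.fs.head(f"{checkpoint_location}/offsets/0"))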
Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Databricks, Delta, Delta Live Tables

Delta Live Tables Advanced Q & A

This is primarily written for those trying to handle edge cases.

Q1.) How can a single/unified table be built with historical backfill and ongoing streaming Kafka data?

The streaming table built using DLT allows writes to the table outside of DLT. Thus, you can build and run your DLT pipeline with Kafka as a source, generating the physical table with a name. Then, you can do a streaming write to this table outside DLT.

What is the gotcha here? The data has lost its natural ordering, which is fine in most cases, meaning it did not go into the Delta table in the same order it was generated. This is in contrast to an ideal world in which Kafka had infinite retention and a single DLT pipeline would have ingested the data.

If, and only if, you are using the table as a streaming source with watermarking downstream, then while reading this data we will have to instruct Spark Streaming to sort the data while reading it. We can do this by using the parameter withEventTimeOrder.

spark.readStream.format("delta")
    .option("maxFilesPerTrigger", f"{maxFilesPerTrigger}")
    .option("withEventTimeOrder", "true")
    .table(f"{schema_name}.{table_name}")

You can further read about this solution here: https://canadiandataguy.medium.com/how-to-write-your-first-spark-application-with-stream-stream-joins-with-working-code-dd9b0b39f814#d828

To reiterate, the gotcha only applies if you use this table as a streaming source along with watermarking.

Q2.) How do I handle deletes in a Streaming Table?

Let's take GDPR as an example of where we need to enforce retention on the Delta table. One can run a regular DELETE command on the table and then, in the DLT pipeline, make changes to downstream consumers.

"By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes, for example, GDPR "right to be forgotten" processing, the skipChangeCommits flag can be set on the target streaming table to ignore those changes. For more information about this flag, see Ignore updates and deletes."

@table
def b():
    return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Q3.) How to enable mergeSchema on a DLT table?

This is already handled in DLT. If you want explicit control otherwise, you can pass the following Spark conf property at the DLT pipeline or table level:

spark.databricks.delta.schema.autoMerge.enabled true

If you are using Autoloader, consider playing with different schema evolution modes while reading data.

.option("cloudFiles.schemaEvolutionMode", "addNewColumns")

Q4.) How to change the location where the table is stored?

@dlt.table(
    name="<name>",
    comment="<comment>",
    spark_conf={"<key>": "<value>", "<key>": "<value>"},
    table_properties={"<key>": "<value>", "<key>": "<value>"},
    path="<storage-location-path>",
    partition_cols=["<partition-column>", "<partition-column>"],
    schema="schema-definition",
    temporary=False,
)

3. In your DLT pipeline configuration, set this property: pipelines.tableManagedByMultiplePipelinesCheck.enabled to false

4. Now, we need to make sure that we do not read any duplicate data, because we cannot reuse our old checkpoint. We will solve this by using filters or providing a starting configuration for the streaming source. For example, if your streaming source is:

4. a) Kafka: Then we will provide offset information. More information can be found here.

4. b) Delta: For example, suppose you have a table user_events.
If you want to read changes since version 5, use:

spark.readStream.format("delta")
    .option("startingVersion", "5")
    .load("/tmp/delta/user_events")

If you want to read changes since a particular date, for example 2018-10-18, use:

spark.readStream.format("delta")
    .option("startingTimestamp", "2018-10-18")
    .load("/tmp/delta/user_events")

More details can be found here.

5. To do step 4, you should parameterize your DLT pipeline, which can be done by following these instructions.

Q5.) Does DLT support Identity Columns?

Yes, more details here. However, Identity columns are not supported with APPLY CHANGES tables.

Q6.) How to stream out of a table which was loaded using apply_changes?

This is generally not recommended. The target of the APPLY CHANGES INTO query or the apply_changes function cannot be used as a source for a streaming live table. A table that reads from the target of an APPLY CHANGES INTO query or apply_changes function must be a live table.

You can rely on enabling SCD and then use the audit columns (__START_AT & __END_AT) to identify the changes. However, the downstream would still have to do a batch read and filter on these audit columns to limit the information being read.

If you are adventurous and still want to do a read stream of this source, you need to enable the change data feed on the Delta table, here 'fact_sales'.

@dlt.table(
    name="fact_sales",
    comment="This is a fact table for sales",
    partition_cols=["order_date"],
    table_properties={
        "pipelines.autoOptimize.zOrderCols": "StoreId,ItemId",
        "delta.enableChangeDataFeed": "true",
    },
)

Then you can decide to stream changes out of the __apply_changes_{table_name} table. Make sure to handle tombstones/deletes as part of your downstream pipeline.

Q7.) How to delete data using DLT?

Use the Change Data Capture functionality of DLT. The particular expression which will help you achieve this is called apply_as_deletes. You can change the parameter to match your custom criteria. For example, if you had bad records originating in a specific time interval or file name, you can change the expression to meet your custom criteria.

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
    return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
    target="target",
    source="users",
    keys=["userId"],
    sequence_by=col("sequenceNum"),
    apply_as_deletes=expr("operation = 'DELETE' or {any other custom logic} "),
    except_column_list=["operation", "sequenceNum"],
    stored_as_scd_type="2",
)

Q8.) How to avoid accidental overwrites in DLT?

Set this property so that tables cannot be overwritten:

pipelines.reset.allowed false

Q9.) The DLT pipeline was deleted, but the Delta table exists. What to do now? What if the owner has left the org and I need a new DLT pipeline to take care of the table?

Step 1.) Verify via the CLI that the pipeline has been deleted:

databricks --profile <your_env> pipelines list
databricks --profile <your_env> pipelines get --pipeline-id <deleted_pipeline_id>

Step 2.) Change the owner of the table:

ALTER TABLE <db>.<table> SET TBLPROPERTIES(pipelines.pipelineId = '<NEW_PIPELINE_ID>');

Note: In case you do not have a pipeline ID yet, you can use the below parameter once; run your pipeline to get the pipeline ID and then remove the below parameter.

pipelines.tableManagedByMultiplePipelinesCheck.enabled to false

Q10.) How does sequence_by work in apply_changes()?

There are two types of data management strategies with apply_changes:

Type 1 involves keeping only the latest state of a record.
This means that if an older record arrives out-of-order and we already have a newer record in the target, the older record will not update the target because it is not the latest state. Type 2 involves keeping a history of all records. This means

Blog, Delta, Spark, Stream

How to upgrade your Spark Stream application with a new checkpoint With working code

Sometimes in life, we need to make breaking changes which require us to create a new checkpoint. Some example scenarios:

There could be plenty of scenarios where you want to control precisely which data (Kafka offsets) needs to be processed. Not every scenario requires a new checkpoint; there are a number of things you can change without requiring a new checkpoint. This blog helps you understand how to handle a scenario where a new checkpoint is unavoidable.

Kafka Basics: Topics, partitions & offsets

A Kafka cluster has topics: topics are a way to organize messages. Each topic has a name that is unique across the entire Kafka cluster. Messages are sent to and read from specific topics. In other words, producers write data on a topic, and consumers read data from the topic. Topics have partitions, and data/messages are distributed across partitions. Every message belongs to a single partition. A partition holds messages, each with a unique sequential identifier within the partition called the offset.

What is the takeaway here? We must identify what offset has already been processed for each partition, and this information can be found inside the checkpoint.

What information is inside the checkpoint?

Under the checkpoint folder, there are four subfolders.

How to fetch information about offsets and partitions from the checkpoint folder

List the files at the checkpoint location; we are looking for the offsets folder.

checkpoint_location = "/checkpoint_location/checkpoint_for_kafka_to_delta"
dbutils.fs.ls(checkpoint_location)
dbutils.fs.ls(f"{checkpoint_location}/")

Next, we will list the files under the commits folder and identify the most recent commit.

dbutils.fs.ls(checkpoint_location)
dbutils.fs.ls(f"{checkpoint_location}/commits")

/checkpoint_location/checkpoint_for_kafka_to_delta/commits/0
/checkpoint_location/checkpoint_for_kafka_to_delta/commits/1
/checkpoint_location/checkpoint_for_kafka_to_delta/commits/2

Once we identify the latest commit file number, we will open the equivalent offsets file. In this example, we can see the latest commit is "2". Now let's view the contents of the offsets file.

#%fs head {FILL_THE_EXACT_PATH_OF_THE_FILE_WHICH_NEEDS_TO_BE_VIEWED}
%fs head /checkpoint_location/checkpoint_for_kafka_to_delta/offsets/2

{"batchWatermarkMs":0,"batchTimestampMs":1674623173851,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"topic_name_from_kafka":{"0":400000, "1":300000}}

The information of interest is at the end. It has the topic name and the offset per partition:

{"topic_name_from_kafka":{"0":400000, "1":300000}}

Now the easy part: use Spark to start reading Kafka from a particular offset

Spark Streaming starts a read stream by default with the latest offset. However, it provides a parameter "startingOffsets" to select a custom starting point.
startingOffsets = """{"topic_name_from_kafka":{"0":400000, "1":300000}}"""

kafka_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
    .option("subscribe", topic)
    .option("startingOffsets", startingOffsets)
    .load())

display(kafka_stream)

And we are done! I recommend parameterizing your code so that "startingOffsets" can be passed as a parameter.
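One way to do that parameterization in a Databricks notebook is with widgets; a minimal sketch, where the default value is only illustrative:

# Expose startingOffsets as a notebook parameter instead of hard-coding it
dbutils.widgets.text("startingOffsets", """{"topic_name_from_kafka":{"0":400000, "1":300000}}""")
startingOffsets = dbutils.widgets.get("startingOffsets")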
Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.
