Databricks

Blog, Databricks, Delta, Delta Live Tables

Merging Multiple Data Streams with Delta Live Tables: Kafka, Kinesis, and Delta

Introduction

In today’s data landscape, organizations often deal with multiple input datasets from various sources like Kafka, Kinesis, and Delta tables. These datasets may have different schemas but sometimes need to be combined into a single, unified table. Delta Live Tables (DLT) in Databricks simplifies this process by handling schema evolution and managing Slowly Changing Dimensions (SCD) efficiently.

Why Delta Live Tables?

Delta Live Tables offers several benefits:

Use Case

We aim to merge multiple data streams (human_name, human_email, and human_date_of_birth) into a single Delta table (DIM_SSN_SCD_TYP_2) using the common join key ssn. The input data can come from Kafka, Kinesis, or Delta, or a combination of these sources. We have three datasets with different schemas, and we combine the attributes of all of them into a single table.

The Solution: CHANGE FLOWs API

Delta Live Tables is currently previewing a CHANGE FLOWs API, which allows multiple APPLY CHANGES streams to write to a streaming table within a single pipeline. This functionality enables:

The apply_changes method uses the existing APPLY CHANGES API with an additional once property to specify whether the operation should run only once.

    apply_changes(
        once=True,  # optional, only run this code once; ignore new files added to this location
        target="<existing-target-table>",
        source="<data-source>",
        keys=["key1", "key2", "keyN"],  # must match <existing-target-table>
        sequence_by="<sequence-column>",  # must match <existing-target-table>
        ignore_null_updates=False,  # must match <existing-target-table>
        apply_as_deletes=None,
        apply_as_truncates=None,
        column_list=None,
        except_column_list=None,
        stored_as_scd_type=<type>,  # must match <existing-target-table>
        track_history_column_list=None,  # must match <existing-target-table>
        track_history_except_column_list=None)

Implementation

Imports and Parameters

Define the parameters for our catalog, database names, and the target table name.
    import dlt
    from pyspark.sql.functions import *

    catalog_name = "soni"
    database_name = "2024_06_25"
    tables = ["human_name", "human_email", "human_date_of_birth"]
    target_name = "DIM_SSN_SCD_TYP_2"

Creating DLT Views

Create DLT views for each source table dynamically.

    for table in tables:

        @dlt.view(name=f"dlt_view_{table}")
        def create_view(table=table):
            """
            Creates a DLT view for the given table from the Delta table.

            Args:
                table (str): Name of the table to create the view for.

            Returns:
                DataFrame: Streaming DataFrame containing the data from the specified table.
            """
            table_path = f"{catalog_name}.`{database_name}`.{table}"
            return spark.readStream.format("delta").table(table_path)

Creating the Streaming Target Table

Create a streaming target table to merge the data streams.

    dlt.create_streaming_table(
        name=target_name,
        table_properties={
            "pipelines.autoOptimize.zOrderCols": "ssn",
        },
    )

Note: The pipelines.autoOptimize.zOrderCols property is set to ssn because all merges are happening on the ssn column, and clustering the table on this column optimizes the performance of these merge operations.

Merging Data Streams into the Delta Table

Merge each stream into the target table using apply_changes.
    dlt.apply_changes(
        flow_name=f"streaming_data_from_dlt_view_human_name_to_merge_into_{target_name}",
        target=target_name,
        source="dlt_view_human_name",
        keys=["ssn"],
        ignore_null_updates=True,
        stored_as_scd_type="2",
        sequence_by="timestamp",
    )

    dlt.apply_changes(
        flow_name=f"streaming_data_from_dlt_view_human_email_to_merge_into_{target_name}",
        target=target_name,
        source="dlt_view_human_email",
        keys=["ssn"],
        ignore_null_updates=True,
        stored_as_scd_type="2",
        sequence_by="timestamp",
    )

    dlt.apply_changes(
        flow_name=f"streaming_data_from_dlt_view_human_date_of_birth_to_merge_into_{target_name}",
        target=target_name,
        source="dlt_view_human_date_of_birth",
        keys=["ssn"],
        ignore_null_updates=True,
        stored_as_scd_type="2",
        sequence_by="timestamp",
    )

Output table with SCD Type 2

Explanation

Usage Notes

How to stream out of this target table

With DBR 15.3, one can read the change feed outside of DLT and then stream changes for further processing.

    display(
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .table("soni.dlt_2024.dim_ssn_scd_typ_2")
    )

Conclusion

Delta Live Tables provides a powerful and flexible way to merge multiple data streams into a single Delta table. By leveraging DLT’s capabilities, data engineers can ensure real-time data consistency and handle complex data integration tasks with ease. Start using Delta Live Tables today to streamline your data processing pipelines and ensure your data is always up-to-date. Go Build!!!

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Note: All opinions are my own.
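To make the merge behaviour in the multi-stream post above easier to picture, here is a small pure-Python toy model. It is only a sketch under stated assumptions: it does not use or reproduce the DLT engine, the function name apply_changes_toy and the Scd2Row class are hypothetical, and the sample values are made up. It shows the general idea of several sources keyed on ssn filling in different attributes of the same record, with null values ignored and every change kept as a new SCD Type 2 version.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Scd2Row:
    """One version of a record in a toy SCD Type 2 table (hypothetical helper)."""
    key: str
    attrs: Dict[str, Any]
    start_at: int
    end_at: Optional[int] = None  # None marks the currently active version


def apply_changes_toy(events: List[dict], key: str, sequence_by: str) -> List[Scd2Row]:
    """Toy merge: events are applied in sequence order and history is kept."""
    history: List[Scd2Row] = []
    current: Dict[str, Scd2Row] = {}
    for event in sorted(events, key=lambda e: e[sequence_by]):
        k, seq = event[key], event[sequence_by]
        # Mimic the idea of ignore_null_updates=True: None never overwrites a value.
        updates = {
            col: val
            for col, val in event.items()
            if col not in (key, sequence_by) and val is not None
        }
        active = current.get(k)
        merged = {**(active.attrs if active else {}), **updates}
        if active and merged == active.attrs:
            continue  # nothing changed, keep the active version open
        if active:
            active.end_at = seq  # close the previous version
        row = Scd2Row(k, merged, start_at=seq)
        history.append(row)
        current[k] = row
    return history


# Made-up sample events standing in for the three source streams.
human_name = [{"ssn": "111", "timestamp": 1, "name": "Ada"}]
human_email = [{"ssn": "111", "timestamp": 2, "email": "ada@example.com"}]
human_date_of_birth = [{"ssn": "111", "timestamp": 3, "date_of_birth": "1990-01-01"}]

rows = apply_changes_toy(
    human_name + human_email + human_date_of_birth,
    key="ssn",
    sequence_by="timestamp",
)
for row in rows:
    print(row)
```

Each incoming event adds its attributes to the latest version for that ssn, so the last row carries name, email, and date of birth together, while the earlier rows remain as closed history.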

Blog, Databricks, Delta, Delta Live Tables, kafka, Spark, Stream

Need for Speed: Benchmarking the Best Tools for Kafka to Delta Ingestion

Introduction

Welcome back to the second installment of our series on data ingestion from Kafka to Delta tables on the Databricks platform. Building on our previous discussion about generating and streaming synthetic data to a Kafka topic, this blog post benchmarks three powerful options available on the Databricks platform for ingesting streaming data from Apache Kafka into Delta Lake: Databricks Jobs, Delta Live Tables (DLT), and Delta Live Tables Serverless (DLT Serverless).

The primary objective is to evaluate and compare the end-to-end latency of these approaches when ingesting data from Kafka into Delta tables. Latency is a crucial metric, as it directly impacts the freshness and timeliness of data available for downstream analytics and decision-making processes. It’s important to note that all three tools leverage Apache Spark’s Structured Streaming under the hood.

“Breaking the myth: Ingest from Kafka to Delta at scale in just 1.5 seconds with Delta Live Tables Serverless, up to 80% faster than traditional methods!”

Benchmark Setup

Criteria

The key metric measured was latency: the duration from when a row is produced in Kafka to when it becomes available in Delta Lake. Latency was measured over an extended period to ensure precision and account for variability.

Input Kafka Feed

For our benchmarks, we used a Kafka feed producing 100 rows per second, each approximately 1 MB in size, which is about 100 MB/second. Annually, this sums up to roughly 3.15 petabytes, making it a rigorous testbed for evaluating the ingestion capabilities of our selected tools. I used Confluent Cloud to set up a Kafka cluster with 6 partitions. It took less than 5 minutes, and Confluent gave me $300 of credits for experimentation.

Tools Compared

How was latency measured?

Latency is measured by calculating the time difference in milliseconds between the timestamps of consecutive streaming updates to a table.
This is done by subtracting the timestamp of the previous update from the timestamp of the current update for each sequential commit, which shows how long each update takes to process relative to the previous one. The analysis is currently limited to the last 300 commits, but this number can be adjusted as needed.

    from pyspark.sql import DataFrame


    def run_analysis_about_latency(table_name: str) -> DataFrame:
        # SQL command text formatted as a Python multiline string
        sql_code = f"""
            -- Define a virtual view of the table's history
            WITH VW_TABLE_HISTORY AS (
                -- Describe the historical changes of the table
                DESCRIBE HISTORY {table_name}
            ),
            -- Define a view to calculate the timestamp of the previous write operation
            VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP AS (
                SELECT
                    -- Calculate the timestamp of the last write operation before the current one
                    lag(timestamp) OVER (
                        PARTITION BY 1
                        ORDER BY version
                    ) AS previous_write_timestamp,
                    timestamp,
                    version
                FROM VW_TABLE_HISTORY
                WHERE operation = 'STREAMING UPDATE'
            ),
            -- Define a view to analyze the time difference between consecutive commits
            VW_BOUND_ANALYSIS_TO_N_COMMITS AS (
                SELECT
                    -- Calculate the time difference in milliseconds between the previous and current write timestamps
                    TIMESTAMPDIFF(
                        MILLISECOND,
                        previous_write_timestamp,
                        timestamp
                    ) AS elapsed_time_ms
                FROM VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP
                ORDER BY version DESC
                LIMIT 300 -- Analyze only the last 300 commits
            )
            -- Calculate various statistics about the write latency
            SELECT
                avg(elapsed_time_ms) AS average_write_latency,
                percentile_approx(elapsed_time_ms, 0.9) AS p90_write_latency,
                percentile_approx(elapsed_time_ms, 0.95) AS p95_write_latency,
                percentile_approx(elapsed_time_ms, 0.99) AS p99_write_latency,
                max(elapsed_time_ms) AS maximum_write_latency
            FROM VW_BOUND_ANALYSIS_TO_N_COMMITS
        """
        # Execute the SQL query using Spark's SQL module
        result_df = spark.sql(sql_code)
        display(result_df)
        # Return the result so the function matches its DataFrame return annotation
        return result_df

Data Ingestion

This code sets up a streaming data pipeline using Apache Spark to efficiently ingest data from a Kafka topic. It defines a schema tailored to the expected data types and columns in the Kafka messages, including vehicle details, geographic coordinates, and text fields. The read_kafka_stream function initializes the streaming process, configuring secure and reliable connections to Kafka, subscribing to the specified topic, and handling data across multiple partitions for improved processing speed. The stream decodes JSON-formatted messages according to the defined schema and extracts essential metadata.

    from pyspark.sql.types import StructType, StringType, FloatType
    from pyspark.sql.functions import *

    # Define the schema based on the DataFrame structure you are writing to Kafka
    schema = StructType() \
        .add("event_id", StringType()) \
        .add("vehicle_year_make_model", StringType()) \
        .add("vehicle_year_make_model_cat", StringType()) \
        .add("vehicle_make_model", StringType()) \
        .add("vehicle_make", StringType()) \
        .add("vehicle_model", StringType()) \
        .add("vehicle_year", StringType()) \
        .add("vehicle_category", StringType()) \
        .add("vehicle_object", StringType()) \
        .add("latitude", StringType()) \
        .add("longitude", StringType()) \
        .add("location_on_land", StringType()) \
        .add("local_latlng", StringType()) \
        .add("zipcode", StringType()) \
        .add("large_text_col_1", StringType()) \
        .add("large_text_col_2", StringType()) \
        .add("large_text_col_3", StringType()) \
        .add("large_text_col_4", StringType()) \
        .add("large_text_col_5", StringType()) \
        .add("large_text_col_6", StringType()) \
        .add("large_text_col_7", StringType()) \
        .add("large_text_col_8", StringType()) \
        .add("large_text_col_9", StringType()) \
        .add("large_text_col_10", StringType())  # the producer writes ten large text columns


    def read_kafka_stream():
        kafka_stream = (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
            .option("subscribe", topic)
            .option("failOnDataLoss", "false")
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";')
            .option("minPartitions", 12)
            .load()
            # Parse the JSON payload and keep the Kafka metadata columns
            .select(
                from_json(col("value").cast("string"), schema).alias("data"),
                "topic", "partition", "offset", "timestamp", "timestampType",
            )
            .select("topic", "partition", "offset", "timestamp", "timestampType", "data.*")
        )
        return kafka_stream

Explanation: This setup optimizes data ingestion from Kafka into Spark and prepares the data for further processing or integration into storage systems like Delta Lake.

Additional code for Databricks Jobs

Configuration: This method involves setting up a Databricks job and cluster resources. It allows for flexible scheduling and monitoring of ingestion processes, but it requires an understanding of how to choose the right compute.

    (
        read_kafka_stream()
        .writeStream
        .option("checkpointLocation", checkpoint_location_for_delta)
        .trigger(processingTime='1 second')
        .toTable(target)
    )

Additional code for Delta Live Tables

Configuration: Delta Live Tables manages infrastructure automatically, providing a simpler, declarative approach to building data pipelines.

This code snippet uses the Delta Live Tables (DLT) API to define a data table that ingests streaming data from Kafka. The @dlt.table decorator specifies the table’s name (to be replaced with your desired table name) and sets the pipeline to poll Kafka every second. This rapid polling supports near-real-time data processing needs.
The function dlt_kafka_stream() calls read_kafka_stream(), integrating Kafka streaming directly into DLT for streamlined management and operation within the Databricks environment.

    import dlt


    @dlt.table(
        name="REPLACE_DLT_TABLE_NAME_HERE",
        spark_conf={"pipelines.trigger.interval": "1 seconds"},
    )
    def dlt_kafka_stream():
        # Return the streaming DataFrame so DLT can materialize it
        return read_kafka_stream()

Conclusion

Our benchmarks show that Delta Live Tables Serverless stands out in latency performance and operational simplicity, making it highly suitable for scenarios with varying data loads. Meanwhile, Databricks Jobs and Delta Live Tables also offer viable solutions.

Why Delta Live Tables Serverless Outperforms Standard Delta Live Tables

A key factor contributing to the superior performance of Delta Live Tables Serverless over

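As a companion to the latency query in the Kafka to Delta benchmarking post above, the same idea can be sketched in plain Python: take the commit timestamps, compute the gap between each consecutive pair, and summarize the gaps. This is only an illustration under assumptions. The timestamps are made up, commit_gaps_ms and percentile are hypothetical helper names, and the nearest-rank percentile used here is a simple stand-in that will not necessarily agree with Spark's percentile_approx.

```python
import math
from datetime import datetime, timedelta
from statistics import mean
from typing import List


def commit_gaps_ms(commit_times: List[datetime]) -> List[float]:
    """Milliseconds between each commit and the one before it (like lag() over version)."""
    ordered = sorted(commit_times)
    return [(b - a).total_seconds() * 1000 for a, b in zip(ordered, ordered[1:])]


def percentile(values: List[float], fraction: float) -> float:
    """Nearest-rank percentile; a simple stand-in, not Spark's percentile_approx."""
    ranked = sorted(values)
    rank = max(1, math.ceil(fraction * len(ranked)))
    return ranked[rank - 1]


# Made-up commit timestamps standing in for DESCRIBE HISTORY output.
start = datetime(2024, 1, 1, 12, 0, 0)
offsets_ms = [0, 1000, 2500, 3500, 5500]
commits = [start + timedelta(milliseconds=o) for o in offsets_ms]

gaps = commit_gaps_ms(commits)
print("gaps (ms):", gaps)
print("average:", mean(gaps))
print("p90:", percentile(gaps, 0.9))
print("max:", max(gaps))
```

Note that five commits yield only four gaps, which mirrors the SQL version where the first row has no previous timestamp to compare against.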
Blog, Databricks, Spark, Stream

Synthetic Data Made Simple: Generating and Streaming Custom-Sized Data to Kafka

Introduction

In the fast-paced world of data engineering, there often arises a need to generate large volumes of synthetic data for testing and benchmarking purposes. Recently, I was tasked with a crucial project: creating records of a specific size (1 MB each) and streaming them to Kafka for performance benchmarking. This blog post, the first in a two-part series, will walk you through how to generate such data using Python and Apache Spark, and then stream it to Kafka efficiently. Tomorrow, we’ll dive into Part 2, where we’ll benchmark Kafka to Delta ingestion speed on Databricks Jobs and Delta Live Tables.

But first, let me share the story behind this endeavor.

The Challenge: Preparing for Technology Decisions

Imagine you’re part of a data engineering team at a rapidly growing tech startup. Your CTO has tasked you with benchmarking the expected speed of Kafka to Delta ingestion before making critical technology decisions. You quickly realize two things:

The solution? Generating custom-sized fake data and using Confluent Cloud for a quick and hassle-free Kafka setup.

Why Confluent Cloud?

Setting up a Kafka cluster can be cumbersome, especially when dealing with security configurations and access permissions. AWS MSK is robust, but its setup can be daunting. Confluent Cloud, on the other hand, offers a quick setup process and provides $300 in free credits, making it perfect for quick experiments and testing. I had my Kafka instance up and running in just five minutes with Confluent Cloud.

https://www.confluent.io/confluent-cloud/

Step-by-Step Guide

Let’s dive into the code that helps you create synthetic data and push it to Kafka.

Installing Necessary Packages

First, install the required packages. Faker is a library that helps generate fake data, and faker_vehicle adds vehicle-specific data generation capabilities.
    %pip install Faker faker_vehicle

Importing Required Libraries

Next, import the necessary libraries for data generation, streaming, and logging.

    from faker import Faker
    from faker_vehicle import VehicleProvider
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType
    import uuid
    import logging
    from pyspark.sql.streaming import StreamingQuery
    from datetime import datetime

Setting Parameters

Define the parameters for Kafka configuration and checkpoint location.

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    checkpoint_location = f"/tmp/confluent_kafka_checkpoint_{timestamp}"

    # Kafka configuration
    topic = "YOUR_TOPIC"
    kafka_bootstrap_servers_tls = "YOUR_KAFKA_URL.confluent.cloud:9092"
    kafka_api_key = "YOUR_KAFKA_API_KEY"
    kafka_api_secret = "YOUR_KAFKA_API_SECRET"

Initialization and UDF

Initialize Faker and add the vehicle provider. Configure logging for tracking the process.

    # Initialize Faker for data generation and add vehicle data provider
    fake = Faker()
    fake.add_provider(VehicleProvider)

    # Configure logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

Create User-Defined Functions (UDFs) for generating various fake data attributes.
    # User-defined functions (UDFs) for generating fake data
    event_id = F.udf(lambda: str(uuid.uuid4()), StringType())
    vehicle_year_make_model = F.udf(fake.vehicle_year_make_model)
    vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat)
    vehicle_make_model = F.udf(fake.vehicle_make_model)
    vehicle_make = F.udf(fake.vehicle_make)
    vehicle_model = F.udf(fake.vehicle_model)
    vehicle_year = F.udf(fake.vehicle_year)
    vehicle_category = F.udf(fake.vehicle_category)
    vehicle_object = F.udf(fake.vehicle_object)
    latitude = F.udf(fake.latitude)
    longitude = F.udf(fake.longitude)
    location_on_land = F.udf(fake.location_on_land)
    local_latlng = F.udf(fake.local_latlng)
    zipcode = F.udf(fake.zipcode)

Function to Generate 1MB Row of Data

Define a function to generate a DataFrame that simulates a row of data approximately 1 MB in size.

    @F.udf(StringType())
    def large_text_udf(size: int):
        """Generate large text data with a specified size."""
        return fake.text(max_nb_chars=size)

    # Configuration for large text data
    num_large_columns = 10  # Number of large text columns
    size_per_large_column = (1024 * 1024) // num_large_columns  # Distribute 1MB across columns

    def generate_1mb_row_df(rowsPerSecond=10, numPartitions=2):
        """Generate a DataFrame simulating streaming data, including vehicle and geographic data."""
        logger.info("Generating vehicle and geo data frame...")
        df = spark.readStream.format("rate") \
            .option("numPartitions", numPartitions) \
            .option("rowsPerSecond", rowsPerSecond) \
            .load() \
            .withColumn("event_id", event_id()) \
            .withColumn("vehicle_year_make_model", vehicle_year_make_model()) \
            .withColumn("vehicle_year_make_model_cat", vehicle_year_make_model_cat()) \
            .withColumn("vehicle_make_model", vehicle_make_model()) \
            .withColumn("vehicle_make", vehicle_make()) \
            .withColumn("vehicle_model", vehicle_model()) \
            .withColumn("vehicle_year", vehicle_year()) \
            .withColumn("vehicle_category", vehicle_category()) \
            .withColumn("vehicle_object", vehicle_object()) \
            .withColumn("latitude", latitude()) \
            .withColumn("longitude", longitude()) \
            .withColumn("location_on_land", location_on_land()) \
            .withColumn("local_latlng", local_latlng()) \
            .withColumn("zipcode", zipcode())
        # Add the large text columns large_text_col_1 ... large_text_col_10
        for i in range(1, num_large_columns + 1):
            df = df.withColumn(f"large_text_col_{i}", large_text_udf(F.lit(size_per_large_column)))
        return df

You can test the above code by running the command below.

    display(generate_1mb_row_df())

Streaming Data to Kafka

Start streaming the generated data to Kafka.
    (
        generate_1mb_row_df(rowsPerSecond=100, numPartitions=12)
        .selectExpr("CAST(event_id AS STRING) AS key", "to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";')
        .option("checkpointLocation", checkpoint_location)
        .option("topic", topic)
        .option("queryName", f"SendDataToKafka-{topic}")
        .option("kafka.max.request.size", "1100000")  # Setting new max request size to 1.1 MB
        .start()
    )

The Confluent Cloud UI confirmed that the stream was producing about 100 MB/second.

Conclusion

This approach allows you to create custom-sized synthetic data and stream it to Kafka efficiently. By using Confluent Cloud, you can significantly reduce the setup time and complexity, enabling a more streamlined and efficient data generation and streaming process.

Stay tuned for Part 2 of this series, where we’ll benchmark Kafka to Delta ingestion speed on Databricks Jobs and Delta Live Tables. Whether you’re testing, benchmarking, or exploring data streaming, this guide provides a solid foundation to get you started. Happy streaming!
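A quick way to sanity-check the sizing used in the synthetic data post above is to build one row of the intended size in plain Python and measure its JSON payload. This is a rough sketch under assumptions: it uses repeated "x" characters instead of Faker text (Faker's max_nb_chars is an upper bound, so real rows will be somewhat smaller), it includes only the event_id and the large text columns, build_sample_row is a hypothetical helper, and it ignores Kafka's own per-request overhead.

```python
import json

NUM_LARGE_COLUMNS = 10
# Same arithmetic as the post: spread 1 MiB across the large text columns.
SIZE_PER_LARGE_COLUMN = (1024 * 1024) // NUM_LARGE_COLUMNS
# Value the post passes as kafka.max.request.size.
MAX_REQUEST_SIZE = 1_100_000


def build_sample_row() -> dict:
    """Build a stand-in row with fixed-size text columns (no Faker involved)."""
    row = {"event_id": "00000000-0000-0000-0000-000000000000"}
    for i in range(1, NUM_LARGE_COLUMNS + 1):
        row[f"large_text_col_{i}"] = "x" * SIZE_PER_LARGE_COLUMN
    return row


payload = json.dumps(build_sample_row()).encode("utf-8")
rows_per_second = 100

print("characters per large column:", SIZE_PER_LARGE_COLUMN)
print("payload bytes:", len(payload))
print("fits under max request size:", len(payload) < MAX_REQUEST_SIZE)
print("approx. MB per second at 100 rows/s:", rows_per_second * len(payload) / 1_000_000)
```

The payload lands a little above 1,048,570 bytes, which is why the default Kafka request limit of about 1 MB is too small and the post raises kafka.max.request.size to 1.1 MB.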

Databricks, Delta, Spark, Stream, Stream-Stream

How to write your first Spark application with Stream-Stream Joins with working code.

Have you been waiting to try Streaming but cannot take the plunge?

In a single blog, we will teach you whatever needs to be understood about Streaming Joins. We will give you working code which you can use for your next Streaming Pipeline.

The steps involved:

1. Create a fake dataset at scale
2. Set a baseline using traditional SQL
3. Define Temporary Streaming Views
4. Inner Joins with optional Watermarking
5. Left Joins with Watermarking
6. The cold start edge case: withEventTimeOrder
7. Cleanup

What is Stream-Stream Join?

Stream-stream join is a widely used operation in stream processing where two or more data streams are joined based on some common attributes or keys. It is essential in several use cases, such as real-time analytics, fraud detection, and IoT data processing.

Concept of Stream-Stream Join

Stream-stream join combines two or more streams based on a common attribute or key. The join operation is performed on an ongoing basis, with each new data item from the stream triggering a join operation. In stream-stream join, each data item in the stream is treated as an event, and it is matched with the corresponding event from the other stream based on matching criteria. This matching criterion could be a common attribute or key in both streams.

When it comes to joining data streams, there are a few key challenges that must be addressed to ensure successful results. One of the biggest hurdles is the fact that, at any given moment, neither stream has a complete view of the dataset. This can make it difficult to find matches between inputs and generate accurate join results.

To overcome this challenge, it’s important to buffer past input as a streaming state for both input streams. This allows for every future input to be matched with past input, which can help to generate more accurate join results. Additionally, this buffering process can help to automatically handle late or out-of-order data, which can be common in streaming environments.
To keep that state from growing without bound, it’s also important to use watermarks. A watermark tells the engine how late data is allowed to arrive, so buffered rows that are too old to match any future input can be dropped. This limits the state size and reduces processing time.

Types of Stream-Stream Join

Depending on the nature of the join and the matching criteria, there are several types of stream-stream join operations. Some of the popular types of stream-stream join are:

Inner Join

In inner join, only those events are returned where there is a match in both the input streams. This type of join is useful when combining the data from two streams with a common key or attribute.

Outer Join

In outer join, all events from both the input streams are included in the joined stream, whether or not there is a match between them. This type of join is useful when we need to combine data from two streams, and there may be missing or incomplete data in either stream.

Left Join

In left join, all events from the left input stream are included in the joined stream, and only the matching events from the right input stream are included. This type of join is useful when we need to combine data from two streams and keep all the data from the left stream, even if there is no matching data in the right stream.

1. The Setup: Create a fake dataset at scale

Most people do not have 2 streams just hanging around to experiment with Stream-Stream Joins. Thus I used Faker to mock 2 different streams which we will use for this example. The libraries used to create the datasets are Faker and faker_vehicle.
    !pip install faker_vehicle
    !pip install faker

Imports

    from faker import Faker
    from faker_vehicle import VehicleProvider
    from pyspark.sql import functions as F
    import uuid
    from utils import logger

Parameters

    # define schema name and where should the table be stored
    schema_name = "test_streaming_joins"
    schema_storage_location = "/tmp/CHOOSE_A_PERMANENT_LOCATION/"

Create the Target Schema/Database

Create a Schema and set location. This way, all tables would inherit the base location.

    create_schema_sql = f"""
        CREATE SCHEMA IF NOT EXISTS {schema_name}
        COMMENT 'This is {schema_name} schema'
        LOCATION '{schema_storage_location}'
        WITH DBPROPERTIES ( Owner='Jitesh');
        """
    print(f"create_schema_sql: {create_schema_sql}")
    spark.sql(create_schema_sql)

Use Faker to define functions to help generate fake column values

    fake = Faker()
    fake.add_provider(VehicleProvider)

    event_id = F.udf(lambda: str(uuid.uuid4()))
    vehicle_year_make_model = F.udf(fake.vehicle_year_make_model)
    vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat)
    vehicle_make_model = F.udf(fake.vehicle_make_model)
    vehicle_make = F.udf(fake.vehicle_make)
    vehicle_model = F.udf(fake.vehicle_model)
    vehicle_year = F.udf(fake.vehicle_year)
    vehicle_category = F.udf(fake.vehicle_category)
    vehicle_object = F.udf(fake.vehicle_object)
    latitude = F.udf(fake.latitude)
    longitude = F.udf(fake.longitude)
    location_on_land = F.udf(fake.location_on_land)
    local_latlng = F.udf(fake.local_latlng)
    zipcode = F.udf(fake.zipcode)

Generate Streaming source data at your desired rate

    def generated_vehicle_and_geo_df(rowsPerSecond: int, numPartitions: int):
        return (
            spark.readStream.format("rate")
            .option("numPartitions", numPartitions)
            .option("rowsPerSecond", rowsPerSecond)
            .load()
            .withColumn("event_id", event_id())
            .withColumn("vehicle_year_make_model", vehicle_year_make_model())
            .withColumn("vehicle_year_make_model_cat", vehicle_year_make_model_cat())
            .withColumn("vehicle_make_model", vehicle_make_model())
            .withColumn("vehicle_make", vehicle_make())
            .withColumn("vehicle_year", vehicle_year())
            .withColumn("vehicle_category", vehicle_category())
            .withColumn("vehicle_object", vehicle_object())
            .withColumn("latitude", latitude())
            .withColumn("longitude", longitude())
            .withColumn("location_on_land", location_on_land())
            .withColumn("local_latlng", local_latlng())
            .withColumn("zipcode", zipcode())
        )

    # You can uncomment the below display command to check if the code in this cell works
    # display(generated_vehicle_and_geo_df(rowsPerSecond=1000, numPartitions=10))

Now let’s generate the base source table and let’s call it Vehicle_Geo

    table_name_vehicle_geo = "vehicle_geo"

    def stream_write_to_vehicle_geo_table(rowsPerSecond: int = 1000, numPartitions: int = 10):
        (
            generated_vehicle_and_geo_df(rowsPerSecond, numPartitions)
            .writeStream
            .queryName(f"write_to_delta_table: {table_name_vehicle_geo}")
            .option("checkpointLocation", f"{schema_storage_location}/{table_name_vehicle_geo}/_checkpoint")
            .format("delta")
            .toTable(f"{schema_name}.{table_name_vehicle_geo}")
        )

    stream_write_to_vehicle_geo_table(rowsPerSecond=1000, numPartitions=10)

Let the above code run for a few iterations, and you can play with rowsPerSecond and numPartitions to control how much data you would like to generate. Once you have generated enough data, kill the above stream and get a baseline for the row count.

    spark.read.table(f"{schema_name}.{table_name_vehicle_geo}").count()

    display(
        spark.sql(f"""
            SELECT *
            FROM {schema_name}.{table_name_vehicle_geo}
        """)
    )

Let’s also get the min and max of the timestamp column, as we will be leveraging it for watermarking.

    display(
        spark.sql(f"""
            SELECT
                min(timestamp)
                ,max(timestamp)
                ,current_timestamp()
            FROM {schema_name}.{table_name_vehicle_geo}
        """)
    )

Next, we will break this Delta table into 2 different tables, because for Stream-Stream Joins we need 2 different streams.
We will use Delta to Delta streaming here to create these tables.

    table_name_vehicle = "vehicle"
    vehicle_df = (
        spark.readStream.format("delta")
        .option("maxFilesPerTrigger",

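To make the buffering and watermark idea from the stream-stream join post above more concrete, here is a toy inner join in plain Python. It is a deliberately simplified sketch, not how Spark implements joins: the class ToyStreamJoin, its method names, and the sample events are all hypothetical, and it uses a single shared watermark that advances on every event, whereas Spark tracks watermarks per stream and advances them between micro-batches.

```python
from typing import Dict, List, Tuple

Event = Tuple[str, int, str]  # (join key, event time in seconds, payload)


class ToyStreamJoin:
    """Symmetric inner join that buffers both sides and evicts old state."""

    def __init__(self, watermark_delay: int):
        self.watermark_delay = watermark_delay
        self.state: Dict[str, Dict[str, List[Event]]] = {"left": {}, "right": {}}
        self.max_event_time = 0

    def process(self, side: str, event: Event) -> List[Tuple[Event, Event]]:
        other = "right" if side == "left" else "left"
        key, event_time, _ = event
        watermark = self.max_event_time - self.watermark_delay
        if event_time < watermark:
            return []  # arrived later than the watermark allows: dropped
        # Match the new event against everything buffered on the other side.
        buffered = self.state[other].get(key, [])
        matches = [(event, o) if side == "left" else (o, event) for o in buffered]
        # Buffer the new event so future input from the other side can match it.
        self.state[side].setdefault(key, []).append(event)
        self.max_event_time = max(self.max_event_time, event_time)
        self._evict()
        return matches

    def _evict(self) -> None:
        """Drop buffered rows older than the watermark to keep state bounded."""
        watermark = self.max_event_time - self.watermark_delay
        for side in self.state.values():
            for key in list(side):
                side[key] = [e for e in side[key] if e[1] >= watermark]
                if not side[key]:
                    del side[key]

    def buffered_rows(self) -> int:
        return sum(len(rows) for side in self.state.values() for rows in side.values())


join = ToyStreamJoin(watermark_delay=10)
out: List[Tuple[Event, Event]] = []
out += join.process("left", ("v1", 100, "vehicle"))   # buffered, nothing to match yet
out += join.process("right", ("v1", 103, "geo"))      # matches the buffered left row
out += join.process("left", ("v2", 120, "vehicle"))   # moves the watermark to 110
out += join.process("right", ("v2", 105, "geo"))      # older than the watermark
print(out)
print("rows still buffered:", join.buffered_rows())
```

The last event shares a key with a buffered row but is ignored because it is older than the watermark, which is the trade-off watermarks make: bounded state in exchange for dropping data that arrives too late.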
Databricks, Delta, Delta Live Tables

Delta Live Tables Advanced Q & A

This is primarily written for those trying to handle edge cases.

Q1.) How can a single/unified table be built with historical backfill and ongoing streaming Kafka data?

The streaming table built using DLT allows writes to the table outside of DLT. Thus, you can build and run your DLT pipeline with Kafka as a source, generating the physical table with a name. Then, you can do a streaming write to this table outside DLT.

What is the gotcha here?

The data has lost its natural ordering, meaning it did not go into the Delta table in the same order it was generated. This is fine in most cases. It is in contrast to an ideal world in which Kafka had infinite retention and a single DLT pipeline ingested all the data.

If and only if you are using the table as a streaming source with watermarking downstream, we have to instruct Spark Streaming to sort the data while reading it. We can do this with the ‘withEventTimeOrder’ option.

    (
        spark.readStream.format("delta")
        .option("maxFilesPerTrigger", f"{maxFilesPerTrigger}")
        .option("withEventTimeOrder", "true")
        .table(f"{schema_name}.{table_name}")
    )

You can further read about this solution here: https://canadiandataguy.medium.com/how-to-write-your-first-spark-application-with-stream-stream-joins-with-working-code-dd9b0b39f814#d828

To reiterate, the gotcha only applies if you use this table as a streaming source along with watermarking.

Q2.) How do I handle deletes in a Streaming Table?

Let’s take GDPR as an example of where we need to enforce retention on the Delta table. One can run a regular DELETE command on the table and then in the DLT pipeline make changes to downstream consumers.

“By default, streaming tables require append-only sources.
When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes, for example, GDPR “right to be forgotten” processing, the skipChangeCommits flag can be set on the target streaming table to ignore those changes. For more information about this flag, see Ignore updates and deletes.”

    @dlt.table
    def b():
        return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Q3.) How to enable mergeSchema on DLT table?

This is already handled in DLT. If you want to control it explicitly, you can pass the following Spark conf property at the DLT pipeline or table level.

    spark.databricks.delta.schema.autoMerge.enabled True

If you are using Auto Loader, consider playing with different schema evolution modes while reading data.

    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")

Q4.) How to change the location where the table is stored?

    @dlt.table(
        name="<name>",
        comment="<comment>",
        spark_conf={"<key>": "<value>", "<key>": "<value>"},
        table_properties={"<key>": "<value>", "<key>": "<value>"},
        path="<storage-location-path>",
        partition_cols=["<partition-column>", "<partition-column>"],
        schema="schema-definition",
        temporary=False)

3. In your DLT pipeline configuration, set the property pipelines.tableManagedByMultiplePipelinesCheck.enabled to false.

4. Now, we need to make sure that we do not read any duplicate data because we cannot reuse our old checkpoint. We will solve this by using filters or providing a starting configuration for the streaming source. E.g., if your streaming source is:

4. a) Kafka: Then we will provide offset information. More information can be found here.

4. b) Delta: For example, suppose you have a table user_events.
If you want to read changes since version 5, use:

    (
        spark.readStream.format("delta")
        .option("startingVersion", "5")
        .load("/tmp/delta/user_events")
    )

If you want to read changes since 2018-10-18, use:

    (
        spark.readStream.format("delta")
        .option("startingTimestamp", "2018-10-18")
        .load("/tmp/delta/user_events")
    )

More details can be found here.

5. To do step 4, you should parameterize your DLT pipeline, which can be done by following these instructions.

Q5.) Does DLT support Identity Columns?

Yes, more details here. However, identity columns are not supported with APPLY CHANGES tables.

Q6.) How to stream out of a table which was loaded using apply_changes?

This is generally not recommended. The target of the APPLY CHANGES INTO query or the apply_changes function cannot be used as a source for a streaming live table. A table that reads from the target of an APPLY CHANGES INTO query or apply_changes function must be a live table.

You can rely on enabling SCD and then use the audit columns (__START_AT and __END_AT) to identify the changes. However, the downstream would still have to do a batch read and filter on these audit columns to limit the information being read.

If you are adventurous and still want to do a read stream of this source, you need to set delta.enableChangeDataFeed on the Delta table 'fact_sales'.

    @dlt.table(
        name="fact_sales",
        comment="This is a fact table for sales",
        partition_cols=["order_date"],
        table_properties={
            "pipelines.autoOptimize.zOrderCols": "StoreId,ItemId",
            "delta.enableChangeDataFeed": "true",
        })

Then you can decide to stream changes out of the __apply_changes_{table_name}. Make sure to handle tombstones/deletes as part of your downstream pipeline.

Q7.) How to delete data using DLT?

Use the Change Data Capture functionality of DLT. The particular expression which will help you achieve this is called apply_as_deletes. You can change the parameter to match your custom criteria.
For example, if you had bad records originating in a specific time interval or file name, you can change the expression to meet your custom criteria.

    import dlt
    from pyspark.sql.functions import col, expr

    @dlt.view
    def users():
        return spark.readStream.format("delta").table("cdc_data.users")

    dlt.create_streaming_live_table("target")

    dlt.apply_changes(
        target="target",
        source="users",
        keys=["userId"],
        sequence_by=col("sequenceNum"),
        apply_as_deletes=expr("operation = 'DELETE' or {any other custom logic} "),
        except_column_list=["operation", "sequenceNum"],
        stored_as_scd_type="2")

Q8.) How to avoid accidental overwrites in DLT?

Set this property so that tables cannot be overwritten.

    pipelines.reset.allowed false

Q9.) The DLT pipeline was deleted, but the Delta table exists. What to do now? What if the owner has left the org and I need a new DLT pipeline to take care of the table?

Step 1.) Verify via the CLI that the pipeline has been deleted

    databricks --profile <your_env> pipelines list
    databricks --profile <your_env> pipelines get --pipeline-id <deleted_pipeline_id>

Step 2.) Change the owner of the table

    ALTER TABLE <db>.<table> SET TBLPROPERTIES(pipelines.pipelineId = '<NEW_PIPELINE_ID>');

Note: In case you do not have a pipeline ID yet, you can use the parameter below once: set pipelines.tableManagedByMultiplePipelinesCheck.enabled to false, run your pipeline to get the pipeline ID, and then remove the parameter.

Q10.) How does sequence_by work in apply_changes()?

There are two types of data management strategies with apply_changes:

Type 1 involves keeping only the latest state of a record. This means that if an older record arrives out of order and we already have a newer record in the target, the older record will not update the target because it is not the latest state.

Type 2 involves keeping a history of all records. This means
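
To make the Type 1 rule from Q10 concrete, here is a minimal plain-Python sketch of the semantics: per key, only the record with the highest sequence value survives, so a late-arriving older record is ignored. This only illustrates the described behaviour and is not how DLT implements apply_changes; the function name apply_type1 and the sample fields are hypothetical.

```python
from typing import Any, Dict, Iterable


def apply_type1(
    events: Iterable[Dict[str, Any]], key: str, sequence_by: str
) -> Dict[Any, Dict[str, Any]]:
    """Keep only the record with the highest sequence value per key."""
    target: Dict[Any, Dict[str, Any]] = {}
    for event in events:
        current = target.get(event[key])
        # An older, late-arriving record never replaces a newer one.
        if current is None or event[sequence_by] > current[sequence_by]:
            target[event[key]] = event
    return target


# Hypothetical change events; the third one arrives out of order.
events = [
    {"userId": 1, "sequenceNum": 1, "city": "Toronto"},
    {"userId": 1, "sequenceNum": 3, "city": "Vancouver"},
    {"userId": 1, "sequenceNum": 2, "city": "Calgary"},
]
latest = apply_type1(events, key="userId", sequence_by="sequenceNum")
print(latest[1]["city"])
```

The record with sequenceNum 2 arrives last but does not overwrite the record with sequenceNum 3, which mirrors the out-of-order case described for Type 1.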

Best Practices, Databricks

Databricks Workspace Best Practices: A Checklist for Both Beginners and Advanced Users

Most good things in life come with a nuance. While learning Databricks a few years ago, I spent hours searching for best practices. Thus, I devised a set of rules that should hold in almost all scenarios. These will help you start on the right foot.

Here are some basic rules for using Databricks Workspace:

Once you have multiple teams using the same workspace, it's time to set more controls. Here are examples of some advanced best practices to put in place:

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Blog, Databricks

How to get the Job ID, Run ID & Start Time for a Databricks Job with working code

It's crucial to monitor task parameter variables such as job_id, run_id, and start_time while running ELT jobs. These system-generated values can be saved or printed for future reference. Please refer below to find the comprehensive list of supported parameters.

This is a simple 2-step process:

Step 1: Pass the parameters

Step 2: Get/Fetch and print the values

    print(f"""
      job_id: {dbutils.widgets.get('job_id')}
      run_id: {dbutils.widgets.get('run_id')}
      parent_run_id: {dbutils.widgets.get('parent_run_id')}
      task_key: {dbutils.widgets.get('task_key')}
    """)

Next, when you run the job, you should see an output like this.

Advanced & quicker method to implement

Add the following boilerplate code at the top of the notebook. It captures the whole context information instead, and you can parse whatever information is helpful to you. The approach below is code-based, and the attributes are subject to change without notice.

    import json, pprint

    dict_job_run_metadata = json.loads(
        dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
    )
    print(f"""
      currentRunId: {dict_job_run_metadata['currentRunId']}
      jobGroup: {dict_job_run_metadata['jobGroup']}
    """)

    # Pretty print the dictionary
    pprint.pprint(dict_job_run_metadata)
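
Because the context attributes can change without notice, indexing the parsed dictionary directly can raise a KeyError when an attribute disappears. A minimal sketch of a more defensive way to pick fields is below. The helper name pick_context_fields and the sample payload are assumptions made for illustration; in a notebook you would pass the JSON string returned by the context call instead of the made-up sample.

```python
import json
from typing import Any, Dict, Iterable


def pick_context_fields(context_json: str, fields: Iterable[str]) -> Dict[str, Any]:
    """Return the requested fields from a context JSON string; missing ones map to None."""
    context = json.loads(context_json)
    return {name: context.get(name) for name in fields}


# Made-up sample payload standing in for the real notebook context JSON.
sample_context_json = json.dumps(
    {"currentRunId": {"id": 123}, "jobGroup": "example-group"}
)

picked = pick_context_fields(
    sample_context_json, ["currentRunId", "jobGroup", "notThere"]
)
print(picked)
```

Fields that are absent come back as None, so the notebook keeps running and you can decide how to handle the gap.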

Blog, Databricks, Spark, Stream

How to write your first Spark Stream Batch Join with working code

When I started learning about Spark Streaming, I could not find enough code or material to kick-start my journey and build my confidence. I wrote this blog to fill that gap and help beginners understand how simple streaming is and build their first application. In this blog, I will explain most things from first principles to increase your understanding and confidence, so that you walk away with code for your first streaming application.

Scenario:

Let's assume we have a streaming source with data arriving all the time. We want to add more attributes from another table (think lookup table or dimension table). Thus, we will stream the data and join it with the lookup table via a Stream-Batch join. The result will be written as a Delta table, which can be used downstream for analytics or streaming.

Imports & Parameters

    from pyspark.sql import functions as F
    from faker import Faker
    import uuid

    # Define the schema name and where the table should be stored
    schema_name = "test_streaming_joins"
    schema_storage_location = "/tmp/CHOOSE_A_PERMANENT_LOCATION/"

    # Download the file from https://simplemaps.com/data/us-zips, place it at a
    # location of your choice, and then change the value of the variable below
    static_table_csv_file = "/FileStore/jitesh.soni/data/us_zip_code_and_its_attributes.csv"

    # Static table specification
    static_table_name = "static_zip_codes"

    # Target streaming table specification
    target_table_name = "joined_datasets"

    # Keep the checkpoint next to the Delta table so that you do not have to
    # wonder where the checkpoint is
    checkpoint_location = f"{schema_storage_location}/{target_table_name}/_checkpoints/"

Create Target Database

    create_schema_sql = f"""
        CREATE SCHEMA IF NOT EXISTS {schema_name}
        COMMENT 'This is {schema_name} schema'
        LOCATION '{schema_storage_location}'
        WITH DBPROPERTIES ( Owner='Jitesh');
        """
    print(f"create_schema_sql: {create_schema_sql}")
    # Run the statement so the schema exists before tables are written to it
    spark.sql(create_schema_sql)

Generate Static Or a lookup Dataset

We will use a public dataset source with
attributes about a zip code. This could be any other static source or a Delta table being updated in parallel.

Note: If you pick a static source and start streaming, Spark Streaming will only read it once. If you have a few updates to the static source, you will have to restart the Spark Stream so it rereads the static source. Meanwhile, if you have the Delta table as a source, then Spark Streaming will identify the update automatically, and nothing extra needs to be done.

    csv_df = (
        spark.read.option("header", True)
        .option("inferSchema", True)
        .csv(static_table_csv_file)
    )
    display(csv_df)
    csv_df.write.saveAsTable(f"{schema_name}.{static_table_name}")

Next, we will Z-order the table on the key that will be used in joins. This will help Spark Streaming do efficient joins because the Delta table is sorted by join key, with statistics about which file contains which key value.

    spark.sql(
        f"""
        OPTIMIZE {schema_name}.{static_table_name} ZORDER BY (zip);
        """
    )

Generate Streaming Dataset

We will generate a streaming dataset using the Faker library. In the code below, we will define a few user-defined functions.

    fake = Faker()
    fake_id = F.udf(lambda: str(uuid.uuid4()))
    fake_firstname = F.udf(fake.first_name)
    fake_lastname = F.udf(fake.last_name)
    fake_email = F.udf(fake.ascii_company_email)
    # fake_date = F.udf(lambda: fake.date_time_this_month().strftime("%Y-%m-%d %H:%M:%S"))
    fake_address = F.udf(fake.address)
    fake_zipcode = F.udf(fake.zipcode)

Now, we will use spark.readStream.format("rate") to generate data at your desired rate.
    streaming_df = (
        spark.readStream.format("rate")
        .option("numPartitions", 10)
        .option("rowsPerSecond", 1 * 1000)
        .load()
        .withColumn("fake_id", fake_id())
        .withColumn("fake_firstname", fake_firstname())
        .withColumn("fake_lastname", fake_lastname())
        .withColumn("fake_email", fake_email())
        .withColumn("fake_address", fake_address())
        .withColumn("fake_zipcode", fake_zipcode())
    )
    # You can uncomment the display command below to check if the code in this cell works
    # display(streaming_df)

Stream-Static Join or Stream-Delta Join

Structured Streaming supports joins (inner join and left join) between a streaming DataFrame and a static DataFrame or a Delta table. However, a few types of stream-static outer joins are not supported yet.

    # Read the lookup table from the schema it was saved to
    lookup_delta_df = spark.read.table(f"{schema_name}.{static_table_name}")
    joined_streaming_df = streaming_df.join(
        lookup_delta_df,
        streaming_df["fake_zipcode"] == lookup_delta_df["zip"],
        "left_outer",
    ).drop("fake_zipcode")
    # display(joined_streaming_df)

Orchestrate the pipeline and write Spark Stream to Delta Table

Some Tips:

    (
        joined_streaming_df.writeStream
        # .trigger(availableNow=True)
        .queryName("do_a_stream_join_with_the_delta_table")
        .option("checkpointLocation", checkpoint_location)
        .format("delta")
        .toTable(f"{schema_name}.{target_table_name}")
    )

Download the code
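
If the left outer join between the stream and the lookup table feels abstract, the plain-Python sketch below mimics its semantics on two tiny micro-batches: every streaming row is kept, and rows without a matching zip code get None for the lookup attributes. This is only an illustration of the join semantics, not Spark code; the function name left_join_micro_batches and the sample rows are hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, List

Row = Dict[str, Any]


def left_join_micro_batches(
    micro_batches: Iterable[List[Row]],
    lookup: Dict[str, Row],
    join_key: str,
    lookup_columns: List[str],
) -> Iterator[List[Row]]:
    """Left-join each micro-batch against a static lookup keyed by join_key."""
    for batch in micro_batches:
        joined_batch = []
        for row in batch:
            match = lookup.get(row[join_key])
            # Unmatched rows are kept, with None for every lookup column.
            extra = {c: (match[c] if match else None) for c in lookup_columns}
            joined_batch.append({**row, **extra})
        yield joined_batch


# Hypothetical lookup table and two micro-batches of streaming rows.
lookup = {"10001": {"city": "New York"}, "94105": {"city": "San Francisco"}}
micro_batches = [
    [{"fake_id": "a", "fake_zipcode": "10001"}],
    [{"fake_id": "b", "fake_zipcode": "00000"}],
]
result = list(
    left_join_micro_batches(micro_batches, lookup, "fake_zipcode", ["city"])
)
print(result)
```

The second row has no match in the lookup, yet it still appears in the output, which is why a left join is a safe default when you do not want to drop streaming records.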
