Blog


Blog

Managing Interview Anxiety: Strategies for Data Professionals

As data professionals, we often focus on technical skills and industry knowledge when preparing for interviews. However, even the most qualified candidates can struggle with interview anxiety. This post explores strategies to help you manage stress and perform at your best during data interviews.

Understanding Interview Anxiety

It's important to recognize that feeling nervous before an interview is completely normal. In fact, it often indicates that you care about the opportunity. However, excessive anxiety can hinder your performance, making it crucial to develop coping mechanisms.

Preparation: Your First Line of Defense

Thorough preparation is one of the most effective ways to reduce interview anxiety.

Techniques for Managing Anxiety

1. Visualization and Positive Self-Talk: Visualize yourself succeeding in the interview. Use positive affirmations like "I am well-prepared and qualified for this role" to boost your confidence.
2. Breathing Exercises: Deep breathing can help calm your nervous system.
3. Progressive Muscle Relaxation: This technique involves tensing and then relaxing different muscle groups to reduce physical tension.
4. Mindfulness Meditation: Practicing mindfulness can help you stay grounded and focused.

Herbal Supplements for Anxiety Management

I know many of us get a bit anxious when it comes to interviews, so I wanted to share something that might help! 🌿 Certain herbal supplements are known for their calming effects and could be worth trying to help manage stress during interviews. Here is a supplement which has worked for a few folks I personally know. Just a reminder: it's best to check with a healthcare professional before starting anything new, especially to ensure it's right for your body. Also, remember that managing anxiety doesn't have to be only about supplements. Practices like deep breathing, meditation, and thorough preparation can really help too. Here's to more relaxed, confident interviews! 🌟

On the Day of the Interview

Data-Specific Anxiety Management

As data professionals, we often face unique challenges in interviews.

Post-Interview Self-Care

After the interview, take time to decompress and reflect on the experience. Regardless of the outcome, each interview is an opportunity to learn and improve your skills.

Remember, managing interview anxiety is a skill that improves with practice. By implementing these strategies and focusing on thorough preparation, you can approach your next data interview with greater confidence and composure. Whether you choose to try herbal supplements or stick to traditional anxiety management techniques, the key is finding what works best for you. With practice and preparation, you can learn to manage your anxiety and showcase your skills effectively in any data interview.

This post contains affiliate links. As an Amazon Associate, I may earn a commission from qualifying purchases at no additional cost to you.

Blog

Merge Multiple Spark Streams Into A Delta Table with working code

Overall, the process works in the following manner: read data from a streaming source, use the special function foreachBatch to call a user-defined function responsible for all the processing, and have that user-defined function run the merge and optimize over the target Delta table.

Architecture

First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, S3, etc. However, for simplicity we will use .format("rate") to generate a stream. Feel free to alter numPartitions and rowsPerSecond; these parameters help you control how much data you want to generate. In the code below, we generate 1,000 rows per second across 100 partitions.

For the purpose of this blog, we will build two Spark streams, one for each country: Canada and the USA.

USA's stream

generated_streaming_usa_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 100)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        , "value"
        , "value%1000000 as hash"
        , "'USA' AS country"
        , "current_timestamp() as ingestion_timestamp"
    )
)

# display(generated_streaming_usa_df)

Canada's stream

generated_streaming_canada_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 100)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        , "value"
        , "value%1000000 as hash"
        , "'Canada' AS country"
        , "current_timestamp() as ingestion_timestamp"
    )
)

# display(generated_streaming_canada_df)

Parameters / Variables (feel free to change as per your needs)

target_table_name = "to_be_merged_into_table_partitioned_by_country"
check_point_location_for_usa_stream = f"/tmp/delta/{target_table_name}/_checkpoints/_usa/"
check_point_location_for_canada_stream = f"/tmp/delta/{target_table_name}/_checkpoints/_canada/"
join_column_name = "hash"
partition_column = "country"

Create an empty Delta table so data can be merged into it

# spark.sql(f"""DROP TABLE IF EXISTS {target_table_name};""")
(
    generated_streaming_usa_df.writeStream
    .partitionBy(partition_column)
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", check_point_location_for_usa_stream)
    .toTable(target_table_name)
)

Check if data is populated. If you do not see any data, just run the above code snippet once more; sometimes it takes time for the data to show up.

display(spark.read.table(target_table_name))

Now we will build the user-defined function that does all the data processing, merge and optimize.

def make_changes_using_the_micro_batch(microBatchOutputDF, batchId: int):
    print(f"Processing batchId: {batchId}")
    microBatchOutputDF.createOrReplaceTempView("updates")
    spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession()
    spark_session_for_this_micro_batch.sql(f"""
        SELECT *
        FROM (
            SELECT *
                ,rank() over(partition by {join_column_name} order by value desc) as dedupe
            FROM updates
        )
        WHERE dedupe = 1
    """).drop("dedupe").createOrReplaceTempView("updates_which_need_to_be_merged")
    spark_session_for_this_micro_batch.sql(f"""
        MERGE INTO {target_table_name} target
        USING updates_which_need_to_be_merged u
        ON u.{partition_column} = target.{partition_column}
            AND u.{join_column_name} = target.{join_column_name}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    optimize_every_n_batches = 20  # Define how often optimize should run; at 20, the optimize command runs every 20 micro-batches of stream data
    if batchId % optimize_every_n_batches == 0:
        optimize_and_zorder_table(table_name=target_table_name, zorder_by_col_name=join_column_name)

Optimize / Z-order a Delta table

Why do we need to optimize a table? If we keep adding files to our Delta table and never optimize or sort them, then over time we need to read a lot of files during merge time. Thus, optimizing the Delta table after every N merges is better. N needs to be decided based on your latency requirements; you could start with N as 10 and change it as per your needs.

The code below runs an OPTIMIZE and ZORDER command on a given table that is being fed by a stream. The OPTIMIZE command can't run in a silo, because that would require us to pause and then resume the stream. Therefore, we call this function as part of the upsert function, which lets us optimize before the next batch of streaming data comes through.

from timeit import default_timer as timer

def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None:
    """
    Parameters:
        table_name: str
            name of the table to be optimized
        zorder_by_col_name: str
            comma separated list of columns to zorder by, for example "col_a, col_b, col_c"
    """
    start = timer()
    print(f"Met condition to optimize table {table_name}")
    sql_query_optimize = f"OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})"
    spark.sql(sql_query_optimize)
    end = timer()
    time_elapsed_seconds = end - start
    print(f"Successfully optimized table {table_name}. Total time elapsed: {time_elapsed_seconds} seconds")

Orchestrate the streaming pipeline end to end

Read the Canada stream and merge into the Delta table:

(
    generated_streaming_canada_df
    .writeStream.format("delta")
    # .trigger(availableNow=True)
    .trigger(processingTime="10 seconds")
    .option("checkpointLocation", check_point_location_for_canada_stream)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

Read the USA stream and merge into the Delta table:

(
    generated_streaming_usa_df
    .writeStream.format("delta")
    .trigger(processingTime="10 seconds")
    .option("checkpointLocation", check_point_location_for_usa_stream)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

Now, let's validate that data is being populated:

display(
    spark.sql(f"""
        SELECT {partition_column} as partition_column
            ,count(1) as row_count
        FROM {target_table_name}
        GROUP BY {partition_column}
    """)
)

If you have reached this far, you should have an end-to-end pipeline working with streaming data and merging data into a Delta table.

Download this notebook: material_for_public_consumption/Merge Multiple Spark Streams Into A Delta Table.py at github.com/jiteshsoni/material_for_public_consumption.
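The two writeStream calls above start independent streaming queries, so the notebook or job needs to stay alive while they run. As a small addition that is not part of the original notebook, here is a minimal sketch using Spark's standard StreamingQueryManager API to inspect the running queries and block the driver:

# List the streaming queries started above
for q in spark.streams.active:
    print(q.name, q.id, q.status)

# Block the driver until one of the active queries stops or fails.
# To shut a query down cleanly, call .stop() on the handle returned by .start().
spark.streams.awaitAnyTermination()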

Blog

Simplifying Real-time Data Processing with Spark Streaming’s foreachBatch with working code

Spark Streaming & foreachBatch

Spark Streaming is a powerful tool for processing streaming data. It allows you to process data as it arrives, without having to wait for the entire dataset to be available. This can be very useful for applications that need to respond to changes in data in real time.

One of the features of Spark Streaming is the foreachBatch() method. This method allows you to apply a custom function to each batch of data as it arrives. This can be useful for a variety of tasks, such as:

Filtering data
Transforming data
Writing data to a database
Sending data to an external system

The foreachBatch() method is a powerful tool that can be used to extend the capabilities of Spark Streaming. In this blog post, we will take a closer look at how to use foreachBatch().

Introducing foreachBatch: foreachBatch is a method provided by Spark Streaming that allows developers to apply arbitrary operations on the output of a streaming query. It acts as a bridge between the streaming world and the structured world of DataFrames and Datasets. This means that we can leverage the rich functionality of Spark's structured APIs to process real-time data efficiently.

The Power of foreachBatch: The foreachBatch operation enables developers to perform batch-like operations on streaming data. Instead of processing each individual record, which can be inefficient, foreachBatch processes the data in micro-batches, offering better performance and resource utilization. This approach also provides the flexibility to leverage the full power of Spark's DataFrames, including various transformations and aggregations, to perform complex computations on streaming data.

Implementing foreachBatch: To use foreachBatch, you need to define a function that takes two arguments: the DataFrame representing the micro-batch of data and the batch identifier. Inside this function, you can apply any transformations or computations required on the streaming data. You can use Spark's SQL, DataFrame, or Dataset APIs to manipulate the data and write the results to any external systems, such as databases or file systems.

Benefits of foreachBatch:

Performance: foreachBatch allows batch-like processing on streaming data, resulting in improved performance compared to processing individual records.
Flexibility: Leveraging Spark's DataFrames and Datasets provides a wide range of transformations and aggregations to handle complex computations easily.
Scalability: Spark Streaming inherently provides scalability and fault tolerance, and foreachBatch seamlessly integrates with these capabilities.
Ecosystem Integration: The results from foreachBatch can be easily written to external systems such as databases, file systems, or streaming analytics platforms.

Code & Setup

Here's how we can use foreachBatch:

Define parameters for the job
Create a streaming source
Define custom processing logic and parameters
Create an instance of the forEachBatchProcessor class with the parameters
Orchestrate the job
Look at the output table
Clean up

Define parameters for the job

target_table_name = "for_each_batch_paramerterize"
check_point_location = f"/tmp/delta/{target_table_name}/_checkpoints/"
dedupe_colum_name = "hash"

Create a streaming source

We will create a synthetic dataset.
generated_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 4)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        , "value"
        , "value%1000000 as hash"
    )
)

Define custom processing logic and parameters

class forEachBatchProcessor:
    def __init__(self, dedupe_column: str, filter_criteria: str, passed_value: int):
        self.dedupe_column = dedupe_column
        self.filter_criteria = filter_criteria
        self.passed_value = passed_value

    def print_attributes(self):
        attributes = vars(self)
        print("\n".join([f"{attr}: {value}" for attr, value in attributes.items()]))

    def make_changes_using_the_micro_batch(self, microBatchOutputDF, batchId: int):
        self.print_attributes()
        print(f"Processing batchId: {batchId}")
        # Your processing logic using the parameters
        view_name = f"updates_for_batchId_{batchId}"
        microBatchOutputDF.createOrReplaceTempView(view_name)
        sql_logic = f"""
            SELECT *
                ,{self.passed_value} as passed_value
                ,{batchId} as batch_id
            FROM (
                SELECT *
                    ,rank() over(partition by {self.dedupe_column} order by value desc) as dedupe
                FROM {view_name}
                WHERE {self.filter_criteria}
            )
            WHERE dedupe = 1
        """
        print(f"Processing sql_logic: {sql_logic}")
        to_be_written_df = microBatchOutputDF.sparkSession.sql(sql_logic).drop("dedupe")
        to_be_written_df.write.mode("append").saveAsTable(target_table_name)

Create an instance of the forEachBatchProcessor class with the parameters

instantiateForEachBatchProcessor = forEachBatchProcessor(
    dedupe_column=dedupe_colum_name,
    filter_criteria="1=1",
    passed_value=3,
)

Orchestrate the job

(
    generated_df
    .writeStream
    # .trigger(availableNow=True)
    .trigger(processingTime="10 seconds")
    .option("checkpointLocation", check_point_location)
    .option("queryName", "ParameterizeForEachBatch")
    .foreachBatch(instantiateForEachBatchProcessor.make_changes_using_the_micro_batch)
    .start()
)

Look at the output table

display(spark.read.table(target_table_name))

Clean up

spark.sql(f"""
    DROP TABLE IF EXISTS {target_table_name}
""")
dbutils.fs.rm(check_point_location, True)

Conclusion: Apache Spark Streaming's foreachBatch operation is a powerful tool for simplifying real-time data processing. By bridging the gap between the streaming and structured worlds, it enables developers to perform batch-like operations on streaming data efficiently. Leveraging the rich functionality of Spark's DataFrames, foreachBatch empowers users to process and analyze real-time data with ease. Whether you're performing aggregations, transformations, or writing data to external systems, foreachBatch offers a flexible and scalable solution for real-time streaming applications.

Download the code
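The class-based handler above shows how to parameterize foreachBatch. For the simpler use cases mentioned at the top of the post, such as writing each micro-batch to a database in addition to a Delta table, a plain function works as well. The sketch below is not from the original post; the Delta table name, JDBC connection details, and secret scope are placeholders to adapt to your environment:

def write_micro_batch_to_two_sinks(micro_batch_df, batch_id: int):
    # Cache the micro-batch so it is not recomputed once per sink
    micro_batch_df.persist()
    # Sink 1: append to a Delta table (placeholder name)
    micro_batch_df.write.format("delta").mode("append").saveAsTable("bronze_events_copy")
    # Sink 2: append to an external database over JDBC (placeholder connection details)
    (
        micro_batch_df.write.format("jdbc")
        .option("url", "jdbc:postgresql://db-host:5432/analytics")
        .option("dbtable", "public.bronze_events_copy")
        .option("user", "analytics_user")
        .option("password", dbutils.secrets.get(scope="my_scope", key="db_password"))
        .mode("append")
        .save()
    )
    micro_batch_df.unpersist()

(
    generated_df.writeStream
    .trigger(processingTime="10 seconds")
    .option("checkpointLocation", f"{check_point_location}_two_sinks")
    .foreachBatch(write_micro_batch_to_two_sinks)
    .start()
)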

Blog, Databricks, Delta, Delta Live Tables

Merging Multiple Data Streams with Delta Live Tables: Kafka, Kinesis, and Delta

Introduction

In today's data landscape, organizations often deal with multiple input datasets from various sources like Kafka, Kinesis, and Delta tables. These datasets may have different schemas but sometimes need to be combined into a single, unified table. Delta Live Tables (DLT) in Databricks simplifies this process by handling schema evolution and managing Slowly Changing Dimensions (SCD) efficiently.

Why Delta Live Tables?

Delta Live Tables offers several benefits:

Real-time Data Processing: Ensures data is processed and available in real time.
Schema Evolution: Automatically propagates new columns from source to target.
SCD Type 1 and 2: Easily build SCD Type 1 and Type 2.
Inserts, Updates, Deletes: Handles all data operations efficiently.

Use Case

We aim to merge multiple data streams (human_name, human_email, and human_date_of_birth) into a single Delta table (DIM_SSN_SCD_TYP_2) using the common join key ssn. The input data can come from Kafka, Kinesis, or Delta, or a combination of these sources. We have three datasets with different schemas and combine attributes of all of them into a single table.

The Solution: CHANGE FLOWs API

Delta Live Tables is currently previewing a CHANGE FLOWs API, which allows multiple APPLY CHANGES streams to write to a streaming table within a single pipeline. This functionality enables:

Performing data backfills to existing APPLY CHANGES streaming table targets.
Ingesting data from multiple independent APPLY CHANGES processes into the same target table.
Performing initial hydration (using the once=True API) of a streaming table to support Change Data Capture (CDC) use cases.

The apply_changes method uses the existing APPLY CHANGES API with an additional once property to specify whether the operation should run only once.

apply_changes(
    once=True,  # optional, only run this code once; ignore new files added to this location
    target="<existing-target-table>",
    source="<data-source>",
    keys=["key1", "key2", "keyN"],  # must match <existing-target-table>
    sequence_by="<sequence-column>",  # must match <existing-target-table>
    ignore_null_updates=False,  # must match <existing-target-table>
    apply_as_deletes=None,
    apply_as_truncates=None,
    column_list=None,
    except_column_list=None,
    stored_as_scd_type=<type>,  # must match <existing-target-table>
    track_history_column_list=None,  # must match <existing-target-table>
    track_history_except_column_list=None
)

Implementation

Imports and Parameters

Define the parameters for our catalog, database names, and the target table name.

import dlt
from pyspark.sql.functions import *

catalog_name = "soni"
database_name = "2024_06_25"
tables = ["human_name", "human_email", "human_date_of_birth"]
target_name = "DIM_SSN_SCD_TYP_2"

Creating DLT Views

Create DLT views for each source table dynamically.

for table in tables:
    @dlt.view(name=f"dlt_view_{table}")
    def create_view(table=table):
        """
        Creates a DLT view for the given table from the Delta table.

        Args:
            table (str): Name of the table to create the view for.

        Returns:
            DataFrame: Streaming DataFrame containing the data from the specified table.
        """
        table_path = f"{catalog_name}.`{database_name}`.{table}"
        return spark.readStream.format("delta").table(table_path)

Creating the Streaming Target Table

Create a streaming target table to merge the data streams.
dlt.create_streaming_table(
    name=target_name,
    table_properties={
        "pipelines.autoOptimize.zOrderCols": "ssn",
    },
)

Note: The pipelines.autoOptimize.zOrderCols property is set to ssn because all merges happen on the ssn column, and clustering the table on this column optimizes the performance of these merge operations.

Merging Data Streams into the Delta Table

Merge each stream into the target table using apply_changes.

dlt.apply_changes(
    flow_name=f"streaming_data_from_dlt_view_human_name_to_merge_into_{target_name}",
    target=target_name,
    source="dlt_view_human_name",
    keys=["ssn"],
    ignore_null_updates=True,
    stored_as_scd_type="2",
    sequence_by="timestamp",
)

dlt.apply_changes(
    flow_name=f"streaming_data_from_dlt_view_human_email_to_merge_into_{target_name}",
    target=target_name,
    source="dlt_view_human_email",
    keys=["ssn"],
    ignore_null_updates=True,
    stored_as_scd_type="2",
    sequence_by="timestamp",
)

dlt.apply_changes(
    flow_name=f"streaming_data_from_dlt_view_human_date_of_birth_to_merge_into_{target_name}",
    target=target_name,
    source="dlt_view_human_date_of_birth",
    keys=["ssn"],
    ignore_null_updates=True,
    stored_as_scd_type="2",
    sequence_by="timestamp",
)

Output table with SCD Type 2

Explanation

Creating Views: The create_view function generates a streaming DataFrame for each table, reading data from the respective Delta table.
Streaming Table: The create_streaming_table function initializes the target Delta table with optimization properties.
Merging Data: The apply_changes function merges data from each view into the target table, ensuring data consistency and handling Slowly Changing Dimensions (SCD) efficiently.

Usage Notes

Concurrency: Only one APPLY CHANGES flow for each target table can run concurrently.
Consistency: All APPLY CHANGES flows targeting a table must have the same keys, sequencing, and TRACK HISTORY columns.
Preview Channel: This feature is currently available in the preview channel of Delta Live Tables.

How to stream out of this target table

With DBR 15.3, one can read the change feed outside of DLT and then stream changes for further processing.

display(
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table("soni.dlt_2024.dim_ssn_scd_typ_2")
)

Conclusion

Delta Live Tables provides a powerful and flexible way to merge multiple data streams into a single Delta table. By leveraging DLT's capabilities, data engineers can ensure real-time data consistency and handle complex data integration tasks with ease. Start using Delta Live Tables today to streamline your data processing pipelines and ensure your data is always up to date. Go Build!!!

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Note: All opinions are my own.
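To make the SCD Type 2 output above a little more concrete, here is one way the current (open) record per ssn could be queried. This is a hedged sketch that assumes the default __START_AT and __END_AT columns DLT adds to SCD Type 2 targets; adjust the column names if your pipeline differs:

display(
    spark.sql("""
        -- Open SCD Type 2 rows: records that have not been superseded yet
        SELECT *
        FROM soni.dlt_2024.dim_ssn_scd_typ_2
        WHERE __END_AT IS NULL
    """)
)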

Blog, Databricks, Delta, Delta Live Tables, kafka, Spark, Stream

Need for Speed: Benchmarking the Best Tools for Kafka to Delta Ingestion

Introduction

Welcome back to the second installment of our series on data ingestion from Kafka to Delta tables on the Databricks platform, building on our previous discussion about generating and streaming synthetic data to a Kafka topic.

This blog post benchmarks three powerful options available on the Databricks platform for ingesting streaming data from Apache Kafka into Delta Lake: Databricks Jobs, Delta Live Tables (DLT), and Delta Live Tables Serverless (DLT Serverless). The primary objective is to evaluate and compare the end-to-end latency of these approaches when ingesting data from Kafka into Delta tables. Latency is a crucial metric, as it directly impacts the freshness and timeliness of data available for downstream analytics and decision-making processes. It's important to note that all three tools leverage Apache Spark's Structured Streaming under the hood.

"Breaking the myth: Ingest from Kafka to Delta at scale in just 1.5 seconds with Delta Live Tables Serverless — up to 80% faster than traditional methods!"

Benchmark Setup

Criteria: The key metric measured was latency, the duration from when a row is produced in Kafka to when it becomes available in Delta Lake. Latency was meticulously measured over an extended period to ensure precision and account for variability.

Input Kafka Feed: For our benchmarks, we utilized a Kafka feed churning out data at a rate of 100 rows per second, each approximately 1 MB in size, which works out to 100 MB/second. Annually, this sums up to a staggering 3.15 petabytes, making it a rigorous testbed for evaluating the ingestion capabilities of our selected tools. I used Confluent Cloud to set up a Kafka cluster with 6 partitions; it took less than 5 minutes, and they gave me $300 of credits for experimentation.

Tools Compared

Databricks Jobs: Utilizes Apache Spark Structured Streaming for reading from Kafka and writing to Delta Lake tables. It provides flexibility in job configuration and scheduling but requires manual management of cluster resources.
Delta Live Tables (DLT): Employs a declarative approach for data ingestion from Kafka into Delta Lake, automatically managing infrastructure and simplifying pipeline development.
Delta Live Tables Serverless (DLT Serverless): Leverages a serverless model to further simplify infrastructure management while performing the same ingestion tasks as DLT. It offers automatic scaling and resource optimization.

How was latency measured?

Latency is measured by calculating the time difference in milliseconds between the timestamps of consecutive streaming updates to a table. This is done by subtracting the timestamp of a previous update from the timestamp of the current update for each sequential commit, allowing an analysis of how long each update takes to process relative to the previous one. The analysis is currently limited to the last 300 commits, but this number can be adjusted as needed.
from pyspark.sql import DataFrame

def run_analysis_about_latency(table_name: str) -> DataFrame:
    # SQL command text formatted as a Python multiline string
    sql_code = f"""
    -- Define a virtual view of the table's history
    WITH VW_TABLE_HISTORY AS (
        -- Describe the historical changes of the table
        DESCRIBE HISTORY {table_name}
    ),
    -- Define a view to calculate the timestamp of the previous write operation
    VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP AS (
        SELECT
            -- Calculate the timestamp of the last write operation before the current one
            lag(timestamp) OVER (
                PARTITION BY 1
                ORDER BY version
            ) AS previous_write_timestamp,
            timestamp,
            version
        FROM VW_TABLE_HISTORY
        WHERE operation = 'STREAMING UPDATE'
    ),
    -- Define a view to analyze the time difference between consecutive commits
    VW_BOUND_ANALYSIS_TO_N_COMMITS AS (
        SELECT
            -- Calculate the time difference in milliseconds between the previous and current write timestamps
            TIMESTAMPDIFF(
                MILLISECOND,
                previous_write_timestamp,
                timestamp
            ) AS elapsed_time_ms
        FROM VW_TABLE_HISTORY_WITH_previous_WRITE_TIMESTAMP
        ORDER BY version DESC
        LIMIT 300 -- Analyze only the last 300 commits
    )
    -- Calculate various statistics about the write latency
    SELECT
        avg(elapsed_time_ms) AS average_write_latency,
        percentile_approx(elapsed_time_ms, 0.9) AS p90_write_latency,
        percentile_approx(elapsed_time_ms, 0.95) AS p95_write_latency,
        percentile_approx(elapsed_time_ms, 0.99) AS p99_write_latency,
        max(elapsed_time_ms) AS maximum_write_latency
    FROM VW_BOUND_ANALYSIS_TO_N_COMMITS
    """
    # Execute the SQL query using Spark's SQL module
    display(spark.sql(sql_code))

Data Ingestion

This code sets up a streaming data pipeline using Apache Spark to efficiently ingest data from a Kafka topic. It defines a schema tailored to the expected data types and columns in the Kafka messages, including vehicle details, geographic coordinates, and text fields. The read_kafka_stream function initializes the streaming process, configuring secure and reliable connections to Kafka, subscribing to the specified topic, and handling data across multiple partitions for improved processing speed. The stream decodes JSON-formatted messages according to the defined schema and extracts essential metadata.
from pyspark.sql.types import StructType, StringType, FloatType
from pyspark.sql.functions import *

# Define the schema based on the DataFrame structure you are writing to Kafka
schema = StructType() \
    .add("event_id", StringType()) \
    .add("vehicle_year_make_model", StringType()) \
    .add("vehicle_year_make_model_cat", StringType()) \
    .add("vehicle_make_model", StringType()) \
    .add("vehicle_make", StringType()) \
    .add("vehicle_model", StringType()) \
    .add("vehicle_year", StringType()) \
    .add("vehicle_category", StringType()) \
    .add("vehicle_object", StringType()) \
    .add("latitude", StringType()) \
    .add("longitude", StringType()) \
    .add("location_on_land", StringType()) \
    .add("local_latlng", StringType()) \
    .add("zipcode", StringType()) \
    .add("large_text_col_1", StringType()) \
    .add("large_text_col_2", StringType()) \
    .add("large_text_col_3", StringType()) \
    .add("large_text_col_4", StringType()) \
    .add("large_text_col_5", StringType()) \
    .add("large_text_col_6", StringType()) \
    .add("large_text_col_7", StringType()) \
    .add("large_text_col_8", StringType()) \
    .add("large_text_col_9", StringType())

def read_kafka_stream():
    kafka_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
        .option("subscribe", topic)
        .option("failOnDataLoss", "false")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";')
        .option("minPartitions", 12)
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("data"), "topic", "partition", "offset", "timestamp", "timestampType")
        .select("topic", "partition", "offset", "timestamp", "timestampType", "data.*")
    )
    return kafka_stream

Explanation:

Connection Setup: Connects to Kafka using specific bootstrap servers and includes security settings like SASL_SSL for encrypted and authenticated data transfer.
Topic Subscription: Subscribes to a specified Kafka topic to continuously receive new data.
Stream Configuration: Robustly configured to handle potential data loss and manage data across multiple partitions to boost processing speed.
Data Transformation: Uses from_json to decode JSON messages according to the established schema, transforming them into a structured format within Spark.
Metadata Extraction: Extracts essential metadata like the Kafka topic, partition, and message timestamps.

This setup optimizes data ingestion from Kafka into Spark and prepares the data for further processing or integration into storage systems like Delta Lake.

Additional code for Databricks Jobs

Configuration: This method involves setting up a Databricks job and cluster resources. Although it allows for flexible scheduling and monitoring of ingestion processes, it requires an understanding of how to choose the right compute.

(
    read_kafka_stream()
    .writeStream
    .option("checkpointLocation", checkpoint_location_for_delta)
    .trigger(processingTime="1 second")
    .toTable(target)
)

Additional code for Delta Live Tables

Configuration: Delta Live Tables manage infrastructure automatically, providing a simpler, declarative approach to building data pipelines. This code snippet uses the Delta Live Tables (DLT) API to define a data table that ingests streaming data from Kafka. The @dlt.table decorator specifies the table's name (to be replaced with your desired table name) and sets the pipeline to poll Kafka every second. This rapid polling supports near-real-time data processing needs. The function dlt_kafka_stream() calls read_kafka_stream(), integrating Kafka streaming directly into DLT for streamlined management and operation within the Databricks environment.

@dlt.table(
    name="REPLACE_DLT_TABLE_NAME_HERE",
    spark_conf={"pipelines.trigger.interval": "1 seconds"},
)
def dlt_kafka_stream():
    return read_kafka_stream()

Conclusion

Our benchmarks show that Delta Live Tables Serverless stands out in latency performance and operational simplicity, making it highly suitable for scenarios with varying data loads. Meanwhile, Databricks Jobs and Delta Live Tables also offer viable solutions.
Why Delta Live Tables Serverless Outperforms Standard Delta Live Tables

A key factor contributing to the superior performance of Delta Live Tables Serverless over
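For reference, once any of the three pipelines above has produced a handful of commits, the run_analysis_about_latency helper defined earlier can be pointed at the target table to compute the latency statistics. The table name below is a placeholder:

# Placeholder table name; use whichever Delta table your ingestion pipeline writes to.
run_analysis_about_latency("soni.default.kafka_to_delta_bronze")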

Blog, Databricks, Spark, Stream

Synthetic Data Made Simple: Generating and Streaming Custom-Sized Data to Kafka

Introduction

In the fast-paced world of data engineering, there often arises a need to generate large volumes of synthetic data for testing and benchmarking purposes. Recently, I was tasked with a crucial project: creating records of a specific size (1 MB each) and streaming them to Kafka for performance benchmarking. This blog post, the first in a two-part series, will walk you through how to generate such data using Python and Apache Spark, and then stream it to Kafka efficiently. Tomorrow, we'll dive into Part 2, where we'll benchmark Kafka against Delta ingestion speed on Databricks Jobs and Delta Live Tables. But first, let me share the story behind this endeavor.

The Challenge: Preparing for Technology Decisions

Imagine you're part of a data engineering team at a rapidly growing tech startup. Your CTO has tasked you with benchmarking the expected speed of Kafka to Delta ingestion before making critical technology decisions. You quickly realize two things:

No suitable public Kafka feed: You need a Kafka feed that matches your specific requirements, especially in terms of record size.
Complex setup with AWS MSK: Setting up AWS Managed Streaming for Apache Kafka (MSK) for external access is time-consuming and complex.

The solution? Generating custom-sized fake data and using Confluent Cloud for a quick and hassle-free Kafka setup.

Why Confluent Cloud?

Setting up a Kafka cluster can be cumbersome, especially when dealing with security configurations and access permissions. AWS MSK is robust, but its setup can be daunting. Confluent Cloud, on the other hand, offers a quick setup process and provides $300 in free credits, making it perfect for quick experiments and testing. I had my Kafka instance up and running in just five minutes with Confluent Cloud. https://www.confluent.io/confluent-cloud/

Step-by-Step Guide

Let's dive into the code that helps you create synthetic data and push it to Kafka.

Installing Necessary Packages

First, install the required packages. Faker is a library that helps generate fake data, and faker_vehicle adds vehicle-specific data generation capabilities.

# Databricks notebook source
# MAGIC %pip install Faker faker_vehicle

Importing Required Libraries

Next, import the necessary libraries for data generation, streaming, and logging.

from faker import Faker
from faker_vehicle import VehicleProvider
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import uuid
import logging
from pyspark.sql.streaming import StreamingQuery
from datetime import datetime

Setting Parameters

Define the parameters for Kafka configuration and checkpoint location.

timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
checkpoint_location = f"/tmp/confluent_kafka_checkpoint_{timestamp}"

# Kafka configuration
topic = "YOUR_TOPIC"
kafka_bootstrap_servers_tls = "YOUR_KAFKA_URL.confluent.cloud:9092"
kafka_api_key = "YOUR_KAFKA_API_KEY"
kafka_api_secret = "YOUR_KAFKA_API_SECRET"

Initialization and UDF

Initialize Faker and add the vehicle provider. Configure logging for tracking the process.

# Initialize Faker for data generation and add vehicle data provider
fake = Faker()
fake.add_provider(VehicleProvider)

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

Create User-Defined Functions (UDFs) for generating various fake data attributes.
Create user-defined functions (UDFs) for generating the various fake data attributes.

# User-defined functions (UDFs) for generating fake data
event_id = F.udf(lambda: str(uuid.uuid4()), StringType())
vehicle_year_make_model = F.udf(fake.vehicle_year_make_model)
vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat)
vehicle_make_model = F.udf(fake.vehicle_make_model)
vehicle_make = F.udf(fake.vehicle_make)
vehicle_model = F.udf(fake.vehicle_model)
vehicle_year = F.udf(fake.vehicle_year)
vehicle_category = F.udf(fake.vehicle_category)
vehicle_object = F.udf(fake.vehicle_object)
latitude = F.udf(fake.latitude)
longitude = F.udf(fake.longitude)
location_on_land = F.udf(fake.location_on_land)
local_latlng = F.udf(fake.local_latlng)
zipcode = F.udf(fake.zipcode)

Function to Generate a 1 MB Row of Data

Define a function that generates a streaming DataFrame in which each row is approximately 1 MB in size.

@F.udf(StringType())
def large_text_udf(size: int):
    """Generate large text data with a specified size."""
    return fake.text(max_nb_chars=size)

# Configuration for large text data
num_large_columns = 10  # Number of large text columns
size_per_large_column = (1024 * 1024) // num_large_columns  # Distribute 1 MB across the columns

def generate_1mb_row_df(rowsPerSecond=10, numPartitions=2):
    """Generate a DataFrame simulating streaming data, including vehicle and geographic data."""
    logger.info("Generating vehicle and geo data frame...")
    df = (
        spark.readStream.format("rate")
        .option("numPartitions", numPartitions)
        .option("rowsPerSecond", rowsPerSecond)
        .load()
        .withColumn("event_id", event_id())
        .withColumn("vehicle_year_make_model", vehicle_year_make_model())
        .withColumn("vehicle_year_make_model_cat", vehicle_year_make_model_cat())
        .withColumn("vehicle_make_model", vehicle_make_model())
        .withColumn("vehicle_make", vehicle_make())
        .withColumn("vehicle_model", vehicle_model())
        .withColumn("vehicle_year", vehicle_year())
        .withColumn("vehicle_category", vehicle_category())
        .withColumn("vehicle_object", vehicle_object())
        .withColumn("latitude", latitude())
        .withColumn("longitude", longitude())
        .withColumn("location_on_land", location_on_land())
        .withColumn("local_latlng", local_latlng())
        .withColumn("zipcode", zipcode())
        .withColumn("large_text_col_1", large_text_udf(F.lit(size_per_large_column)))
        .withColumn("large_text_col_2", large_text_udf(F.lit(size_per_large_column)))
        .withColumn("large_text_col_3", large_text_udf(F.lit(size_per_large_column)))
        .withColumn("large_text_col_4", large_text_udf(F.lit(size_per_large_column)))
        .withColumn("large_text_col_5", large_text_udf(F.lit(size_per_large_column)))
        .withColumn("large_text_col_6", large_text_udf(F.lit(size_per_large_column)))
        .withColumn("large_text_col_7", large_text_udf(F.lit(size_per_large_column)))
        .withColumn("large_text_col_8", large_text_udf(F.lit(size_per_large_column)))
        .withColumn("large_text_col_9", large_text_udf(F.lit(size_per_large_column)))
        .withColumn("large_text_col_10", large_text_udf(F.lit(size_per_large_column)))
    )
    return df

You can test the function by running:

display(generate_1mb_row_df())
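It can also be worth confirming that each serialized record really lands near the 1 MB target (ten large text columns of roughly 104,857 characters each, plus the smaller vehicle and geo fields). The snippet below is a minimal spot-check of my own: it serializes each row to JSON, measures the length, and displays the result.

# Spot-check the serialized record size: values should hover around 1,048,576 characters.
size_check_df = (
    generate_1mb_row_df(rowsPerSecond=1, numPartitions=1)
    .selectExpr("length(to_json(struct(*))) AS approx_record_size_chars")
)
display(size_check_df)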
Streaming Data to Kafka

Start streaming the generated data to Kafka.

(
    generate_1mb_row_df(rowsPerSecond=100, numPartitions=12)
    .selectExpr("CAST(event_id AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";')
    .option("checkpointLocation", checkpoint_location)
    .option("topic", topic)
    .option("queryName", f"SendDataToKafka-{topic}")
    .option("kafka.max.request.size", "1100000")  # Setting new max request size to 1.1 MB
    .start()
)

With rowsPerSecond set to 100 and each record weighing in at roughly 1 MB, the Confluent UI confirmed that we were producing about 100 MB/second.
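If you want to cross-check that figure from the Spark side, each streaming query exposes per-micro-batch progress metrics. Here is a minimal sketch, assuming the write above is still running in the same Spark session; with ~1 MB records, a processedRowsPerSecond of around 100 lines up with roughly 100 MB/second.

import json
import time

# Cross-check throughput from Spark: poll the active streaming queries and print
# their most recent micro-batch progress, which includes processedRowsPerSecond.
time.sleep(60)  # give the query time to complete a few micro-batches
for query in spark.streams.active:
    progress = query.lastProgress  # may be None until the first micro-batch finishes
    if progress:
        print(query.name, json.dumps(progress, indent=2))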
Conclusion

This approach allows you to create custom-sized synthetic data and stream it to Kafka efficiently. By using Confluent Cloud, you can significantly reduce setup time and complexity, enabling a more streamlined data generation and streaming process. Stay tuned for Part 2 of this series, where we'll benchmark Kafka against Delta ingestion speed on Databricks Jobs and Delta Live Tables. Whether you're testing, benchmarking, or exploring data streaming, this guide provides a solid foundation to get you started. Happy streaming!

Download this notebook

References

For more details on Spark and Kafka integration, refer to the following documentation:

Apache Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
Faker Documentation
Apache Kafka Documentation

These resources provide comprehensive information and examples to help you further understand and implement Spark and Kafka integration.

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Give More Than You Take
Blog

How to Excel in Data Interviews in 2024

Ready to conquer your data interviews and land that dream job? Here's a powerful guide packed with actionable steps to ensure you're at the top of your game. Drawing from my experience coaching over 500 candidates and my time in the industry, here's how you can prepare effectively for the most in-demand skills of 2024: SQL, Python, and Big Data fundamentals.

This post contains affiliate links. As an Amazon Associate, I may earn a commission from qualifying purchases at no additional cost to you.

Mastering Big Data Fundamentals and Data Warehousing

Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems. This book is a must-read, but don't expect to grasp everything on your first try. Persist through multiple readings; each pass will deepen your understanding. I've read it at least five times, and each revisit enhances my conceptual clarity.

Fundamentals of Data Engineering provides a comprehensive understanding of data engineering principles, essential for building robust, scalable, and efficient data systems. This foundational knowledge is crucial for excelling in modern data-driven roles, making the book a valuable resource for both aspiring and experienced data professionals.

The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling, 3rd Edition by Ralph Kimball. Kimball's insights remain vital. Reflect on how these concepts apply in the Big Data space. Many of us jump straight into Big Data, overlooking foundational data warehousing principles. Don't make that mistake.

Data Modelling Excellence

Understand the metrics you're aiming to solve for and familiarize yourself with industry-standard metrics. "Measure What Matters" by John Doerr is an excellent resource for this. It introduces Objectives and Key Results (OKRs), providing a robust framework for defining success metrics across industries. Grasping these metrics helps you design data models that capture and generate these KPIs effectively. Your data models should enable these metrics efficiently, showcasing not just technical prowess but also your ability to align data solutions with business goals.

Think in SQL

SQL is non-negotiable for data roles. Practice it until you can reason through queries out loud.

Sharpen Your Python Skills

Coding skills are crucial. Practice on a whiteboard, not just on paper or in IDEs, to prepare for real interview scenarios. Cover both simple and complex problems, starting with Python fundamentals and advancing to more complex coding tasks.

Perfect Your Interview Technique

Practice makes perfect. Mock interviews with peers or coaches can provide invaluable feedback.

Leverage Community Insights

Blind is an anonymous community app for workplace discussions. Engage with the tech community to gather insights on company culture, interview styles, and compensation. Always research your prospective employers thoroughly.

For more insights on Spark, Delta, DBT, Python, SQL, Terraform, and other big data technologies, explore my other blogs and follow me for the latest updates.
