Author name: CanadianDataGuy

Databricks, Delta, Delta Live Tables

Delta Live Tables Advanced Q & A

This is primarily written for those trying to handle edge cases.

Q1.) How can a single/unified table be built with historical backfill and ongoing streaming Kafka data?

A streaming table built with DLT allows writes to the table from outside the DLT pipeline. Thus, you can build and run your DLT pipeline with Kafka as a source, which creates the physical table with a name, and then do a streaming write of the historical backfill to that table outside DLT.

What is the gotcha here? The data loses its natural ordering, which is fine in most cases: it did not land in the Delta table in the same order it was generated. This is in contrast to an ideal world in which Kafka had infinite retention and a single DLT pipeline ingested everything. If, and only if, you use the table as a streaming source with watermarking downstream, you have to instruct Spark Structured Streaming to sort the data while reading it. You can do this with the 'withEventTimeOrder' option:

spark.readStream.format("delta")
    .option("maxFilesPerTrigger", f"{maxFilesPerTrigger}")
    .option("withEventTimeOrder", "true")
    .table(f"{schema_name}.{table_name}")

You can read more about this solution here: https://canadiandataguy.medium.com/how-to-write-your-first-spark-application-with-stream-stream-joins-with-working-code-dd9b0b39f814#d828

To reiterate, the gotcha only applies if you use this table as a streaming source along with watermarking.

Q2.) How do I handle deletes in a streaming table?

Let's take GDPR as an example where we need to enforce retention on the Delta table. You can run a regular DELETE command on the table and then, in the DLT pipeline, make changes to the downstream consumers:

"By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes, for example, GDPR "right to be forgotten" processing, the skipChangeCommits flag can be set on the target streaming table to ignore those changes. For more information about this flag, see Ignore updates and deletes."

@table
def b():
    return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Q3.) How do I enable mergeSchema on a DLT table?

This is already handled by DLT. If you want to control it explicitly, you can set the following Spark conf property at the DLT pipeline or table level:

spark.databricks.delta.schema.autoMerge.enabled True

If you are using Auto Loader, consider experimenting with the different schema evolution modes while reading data:

.option("cloudFiles.schemaEvolutionMode", "addNewColumns")

Q4.) How do I change the location where the table is stored?

@dlt.table(
    name="<name>",
    comment="<comment>",
    spark_conf={"<key>": "<value>", "<key>": "<value>"},
    table_properties={"<key>": "<value>", "<key>": "<value>"},
    path="<storage-location-path>",
    partition_cols=["<partition-column>", "<partition-column>"],
    schema="schema-definition",
    temporary=False)

3. In your DLT pipeline configuration, set the property pipelines.tableManagedByMultiplePipelinesCheck.enabled to false.

4. Now, we need to make sure that we do not read any duplicate data, because we cannot reuse our old checkpoint. We will solve this by using filters or by providing a starting configuration for the streaming source. For example, if your streaming source is:

4. a) Kafka: provide the offset information. More information can be found here.

4. b) Delta: for example, suppose you have a table user_events.
If you want to read changes since version 5, use:

spark.readStream.format("delta")
    .option("startingVersion", "5")
    .load("/tmp/delta/user_events")

If you want to read changes since 2023-03-03, use:

spark.readStream.format("delta")
    .option("startingTimestamp", "2023-03-03")
    .load("/tmp/delta/user_events")

More details can be found here.

5. To do step 4, you should parameterize your DLT pipeline, which can be done by following these instructions.

Q5.) Does DLT support identity columns?

Yes, more details here. However, identity columns are not supported with APPLY CHANGES tables.

Q6.) How do I stream out of a table that was loaded using apply_changes?

This is generally not recommended. The target of the APPLY CHANGES INTO query or apply_changes function cannot be used as a source for a streaming live table; a table that reads from the target of an APPLY CHANGES INTO query or apply_changes function must be a live table.

You can rely on enabling SCD and then use the audit columns (__START_AT and __END_AT) to identify the changes. However, the downstream consumer would still have to do a batch read and filter on these audit columns to limit the information being read.

If you are adventurous and still want to do a read stream of this source, you need to enable the Change Data Feed on the Delta table 'fact_sales':

@dlt.table(name="fact_sales",
    comment="This is a fact table for sales",
    partition_cols=["order_date"],
    table_properties={
        "pipelines.autoOptimize.zOrderCols": "StoreId,ItemId",
        "delta.enableChangeDataFeed": "true",
    })

Then you can decide to stream changes out of the __apply_changes_{table_name} table. Make sure to handle tombstones/deletes as part of your downstream pipeline.

Q7.) How do I delete data using DLT?

Use the Change Data Capture functionality of DLT. The expression that helps you achieve this is called apply_as_deletes. You can change the expression to match your custom criteria; for example, if you had bad records originating in a specific time interval or file name, you can adjust the expression accordingly.

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
    return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("target")

dlt.apply_changes(
    target = "target",
    source = "users",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE' or {any other custom logic}"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = "2")

Q8.) How do I avoid accidental overwrites in DLT?

Set this property so that tables cannot be overwritten:

pipelines.reset.allowed false

Q9.) The DLT pipeline was deleted, but the Delta table exists. What to do now? What if the owner has left the org and I need a new DLT pipeline to take care of the table?

Step 1.) Verify via the CLI that the pipeline has been deleted:

databricks --profile <your_env> pipelines list
databricks --profile <your_env> pipelines get --pipeline-id <deleted_pipeline_id>

Step 2.) Change the owner of the table:

ALTER TABLE <db>.<table> SET TBLPROPERTIES(pipelines.pipelineId = '<NEW_PIPELINE_ID>');

Note: In case you do not have a pipeline ID yet, you can set pipelines.tableManagedByMultiplePipelinesCheck.enabled to false once, run your pipeline to get the pipeline ID, and then remove that parameter.

Q10.) How does sequence_by work in apply_changes()?

There are two types of data management strategies with apply_changes:

Type 1 involves keeping only the latest state of a record.
This means that if an older record arrives out of order and we already have a newer record in the target, the older record will not update the target because it is not the latest state.

Type 2 involves keeping a history of all records. This means every change is recorded as a new row, and the audit columns (__START_AT and __END_AT) track the period for which each version of the record was valid.
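To make the Type 1 behaviour concrete, here is a minimal sketch (not from the original Q&A) that reuses the cdc_data.users source and the userId/sequenceNum columns from the Q7 example; the view and target names are illustrative. With stored_as_scd_type = 1, sequence_by decides which record wins, so a late-arriving row with a lower sequenceNum never overwrites a newer one.

import dlt
from pyspark.sql.functions import col

@dlt.view
def users_cdc():
    # Hypothetical CDC feed, borrowed from the Q7 example
    return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_live_table("users_latest_state")

dlt.apply_changes(
    target = "users_latest_state",
    source = "users_cdc",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),  # ordering column: a higher sequenceNum means a newer record
    stored_as_scd_type = 1,            # Type 1: keep only the latest state per key
)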

Best Practices, Databricks

Databricks Workspace Best Practices - A checklist for both Beginners and Advanced Users

Most good things in life come with a nuance. While learning Databricks a few years ago, I spent hours searching for best practices. Thus, I devised a set of best practices that should hold true in almost all scenarios and will help you start on the right foot.

Here are some basic rules for using a Databricks Workspace:

Once you have multiple teams using the same workspace, it's time to set more controls. Here are examples of some advanced best practices to put in place:

Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don't forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.

Blog, Databricks

How to get the Job ID, Run ID & Start Time for a Databricks Job with working code

It's crucial to monitor task parameter variables such as job_id, run_id, and start_time while running ELT jobs. These system-generated values can be saved or printed for future reference. Please refer to the documentation for the comprehensive list of supported parameters.

This is a simple 2-step process:

Step 1: Pass the parameters

Step 2: Get/fetch and print the values

print(f"""
job_id: {dbutils.widgets.get('job_id')}
run_id: {dbutils.widgets.get('run_id')}
parent_run_id: {dbutils.widgets.get('parent_run_id')}
task_key: {dbutils.widgets.get('task_key')}
""")

Next, when you run the job, you should see an output like this.

Advanced & quicker method to implement

Add the following boilerplate code at the top of the notebook. It captures the whole context information instead, and you can parse whatever information is helpful to you. The below is code based, and the attributes are subject to change without notice.

import json, pprint

dict_job_run_metadata = json.loads(
    dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
)

print(f'''
currentRunId: {dict_job_run_metadata['currentRunId']}
jobGroup: {dict_job_run_metadata['jobGroup']}
''')

# Pretty print the dictionary
pprint.pprint(dict_job_run_metadata)
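The original post illustrated Step 1 with a screenshot. As a sketch, assuming the legacy Databricks task parameter variables ({{job_id}}, {{run_id}}, {{parent_run_id}}, {{task_key}}) are available in your workspace (newer workspaces also expose dynamic value references such as {{job.id}}), the notebook task parameters could be configured along these lines:

{
  "base_parameters": {
    "job_id": "{{job_id}}",
    "run_id": "{{run_id}}",
    "parent_run_id": "{{parent_run_id}}",
    "task_key": "{{task_key}}"
  }
}

Each entry becomes a notebook widget with the same name, which is why dbutils.widgets.get('job_id') in Step 2 returns the resolved value at run time.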

Blog, Databricks, Spark, Stream

How to write your first Spark Stream Batch Join with working code

When I started learning about Spark Streaming, I could not find enough code/material to kick-start my journey and build my confidence. I wrote this blog to fill that gap, to help beginners understand how simple streaming is and build their first application. In this blog, I will explain most things from first principles to increase your understanding and confidence, so you walk away with code for your first streaming application.

Scenario: Let's assume we have a streaming source with data arriving all the time. We want to add more attributes from another table (think lookup table / dimension table). Thus, we will stream the data and join it with the lookup table via a stream-batch join. The result will be written as a Delta table, which could be used downstream for analytics or streaming.

Imports & Parameters

from pyspark.sql import functions as F
from faker import Faker
import uuid

# define schema name and where the table should be stored
schema_name = "test_streaming_joins"
schema_storage_location = "/tmp/CHOOSE_A_PERMANENT_LOCATION/"

# Please download this file from https://simplemaps.com/data/us-zips, place it at a location of your choice, and then change the value of the variable below
static_table_csv_file = "/FileStore/jitesh.soni/data/us_zip_code_and_its_attributes.csv"

# Static table specification
static_table_name = "static_zip_codes"

# Target streaming table specification
target_table_name = "joined_datasets"

# Recommend keeping the checkpoint next to the Delta table so that you do not have to keep track of where the checkpoint is
checkpoint_location = f"{schema_storage_location}/{target_table_name}/_checkpoints/"

Create the target database

create_schema_sql = f"""
    CREATE SCHEMA IF NOT EXISTS {schema_name}
    COMMENT 'This is {schema_name} schema'
    LOCATION '{schema_storage_location}'
    WITH DBPROPERTIES ( Owner='Jitesh');
"""
print(f"create_schema_sql: {create_schema_sql}")

Generate a static or lookup dataset

We will use a public dataset with attributes about each zip code. This could be any other static source or a Delta table being updated in parallel.

Note: If you pick a static source and start streaming, Spark Streaming will only read it once. If you have updates to the static source, you will have to restart the Spark stream so it re-reads the static source. Meanwhile, if you have a Delta table as a source, Spark Streaming will identify the updates automatically, and nothing extra needs to be done.

csv_df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(static_table_csv_file)
)
display(csv_df)
csv_df.write.saveAsTable(f"{schema_name}.{static_table_name}")

Next, we will Z-order the table on the key that will be used in the joins. This helps Spark Streaming do efficient joins because the Delta table is sorted by the join key, with statistics about which file contains which key values.

spark.sql(
    f"""
    OPTIMIZE {schema_name}.{static_table_name} ZORDER BY (zip);
    """
)

Generate the streaming dataset

We will generate a streaming dataset using the Faker library. In the below code, we define a few user-defined functions.

fake = Faker()
fake_id = F.udf(lambda: str(uuid.uuid4()))
fake_firstname = F.udf(fake.first_name)
fake_lastname = F.udf(fake.last_name)
fake_email = F.udf(fake.ascii_company_email)
# fake_date = F.udf(lambda: fake.date_time_this_month().strftime("%Y-%m-%d %H:%M:%S"))
fake_address = F.udf(fake.address)
fake_zipcode = F.udf(fake.zipcode)

Now, we will use spark.readStream.format("rate") to generate data at the desired rate.
streaming_df = (
    spark.readStream.format("rate")
    .option("numPartitions", 10)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .withColumn("fake_id", fake_id())
    .withColumn("fake_firstname", fake_firstname())
    .withColumn("fake_lastname", fake_lastname())
    .withColumn("fake_email", fake_email())
    .withColumn("fake_address", fake_address())
    .withColumn("fake_zipcode", fake_zipcode())
)
# You can uncomment the below display command to check if the code in this cell works
# display(streaming_df)

Stream-Static Join or Stream-Delta Join

Structured Streaming supports joins (inner join and left join) between a streaming and a static DataFrame or a Delta table. However, a few types of stream-static outer joins are not supported yet.

lookup_delta_df = spark.read.table(f"{schema_name}.{static_table_name}")

joined_streaming_df = streaming_df.join(
    lookup_delta_df,
    streaming_df["fake_zipcode"] == lookup_delta_df["zip"],
    "left_outer",
).drop("fake_zipcode")
# display(joined_streaming_df)

Orchestrate the pipeline and write the Spark stream to the Delta table

Some tips:

(
    joined_streaming_df.writeStream
    # .trigger(availableNow=True)
    .queryName("do_a_stream_join_with_the_delta_table")
    .option("checkpointLocation", checkpoint_location)
    .format("delta")
    .toTable(f"{schema_name}.{target_table_name}")
)

Download the code
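Not part of the original post, but a quick way to confirm the stream is actually landing rows in the target table (reusing the schema_name and target_table_name variables defined above):

# Row count should increase every time you re-run this cell while the stream is running
display(spark.sql(f"SELECT count(*) AS row_count FROM {schema_name}.{target_table_name}"))

# Inspect the active streaming queries and their current status
for q in spark.streams.active:
    print(q.name, q.status)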

Blog, Delta, Spark, Stream

How to upgrade your Spark Stream application with a new checkpoint, with working code

Sometimes in life, we need to make breaking changes that require us to create a new checkpoint. Some example scenarios: there could be plenty of situations where you want to control precisely which data (Kafka offsets) needs to be processed. Not every scenario requires a new checkpoint; there is a list of things you can change without requiring one. This blog helps you handle the scenario where a new checkpoint is unavoidable.

Kafka basics: topics, partitions & offsets

A Kafka cluster has topics: topics are a way to organize messages. Each topic has a name that is unique across the entire Kafka cluster. Messages are sent to and read from specific topics; in other words, producers write data to a topic, and consumers read data from the topic. Topics have partitions, and data/messages are distributed across partitions. Every message belongs to a single partition. A partition holds messages, each with a unique sequential identifier within the partition called the offset.

What is the takeaway here? We must identify which offset has already been processed for each partition, and this information can be found inside the checkpoint.

What information is inside the checkpoint?

Under the checkpoint folder, there are four subfolders; the ones we care about here are offsets and commits.

How to fetch offset & partition information from the checkpoint folder

List the files at the checkpoint location; we are looking for the offsets folder.

checkpoint_location = "/checkpoint_location/checkpoint_for_kafka_to_delta"
dbutils.fs.ls(checkpoint_location)
dbutils.fs.ls(f"{checkpoint_location}/")

Next, we will list the files under the commits folder and identify the most recent commit.

dbutils.fs.ls(checkpoint_location)
dbutils.fs.ls(f"{checkpoint_location}/commits")

/checkpoint_location/checkpoint_for_kafka_to_delta/commits/0
/checkpoint_location/checkpoint_for_kafka_to_delta/commits/1
/checkpoint_location/checkpoint_for_kafka_to_delta/commits/2

Once we identify the last commit file number, we open the equivalent offsets file. In this example, the latest commit is "2". Now let's view the contents of the offsets file.

#%fs head {FILL_THE_EXACT_PATH_OF_THE_FILE_WHICH_NEEDS_TO_BE_VIEWED}
%fs head /checkpoint_location/checkpoint_for_kafka_to_delta/offsets/2

{"batchWatermarkMs":0,"batchTimestampMs":1674623173851,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"topic_name_from_kafka":{"0":400000, "1":300000}}

The information of interest is at the end: the topic name and the offset per partition.

{"topic_name_from_kafka":{"0":400000, "1":300000}}

Now the easy part: use Spark to start reading Kafka from a particular offset

Spark Streaming starts the read stream with the latest offset by default. However, it provides a parameter "startingOffsets" to select a custom starting point.
startingOffsets = """{"topic_name_from_kafka":{"0":400000, "1":300000}}"""

kafka_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
    .option("subscribe", topic)
    .option("startingOffsets", startingOffsets)
    .load())

display(kafka_stream)

And we are done! I recommend parameterizing your code so that "startingOffsets" can be passed as a parameter.
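To complete the picture, here is a minimal sketch (not from the original post) of restarting the write against a brand-new checkpoint once the read starts from the recovered offsets; the target table name bronze_kafka_events is illustrative, so point it at your existing Delta target.

new_checkpoint_location = "/checkpoint_location/checkpoint_for_kafka_to_delta_v2"  # brand-new checkpoint path

(
    kafka_stream
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value",
                "topic", "partition", "offset", "timestamp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", new_checkpoint_location)
    .toTable("bronze_kafka_events")  # hypothetical target table
)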

Blog, Delta, Delta Live Tables

How to parameterize Delta Live Tables and import reusable functions with working code

This blog discusses passing custom parameters to a Delta Live Tables (DLT) pipeline. Furthermore, we will discuss importing functions defined in other files or locations. You can import files from the current directory or a specified location using sys.path.append().

Update: As of December 2022, you can directly import files if the reusable_functions.py file exists in the same repository by just using the import command, which is the preferred approach. However, if the reusable_functions.py file exists outside the repository, you can take the sys.path.append() approach described below.

Overall, this is a 4-step process:

1. Create a reusable_functions.py file

Create a reusable function in a Python file (not a notebook), so we can import it later. Let's call the file 'reusable_functions.py' and place it in a path. Please make sure to note the absolute path of the folder where this file will be placed.

from pyspark.sql import DataFrame
from pyspark.sql.functions import current_timestamp, current_date

def append_ingestion_columns(_df: DataFrame):
    return _df.withColumn("ingestion_timestamp", current_timestamp()).withColumn(
        "ingestion_date", current_date()
    )

2. Add code to receive the DLT parameters

The below function is defined with a try/except block so that it also works in a notebook, where we cannot pass the parameter value from the DLT pipeline.

from pyspark.sql import SparkSession

def get_parameter_or_return_default(
    parameter_name: str = "pipeline.parameter_blah_blah",
    default_value: str = "default_value",
) -> str:
    try:
        spark = SparkSession.getActiveSession()
        if spark is not None:
            parameter = spark.conf.get(parameter_name)
        else:
            raise Exception("No active Spark session found.")
    except Exception as e:
        print(f"Caught Exception: {e}. Using default value for {parameter_name}")
        parameter = default_value
    return parameter

In this example, we will pass two parameters: path_to_reusable_functions and parameter_abc. Then we will use the function defined above to get them and set default values for both.

path_to_reusable_functions = get_parameter_or_return_default(
    parameter_name="pipeline.path_to_reusable_functions",
    default_value="/Workspace/Repos/jitesh.soni@databricks.com/material_for_public_consumption/",
)

parameter_abc = get_parameter_or_return_default(
    parameter_name="pipeline.parameter_abc", default_value="random_default_value"
)

3. Append the path to the reusable_functions.py file and import the functions in the notebook

import sys

# Add the path so functions can be imported
sys.path.append(path_to_reusable_functions)

# Attempt the import
from reusable_functions import append_ingestion_columns

Next, we define a function that returns a DataFrame and run a display command to see its output. This helps you test that the code works without running the DLT pipeline.

def static_dataframe():
    df_which_we_got_back_after_running_sql = spark.sql(
        f"""
        SELECT
            '{path_to_reusable_functions}' as path_to_reusable_functions
            ,'{parameter_abc}' as parameter_abc
        """
    )
    return append_ingestion_columns(df_which_we_got_back_after_running_sql)

display(static_dataframe())

At this point, you should be able to run your notebook and validate that everything works before creating a DLT pipeline. Next, we define a DLT table.

import dlt

@dlt.table(name="static_table", comment="Static table")
def dlt_static_table():
    return static_dataframe()

4. Create a DLT pipeline and set/pass parameters

At this step, we can create a DLT pipeline via the UI, add our custom parameters, and assign them values.
The full JSON representation would look something like this; we only care about the configuration section in this JSON.

{
    "id": "d40fa97a-5b5e-4fe7-9760-b67d78a724a1",
    "clusters": [
        {
            "label": "default",
            "policy_id": "E06216CAA0000360",
            "autoscale": {
                "min_workers": 1,
                "max_workers": 2,
                "mode": "ENHANCED"
            }
        },
        {
            "label": "maintenance",
            "policy_id": "E06216CAA0000360"
        }
    ],
    "development": true,
    "continuous": false,
    "channel": "PREVIEW",
    "edition": "CORE",
    "photon": false,
    "libraries": [
        {
            "notebook": {
                "path": "/Repos/jitesh.soni@databricks.com/material_for_public_consumption/notebooks/how_to_parameterize_delta_live_tables_and_import_reusable_functions"
            }
        }
    ],
    "name": "how_to_parameterize_delta_live_tables_and_import_reusable_functions",
    "storage": "dbfs:/pipelines/d40fa97a-5b5e-4fe7-9760-b67d78a724a1",
    "configuration": {
        "pipeline.parameter_abc": "this_was_passed_from_dlt_config",
        "pipeline.path_to_reusable_functions": "/Workspace/Repos/jitesh.soni@databricks.com/material_for_public_consumption/"
    },
    "target": "tmp_taget_schema"
}

Trigger your DLT pipeline. If you have reached this far, you should have an end-to-end DLT pipeline working with parameter passing and imports.

*Update | How do you edit these parameters via API or CLI?

Below are the steps to edit these parameters via the CLI; the API approach is similar. Create a JSON file with the parameters:

{
    "id": "d40fa97a-5b5e-4fe7-9760-b67d78a724a1",
    "name": "how_to_parameterize_delta_live_tables_and_import_reusable_functions",
    "clusters": [
        {
            "label": "default",
            "policy_id": "E06216CAA0000360",
            "autoscale": {
                "min_workers": 1,
                "max_workers": 5,
                "mode": "ENHANCED"
            }
        },
        {
            "label": "maintenance",
            "policy_id": "E06216CAA0000360"
        }
    ],
    "configuration": {
        "pipeline.parameter_created_from_jobs_cli": "this_was_created_from_jobs_cli",
        "pipeline.parameter_abc": "this_was_passed_from_dlt_config_via_job_cli",
        "pipeline.path_to_reusable_functions": "/Workspace/Repos/jitesh.soni@databricks.com/material_for_public_consumption/"
    },
    "libraries": [
        {
            "notebook": {
                "path": "/Repos/jitesh.soni@databricks.com/material_for_public_consumption/notebooks/how_to_parameterize_delta_live_tables_and_import_reusable_functions"
            }
        }
    ]
}

Call the Databricks CLI to push the changes, then go back to the Delta Live Tables UI and the change will have gone through.

Download the code: DLT notebook and reusable_functions.py

References

Delta Live Tables settings
Delta Live Tables settings specify one or more notebooks that implement a pipeline and the parameters specifying how to…
docs.databricks.com
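The original post showed the CLI call only as a screenshot. As an assumption (the exact subcommand and flags depend on your CLI version, so verify with databricks pipelines --help before using it), the edit could look roughly like this with the legacy databricks-cli:

# Assumed legacy databricks-cli invocation; verify the flag names for your CLI version
databricks --profile <your_env> pipelines edit --settings pipeline_settings.json

Here pipeline_settings.json is the JSON file created above, and the pipeline id inside it tells the CLI which pipeline to update.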

Best Practices, Blog, Stream

Spark Streaming Best Practices - A bare-minimum checklist for Beginners and Advanced Users

Most good things in life come with a nuance. While learning Streaming a few years ago, I spent hours searching for best practices. However, I found the answers too complicated to make sense to a beginner's mind. Thus, I devised a set of best practices that should hold true in almost all scenarios. The below checklist is not ordered; you should aim to check off as many items as you can.

Beginners' best practices checklist for Spark Streaming:

.queryName("IngestFromKafka")

(input_stream
    .select(col("eventId").alias("key"), to_json(struct(col('action'), col('time'), col('processingTime'))).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
    .option("kafka.security.protocol", "to_be_filled")
    .option("checkpointLocation", checkpoint_location)
    .option("topic", topic)
    .queryName("IngestFromKafka")
    .start()
)

spark.readStream.format("kinesis").option("streamName", stream_name)

Advanced best practices checklist for Spark Streaming:
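As a small illustration of why naming queries matters (not from the original checklist; input_stream, checkpoint_location, and the target table name are placeholders borrowed from the snippets above), a named query is easy to pick out in the Spark UI and from spark.streams.active:

query = (
    input_stream.writeStream
    .queryName("IngestFromKafka")                 # this name shows up in the Spark UI and listener events
    .format("delta")
    .option("checkpointLocation", checkpoint_location)
    .toTable("bronze_events")                     # hypothetical target table
)

# Find the query later by its name and check on its health
for q in spark.streams.active:
    if q.name == "IngestFromKafka":
        print(q.id, q.status["message"], q.lastProgress)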

Blog, Delta, forEachBatch, Spark, Stream

Merge Multiple Spark Streams into a Delta Table with working code

This blog discusses how to read from multiple Spark streams and merge/upsert data into a single Delta table. We will also optimize/cluster the data of the Delta table. Overall, the process works in the following manner:

Architecture

First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, S3, etc. However, for simplicity we will use .format("rate") to generate a stream. Feel free to alter numPartitions & rowsPerSecond; these parameters help you control how much data you want to generate. In the below code, we generate 1,000 rows per second across 100 partitions. For the purpose of this blog, we will build 2 Spark streams, one for each country: Canada & USA.

USA's stream

generated_streaming_usa_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 100)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        ,"value"
        ,"value%1000000 as hash"
        ,"'USA' AS country"
        ,"current_timestamp() as ingestion_timestamp"
    )
)
# display(generated_streaming_usa_df)

Canada's stream

generated_streaming_canada_df = (
    spark.readStream
    .format("rate")
    .option("numPartitions", 100)
    .option("rowsPerSecond", 1 * 1000)
    .load()
    .selectExpr(
        "md5( CAST (value AS STRING) ) as md5"
        ,"value"
        ,"value%1000000 as hash"
        ,"'Canada' AS country"
        ,"current_timestamp() as ingestion_timestamp"
    )
)
# display(generated_streaming_canada_df)

Parameters / Variables (feel free to change as per your needs)

target_table_name = "to_be_merged_into_table_partitioned_by_country"
check_point_location_for_usa_stream = f"/tmp/delta/{target_table_name}/_checkpoints/_usa/"
check_point_location_for_canada_stream = f"/tmp/delta/{target_table_name}/_checkpoints/_canada/"
join_column_name = "hash"
partition_column = "country"

Create an empty Delta table so data can be merged into it

# spark.sql(f""" DROP TABLE IF EXISTS {target_table_name};""")
(
    generated_streaming_usa_df.writeStream
    .partitionBy(partition_column)
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .option("checkpointLocation", check_point_location_for_usa_stream)
    .toTable(target_table_name)
)

Check if data is populated. If you do not see any data, just run the above code snippet once more; sometimes it takes time for the data to show up.

display(spark.read.table(target_table_name))

Now we will build the code for the user-defined function which does all the data processing, merge & optimize.

def make_changes_using_the_micro_batch(microBatchOutputDF, batchId: int):
    print(f"Processing batchId: {batchId}")
    microBatchOutputDF.createOrReplaceTempView("updates")
    spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession()
    spark_session_for_this_micro_batch.sql(f"""
        SELECT *
        FROM (
            select *
                ,rank() over(partition by {join_column_name} order by value desc) as dedupe
            from updates
        )
        WHERE dedupe = 1
    """).drop("dedupe").createOrReplaceTempView("updates_which_need_to_be_merged")
    spark_session_for_this_micro_batch.sql(f"""
        MERGE INTO {target_table_name} target
        USING updates_which_need_to_be_merged u
        ON u.{partition_column} = target.{partition_column}
           AND u.{join_column_name} = target.{join_column_name}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    optimize_every_n_batches = 20  # Define how often the optimize command should run;
    # for example, at 50, we run the optimize command every 50 batches of stream data
    if batchId % optimize_every_n_batches == 0:
        optimize_and_zorder_table(table_name=target_table_name, zorder_by_col_name=join_column_name)

Optimize / Z-order a Delta table

Why do we need to optimize a table? If we keep adding files to our Delta table and never optimize/sort them, then over time we need to read a lot of files during merge time. Thus, it is better to optimize the Delta table after every N merges. N needs to be decided based on your latency requirements; you could start with N as 10 and change it as per your needs.

The below code runs an OPTIMIZE and ZORDER command on a given table that is being fed by a stream. The optimize command can't run in a silo, because it would require us to pause and then resume the stream. Therefore, we call this function as part of the upsert function, which lets us optimize before the next batch of streaming data comes through.

from timeit import default_timer as timer

def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None:
    """
    Parameters:
        table_name: str
            name of the table to be optimized
        zorder_by_col_name: str
            comma separated list of columns to zorder by, for example "col_a, col_b, col_c"
    """
    start = timer()
    print(f"Met condition to optimize table {table_name}")
    sql_query_optimize = f"OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})"
    spark.sql(sql_query_optimize)
    end = timer()
    time_elapsed_seconds = end - start
    print(
        f"Successfully optimized table {table_name}. Total time elapsed: {time_elapsed_seconds} seconds"
    )

Orchestrate the streaming pipeline end to end

Read the Canada stream and merge into the Delta table

(
    generated_streaming_canada_df
    .writeStream.format('delta')
    # .trigger(availableNow=True)
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", check_point_location_for_canada_stream)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

Read the USA stream and merge into the Delta table

(
    generated_streaming_usa_df
    .writeStream.format('delta')
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", check_point_location_for_usa_stream)
    .foreachBatch(make_changes_using_the_micro_batch)
    .start()
)

Now, let's validate that data is being populated.

display(
    spark.sql(f"""
        SELECT {partition_column} as partition_column
            ,count(1) as row_count
        FROM {target_table_name}
        GROUP BY {partition_column}
    """)
)

If you have reached this far, you should have an end-to-end pipeline working with streaming data and merging data into a Delta table.

Download this notebook: material_for_public_consumption/Merge Multiple Spark Streams Into A Delta Table.py at main · jiteshsoni/material_for_public_consumption (github.com)

Reference: https://www.youtube.com/watch?v=2Iy5S0Hf4XM
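Not part of the original post, but a short housekeeping sketch: in a scheduled job you typically block on the streams, while in an interactive notebook you may want to stop them once you are done validating.

# In a scheduled job, keep the process alive until a stream stops or fails:
# spark.streams.awaitAnyTermination()

# In an interactive notebook, stop the streams once you are done experimenting:
for q in spark.streams.active:
    print(f"Stopping stream: {q.name}")
    q.stop()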

Blog

How to prepare yourself to be better at Data Interviews?

In this blog, let's talk about some specific actions you can take to perform better at data interviews. Below is general advice based on my experience coaching 100+ candidates and my industry experience on both sides of the table. The popular skill set as of 2024 still seems to be SQL, Python & Big Data fundamentals. Here is how to prepare for each of them.

Big Data fundamentals and Data Warehousing

When you read this book, do not expect to understand it after the first attempt; just read through it. Some chapters make sense in the second or third iteration. Do not quit on the book because it's too daunting at first. I have read this book at least five times and keep revisiting some chapters occasionally, improving my conceptual understanding. It is written by Ralph Kimball, who is considered a thought leader for data warehousing, and the concepts you will learn in this book still hold true in 2023. I encourage you to ponder how these concepts hold in the Big Data space as you read. As a function of time, many of us graduated directly into the Big Data space without understanding data warehousing well enough.

Think in SQL

Most businesses do a SQL interview in addition to a coding interview because it is a crucial competence for data engineers, scientists, and analysts. Building dependable and scalable data processing and data modelling solutions is your job, and SQL should come naturally to you.

Practice Python (or a language of your choice)

To practice your coding skills, use a whiteboard rather than just paper or integrated development environments (IDEs), which provide syntax support and standard formatting. Doing this will make you feel more at ease during the actual coding interview rounds. You ought to be knowledgeable about both simple and complicated problems. Learning the foundations of a programming language, like Python, and practicing its syntax and commands are good places to start. Read the below books.

Practice Interviewing

As you start applying for jobs and come close to the on-site interview stage, it is beneficial to practice mock interviews with peers and experienced coaches who can give you personalized feedback to improve your interview performance. Interviewing is a skill that needs to be polished before you go to the on-site interviews. I recommend candidates think about opportunity loss: finding a good coach who can provide valuable tips can shorten your learning curve and help you identify your blind spots. Example: if you land a job worth $100K a week earlier, that week represents ~$1,923 of opportunity.

Blind

Blind is an anonymous community app for the workplace. The tech community there is very helpful and transparent. Always research the company, their interview style, and compensation before your interviews.

"Opinions expressed are solely my own and do not express the views or opinions of my employer."
