Delta

Blog, Delta, Delta Live Tables

How to parameterize Delta Live Tables and import reusable functions with working code

This blog will discuss passing custom parameters to a Delta Live Tables (DLT) pipeline. Furthermore, we will discuss importing functions defined in other files or locations. You can import files from the current directory or a specified location using sys.path.append(). Update: As of December 2022, you can directly import files if the reusable_functions.py file exists in the same repository by just using the import command, which is the preferred approach. However, in case these reusable_functions.py file exists outside the repository, you can take the sys.path.append() approach mentioned below. Overall, this a 4-step process: 1. Create a reusable_functions.py file Create a reusable function in a Python File (not Notebook), so we can import it later. Let’s call the file ‘reusable_functions.py’ below and place it in a path. Please make sure to note the absolute path of the folder where this file will be placed. from pyspark.sql import DataFramefrom pyspark.sql.functions import current_timestamp, current_datedef append_ingestion_columns(_df: DataFrame): return _df.withColumn(“ingestion_timestamp”, current_timestamp()).withColumn( “ingestion_date”, current_date() ) 2. Add code to receive the DLT parameters The below function is defined with try and except block so that it can work with Notebook as well, where we cannot pass the parameter value from the DLT pipeline from pyspark.sql import SparkSessiondef get_parameter_or_return_default( parameter_name: str = “pipeline.parameter_blah_blah”, default_value: str = “default_value”,) -> str: try: spark = SparkSession.getActiveSession() if spark is not None: parameter = spark.conf.get(parameter_name) else: raise Exception(“No active Spark session found.”) except Exception as e: print(f”Caught Exception: {e}. Using default value for {parameter_name}”) parameter = default_value return parameter In this example, we will pass two parameters: path_to_reusable_functions & parameter_abc. Then we will use the function defined previously to get and set default values for both. path_to_reusable_functions = get_parameter_or_return_default( parameter_name=”pipeline.path_to_reusable_functions”, default_value=”/Workspace/Repos/jitesh.soni@databricks.com/material_for_public_consumption/”,)parameter_abc = get_parameter_or_return_default( parameter_name=”pipeline.parameter_abc”, default_value=”random_default_value”) 3. Append the path to reusable_functions.py file and import the functions in the notebook import sys# Add the path so functions could be importedsys.path.append(path_to_reusable_functions)# Attempt the importfrom reusable_functions import append_ingestion_columns Next step, we will define a function to return a DataFrame and the run display command to see the output of the function. This helps one test if the code works without running the DLT pipeline. def static_dataframe(): df_which_we_got_back_after_running_sql = spark.sql( f””” SELECT ‘{path_to_reusable_functions}’ as path_to_reusable_functions ,'{parameter_abc}’ as parameter_abc “”” ) return append_ingestion_columns(df_which_we_got_back_after_running_sql)display(static_dataframe()) At this point, you should be able to run your notebook and validate everything works before we create a DLT pipeline. Next step, we define a DLT table. import dlt@dlt.table(name=”static_table”, comment=”Static table”)def dlt_static_table(): return static_dataframe() 4. Create a DLT pipeline and set/pass parameters At this step, we can create a DLT pipeline via UI, add our custom parameters, and assign them values. The full JSON representation would look something like this, we only care about the configuration section in this JSON. { “id”: “d40fa97a-5b5e-4fe7-9760-b67d78a724a1”, “clusters”: [ { “label”: “default”, “policy_id”: “E06216CAA0000360”, “autoscale”: { “min_workers”: 1, “max_workers”: 2, “mode”: “ENHANCED” } }, { “label”: “maintenance”, “policy_id”: “E06216CAA0000360” } ], “development”: true, “continuous”: false, “channel”: “PREVIEW”, “edition”: “CORE”, “photon”: false, “libraries”: [ { “notebook”: { “path”: “/Repos/jitesh.soni@databricks.com/material_for_public_consumption/notebooks/how_to_parameterize_delta_live_tables_and_import_reusable_functions” } } ], “name”: “how_to_parameterize_delta_live_tables_and_import_reusable_functions”, “storage”: “dbfs:/pipelines/d40fa97a-5b5e-4fe7-9760-b67d78a724a1”, “configuration”: { “pipeline.parameter_abc”: “this_was_passed_from_dlt_config”, “pipeline.path_to_reusable_functions”: “/Workspace/Repos/jitesh.soni@databricks.com/material_for_public_consumption/” }, “target”: “tmp_taget_schema”} Trigger your DLT pipeline. If you have reached so far, you should have an end-to-end DLT pipeline working with parameter passing and imports. *Update | How do you edit these parameters via API or CLI Below are screenshots of how to edit these parameters via CLI. The API solution would be similar. Create a JSON file with the parameters: { “id”: “d40fa97a-5b5e-4fe7-9760-b67d78a724a1”, “name”: “how_to_parameterize_delta_live_tables_and_import_reusable_functions”, “clusters”: [ { “label”: “default”, “policy_id”: “E06216CAA0000360”, “autoscale”: { “min_workers”: 1, “max_workers”: 5, “mode”: “ENHANCED” } }, { “label”: “maintenance”, “policy_id”: “E06216CAA0000360″ } ],”configuration”: { “pipeline.parameter_created_from_jobs_cli”: “this_was_created_from_jobs_cli”, “pipeline.parameter_abc”: “this_was_passed_from_dlt_config_via_job_cli”, “pipeline.path_to_reusable_functions”: “/Workspace/Repos/jitesh.soni@databricks.com/material_for_public_consumption/” }, “libraries”: [ { “notebook”: { “path”: “/Repos/jitesh.soni@databricks.com/material_for_public_consumption/notebooks/how_to_parameterize_delta_live_tables_and_import_reusable_functions” } } ]} Call the Datarbricks CLI to push the changes: Go back to Delta Live Tables UI and the change would have gone through Download the code DLT notebook and Reusable_function.py Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work. References Delta Live Tables settings Delta Live Tables settings specify one or more notebooks that implement a pipeline and the parameters specifying how to… docs.databricks.com

Blog, Delta, forEachBatch, Spark, Stream

Merge Multiple Spark Streams into a Delta Table with working code

This blog will discuss how to read from multiple Spark Streams and merge/upsert data into a single Delta Table. We will also optimize/cluster data of the delta table. Overall, the process works in the following manner: Architecture First, we need some input data to merge. You could technically make a stream out of Kafka, Kinesis, s3, etc. However, for simplicity we will use .format(’rate’) to generate a stream. Feel free to alter numPartitions & rowsPerSecond . These parameters help you control how much volume of data you want to generate. In the below code, we generated 1,000 rows per second across 100 partitions. For the purpose of this blog, we will build 2 Spark streams one for each country Canada & USA. USA’s stream generated_streaming_usa_df = ( spark.readStream .format(“rate”) .option(“numPartitions”, 100) .option(“rowsPerSecond”, 1 * 1000) .load() .selectExpr( “md5( CAST (value AS STRING) ) as md5″ ,”value” ,”value%1000000 as hash” ,”‘USA’ AS country” ,”current_timestamp() as ingestion_timestamp” )) #display(generated_streaming_usa_df) Canada’s Stream generated_streaming_canada_df = ( spark.readStream .format(“rate”) .option(“numPartitions”, 100) .option(“rowsPerSecond”, 1 * 1000) .load() .selectExpr( “md5( CAST (value AS STRING) ) as md5″ ,”value” ,”value%1000000 as hash” ,”‘Canada’ AS country” ,”current_timestamp() as ingestion_timestamp” )) #display(generated_streaming_canada_df) Parameters / Variables (Feel free to change as per your needs) target_table_name = “to_be_merged_into_table_partitioned_by_country”check_point_location_for_usa_stream = f”/tmp/delta/{target_table_name}/_checkpoints/_usa/”check_point_location_for_canada_stream = f”/tmp/delta/{target_table_name}/_checkpoints/_canada/”join_column_name =”hash”partition_column = “country” Create an Empty Delta table so data could be merged into it #spark.sql(f””” DROP TABLE IF EXISTS {target_table_name};”””)( generated_steaming_usa_df.writeStream .partitionBy(partition_column) .format(“delta”) .outputMode(“append”).trigger(once=True) .option(“checkpointLocation”, check_point_location_for_usa_stream) .toTable(target_table_name)) Check if data is populated. If you do not see any data, just run the above code snippet once more. Sometimes it takes time for the data to show up. display(spark.read.table(target_table_name)) Now we will build the code for the user-defined function which does all the data processing, merge & Optimize def make_changes_using_the_micro_batch(microBatchOutputDF, batchId: int): print(f”Processing batchId: {batchId}”) microBatchOutputDF.createOrReplaceTempView(“updates”) spark_session_for_this_micro_batch = microBatchOutputDF._jdf.sparkSession() spark_session_for_this_micro_batch.sql(f””” SELECT * FROM ( select * ,rank() over(partition by {join_column_name} order by value desc) as dedupe from updates ) WHERE dedupe =1 “””).drop(“dedupe”).createOrReplaceTempView(“updates_which_need_to_be_merged”) spark_session_for_this_micro_batch.sql(f””” MERGE INTO {target_table_name} target using updates_which_need_to_be_merged u on u.{partition_column} = target.{partition_column} AND u.{join_column_name} = target.{join_column_name} WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * “””) optimize_every_n_batches = 20 #Define how often should optimize run? for example: at 50, it means that we will run the optimize command every 50 batches of stream data if batchId % optimize_every_n_batches == 0: optimize_and_zorder_table(table_name = target_table_name, zorder_by_col_name = join_column_name) Optimize/ Z-order a Delta table Why do we need to optimize a table? If we keep adding files to our Delta table and never optimize/sort them then over time we need to read a lot of files during merge time. Thus, optimizing the Delta table after every N merges is better. N needs to be decided on your latency requirements. You could start with N as 10 and change it as per your needs. The below code will run an optimize and zorder command on a given table that is being fed by a stream. Optimize commands can’t run in a silo because it will require us to pause and then resume the stream. Therefore, we need to call this function a part of the upsert function. This enables us to optimize before the next batch of streaming data comes through. from timeit import default_timer as timer def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None: “”” Parameters: table_name: str name of the table to be optimized zorder_by_col_name: str comma separated list of columns to zorder by. example “col_a, col_b, col_c” “”” start = timer() print(f”Met condition to optimize table {table_name}”) sql_query_optimize = f”OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})” spark.sql(sql_query_optimize) end = timer() time_elapsed_seconds = end – start print( f”Successfully optimized table {table_name} . Total time elapsed: {time_elapsed_seconds} seconds” ) Orchestrate the streaming pipeline end to end Read the Canada stream and merge into Delta table ( generated_steaming_canada_df .writeStream.format(‘delta’) #.trigger(availableNow=True) .trigger(processingTime=’10 seconds’) .option(“checkpointLocation”, check_point_location_for_canada_stream) .foreachBatch(make_changes_using_the_micro_batch) .start()) Read the USA stream and merge into Delta table ( generated_steaming_usa_df .writeStream.format(‘delta’) .trigger(processingTime=’10 seconds’) .option(“checkpointLocation”, check_point_location_for_usa_stream) .foreachBatch(make_changes_using_the_micro_batch) .start()) Now, let’s validate that data is being populated display( spark.sql(f””” SELECT {partition_column} as partition_column ,count(1) as row_count FROM {target_table_name} GROUP BY {partition_column} “””)) If you have reached so far, you should have an end-to-end pipeline working with streaming data and merging data into a Delta table. Download this notebook material_for_public_consumption/Merge Multiple Spark Streams Into A Delta Table.py at main ·… Contribute to jiteshsoni/material_for_public_consumption development by creating an account on GitHub. github.com Footnote: Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work. Reference https://cdn.embedly.com/widgets/media.html?src=https%3A%2F%2Fwww.youtube.com%2Fembed%2F2Iy5S0Hf4XM%3Ffeature%3Doembed&display_name=YouTube&url=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3D2Iy5S0Hf4XM&image=https%3A%2F%2Fi.ytimg.com%2Fvi%2F2Iy5S0Hf4XM%2Fhqdefault.jpg&key=a19fcc184b9711e1b4764040d3dc5c07&type=text%2Fhtml&schema=youtube

Scroll to Top