Introduction
In data engineering, there is often a need to generate large volumes of synthetic data for testing and benchmarking. Recently, I was tasked with exactly that: creating records of a specific size (1 MB each) and streaming them to Kafka for performance benchmarking. This blog post, the first in a two-part series, walks you through generating such data with Python and Apache Spark and streaming it to Kafka efficiently. In Part 2, coming tomorrow, we’ll benchmark Kafka-to-Delta ingestion speed on Databricks Jobs and Delta Live Tables.
But first, let me share the story behind this endeavor.
The Challenge: Preparing for Technology Decisions
Imagine you’re part of a data engineering team at a rapidly growing tech startup. Your CTO has tasked you with benchmarking the expected speed of Kafka to Delta ingestion before making critical technology decisions. You quickly realize two things:
- No suitable public Kafka feed: You need a Kafka feed that matches your specific requirements, especially in terms of record size.
- Complex setup with AWS MSK: Setting up AWS Managed Streaming for Apache Kafka (MSK) for external access is time-consuming and complex.
The solution? Generating custom-sized fake data and using Confluent Cloud for a quick and hassle-free Kafka setup.
Why Confluent Cloud?
Setting up a Kafka cluster can be cumbersome, especially when dealing with security configurations and access permissions. AWS MSK is robust, but its setup can be daunting. Confluent Cloud, on the other hand, offers a fast setup process and provides $300 in free credits, making it ideal for quick experiments and testing. I had my Kafka instance up and running in just five minutes (https://www.confluent.io/confluent-cloud/).
Step-by-Step Guide
Let’s dive into the code that helps you create synthetic data and push it to Kafka.
Installing Necessary Packages
First, install the required packages. Faker is a library that helps generate fake data, and faker_vehicle adds vehicle-specific data generation capabilities.
%pip install Faker faker_vehicle
Importing Required Libraries
Next, import the necessary libraries for data generation, streaming, and logging.
from faker import Faker
from faker_vehicle import VehicleProvider
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import uuid
import logging
from pyspark.sql.streaming import StreamingQuery
from datetime import datetime
Setting Parameters
Define the parameters for Kafka configuration and checkpoint location.
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
checkpoint_location = f"/tmp/confluent_kafka_checkpoint_{timestamp}"
# Kafka configuration
topic = "YOUR_TOPIC"
kafka_bootstrap_servers_tls = "YOUR_KAFKA_URL.confluent.cloud:9092"
kafka_api_key = "YOUR_KAFKA_API_KEY"
kafka_api_secret = "YOUR_KAFKA_API_SECRET"
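Hardcoding credentials is fine for a quick throwaway experiment, but on Databricks you can also pull the API key and secret from a secret scope. Here is a minimal sketch, assuming a secret scope called kafka-creds with keys api-key and api-secret (these names are placeholders, create your own):
# Optional: read Kafka credentials from a Databricks secret scope instead of hardcoding them.
# "kafka-creds", "api-key" and "api-secret" are placeholder names -- create your own scope and keys first.
kafka_api_key = dbutils.secrets.get(scope="kafka-creds", key="api-key")
kafka_api_secret = dbutils.secrets.get(scope="kafka-creds", key="api-secret")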
Initialization and UDF
Initialize Faker and add the vehicle provider. Configure logging for tracking the process.
# Initialize Faker for data generation and add vehicle data provider
fake = Faker()
fake.add_provider(VehicleProvider)
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
Create User-Defined Functions (UDFs) for generating various fake data attributes.
# User-defined functions (UDFs) for generating fake data
event_id = F.udf(lambda: str(uuid.uuid4()), StringType())
vehicle_year_make_model = F.udf(fake.vehicle_year_make_model)
vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat)
vehicle_make_model = F.udf(fake.vehicle_make_model)
vehicle_make = F.udf(fake.vehicle_make)
vehicle_model = F.udf(fake.vehicle_model)
vehicle_year = F.udf(fake.vehicle_year)
vehicle_category = F.udf(fake.vehicle_category)
vehicle_object = F.udf(fake.vehicle_object)
latitude = F.udf(fake.latitude)
longitude = F.udf(fake.longitude)
location_on_land = F.udf(fake.location_on_land)
local_latlng = F.udf(fake.local_latlng)
zipcode = F.udf(fake.zipcode)
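Before wiring these UDFs into Spark, it is worth sanity-checking the underlying Faker calls directly on the driver. The output is random, so the values in the comments below are only illustrative:
# Quick sanity check of the Faker providers (plain Python, no Spark involved)
print(fake.vehicle_year_make_model())  # e.g. "2016 Ford Explorer" (random)
print(fake.vehicle_category())         # e.g. "SUV" (random)
print(fake.zipcode())                  # e.g. "90210" (random)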
Function to Generate 1MB Row of Data
Define a function to generate a DataFrame that simulates a row of data approximately 1 MB in size.
@F.udf(StringType())
def large_text_udf(size: int):
    """Generate large text data with a specified size."""
    return fake.text(max_nb_chars=size)
# Configuration for large text data
num_large_columns = 10 # Number of large text columns
size_per_large_column = (1024 * 1024) // num_large_columns # Distribute 1MB across columns
def generate_1mb_row_df(rowsPerSecond=10, numPartitions=2):
    """Generate a DataFrame simulating streaming data, including vehicle and geographic data."""
    logger.info("Generating vehicle and geo data frame...")
    df = spark.readStream.format("rate") \
        .option("numPartitions", numPartitions) \
        .option("rowsPerSecond", rowsPerSecond) \
        .load() \
        .withColumn("event_id", event_id()) \
        .withColumn("vehicle_year_make_model", vehicle_year_make_model()) \
        .withColumn("vehicle_year_make_model_cat", vehicle_year_make_model_cat()) \
        .withColumn("vehicle_make_model", vehicle_make_model()) \
        .withColumn("vehicle_make", vehicle_make()) \
        .withColumn("vehicle_model", vehicle_model()) \
        .withColumn("vehicle_year", vehicle_year()) \
        .withColumn("vehicle_category", vehicle_category()) \
        .withColumn("vehicle_object", vehicle_object()) \
        .withColumn("latitude", latitude()) \
        .withColumn("longitude", longitude()) \
        .withColumn("location_on_land", location_on_land()) \
        .withColumn("local_latlng", local_latlng()) \
        .withColumn("zipcode", zipcode()) \
        .withColumn("large_text_col_1", large_text_udf(F.lit(size_per_large_column))) \
        .withColumn("large_text_col_2", large_text_udf(F.lit(size_per_large_column))) \
        .withColumn("large_text_col_3", large_text_udf(F.lit(size_per_large_column))) \
        .withColumn("large_text_col_4", large_text_udf(F.lit(size_per_large_column))) \
        .withColumn("large_text_col_5", large_text_udf(F.lit(size_per_large_column))) \
        .withColumn("large_text_col_6", large_text_udf(F.lit(size_per_large_column))) \
        .withColumn("large_text_col_7", large_text_udf(F.lit(size_per_large_column))) \
        .withColumn("large_text_col_8", large_text_udf(F.lit(size_per_large_column))) \
        .withColumn("large_text_col_9", large_text_udf(F.lit(size_per_large_column))) \
        .withColumn("large_text_col_10", large_text_udf(F.lit(size_per_large_column)))
    return df
You can test the code above by running the following command:
display(generate_1mb_row_df())
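If you want to confirm that each row really lands near the 1 MB target, a rough check is to measure the ten large text columns on the driver, since they dominate the row size. Note that fake.text() treats max_nb_chars as an upper bound, so rows will typically come in slightly under 1 MB:
# Rough size check on the driver: the ten large text columns dominate the row size
sample_sizes = [len(fake.text(max_nb_chars=size_per_large_column)) for _ in range(num_large_columns)]
approx_row_mb = sum(sample_sizes) / (1024 * 1024)
print(f"Approximate payload from large text columns: {approx_row_mb:.2f} MB")  # typically a bit under 1 MB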
Streaming Data to Kafka
Start streaming the generated data to Kafka.
(generate_1mb_row_df(rowsPerSecond=100, numPartitions=12)
    .selectExpr("CAST(event_id AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";')
    .option("checkpointLocation", checkpoint_location)
    .option("topic", topic)
    .option("queryName", f"SendDataToKafka-{topic}")
    .option("kafka.max.request.size", "1100000")  # Raise the max request size to ~1.1 MB so 1 MB records fit
    .start()
)
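Once the query is running, you can monitor it (and eventually stop it) from the same notebook using the standard Structured Streaming APIs:
# Inspect active streaming queries and their latest progress metrics
for q in spark.streams.active:
    print(q.name, q.id)
    print(q.lastProgress)  # includes numInputRows, inputRowsPerSecond, batch durations, etc.

# When the benchmark is done, stop the stream(s):
# for q in spark.streams.active:
#     q.stop()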
In the Confluent Cloud UI, we could verify that the pipeline was producing roughly 100 MB/second (100 rows per second at about 1 MB per row).
Conclusion
This approach allows you to create custom-sized synthetic data and stream it to Kafka efficiently. By using Confluent Cloud, you can significantly reduce the setup time and complexity, enabling a more streamlined and efficient data generation and streaming process.
Stay tuned for Part 2 of this series, where we’ll benchmark Kafka-to-Delta ingestion speed on Databricks Jobs and Delta Live Tables. Whether you’re testing, benchmarking, or exploring data streaming, this guide provides a solid foundation to get you started. Happy streaming!
References
For more details on Spark and Kafka integration, you can refer to the following documentation:
- Apache Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
- Faker Documentation
- Apache Kafka Documentation
These resources provide comprehensive information and examples to help you further understand and implement Spark and Kafka integration.
Footnote:
Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.