
Synthetic Data Made Simple: Generating and Streaming Custom-Sized Data to Kafka

Introduction

Data engineers often need to generate large volumes of synthetic data for testing and benchmarking. Recently, I was tasked with creating records of a specific size (1 MB each) and streaming them to Kafka for performance benchmarking. This blog post, the first in a two-part series, walks through how to generate such data using Python and Apache Spark and then stream it to Kafka. In Part 2, we’ll benchmark Kafka-to-Delta ingestion speed on Databricks Jobs and Delta Live Tables.

But first, let me share the story behind this endeavor.

The Challenge: Preparing for Technology Decisions

Imagine you’re part of a data engineering team at a rapidly growing tech startup. Your CTO has tasked you with benchmarking the expected speed of Kafka to Delta ingestion before making critical technology decisions. You quickly realize two things:

  1. No suitable public Kafka feed: You need a Kafka feed that matches your specific requirements, especially in terms of record size.
  2. Complex setup with AWS MSK: Setting up AWS Managed Streaming for Apache Kafka (MSK) for external access is time-consuming and complex.

The solution? Generating custom-sized fake data and using Confluent Cloud for a quick and hassle-free Kafka setup.

Why Confluent Cloud?

Setting up a Kafka cluster can be cumbersome, especially when dealing with security configurations and access permissions. AWS MSK is robust, but exposing it for external access takes time. Confluent Cloud, on the other hand, offers a quick setup process and, at the time of writing, $300 in free credits, which makes it well suited to quick experiments and testing. I had my Kafka instance up and running in about five minutes with Confluent Cloud: https://www.confluent.io/confluent-cloud/

Step-by-Step Guide

Let’s dive into the code that helps you create synthetic data and push it to Kafka.

Installing Necessary Packages

First, install the required packages. Faker is a library that helps generate fake data, and faker_vehicle adds vehicle-specific data generation capabilities.

# Databricks notebook source
# MAGIC %pip install Faker faker_vehicle

Importing Required Libraries

Next, import the necessary libraries for data generation, streaming, and logging.

from faker import Faker
from faker_vehicle import VehicleProvider
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import uuid
import logging
from pyspark.sql.streaming import StreamingQuery
from datetime import datetime

Setting Parameters

Define the parameters for Kafka configuration and checkpoint location.

timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
checkpoint_location = f"/tmp/confluent_kafka_checkpoint_{timestamp}"
# Kafka configuration
topic = "YOUR_TOPIC"
kafka_bootstrap_servers_tls = "YOUR_KAFKA_URL.confluent.cloud:9092"
kafka_api_key = "YOUR_KAFKA_API_KEY"
kafka_api_secret = "YOUR_KAFKA_API_SECRET"
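
Hard-coding the API key and secret is fine for a throwaway experiment, but a notebook that gets shared would usually read them from somewhere else. Below is a minimal sketch, assuming the values are exposed as environment variables. The variable names (KAFKA_TOPIC and so on) and the helper load_kafka_settings are hypothetical and not part of the original notebook.

```python
import os


def load_kafka_settings(env=None):
    """Read Kafka connection settings from a mapping (defaults to os.environ).

    The variable names are illustrative assumptions, not a Confluent or
    Databricks convention.
    """
    env = os.environ if env is None else env
    required = [
        "KAFKA_TOPIC",
        "KAFKA_BOOTSTRAP_SERVERS",
        "KAFKA_API_KEY",
        "KAFKA_API_SECRET",
    ]
    # Fail early with a clear message instead of sending empty credentials
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing Kafka settings: {', '.join(missing)}")
    return {
        "topic": env["KAFKA_TOPIC"],
        "bootstrap_servers": env["KAFKA_BOOTSTRAP_SERVERS"],
        "api_key": env["KAFKA_API_KEY"],
        "api_secret": env["KAFKA_API_SECRET"],
    }


# Example usage (requires the variables to be set in the environment):
# settings = load_kafka_settings()
# topic = settings["topic"]
```

On Databricks, a secret scope is probably the more natural home for these values; the environment-variable version is only the simplest thing to show without extra setup.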

Initialization and UDF

Initialize Faker and add the vehicle provider. Configure logging for tracking the process.

# Initialize Faker for data generation and add vehicle data provider
fake = Faker()
fake.add_provider(VehicleProvider)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

Create User-Defined Functions (UDFs) for generating various fake data attributes.

# User-defined functions (UDFs) for generating fake data
event_id = F.udf(lambda: str(uuid.uuid4()), StringType())
vehicle_year_make_model = F.udf(fake.vehicle_year_make_model)
vehicle_year_make_model_cat = F.udf(fake.vehicle_year_make_model_cat)
vehicle_make_model = F.udf(fake.vehicle_make_model)
vehicle_make = F.udf(fake.vehicle_make)
vehicle_model = F.udf(fake.vehicle_model)
vehicle_year = F.udf(fake.vehicle_year)
vehicle_category = F.udf(fake.vehicle_category)
vehicle_object = F.udf(fake.vehicle_object)
latitude = F.udf(fake.latitude)
longitude = F.udf(fake.longitude)
location_on_land = F.udf(fake.location_on_land)
local_latlng = F.udf(fake.local_latlng)
zipcode = F.udf(fake.zipcode)

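Function to Generate a 1 MB Row of Data

Define a function that returns a streaming DataFrame in which each row is approximately 1 MB: ten large text columns of about 104,857 characters each, alongside the vehicle and geographic columns.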

@F.udf(StringType())
def large_text_udf(size: int):
    """Generate large text data with a specified size."""
    return fake.text(max_nb_chars=size)


# Configuration for large text data
num_large_columns = 10  # Number of large text columns
size_per_large_column = (1024 * 1024) // num_large_columns  # Distribute 1 MB across columns


def generate_1mb_row_df(rowsPerSecond=10, numPartitions=2):
    """Generate a DataFrame simulating streaming data, including vehicle and geographic data."""
    logger.info("Generating vehicle and geo data frame...")
    # `spark` is the SparkSession that Databricks notebooks provide
    df = (
        spark.readStream.format("rate")
        .option("numPartitions", numPartitions)
        .option("rowsPerSecond", rowsPerSecond)
        .load()
        .withColumn("event_id", event_id())
        .withColumn("vehicle_year_make_model", vehicle_year_make_model())
        .withColumn("vehicle_year_make_model_cat", vehicle_year_make_model_cat())
        .withColumn("vehicle_make_model", vehicle_make_model())
        .withColumn("vehicle_make", vehicle_make())
        .withColumn("vehicle_model", vehicle_model())
        .withColumn("vehicle_year", vehicle_year())
        .withColumn("vehicle_category", vehicle_category())
        .withColumn("vehicle_object", vehicle_object())
        .withColumn("latitude", latitude())
        .withColumn("longitude", longitude())
        .withColumn("location_on_land", location_on_land())
        .withColumn("local_latlng", local_latlng())
        .withColumn("zipcode", zipcode())
    )
    # Add large_text_col_1 .. large_text_col_N to pad each row to roughly 1 MB
    for i in range(1, num_large_columns + 1):
        df = df.withColumn(f"large_text_col_{i}", large_text_udf(F.lit(size_per_large_column)))
    return df

You can test the function by running the following command in a notebook cell:

display(generate_1mb_row_df())
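
It can also help to sanity-check the record size without starting a stream. The sketch below is an upper-bound estimate in plain Python, not the author's method: it assumes every large column is filled to its character limit with single-byte characters, and it stands in two sample small fields for the vehicle and geographic columns. The helper estimate_payload_bytes is hypothetical; MAX_REQUEST_SIZE mirrors the 1.1 MB producer limit set later when writing to Kafka.

```python
import json

MAX_REQUEST_SIZE = 1_100_000  # mirrors the kafka.max.request.size used in this post
num_large_columns = 10
size_per_large_column = (1024 * 1024) // num_large_columns


def estimate_payload_bytes(num_columns, chars_per_column, small_fields=None):
    """Upper-bound the JSON value size for one row.

    Assumes each large column holds exactly `chars_per_column` ASCII
    characters; Faker text is usually somewhat shorter than the limit.
    """
    row = dict(small_fields or {})
    for i in range(1, num_columns + 1):
        row[f"large_text_col_{i}"] = "x" * chars_per_column
    return len(json.dumps(row).encode("utf-8"))


# Sample small fields standing in for the vehicle and geo columns
sample_small_fields = {"event_id": "0" * 36, "vehicle_make": "Toyota"}
payload_bytes = estimate_payload_bytes(
    num_large_columns, size_per_large_column, sample_small_fields
)
print(f"Estimated payload: {payload_bytes:,} bytes")
print(f"Headroom under the request limit: {MAX_REQUEST_SIZE - payload_bytes:,} bytes")
```

The estimate covers only the JSON value. The key and Kafka's own record overhead add a little on top, which is one reason to leave some headroom rather than size rows right up to the limit.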

Streaming Data to Kafka

Start streaming the generated data to Kafka.

(
    generate_1mb_row_df(rowsPerSecond=100, numPartitions=12)
    # The Kafka sink expects key and value columns; serialize the whole row as JSON
    .selectExpr("CAST(event_id AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";')
    .option("checkpointLocation", checkpoint_location)
    .option("topic", topic)
    .option("queryName", f"SendDataToKafka-{topic}")
    .option("kafka.max.request.size", "1100000")  # Raise the producer limit to 1.1 MB (Kafka's default is 1 MB)
    .start()
)
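
The authentication options are the part most likely to need changing between environments. The kafkashaded. prefix in the JAAS string refers to the shaded Kafka client bundled with Databricks runtimes; on open-source Spark the class is plain org.apache.kafka.common.security.plain.PlainLoginModule. Below is a small sketch that builds the same options as a dictionary. The helper build_confluent_options and its on_databricks flag are hypothetical names used for illustration.

```python
def build_confluent_options(bootstrap_servers, api_key, api_secret, on_databricks=True):
    """Return Kafka options for SASL_SSL with the PLAIN mechanism."""
    login_module = "org.apache.kafka.common.security.plain.PlainLoginModule"
    if on_databricks:
        # Databricks runtimes ship a shaded Kafka client, hence the prefix
        login_module = "kafkashaded." + login_module
    jaas_config = (
        f'{login_module} required username="{api_key}" password="{api_secret}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_config,
    }


options = build_confluent_options(
    "YOUR_KAFKA_URL.confluent.cloud:9092", "YOUR_KAFKA_API_KEY", "YOUR_KAFKA_API_SECRET"
)
for name, value in options.items():
    print(name, "=", value)
```

A dictionary like this could be passed to the stream writer with .options(**options) in place of the four individual .option(...) calls, which keeps the credentials handling in one place.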

The Confluent Cloud UI confirmed a throughput of roughly 100 MB per second, consistent with 100 rows per second at about 1 MB each.
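
The 100 MB/s figure lines up with the settings used above: 100 rows per second at roughly 1 MB each. As a rough sketch of that arithmetic (the helper expected_throughput_mb_per_s is hypothetical, and the result is a nominal rate that ignores compression, batching, and protocol overhead):

```python
def expected_throughput_mb_per_s(rows_per_second, bytes_per_row):
    """Return the nominal producer rate in MB/s, with 1 MB = 1,000,000 bytes."""
    return rows_per_second * bytes_per_row / 1_000_000


# 100 rows per second, each padded to about 1 MiB (1024 * 1024 bytes)
nominal = expected_throughput_mb_per_s(rows_per_second=100, bytes_per_row=1024 * 1024)
print(f"Nominal producer rate: {nominal:.1f} MB/s")
```

If the rate observed in the Confluent UI falls well below the nominal value, the data generation step rather than Kafka may be the bottleneck, since producing large Faker text in Python UDFs is relatively expensive.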

Conclusion

This approach lets you create custom-sized synthetic data and stream it to Kafka at a controlled rate. Using Confluent Cloud significantly reduces the setup time and complexity, so you can spend the effort on the benchmark rather than on cluster configuration.

Stay tuned for Part 2 of this series, where we’ll benchmark Kafka-to-Delta ingestion speed on Databricks Jobs and Delta Live Tables. Whether you’re testing, benchmarking, or exploring data streaming, this guide provides a solid foundation to get you started. Happy streaming!



Footnote:

Thank you for taking the time to read this article. If you found it helpful or enjoyable, please consider clapping to show appreciation and help others discover it. Don’t forget to follow me for more insightful content, and visit my website CanadianDataGuy.com for additional resources and information. Your support and feedback are essential to me, and I appreciate your engagement with my work.
