Databricks Structured Streaming Example

Introduction

Spark 2 introduced the concept of Structured Streaming, giving users the ability to process unbounded streams of data using higher-level abstractions.

This is an extremely powerful capability that allows data engineers to run streaming transformations and analytics over data as it is ingested, and potentially to join and integrate it with batch data at rest. Because this can happen within Spark, and therefore within Databricks, ETL can take place in the same location as data analysis and data science activities.
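
As a minimal sketch of that pattern, a streaming DataFrame can be joined directly against a static table that is already at rest. The streamingOrders DataFrame, the customer_dim table and the CustomerID key below are hypothetical and only illustrate the shape of a stream-static join:

// Hypothetical stream-static join: enrich a streaming DataFrame of orders
// with a batch dimension table that already exists in the metastore
val customerDim = spark.table("customer_dim")           // batch data at rest (hypothetical)
val enrichedOrders = streamingOrders                    // a streaming DataFrame (hypothetical)
  .join(customerDim, Seq("CustomerID"), "left_outer")   // join each order to its customer attributes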

As the name implies, Structured Streaming relies on a typed model, whereby we define the structure of our messages up front as a schema. In the example below, we have defined a simple order with an ID, a category, a value and a shipping type.

import org.apache.spark.sql.types._

// Stop any active streaming run against the same checkpoint when this run restarts
spark.conf.set("spark.sql.streaming.stopActiveRunOnRestart", true)

// Schema describing the structure of each inbound order message
val jsonSchema = new StructType()
  .add("OrderID", StringType)
  .add("Category", StringType)
  .add("Value", IntegerType)
  .add("Shipping", StringType)

Often, we would then look to ingest messages as they stream in from an external source such as Apache Kafka:

import org.apache.spark.sql.functions._
import spark.implicits._

// Read raw order messages from Kafka, parse the JSON payload
// and project it into the columns defined by jsonSchema
val inputStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "123.123.123.123:9094,123.123.123.123:9094,123.123.123.123:9094")
  .option("subscribe", "1_1_Orders")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) AS jsonData")
  .select(from_json($"jsonData", jsonSchema).alias("rec"))
  .select("rec.*")

display(inputStream)

The next step is typically to write the stream to disk, usually as a Delta table for performance and transactional guarantees. This is sometimes described as a bronze table, as it is a simple audit log of the inbound data.

// Append the raw stream to a Delta table (the bronze layer),
// using a checkpoint location so the stream can recover on restart
inputStream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/dbfs/ordercheckpoint")
  .option("mergeSchema", "true")
  .option("path", "/dbfs/orders")
  .table("inbound_orders")

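Because the bronze data is a Delta table, it can also be read back as an ordinary batch DataFrame for ad hoc checks. A minimal sketch, assuming the write above has already produced data at /dbfs/orders:

// Read the bronze Delta table back in batch mode to spot-check ingested orders
val bronzeOrders = spark.read.format("delta").load("/dbfs/orders")
display(bronzeOrders)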

Finally, we typically stream directly from the bronze table as it is written, creating downstream aggregations such as a group by category. This gives us both an audit of the inbound data and a downstream aggregation table that is updated in real time.

// Read the bronze Delta table as a stream and aggregate order value by category
val ordersStream = spark
  .readStream
  .format("delta")
  .table("inbound_orders")
  .groupBy("Category")
  .agg(sum($"Value") as "total_value")

display(ordersStream)

// Write the aggregated results to a Delta table, using complete mode
// so the full aggregate is rewritten on each update
ordersStream.writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/dbfs/ordersbycategorycheckpoint")
  .option("mergeSchema", "true")
  .option("path", "/dbfs/ordersbycategory")
  .table("orderbycategory")

Structured Streaming within Databricks is a compelling solution for ETL, pre-aggregation and real-time analytics. If you would like to deploy this pattern, please get in touch.
