This blog aims to guide those familiar with Databricks but new to streaming data, providing an understanding of how streaming works within the platform and demonstrating how to set up a basic streaming workflow.
What is Structured Streaming?
If you’re like me, then when the word streaming is mentioned, you think of things like Netflix and Spotify. This is a good start: rather than downloading a whole file which is then watched or listened to, the data is sent in small, manageable pieces and played almost immediately as it is received. Data streaming works the same way. Instead of processing data in bulk, streaming data is processed incrementally as it arrives. Examples include financial transactions, social media feeds and Internet of Things devices.
How does it work in Databricks?
In Databricks, streaming data can be ingested from multiple sources, such as Delta Lake, Azure Event Hubs, AWS Kinesis or even just files landing in a data lake. In this demo, we’ll use a Python script to simulate weather data arriving in a Delta table.
Simulating Streaming Data
Open up a new Databricks notebook and add the following code:
import time
import random
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Schema for the simulated weather readings
schema = StructType([
    StructField("SensorID", StringType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Temperature", DoubleType(), True),
    StructField("Humidity", DoubleType(), True),
    StructField("Pressure", DoubleType(), True),
    StructField("Location", StringType(), True)
])

# In a Databricks notebook `spark` already exists; getOrCreate() simply returns it
spark = SparkSession.builder.getOrCreate()

# Define a function to generate random data (a list containing one row)
def generate_random_data():
    return [
        (f"S{random.randint(1, 100)}",               # SensorID
         datetime.now(),                             # Timestamp is the current date and time
         round(random.uniform(-30, 50), 3),          # Temperature in C (-30 to 50)
         round(random.uniform(30, 80), 2),           # Humidity in % (30 to 80)
         round(random.uniform(1000, 1020), 2),       # Pressure in hPa (1000 to 1020)
         random.choice(["A", "B", "C", "D", "E"]))   # Location (A, B, C, D, or E)
    ]

# Write data to Delta table, one row at a time
for _ in range(50):
    new_data = spark.createDataFrame(generate_random_data(), schema)
    new_data.write.format("delta").mode("append").save("/mnt/delta/streaming_data")
    time.sleep(random.uniform(0, 3))  # Wait a random time (0 to 3 seconds)
This code simulates weather data arriving in a Delta table: it appends 50 rows, one at a time, at random intervals of 0 to 3 seconds.
Reading Streaming Data
While the first notebook is running, open a second notebook and add this code:
from pyspark.sql import functions as F

# Read the streaming data
streaming_df = (spark.readStream
    .format("delta")
    .load("/mnt/delta/streaming_data")
)

# Print to see if the dataframe is streaming
print(streaming_df.isStreaming)
Here we have made a streaming dataframe. It is very similar to a standard dataframe, with one main difference: a streaming dataframe works with an unbounded data source, meaning data flows in continuously and rows are processed incrementally (in micro-batches) as they arrive.
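To make the idea of micro-batches concrete, here is a minimal plain-Python sketch. It does not use Spark at all; `sensor_readings` and `micro_batches` are hypothetical names invented for this illustration, and the fixed batch size is an assumption (Spark decides batch boundaries by trigger and available data, not by row count).

```python
import itertools
from typing import Iterable, Iterator, List


def sensor_readings() -> Iterator[float]:
    """Hypothetical unbounded source: yields temperatures forever."""
    value = 20.0
    while True:
        yield value
        value += 0.5


def micro_batches(source: Iterable[float], batch_size: int) -> Iterator[List[float]]:
    """Cut an unbounded source into small lists, one per micro-batch."""
    iterator = iter(source)
    while True:
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Process only the first three micro-batches; a real stream would never stop.
processed = []
for batch in itertools.islice(micro_batches(sensor_readings(), batch_size=4), 3):
    processed.append(sum(batch) / len(batch))
    print(f"rows={len(batch)} avg={processed[-1]:.2f}")
```

Each pass through the loop only ever touches the rows of one small batch, which is what lets the source be unbounded.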
You can transform a streaming dataframe as you would any other dataframe. Here, I would like to find the average temperature per location.
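transformed_df = (streaming_df
    # The watermark must be set on the dataframe before grouping;
    # the object returned by groupBy has no withWatermark method
    .withWatermark(eventTime="Timestamp", delayThreshold="5 minutes")
    .groupBy(F.window("Timestamp", "60 seconds"), "Location")
    .agg(F.avg("Temperature").alias("avg_temperature"))
)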
Because the data never stops arriving, a single average over all of it would be impractical: it would have to be revised every time a new data point comes in and would never settle on a final value. Therefore we’ll take an average over a specific window of time. Let’s walk through what each line in the code above does:
.groupBy(F.window("Timestamp", "60 seconds"), "Location")
We create a 60-second tumbling window, then group by this window and also by “Location”. For every 60-second window, the data is therefore grouped by each unique location value.
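As a rough illustration of what this grouping does, here is a plain-Python sketch that assigns each reading to its 60-second tumbling window and averages per (window, location). It only mimics the idea and is not how Spark implements `F.window`; the `tumbling_window` helper and the sample readings are made up for the example.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def tumbling_window(ts: datetime, seconds: int = 60):
    """Return the [start, end) window that contains ts."""
    epoch = datetime(1970, 1, 1)
    elapsed = int((ts - epoch).total_seconds())
    start = epoch + timedelta(seconds=elapsed - elapsed % seconds)
    return start, start + timedelta(seconds=seconds)


# Hypothetical readings: (timestamp, location, temperature)
readings = [
    (datetime(2024, 1, 1, 10, 0, 5), "A", 10.0),
    (datetime(2024, 1, 1, 10, 0, 40), "A", 20.0),
    (datetime(2024, 1, 1, 10, 0, 59), "B", 5.0),
    (datetime(2024, 1, 1, 10, 1, 0), "A", 30.0),  # falls in the next window
]

# Group temperatures by (window start, location), then average each group
groups = defaultdict(list)
for ts, location, temperature in readings:
    window_start, _ = tumbling_window(ts)
    groups[(window_start, location)].append(temperature)

averages = {key: sum(vals) / len(vals) for key, vals in groups.items()}
for (window_start, location), avg in sorted(averages.items()):
    print(window_start.time(), location, avg)
```

Tumbling windows do not overlap, so every reading lands in exactly one window; the reading stamped exactly 10:01:00 belongs to the 10:01 window, not the 10:00 one.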
.withWatermark(eventTime="Timestamp", delayThreshold="5 minutes")
A watermark is used to handle late data. As new data comes in, the aggregation is updated for every window that data belongs to. Once the delay threshold has passed, the aggregation for that window is finalized, e.g. data arriving more than 5 minutes late won’t be considered for the aggregation. The watermark has to be applied to the streaming dataframe itself, so it must be called before `groupBy`.
.agg(F.avg("Temperature").alias("avg_temperature"))
This calculates the average temperature and renames the newly calculated column to be `avg_temperature`.
Example Timeline:
Imagine the following sequence:
– **10:00 – 10:01**: Data comes in for the time window from `10:00` to `10:01`. The average temperature for that window keeps updating as new data arrives during this time.
– **10:01**: The 60-second window ends, but the system doesn’t finalize the result yet. It will continue to accept late data for the next 5 minutes.
– **Between 10:01 and 10:06**: If data arrives late with timestamps between `10:00` and `10:01`, it is still processed and included in the aggregation.
– **After 10:06**: Once the stream has seen timestamps later than `10:06`, the watermark (the latest timestamp seen minus 5 minutes) has moved past `10:01` and the window is closed. Any data that arrives after this point with timestamps earlier than `10:01` is dropped, and the result for that window is no longer updated.
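The timeline above can be sketched in plain Python. This is a simplified model under stated assumptions, not Spark's implementation: it recomputes the watermark before every event (Spark advances it between micro-batches), and `window_end` and `run` are hypothetical helper names.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=60)   # tumbling window length
DELAY = timedelta(minutes=5)     # watermark delay threshold


def window_end(ts: datetime) -> datetime:
    """End of the 60-second tumbling window containing ts."""
    return ts.replace(second=0, microsecond=0) + WINDOW


def run(events):
    """events: (event_time, temperature) pairs in arrival order."""
    max_event_time = None
    windows = {}    # window end -> temperatures accepted for that window
    dropped = []    # events that arrived after their window was closed
    for event_time, temperature in events:
        # Watermark = latest event time seen so far minus the delay
        watermark = None if max_event_time is None else max_event_time - DELAY
        end = window_end(event_time)
        if watermark is not None and end <= watermark:
            dropped.append((event_time, temperature))
        else:
            windows.setdefault(end, []).append(temperature)
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return windows, dropped


events = [
    (datetime(2024, 1, 1, 10, 0, 10), 10.0),
    (datetime(2024, 1, 1, 10, 4, 0), 12.0),
    (datetime(2024, 1, 1, 10, 0, 50), 20.0),  # late, but watermark is only 09:59:00
    (datetime(2024, 1, 1, 10, 6, 30), 14.0),  # pushes the watermark to 10:01:30
    (datetime(2024, 1, 1, 10, 0, 30), 30.0),  # too late: its window ended at 10:01
]

windows, dropped = run(events)
print(windows[datetime(2024, 1, 1, 10, 1)])
print(dropped)
```

The third event is four minutes late and still counts; the fifth arrives after the watermark has passed the end of its window, so it is discarded.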
Running `display(transformed_df)` in a new cell will show you the data. You can also create a visualization by pressing the plus button next to the table.
The result should look something like this (bear in mind that the data is completely random):
Saving the Data
query_avg_temp = (transformed_df.writeStream
    .outputMode("complete")
    .option("checkpointLocation", "/mnt/temp/avg_temp_data")
    .queryName("avg_temperature_query")
    .trigger(availableNow=True)
    .toTable("avg_temperature_data")
)
Again, let’s walk through each step:
.outputMode("complete")
There are three different output modes:
– Append Mode: Only new rows are added to the sink (the sink is the destination the results are written to).
– Complete Mode: The whole result is written to the sink every time (used most often when aggregating). Because every aggregate has to be kept in this mode, the watermark cannot be used to discard old windows.
– Update Mode: Only the aggregated rows that have changed are written each time. If the data has not been aggregated, this is the same as append mode.
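The difference between the three modes is easiest to see side by side. The sketch below is plain Python, not Spark: `rows_written` is a hypothetical function, and the `state`, `changed` and `finalized` values are invented to represent one micro-batch of the windowed average.

```python
def rows_written(mode, state, changed, finalized):
    """Return the rows a sink would receive for one micro-batch.

    state     -- all aggregates so far: {key: value}
    changed   -- keys whose aggregate changed in this batch
    finalized -- keys whose window was closed by the watermark in this batch
    """
    if mode == "complete":
        keys = state.keys()      # everything, every time
    elif mode == "update":
        keys = changed           # only rows that changed
    elif mode == "append":
        keys = finalized         # only rows that will never change again
    else:
        raise ValueError(f"unknown output mode: {mode}")
    return {key: state[key] for key in sorted(keys)}


# Hypothetical state after a batch, keyed by "window/location"
state = {"10:00/A": 15.0, "10:00/B": 5.0, "10:01/A": 30.0}
changed = {"10:01/A"}
finalized = {"10:00/A", "10:00/B"}

for mode in ("complete", "update", "append"):
    print(mode, rows_written(mode, state, changed, finalized))
```

For an aggregation with a watermark, append mode waits until a window is finalized before writing it, which is why it receives the two 10:00 rows here rather than the row that just changed.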
.option("checkpointLocation", "/mnt/temp/avg_temp_data")
This specifies the checkpoint location where the metadata of the stream will be stored. This is used to enable fault tolerance so if the job fails or restarts, it can recover from the last successful checkpoint.
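To show why a checkpoint gives fault tolerance, here is a toy plain-Python version of the idea: progress is saved after every record, so a restarted run picks up where the failed one stopped. The JSON file format, the `process_with_checkpoint` function and the `fail_at` parameter are all assumptions made for the illustration; Spark's real checkpoint directory holds much more (offsets, commits and aggregation state).

```python
import json
import os
import tempfile


def process_with_checkpoint(records, checkpoint_path, fail_at=None):
    """Process records in order, persisting the next offset after each one."""
    offset = 0
    if os.path.exists(checkpoint_path):
        # Recover the position reached by a previous run
        with open(checkpoint_path) as f:
            offset = json.load(f)["offset"]
    processed = []
    for index in range(offset, len(records)):
        if fail_at is not None and index == fail_at:
            raise RuntimeError("simulated crash")
        processed.append(records[index])
        with open(checkpoint_path, "w") as f:
            json.dump({"offset": index + 1}, f)
    return processed


records = ["r0", "r1", "r2", "r3", "r4"]
checkpoint = os.path.join(tempfile.mkdtemp(), "offset.json")

try:
    process_with_checkpoint(records, checkpoint, fail_at=3)
except RuntimeError:
    pass  # the first run dies after handling r0, r1 and r2

resumed = process_with_checkpoint(records, checkpoint)
print(resumed)  # ['r3', 'r4']
```

This is also why each streaming query needs its own checkpoint location: two queries sharing one would overwrite each other's progress.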
.queryName("avg_temperature_query")
The name for the query. It can be used later to stop the query or check its status.
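.trigger(availableNow=True)
The trigger determines how frequently the query should process the data. The following triggers can be used:
– `Trigger.ProcessingTime(interval)`, written in PySpark as `.trigger(processingTime="15 seconds")` – The query will check for new data every 15 seconds and treat it as a micro-batch.
– `Trigger.Continuous(interval)`, written in PySpark as `.trigger(continuous="1 second")` – The query attempts to process the data as soon as it arrives; the `interval` determines how frequently the query is checkpointed. This mode is experimental and only supports simple map-like operations, so it would not work with the aggregation above.
– `Trigger.AvailableNow`, written in PySpark as `.trigger(availableNow=True)` – The query processes all the data that is available, then stops when there is no more new data. (A newer version of `Trigger.Once`: where `Trigger.Once` handles everything in a single batch, this one can work through the backlog in several micro-batches.)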
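The contrast between `Trigger.AvailableNow` and the older `Trigger.Once` can be sketched in plain Python. Both functions below are hypothetical stand-ins, and `max_rows_per_batch` is an invented parameter playing the role of a source rate limit; nothing here calls Spark.

```python
from collections import deque


def run_available_now(backlog, max_rows_per_batch):
    """Drain everything present at the start in bounded batches, then stop."""
    pending = deque(backlog)  # snapshot of what is available right now
    batches = []
    while pending:
        size = min(max_rows_per_batch, len(pending))
        batches.append([pending.popleft() for _ in range(size)])
    return batches


def run_once(backlog):
    """Single-batch behaviour: everything available goes into one batch."""
    rows = list(backlog)
    return [rows] if rows else []


backlog = list(range(7))
print(run_available_now(backlog, max_rows_per_batch=3))  # [[0, 1, 2], [3, 4, 5], [6]]
print(run_once(backlog))                                 # [[0, 1, 2, 3, 4, 5, 6]]
```

Splitting a large backlog into several smaller batches keeps each batch a manageable size, while the query still stops on its own once the backlog is empty.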
.toTable("avg_temperature_data")
This specifies which table the result is written to. If you run this and check the catalog, you should now see your table there along with any updates which will get added to the table.
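# Stop every streaming query that is still active in this session
for s in spark.streams.active:
    print(s.name)
    s.stop()
This code block stops every streaming query currently active in the session. Make sure you run it, or stop the cluster, because a query left running keeps using compute and can get expensive.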
Next Steps
That’s the intro to streaming data in Databricks. I would recommend looking next at Delta Live Tables and Pipelines, which are how you would actually run streaming notebooks, along with ingestion patterns (Bronze, Silver and Gold tables).