Spark Streaming
Structured Streaming
• Strong guarantees about consistency with batch jobs – the engine processes the incoming data incrementally, as a sequence of small batch-like jobs, so streaming results stay consistent with running the same query as a batch job.
• Transactional integration with storage systems – transactional updates are part of the core API; output is committed only after the data has been processed and each result is written exactly once, so readers always see a consistent snapshot of the data.
• Tight integration with the rest of Spark – Structured Streaming supports serving interactive queries on streaming state with Spark SQL and JDBC, and integrates with MLlib.
• Late data support – explicit support for “event time” makes it possible to aggregate out-of-order (late) data, with much richer support for windowing and sessions; this avoids the processing-order problems of the original Spark Streaming (DStream) API. A minimal sketch of event-time windowing is shown right after this list.
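As a hedged illustration of the event-time windowing mentioned above (not part of this tutorial's pipeline), the sketch below uses Spark's built-in rate source as a stand-in event stream; the window size and watermark delay are illustrative assumptions.

from pyspark.sql.functions import window, col

# Stand-in event stream: the built-in "rate" source produces a timestamp column
# that we treat as the event time purely for illustration.
events = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

lateTolerantCounts = (
  events
    .withWatermark("timestamp", "10 minutes")      # accept events arriving up to 10 minutes late
    .groupBy(window(col("timestamp"), "1 hour"))   # aggregate into 1-hour event-time windows
    .count()
)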
Source Data
%fs ls "/FileStore/tables/streaming/"
%fs head "/FileStore/tables/streaming/fhv_tripdata_2018_01_1-b3a55.csv"
%fs head "/FileStore/tables/streaming/Lookup/NYC_Taxi_zones.csv"
Data Loading
from pyspark.sql.functions import *
from pyspark.sql.types import *

inputPath = "/FileStore/tables/streaming/"

staticDataFrame = spark.read.format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load(inputPath) \
  .select("Pickup_DateTime", "DropOff_datetime", "PUlocationID", "DOlocationID")

OrderSchema = staticDataFrame.schema

display(staticDataFrame)
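As an optional sanity check (not in the original walkthrough), we can print the schema that inferSchema produced, since OrderSchema is reused later for the streaming read:

# Verify the inferred schema; OrderSchema is what the streaming reader will use later.
staticDataFrame.printSchema()
print(OrderSchema)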
Now let’s load the mapping table for NYC zone and borough names:
LocationPath = "/FileStore/tables/streaming/Lookup/NYC_Taxi_zones.csv"

df_Location = spark.read.option("header", "true").csv(LocationPath)

display(df_Location)
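A quick, optional check of the lookup table can help confirm it loaded as expected; the Borough column name below is assumed from the queries used later in this walkthrough.

# Hypothetical sanity check: list the distinct boroughs in the mapping table.
display(df_Location.select("Borough").distinct())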
Let's add some business logic for the purposes of the analysis. We need to combine Pickup_DateTime and DropOff_datetime into a single column called ServiceTime, and add a new hardcoded column for the service type. We also need to join our static DataFrame with the location mapping table to retrieve the borough and zone names.
df_Pickup = staticDataFrame.select(
    col("Pickup_Datetime").alias("ServiceTime"),
    col("PUlocationID").alias("Location")) \
  .withColumn("ServiceType", lit("Pickup"))

df_Dropoff = staticDataFrame.select(
    col("Dropoff_Datetime").alias("ServiceTime"),
    col("DOlocationID").alias("Location")) \
  .withColumn("ServiceType", lit("DropOff"))

df_final = df_Pickup.union(df_Dropoff)

df_final = df_final.join(df_Location, df_final.Location == df_Location.LocationID, 'left') \
  .withColumn("ServiceHour", hour("ServiceTime"))

df_final.createOrReplaceTempView("taxi_data")
staticSchema = df_final.schema

display(df_final)
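Since the result is registered as the temp view taxi_data, we can already query it with SQL from Python; the query below is an illustrative example, not part of the original notebook:

# Example use of the taxi_data temp view: count rows per service type.
display(spark.sql("select ServiceType, count(*) as trips from taxi_data group by ServiceType"))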
Now we can compute the number of “Pickup” and “DropOff” orders per borough, service hour and service day with one-hour windows. To do this, we will group by the ServiceType, ServiceHour, Service_Day and Borough columns and by 1-hour windows over the ServiceTime column.
from pyspark.sql.functions import *   # for the window() function

staticCountsDF = (
  df_final
    .selectExpr("Borough", "ServiceType", "ServiceHour", "ServiceTime")
    .withColumn("Service_Day", date_format(col("ServiceTime"), 'EEEE'))
    .groupBy(
      col("Service_Day"),
      col("ServiceType"),
      col("ServiceHour"),
      col("Borough"),
      window("ServiceTime", "1 hour"))
    .count()
)

staticCountsDF.cache()

# Register the DataFrame as view 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")
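Optionally, we can peek at the cached aggregate before querying it through SQL; this check is illustrative and not part of the original walkthrough:

# Inspect the aggregate's schema and its largest buckets.
staticCountsDF.printSchema()
display(staticCountsDF.orderBy(col("count").desc()).limit(10))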
Now we can directly use SQL to query the view. For example, here we show a timeline of windowed counts separated by service type and service hour.
%sql
select ServiceType, ServiceHour, sum(count) as total_count
from static_counts
group by ServiceType, ServiceHour
order by ServiceHour, ServiceType
%sql
select Borough, Service_Day, sum(count) as total_count
from static_counts
where ServiceType = 'Pickup'
group by Borough, Service_Day
order by Service_Day
Now let's express the same pipeline as a streaming job. Instead of spark.read we use spark.readStream, supply the schema explicitly (streaming file sources require one), and use maxFilesPerTrigger to pick up one file per micro-batch so we can watch the results evolve:

from pyspark.sql.types import *
from pyspark.sql.functions import *

spark.conf.set("spark.sql.shuffle.partitions", "2")

streamingDataframe = (
  spark
    .readStream
    .schema(OrderSchema)               # streaming file sources need an explicit schema
    .option("maxFilesPerTrigger", 1)   # process one file per micro-batch
    .format("csv")
    .option("header", "true")
    .load(inputPath)
)
Let's apply the same logic as before. The result of the last command, OrderByBoroughPerDayAndServiceType.isStreaming, is True, meaning this is a streaming DataFrame (note that the streaming query itself has not started yet).
# Let's add some business logic
df_StreamingPickup = streamingDataframe.select(
    col("Pickup_Datetime").alias("ServiceTime"),
    col("PUlocationID").alias("Location")) \
  .withColumn("ServiceType", lit("Pickup"))

df_StreamingDropoff = streamingDataframe.select(
    col("Dropoff_Datetime").alias("ServiceTime"),
    col("DOlocationID").alias("Location")) \
  .withColumn("ServiceType", lit("DropOff"))

df_Streamingfinal = df_StreamingPickup.union(df_StreamingDropoff)

# Join streaming data with the location mapping table to retrieve the relevant NYC zone names
df_Streamingfinal = df_Streamingfinal.join(
    df_Location, df_Streamingfinal.Location == df_Location.LocationID, 'left') \
  .withColumn("ServiceHour", hour("ServiceTime"))

# Same query as on the static DataFrame
OrderByBoroughPerDayAndServiceType = (
  df_Streamingfinal
    .groupBy(
      date_format(col("ServiceTime"), 'EEEE').alias("Service_Day"),
      df_Streamingfinal.ServiceType,
      df_Streamingfinal.ServiceHour,
      df_Streamingfinal.borough,
      window(df_Streamingfinal.ServiceTime, "1 hour"))
    .count()
)

OrderByBoroughPerDayAndServiceType.isStreaming
Now we have a streaming DataFrame, but it is not writing anywhere yet. We need to start streaming to a destination by calling writeStream on the DataFrame with concrete options:
spark.conf.set("spark.sql.shuffle.partitions", "2")

query = (
  OrderByBoroughPerDayAndServiceType
    .writeStream
    .format("memory")         # memory = store in-memory table (for testing only in Spark 2.0)
    .queryName("counts")      # counts = name of the in-memory table
    .outputMode("complete")   # complete = all the counts should be in the table
    .start()
)
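The in-memory sink is only suitable for testing. As a hedged sketch of a more durable setup, the un-aggregated stream could instead be appended to files with a checkpoint for fault tolerance; the output and checkpoint paths below are hypothetical placeholders.

# Illustrative file sink: append the enriched events to Parquet with checkpointing.
fileQuery = (
  df_Streamingfinal
    .writeStream
    .format("parquet")
    .option("path", "/FileStore/tables/streaming_output/")              # hypothetical output path
    .option("checkpointLocation", "/FileStore/tables/streaming_chkpt/") # hypothetical checkpoint path
    .outputMode("append")
    .start()
)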
With the in-memory counts table filling up, we can now watch the data stream live from within our notebook.
Let’s wait 5 seconds for some data to be processed:
from time import sleep
sleep(5)
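Optionally (not part of the original walkthrough), we can check the query's progress before looking at the in-memory table:

# Inspect the running query: current status and metrics from the latest micro-batch.
print(query.status)
print(query.lastProgress)   # may be None until the first micro-batch completes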
Let's display the same charts as we did for our static data:
%sql
select ServiceType, ServiceHour, sum(count) as total_count
from counts
group by ServiceType, ServiceHour
order by ServiceHour, ServiceType
%sql
select Service_Day, borough, sum(count) as total_count
from counts
group by Service_Day, borough
Let's wait a few more seconds and then run the same queries again; the counts should grow as additional files are processed.

sleep(5)
%sql
select ServiceType, ServiceHour, sum(count) as total_count
from counts
group by ServiceType, ServiceHour
order by ServiceHour, ServiceType
%sql
select Service_Day, borough, sum(count) as total_count
from counts
group by Service_Day, borough
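When we are done experimenting, it is good practice to stop the streaming query; this cleanup step is not shown in the original notebook.

# Stop our query, or stop every active stream on the session if needed.
query.stop()
# for q in spark.streams.active:
#     q.stop()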