Spark Structured Streaming – File Source

1. Spark Structured Streaming allows processing streaming data using analytical SQL-style queries.

2. The Spark Structured Streaming engine is built on top of the Spark SQL engine and was introduced in Spark 2.0.

3. Spark Structured Streaming assumes the streaming data has a fixed structure, so it always relies on a schema.

4. Under the hood, like its predecessor (DStream), Structured Streaming uses micro-batching to process streams.
Spark waits for a small interval (called the batch interval), say 1 second, and batches together all the events received during that interval into a micro-batch. This micro-batch is then scheduled by the driver to be executed as tasks on the executors. After a micro-batch completes, the next batch is collected and scheduled.
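Below is a minimal sketch (separate from the use case code later in this post) of controlling that interval explicitly with a processing-time trigger; the socket host and port are placeholders for a test source. If no trigger is set, Spark simply starts the next micro-batch as soon as the previous one finishes.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder()
      .appName("TriggerIntervalSketch")
      .master("local[*]")
      .getOrCreate()

    // Socket source is for testing only; host/port are placeholders
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val query = lines.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("1 second")) // batch together the events received every 1 second
      .outputMode("append")
      .start()

    query.awaitTermination()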

5. Internally, Spark Structured Streaming treats the stream as an unbounded input table (a Dataset[Row], i.e. a DataFrame) to which each arriving record is appended as a new row, whereas in Spark Streaming the stream is treated as a series of RDDs (DStreams).
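As a quick illustration of that API difference (a sketch, assuming a SparkSession named spark is in scope and the placeholder paths exist), a streaming read returns the same Dataset[Row]/DataFrame type as a batch read, just flagged as streaming:

    // Static (bounded) read
    val staticDF = spark.read.option("header", true).csv("C:/temp/static_data/")
    println(staticDF.isStreaming) // false

    // Streaming (unbounded) read over the same kind of data; a schema is mandatory here
    val streamDF = spark.readStream
      .schema(staticDF.schema)
      .option("header", true)
      .csv("C:/temp/stream_data/")
    println(streamDF.isStreaming) // true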

6. Spark Structured Streaming provides the following built-in input sources, each of which can be treated as producing a stream of data (each is sketched right after this list) –
1. File source – reads files placed in a directory. Supported file formats are text, CSV, JSON, ORC and Parquet.
2. Kafka source – reads data from a Kafka topic. It is compatible with Kafka broker versions 0.10.0 or higher.
3. Socket source (for testing only) – reads UTF-8 text data from a socket connection.
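For reference, here is roughly how each of these sources is wired up via readStream (a sketch; the path, schema, broker address, topic name and port are placeholders, and the Kafka source additionally needs the spark-sql-kafka-0-10 package on the classpath):

    // 1. File source – a schema is required; new files dropped into the directory are picked up
    val fileStreamDF = spark.readStream
      .schema(someSchema)                               // someSchema: a StructType defined elsewhere
      .csv("C:/temp/structured-streaming/some-input/")  // placeholder path

    // 2. Kafka source
    val kafkaStreamDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "orders")                        // placeholder topic
      .load()

    // 3. Socket source (testing only)
    val socketStreamDF = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()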

File streams continuously look for new files in a folder.
When is it useful?
It is useful in scenarios where a tool like Flume is continuously dumping logs from a source into an HDFS folder. We can treat that HDFS folder as a stream and read its data into Spark Structured Streaming for further processing, as in the sketch below.
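A minimal sketch of that idea, assuming a SparkSession named spark is in scope and using a placeholder HDFS URI and folder path:

    // Treat the HDFS folder where Flume lands log files as a stream;
    // each new file becomes part of the next micro-batch.
    val logLinesDF = spark.readStream
      .text("hdfs://namenode:8020/data/flume/logs/")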

USE CASE
In this example, we will see a use case of file streams.

We have 2 directories =>
1. src/main/resources/structured-streaming/filestream/
which contains a static file customer_info.csv with customer information
2. src/main/resources/structured-streaming/filestream/orders/
which contains CSV files with order details; new files are dropped into this directory periodically, and the directory is partitioned date-wise.

The objective here is to join the order details with the customer information file and write the resulting data to JSON files as output in real time.

customer_info.csv –
id,name,mobile,location
100,dev,9876543210,Noida
101,ravi,9876543211,Noida
102,ankit,9876543212,Gurgoan
104,atul,9876543213,Noida

order data sample –
id,customer_id,date,amount
1000,103,2019-10-01,5500
1002,101,2019-10-01,7000

Two common steps in building a Spark Structured Streaming application are –
1. Create a DataStreamReader using sparkSession.readStream – this reads stream data from an input source such as a file directory or a Kafka topic.
2. Create a DataStreamWriter using dataset.writeStream – this writes the processed data to a sink such as a file directory or a Kafka topic. A skeleton of this pattern is shown right after this list.
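In skeleton form (a sketch with a placeholder path and schema, and a console sink just for illustration), the two steps look like this:

    // Step 1: DataStreamReader -> streaming DataFrame
    val inputDF = spark.readStream
      .schema(mySchema)                  // mySchema: a StructType; mandatory for file sources
      .csv("C:/temp/some-input-folder/") // placeholder path

    // ... transformations (select, filter, join, aggregate) go here ...

    // Step 2: DataStreamWriter -> sink; start() actually launches the streaming query
    val query = inputDF.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()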

 // Required imports for this example
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}
 import org.apache.spark.sql.streaming.OutputMode

 // Create SparkSession object
 val spark = SparkSession.builder()
      .appName("SparkStructuredStreamingWithFileSource")
      .master("local[*]")
      .getOrCreate()
/*
 * Create a DataStreamReader to stream files from the orders folder location
 * containing CSV files. But before that, let's create the schema for the order data,
 * as Spark Structured Streaming won't allow streaming data without a schema.
 */
    val orderSchema = new StructType()
      .add("order_id", IntegerType)
      .add("customer_id", IntegerType)
      .add("date", StringType)
      .add("amount", DoubleType)

    val orderStreamDF = spark.readStream
      .option("delimiter", ",")
      .option("header", true) // --> since header is there
      .schema(orderSchema)
      .csv("C:/temp/structured-streaming/filestream/input/orders/*/")
/*
 * Read static customer information from csv file
 */
    val customerSchema = new StructType()
      .add("customer_id", IntegerType) // make sure using same name for joining column
      .add("customer_name", StringType)
      .add("customer_mobile", StringType)
      .add("customer_location", StringType)

    val customerDF = spark.read
      .option("header", true)
      .option("delimiter", ",")
      .schema(customerSchema)
      .csv("C:/temp/structured-streaming/filestream/input/customer_info.csv")
 /*
  * Join the streaming dataframe orderStreamDF with customerDF on the customer_id field.
  */
    val finalResult = orderStreamDF.join(customerDF, Seq("customer_id"), "inner")
/*
 * Create an output sink; without this the streaming job won't start
 */

    val query = finalResult.writeStream
      .queryName("customer-order-info")
      //.format("console")
      .format("json")
      .option("path", "C:/temp/structured-streaming/filestream/output")
      .option("checkpointLocation", "C:/temp/structured-streaming/filestream/chkpoint_dir")
      .partitionBy("date") // for performance improvement
      .outputMode(OutputMode.Append())
      .start()
    
    query.awaitTermination()
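Once the query has started, its progress can be inspected with the standard StreamingQuery accessors (note that awaitTermination() blocks the main thread, so these calls would go before it or run on a separate thread):

    println(query.status)       // whether a trigger is active and data is available
    println(query.lastProgress) // metrics of the last micro-batch (rows processed, durations, sink info)

    // All streaming queries currently active on this SparkSession
    spark.streams.active.foreach(q => println(q.name))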

So here, as soon as a new file is detected by the Spark engine in the C:/temp/structured-streaming/filestream/input/orders/*/
directory, it is picked up in the next micro-batch and new JSON files are written immediately by joining the customer and order details.
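To sanity-check the result, the streamed output can be read back with a normal batch read of the same output path used above:

    // Batch-read the JSON files written by the streaming query
    val resultDF = spark.read.json("C:/temp/structured-streaming/filestream/output")
    resultDF.show(false)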


The full code can be accessed at my GitHub location –
SparkStructuredStreamingWithFileSource.scala
