A Simple Spark Structured Streaming Example

Recently, I had the opportunity to learn about Apache Spark, write a few batch jobs and run them on a pretty impressive cluster. The Spark cluster I had access to made working with large data sets responsive and even pleasant. We were processing terabytes of historical data interactively like it was nothing. The ease with which we could perform typical ETL tasks on such large data sets was impressive to me.

It has been a while since I’ve had to work with very large data sets. I think the tools for working with big data have evolved for the better. Spark makes working with larger data sets a great experience compared to other tools like Hadoop’s MapReduce API or even higher-level abstractions like Pig Latin. Spark is fast and it’s easier to reason about the programming model. A Spark job can be up to 100 times faster than something written with Hadoop’s API and requires less code to express.

This post will introduce Spark a bit and highlight some of the things I’ve learned about it so far. In addition, I’ll describe two very simple Spark jobs written in Java. The two jobs are meant to show how similar the batch and streaming APIs are becoming. They do the same thing, but one is expressed as a batch job and the other uses the brand new, still in alpha, Structured Streaming API to deal with data incrementally. The new Spark Structured Streaming API is what I’m really excited about. Hopefully, this post will make it evident how feasible it is to go from batch analytics to real-time analytics with small tweaks to a batch process.

I won’t go into too much detail on the jobs but I will provide a few links at the end of the post for additional information. The examples should provide a good feel for the basics and a hint at what is possible in real life situations.

Spark: A Core Set of Tools and a Good Team Player

Before getting into the simple examples, it’s important to note that Spark is a general-purpose framework for cluster computing that can be used for a diverse set of tasks. Some of these tasks include (a short code sketch follows the list):

  • Filtering - getting rid of things you don’t care about.
  • Data cleaning - dealing with data accuracy, completeness, uniqueness and timeliness.
  • Extraction - pulling structured information out of raw data.
  • Projections - keeping only the parts of a record you care about.
  • Aggregation - counting things, calculating percentiles, etc.
  • Joining - taking two data sets and bringing them together.
  • Modeling - turning the data into something that can predict the future.
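
To make a few of these concrete, here is a minimal sketch of filtering, projection, joining and aggregation using Spark’s Dataset API in Java. The column names, file paths and EtlSketch class are made up for illustration and are not part of the jobs described later in this post.

package ca.redsofa.examples;

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class EtlSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("ETL Sketch")
            .getOrCreate();

        //Hypothetical input files and columns, for illustration only
        Dataset<Row> people = spark.read().json("people.json");
        Dataset<Row> cities = spark.read().json("cities.json");

        //Filtering and projection - keep valid records and only the columns we care about
        Dataset<Row> cleaned = people
            .filter(col("age").gt(0))
            .select("firstName", "sex", "age", "cityId");

        //Joining - bring the two data sets together on a shared key
        Dataset<Row> joined = cleaned.join(cities, "cityId");

        //Aggregation - average age by sex
        Dataset<Row> averages = joined
            .groupBy("sex")
            .agg(avg("age").alias("average_age"));

        averages.show();
        spark.stop();
    }
}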

Spark has a few components that make these tasks possible. I’ll briefly describe a few of these pieces here. Spark Core enables the basic functionality of Spark like task scheduling, memory management, fault recovery and distributed data sets (usually called RDDs). Spark SQL enables Spark to work with structured data using SQL as well as HQL. Spark Streaming enables Spark to deal with live streams of data (like Twitter, server and IoT device logs, etc.). Personally, I find Spark Streaming super cool and I’m willing to bet that many real-time systems are going to be built around it. MLlib adds machine learning (ML) functionality to Spark. MLlib has made many frequently used algorithms available to Spark; in addition, other third-party libraries like SystemML and Mahout add even more ML functionality.

Spark also integrates nicely with other pieces in the Hadoop ecosystem. If you have existing big data infrastructure (e.g. an existing Hadoop cluster, cluster manager, etc.), Spark can make use of it. It also interacts with an endless list of data stores (HDFS, S3, HBase, etc.).

Spark is written in Scala and provides APIs in Scala, Java, Python and R. If your shop has existing skills in these languages, the only new concept to learn is the Spark API. Note that the Python and R bindings lag a bit behind new API releases as they need to catch up with the Scala API releases. The sample code you will find on sites like Stack Overflow is often written in Scala, but it is easy to translate to your language of choice if Scala is not your thing.

Taking Spark for a Spin - Cloud-based Notebook Solution

If you want to play with Spark and do not want to fiddle around with installing any software, there is a cloud-based solution available from Databricks. You can get your own free, cloud-based mini 6GB Spark cluster, complete with a notebook interface, by following this link and registering. There is also a paid full-platform offering.

Taking Spark for a Spin - Developer Installation

If you want to get your hands a little dirtier and set up your own Spark cluster to write and test jobs with it, it’s pretty simple. Spark comes with a default, standalone cluster manager when you download it. You can download Spark from Apache’s web site or as part of larger software distributions like Cloudera, Hortonworks or others. Using the standalone cluster manager is the easiest way to run Spark applications in a clustered environment. When starting the cluster, you can specify a local master (--master local[*]) which will use as many threads as there are cores to simulate a cluster. This has the effect of parallelizing your jobs across threads instead of machines.
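
The same thing can be done programmatically while developing. The sketch below (not part of the jobs in this post) hard-codes a local master in the session builder; in a real deployment you would normally leave the master out of your code and let spark-submit supply it.

package ca.redsofa.examples;

import org.apache.spark.sql.SparkSession;

public class LocalMasterExample {
    public static void main(String[] args) {
        //local[*] uses as many worker threads as there are cores on the machine
        SparkSession spark = SparkSession
            .builder()
            .appName("Local Development Example")
            .master("local[*]")
            .getOrCreate();

        System.out.println("Running with master : " + spark.sparkContext().master());
        spark.stop();
    }
}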

Once you have written a job you are happy with, you can submit the job to a different master which would be part of a beefier cluster. The spark-submit script (available with the Spark download) is one way you can configure which master cluster URL to use. See the Spark documentation for more info. If your application dependencies are in Java or Scala, they are easily distributed to worker nodes with the spark-submit script.

The Simple Batch Job

Now that we’ve gotten a little Spark background out of the way, we’ll look at the first Spark job. With this job we’re going to read a full data set of people records (JSON-formatted) and calculate the average age of a population grouped by sex. Along the way, just for fun, we’ll use a User Defined Function (UDF) to transform the dataset by adding an extra column to it. The UDF is just to add a little excitement and illustrate one way to perform a transformation.
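
The StringLengthUdf helper used below lives in the ca.redsofa.udf package of the example project. As a rough sketch of what such a registration helper could look like (the actual class in the repository may differ), it simply registers a named UDF with the Spark session:

package ca.redsofa.udf;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class StringLengthUdf {
    //Sketch only - the real helper in the repository may differ
    public static void registerStringLengthUdf(SparkSession spark) {
        //Registers a UDF named stringLengthUdf which returns the length of a String column
        spark.udf().register("stringLengthUdf",
            (UDF1<String, Integer>) s -> s == null ? 0 : s.length(),
            DataTypes.IntegerType);
    }
}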

Spark has a few levels of abstraction to choose from when working with data. This can be a bit confusing at first. The Spark APIs are built in layers, and each layer builds on the one below it. The layers all do similar things but they have their own characteristics. The three API levels for working with data are:

  • RDD API (RDD -> Resilient Distributed Dataset)
  • DataFrames API
  • Dataset API

We’ll highlight some characteristics of each layer here:

  • RDDs make no attempt to optimize queries. It’s harder to write jobs with this API.
  • DataFrame queries can be automatically optimized by the framework. DataFrames are easier to work with, but you lose type information, so there is no compile-time error checking.
  • The Dataset API combines the compile-time type safety of the RDD API with the query optimization approach of the DataFrames API.

These three levels of APIs are good to know about when you are initially getting familiar with the Spark framework. Our sample jobs will make use of the Dataset API.
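
To illustrate the difference in typing between the last two levels, here is a minimal, hedged sketch. The people.json path is made up, and Person stands in for a simple JavaBean like the one used by the jobs below; none of this is code from the example project itself.

package ca.redsofa.examples;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import ca.redsofa.domain.Person;

public class ApiLevels {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("API Levels Sketch")
            .getOrCreate();

        //DataFrame level - a Dataset of generic Row objects. Columns are referenced
        //by name as plain strings, so a typo is only caught at runtime.
        Dataset<Row> df = spark.read().json("people.json");
        Dataset<Row> adultsDf = df.filter("age >= 18");

        //Dataset level - typed against the Person bean, so field access is
        //checked by the compiler.
        Dataset<Person> ds = df.as(Encoders.bean(Person.class));
        Dataset<Person> adultsDs = ds.filter((FilterFunction<Person>) p -> p.getAge() >= 18);

        adultsDf.show();
        adultsDs.show();

        spark.stop();
    }
}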

Here is the simple batch job code:


package ca.redsofa.jobs;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoders;
import ca.redsofa.domain.Person;
import ca.redsofa.udf.StringLengthUdf;
import static org.apache.spark.sql.functions.callUDF;

public class SimpleBatch {
    private static String INPUT_FILE = "/Users/richardr/dev/git/spark_spin/data";

    public static void main(String[] args) {
        SimpleBatch job = new SimpleBatch();
        job.startJob();
    }

    private void startJob( ){
        System.out.println("Stating Job...");

        long startTime = System.currentTimeMillis();	    

        //1 - Start the Spark session
        SparkSession spark = SparkSession
                .builder()
                .appName("Simple Batch Job")
                .config("spark.eventLog.enabled", "false")
                .config("spark.driver.memory", "2g")
                .config("spark.executor.memory", "2g")
                .enableHiveSupport()
                .getOrCreate();

        //2 - Read in the input data
        Dataset<Person> sourceDS = spark.read()
                                     .json(INPUT_FILE)
                                     .as(Encoders.bean(Person.class));

        //3 - Create a temporary table so we can use SQL queries
        sourceDS.createOrReplaceTempView("people");

        //4 - Register a user defined function to calculate the length of a String
        StringLengthUdf.registerStringLengthUdf(spark);

        //5 - Create a new Dataset based on the source Dataset

        //Example of adding a calculated column with a user defined function. 
        //This section of the code is not really central to exploring the 
        //similarity between batch and streaming jobs.

        //Creates a new Dataset based on the sourceDS
        //The new Dataset contains a new field called name_length which 
        //is calculated with the call to our new UDF         
        Dataset<Row> newDS = sourceDS
                                .withColumn("name_length", 
                                    callUDF("stringLengthUdf", 
                                    sourceDS.col("firstName")));    		

        //6 - Show a few records in the newDS Dataset. Added a name_length column
        newDS.show();

        String sql = "SELECT " + 
                        "AVG(age) as average_age, " + 
                        "sex " + 
                     "FROM " +
                       "people " +  
                     "GROUP BY " + 
                       "sex" ;

        //7 - Calculate the average age by sex for our population using a SQL script
        Dataset<Row> ageAverage = spark.sql(sql);

        //8 - Show the average age table
        ageAverage.show();

        spark.stop();		
        long stopTime = System.currentTimeMillis();
        long elapsedTime = stopTime - startTime;
        System.out.println("Execution time in ms : " + elapsedTime);
    }
}

We see from the code above that the job is executing a few simple steps:

  1. Start the Spark session
  2. Read in the input data
  3. Create a temporary table so we can use SQL queries
  4. Register a user defined function to calculate the length of a String
  5. Create a new Dataset based on the source Dataset
  6. Show a few records in the newDS Dataset. Added a name_length column
  7. Calculate the average age by sex for our population using a SQL script
  8. Show the average age table

The code is not hard to follow. You can’t easily tell from looking at this code that we’re leveraging a distributed computing environment with (possibly) many compute nodes working away at the calculations. The framework does all the heavy lifting around distributing the data, executing the work and gathering back the results.

Before moving on to the streaming example, we’ll mention one last thing about the code above. Spark supports domain-specific objects with Encoders. Encoders are used by Spark at runtime to generate code which serializes domain objects. The serialized objects have a low memory footprint and are optimized for efficiency in data processing.

We are using a bean encoder when reading our input file to return a Dataset of Person types. The pertinent batch job code from above:

//2 - Read in the input data
Dataset<Person> sourceDS = spark.read()
                             .json(INPUT_FILE)
                             .as(Encoders.bean(Person.class));

The input JSON files contain person records which look like the following:


{"firstName": "Joey", "lastName": "Jollymore", "sex": "M", "age": 12}
{"firstName": "Walter", "lastName": "Hasthag", "sex": "M", "age": 21}
{"firstName": "Willy", "lastName": "Waller", "sex": "M", "age": 11}
etc ...
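
The Person class passed to Encoders.bean is a plain JavaBean whose properties mirror these JSON fields. Here is a minimal sketch of what it could look like; the actual ca.redsofa.domain.Person class in the repository may differ in detail.

package ca.redsofa.domain;

import java.io.Serializable;

//Sketch of a JavaBean matching the person records above
public class Person implements Serializable {
    private String firstName;
    private String lastName;
    private String sex;
    private long age;

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }

    public long getAge() { return age; }
    public void setAge(long age) { this.age = age; }
}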

Here is a screencast of the simple batch job in action:

The Simple Structured Streaming Job

When a batch job is written and running successfully in Spark, quite often, the next requirement that comes to mind is to make it run continuously as new data arrives. Spark is a low-latency data analysis framework so it’s natural to want to analyze live data streams with it. This is where Spark Streaming comes in.

There is a new higher-level streaming API for Spark in 2.0. It’s called Structured Streaming. The developers of Spark say that it will be easier to work with than the streaming API that was present in the 1.x versions of Spark. The new API is built on top of Datasets and unifies the batch, interactive query and streaming worlds. In other words, Structured Streaming reuses Spark’s DataFrame and Dataset API, but the system can now run incremental queries instead of just batch ones. The system maintains enough state to recover from failures and keep results consistent.

According to the developers of Spark, the best way to deal with distributed streaming and all the complexities associated with it is not to have to think about it. The brunt of the work in dealing with the order of data, fault tolerance and data consistency is handled by Spark. Spark makes strong guarantees about the data in the structured streaming environment. It guarantees that at any time, the output of a structured streaming process is equivalent to executing a batch job on a prefix of the data (the prefix being whatever data has passed through the streaming system so far). It also guarantees that there will be no duplicate, partial or out-of-sequence updates. This consistency is guaranteed both inside the streaming engine and in connected components (e.g. a file updated with streaming output will always be consistent).

The streaming API exposes a stream of data as an infinite table, or if you prefer, a table that keeps growing as your job executes. With the new Structured Streaming API, the batch jobs that you have already written can be easily adapted to deal with a stream of data.

Here is our simple batch job from above modified to deal with a file system stream. The use of a user defined function has been eliminated to make the code a little shorter:


package ca.redsofa.jobs;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;
import ca.redsofa.domain.Person;


public final class StructuredStreamingAverage {
  private static String INPUT_DIRECTORY = "/Users/richardr/dev/git/spark_spin/input_files";

  public static void main(String[] args) throws Exception {
    System.out.println("Starting StructuredStreamingAverage job...");
    

    //1 - Start the Spark session
    SparkSession spark = SparkSession
      .builder()
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .appName("StructuredStreamingAverage")
      .getOrCreate();

    //2- Define the input data schema
    StructType personSchema = new StructType()
                                    .add("firstName", "string")
                                    .add("lastName", "string")
                                    .add("sex", "string")
                                    .add("age", "long");

    //3 - Create a Dataset representing the stream of input files
    Dataset<Person> personStream = spark
      .readStream()
      .schema(personSchema)
      .json(INPUT_DIRECTORY)
      .as(Encoders.bean(Person.class));


    //When data arrives from the stream, these steps will get executed

    //4 - Create a temporary table so we can use SQL queries       
    personStream.createOrReplaceTempView("people");

    String sql = "SELECT AVG(age) as average_age, sex FROM people GROUP BY sex";
    Dataset<Row> ageAverage = spark.sql(sql);

    //5 - Write the output of the query to the console
    StreamingQuery query = ageAverage.writeStream()
      .outputMode("complete")
      .format("console")
      .start();

    query.awaitTermination();
  }
}

We see from the code above that the job is also executing a few simple steps:

  1. Start the Spark session
  2. Define the input data schema
  3. Create a Dataset representing the stream of input files. Here we’re monitoring a directory (see INPUT_DIRECTORY variable) for new data
  4. Create a temporary table so we can use SQL queries
  5. Write the output of the query to the console

Again, the code is not hard to follow. The heavy lifting around streaming is handled by Spark. With slight modifications (steps 2 and 3), we have converted our batch job into a streaming job that monitors a directory for new files. As new files appear in this directory, average ages will be calculated by sex and updates will be shown on the console.

Here is a screencast of the simple structured streaming job in action:

In this example, the stream is generated from new files appearing in a directory. A stream could just as well come from a Twitter feed, a TCP socket, Kafka or some other source of streaming data.
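
For example, here is a hedged sketch of reading from a TCP socket instead of a directory, using the built-in socket source (intended for testing). The host, port and SocketStreamExample class are made up for illustration and are not part of the example project.

package ca.redsofa.examples;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class SocketStreamExample {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName("SocketStreamExample")
            .getOrCreate();

        //Each line arriving on the socket becomes a row with a single "value" column
        Dataset<Row> lines = spark
            .readStream()
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load();

        //Simple running count of the lines seen so far
        Dataset<Row> counts = lines.groupBy("value").count();

        StreamingQuery query = counts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();

        query.awaitTermination();
    }
}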

Final Thoughts

Spark’s release cycles are very short and the framework is evolving rapidly. It’s sometimes difficult to keep track of what’s new and what’s not so new. The best way to follow the progress and keep up to date is to use the most recent version of Spark and refer to the awesome documentation available on spark.apache.org.

Whatever form the new Structured Streaming API takes in the end, and it’s looking pretty good right now, I think it will contribute greatly to bringing real-time analytics to the masses. The barriers to entry for creating systems capable of producing real-time data analysis are effectively being eliminated with each new iteration of Spark.

Source

Source files for the example jobs in this post: GitHub Repository. Enjoy!

References
