Stateful transformations in Spark Streaming — Part 1 | Spark Streaming Session 3

Ankur Ranjan
6 min readFeb 26, 2023

In the previous article of this series, Spark Streaming in layman’s terms, we covered the following topics.

  1. Different streaming sources
  2. Stateful vs Stateless transformation

For those who are reading this article without having read the previous one in this series, I recommend reading that article or watching the video first. It will give you a better understanding of all the Spark Streaming concepts.

I also created a YouTube video for the same concepts. Please follow these links.

Useful links:

Here in this session, we will mainly focus on the following stateful transformations.

  • updateStateByKey (This Article)
  • reduceByKeyAndWindow (Next Article)

These functions will help you understand stateful transformations in an easy way.

So let’s start and discuss these concepts in simple, layman’s terms.

Before diving into these examples, let’s quickly recall the meaning of stateful transformations.

Stateful Transformation

Stateful transformation is a particular capability of Spark Streaming: it enables us to maintain state between micro-batches. In other words, it maintains state across a period of time, which can be as long as the entire session of the streaming job.

We mostly achieve this by enabling checkpointing in the streaming application.

In simple terms, with stateful transformations we can use some data from previous batches to generate the results for a new batch.

Let me try to explain the above concept with one simple example.

  • Let’s consider a streaming application that runs for 12 hours continuously.
  • Suppose our batch interval is 5 minutes. This means a new RDD will be created every 5 minutes.
  • So during the entire stream, we will have 144 RDDs.
  • It’s simple mathematics:
  • 12 hours = 12 * 60 = 720 minutes.
  • 720 minutes / 5 minutes = 144 RDDs.
  • Now let’s suppose we want to compute a running sum. Then we need information about the previous states too, because this is a streaming application: at any point in time, only part of the data has arrived, and more will keep coming in the next batch intervals.

There can be two scenarios here.

  • We might be interested in calculating the running sum of the entire stream i.e. 12 hours.
  • Or, only the last 30 minutes, which means we are always interested in the last 6 RDDs. For this type of requirement, we use the concepts of sliding interval and window size; please check my previous article to understand them.

Let’s try to solve the first scenario i.e. finding the running sum of the entire stream. Here we will be calculating the frequency of each word across the entire stream.

We will be using the updateStateByKey transformation function for this.

updateStateByKey

updateStateByKey is a stateful transformation and it requires 2 steps.

  1. Define a state to start with — The state can be an arbitrary data type.
  2. A function to update the state — In this function, you have to specify how to update the state with the previous state and the new value received from the stream.
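Putting these two steps together, the update function has the following generic shape. This is a minimal sketch for a running count (the names runningCount, pairDStream and stateDStream are illustrative); the concrete version used later in this article follows the same pattern.

// Update function passed to updateStateByKey.
//   newValues:     all the values that arrived for a key in the current micro-batch
//   previousState: the state stored for that key so far (None on the very first batch)
// Returning Some(...) keeps/updates the state; returning None drops the key from the state.
def runningCount(newValues: Seq[Int], previousState: Option[Int]): Option[Int] = {
  Some(previousState.getOrElse(0) + newValues.sum)
}

// Applied on a DStream of (word, 1) pairs:
// val stateDStream = pairDStream.updateStateByKey(runningCount _)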

Do you remember the word count example we wrote in our previous article, where we read a stream of data coming from the console?

Just follow this article and you will find the word count code snippet.

Now let’s suppose we want to count the occurrence of some word in an entire stream of data. For example, we would like to check how many times the word “Data” is occurring in the entire stream.

I hope you have understood the above problem statement of finding the occurrence of one particular word in the entire stream.

In this example too, we will be using a socket as the source, or producer, of the data. To set up the socket, we simply run the `nc -lk 9000` command. It lets us produce data on port 9000.

Let’s suppose for the first time, we have entered the following line in the terminal.

The Big Data Show is a great channel for learning Data Engineering concepts.

After applying the following code, we get the output shown below. All code snippets here are written in Scala.

val sc: SparkContext = spark.sparkContext

// Initiate the streaming context with a batch interval of 15 seconds
val ssc: StreamingContext = new StreamingContext(sc, batchDuration = Seconds(15))

// Here we have used the checkpoint directory. It will help us
// perform stateful operations.
ssc.checkpoint("./checkpointing")

val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9000)
val words: DStream[String] = lines.flatMap(x => x.split(" "))
val wordsFrequencyMap: DStream[(String, Int)] = words.map(w => (w, 1))

We will get output something like this:

(The, 1) 
(Big, 1)
(Data, 1)
(Show, 1)
(is, 1)
(a, 1)
(great, 1)
(channel, 1)
(for, 1)
(learning, 1)
(Data, 1)
(Engineering, 1)
(concepts, 1)

Now we will use the updateStateByKey function to apply our first stateful operation.

We have just read above that updateStateByKey works on the entire stream of data and accepts a function as a parameter. Let’s look at an example.

def stateFullEntireStream(wordsFrequencyMap: DStream[(String, Int)]): Unit = {
  // Here we are filtering the data
  val filterMap: DStream[(String, Int)] = wordsFrequencyMap.filter(x => {
    x._1.startsWith("Data")
  })

  val result = filterMap.updateStateByKey(updateFunc)
  result.print()
}

Here you can see that the first thing we are doing is filtering all (word, count) pairs of the RDD whose word starts with “Data”. Then we call updateStateByKey on our filtered DStream and pass it an updateFunc. Let’s check the updateFunc code, and things will become clearer.

def updateFunc(newValues: Seq[Int], previousState: Option[Int]): Option[Int] = {
  val newCount = previousState.getOrElse(0) + newValues.sum
  Some(newCount)
}

Here you can see that for the first batch, updateFunc will be called with newValues = Seq(1, 1) (the word “Data” occurred twice in our input line) and previousState = None, because no state exists yet.

But as we have used getOrElse(0) on previousState, whose data type is Option[Int], the previous count simply defaults to 0 in the code.

After doing the calculation, updateFunc will return Some(2) for the two occurrences of the word “Data”.
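To make this concrete, here is what that first call amounts to if we invoke updateFunc by hand with the first batch’s values (a small illustrative check, outside of Spark):

// First micro-batch: "Data" appeared twice, and there is no previous state yet.
val firstBatchValues = Seq(1, 1)         // two occurrences of the word "Data"
val noPreviousState: Option[Int] = None  // no state exists before the first batch

// previousState.getOrElse(0) = 0 and newValues.sum = 2, so the new state is Some(2)
println(updateFunc(firstBatchValues, noPreviousState)) // Some(2)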

Now let’s suppose we enter the following line in the terminal, which acts as the producer for us.

The Big Data Show is an interesting channel on Youtube but there are other channels too which are equally good for learning Data Engineering.

Now, once we enter this line, our filter statement is applied again.

....filter(x=> { x._1.startsWith("Data") })

We will then get the following parameters in the updateFunc function:

(Data, {1, 1})

newValues = Seq(1, 1)
previousState = Some(2)

If you have been following closely, you will notice that previousState is now no longer None but Some(2).

After running this code on the second batch, updateFunc will return Some(4).
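Again, invoking updateFunc by hand (purely as an illustration) shows where the 4 comes from:

// Second micro-batch: "Data" appeared twice again, and the stored state is now Some(2).
val secondBatchValues = Seq(1, 1)
val stateAfterFirstBatch: Option[Int] = Some(2)

// 2 (previous state) + 2 (sum of the new values) = 4
println(updateFunc(secondBatchValues, stateAfterFirstBatch)) // Some(4)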

So now you have understood how updateStateByKey can be used to compute running aggregations over the entire stream of data.

I hope you also noticed that we are using a checkpoint directory in our code, which is where all our state is stored.

But frankly speaking, there will be very few scenarios where you will really be interested in calculating a running aggregation over the entire stream of data. It is also technically impractical: if we keep maintaining state for the entire stream, the size of the checkpointing directory keeps growing, and this creates a lot of problems.

In practice, I have neither seen nor implemented an aggregation of streaming data over the entire duration of a stream.

One needs to understand that this information from the checkpointing directory will actually be copied to every executor, and if the size of the checkpointed state keeps growing, there is a much higher chance of running into out-of-memory errors.

Then how can we actually apply the aggregation or operation on streaming data?

We do apply aggregations on streaming data in production, but mostly not over the entire stream; instead, we apply the aggregation over, say, the last 30 minutes or the last 15 minutes of data.

For this, window size and sliding interval come in very handy.
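As a quick preview (we will cover it properly in the next article), a windowed aggregation with reduceByKeyAndWindow could look roughly like this. This is a minimal sketch: the 30-minute window and 5-minute slide are illustrative values, not part of the code above.

// Assuming the same wordsFrequencyMap: DStream[(String, Int)] as in the example above.
val windowedCounts = wordsFrequencyMap.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // reduce function applied to values inside the window
  Seconds(30 * 60),          // window size: look at the last 30 minutes of data
  Seconds(5 * 60)            // sliding interval: recompute every 5 minutes
)

windowedCounts.print()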

We will explore this concept in the next article with the help of the reduceByKeyAndWindow function. Till then, keep learning and goodbye. Cheers to learning.

Please find the whole code snippet of updateStateByKey demo.

package sparkStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.{SparkIOUtil => SU}

object WordSearching {
  val WORDS_TO_SEARCH = "Data"

  // Update function: combines the counts from the current micro-batch
  // with the previously stored state for each key.
  def updateFunc(newValues: Seq[Int], previousState: Option[Int]): Option[Int] = {
    val newCount = previousState.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def stateFullEntireStream(wordsFrequencyMap: DStream[(String, Int)]): Unit = {
    // Keep only the words that start with "Data"
    val filterMap: DStream[(String, Int)] = wordsFrequencyMap.filter(x => {
      x._1.startsWith(WORDS_TO_SEARCH)
    })

    // Stateful transformation: maintain a running count across micro-batches
    val result = filterMap.updateStateByKey(updateFunc)

    result.print()
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SU.getSparkSession
    spark.sparkContext.setLogLevel("ERROR")

    val sc: SparkContext = spark.sparkContext

    // Batch interval of 15 seconds: a new micro-batch (RDD) every 15 seconds
    val ssc: StreamingContext = new StreamingContext(
      sc, batchDuration = Seconds(15)
    )

    // Checkpoint directory is required for stateful transformations
    ssc.checkpoint("./checkpointing")

    // Socket source: reads lines produced with `nc -lk 9000`
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream(
      "localhost", 9000
    )

    val words: DStream[String] = lines.flatMap(w => w.split(" "))

    val wordsFrequencyMap: DStream[(String, Int)] = words.map(w => (w, 1))

    stateFullEntireStream(wordsFrequencyMap)

    ssc.start()

    ssc.awaitTermination()
  }
}

Feel free to subscribe to my YouTube channel, i.e. The Big Data Show. I might upload a more detailed discussion of the above concepts in the coming days.

More so, thank you for giving me, as a writer, that most precious gift: your time.

Originally published at https://www.linkedin.com.

