Disaster Recovery in Kafka Servers

Ankur Ranjan
Published in Towards Dev
8 min read · Dec 25, 2022

I recently onboarded disaster recovery for one of our streaming pipelines at Walmart Global Tech, which involves Kafka, Spark Streaming and MongoDB.

The last few weeks were a steep learning curve, and I really enjoyed implementing these pieces of the streaming pipeline. I am still working on making it more scalable and covering the remaining edge cases. If you have used the concepts below to manage disaster recovery for your own Kafka cluster, feel free to correct my mistakes and help me cover the finer details. Let’s first go over the content briefly and then dig deeper.

In this article, we will try to understand the following things.

  1. How does Kafka provide fault tolerance even when it is deployed as a single cluster?
  2. How can we use two regions with MirrorMaker 2 (MM2) to achieve disaster recovery when the region hosting a Kafka cluster fails? We will discuss two designs for managing disasters: Active-Active and Active-Passive.

Let’s first understand how Kafka provides fault tolerance, even with a cluster in a single region.

A Kafka cluster is already reliable thanks to the following two mechanisms.

  • Replication Factor
  • Rack Awareness

In Kafka, messages are written to a topic, which is divided into multiple partitions. The partitions are spread across different brokers, so a topic’s records are distributed across different nodes.

Let’s try to understand the above concept using one simple example.

Suppose we have a cluster of 6 brokers spread across 2 racks, and we create a topic named topic1 on it.

While creating the topic, we decide that it will have 10 partitions and a replication factor of 3.

So we have 10 * 3 = 30 replicas (p0-p9, p0`-p9`, p0``-p9``).

Now the question is how the partitions of this topic are distributed across the brokers, and how the replication factor guarantees fault tolerance.

When distributing the partitions of a topic across brokers, Kafka aims for two things:

  1. Even distribution (achieving workload balance)
  2. Fault tolerance (replicas of a partition should be stored on different machines)

For even distribution, Kafka creates an ordered list of the available brokers that alternates between racks (R = rack, B = broker).

R1 → B0

R2 → B3

R1 → B1

R2 → B4

R1 → B2

R2 → B5

There are two types of partition replicas.

  • Leader partitions: by default, the leader handles all read and write requests for the partition.
  • Follower partitions: the followers passively replicate the leader’s data.

So let’s suppose for now that partitions (p0-p9) are the leader partitions and (p0`-p9`, p0``-p9``) are the follower partitions.

Leaders: p0,p1,p2,p3,p4,p5,p6,p7,p8,p9

Followers: p0`,p1`,p2`,p3`,p4`,p5`,p6`,p7`,p8`,p9`

p0``,p1``,p2``,p3``,p4``,p5``,p6``,p7``,p8``,p9``

Let’s see how the topic’s partitions and their replicas are distributed evenly across all brokers.

Kafka walks the ordered broker list in a round-robin fashion, which spreads the topic’s partitions evenly across the brokers. This takes care of our first point, i.e. even distribution (achieving workload balance).
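To make the round-robin idea concrete, here is a minimal Java sketch of such an assignment for the example above (6 brokers, 2 racks, 10 partitions, replication factor 3). It is a simplification and not Kafka’s actual implementation: the class name and the fixed starting position are assumptions made for illustration, whereas real Kafka also randomizes where it starts in the list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReplicaAssignmentSketch {
    // Ordered broker list from the example: consecutive entries alternate between racks.
    static final List<String> ORDERED_BROKERS = List.of("B0", "B3", "B1", "B4", "B2", "B5");
    static final Map<String, String> RACK_OF = Map.of(
            "B0", "R1", "B1", "R1", "B2", "R1",
            "B3", "R2", "B4", "R2", "B5", "R2");

    // Simplified rule (an assumption for this sketch): replica r of partition p is placed
    // at position (p + r) in the ordered list, wrapping around at the end.
    // The first broker in each list hosts the leader, the others host followers.
    static Map<Integer, List<String>> assign(int partitions, int replicationFactor) {
        Map<Integer, List<String>> assignment = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            List<String> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(ORDERED_BROKERS.get((p + r) % ORDERED_BROKERS.size()));
            }
            assignment.put(p, replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        assign(10, 3).forEach((p, replicas) ->
                System.out.println("p" + p + " -> leader " + replicas.get(0)
                        + ", followers " + replicas.subList(1, replicas.size())));
    }
}
```

Because neighbouring brokers in the list sit in different racks, every partition in this sketch ends up with replicas in both racks, so losing one rack does not lose the partition.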

Now let’s try to understand how the replication factor provides fault tolerance once the partitions are evenly distributed across brokers and racks.

Kafka relies on the following for fault tolerance.

  • The topic is divided into partitions, which are kept on different brokers.
  • If any broker fails, data should not be lost.
  • For fault tolerance, each partition is replicated and the replicas are stored on different brokers.
  • If the broker hosting a leader fails, the controller elects one of the in-sync follower replicas as the new leader.
  • Even the controller broker can fail. In that case, ZooKeeper helps elect another broker as the controller.

Controller: It’s just a normal broker with extra responsibility. The controller broker takes care of electing the leader for each partition. It keeps track of brokers joining and leaving the cluster with the help of ZooKeeper.
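The leader failover described above can be sketched in a few lines of Java. This is a toy model rather than the controller’s real code: the class and method names are hypothetical, and it assumes the new leader is simply the first replica in assignment order that is still alive and in sync.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

class LeaderFailoverSketch {
    // Picks the first replica, in assignment order, that is still alive and in sync.
    // Returns empty if no eligible replica is left (the partition would be offline).
    static Optional<String> electLeader(List<String> replicas, Set<String> aliveInSync) {
        return replicas.stream().filter(aliveInSync::contains).findFirst();
    }

    public static void main(String[] args) {
        List<String> p0Replicas = List.of("B0", "B3", "B1"); // B0 hosts the current leader
        Set<String> aliveAfterFailure = Set.of("B3", "B1");  // B0 has just failed
        System.out.println("New leader for p0: " + electLeader(p0Replicas, aliveAfterFailure));
    }
}
```

With a replication factor of 3, a partition in this model stays available as long as at least one of its three brokers survives.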

The two concepts above help Kafka provide fault tolerance within a cluster. But how can it stay fault tolerant if the whole cluster goes down in one region? The region becomes a single point of failure. If your application relies heavily on Kafka, a disaster in one region can bring your system to a halt, and in our use case we could not afford that much lag. We are responsible for onboarding Sellers and Items smoothly after running fraud detection, so a failure on our side directly impacts Sellers and Items.

To manage such a disaster, we used two Kafka clusters in different regions, replicated with MirrorMaker 2 (MM2), in either an active-active or an active-passive setup.

Let’s try to understand these concepts.

When I worked as a Software Developer, I set up a Postgres server with an active-passive configuration. It was a standard active-passive cluster where the application reads from and writes to a primary cluster. This primary cluster is asynchronously replicated to a secondary cluster. The application falls back to the secondary cluster if the primary cluster fails.

The failover can be automatic or manual. Database vendors such as Postgres, Oracle, MSSQL, etc. have built products to automate it. Newer distributed systems, such as the NoSQL database Cassandra, often prefer an active-active approach instead, because they may not require very strong consistency.

I found that nearly the same ideas apply to Kafka. Active-passive and active-active setups are very standard and are commonly used in API management and database management. We will apply the same concepts to Kafka.

But Ankur, make this concept a little easier for me🫣. What do active-passive and active-active actually mean?

  • Active Active: In this deployment, each participating cluster has its own producers and consumers, and topics are replicated between the clusters. Messages are processed by different consumers in the two data centres.
  • Active Passive: A disaster recovery approach where clients work against one active cluster, and consumers reconnect to the other cluster after the first one fails.

Now, let’s briefly understand MirrorMaker.

What is Mirror Maker?

In simple terms, MirrorMaker allows you to mirror one Kafka cluster to another. Topics, partitions, and messages are replicated, and changes in the source, such as newly created topics and new messages, are mirrored to the destination. MirrorMaker allows Kafka users to set up an active-passive deployment where an active cluster continuously mirrors its data to a secondary cluster. Replication can be set up as unidirectional or, using MM2, bidirectional.

What is the problem?

All is good until we realize that a huge amount of infrastructure and network bandwidth is spent maintaining this active-passive setup, in which the passive cluster is used only when the active cluster goes down. On top of that, large Kafka deployments process millions of messages per second, and all of that data needs to be replicated to the destination cluster.

Solution: MirrorMaker 2.0

MirrorMaker 2.0 (MM2), introduced in Apache Kafka 2.4, allows you to set up a bidirectional mirror between two clusters. This addresses the problem of the second cluster’s infrastructure sitting idle until the primary cluster goes down.

So, how does this work? In simple terms, MM2 can mirror in both directions, i.e. data from the primary cluster is mirrored to the secondary cluster, and data from the secondary cluster is mirrored to the primary cluster.
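As a rough illustration, a bidirectional setup can be described in the properties file passed to Kafka’s connect-mirror-maker.sh script. The fragment below is only a sketch: the cluster aliases and broker hostnames are placeholders I made up, not our actual configuration.

```properties
# Hypothetical MM2 configuration sketch; aliases and hostnames are placeholders.
clusters = scus, wus
scus.bootstrap.servers = scus-broker1:9092
wus.bootstrap.servers = wus-broker1:9092

# Mirror topic1 from SCUS to WUS
scus->wus.enabled = true
scus->wus.topics = topic1

# Mirror topic1 from WUS to SCUS
wus->scus.enabled = true
wus->scus.topics = topic1
```

With the default replication policy, a mirrored topic is named after the source cluster alias, so the aliases chosen here determine the prefix of the mirrored topics.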

If you think this through, bidirectional mirroring could result in an infinite loop, in which a message is mirrored back and forth between the two clusters forever.

To eliminate this problem, MM2 renames topics when it mirrors them. Let me illustrate how:

Let’s say we have one cluster in South Central US (SCUS) and another in West US (WUS). If MM2 is enabled in both directions, then a topic created on both clusters results in 4 topics instead of 2.

Let’s suppose we have created a Kafka topic named topic1 in both regions. The following topics will then exist.

  • topic1(SCUS): the main topic in SCUS
  • kafka-v2-appname-wus.topic1(SCUS): the DR topic in SCUS, mirrored from topic1(WUS)
  • topic1(WUS): the main topic in WUS
  • kafka-v2-appname-scus.topic1(WUS): the DR topic in WUS, mirrored from topic1(SCUS)

Here the disaster recovery (DR) topics are read-only for applications, whereas the main topics allow both reads and writes.

Suppose a producer writes records to topic1(SCUS). All records from this topic will be replicated to its corresponding DR topic, i.e. kafka-v2-appname-scus.topic1(WUS). In the same way, if a producer writes to topic1(WUS), the records will be replicated to its DR topic, i.e. kafka-v2-appname-wus.topic1(SCUS).
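The renaming rule, and why it stops the infinite loop, can be sketched in plain Java. This is a simplified model and not MM2’s source code: the class and method names are hypothetical, and the cluster aliases are inferred from the topic names used in this article.

```java
class MirrorNamingSketch {
    // Naming rule: the mirrored copy is prefixed with the alias of the cluster it came from.
    static String remoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // A topic that already carries the target cluster's alias originated on the target,
    // so mirroring it back there would create a loop. Such topics are skipped.
    static boolean shouldMirror(String topic, String targetAlias) {
        return !topic.startsWith(targetAlias + ".");
    }

    public static void main(String[] args) {
        String scus = "kafka-v2-appname-scus"; // assumed alias of the SCUS cluster
        String wus = "kafka-v2-appname-wus";   // assumed alias of the WUS cluster

        // topic1 on SCUS is mirrored to WUS under a prefixed name.
        System.out.println(remoteTopic(scus, "topic1"));
        // The main topic on SCUS is eligible for mirroring to WUS.
        System.out.println(shouldMirror("topic1", wus));
        // The copy of WUS data that lives on SCUS is not mirrored back to WUS.
        System.out.println(shouldMirror(remoteTopic(wus, "topic1"), wus));
    }
}
```

Since only the main topics are mirrored and the prefixed copies are skipped, each record crosses between the regions exactly once.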

Now that we know MirrorMaker 2 takes care of replication, how can we as an application team use it to implement disaster recovery?

We can do this using either of two designs: active-active or active-passive.

Active-Active implementation.

In active-active, one producer writes messages to topic1(SCUS) and another producer writes messages to topic1(WUS).

Note that these are two completely separate clusters, each with its own ZooKeeper ensemble, and hence topics are local to their clusters.

MirrorMaker 2 (MM2) keeps replicating records to the DR topics in both directions.

To summarize the active-active model:

  • There are two clusters with bidirectional mirroring between them.
  • Data is asynchronously mirrored in both directions between the clusters.
  • Client applications are aware of both clusters and must be ready to switch to the other cluster in case of a single cluster failure.
  • Client requests are processed by both clusters.

In this case, the network bandwidth of both clusters is fully utilised. The producers can load balance their traffic across the two clusters using either round robin or local affinity.
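The two load-balancing options can be sketched as follows. This is a hypothetical helper written for illustration, not part of any Kafka API; in particular, the set of healthy clusters and the fallback behaviour are assumptions of mine.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

class ClusterSelectorSketch {
    private final List<String> clusters;
    private final AtomicInteger next = new AtomicInteger();

    ClusterSelectorSketch(List<String> clusters) {
        this.clusters = clusters;
    }

    // Round robin: alternate between the clusters on every call.
    String roundRobin() {
        return clusters.get(Math.floorMod(next.getAndIncrement(), clusters.size()));
    }

    // Local affinity: prefer the producer's own region and fall back to
    // the first other healthy cluster only when the local one is down.
    String localAffinity(String localRegion, Set<String> healthy) {
        if (healthy.contains(localRegion)) {
            return localRegion;
        }
        return clusters.stream().filter(healthy::contains).findFirst()
                .orElseThrow(() -> new IllegalStateException("no healthy cluster"));
    }

    public static void main(String[] args) {
        ClusterSelectorSketch selector = new ClusterSelectorSketch(List.of("SCUS", "WUS"));
        System.out.println(selector.roundRobin());
        System.out.println(selector.roundRobin());
        System.out.println(selector.localAffinity("WUS", Set.of("SCUS")));
    }
}
```

Round robin spreads the load evenly, while local affinity keeps traffic inside the producer’s region and only crosses regions during a failure.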

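The consumer in SCUS will have to subscribe to both topic1 and its mirrored DR topic kafka-v2-appname-wus.topic1. This can be done easily using a wildcard (pattern) subscription, which is supported in most frameworks. With Spring Kafka, the pseudo-code could look like this:

// "kbgh.*" is a placeholder; use a regex that matches both the main and the DR topic.
@KafkaListener(id = "xxx", topicPattern = "kbgh.*")
public void listen(String in) {
    // Print each record value received from any matching topic.
    System.out.println(in);
}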
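Since the topic pattern is a regular expression, it is worth checking which topic names it actually matches before deploying the listener. Below is a small stand-alone sketch using java.util.regex; the pattern itself is my suggestion for this article’s topic names, not something taken from our production code.

```java
import java.util.List;
import java.util.regex.Pattern;

class TopicPatternSketch {
    // Matches "topic1" itself and any mirrored copy named "<alias>.topic1".
    static final Pattern TOPIC1_AND_MIRRORS = Pattern.compile("(.*\\.)?topic1");

    static boolean subscribes(String topic) {
        return TOPIC1_AND_MIRRORS.matcher(topic).matches();
    }

    public static void main(String[] args) {
        for (String topic : List.of("topic1", "kafka-v2-appname-wus.topic1", "topic10")) {
            System.out.println(topic + " -> " + subscribes(topic));
        }
    }
}
```

A pattern like this picks up the main topic and its DR copy while ignoring unrelated topics that merely share a prefix.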

In case of a disaster or failure in a single cluster:

  • The other cluster continues to operate properly with no downtime, providing high availability.
  • Data between clusters is eventually consistent, which means that data written to one cluster won’t be immediately available for reading in the other one.
  • Some acknowledged writes in the failed cluster may not yet be accessible in the other cluster, due to the asynchronous nature of mirroring.

But implementing active-active means contract changes for teams that may currently be producing records in one region only. They now have to change their code to set up producers that can send data to both regions. Client applications also need to be aware of multiple clusters, and active-active brings the complexity of bidirectional mirroring between clusters.

So let’s look at the active-passive setup and its pros and cons.

Active Passive

  • The active-passive model suggests there are two clusters with unidirectional mirroring between them.
  • Data is asynchronously mirrored from an active to a passive cluster.
  • Client applications are aware of several clusters and must be ready to switch to a passive cluster once an active one fails.
  • Client requests are processed only by an active cluster.

Here the code changes are minimal, but failover needs manual intervention, and consumer lag may grow if developers cannot apply the configuration changes (such as switching the broker and topic settings) quickly enough.

Let’s see one approach which can be used here.

In the case of a disaster in the SCUS region:

  • The consumer changes its configuration and starts reading from the DR topic in WUS, i.e. kafka-v2-appname-scus.topic1. There it reads all the records from topic1(SCUS) that it had not yet processed. The starting position can be set by resetting the offset based on a timestamp.
  • The producer starts writing to topic1(WUS) instead of topic1(SCUS).
  • The consumer switches to topic1(WUS) once it has read all the records from the DR topic of topic1(SCUS).
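The timestamp-based offset reset in the first step can be illustrated with a small stand-alone sketch. It mimics the lookup rule of finding the earliest offset whose record timestamp is at or after a given time, which is also what the Kafka consumer’s offsetsForTimes call returns. The class name and the sample timestamps are made up for illustration.

```java
import java.util.OptionalLong;

class OffsetByTimestampSketch {
    // recordTimestamps[i] is the timestamp of the record stored at offset i in the DR topic.
    // Returns the earliest offset whose timestamp is >= targetTimestamp, if any.
    static OptionalLong firstOffsetAtOrAfter(long[] recordTimestamps, long targetTimestamp) {
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= targetTimestamp) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] drTopicTimestamps = {100L, 200L, 300L, 400L}; // hypothetical record timestamps
        long lastProcessedAt = 250L; // hypothetical time up to which SCUS records were processed
        System.out.println(firstOffsetAtOrAfter(drTopicTimestamps, lastProcessedAt));
    }
}
```

Picking a timestamp slightly earlier than the failure is the safer choice: the consumer may re-read a few records, but it will not skip any, so downstream processing should tolerate duplicates.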

So this ends our short discussion on disaster recovery strategy with Kafka and MirrorMaker 2.0.

Feel free to subscribe to my YouTube channel, The Big Data Show. I might upload a more detailed discussion of the above concepts in the coming days.

Finally, thank you for the most precious gift you can give me as a writer: your time.

Originally published at https://www.linkedin.com.
