Kafka disaster recovery on Kubernetes using MirrorMaker2

Toader Sebastian

Monday, March 9th, 2020

In an earlier post, Kafka disaster recovery on Kubernetes with CSI, we discussed how the brokers of an Apache Kafka cluster can be backed up and restored by leveraging Kubernetes volume snapshots. While that solution provides a good enough disaster recovery option (and a super quick recovery), it doesn't help when the entire Kubernetes cluster hosting the Kafka cluster is lost. In this post, we show you how to:

  • back up a Kafka cluster to a remote Kafka cluster running on a separate Kubernetes cluster, and also how to
  • recover the lost Kafka cluster with Supertubes.

Check out Supertubes in action on your own clusters:

Register for an evaluation version and run a simple install command!


```bash
supertubes install -a --no-demo-cluster --kubeconfig <path-to-k8s-cluster-kubeconfig-file>
```

or read the documentation for details.

Take a look at some of the Kafka features that we've automated and simplified through Supertubes and the Koperator, which we've already blogged about.

tl;dr

MirrorMaker v1's cross-cluster replication capability has long been used for Kafka cluster disaster recovery. However, MirrorMaker v1 had a couple of limitations that the Kafka community addressed in its successor, MirrorMaker2, released with Apache Kafka 2.4.0. MirrorMaker2 leverages the Kafka Connect framework to replicate topics between Kafka clusters, and includes several new features:

  • Both topics and consumer groups are replicated
  • Topic configuration and ACLs are replicated
  • Cross-cluster offsets are synchronized
  • Partitioning is preserved
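
For reference, outside of Kubernetes the same replication flows can be driven by the stock MirrorMaker2 driver that ships with Apache Kafka 2.4.0. A minimal sketch, assuming placeholder broker addresses:

```bash
# minimal MirrorMaker2 run using the stock Kafka 2.4.0 distribution
# (broker addresses below are placeholders)
cat > mm2.properties <<EOF
clusters = kafka1, kafka2
kafka1.bootstrap.servers = kafka1-broker-1:9092
kafka2.bootstrap.servers = kafka2-broker-1:9092
# bidirectional replication flows
kafka1->kafka2.enabled = true
kafka2->kafka1.enabled = true
EOF
bin/connect-mirror-maker.sh mm2.properties
```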

Supertubes uses MirrorMaker2 to set up cross-cluster replication between remote Kafka clusters and to recover a lost Kafka cluster from a remote one. It deploys a MirrorMaker2 instance for each Kafka cluster into the same namespace where the Kafka cluster resides. Each MirrorMaker2 instance acts as a:

  • Producer targeting the Kafka cluster running on the same Kubernetes cluster and namespace (it is recommended to have MirrorMaker2 deployed close to the target Kafka cluster)
  • Consumer for the remote Kafka clusters
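
A quick way to sanity-check this layout is to list the pods in the Kafka namespace; alongside the broker pods you should see a MirrorMaker2 (Kafka Connect) pod. Pod names vary by deployment, so this is just an illustrative check:

```bash
# list pods in the namespace hosting the Kafka cluster;
# expect broker pods plus a MirrorMaker2 pod (exact names vary)
kubectl get pods -n kafka
```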

Cross-cluster replication

Supertubes expects a descriptor file, in yaml or json format, that describes the Kafka clusters and the MirrorMaker2 replication topology:

```bash
supertubes mm2 deploy -f <path-to-mm2-deployment-descriptor>
```

```yaml
# list of Kubernetes config file paths of clusters hosting our Kafka clusters that we want to make MM2 deployments aware of
kubernetesConfigs:
  - # path-to-the-kubeconfig
  - # path-to-the-kubeconfig

# list of Kafka clusters to make MM2 deployments aware of
kafkaClusters:
  - namespace: # kubernetes namespace hosting the Kafka cluster, defaults to 'kafka'
    name: # Kafka cluster name, defaults to 'kafka'
    kubernetesConfigContext: # name of the Kubernetes configuration context, as defined in the kubeconfig files, which references the Kubernetes cluster hosting the Kafka cluster. If not specified, the default context is used.
    alias: # Kafka cluster alias by which MM2 refers to this Kafka cluster (e.g. kafka1). If not provided, it defaults to '${kubernetesConfigContext}_${namespace}_${name}'
    internalListenerName: # name of the internal listener through which local MM2 instances access this Kafka cluster
    externalListenerName: # name of the external listener through which remote MM2 instances access this Kafka cluster
    mirrorMaker2Spec:
      kafkaHeapOpts: # heap opts setting for MirrorMaker2, defaults to -Xms256M -Xmx2G
      resources:
      nodeSelector:
      tolerations:
      affinity:

mirrorMaker2Properties: |
  # replication topologies and flows, mm2 config, etc.

  # two-way replication between 3 Kafka clusters
  kafka1->kafka2.enabled=true
  kafka1->kafka3.enabled=true
  kafka2->kafka1.enabled=true
  kafka2->kafka3.enabled=true
  kafka3->kafka1.enabled=true
  kafka3->kafka2.enabled=true
```

Supertubes automatically generates the MirrorMaker2 configuration for each MirrorMaker2 instance (MirrorMaker2 has its own configuration format). Supertubes maintains the Kafka servers section of the file, while the replication flows and other MirrorMaker2 settings are populated from the mirrorMaker2Properties provided by the user. The generated MirrorMaker2 config file looks like this:

```
# maintained by supertubes
clusters: kafka1, kafka2, kafka3, ...

# internal kafka bootstrap servers URL if MM2 is on the same Kubernetes cluster as the Kafka cluster, otherwise external kafka bootstrap servers URL
kafka1.bootstrap.servers=...
kafka2.bootstrap.servers=...
kafka3.bootstrap.servers=...

# user provided mm2 settings
kafka1->kafka2.enabled=true
kafka1->kafka3.enabled=true
kafka2->kafka1.enabled=true
kafka2->kafka3.enabled=true
kafka3->kafka1.enabled=true
kafka3->kafka2.enabled=true
```

Note: it is recommended to keep the replication flow settings the same across all MirrorMaker2 instances to avoid omitting topics from cross-cluster replication.

In the example above, MirrorMaker2 will replicate topics from kafka1 to kafka2 and the other way around:

  • topics from kafka1 are replicated to kafka2 as kafka1.{topic-name}, similarly
  • topics from kafka2 are replicated to kafka1 as kafka2.{topic-name}.

As an example, if we have a topic named topic1 on both Kafka clusters, then:

  • kafka1 will have topic1 and kafka2.topic1, and
  • kafka2 will have topic1 and kafka1.topic1, respectively.

Bidirectional cross-cluster replication happens similarly between kafka2 <-> kafka3 and kafka1 <-> kafka3.
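
You can observe this naming scheme directly by listing the topics on one of the clusters. A sketch using the stock kafka-topics.sh tool (the bootstrap address is a placeholder):

```bash
# list topics on kafka1; topics replicated by MM2 from remote clusters
# appear under their source cluster's alias prefix
kafka-topics.sh --bootstrap-server <kafka1-bootstrap-address>:9092 --list
# expected output for the example above:
#   topic1
#   kafka2.topic1
#   kafka3.topic1
```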

Losing and recovering a cluster

In the event of losing the Kubernetes cluster that hosts our Kafka cluster:

  1. You must provision a new Kubernetes cluster. This Kubernetes cluster can be automatically provisioned by Pipeline across 5 clouds or on-premises.

  2. Create a new Kafka cluster as a replacement for the lost Kafka cluster (it is as simple as supertubes cluster create). While the new cluster is provisioned, client applications (consumers and producers) can be directed to the remaining Kafka clusters.

  3. From the perspective of MirrorMaker2, the new (replacement) Kafka cluster must get a new alias and should not reuse the alias of the lost Kafka cluster, because MirrorMaker2 doesn't replicate topics that are prefixed with the name of a cluster alias.

  4. Update the MirrorMaker2 descriptor file to reflect the new Kubernetes cluster and Kafka cluster alias as well.

  5. Run the following command with the modified descriptor file:

    $ supertubes mm2 deploy -f <path-to-mm2-deployment-descriptor>
    
  6. Supertubes updates all MirrorMaker2 instances.

  7. The new Kafka cluster starts catching up from the other clusters.

Showtime

Let's go through an example to see how this works.

Create two Kubernetes clusters

We create two PKE clusters using our hybrid cloud container management platform, Banzai Cloud Pipeline:

  • one on AWS in the eu-north-1 (Stockholm) region, and
  • one on Azure in the westeurope region.

These two PKE clusters are named pke-aws-kafka-eun1 and pke-azure-kafka-weu, respectively.

PKE is Banzai Cloud’s CNCF certified Kubernetes distribution.

Install Supertubes on both Kubernetes clusters

We want to ensure that our Kafka clusters are accessed through TLS. Instead of getting into the cumbersome task of configuring TLS for each component, Supertubes runs Kafka inside an Istio service mesh on each Kubernetes cluster and lets Istio take care of TLS at the networking layer for us.

We have our own automated and operationalized service mesh product built on Istio, called Backyards (now Cisco Service Mesh Manager).

Supertubes sets up an Istio service mesh for our Kafka components automatically; all you need to do is provide a common root certificate for the Istio deployments.

```bash
supertubes install --root-cert=root-cert.pem --ca-cert=ca-cert.pem --ca-key=ca-key.pem -a --no-demo-cluster -c {pke-aws-kafka-eun1-kubeconfig.yaml}
```

```bash
supertubes install --root-cert=root-cert.pem --ca-cert=ca-cert.pem --ca-key=ca-key.pem -a --no-demo-cluster -c {pke-azure-kafka-weu-kubeconfig.yaml}
```

If you don't have root and intermediate CA certificates at hand, you can use this tool to generate self-signed certificates quickly.
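
If you'd rather use plain OpenSSL, the following is a minimal sketch for generating a self-signed root and an intermediate CA (subjects are placeholders; a production intermediate also needs CA basic-constraints extensions, omitted here for brevity):

```bash
# self-signed root CA (sketch only; subjects are placeholders)
openssl req -x509 -newkey rsa:4096 -sha256 -days 365 -nodes \
  -keyout root-key.pem -out root-cert.pem -subj "/O=example/CN=root"
# intermediate CA key + CSR, signed by the root
openssl req -newkey rsa:4096 -nodes -keyout ca-key.pem \
  -out ca-csr.pem -subj "/O=example/CN=intermediate"
openssl x509 -req -in ca-csr.pem -CA root-cert.pem -CAkey root-key.pem \
  -CAcreateserial -days 365 -out ca-cert.pem
```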

Create Kafka clusters on both Kubernetes clusters

```bash
supertubes cluster create --namespace kafka -c {pke-aws-kafka-eun1-kubeconfig.yaml} -f https://raw.githubusercontent.com/banzaicloud/koperator/master/config/samples/kafkacluster-with-istio.yaml
```

```bash
supertubes cluster create --namespace kafka -c {pke-azure-kafka-weu-kubeconfig.yaml} -f https://raw.githubusercontent.com/banzaicloud/koperator/master/config/samples/kafkacluster-with-istio.yaml
```

Wait until the Kafka clusters become operational:

```bash
supertubes cluster get --namespace kafka --kafka-cluster kafka -c {pke-aws-kafka-eun1-kubeconfig.yaml}
Namespace  Name   State           Image                         Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
kafka      kafka  ClusterRunning  banzaicloud/kafka:2.13-2.4.0  0       CruiseControlTopicReady      0                       2020-03-04 14:07:36
```

Create a test topic

Create a topic named testtopic on both Kafka clusters:

```bash
supertubes cluster topic create --namespace kafka --kafka-cluster kafka -c {pke-aws-kafka-eun1-kubeconfig.yaml} -f -<<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: testtopic
spec:
  name: testtopic
  partitions: 3
  replicationFactor: 2
  config:
    "retention.ms": "28800000"
    "cleanup.policy": "delete"
EOF
```

```bash
supertubes cluster topic create --namespace kafka --kafka-cluster kafka -c {pke-azure-kafka-weu-kubeconfig.yaml} -f -<<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: testtopic
spec:
  name: testtopic
  partitions: 3
  replicationFactor: 2
  config:
    "retention.ms": "28800000"
    "cleanup.policy": "delete"
EOF
```

Enable cross-cluster replication with MirrorMaker2

In the descriptor file, we are going to reference the Kafka cluster running on pke-aws-kafka-eun1 as kafka1, and the one running on pke-azure-kafka-weu as kafka2.

```bash
supertubes mm2 deploy -f -<<EOF
# list of Kubernetes config file paths of clusters hosting our Kafka clusters that we want to make MM2 deployments aware of
kubernetesConfigs:
  - {pke-aws-kafka-eun1-kubeconfig.yaml}
  - {pke-azure-kafka-weu-kubeconfig.yaml}

# list of Kafka clusters to make MM2 deployments aware of
kafkaClusters:
  - namespace: kafka
    name: kafka
    kubernetesConfigContext: kubernetes-admin@pke-aws-kafka-eun1 # the context from {pke-aws-kafka-eun1-kubeconfig.yaml}
    alias: kafka1 # name MM2 refers to this Kafka cluster by
    internalListenerName: internal # name of the Kafka cluster internal listener for local MM2 instances to use
    externalListenerName: external # name of the Kafka cluster external listener for remote MM2 instances to use
  - namespace: kafka
    name: kafka
    kubernetesConfigContext: kubernetes-admin@pke-azure-kafka-weu # the context from {pke-azure-kafka-weu-kubeconfig.yaml}
    alias: kafka2 # name MM2 refers to this Kafka cluster by
    internalListenerName: internal # name of the Kafka cluster internal listener for local MM2 instances to use
    externalListenerName: external # name of the Kafka cluster external listener for remote MM2 instances to use

mirrorMaker2Properties: |-
  # replication topologies and flows, mm2 config, etc.
  kafka1->kafka2.enabled=true
  kafka2->kafka1.enabled=true

  # we don't have ACLs set so skip replicating them
  sync.topic.acls.enabled=false
EOF
```

Write messages to testtopic on both clusters

Remember that our Kafka clusters run inside an Istio mesh with TLS enabled. This means we need a client certificate to be able to write messages to our testtopic topic. To get a client certificate, run:

```bash
supertubes istio certificate generate-client-certificate -c {pke-aws-kafka-eun1-kubeconfig.yaml} > cert-data.json
```

The returned json contains the following fields in base64-encoded format:

  • client-cert.pem - the client certificate
  • client-key.pem - the client key
  • ca-cert.pem - the CA certificate
  • root-cert.pem - the root certificate
  • cert-chain.pem - the certificate chain containing root and CA certificate
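
To turn this JSON into the PEM files used by the kafkacat commands below, you can decode each field. A sketch assuming jq is installed and the JSON keys match the file names listed above:

```bash
# decode the base64-encoded certificate material from cert-data.json
# (assumes the JSON keys match the file names listed above)
jq -r '."client-cert.pem"' cert-data.json | base64 -d > client.pem
jq -r '."client-key.pem"'  cert-data.json | base64 -d > client.key
jq -r '."cert-chain.pem"'  cert-data.json | base64 -d > cert_chain.pem
```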

Find the public addresses through which the Kafka cluster is exposed on both Kubernetes clusters:

```bash
kubectl get svc -n kafka kafka-meshgateway
NAME               TYPE          CLUSTER-IP     EXTERNAL-IP                                                               PORT(S)                                                          AGE
kafka-meshgateway  LoadBalancer  10.10.171.247  a7c265ab846c14f8fa082773fee7c0da-2063154997.eu-north-1.elb.amazonaws.com  19090:31645/TCP,19091:32240/TCP,19092:32483/TCP,29092:30681/TCP  61m
```

```bash
kubectl get svc -n kafka kafka-meshgateway
NAME               TYPE          CLUSTER-IP     EXTERNAL-IP    PORT(S)                                                          AGE
kafka-meshgateway  LoadBalancer  10.10.161.160  51.124.19.105  19090:32651/TCP,19091:31402/TCP,19092:30403/TCP,29092:31586/TCP  57m
```

Use kafkacat to produce messages to and consume messages from our testtopic topic.

```bash
kafkacat -b a7c265ab846c14f8fa082773fee7c0da-2063154997.eu-north-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -P -t testtopic
kafka1: message 1
kafka1: message 2
```

```bash
kafkacat -b 51.124.19.105:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -P -t testtopic
kafka2: message 1
kafka2: message 2
```

Check replicated messages

Check that MirrorMaker2 replicated the messages: on kafka1, messages from kafka2's testtopic show up under the kafka2.testtopic topic, and on kafka2, messages from kafka1's testtopic show up under kafka1.testtopic:

```bash
kafkacat -b a7c265ab846c14f8fa082773fee7c0da-2063154997.eu-north-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -C -t kafka2.testtopic -c2
kafka2: message 1
kafka2: message 2
```

```bash
kafkacat -b 51.124.19.105:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -C -t kafka1.testtopic -c2
kafka1: message 2
kafka1: message 1
```

The summary of what we are seeing:

```
+-- kafka1
|   +-- testtopic
|   |   +-- kafka1: message 1
|   |   +-- kafka1: message 2
|   +-- kafka2.testtopic
|       +-- kafka2: message 1
|       +-- kafka2: message 2
+-- kafka2
    +-- testtopic
    |   +-- kafka2: message 1
    |   +-- kafka2: message 2
    +-- kafka1.testtopic
        +-- kafka1: message 1
        +-- kafka1: message 2
```

Simulate a cluster loss

  1. Delete the Kubernetes cluster running in AWS in the eu-north-1 region (pke-aws-kafka-eun1). This Kubernetes cluster hosts the Kafka cluster kafka1.

  2. Create a new one, pke-aws-kafka-euw1, in the eu-west-1 region.

  3. Wait until the new Kubernetes cluster is ready.

  4. Install Supertubes on the new cluster by following the installation guide.

    ```bash
    supertubes install --root-cert=root-cert.pem --ca-cert=ca-cert.pem --ca-key=ca-key.pem -a --no-demo-cluster -c {pke-aws-kafka-euw1-kubeconfig.yaml}
    ```
    
  5. Create a Kafka cluster that we will refer to as kafka1-new in the MirrorMaker2 descriptor.

    ```bash
    supertubes cluster create --namespace kafka -c {pke-aws-kafka-euw1-kubeconfig.yaml} -f https://raw.githubusercontent.com/banzaicloud/koperator/master/config/samples/kafkacluster-with-istio.yaml
    ```
    
  6. Wait until the Kafka cluster becomes operational.

    ```bash
    supertubes cluster get --namespace kafka --kafka-cluster kafka -c {pke-aws-kafka-euw1-kubeconfig.yaml}
    Namespace  Name   State           Image                         Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
    kafka      kafka  ClusterRunning  banzaicloud/kafka:2.13-2.4.0  0       CruiseControlTopicReady      0                       2020-03-04 16:42:34
    ```
    
  7. Create a topic named testtopic in the new Kafka cluster.

    ```yaml
    supertubes cluster topic create --namespace kafka --kafka-cluster kafka -c {pke-aws-kafka-euw1-kubeconfig.yaml} -f -<<EOF
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaTopic
    metadata:
      name: testtopic
    spec:
      name: testtopic
      partitions: 3
      replicationFactor: 2
      config:
        "retention.ms": "28800000"
        "cleanup.policy": "delete"
    EOF
    ```
  8. Update the MirrorMaker2 topology descriptor to make it aware of the new kafka1-new Kafka cluster.

    ```bash
    supertubes mm2 deploy -f -<<EOF
    # list of Kubernetes config file paths of clusters hosting our Kafka clusters that we want to make MM2 deployments aware of
    kubernetesConfigs:
      - {pke-aws-kafka-euw1-kubeconfig.yaml}
      - {pke-azure-kafka-weu-kubeconfig.yaml}

    # list of Kafka clusters to make MM2 deployments aware of
    kafkaClusters:
      - namespace: kafka
        name: kafka
        kubernetesConfigContext: kubernetes-admin@pke-aws-kafka-euw1
        alias: kafka1-new # name MM2 refers to this Kafka cluster by
        internalListenerName: internal # name of the Kafka cluster internal listener for local MM2 instances to use
        externalListenerName: external # name of the Kafka cluster external listener for remote MM2 instances to use
      - namespace: kafka
        name: kafka
        kubernetesConfigContext: kubernetes-admin@pke-azure-kafka-weu
        alias: kafka2 # name MM2 refers to this Kafka cluster by
        internalListenerName: internal # name of the Kafka cluster internal listener for local MM2 instances to use
        externalListenerName: external # name of the Kafka cluster external listener for remote MM2 instances to use

    mirrorMaker2Properties: |-
      # replication topologies and flows, mm2 config, etc.
      kafka1-new->kafka2.enabled=true
      kafka2->kafka1-new.enabled=true

      sync.topic.acls.enabled=false
    EOF
    ```

  9. Write messages to this **testtopic** topic. First, find the public address through which the new Kafka cluster is exposed:

    ```bash
    kubectl get svc -n kafka kafka-meshgateway
    NAME               TYPE          CLUSTER-IP     EXTERNAL-IP                                                              PORT(S)                                                          AGE
    kafka-meshgateway  LoadBalancer  10.10.235.122  abee2dfdb54dc485e8449b776a649377-1677360459.eu-west-1.elb.amazonaws.com  19090:30870/TCP,19091:32756/TCP,19092:30961/TCP,29092:31455/TCP  25m
    ```

    then produce a couple of messages:

    ```bash
    kafkacat -b abee2dfdb54dc485e8449b776a649377-1677360459.eu-west-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -P -t testtopic
    kafka1-new: message 1
    kafka1-new: message 2
    ```

  10. Verify that all messages from the lost `kafka1` cluster are replicated from the backup `kafka2` cluster to the newly created `kafka1-new` replacement Kafka cluster.
  11. Now let's see what topics and messages we have on our old kafka2 and the newly created kafka1-new Kafka cluster.

    **kafka2**:

    ```bash
    kafkacat -b 51.124.19.105:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -C -t testtopic -c2
    kafka2: message 1
    kafka2: message 2
    ```

    ```bash
    kafkacat -b 51.124.19.105:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -C -t kafka1.testtopic -c2
    kafka1: message 1
    kafka1: message 2
    ```

    ```bash
    kafkacat -b 51.124.19.105:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -C -t kafka1-new.testtopic -c2
    kafka1-new: message 2
    kafka1-new: message 1
    ```

    **kafka1-new**:

    ```bash
    kafkacat -b abee2dfdb54dc485e8449b776a649377-1677360459.eu-west-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -C -t testtopic -c2
    kafka1-new: message 1
    kafka1-new: message 2
    ```

    ```bash
    kafkacat -b abee2dfdb54dc485e8449b776a649377-1677360459.eu-west-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -C -t kafka2.testtopic -c2
    kafka2: message 1
    kafka2: message 2
    ```

    ```bash
    kafkacat -b abee2dfdb54dc485e8449b776a649377-1677360459.eu-west-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.ca.location=cert_chain.pem -X ssl.certificate.location=client.pem -X ssl.key.location=client.key -C -t kafka2.kafka1.testtopic -c2
    kafka1: message 2
    kafka1: message 1
    ```

Summary

The summary of what we are seeing:

```
+-- kafka1-new
|   +-- testtopic
|   |   +-- kafka1-new: message 1
|   |   +-- kafka1-new: message 2
|   +-- kafka2.testtopic
|   |   +-- kafka2: message 1
|   |   +-- kafka2: message 2
|   +-- kafka2.kafka1.testtopic
|       +-- kafka1: message 1
|       +-- kafka1: message 2
+-- kafka2
    +-- testtopic
    |   +-- kafka2: message 1
    |   +-- kafka2: message 2
    +-- kafka1.testtopic
    |   +-- kafka1: message 1
    |   +-- kafka1: message 2
    +-- kafka1-new.testtopic
        +-- kafka1-new: message 1
        +-- kafka1-new: message 2
```

We got all the messages from the lost kafka1, from kafka2, and from the new kafka1-new cluster on both the kafka2 and kafka1-new clusters.

If you are using a client capable of reading multiple topics, such as kafka-console-consumer, you can query all the messages that were written to testtopic:

```bash
kafka-console-consumer.sh --bootstrap-server 51.124.19.105:29092 --whitelist '.*testtopic' --consumer.config client-ssl.properties --from-beginning --max-messages=6
kafka2: message 2
kafka1: message 1
kafka1-new: message 2
kafka2: message 1
kafka1: message 2
kafka1-new: message 1
Processed a total of 6 messages
```

```bash
kafka-console-consumer.sh --bootstrap-server abee2dfdb54dc485e8449b776a649377-1677360459.eu-west-1.elb.amazonaws.com:29092 --whitelist '.*testtopic' --consumer.config client-ssl.properties --from-beginning --max-messages=6
kafka1-new: message 2
kafka2: message 2
kafka1: message 2
kafka1-new: message 1
kafka2: message 1
kafka1: message 1
Processed a total of 6 messages
```
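
The client-ssl.properties file referenced above isn't shown in the post. A minimal sketch of what it would typically contain, assuming a keystore/truststore built from the generated client certificate (paths and passwords are placeholders):

```bash
# hypothetical client-ssl.properties; paths and passwords are placeholders
cat > client-ssl.properties <<EOF
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
EOF
```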

Other supported use cases

In this post, we presented a disaster recovery solution based on a MirrorMaker2 active/active topology. However, MirrorMaker2 is not limited to this use case: other topologies such as fan-out, aggregation, or cluster migration can be implemented as well, and Supertubes supports all of them.
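
Since replication flows are plain entries in mirrorMaker2Properties, these topologies differ only in which flows you enable. A hypothetical sketch (the cluster aliases central, edge1, edge2, and aggregate are made up for illustration):

```
# fan-out: one central cluster replicated to several edge clusters
central->edge1.enabled=true
central->edge2.enabled=true

# aggregation: several edge clusters replicated into a single cluster
edge1->aggregate.enabled=true
edge2->aggregate.enabled=true
```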

About Supertubes

Banzai Cloud Supertubes (Supertubes) is the automation tool for setting up and operating production-ready Kafka clusters on Kubernetes, leveraging a Cloud-Native technology stack. Supertubes includes Zookeeper, the Banzai Cloud Kafka operator, Envoy, Istio and many other components that are installed, configured, and managed to operate a production-ready Kafka cluster on Kubernetes. Some of the key features are fine-grained broker configuration, scaling with rebalancing, graceful rolling upgrades, alert-based graceful scaling, monitoring, out-of-the-box mTLS with automatic certificate renewal, Kubernetes RBAC integration with Kafka ACLs, and multiple options for disaster recovery.