Outshift Logo

12 min read

Blog thumbnail
CN

by Avi Zimmerman

Published on 10/16/2019
Last updated on 02/05/2024

User authenticated and access controlled clusters with the Kafka operator

Share

Our thinking that there was a hunger for an operator that makes easy the provisioning and operating Apache Kafka clusters on Kubernetes which is not based on Kubernetes stateful sets, proved to be correct as shortly after we released the first version our open-source Koperator a community started to build around it. We received lots of valuable feedback that helps to shape the future of Koperator and also feature contributions from the community. In the following guest post, Avi Zimmerman talks about Kafka topic, ACLs and user management that he contributed to Koperator.
This is a guest post by Avi Zimmerman.
Kafka is rapidly becoming an integral part of event-driven architectures and data streaming pipelines. But just like in every other service that hold valuable data - it's necessary to keep what's important safe from prying eyes and unauthorized access. In Kafka, this means setting up a Public Key Infrastructure (PKI) or configuring SASL, which can be a difficult to automate securely. With Koperator, the solution to this problem can be managed for you, using CRDs. The Kafka cluster, topics, users, and their ACLs are all handled by the operator based off the yaml definitions you provide. The operator also uses Cruise Control and Prometheus to monitor your cluster's health and perform graceful healing and scaling operations when needed. The below picture shows the operator creating a Kafka Topic. kafka-topic

Why should I be using SSL and ACLs if I'm only exposing Kafka internally?

A common throughline in the design of data pipelines (that meet security requirements) is that all data should be encrypted both at rest, and in transit. When you configure Kafka to use SSL, data is encrypted between your clients and the Kafka cluster. Additionally, you can restrict access to topics to people holding specific certificates. This gives you an extra layer of security in the event of, for example, an unauthorized user gaining access to a pod inside your cluster. If such a user has the ability to use certain network sniffing tools, they'll be able to listen to unencrypted traffic inside your cluster. And if Kafka isn't using ACLs, they can start reading and writing from your topics, and, if they know the address of your cluster, changing broker configurations. While there's no such thing as perfect security, it's easy to take a few reasonable steps to avoid catastrophes like this. And with Koperator doing all the heavy-lifting, why shouldn't you? The picture below shows how the operator manages a user created by CRD. kafka-user

Complete Example

This example will guide you through setting up a Kubernetes cluster on your machine, installing Koperator and its dependencies, deploying a KafkaCluster CR, and then an application with KafkaTopic and KafkaUser CRs. For this example we are going to use kind to run a cluster on our development machine. Kind lets you run a full kubernetes cluster inside Docker. And you'll need to have docker installed on your machine, first, if you don't have it already. First, we're going to install kind and create a cluster, then we'll set our KUBECONFIG to point our kubernetes commands at the local running cluster.
# Install kind on macOS/Linux (refer to their documentation for windows)
curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/v0.5.1/kind-$(uname)-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind

# Create a local cluster with one control-pane and three workers
cat << EOF | kind create cluster --config -
kind: Cluster
apiVersion: kind.sigs.k8s.io/v1alpha3
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
EOF

## The above may take a minute or two depending on your internet connection

# Export the kind KUBECONFIG for kubectl, helm, etc.
export KUBECONFIG="$(kind get kubeconfig-path)"
Now that we have a cluster up and running on our machine, we need to add a few more things to it, before we can install Koperator. First, we're going to use helm to install cert-manager and the zookeeper-operator. We believe in the principle of seperation of concerns, so Koperator does not install or manage either Zookeeper, or cert-manager. This allows for different, non-dependent services to take care of the things they are best at, without any unnecessary overhead.
# Setup Helm in the Kind cluster
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: ServiceAccount
metadata:
  name: tiller
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: tiller
  roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
  name: tiller
  namespace: kube-system
EOF

# Install tiller into the cluster and add the jetstack and banzaicloud repos
helm init --service-account tiller
helm repo add jetstack https://charts.jetstack.io
helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com/
helm repo update

# Install the cert-manager and CRDs into the cluster
kubectl create ns cert-manager
kubectl label namespace cert-manager certmanager.k8s.io/disable-validation=true
kubectl apply -f https://raw.githubusercontent.com/jetstack/cert-manager/release-0.10/deploy/manifests/00-crds.yaml
helm install --name cert-manager --namespace cert-manager --version v0.10.1 --set webhook.enabled=false jetstack/cert-manager

# Install the zookeeper-operator and setup a zookeeper cluster
helm install --name zookeeper-operator --namespace zookeeper banzaicloud-stable/zookeeper-operator
cat << EOF | kubectl apply -f -
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
  name: example-zookeepercluster
  namespace: zookeeper
spec:
  replicas: 3
EOF
We are now ready to setup a Kafka cluster running on Kubernetes. All the examples below can be found in the Koperator repository so we'll first clone that with git.
# Clone the repository
git clone https://github.com/banzaicloud/koperator
cd koperator

# Install the operator - we will disable prometheus server for this example
helm install --name kafka-operator --namespace kafka banzaicloud-stable/kafka-operator

# Create the example Kafka cluster with internal SSL and ACLs enabled
kubectl apply -n kafka -f config/samples/kafkacluster_with_ssl_groups.yaml
The last command above will return immediately, but it may take 2-3 minutes for all brokers and cruise-control to startup. You can use kubectl to see when the brokers are all ready. The output should look something like this:
$&gt; kubectl get pod -n kafka

NAME                                       READY   STATUS    RESTARTS   AGE
kafka-cruisecontrol-549976ffd8-6swvz       1/1     Running   0          1m48s
kafka-operator-operator-5cf97c9959-llpnn   2/2     Running   0          3m22s
kafkab6dx6                                 1/1     Running   0          2m6s
kafkafhdsv                                 1/1     Running   0          2m7s
kafkapnd5w                                 1/1     Running   0          2m21s
The sample CR configured a three-broker Kafka cluster with SSL, a managed PKI for user certificates, and a cluster-wide internal reachable address of kafka-headless.kafka.svc.cluster.local:29092. We can now deploy kafka topics, users, and a pod reading/writing from the topic with a simple manifest:
# kafka-operator/hack/kafka-test-pod/manifest.yaml
#
# A KafkaTopic called 'test-topic' with a single partition replicated across all three brokers
#  -- you can also specify arbitrary topic configurations with `spec.config` --
apiVersion: banzaicloud.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: test-topic
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  name: test-topic
  partitions: 1
  replicationFactor: 3
---
# A KafkaUser with permission to read from 'test-topic'
apiVersion: banzaicloud.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  name: test-kafka-consumer
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  secretName: test-kafka-consumer
  topicGrants:
    - topicName: test-topic
      accessType: read
---
# A KafkaUser with permission to write to 'test-topic'
apiVersion: banzaicloud.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  name: test-kafka-producer
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  secretName: test-kafka-producer
  topicGrants:
    - topicName: test-topic
      accessType: write
---
# A pod containing a producer and consumer using the above credentials
apiVersion: v1
kind: Pod
metadata:
  name: kafka-test-pod
spec:
  volumes:
    # The consumer secret
    - name: consumer-credentials
      secret:
        defaultMode: 420
        secretName: test-kafka-consumer

    # The producer secret
    - name: producer-credentials
      secret:
        defaultMode: 420
        secretName: test-kafka-producer

  containers:
    # Container reading from topic with the consumer credentials
    # The environment variables used below are for our test-application,
    # which is described in greater detail later in this post.
    - name: consumer
      image: banzaicloud/kafka-test:latest
      env:
        - name: KAFKA_MODE
          value: consumer
      volumeMounts:
        - mountPath: /etc/secrets/certs
          name: consumer-credentials
          readOnly: true

    # Container writing to topic with the producer credentials
    - name: producer
      image: banzaicloud/kafka-test:latest
      env:
        - name: KAFKA_MODE
          value: producer
      volumeMounts:
        - mountPath: /etc/secrets/certs
          name: producer-credentials
          readOnly: true
You can apply the manifest above (found in the repository) to your test cluster and it should start reading and writing from Kafka. We use kubetail to tail the logs below, which is a script for tailing the logs of multiple pods/containers at a time.
$> kubectl apply -f hack/kafka-test-pod/manifest.yaml

kafkatopic.banzaicloud.banzaicloud.io/test-topic created
kafkauser.banzaicloud.banzaicloud.io/test-kafka-consumer created
kafkauser.banzaicloud.banzaicloud.io/test-kafka-producer created
pod/kafka-test-pod created

# The above pod may restart a couple times at first due to applying
# all the objects at once like this and not everything being ready.
# But after a few seconds you should be able to tail the logs and see something like this.
$> kubetail kafka-test-pod
Will tail 2 logs...
kafka-test-pod consumer
kafka-test-pod producer

[kafka-test-pod producer] 2019/09/19 02:07:18 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:18 Consumed message offset 0 - hello world
[kafka-test-pod producer] 2019/09/19 02:07:23 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:23 Consumed message offset 1 - hello world
[kafka-test-pod producer] 2019/09/19 02:07:28 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:28 Consumed message offset 2 - hello world
[kafka-test-pod producer] 2019/09/19 02:07:33 Sending message to topic
[kafka-test-pod consumer] 2019/09/19 02:07:33 Consumed message offset 3 - hello world
^C

Show me some code!

The code running in the pod above is a simple Go application that depends on the environment variable KAFKA_MODE, which will either read or be written from our test topic. The code for this example can be found in the kafka-operator repository at hack/kafka-test-pod/main.go and at the end of this blog post. First, if you look at the manifest above, you'll see that we set a secret to store our user credentials. We are mounting this secret inside the pod at /etc/secrets/certs. We first define a function that can create a TLS configuration from those credentials.
package main

import (
  "crypto/tls"
  "crypto/x509"
  "io/ioutil"
  "log"
)

// The paths to our credentials
const certFile = "/etc/secrets/certs/tls.crt"
const keyFile = "/etc/secrets/certs/tls.key"
const caFile = "/etc/secrets/certs/ca.crt"

// ...

func getTLSConfig() *tls.Config {
	// Create a TLS configuration from our secret for connecting to Kafka
	clientCert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		log.Fatal(err)
	}
	caCert, err := ioutil.ReadFile(caFile)
	if err != nil {
		log.Fatal(err)
	}
	caPool := x509.NewCertPool()
	caPool.AppendCertsFromPEM(caCert)
	return &tls.Config{
		Certificates: []tls.Certificate{clientCert},
		RootCAs:      caPool,
	}
}
In this example, we're using Shopify's sarama library to interact with Kafka. We can include the TLS configuration created by the function above in our options to the sarama client. For example, to create a new consumer we can do the following:
package main

import (
  // ...

  "github.com/Shopify/sarama"
)

var brokerAddrs = []string{"kafka-headless.kafka.svc.cluster.local:29092"}

func main() {
  config := sarama.NewConfig()
  config.Net.TLS.Enable = true
  config.Net.TLS.Config = getTLSConfig()

  // Get a consumer
  consumer, err := sarama.NewConsumer(brokerAddrs, config)
  if err != nil {
    log.Fatal(err)
  }
  defer consumer.Close()
}
The full application will create a sarama configuration using the methods above. Then, depending on the value of the environment variable, will produce or consume from the topic. Below is the full example with inline comments.
package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
)

// Constants for our topic and credential locations
const kafkaTopic = "test-topic"
const certFile = "/etc/secrets/certs/tls.crt"
const keyFile = "/etc/secrets/certs/tls.key"
const caFile = "/etc/secrets/certs/ca.crt"

// The address we are going to use for kafka broker discovery
var brokerAddrs = []string{"kafka-headless.kafka.svc.cluster.local:29092"}

func main() {

	// Create an SSL configuration for connecting to Kafka
	config := sarama.NewConfig()
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = getTLSConfig()

	// Trap SIGINT and SIGTERM to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	// Consume or produce depending on the value of the environment variable
	if os.Getenv("KAFKA_MODE") == "consumer" {
		consume(config, signals)
	} else if os.Getenv("KAFKA_MODE") == "producer" {
		produce(config, signals)
	} else {
		log.Fatal("Invalid test mode:", os.Getenv("KAFKA_MODE"))
	}

}

func getTLSConfig() *tls.Config {
	// Create a TLS configuration from our secret for connecting to Kafka
	clientCert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		log.Fatal(err)
	}
	caCert, err := ioutil.ReadFile(caFile)
	if err != nil {
		log.Fatal(err)
	}
	caPool := x509.NewCertPool()
	caPool.AppendCertsFromPEM(caCert)
	return &tls.Config{
		Certificates: []tls.Certificate{clientCert},
		RootCAs:      caPool,
	}
}

func consume(config *sarama.Config, signals chan os.Signal) {
	// Get a consumer
	consumer, err := sarama.NewConsumer(brokerAddrs, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Setup a channel for messages from our topic
	partitionConsumer, err := consumer.ConsumePartition(kafkaTopic, 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer partitionConsumer.Close()

	consumed := 0
ConsumerLoop:
	// Consume the topic
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d - %s\n", msg.Offset, string(msg.Value))
			consumed++
		case <-signals:
			log.Println("Shutting down")
			break ConsumerLoop
		}
	}
}

func produce(config *sarama.Config, signals chan os.Signal) {

	// Get a broker connection
	broker := sarama.NewBroker(brokerAddrs[0])
	if err := broker.Open(config); err != nil {
		log.Fatal(err)
	}
	defer broker.Close()

	// Create a time ticker to trigger a message every 5 seconds
	ticker := time.NewTicker(time.Duration(5) * time.Second)

ProducerLoop:
	// Send a message everytime the ticker is triggered
	for {
		select {
		case <-ticker.C:
			log.Println("Sending message to topic")
			msg := &sarama.ProduceRequest{}
			msg.AddMessage(kafkaTopic, 0, &sarama.Message{
				Value: []byte("hello world"),
			})
			if _, err := broker.Produce(msg); err != nil {
				log.Fatal(err)
			}
		case <-signals:
			log.Println("Shutting down")
			break ProducerLoop
		}
	}

}
And, voila, you've set up a Kafka cluster with access control and user authentication. Hopefully, this exercise had been sufficiently enlightening, but if you have questions, or if you're interested in contributing, check us out on GitHub.

About Banzai Cloud Pipeline

Banzai Cloud’s Pipeline provides a platform for enterprises to develop, deploy, and scale container-based applications. It leverages best-of-breed cloud components, such as Kubernetes, to create a highly productive, yet flexible environment for developers and operations teams alike. Strong security measures — multiple authentication backends, fine-grained authorization, dynamic secret management, automated secure communications between components using TLS, vulnerability scans, static code analysis, CI/CD, and so on — are default features of the Pipeline platform.
Subscribe card background
Subscribe
Subscribe to
the Shift!

Get emerging insights on emerging technology straight to your inbox.

Unlocking Multi-Cloud Security: Panoptica's Graph-Based Approach

Discover why security teams rely on Panoptica's graph-based technology to navigate and prioritize risks across multi-cloud landscapes, enhancing accuracy and resilience in safeguarding diverse ecosystems.

thumbnail
I
Subscribe
Subscribe
 to
the Shift
!
Get
emerging insights
on emerging technology straight to your inbox.

The Shift keeps you at the forefront of cloud native modern applications, application security, generative AI, quantum computing, and other groundbreaking innovations that are shaping the future of technology.

Outshift Background