
Spark Streaming Checkpointing on Kubernetes

by Balint Molnar

Published on 02/20/2018
Last updated on 02/05/2024


Apache Spark on Kubernetes series:

  • Introduction to Spark on Kubernetes
  • Scaling Spark made simple on Kubernetes
  • The anatomy of Spark applications on Kubernetes
  • Monitoring Apache Spark with Prometheus
  • Spark History Server on Kubernetes
  • Spark scheduling on Kubernetes demystified
  • Spark Streaming Checkpointing on Kubernetes
  • Deep dive into monitoring Spark and Zeppelin with Prometheus
  • Apache Spark application resilience on Kubernetes

Apache Zeppelin on Kubernetes series:

  • Running Zeppelin Spark notebooks on Kubernetes
  • Running Zeppelin Spark notebooks on Kubernetes - deep dive

Apache Kafka on Kubernetes series:

  • Kafka on Kubernetes - using etcd

Spark Streaming applications are special Spark applications capable of processing data continuously, which allows code reuse for batch processing, joining streams against historical data, or running ad-hoc queries on stream data. These streaming scenarios require special consideration when apps run for long periods without interruption. To begin with, you'll need at least these two things:

  • A good scheduler
  • Checkpointing enabled in the Spark Streaming app

For the scheduler, and for Spark in general, we use Spark on Kubernetes. If you need to deploy a Kubernetes cluster to a cloud provider, you can use Pipeline to do the heavy lifting for you. By default, Kubernetes takes care of failed Spark executors and drivers by restarting the failing pods. Although this is enough for executors, for the driver it is necessary but not sufficient. In order to make the driver more resilient to failure, Spark checkpointing must first be enabled.

When speaking about Spark checkpointing, it is necessary that we distinguish between two different varieties:

  • Metadata checkpointing
  • Data checkpointing

This blogpost focuses on metadata checkpointing, because that's the variety needed to recover from failures. If you're interested in a scenario that requires data checkpointing, check out the Spark documentation.

Metadata checkpointing saves the required metadata so that, in case of failure, the application can continue where it left off. Usually, the most common storage layer for the checkpoint is HDFS or S3. For Kubernetes and in the cloud, you'll probably be using S3 rather than managing your own HDFS cluster. However, S3 is slow and, if you're working with large Spark Streaming applications, you'll face bottlenecks and slowness issues. A better option is to use EFS on Amazon, AzureFile on Azure, or GCE-PD on Google.
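
On the application side, enabling metadata checkpointing boils down to pointing the StreamingContext at a checkpoint directory and creating that context through StreamingContext.getOrCreate, so that a restarted driver rebuilds its state from the checkpoint instead of starting from scratch. Here is a minimal sketch of that pattern; the object name is hypothetical, and the batch interval and placeholder transformations are only for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object CheckpointedApp { // hypothetical name, for illustration only
  def main(args: Array[String]): Unit = {
    val checkpointDir = "file:///checkpointdir" // must live on storage that survives a driver restart
    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf().setAppName("CheckpointedApp"), Seconds(5)) // 5s batch interval is an arbitrary choice
      ssc.checkpoint(checkpointDir) // turn on metadata checkpointing for this context
      // ... define the DStream sources and transformations here ...
      ssc
    }
    // On a clean start this calls createContext(); after a driver restart it
    // rebuilds the StreamingContext from the data saved in checkpointDir.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}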

For this blogpost we'll use Azure Files, and showcase Spark Streaming checkpointing with Azure Files on Azure AKS, using the latest 0.2.0 version of Pipeline.

Azure Files offers fully managed file shares in the cloud that are accessible via the Server Message Block (SMB) protocol and can be used concurrently by multiple cloud VMs.

It turns out that Spark checkpointing alone doesn't work with the current Kubernetes Spark implementation. In order to support Spark checkpointing, we need to make the following changes:

  • Drivers should be Jobs instead of Pods; Pods are vulnerable because they're only scheduled to one node, so if a node fails, its pods will never be rescheduled.
  • The driver should be bound to a PersistentVolume for the checkpoint directory; this also requires configuring spark.kubernetes.checkpointdir.enable, spark.kubernetes.checkpointdir.path, and spark.kubernetes.checkpointdir.size.
  • SPARK-22294 is required in order for the driver to bind to the right IP in the event of a restart.

We built our own Spark version with all of the above changes, which is available on DockerHub (banzaicloud/spark-driver:v2.2.0-k8s-1.0.207). We are going to contribute these changes back in a PR soon.

Next, we need a Spark Streaming application that has been properly configured to enable checkpointing. For simplicity's sake, we're going to use a slightly modified version of the Spark Streaming example, NetworkWordCount (sketched below). This application will consume data from a netcat server, which, to keep things manageable, we'll run on our local machine. Since we're running Kubernetes clusters on AKS, we need to ensure that the netcat server running on our host is reachable from outside. We'll use ngrok for that.

nc -lk 9999
ngrok tcp 9999
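
The streaming side of the example looks roughly like the following sketch (the actual com.banzaicloud.SparkNetworkWordCount may differ in its details, and argument handling such as stripping the tcp:// scheme from the ngrok endpoint is omitted). The host, port, and checkpoint directory come from the command-line arguments passed to spark-submit later in this post, and the helper would be called from the createContext() factory shown in the checkpointing pattern above:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
object WordCountSketch { // hypothetical helper, for illustration only
  // Called from the createContext() factory, after ssc.checkpoint(...) has been set.
  def addWordCount(ssc: StreamingContext, host: String, port: Int): Unit = {
    // Read lines from the netcat server exposed through ngrok.
    val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER)
    // Count the words in each batch and print a sample of the counts to the driver log.
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
  }
}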

After creating an AKS cluster with Pipeline, we need to replace the default storageclass created by Azure, because it is of the AzureDisk type, while what we want is AzureFile. If KUBECONFIG is set correctly (e.g. export KUBECONFIG=<path to kubeconfig>), the new storage class can be created with kubectl create via the following yaml:

kubectl create -f - <<EOF
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"
  name: azurefile
provisioner: kubernetes.io/azure-file
parameters:
  skuName: Standard_LRS
  location: eastus
  storageAccount: banzaicloud
EOF

The old storageclass can be deleted with kubectl delete storageclass default. Feel free to skip this phase if using AzureDisk for storage is amenable to you; note, however, that we picked Azure Files in order to leverage some of its comparative benefits. Azure Files also requires a new StorageAccount, which should be created in the same resource group as the cluster, and, if you're mimicking the example given above, the account name needs to be banzaicloud.

Next, you've got to deploy the Spark Helm chart, which creates all the required subsystems for your Spark job. To do that, use our Postman collection to create a deployment called banzaicloud-stable/spark.

To submit a Spark application from your computer to a Kubernetes cluster, you need a spark-submit that supports the previously mentioned configs. We've created a tarball that can be downloaded and used to submit your Spark application. Download the file and untar it to the directory of your choosing. When you're finished, clone or download the example and build it:

ls -lah
total 80
drwxr-xr-x  11 baluchicken  staff   352B Feb 21 15:07 .
drwxr-xr-x  24 baluchicken  staff   768B Feb  5 16:30 ..
drwxr-xr-x  14 baluchicken  staff   448B Feb 21 19:07 .git
-rw-r--r--   1 baluchicken  staff    43B Feb  5 16:55 .gitignore
drwxr-xr-x  11 baluchicken  staff   352B Feb 21 19:07 .idea
-rw-r--r--   1 baluchicken  staff    11K Feb 20 14:57 LICENSE
-rw-r--r--   1 baluchicken  staff   2.9K Feb 20 14:57 pom.xml
-rw-r--r--   1 baluchicken  staff   241B Jan 31 14:55 settings.xml
-rw-r--r--   1 baluchicken  staff    14K Feb  5 16:54 spark-network-word-count.iml
drwxr-xr-x   3 baluchicken  staff    96B Jan 31 14:55 src

mvn clean package

To submit a Spark app from your local machine, you must use a resource staging server (if you'd like to learn more about spark-submit and resource staging servers, read this blogpost). The jar is uploaded to the Kubernetes cluster through a port-forward to the resource staging server. Find its pod and use kubectl's port-forwarding feature:

kubectl get pods
NAME                                        READY     STATUS    RESTARTS   AGE
shuffle-ncmw9                               1/1       Running   0          30m
falling-monkey-spark-rss-867c7c855d-h82nz   1/1       Running   0          30m

kubectl port-forward falling-monkey-spark-rss-867c7c855d-h82nz 31000:10000
Forwarding from 127.0.0.1:31000 -> 10000

Please note that port-forwarding sometimes fails; if you're having trouble, restart it and try again.

Once the port-forward is running, you can submit your Spark app with the following command:

bin/spark-submit --verbose \
  --deploy-mode cluster \
  --class com.banzaicloud.SparkNetworkWordCount \
  --master k8s://<replace this with the kubernetes endpoint> \
  --kubernetes-namespace default \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.app.name=NetworkWordCount \
  --conf spark.kubernetes.driver.docker.image=banzaicloud/spark-driver:v2.2.0-k8s-1.0.207 \
  --conf spark.kubernetes.executor.docker.image=banzaicloud/spark-executor:v2.2.0-k8s-1.0.207 \
  --conf spark.kubernetes.initcontainer.docker.image=banzaicloud/spark-init:v2.2.0-k8s-1.0.207 \
  --conf spark.kubernetes.checkpointdir.enable=true \
  --conf spark.driver.cores="300m" \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.shuffle.namespace=default \
  --conf spark.kubernetes.resourceStagingServer.uri=http://localhost:31000 \
  --conf spark.kubernetes.resourceStagingServer.internal.uri=http://spark-rss:10000 \
  --conf spark.local.dir=/tmp/spark-local \
  file:///<your path to the jar>spark-network-word-count-1.0-SNAPSHOT.jar tcp://0.tcp.ngrok.io <your ngrok port> file:///checkpointdir

Now check and see if your cluster is alive:

kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
networkwordcount-1519234651100-driver-6frsx   1/1       Running   0          4m
networkwordcount-1519234651100-v2wj-exec-1    1/1       Running   0          3m
networkwordcount-1519234651100-v2wj-exec-2    1/1       Running   0          3m
shuffle-ncmw9                                 1/1       Running   0          1h
falling-monkey-spark-rss-867c7c855d-h82nz     1/1       Running   0          1h

To check if the checkpointing works, kill the driver pod:

kubectl delete pod networkwordcount-1519234651100-driver-6frsx
pod "networkwordcount-1519234651100-driver-6frsx" deleted

The Job will automatically restart the pod for you:

kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
networkwordcount-1519234651100-driver-lj7pr   1/1       Running   0          1m
networkwordcount-1519234651100-fbd4-exec-1    1/1       Running   0          22s
networkwordcount-1519234651100-fbd4-exec-2    1/1       Running   0          22s
shuffle-ncmw9                                 1/1       Running   0          1h
falling-monkey-spark-rss-867c7c855d-h82nz     1/1       Running   0          1h

Check the driver logs and look for the checkpoint directory being used:

kubectl logs networkwordcount-1519234651100-driver-lj7pr

CheckpointReader:54 - Attempting to load checkpoint from file file:/checkpointdir/checkpoint-1519236375000
Checkpoint:54 - Checkpoint for time 1519236375000 ms validated
CheckpointReader:54 - Checkpoint successfully loaded from file file:/checkpointdir/checkpoint-1519236375000

That's it for now. You can go through this same process on AWS by changing Azure Files to EFS; for the details, read our EFS blog.
