Spark Streaming Checkpointing on Kubernetes
Apache Spark on Kubernetes series:
- Introduction to Spark on Kubernetes
- Scaling Spark made simple on Kubernetes
- The anatomy of Spark applications on Kubernetes
- Monitoring Apache Spark with Prometheus
- Spark History Server on Kubernetes
- Spark scheduling on Kubernetes demystified
- Spark Streaming Checkpointing on Kubernetes
- Deep dive into monitoring Spark and Zeppelin with Prometheus
- Apache Spark application resilience on Kubernetes

Apache Zeppelin on Kubernetes series:
- Running Zeppelin Spark notebooks on Kubernetes
- Running Zeppelin Spark notebooks on Kubernetes - deep dive

Apache Kafka on Kubernetes series:
- Kafka on Kubernetes - using etcd
Spark Streaming applications are special Spark applications capable of processing data continuously, which allows the reuse of code for batch processing, joining streams against historical data, and running ad-hoc queries on stream data. These streaming scenarios require special consideration when applications run for long periods without interruption. To begin with, you'll need at least these two things:
- A good scheduler
- Checkpointing enabled in the Spark Streaming app
For the scheduler, and for Spark in general, we use Spark on Kubernetes. If you need to deploy a Kubernetes cluster to a cloud provider, you can use Pipeline to do the heavy lifting for you. By default, Kubernetes takes care of failing Spark executors and drivers by restarting failed pods. While this is enough for executors, for the driver it is necessary but insufficient: to make the driver truly resilient to failure, Spark checkpointing must also be enabled.
When speaking about Spark checkpointing, it is necessary that we distinguish between two different varieties:
- Metadata checkpointing
- Data checkpointing
This blogpost focuses on metadata checkpointing, because that's the kind needed to recover from failures. If you're interested in a scenario which requires data checkpointing, you should check out the Spark documentation.
Metadata checkpointing saves the required metadata so that, in case of failure, the application can continue where it left off. The most common storage layers for checkpoints are HDFS and S3. On Kubernetes and in the cloud, you'll probably use S3 rather than manage your own HDFS cluster. However, S3 is slow, and large Spark streaming applications will quickly run into its bottlenecks. A better option is to use EFS on Amazon, AzureFile on Azure, or GCE-PD on Google.
Azure Files offers fully managed file shares in the cloud that are accessible via the Server Message Block (SMB) protocol and can be used concurrently by multiple cloud VMs.
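Before moving on to the Kubernetes specifics, here's a minimal sketch of how metadata checkpointing is typically enabled in a Spark Streaming application: the checkpoint directory is registered on the StreamingContext, and the context itself is built through StreamingContext.getOrCreate so that, after a failure, it is restored from the checkpoint instead of being created from scratch. The directory path here is just an illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative path; in this post it is backed by an AzureFile volume
val checkpointDir = "file:///checkpointdir"

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("MyStreamingApp"), Seconds(5))
  ssc.checkpoint(checkpointDir) // enables metadata checkpointing
  // ... define your DStreams here ...
  ssc
}

// Restores the context from the checkpoint dir if one exists, otherwise creates it
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```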
It turns out that Spark checkpointing alone doesn't work with the current Kubernetes Spark implementation. In order to support it, we had to make the following changes:
- Drivers should be Jobs instead of Pods; Pods are vulnerable because they're scheduled to exactly one node, so if that node fails, its pods will never be rescheduled.
- The driver should be bound to a PersistentVolume for the checkpoint dir; this also requires some additional configuration (see the sketch after this list).
- SPARK-22294 is required in order for the driver to bind to the right IP in the event of a restart.
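To make the second point concrete, here's a minimal sketch of a PersistentVolumeClaim that could back the driver's checkpoint directory. The claim name and size are assumptions for illustration; the azurefile storage class is the one we create later in this post.

```yaml
# Hypothetical claim backing the driver's file:///checkpointdir mount
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: spark-checkpoint-dir
spec:
  accessModes:
    - ReadWriteMany          # AzureFile allows concurrent access from multiple pods
  storageClassName: azurefile
  resources:
    requests:
      storage: 1Gi
```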
We built our own Spark version with all of the above changes; it's available on DockerHub (banzaicloud/spark-driver:v2.2.0-k8s-1.0.207). We plan to contribute these changes back upstream.
Next, we need a Spark Streaming application properly configured to enable checkpointing. For simplicity's sake, we'll use a slightly modified version of the Spark Streaming example, NetworkWordCount.
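Our modified version differs from the stock example mainly in that it takes the checkpoint directory as an extra command-line argument and builds its context via the getOrCreate pattern shown earlier. The following is a sketch of the idea, not the exact source:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkNetworkWordCount {
  def main(args: Array[String]): Unit = {
    // Host, port and checkpoint dir are passed by spark-submit (see below);
    // a tcp:// prefix on the host would need to be stripped before connecting
    val Array(host, port, checkpointDir) = args

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(
        new SparkConf().setAppName("NetworkWordCount"), Seconds(1))
      ssc.checkpoint(checkpointDir)
      val lines = ssc.socketTextStream(host, port.toInt, StorageLevel.MEMORY_AND_DISK_SER)
      lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```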
This application will consume data from a netcat server which, to keep things manageable, we'll run on our local machine. Since we're running our Kubernetes cluster on AKS, we need to make sure that the netcat server running on our host is reachable from the outside. We'll use ngrok for that:
```
nc -lk 9999
ngrok tcp 9999
```
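ngrok prints a public forwarding address; take note of it, because its host and port are what we'll pass to the Spark application later. The address below is only an illustration, yours will differ:

```
# Example ngrok output:
# Forwarding   tcp://0.tcp.ngrok.io:12345 -> localhost:9999
```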
After creating an AKS cluster with Pipeline, we need to replace the default storage class created by Azure, because it is of the AzureDisk type, while what we want is AzureFile. If KUBECONFIG is set correctly (e.g.: export KUBECONFIG=<path to kubeconfig>), the new storage class can be created with kubectl create via the following yaml:
```
kubectl create -f - <<EOF
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"
  name: azurefile
provisioner: kubernetes.io/azure-file
parameters:
  skuName: Standard_LRS
  location: eastus
  storageAccount: banzaicloud
EOF
```
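You can verify that the new class exists and is marked as the default (the exact columns depend on your kubectl version):

```
kubectl get storageclass
NAME                  PROVISIONER                AGE
azurefile (default)   kubernetes.io/azure-file   10s
```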
The old storage class can be deleted with kubectl delete storageclass default. Feel free to skip this step if using AzureDisk for storage is acceptable to you; just note that we picked AzureFile in order to leverage some of its comparative benefits.
AzureFile also requires a new StorageAccount, which should be created in the same resource group as the cluster; if you're following the example above, the account name needs to be banzaicloud (a CLI sketch for this step follows below). Next, you've got to deploy the Spark Helm chart, which creates all the required subsystems for your Spark job. To do that, use our Postman collection to create the Spark deployment.
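A minimal sketch of creating that storage account with the Azure CLI might look like the following; the resource group name is an assumption, everything else mirrors the storage class definition above.

```
# Hypothetical resource group; use the one your AKS cluster lives in
az storage account create \
  --name banzaicloud \
  --resource-group my-aks-resource-group \
  --location eastus \
  --sku Standard_LRS
```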
To submit a Spark application from your computer to the Kubernetes cluster, you need a spark-submit that supports the previously mentioned configs. We've created a tgz that can be downloaded and used to submit your Spark application. Download the file and untar it to the directory of your choosing. When you're finished, clone or download the example and build it:
```
ls -lah
total 80
drwxr-xr-x  11 baluchicken  staff   352B Feb 21 15:07 .
drwxr-xr-x  24 baluchicken  staff   768B Feb  5 16:30 ..
drwxr-xr-x  14 baluchicken  staff   448B Feb 21 19:07 .git
-rw-r--r--   1 baluchicken  staff    43B Feb  5 16:55 .gitignore
drwxr-xr-x  11 baluchicken  staff   352B Feb 21 19:07 .idea
-rw-r--r--   1 baluchicken  staff    11K Feb 20 14:57 LICENSE
-rw-r--r--   1 baluchicken  staff   2.9K Feb 20 14:57 pom.xml
-rw-r--r--   1 baluchicken  staff   241B Jan 31 14:55 settings.xml
-rw-r--r--   1 baluchicken  staff    14K Feb  5 16:54 spark-network-word-count.iml
drwxr-xr-x   3 baluchicken  staff    96B Jan 31 14:55 src

mvn clean package
```
To submit a Spark app from your local machine, you must use a resource staging server (if you'd like to learn more about spark-submit and resource staging servers, read this blogpost), and port-forward to it to upload your jar to the Kubernetes cluster. To do that, find your resource staging server pod and use kubectl's port-forwarding feature:
```
kubectl get pods
NAME                                        READY     STATUS    RESTARTS   AGE
shuffle-ncmw9                               1/1       Running   0          30m
falling-monkey-spark-rss-867c7c855d-h82nz   1/1       Running   0          30m

kubectl port-forward falling-monkey-spark-rss-867c7c855d-h82nz 31000:10000
Forwarding from 127.0.0.1:31000 -> 10000
```
Please note that sometimes port-forwarding fails. Restart and try again if you're having trouble.
While the port-forward is running, you can submit your Spark app with the following command:
```
bin/spark-submit --verbose \
  --deploy-mode cluster \
  --class com.banzaicloud.SparkNetworkWordCount \
  --master k8s://<replace this with the kubernetes endpoint> \
  --kubernetes-namespace default \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.app.name=NetworkWordCount \
  --conf spark.kubernetes.driver.docker.image=banzaicloud/spark-driver:v2.2.0-k8s-1.0.207 \
  --conf spark.kubernetes.executor.docker.image=banzaicloud/spark-executor:v2.2.0-k8s-1.0.207 \
  --conf spark.kubernetes.initcontainer.docker.image=banzaicloud/spark-init:v2.2.0-k8s-1.0.207 \
  --conf spark.kubernetes.checkpointdir.enable=true \
  --conf spark.driver.cores="300m" \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.shuffle.namespace=default \
  --conf spark.kubernetes.resourceStagingServer.uri=http://localhost:31000 \
  --conf spark.kubernetes.resourceStagingServer.internal.uri=http://spark-rss:10000 \
  --conf spark.local.dir=/tmp/spark-local \
  file:///<your path to the jar>/spark-network-word-count-1.0-SNAPSHOT.jar \
  tcp://0.tcp.ngrok.io <your ngrok port> file:///checkpointdir
```
Now check and see if your cluster is alive:
```
kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
networkwordcount-1519234651100-driver-6frsx   1/1       Running   0          4m
networkwordcount-1519234651100-v2wj-exec-1    1/1       Running   0          3m
networkwordcount-1519234651100-v2wj-exec-2    1/1       Running   0          3m
shuffle-ncmw9                                 1/1       Running   0          1h
falling-monkey-spark-rss-867c7c855d-h82nz     1/1       Running   0          1h
```
To check if the checkpointing works, kill the driver pod:
```
kubectl delete pod networkwordcount-1519234651100-driver-6frsx
pod "networkwordcount-1519234651100-driver-6frsx" deleted
```
The Job will automatically restart the pod for you:
```
kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
networkwordcount-1519234651100-driver-lj7pr   1/1       Running   0          1m
networkwordcount-1519234651100-fbd4-exec-1    1/1       Running   0          22s
networkwordcount-1519234651100-fbd4-exec-2    1/1       Running   0          22s
shuffle-ncmw9                                 1/1       Running   0          1h
falling-monkey-spark-rss-867c7c855d-h82nz     1/1       Running   0          1h
```
Check the driver logs and look for signs of the checkpoint dir being used:
```
kubectl logs networkwordcount-1519234651100-driver-lj7pr
CheckpointReader:54 - Attempting to load checkpoint from file file:/checkpointdir/checkpoint-1519236375000
Checkpoint:54 - Checkpoint for time 1519236375000 ms validated
CheckpointReader:54 - Checkpoint successfully loaded from file file:/checkpointdir/checkpoint-1519236375000
```
That's it for now. You can go through this same process on AWS with EFS by changing AzureFile to EFS; for the details, read our EFS blog.
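For reference, here's a hedged sketch of what the EFS counterpart of the storage class above might look like. EFS has no in-tree provisioner, so this assumes you run the external efs-provisioner (from kubernetes-incubator/external-storage); the provisioner name must match the PROVISIONER_NAME configured in that deployment.

```
kubectl create -f - <<EOF
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"
  name: aws-efs
provisioner: example.com/aws-efs
EOF
```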