This post is part of the Debug 101 series. If you missed the previous post in this series, check it out here:
We're in the middle of deploying Apache Kafka to Kubernetes the cloud-native way: by removing the Zookeeper dependency entirely and using etcd instead. This means that service registry/discovery and the other internal Kafka-to-Zookeeper operations will be dispatched to a pre-existing etcd cluster. Sweet, isn't it? No need for yet another third-party system, because we already have etcd as part of Kubernetes, out of the box.
In this post we don't want to go into detail about why we chose to remove Zookeeper entirely, or why it's considerably better to rely on etcd when deploying to Kubernetes. Once we've completed this project and pushed the PR upstream, we'll revisit that topic. Meanwhile, if you'd like to consider one single point on this matter, performance, we recommend you read this blog.
To recap, etcd is a distributed key/value store which relies on the Raft consensus algorithm and is used internally by Kubernetes. The Java library which can interact with it is called jetcd; it is currently under development and in beta. We are using this library to remove the Zookeeper dependency, and it's worked pretty well so far. Earlier this week we reached a point in making these changes where we were running a large number of Kafka tests, and a few of them were failing. The problem we encountered was related to transactions. A simple jetcd transaction looks like this:
val client: Client = Client.builder().endpoints("http://localhost:2379").build()
val test = Try {
  client.getKVClient.txn()
    .If(new Cmp(ByteSequence.fromString("foo"), Cmp.Op.GREATER, CmpTarget.version(0)))
    .Then(Op.get(ByteSequence.fromString("foo"), GetOption.DEFAULT))
    .Else(Op.put(ByteSequence.fromString("foo"), ByteSequence.fromString("bar"), PutOption.DEFAULT)).commit().get()
}
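It checks whether the key foo exists. A key that has never been written has version 0, so the comparison "version greater than 0" is true only for an existing key. If the key exists, the transaction gets its value; otherwise it creates foo with the value bar.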
if (test.get.isSucceeded)
  println(s"Key `foo` exists and the value is: ${test.get.getGetResponses.get(0).getKvs.get(0).getValue}")
else
  println("Key `foo` does not exists, creating new one with value `bar`")
client.close()
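To make the intended behaviour concrete without a running cluster, the If/Then/Else logic of the transaction above can be modeled in plain Scala. This is only a sketch: TxnModel, Entry, Result and getOrCreate are hypothetical names, the store is an in-memory map rather than etcd, and nothing here uses the jetcd API.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for etcd; not part of jetcd.
object TxnModel {
  // Each key maps to a value and a version; a missing key counts as version 0.
  final case class Entry(value: String, version: Long)

  // Which branch ran, and what the get returned (if anything).
  final case class Result(succeeded: Boolean, fetched: Option[String])

  // Models: If(version(key) > 0) Then(get key) Else(put key -> default).
  def getOrCreate(store: mutable.Map[String, Entry], key: String, default: String): Result = {
    val version = store.get(key).map(_.version).getOrElse(0L)
    if (version > 0) {
      // Then branch: the key exists, so read its value.
      Result(succeeded = true, fetched = store.get(key).map(_.value))
    } else {
      // Else branch: the key is missing, so create it.
      store.update(key, Entry(default, 1L))
      Result(succeeded = false, fetched = None)
    }
  }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[String, Entry]
    println(getOrCreate(store, "foo", "bar")) // first run: Else branch, key is created
    println(getOrCreate(store, "foo", "bar")) // second run: Then branch, value is read
  }
}
```

In this model the second call takes the Then branch and returns the stored value, which is what we would expect from the real transaction on its second run.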
Before we can check this code we need a running etcd cluster. To start a single-node one, run the following Docker command:
docker run --rm -d -p 2379:2379 -p 2380:2380 --name etcd quay.io/coreos/etcd:v3.2.9 \
  /usr/local/bin/etcd \
  --data-dir=/etcd-data --name node1 \
  --initial-advertise-peer-urls http://127.0.0.1:2380 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --advertise-client-urls http://127.0.0.1:2379 \
  --listen-client-urls http://0.0.0.0:2379 \
  --initial-cluster node1=http://127.0.0.1:2380
Back to our Scala code. As anybody might rightfully assume, our first run will produce the following output:
Key `foo` does not exists, creating new one with value `bar`
But what happens if we try to run the code again? Unfortunately, the result is not what we would expect:
Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at Main$.delayedEndpoint$Main$1(Main.scala:21)
    at Main$delayedInit$body.apply(Main.scala:9)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at Main$.main(Main.scala:9)
    at Main.main(Main.scala)
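The top frames of the trace show ArrayList.get being called with index 0 on a list of size 0. As a side note, a defensive way to read the first element of a java.util.List from Scala might look like the sketch below. SafeFirst and firstOption are hypothetical helpers, not part of jetcd, and this only avoids the crash; it does not explain why the list is empty.

```scala
import java.util.{ArrayList => JArrayList, List => JList}
import scala.util.Try

// Hypothetical helper object; not part of jetcd.
object SafeFirst {
  // Returns the first element of a Java list, or None when the list is null or empty.
  def firstOption[A](list: JList[A]): Option[A] =
    Option(list).filter(l => !l.isEmpty).map(_.get(0))

  def main(args: Array[String]): Unit = {
    val empty = new JArrayList[String]()
    // Unchecked access throws IndexOutOfBoundsException on an empty list.
    println(Try(empty.get(0)).isFailure)
    // The guarded version yields an Option instead of throwing.
    println(firstOption(empty))
  }
}
```

Returning an Option forces the caller to handle the "nothing came back" case explicitly instead of failing deep inside a string interpolation.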
Let's take a closer look at the expression that failed:
test.get.getGetResponses.get(0).getKvs.get(0).getValue
The test variable holds a Try[TxnResponse]. A TxnResponse can contain three different kinds of responses:
- PutResponse
- GetResponse
- DeleteResponse
In our case we have one GetResponse, because inside the transaction we issued one get operation (Then(Op.get(ByteSequence.fromString("foo"), GetOption.DEFAULT))). But, of course, put, get and delete operations can be mixed in one transaction, so to get back every GetResponse you have to call getGetResponses. Now take a closer look at the jetcd API implementation, especially the TxnResponse.java file:
/**
 * returns a list of GetResponse; empty list if none.
 */
public synchronized List<GetResponse> getGetResponses() {
  if (getResponses == null) {
    getResponses = getResponse().getResponsesList().stream()
        .filter((responseOp) -> responseOp.getResponseCase() != RESPONSE_RANGE)
        .map(responseOp -> new GetResponse(responseOp.getResponseRange()))
        .collect(Collectors.toList());
  }
  return getResponses;
}
Judging by the exception above, we clearly received an empty list. How did this happen? It turns out that this method's implementation is wrong: for a transaction like ours, which contains only get operations, it cannot return anything but an empty list. Moreover, getPutResponses and getDeleteResponses are also affected by this bug. The bug is inside the filter call. RESPONSE_RANGE is an enum value used to identify the kind of response, and it turns out that a GetResponse corresponds to RESPONSE_RANGE. The filter therefore throws away exactly the responses it should keep, so == is needed instead of !=.
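The effect of flipping the comparison can be reproduced in plain Scala without jetcd. The sketch below is a simplified model, not the library's code: ResponseCase, ResponsePut, ResponseOp and both helper methods are hypothetical names, and only the idea of filtering on a RESPONSE_RANGE case is taken from the method shown above.

```scala
// Simplified, hypothetical model of the filter in getGetResponses; not jetcd code.
object FilterFix {
  sealed trait ResponseCase
  case object ResponseRange extends ResponseCase // stands in for RESPONSE_RANGE (a get)
  case object ResponsePut extends ResponseCase   // assumed name for the put case

  final case class ResponseOp(responseCase: ResponseCase, payload: String)

  // Mirrors the comparison in the released method: keeps everything that is NOT a range response.
  def getResponsesBuggy(ops: List[ResponseOp]): List[String] =
    ops.filter(_.responseCase != ResponseRange).map(_.payload)

  // Mirrors the corrected comparison: keeps only range responses.
  def getResponsesFixed(ops: List[ResponseOp]): List[String] =
    ops.filter(_.responseCase == ResponseRange).map(_.payload)

  def main(args: Array[String]): Unit = {
    // A transaction result holding a single get response, as in our example.
    val getOnly = List(ResponseOp(ResponseRange, "bar"))
    println(getResponsesBuggy(getOnly)) // the only response is filtered out
    println(getResponsesFixed(getOnly)) // the get response survives
  }
}
```

With a single get response the inverted filter leaves nothing behind, which matches the Size: 0 reported in the exception.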
Luckily, this bug was fixed on the master branch by this PR. We assume there will be a new release soon. Until then, if Txn is essential for your work, you need to build the master branch locally with mvn clean install and reference your local Maven repository in your project.
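If your project happens to be built with sbt (an assumption; the build tool is not stated here), pointing it at the local Maven repository might look like the fragment below. The group and artifact IDs are assumptions, and the version string is a placeholder: take the actual values from the pom.xml of the jetcd checkout you built.

```scala
// build.sbt (sketch)

// Let sbt resolve artifacts that `mvn clean install` placed in the local Maven repository.
resolvers += Resolver.mavenLocal

// Assumed coordinates and a placeholder version; replace both with the values from the jetcd pom.xml.
libraryDependencies += "com.coreos" % "jetcd-core" % "<version-built-from-master>"
```

Once an official release containing the fix is available, the resolver line can be dropped and the version pinned to that release.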