Autoscaling Kinesis Data Streams in Epsagon
Large-scale data processing applications usually consume streams of continuously generated data. Use cases of data streaming can be found in every industry, from monitoring vehicle sensor data to analyzing in-game player interactions. Amazon Kinesis Data Streams is a managed streaming data service that supports the collection and processing of large streams of data in real time.
The main issue we had with Kinesis at Epsagon was incorporating it into our autoscaling architecture. Until recently, Kinesis data streams didn’t have autoscaling capabilities, which required us to develop an in-house solution. After the introduction of the native on-demand mode (more on that later), we explored it as a possible replacement for our solution. We concluded that although it meets all of our requirements from a technical perspective, using it in a system with hundreds of streams can get very expensive.
The following proposed solution aims to solve this cost issue by creating an autoscaling service that every stream in the architecture can subscribe to in order to autoscale when needed.
Capacity Mode: Provisioned vs. On-Demand
At the end of November 2021, AWS announced Amazon Kinesis Data Streams On-Demand. Instead of handling the mechanics of manually scaling a stream, AWS automatically adjusts the stream to accommodate the required throughput.
As with most AWS services, the native on-demand autoscaling mode costs more than provisioned mode. How much more? A stream ingesting 1000 records per second with an average record size of 3KB will cost $69.64 per month in provisioned mode and as much as $918.34 per month in on-demand mode. The cost impact on large systems with many streams can be significant.
On-demand mode is a good choice when the workload is unknown, the traffic is unpredictable, or when manually managing capacity for the stream is not possible or scalable. If the traffic is somewhat predictable but highly fluctuating, it might be cheaper to provision a much higher capacity than needed instead of using on-demand mode.
Scaling a Stream
Before we dive deep into implementing autoscaling for streams, we should first understand how to scale a Kinesis data stream. A Kinesis data stream is no more than a set of shards. Each shard is a sequence of data records with a fixed capacity. Therefore, the maximum throughput of a stream is the sum of its shards’ throughput. A single shard can ingest up to 1MB of data per second or 1K records per second for writes and supports a read throughput of up to 2MB per second. For example, a stream with ten shards can support a write throughput of up to 10MB / 10K records per second and a read throughput of 20MB per second. When discussing scaling a stream, we mean increasing or decreasing the number of shards allocated to the stream.
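The capacity arithmetic above can be sketched in a few lines of Python. This is only an illustration of the per-shard limits quoted in the text; the names `StreamCapacity` and `stream_capacity` are made up for this example.

```python
from typing import NamedTuple

# Per-shard limits quoted in the text above.
WRITE_MB_PER_SHARD = 1
WRITE_RECORDS_PER_SHARD = 1000
READ_MB_PER_SHARD = 2


class StreamCapacity(NamedTuple):
    write_mb_per_second: int
    write_records_per_second: int
    read_mb_per_second: int


def stream_capacity(shard_count: int) -> StreamCapacity:
    """Return the maximum throughput of a stream with `shard_count` shards."""
    if shard_count < 1:
        raise ValueError("a stream needs at least one shard")
    # The stream limit is simply the per-shard limit times the shard count.
    return StreamCapacity(
        write_mb_per_second=shard_count * WRITE_MB_PER_SHARD,
        write_records_per_second=shard_count * WRITE_RECORDS_PER_SHARD,
        read_mb_per_second=shard_count * READ_MB_PER_SHARD,
    )


# The ten-shard example from the text: 10MB / 10K records written, 20MB read per second.
print(stream_capacity(10))
```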
Resharding is the operation done to adjust the number of shards according to changes in the rate data flows through the stream. Resharding is a pairwise operation. A single shard can be split into two shards (increases the throughput), and two shards can be merged into a single shard (decreases the throughput). For example, to increase the number of shards in a stream from 1 to 4, we must perform three split operations.
Splitting a shard results in one parent shard and two child shards. Similarly, merging shards results in two parent shards and one child shard. During the resharding operation, a parent shard transitions from OPEN (records can be added and retrieved), to CLOSED (records can only be retrieved), and finally to EXPIRED (records cannot be added or retrieved). As long as a parent shard is not expired, it still counts as an active shard in terms of billing.
Figure 1: Resharding
AWS provides us with two ways to reshard a stream:
- SplitShard / MergeShards – these two API operations are used to scale the number of shards in a stream by manipulating specific shards. This is particularly useful when dealing with hot/cold shards: shards that are receiving more data than expected can be selectively split while shards that are receiving less data than expected can be merged.
- UpdateShardCount – this API allows us to increase or decrease the number of shards in a stream without specifying specific shards. Behind the scenes, AWS performs the split and merge operations on our behalf. Although it is more straightforward than handling specific shards, it also has drawbacks. The API doesn’t consider hot and cold shards, i.e., it splits less active shards just like busy ones. Scaling with the API also has cost implications: it only supports uniform scaling, meaning that scaling to a number that is not precisely double or half the existing shard count will create temporary shards.
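To make the two approaches concrete, here is a minimal sketch of how they might be called through boto3. The `kinesis` argument is assumed to be a `boto3.client("kinesis")` instance, and the `shard` dict is assumed to have the shape returned by the ListShards / DescribeStream APIs. The helper names, and the choice to split a shard at the midpoint of its hash key range, are assumptions for illustration and are not taken from the original solution.

```python
from typing import Any, Dict


def split_point(starting_hash_key: str, ending_hash_key: str) -> str:
    """Midpoint of a shard's hash key range (hash keys are decimal strings)."""
    start, end = int(starting_hash_key), int(ending_hash_key)
    return str((start + end + 1) // 2)


def split_shard_evenly(kinesis: Any, stream_name: str, shard: Dict[str, Any]) -> None:
    """Split one specific (e.g. hot) shard into two halves."""
    key_range = shard["HashKeyRange"]
    kinesis.split_shard(
        StreamName=stream_name,
        ShardToSplit=shard["ShardId"],
        NewStartingHashKey=split_point(
            key_range["StartingHashKey"], key_range["EndingHashKey"]
        ),
    )


def merge_adjacent_shards(
    kinesis: Any, stream_name: str, shard_id: str, adjacent_shard_id: str
) -> None:
    """Merge two adjacent (e.g. cold) shards into one."""
    kinesis.merge_shards(
        StreamName=stream_name,
        ShardToMerge=shard_id,
        AdjacentShardToMerge=adjacent_shard_id,
    )


def update_shard_count(kinesis: Any, stream_name: str, target_shard_count: int) -> None:
    """Let AWS pick the shards: scale the whole stream to a target count."""
    kinesis.update_shard_count(
        StreamName=stream_name,
        TargetShardCount=target_shard_count,
        ScalingType="UNIFORM_SCALING",
    )
```

The first two helpers need per-shard knowledge (IDs and hash key ranges), while the last one needs only the stream name and a number, which is the trade-off described above.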
The split/merge operations approach gives us fine-grained control over the scaling process. The downside of this approach is the complexity of the implementation – to target particular shards for the scaling process, we would have to monitor every single shard separately, which adds a lot of overhead.
The UpdateShardCount API solves most of the split/merge approach problems, but it introduces a few new ones. The first issue is an API limitation – a maximum of 10 scaling operations in a rolling 24-hour window. The second issue is how the API executes the operations. Currently, the API only supports uniform scaling, which means many temporary shards might be created during the scaling process. Temporary shards are billed until they expire, depending on the stream retention configuration.
As most of our Kinesis data streams have an even partition key distribution (i.e., no hot/cold shards), we will use the UpdateShardCount API approach for our solution. With a few adjustments to the scaling execution logic, the same architecture can implement the split/merge approach.
Now that we are familiar with Kinesis and its APIs, we can develop a solution to automate the scaling process.
Figure 2: Solution Architecture
The solution consists of 3 parts: triggers, delivery, and execution.
- Scaling alarms – Defined for each stream subscribed to the autoscaling service. They trigger the scaling operation by publishing to the relevant autoscaling service SNS topic.
- Scaling topics – Exported by the autoscaling service and used as the entry point to the service. They invoke the relevant scaling lambda when receiving a message from the scaling alarms.
- Scaling lambdas – Calculate the target shard count, update the stream and alarms accordingly, and write a result log to a DynamoDB table.
Triggers: Scaling Alarms
CloudWatch alarms allow us to respond quickly in an event-driven way to activity changes in the monitored stream based on metrics sent every minute.
AWS provides stream-level metrics free of charge, sending data every minute. As write limitations caused most of the scalability issues we faced with Kinesis data streams, we focused on write-related metrics, although the same can be applied to read metrics.
IncomingBytes – The number of bytes successfully put to the Kinesis stream over the specified period.
IncomingRecords – The number of records successfully put to the Kinesis stream over the specified period.
As our focus is on autoscaling caused by write limitations, we will define a formula that calculates the maximum write usage of the stream by factoring all the possible write limitations and considering the current shard count of the stream. This formula should allow us to set our scaling thresholds based on the actual stream usage.
The CW metrics formula calculates the incoming bytes and incoming records usage factors as numbers between 0 and 1, then uses the higher usage factor to create a new time series that indicates the usage percentage of the stream at a given moment. We will use this newly created time series to define our scaling alarm thresholds.
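As a rough illustration of that calculation outside CloudWatch, the following sketch computes the usage factor for a single aggregated data point. It assumes the 1MB per-shard limit means 1024 × 1024 bytes; the function name and the constants are chosen for this example only.

```python
# Per-shard write limits (assumption: "1MB" is taken as 1024 * 1024 bytes).
BYTES_PER_SHARD_PER_SECOND = 1024 * 1024
RECORDS_PER_SHARD_PER_SECOND = 1000


def max_incoming_usage_factor(
    incoming_bytes: float,
    incoming_records: float,
    shard_count: int,
    period_minutes: int = 5,
) -> float:
    """Usage factor of one data point: the higher of the bytes and records factors."""
    seconds = 60 * period_minutes
    # What the whole stream could have ingested during the aggregation period.
    max_bytes = BYTES_PER_SHARD_PER_SECOND * seconds * shard_count
    max_records = RECORDS_PER_SHARD_PER_SECOND * seconds * shard_count
    bytes_usage_factor = incoming_bytes / max_bytes
    records_usage_factor = incoming_records / max_records
    return max(bytes_usage_factor, records_usage_factor)
```

For instance, a two-shard stream that received 150,000 records and 300MB in a 5-minute window is at 0.25 of its record capacity but 0.5 of its byte capacity, so its usage factor is 0.5.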
Figure 3: Alarms Metrics Configuration
shardCount – The current shard count of the stream. After each scaling operation, this will have to be updated to stay in sync with the stream shard count.
periodMinutes – The number of minutes used for each data point aggregation. For example, with a 5-minute aggregation, a usage factor of 1 for each shard in terms of records corresponds to 1000 (the per-second record limit of a single shard) × 60 (seconds in a minute) × 5 (metrics aggregation period) = 300,000 records.
incoming[Bytes/Records] – Data written to the stream, aggregated over periodMinutes. A 5-minute period is used by default so that short traffic spikes do not trigger scaling.
incoming[Bytes/Records]FilledWithZeroForMissingDataPoints – Fills missing metric data points with 0 before performing math operations on them to ensure a valid result.
incoming[Bytes/Records]UsageFactor – A number between 0 and 1 that represents the used stream throughput in terms of the specific metric. A single shard can ingest up to 1MB of data or 1K records per second.
maxIncomingUsageFactor – A number between 0 and 1 that represents the maximum usage factor of the stream in terms of write throughput at a given period.
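One possible way to express these definitions as CloudWatch metric math is sketched below. It builds the list of metric data queries that could be passed as the `Metrics` argument of boto3's `put_metric_alarm`. The query IDs follow the variable names above, but the exact expressions are an assumption based on those descriptions, not a copy of the configuration in Figure 3.

```python
from typing import Any, Dict, List


def build_usage_factor_queries(
    stream_name: str, shard_count: int, period_minutes: int = 5
) -> List[Dict[str, Any]]:
    """Build metric data queries that end in the maxIncomingUsageFactor series."""
    period_seconds = 60 * period_minutes
    # Stream-wide limits for one aggregation period (assumes 1MB = 1024 * 1024 bytes).
    max_bytes = 1024 * 1024 * period_seconds * shard_count
    max_records = 1000 * period_seconds * shard_count

    def metric(query_id: str, metric_name: str) -> Dict[str, Any]:
        return {
            "Id": query_id,
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/Kinesis",
                    "MetricName": metric_name,
                    "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
                },
                "Period": period_seconds,
                "Stat": "Sum",
            },
            "ReturnData": False,
        }

    def expression(query_id: str, expr: str, return_data: bool = False) -> Dict[str, Any]:
        return {"Id": query_id, "Expression": expr, "Label": query_id, "ReturnData": return_data}

    return [
        metric("incomingBytes", "IncomingBytes"),
        metric("incomingRecords", "IncomingRecords"),
        expression("incomingBytesFilledWithZeroForMissingDataPoints", "FILL(incomingBytes, 0)"),
        expression("incomingRecordsFilledWithZeroForMissingDataPoints", "FILL(incomingRecords, 0)"),
        expression(
            "incomingBytesUsageFactor",
            f"incomingBytesFilledWithZeroForMissingDataPoints / {max_bytes}",
        ),
        expression(
            "incomingRecordsUsageFactor",
            f"incomingRecordsFilledWithZeroForMissingDataPoints / {max_records}",
        ),
        # Only the final series is returned; the alarm is evaluated against it.
        expression(
            "maxIncomingUsageFactor",
            "MAX([incomingBytesUsageFactor, incomingRecordsUsageFactor])",
            return_data=True,
        ),
    ]
```

Because `shard_count` is baked into the expressions as a constant, the queries have to be rebuilt after every scaling operation, which is why the lambdas also update the alarms.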
The formula defined above helps us understand the throughput usage of the stream. With it, we should determine the upper and lower limits for the scaling process.
For the scale-up alarm, we will define the upper usage factor limit as 0.75 with a timeframe of 5 minutes. That means the scale-up process should start whenever the usage factor rises above 0.75 in a 5-minute period (i.e., a single data point).
The scale-down alarm threshold will be defined as a 0.25 usage factor with a timeframe of 24 hours. A 24-hour timeframe is used to avoid frequent scale-downs, which might create temporary shards and use up the UpdateShardCount API quota (10 operations in a rolling 24-hour window) that might be needed for more critical scale-up operations. The scale-down process should start whenever the usage factor drops below 0.25 and stays below it for 24 hours.
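The two thresholds could translate into alarm definitions along these lines. This sketch only builds keyword arguments suitable for boto3's `put_metric_alarm`. The alarm naming scheme, the topic ARNs, and the `metric_queries` argument (the metric math queries that produce the usage factor series) are placeholders assumed for this example.

```python
from typing import Any, Dict, List, Tuple


def scaling_alarm_params(
    stream_name: str,
    metric_queries: List[Dict[str, Any]],
    scale_up_topic_arn: str,
    scale_down_topic_arn: str,
    period_minutes: int = 5,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (scale_up, scale_down) keyword arguments for put_metric_alarm."""
    # With 5-minute data points, 24 hours correspond to 288 consecutive data points.
    datapoints_per_day = 24 * 60 // period_minutes
    scale_up = {
        "AlarmName": f"{stream_name}-scale-up",  # hypothetical naming scheme
        "Metrics": metric_queries,
        "ComparisonOperator": "GreaterThanThreshold",
        "Threshold": 0.75,
        "EvaluationPeriods": 1,
        "DatapointsToAlarm": 1,
        "AlarmActions": [scale_up_topic_arn],
    }
    scale_down = {
        "AlarmName": f"{stream_name}-scale-down",  # hypothetical naming scheme
        "Metrics": metric_queries,
        "ComparisonOperator": "LessThanThreshold",
        "Threshold": 0.25,
        "EvaluationPeriods": datapoints_per_day,
        "DatapointsToAlarm": datapoints_per_day,
        "AlarmActions": [scale_down_topic_arn],
    }
    return scale_up, scale_down
```

Each dict could then be applied with `cloudwatch.put_metric_alarm(**params)`, where `cloudwatch` is a `boto3.client("cloudwatch")` instance.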
Delivery: Scaling Topics
CloudWatch alarms integrate with a limited set of AWS services as notification destinations. An SNS topic is the only reasonable target for forwarding a CW alarm message to other services, in our case from the scaling trigger alarm to the scaling execution lambda.
Two SNS topics should be created inside our autoscaling service, one for the scale-up alarm and one for the scale-down alarm. Both topics should be exported by the service and used as targets for each subscribed stream alarm.
Execution: Scaling Lambdas
When a scaling lambda is invoked, we already know that an alarm was triggered and which scaling operation should be done. We also know the name of the stream that should be scaled. With that info, we can execute the scaling process.
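As a sketch of how a lambda might recover the stream name, the function below reads the alarm notification out of the SNS event and looks for a `StreamName` dimension among the alarm's metrics. The payload layout under `Trigger` is an assumption about how CloudWatch serializes metric math alarms and should be verified against a real notification. The function name is hypothetical.

```python
import json
from typing import Any, Dict, Optional


def stream_name_from_sns_event(event: Dict[str, Any]) -> Optional[str]:
    """Extract the Kinesis stream name from a CloudWatch alarm delivered via SNS."""
    # SNS delivers the alarm as a JSON string inside the first record.
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    # Assumed layout: Trigger.Metrics[].MetricStat.Metric.Dimensions[]
    for query in message.get("Trigger", {}).get("Metrics", []):
        metric = query.get("MetricStat", {}).get("Metric", {})
        for dimension in metric.get("Dimensions", []):
            # Accept both lowercase and capitalized keys, since the casing is assumed.
            name = dimension.get("name", dimension.get("Name"))
            if name == "StreamName":
                return dimension.get("value", dimension.get("Value"))
    return None
```

Which lambda runs (scale-up or scale-down) is already determined by the topic that invoked it, so the stream name is the only piece of information that has to be parsed from the message.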
Both the scale-up and scale-down lambdas operate in a similar way. Each lambda:
- Calculates the target shard count of the stream.
- Scales the stream using the UpdateShardCount API.
- Updates the stream scaling alarms according to the new shard count.
- Writes a scale operation log to a DynamoDB table.
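The four steps above can be sketched as a single function shared by both lambdas. Here `kinesis` is assumed to be a `boto3.client("kinesis")` instance and `log_table` a boto3 DynamoDB `Table` resource. `calculate_target` and `update_alarms` are hypothetical callables standing in for the strategy-specific calculation and the alarm update, and the log item fields are invented for this example.

```python
import time
from typing import Any, Callable


def run_scaling_operation(
    stream_name: str,
    calculate_target: Callable[[int], int],
    kinesis: Any,
    update_alarms: Callable[[str, int], None],
    log_table: Any,
) -> int:
    """Scale a stream and record the result; returns the target shard count."""
    summary = kinesis.describe_stream_summary(StreamName=stream_name)
    current = summary["StreamDescriptionSummary"]["OpenShardCount"]

    # 1. Calculate the target shard count (differs between scale-up and scale-down).
    target = calculate_target(current)

    if target != current:
        # 2. Scale the stream.
        kinesis.update_shard_count(
            StreamName=stream_name,
            TargetShardCount=target,
            ScalingType="UNIFORM_SCALING",
        )
        # 3. Keep the alarms in sync with the new shard count.
        update_alarms(stream_name, target)

    # 4. Write a log entry for the operation (field names are illustrative).
    log_table.put_item(
        Item={
            "stream_name": stream_name,
            "timestamp": int(time.time()),
            "current_shard_count": current,
            "target_shard_count": target,
        }
    )
    return target
```

Passing the clients and callables in as arguments keeps the function easy to test with stand-ins and lets the two lambdas differ only in the `calculate_target` they supply.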
The main difference between the lambdas is the target shard count calculation which depends on the scaling strategy used.
When the scale-up alarm hits the threshold and triggers the scaling lambda, it is still unknown how long the increased load will last and the optimal shard count to accommodate the increasing load.
When calculating the target shard count of the scaling operation, we have to keep in mind some of the limitations of the UpdateShardCount API:
- The API is limited to a maximum of 10 operations in a 24-hour rolling window. Scaling too little might not leave us enough room to scale up to a higher shard count later in that window.
- UpdateShardCount only supports uniform scaling, which means that the first step of scaling up is splitting all the existing shards in the stream. As a result, scaling up by anything less than 100% of the current shard count (i.e., to less than double) will create temporary shards. In large streams, the temporary shards can become costly.
We have to scale enough to prevent continued scale-up operations, which would consume our operations quota and create a lot of temporary shards, but at the same time not to scale too much and create shards unnecessarily. A good way to achieve both objectives is to perform the scaling operation in tiers. The tiers can be adjusted according to the organization’s needs. A good starting point can be:
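- current_shards ≤ 3 – scale up by 100%
- current_shards ≤ 25 – scale up by 75%
- current_shards ≤ 50 – scale up by 50% (to 150% of the current count)
- current_shards > 50 – scale up by 25% (to 125% of the current count)
25% intervals are used for faster scaling operations, as the UpdateShardCount API documentation suggests. Additionally, a stream can’t be scaled up by more than 100% of its current shard count (i.e., to more than double) in a single operation.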
According to the suggested tiers, if a stream with two shards triggers a scale-up process, it would be scaled to 4. Similarly, a stream with 40 shards would be scaled to 60 shards to avoid creating too many shards.
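A minimal sketch of the tiered calculation follows. It reads every tier as a percentage increase over the current shard count (100%, 75%, 50%, 25%), which is the interpretation consistent with the 2 → 4 and 40 → 60 examples. Rounding fractional results up is an assumption, as are the names used here.

```python
from typing import List, Optional, Tuple

# (upper bound on the current shard count, percentage increase).
# None means "no upper bound". Adjust the tiers to your own needs.
SCALE_UP_TIERS: List[Tuple[Optional[int], int]] = [
    (3, 100),
    (25, 75),
    (50, 50),
    (None, 25),
]


def get_scale_up_target(current_shard_count: int) -> int:
    """Return the shard count a stream should scale up to, based on its tier."""
    if current_shard_count < 1:
        raise ValueError("a stream needs at least one shard")
    for upper_bound, increase_percent in SCALE_UP_TIERS:
        if upper_bound is None or current_shard_count <= upper_bound:
            # Integer ceiling of current * (100 + increase) / 100.
            return (current_shard_count * (100 + increase_percent) + 99) // 100
    raise AssertionError("unreachable: the last tier has no upper bound")
```

With these tiers, 2 shards scale to 4 and 40 shards scale to 60, matching the examples above, while a 100-shard stream would scale to 125.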
As a reminder, in the alarm thresholds section, we defined that the scale-down process should be triggered when the stream write throughput in the last 24 hours was steadily below 25% usage (0.25 usage factor).
As opposed to the scale-up process, which requires us to predict future traffic, the scale-down process is done after we know the actual usage factor of the stream. That means that we can scale the shard count of the stream accurately according to the required throughput.
To calculate the target shard count, we can query CloudWatch metrics with the same query we used to define the scaling alarms. This should give us 288 data points (as the alarm metrics are aggregated by 5 minutes over the last 24 hours). From the returned data points, we should pick the maximum value, i.e., the maximum usage factor of the stream in the last 24 hours. Multiplying that number by the current shard count should give us the maximum number of shards that were required to handle the load in the last 24 hours. In other words, if the result were the actual shard count in the stream, the usage factor would have been 1 (maximum throughput) at that data point. By multiplying that value by 2, we are essentially setting the maximum usage factor to 0.5, which is the “sweet spot” in terms of shards utilization for us, enough to justify the shard count in terms of cost but also to give us some extra room for handling more traffic. It might be simpler to put it into code:
```python
import math


def get_target_shard_count(current_shard_count: int) -> int:
    """
    Calculates the scale-down operation target shard count.
    This is done by querying for the max usage factor and calculating the shard
    count that will result in a usage factor of 0.5 (50%) at the end of the
    scaling operation.
    :param current_shard_count: the current shard count of the stream
    :return: the shard count the stream should scale to
    """
    # get_max_usage_factor is defined elsewhere; it returns the highest usage
    # factor of the stream in the last 24 hours
    max_usage_factor = get_max_usage_factor(current_shard_count)
    used_shard_count = current_shard_count * max_usage_factor
    target_shard_count = math.ceil(used_shard_count * 2)
    # A single operation cannot scale below half of the current shard count
    min_possible_shard_count = math.ceil(current_shard_count / 2)
    return max(min_possible_shard_count, target_shard_count)
```
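The `get_max_usage_factor` helper is not shown in the snippet above. One way it might look is sketched below, using CloudWatch's GetMetricData API. The signature is an assumption and differs from the call above: it takes the CloudWatch client and the metric math queries explicitly so that it can be exercised without AWS access. It also assumes the 288 data points fit in a single response page.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional


def get_max_usage_factor(
    cloudwatch: Any,
    metric_queries: List[Dict[str, Any]],
    result_id: str = "maxIncomingUsageFactor",
    now: Optional[datetime] = None,
) -> float:
    """Return the highest usage factor of the stream in the last 24 hours."""
    end_time = now or datetime.now(timezone.utc)
    response = cloudwatch.get_metric_data(
        MetricDataQueries=metric_queries,
        StartTime=end_time - timedelta(hours=24),
        EndTime=end_time,
    )
    for result in response["MetricDataResults"]:
        if result["Id"] == result_id:
            # No data points at all is treated as an idle stream.
            return max(result["Values"], default=0.0)
    return 0.0
```

As a worked example of the calculation this feeds into: for a stream with 20 shards and a maximum usage factor of 0.2, four shards were actually used, so the uncapped target would be 8. Since a single operation cannot go below half of the current count, the stream would be scaled to 10.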
This sums up the basic implementation of the autoscaling solution. Some extra features can be added according to the organization’s needs. For example, at Epsagon, we’ve added the ability to specify minimum/maximum shard count per stream to avoid possible issues like autoscaling a stream to the point that other components in the architecture might fail due to increased load.
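The per-stream limits can be applied as a final step on whatever target the scaling strategy produced. The sketch below is a generic clamp. The function and parameter names are hypothetical and do not reflect how Epsagon implemented the feature.

```python
from typing import Optional


def clamp_shard_count(
    target_shard_count: int,
    min_shards: Optional[int] = None,
    max_shards: Optional[int] = None,
) -> int:
    """Keep a calculated target within the optional per-stream limits."""
    if min_shards is not None and max_shards is not None and min_shards > max_shards:
        raise ValueError("min_shards must not exceed max_shards")
    if min_shards is not None:
        target_shard_count = max(target_shard_count, min_shards)
    if max_shards is not None:
        target_shard_count = min(target_shard_count, max_shards)
    return target_shard_count
```

For example, with `max_shards=50` a scale-up from 40 shards that would normally target 60 is capped at 50, which protects downstream consumers that cannot handle more parallelism.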
Another idea is to send autoscaling notifications to your preferred communication platform. A Slack integration can help teams track and discuss changes in streams throughput.
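A notification like this could be sent through a Slack incoming webhook, which accepts a JSON payload with a `text` field. The following is a minimal sketch using only the standard library. The message wording and function names are made up for this example, and the webhook URL has to be supplied by the caller.

```python
import json
import urllib.request
from typing import Dict


def build_scaling_message(stream_name: str, old_shard_count: int, new_shard_count: int) -> Dict[str, str]:
    """Build the JSON payload describing a completed scaling operation."""
    direction = "up" if new_shard_count > old_shard_count else "down"
    return {
        "text": (
            f"Kinesis stream `{stream_name}` scaled {direction}: "
            f"{old_shard_count} -> {new_shard_count} shards"
        )
    }


def notify_slack(webhook_url: str, message: Dict[str, str]) -> int:
    """POST the payload to a Slack incoming webhook and return the HTTP status."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(message).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

Calling `notify_slack` at the end of each scaling lambda, after the log entry is written, would keep the channel in step with the DynamoDB log.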
Figure 4: Slack Integration
Conclusion and Open Source Project
Data throughput is often unpredictable. An architecture that is limited in its ability to scale automatically can cost us more due to over-provisioning or, worse, make us lose important data.
This solution provides an event-driven architecture that is flexible and relatively easy to implement. It should reduce the cost of the subscribed streams and ensure they can handle increased traffic.
Reading about a solution is one thing. Implementing it is a whole different thing. Therefore, I’ve started an open-source project that implements this solution as a deployable CloudFormation stack. Contributions to the project are welcome. You can also fork it and adjust it to your specific use cases.