By Kevin Lam and Rafael Aguiar
At Shopify, we’ve adopted Apache Flink as a standard stateful streaming engine that powers a variety of use cases. Earlier this year, we shared our tips for optimizing large stateful Flink applications. Below we’ll walk you through 3 more best practices.
1. Set the Right Parallelism
A Flink application consists of multiple tasks, including transformations (operators), data sources, and sinks. These tasks are split into several parallel instances for execution and data processing.
Parallelism refers to the parallel instances of a task and is a mechanism that enables you to scale in or out. It's one of the main contributing factors to application performance. Increasing parallelism allows an application to leverage more task slots, which can increase the overall throughput and performance.
Application parallelism can be configured in a few different ways, including:
- Operator level
- Execution environment level
- Client level
- System level
The configuration choice really depends on the specifics of your Flink application. For instance, if some operators in your application are known to be a bottleneck, you may want to increase the parallelism only for those operators.
We recommend starting with a single execution environment level parallelism value and increasing it if needed. This is a good starting point because task slot sharing allows for better resource utilization: when I/O-intensive subtasks block, non-I/O-intensive subtasks can make use of the task manager resources.
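As a minimal sketch, here's how the execution environment level default and an operator-level override look in Scala; the pipeline and the numbers are purely illustrative, and the client and system levels are noted in comments:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Execution environment level: the default parallelism for every operator.
env.setParallelism(4)

val words = env.fromElements("flink", "parallelism", "tuning")

// Operator level: override the default only for a known bottleneck.
val upper = words
  .map(_.toUpperCase)
  .setParallelism(16)

// Client level: pass -p when submitting, e.g. `flink run -p 4 app.jar`.
// System level: set parallelism.default in flink-conf.yaml.
```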
A good rule to follow when identifying parallelism is:
The number of task managers multiplied by the number of task slots in each task manager should be equal to (or slightly higher than) the highest parallelism value used in the application.
For example, when using parallelism of 100 (either defined as a default execution environment level or at a specific operator level), you would need to run 25 task managers, assuming each task manager has four slots: 25 x 4 = 100.
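Expressed as a quick back-of-the-envelope calculation (using the same example numbers):

```scala
// Rule of thumb: taskManagers * slotsPerTaskManager >= highest parallelism.
val highestParallelism = 100
val slotsPerTaskManager = 4
val taskManagersNeeded =
  math.ceil(highestParallelism.toDouble / slotsPerTaskManager).toInt // 25
```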
2. Avoid Sink Bottlenecks
Data pipelines usually have one or more data sinks (destinations like Bigtable, Apache Kafka, and so on), which can sometimes become bottlenecks in your Flink application. For example, if your target Bigtable instance has high CPU utilization, it may start to affect your Flink application because the sink can no longer keep up with the write traffic. You may not see any exceptions, but you will see decreased throughput all the way back to your sources. You'll also see backpressure in the Flink UI.
When a sink is the bottleneck, the backpressure will propagate to all of its upstream dependencies, which could be your entire pipeline. You want to make sure that your sinks are never the bottleneck!
In cases where a little latency can be sacrificed, it's useful to combat bottlenecks by batching writes to the sink in exchange for higher throughput. A batch write request collects multiple events into a bundle and submits them to the sink at once, rather than submitting one event at a time. Batch writes often lead to better compression, lower network usage, and a smaller CPU hit on the sinks. See Kafka's batch.size property and Bigtable's bulk mutations for examples.
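For instance, with Flink's Kafka connector (the KafkaSink from flink-connector-kafka, Flink 1.15+ API) you can pass producer properties that control batching. The snippet below is a minimal sketch; the bootstrap servers, topic name, and property values are illustrative assumptions, not tuned recommendations:

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala._

// Hypothetical upstream stream of serialized events.
val events: DataStream[String] = ???

// Producer-side batching knobs (values are illustrative starting points).
val producerProps = new Properties()
producerProps.setProperty("batch.size", "65536")      // bytes collected per batch
producerProps.setProperty("linger.ms", "50")          // wait up to 50 ms to fill a batch
producerProps.setProperty("compression.type", "lz4")  // batches compress better than single records

val sink: KafkaSink[String] = KafkaSink
  .builder[String]()
  .setBootstrapServers("kafka:9092")
  .setRecordSerializer(
    KafkaRecordSerializationSchema
      .builder[String]()
      .setTopic("checkouts")
      .setValueSerializationSchema(new SimpleStringSchema())
      .build()
  )
  .setKafkaProducerConfig(producerProps)
  .build()

events.sinkTo(sink)
```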
You’ll also want to check and fix any data skew. In the same Bigtable example, you may have heavily skewed keys which will affect a few of Bigtable’s hottest nodes. Flink uses keyed streams to scale out to nodes. The concept involves the events of a stream being partitioned according to a specific key. Flink then processes different partitions on different nodes.
keyBy is frequently used to re-key a DataStream in order to perform an aggregation or a join. It's very easy to use, but it can cause a lot of problems if the chosen key isn't properly distributed. For example, at Shopify, if we were to choose a shop ID as our key, it wouldn't be ideal. A shop ID is the identifier of a single merchant shop on our platform, and different shops have very different traffic, meaning some Flink task managers would be busy processing data while others would stay idle. This could easily lead to out-of-memory exceptions and other failures. Low-cardinality keys (fewer than 100 distinct values) are also problematic because it's hard to distribute them evenly amongst the task managers.
But what if you absolutely need to use a less-than-ideal key? Well, you can apply a bucketing technique (sketched in the code example after this list):
- Choose a maximum number of buckets (start with a number smaller than or equal to the operator parallelism)
- Randomly generate a bucket value between 0 and the maximum number
- Append it to your key before keyBy
By applying a bucketing technique, your processing logic is better distributed (up to the maximum number of additional buckets per key). However, you need to come up with a way to combine the results in the end. For instance, if after processing all your buckets you find the data volume is significantly reduced, you can keyBy the stream by your original “less than ideal” key without creating problematic data skew. Another approach could be to combine your results at query time, if your query engine supports it.
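Here's a minimal Scala sketch of the idea; the event type, field names, and bucket count are illustrative assumptions:

```scala
import org.apache.flink.streaming.api.scala._
import scala.util.Random

// Hypothetical event type; shopId is the skewed, less-than-ideal key.
case class Checkout(shopId: Long, amountCents: Long)

val checkouts: DataStream[Checkout] = ??? // your upstream, skewed stream
val maxBuckets = 8 // start with a value <= the operator parallelism

val bucketed = checkouts
  // Attach a random bucket so events for a hot shop spread across subtasks.
  .map(e => (e, Random.nextInt(maxBuckets)))
  // Key by the (shopId, bucket) pair instead of shopId alone.
  .keyBy { case (event, bucket) => (event.shopId, bucket) }
  // ...aggregate per (shopId, bucket) here...

// Once per-bucket aggregation has shrunk the data volume, re-keying by the
// original shopId to merge the buckets no longer creates problematic skew.
```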
3. Use HybridSource to Combine Heterogeneous Sources
Let's say you need to abstract several heterogeneous data sources into one, with some ordering. For example, at Shopify a large number of our Flink applications read and write to Kafka. In order to save on storage costs, we enforce per-topic retention policies on all our Kafka topics. This means that after a certain period of time has elapsed, data is expired and removed from the Kafka topics. Since users may still care about this data after it expires, we support configuring Kafka topics to be archived. When a topic is archived, all Kafka data for that topic is copied to cloud object storage for long-term storage, ensuring it isn't lost when the retention period elapses.
Now, what do we do if we need our Flink application to read all the data associated with an archived topic, for all time? Well, we could create two sources: one for reading from the cloud storage archives, and one for reading from the real-time Kafka topic. But this creates complexity. Our application would be reading from two points in event time simultaneously, from two different sources. On top of this, if we care about processing things in order, our Flink application has to explicitly implement logic that handles that properly.
If you find yourself in a similar situation, don't worry, there's a better way! You can use HybridSource to make the archive and real-time data look like one logical source. Using HybridSource, you can provide your users with a single source that first reads from the cloud storage archives for a topic, and then, when the archives are exhausted, switches over automatically to the real-time Kafka topic. The application developer only sees a single logical DataStream and doesn't have to think about any of the underlying machinery. They simply get to read the entire history of data.
Using HybridSource to read cloud object storage data also means you can leverage a higher number of input partitions to increase read throughput. While one of our Kafka topics might be partitioned across tens or hundreds of partitions to support enough throughput for live data, our object storage datasets are typically partitioned across thousands of partitions per split (e.g. day) to accommodate vast amounts of historical data. The superior object storage partitioning, combined with enough task managers, allows Flink to blaze through the historical data, dramatically reducing the backfill time compared to reading the same amount of data straight from a far more coarsely partitioned Kafka topic.
Here's roughly what creating a DataStream using our HybridSource-powered KafkaBackfillSource looks like in Scala.
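The KafkaBackfillSource itself is internal to Shopify, so the snippet below is a sketch that approximates the wiring it performs using Flink's public HybridSource, FileSource, and KafkaSource APIs (Flink 1.15+); the archive path, topic, brokers, and string deserialization are illustrative assumptions:

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.source.hybrid.HybridSource
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Bounded source over the archived topic data in cloud object storage.
val archiveSource: FileSource[String] = FileSource
  .forRecordStreamFormat(new TextLineInputFormat(), new Path("gs://archive-bucket/checkouts/"))
  .build()

// Unbounded source for the live Kafka topic, starting at the earliest retained offset.
val liveSource: KafkaSource[String] = KafkaSource
  .builder[String]()
  .setBootstrapServers("kafka:9092")
  .setTopics("checkouts")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()

// Read the archive first, then switch over to the live topic automatically.
val hybridSource = HybridSource.builder(archiveSource).addSource(liveSource).build()

val checkouts: DataStream[String] =
  env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "checkouts-backfill")
```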
The KafkaBackfillSource wraps this kind of wiring and abstracts away the existence of the archive (which is inferred from the Kafka topic and cluster), so that the developer can think of everything as a single DataStream.
HybridSource is a very powerful construct and should definitely be considered if you need your Flink application to read several heterogeneous data sources in an ordered format.
And there you go! 3 more tips for optimizing large stateful Flink applications. We hope you enjoyed our key learnings and that they help you out when implementing your own Flink applications. If you're looking for more tips and haven't read our first blog, make sure to check it out here.
Kevin Lam works on the Streaming Capabilities team under Production Engineering. He's focused on making stateful stream processing powerful and easy at Shopify. In his spare time he enjoys playing musical instruments, and trying out new recipes in the kitchen.
Rafael Aguiar is a Senior Data Engineer on the Streaming Capabilities team. He is interested in distributed systems and all-things large scale analytics. When he is not baking some homemade pizza he is probably lost outdoors. Follow him on Linkedin.
Interested in tackling the complex problems of commerce and helping us scale our data platform? Join our team.