7 Tips For Optimizing Apache Flink Applications

By Yaroslav Tkachenko, Kevin Lam, and Rafael Aguiar

At Shopify, we’ve adopted Apache Flink as a standard stateful streaming engine that powers a variety of use cases like our BFCM Live Map. Our Flink applications are deployed in a Kubernetes environment leveraging Google Kubernetes Engine. Our clusters are configured to use High Availability mode so that the Job Manager isn’t a single point of failure. We also use the RocksDB state backend and write our checkpoints and savepoints to Google Cloud Storage (GCS).

Making sure our Flink applications stay performant and resilient is one of our top priorities. It’s also one of our biggest challenges. Keeping large stateful applications resilient is difficult. Some data models require storing immense state (for example, 13 TB for the sales data, as we shared in our Storing State Forever: Why It Can Be Good For Your Analytics talk) and we’ve spent a lot of time on performance tuning, learning many lessons along the way.

Below we’ll walk you through key lessons for optimizing large stateful Apache Flink applications. We’ll start off by covering recommended tooling, then focus on performance and resiliency aspects.

1. Find the Right Profiling Tools

First things first. Having the right profiling tools on hand is key to getting insights into how to solve a performance problem. While deploying our first applications, we found the following tools useful when debugging Flink:

  • Async-profiler: a profiling tool built for the Java Virtual Machine (JVM) that’s used to trace many kinds of events including CPU cycles, Java Heap allocations, and performance counters like cache misses and page faults. Its support for flame graphs is especially useful for inspecting where your Task Managers are spending their time. This tool has been particularly useful for helping us debug the performance difference Kryo serialization can make.
  • VisualVM: another JVM profiling tool. You can connect this tool to running JVM instances to view heap allocations and CPU usage live. It’s useful for interactive debugging. We frequently use it as an initial tool for investigating memory issues. It has a friendly UI and it doesn’t need a lot of setup.
  • jemalloc + jeprof: jemalloc is a general purpose malloc implementation that was adopted as Flink’s default memory allocator starting in version 1.12. jeprof is a profiler that works with jemalloc. Combined, they let you set up your Task Managers and Job Managers to automatically dump memory profiles that you can then analyze with jeprof. We found this useful for observing memory trends over longer periods of time, helping us detect memory leaks in RocksDB for one of our applications.
  • Eclipse Memory Analyzer (Eclipse MAT): a Java heap analyzer used to inspect JVM heap dumps for memory utilization, finding memory leaks, etc. It can be used to read heap dumps produced by jemalloc to provide an additional layer of analysis and interpretation. This tool proved extremely useful when we needed to investigate one of our applications facing out-of-memory issues with the GCS file sink, which we describe below.

These tools can be used with any Flink application, but they’re by no means the only tools, just the ones that have worked for us. The JVM has a rich ecosystem of profiling tools worth investigating, from basic built-in commands like jmap to modern advanced capabilities like Java Flight Recorder.

2. Avoid Kryo Serialization

Flink provides a variety of different serializers for the data structures it uses. Most of the time we use Scala’s case classes or Avro records that are supported in Flink. They may not be the most performant choices, but they’re great for developer experience.

When Flink fails to serialize a record using the built-in case class or Avro serializers, it falls back to Kryo serialization. Kryo serialization is slow, much slower than the serializers Flink uses for the data types you’d typically work with. You may not notice this performance degradation until you use a tool like async-profiler. For example, when we used async-profiler to debug unrelated performance issues, we observed just how much space Kryo classes were occupying in the memory flame graph. We disabled the fallback to Kryo (`env.getConfig().disableGenericTypes();`), which surfaced the serialization failures that had been triggering the fallback. Below are some examples we encountered and how we fixed them:

  • Scala’s BigDecimal. Flink doesn’t support serializing Scala’s BigDecimal values, but it can serialize Java ones. Default to using Java’s BigDecimal to avoid this instance of serializer failure. You’ll probably face this issue when you deal with monetary values.
  • Scala ADTs. Flink doesn’t support serializing Scala ADTs implemented with a sealed trait and a few case objects, typically representing an enum-like data structure. However, it does support Scala enums, so you can use those instead.
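
As a rough illustration of the two fixes above, the sketch below stores a monetary amount as a Java BigDecimal and models an enum-like type with `scala.Enumeration`. The `Sale` and `Currency` names are hypothetical, and the sketch assumes that "Scala enums" refers to `scala.Enumeration`; it is not taken from the applications described here.

```scala
object KryoFriendlyTypes {
  // Hypothetical enum-like type, modelled as a scala.Enumeration rather than
  // a sealed trait with case objects.
  object Currency extends Enumeration {
    val CAD, USD = Value
  }

  // Hypothetical record: the amount is a java.math.BigDecimal, not a Scala one.
  case class Sale(orderId: Long, amount: java.math.BigDecimal, currency: Currency.Value)

  // scala.math.BigDecimal wraps a java.math.BigDecimal, exposed via `.bigDecimal`,
  // so the conversion at the edge of the pipeline is a single field access.
  def toSale(orderId: Long, amount: BigDecimal, currency: Currency.Value): Sale =
    Sale(orderId, amount.bigDecimal, currency)
}
```

Converting at the boundary like this keeps Scala's BigDecimal available for arithmetic elsewhere while the record type that flows through the pipeline only holds the Java type.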

After fixing all these issues we noticed a 20 percent throughput increase. You can follow the same approach: disable Kryo fallback and fix the issues that pop up until Flink doesn’t use Kryo anymore.

3. Tune Configuration Depending on the Workload

Flink provides a myriad of options when it comes to configuration, but tuning really depends on the state and load of your application. For example, at Shopify, a typical streaming pipeline can be subject to different system load profiles, specifically:

  • Backfill: starting from the beginning of time and consuming all the historical messages available in the input sources until the pipeline catches up with the present time (that is, source lag is near zero).
  • Steady state: the pipeline is consuming near real-time messages and source lag is minimal (that is, seconds).
  • Extreme or seasonal event: the pipeline is consuming near real-time messages, but resource usage is spiky and lag may be increased. An example of this for Shopify is high volume sales moments like flash sales and Black Friday.

Let’s focus on the first two profiles as they define the key operational modes of our pipeline. During the start of a backfill, the pipeline backlog is at its maximum size, whereas, during steady state, the pipeline backlog is minimal. We want the backfill to catch up with the present as quickly as possible, to reduce the time cost of tasks and code changes that require reprocessing all the data from the beginning.

The large data volume and the speed criteria make the backfill a challenging and computationally intense undertaking. For some large applications this could mean processing tens of billions of messages in a couple of hours. That’s why we tune our pipeline to prioritize throughput over freshness. On the other hand, for steady state, we tune our applications to run with very low latency and maximize the freshness of all the outputs. This results in two different configuration profiles that are set depending on the current state of an application.

While you need to consider your system load and how it influences your tuning, below are a few considerations that can be applied to both load profiles:

  • Input Source Partitions (for example, Kafka partitions). During steady state the backlog is minimal, so a small number of input partitions will likely provide sufficient parallelism for a healthy pipeline with minimal lag. However, during a backfill it's incredibly important to make sure that your pipeline can get as much throughput from the input sources as possible—the more input partitions, the more throughput. So, be sure to take into consideration the backfilling scenario when you create your sources.
  • Back Pressure. Performance bottlenecks can cause back pressure: when data is produced faster than downstream operators can consume it, the pressure propagates to upstream operators. If your pipeline is healthy you’re unlikely to see back pressure at steady state. However, while backfilling, the pipeline bottlenecks will become evident (colored red in the job graph UI). Take this opportunity to identify slow stages of your pipeline and optimize them if possible.
  • Sink Throttling. Even if your application code is highly optimized, there’s still a chance that the pipeline can’t write to the sink as fast as you would want. The sink might not support many connections, or even if it does, it might just get overwhelmed with too many concurrent writes. When possible, scale up the resources for the sink (for example, add more nodes to the database or partitions to the Kafka topic), and when that isn’t on the table, consider reducing the parallelism of the sink or the number of outgoing connections.
  • Networking. Network buffers were first introduced to improve resource utilization and increase throughput at the cost of delaying messages in the buffer queue. In order to increase parallelism, you can add more Task Managers and provide more task slots. But note that often this requires an accompanying increase in the number of network buffers (via taskmanager.memory.network.fraction) if the pipeline graph is fairly complex and contains several shuffle operations.
  • Checkpointing. To reduce the time to recover from failure, it’s important to keep checkpoint frequency (execution.checkpointing.interval) high during steady state. However, during a backfill it’s better to reduce frequency and avoid the associated overhead. Depending on the size of the pipeline state, it might be necessary to tweak the Task Manager heap so that there’s enough memory for uploading files. Also, if state size is large, consider using incremental checkpoints (state.backend.incremental). Finally, look into increasing the checkpointing timeout (execution.checkpointing.timeout) if necessary.
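
One way to picture the two configuration profiles is as a pair of key/value maps, with the backfill profile overriding a few steady-state settings. The sketch below is only an illustration: the option keys are the ones named in the list above, but every value, the lag threshold, and the `profileFor` and `render` helpers are assumptions rather than recommended settings.

```scala
object ConfigProfiles {
  // Steady state: frequent checkpoints for fast recovery (values are illustrative).
  val steadyState: Map[String, String] = Map(
    "execution.checkpointing.interval"    -> "1 min",
    "execution.checkpointing.timeout"     -> "10 min",
    "state.backend.incremental"           -> "true",
    "taskmanager.memory.network.fraction" -> "0.1"
  )

  // Backfill: checkpoint less often, allow more time, give shuffles more buffers.
  val backfill: Map[String, String] = steadyState ++ Map(
    "execution.checkpointing.interval"    -> "30 min",
    "execution.checkpointing.timeout"     -> "1 h",
    "taskmanager.memory.network.fraction" -> "0.2"
  )

  // Hypothetical switch: treat the job as backfilling while source lag is large.
  def profileFor(sourceLagSeconds: Long, thresholdSeconds: Long = 300): Map[String, String] =
    if (sourceLagSeconds > thresholdSeconds) backfill else steadyState

  // Renders a profile as "key: value" lines, the shape used by flink-conf.yaml.
  def render(profile: Map[String, String]): String =
    profile.toSeq.sortBy(_._1).map { case (k, v) => s"$k: $v" }.mkString("\n")
}
```

Keeping the backfill profile as a delta on top of the steady-state one makes it easy to see which settings actually differ between the two modes.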

For a list of other Flink deployment configurations that might be useful, consult the Flink docs.

4. Profile Heap

Flink provides a File Sink capable of writing files to a file system or an object store like HDFS, S3, or GCS (which Shopify uses). Configuring File Sink is pretty straightforward, but getting it to work efficiently and reliably can be tricky.

Flink’s File Sink maintains a list of partitions (or buckets) in memory. Each bucket is determined by a BucketAssigner. For example, a custom BucketAssigner can use a timestamp field in the provided record to generate a bucket that looks like date=2021-01-01. This is an extremely popular partition format used by Hive.
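
The bucket naming described above can be sketched as a small pure function. This is a minimal illustration, assuming the record carries an epoch-millisecond timestamp that should be interpreted in UTC; in a real job, logic like this would sit inside a custom `BucketAssigner`'s `getBucketId` method. The `DailyBucket` name is hypothetical.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object DailyBucket {
  // Formats the day portion of a timestamp in UTC, e.g. 2021-01-01.
  private val dayFormat =
    DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC)

  // Maps an event timestamp (epoch milliseconds) to a Hive-style bucket ID.
  def bucketId(timestampMillis: Long): String =
    s"date=${dayFormat.format(Instant.ofEpochMilli(timestampMillis))}"
}
```

For instance, `DailyBucket.bucketId(1609459200000L)` yields `date=2021-01-01`.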

We configured a File Sink and naively added it to the existing DataStream:

val records: DataStream[Record] = …
val fileSink: SinkFunction[Record] = …

records.addSink(fileSink)

This worked in tests, but when we deployed it to a real environment and tried to process all historical data during a backfill, we immediately observed memory issues. Our application would quickly use all available Java heap and crash! And it kept crashing even after we increased memory multiple times. We understood that some memory may be needed for buffering the records in buckets, but probably not tens of gigabytes.

So we took a few heap dumps when the application was almost ready to crash and analyzed them with Eclipse MAT. The results looked really concerning:

Eclipse MAT: Overview

In the above heap dump, you can see two big objects that occupy almost the whole heap. The dominator tree report clearly showed (as highlighted in red) two HashMaps that back File Sink buckets as violators:

Eclipse MAT: Dominator tree

After further exploring the heap dump and application logs we realized what was going on. As we didn’t apply any data reshuffling, each Task Manager consumed records that could end up in any bucket, meaning:

  • Each Task Manager needs to hold a large list of buckets in memory. We regularly observed over 500 buckets.
  • Due to the above, the time to roll and flush the files is significantly increased: there isn’t enough data on each Task Manager to do it quickly.

We can apply a simple solution to this problem—just key the records by the partition string before writing them to the sink:
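
With Flink's Scala DataStream API, a minimal version of that change would presumably be `records.keyBy(_.partition).addSink(fileSink)`, where `partition` is a hypothetical field holding the bucket string. The self-contained sketch below does not use Flink; it simulates the effect on a toy data set of 31 daily buckets and 4 workers, with plain hash-modulo routing standing in for Flink's key-group assignment.

```scala
object KeyedRoutingSketch {
  val workers = 4

  // 31 daily buckets: date=2021-01-01 through date=2021-01-31.
  val buckets: Seq[String] = (1 to 31).map(d => f"date=2021-01-$d%02d")

  // 124 records that cycle through the buckets in arrival order.
  val records: Seq[String] = (0 until 124).map(i => buckets(i % buckets.size))

  // Counts how many distinct buckets each worker has to keep open for a
  // given routing function (bucket, arrival index) => worker.
  def bucketsPerWorker(route: (String, Int) => Int): Map[Int, Int] =
    records.zipWithIndex
      .groupBy { case (bucket, i) => route(bucket, i) }
      .map { case (worker, rs) => worker -> rs.map(_._1).distinct.size }

  // No keying: records are spread across workers regardless of their bucket.
  val unkeyed: Map[Int, Int] = bucketsPerWorker((_, i) => i % workers)

  // Keyed: all records of one bucket land on the same worker.
  val keyed: Map[Int, Int] =
    bucketsPerWorker((bucket, _) => Math.floorMod(bucket.hashCode, workers))
}
```

In the unkeyed case every worker ends up holding all 31 buckets, whereas in the keyed case each bucket is held by exactly one worker, so the per-worker counts add up to 31.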

By doing this, we guarantee that we’re routing records with the same partition or bucket to the same Task Manager, resulting in each Task Manager holding fewer buckets in memory and flushing the files faster. No more memory issues! And the heap dump analysis showed a 90 percent decrease in the number of active buckets per Task Manager.

This approach works well if the selected partition has a good distribution. But if some of your days have significantly more data than the others (could be expected when doing historical backfill) you could end up with a big skew, leading to memory issues. Slightly changing the partitioning to improve the distribution by adding hours to the partition key can be a good solution for this problem.
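
To make the hour-based key concrete, here is a small sketch that compares a daily key with a date-plus-hour key for one busy day. The key format, the `HourlyKey` name, and the synthetic one-record-per-minute data are assumptions for illustration only.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object HourlyKey {
  private val daily =
    DateTimeFormatter.ofPattern("'date='yyyy-MM-dd").withZone(ZoneOffset.UTC)
  private val hourly =
    DateTimeFormatter.ofPattern("'date='yyyy-MM-dd'/hour='HH").withZone(ZoneOffset.UTC)

  // Key with day granularity: every record of a busy day shares one key.
  def dailyKey(timestampMillis: Long): String =
    daily.format(Instant.ofEpochMilli(timestampMillis))

  // Key with hour granularity: a busy day is split across up to 24 keys.
  def hourlyKey(timestampMillis: Long): String =
    hourly.format(Instant.ofEpochMilli(timestampMillis))

  // Synthetic data: one record per minute across 2021-01-01 (UTC).
  val busyDay: Seq[Long] = (0 until 24 * 60).map(m => 1609459200000L + m * 60000L)
}
```

With the daily key, all 1,440 synthetic records share a single key, while the hourly key spreads them over 24 keys that can be processed by different Task Managers.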

Data locality is an important aspect in distributed systems, as this experience clearly demonstrated. A simple technique of logically shuffling the data to achieve good data parallelism can be applied to other sinks and operators as well.

5. Use SSD for RocksDB Storage

Even though RocksDB—the most popular and recommended Flink state backend—keeps some data in memory, the majority of state is persisted on disk. So, very performant disks are required when dealing with large stateful applications. This isn’t immediately obvious, especially when just starting with Flink. For example, initially we started using Network File System (NFS) volumes for the RocksDB state backend when deploying applications with little state (for example, Kafka consumer offsets). We didn’t notice any performance problems, but benefited from additional resiliency provided by NFS.

However, there are a lot of resources online recommending fast disks like local SSDs, so we tried using one provided by GCP for one of our applications with over 8 terabytes of state. By using a local SSD, we saw processing rates increase roughly tenfold due to increased disk I/O rates. A local SSD in GCP can be lost if an instance goes down, but thanks to Flink checkpoints and savepoints, the state can be easily recovered.

6. Avoid Dynamic Classloading

Flink has several ways in which it loads classes for use by Flink applications. From the Debugging Classloading documentation:

  • The Java Classpath: This is Java’s common classpath, and it includes the JDK libraries, and all code (the classes of Apache Flink and some dependencies) in Flink’s /lib folder.
  • The Flink Plugin Components: The plugins code folder resides in Flink’s /plugins folder. Flink’s plugin mechanism dynamically loads them once during startup.
  • Dynamic User Code: These are all classes that are included in the JAR files of dynamically submitted jobs (via REST, CLI, web UI). They are loaded (and unloaded) dynamically per job.

Dynamic User Code is loaded at the start of every job, so leaks can happen if there are lingering references to old classes. Whenever a Flink application needs to recover from a transient failure, it restarts the job and recovers from the most recent checkpoint available and also reloads all Dynamic User Code.

Metaspace memory before and after disabling dynamic classloading

We observed memory leaks during these restarts, in the form of “java.lang.OutOfMemoryError: Metaspace” errors. As you can see in the screenshot above, every time an application restarts before the fix it increases the amount of used Metaspace memory.

Disabling dynamic classloading, by putting our application code on Java’s common classpath, resolved the issue. The screenshot above after the fix shows that the memory doesn’t increase with restarts.

This solution is applicable if you run Flink clusters in Application Mode, and don’t need to support running multiple jobs on a single Flink cluster.

7. Understand RocksDB Memory Usage

We also observed another memory-related issue, one that was very complex to debug. It happened whenever we:

  • started one of the Flink applications with a lot of state
  • waited for at least an hour
  • manually terminated one of the Task Manager containers.

We expected a replacement Task Manager to be added to the cluster (thanks to the Kubernetes Deployment) and the application recovery to happen shortly after that. But instead, we noticed another one of our Task Managers crashing with an “Out of Memory” error, leading to a never-ending loop of crashes and restarts:

Memory usage of Flink containers with OOM errors

We confirmed that the issue only happens for an application with a lot of state that’s been running for at least an hour. We were surprised to realize that the “Out of Memory” errors didn’t come from the JVM: heap profiling using async-profiler and VisualVM didn’t show any problems. But still, something led to increased memory usage and eventually forced the Kubernetes runtime to kill a pod violating its memory limits.

So we started to suspect that the RocksDB state backend was using a lot of native memory outside of the JVM. We configured jemalloc to periodically write heap dumps to a local filesystem so we could analyze them with jeprof. We were able to capture a heap dump just before another “Out of Memory” error and confirmed that RocksDB was trying to allocate more memory than it was configured to use:

In this particular example, Flink Managed Memory was configured to use 5.90 GB, but the profile clearly shows 6.74 GB being used, about 0.84 GB (roughly 14 percent) over the configured limit.

We found a relevant RocksDB issue that supports this claim: many users of the library reported various memory-related issues over the last three years. We followed one of the suggestions in the issue and tried to disable the RocksDB block cache using a custom RocksDBOptionsFactory:
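
A factory along these lines might look like the sketch below. It is a reconstruction from the description above rather than the exact code used in production: the class name is hypothetical, and it assumes the `flink-statebackend-rocksdb` dependency (which brings in the RocksDB Java API) is on the classpath.

```scala
import java.util

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

// Hypothetical class name; the overridden methods are the two defined by
// Flink's RocksDBOptionsFactory interface.
class NoBlockCacheRocksDbOptionsFactory extends RocksDBOptionsFactory {

  // Database-level options are left untouched.
  override def createDBOptions(
      currentOptions: DBOptions,
      handlesToClose: util.Collection[AutoCloseable]): DBOptions =
    currentOptions

  // Column-family options: switch the block-based table format to "no block cache".
  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    val tableConfig = new BlockBasedTableConfig()
    tableConfig.setNoBlockCache(true)
    // Index and filter blocks must not be directed to a cache that is disabled.
    tableConfig.setCacheIndexAndFilterBlocks(false)
    tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(false)
    tableConfig.setPinL0FilterAndIndexBlocksInCache(false)
    currentOptions.setTableFormatConfig(tableConfig)
  }
}
```

A factory like this can be registered through the `state.backend.rocksdb.options-factory` configuration option by giving its fully qualified class name. How these settings interact with Flink's managed memory for RocksDB may vary between Flink versions, so it is worth verifying the effect with the RocksDB native metrics.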

It worked! Now, even after killing a Task Manager we didn’t observe any memory issues:

Memory usage of Flink containers without OOM errors

Disabling RocksDB block cache didn’t affect performance. In fact, we only saw a difference during the time it takes to populate the cache. But there was no difference in performance between a Flink application with disabled RocksDB block cache and a Flink application with full RocksDB block cache. This also explains why we needed to wait in order to reproduce the issue: we were waiting for the block cache to fill. We later confirmed it by enabling the RocksDB Native Metrics.

And there you have it! Apache Flink is an incredibly powerful stream processing engine. However, building complex applications with it brings performance and resiliency challenges that require some tuning and optimization work. We hope you enjoyed this whirlwind tour of Flink and some of the lessons we learned.

Yaroslav Tkachenko is a Staff Data Engineer who works on the Streaming Capabilities team. Yaroslav has been building stream processing applications for many years and is currently on a mission to make stream processing easy at Shopify. Follow Yaroslav on Twitter.

Kevin Lam works on the Streaming Capabilities team under Data Platform Engineering. He's focused on making stateful stream processing powerful and easy at Shopify. In his spare time he enjoys playing musical instruments, and trying out new recipes in the kitchen.

Rafael Aguiar is a Senior Data Engineer on the Insights Platform Capabilities team. He is interested in distributed systems and all things large-scale analytics. When he is not baking homemade pizza, he is probably lost outdoors. Follow him on LinkedIn.
