By Matt Bruce & Bruno Deszczynski
Driving down the amount of time data scientists are waiting for query results is a critical focus (and necessity) for every company with a large data lake. However, handling and analyzing high-volume data within split seconds is complicated. One of the biggest hurdles to speed is whether you have the proper infrastructure in place to efficiently store and query your data.
At Shopify, we use Trino to provide our data scientists with quick access to our data lake, via an industry standard SQL interface that joins and aggregates data across heterogeneous data sources. However, our data has scaled to the point where we’re handling 15 Gbps and over 300 million rows of data per second. With this volume, greater pressure was put on our Trino infrastructure, leading to slower query execution times and operational problems. We’ll discuss how we scaled our interactive query infrastructure to handle the rapid growth of our datasets, while enabling a query execution time of less than five seconds.
Our Interactive Query Infrastructure
At Shopify, we use Trino and multiple client apps as our main interactive query tooling, where the client apps are the interface and Trino is the query engine. Trino is a distributed SQL query engine designed to query large data sets distributed over heterogeneous data sources. The main reason we chose Trino is that it gives us flexibility in which database engines we use. However, it’s important to note that Trino isn’t a database itself, as it lacks a storage component. Rather, it's optimized to perform queries across one or more large data sources.
Our architecture consists of two main Trino clusters:
- Scheduled cluster: runs reports from Interactive Analytics apps configured on a fixed schedule.
- Adhoc cluster: runs any on-demand queries and reports, including queries from our experiments platform.
We use a fork of Lyft’s Trino Gateway to route queries to the appropriate cluster by inspecting header information in the query. Each of the Trino clusters runs on top of Kubernetes (Google GKE) which allows us to scale the clusters and perform blue-green deployments easily.
While our Trino deployment managed to process an admirable amount of data, our users had to deal with inconsistent query times depending on the load of the cluster, and occasionally situations where the cluster became so bogged down that almost no queries could complete. We had to get to work to identify what was causing these slow queries, and speed up Trino for our users.
The Problem
When it comes to querying data, Shopify data scientists (rightfully) expect to get results within seconds. However, our workloads span interactive analytics, A/B testing (experiments), and reporting, all in one place. To improve our query execution times, we focused on speeding up Trino, since improvements there carry over to the performance of queries executed via any SQL client software.
We wanted to achieve a P95 query latency of less than five seconds, which would be a significant decrease (approximately 30 times). That was a very ambitious target, as approximately five percent of our queries were taking around one to five minutes. To achieve this, we started by analyzing these factors:
- Query volumes
- Most often queried datasets
- Queries consuming most CPU wall time
- Datasets that are consuming the most resources
- Failure scenarios
When analyzing the factors above, we discovered that it’s not necessarily the query volume itself that was driving our performance problems. We noticed a correlation between certain types of queries and datasets consuming the most resources that was creating a lot of error scenarios for us. So we decided to zoom in and look into the errors.
We started looking at error classes in particular:
Our resource-related error rate (errors caused by exceeding resource limits) was around 0.35 percent, which was acceptable given the load profile that was executed against Trino. What was most interesting for us was the ability to identify the queries that were timing out or degrading the performance of our Trino cluster. At first it was hard for us to properly debug our load-specific problems, as we couldn’t recreate the state of Trino during the performance degradation scenarios. So, we created a Trino Query Replicator that allowed us to recreate any load from the past.
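The idea of replaying past load can be sketched as follows. This is a minimal illustration and not the actual Trino Query Replicator: the `LoggedQuery` record, the `replay` function, and the `speedup` parameter are hypothetical names, and it assumes a query log with start timestamps is available and that `submit` hands the query off without blocking.

```python
import time
from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass(frozen=True)
class LoggedQuery:
    """One entry from a (hypothetical) query log."""
    started_at: float  # seconds since epoch when the query was originally submitted
    sql: str


def replay(
    queries: Iterable[LoggedQuery],
    submit: Callable[[str], None],
    speedup: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Re-submit logged queries in their original order, preserving the gaps
    between submissions (divided by `speedup`). Returns the number submitted.

    `submit` is assumed to be non-blocking; otherwise the original
    concurrency of the load would not be reproduced.
    """
    ordered = sorted(queries, key=lambda q: q.started_at)
    previous_start = None
    submitted = 0
    for query in ordered:
        if previous_start is not None:
            gap = (query.started_at - previous_start) / speedup
            if gap > 0:
                sleep(gap)
        submit(query.sql)
        previous_start = query.started_at
        submitted += 1
    return submitted
```

Passing `sleep` and `submit` as parameters keeps the sketch testable without a live cluster; in practice `submit` would wrap whichever Trino client is in use.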
Recreating the state of Trino during performance degradation scenarios enabled us to drill down deeper on the classes of errors, and identify that the majority of our problems were related to:
- Storage type: especially compressed JSON format of messages coming from Kafka.
- Cluster Classes: using the ad-hoc server for everything, and not just what was scheduled.
- CPU & Memory allocation: both on the coordinator and workers. We needed to scale up together with the number of queries and data.
- JVM settings: we needed to tune our virtual machine options.
- Dataset statistics: allowing for better query execution via cost based optimization available in Trino.
While we could write a full book diving into each problem, for this post we’ll focus on how we addressed problems related to JVM settings, CPU and Memory allocation, and cluster classes.
The Solution
In order to improve Trino query execution times and reduce the number of errors caused by timeouts and insufficient resources, we first tried to “money scale” the current setup. By “money scale” we mean we scaled our infrastructure horizontally and vertically. We doubled the size of our worker pods to 61 cores and 220 GB of memory, while also increasing the number of workers we were running. Unfortunately, this alone didn’t yield stable results. For that reason, we dug deeper into the query execution logs, stack traces, and Trino codebase, and consulted the Trino creators. From this exploration, we discovered that we could try the following:
- Creating separate clusters for applications with predictable heavy compute requirements.
- Lowering the number of concurrent queries to reduce coordinator lock contention.
- Ensuring the recommended JVM recompilation settings are applied.
- Limiting the maximum number of drivers per query task to prevent compute starvation.
Workload Specific Clusters
As outlined above, we initially had two Trino clusters: a Scheduled cluster and an Adhoc cluster. Sharing one cluster between users’ ad hoc queries and the experiment queries was causing frustration on both sides. The experiment queries added a lot of excess load, which made users’ query times inconsistent. A query that might take seconds to run could take minutes if there were experiment queries running. Correspondingly, the users’ queries made the runtime of the experiment queries unpredictable. To make Trino better for everyone, we added a new cluster just for the experiment queries, leveraging our existing deployment of Trino Gateway to route experiment queries there based on an HTTP header.
We also took this opportunity to write some tooling that allows users to create, with a single command, their own ephemeral clusters for temporary heavy-duty processing or investigations (these are torn down automatically by an Airflow job after a defined TTL).
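Header-based routing of this kind can be sketched in a few lines. This is an illustration only, not the gateway's actual code: the post doesn't say which header is inspected, so the use of `X-Trino-Source` here is an assumption, and the cluster URLs and source names are hypothetical.

```python
from typing import Mapping

# Hypothetical cluster addresses; the real ones are not given in the post.
CLUSTERS = {
    "scheduled": "http://trino-scheduled.example.internal:8080",
    "experiments": "http://trino-experiments.example.internal:8080",
    "adhoc": "http://trino-adhoc.example.internal:8080",
}

# Hypothetical mapping from the value of the X-Trino-Source header to a cluster.
SOURCE_TO_CLUSTER = {
    "experiments-platform": "experiments",
    "scheduled-reports": "scheduled",
}


def pick_cluster(headers: Mapping[str, str]) -> str:
    """Return the URL of the cluster that should run a request.

    Anything without a recognised source falls through to the ad hoc cluster.
    """
    # HTTP header names are case-insensitive, so normalise before the lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    source = normalized.get("x-trino-source", "").strip().lower()
    return CLUSTERS[SOURCE_TO_CLUSTER.get(source, "adhoc")]
```

Defaulting to the ad hoc cluster means a client that sends no header still gets served, while the experiment traffic is kept away from interactive users.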
Lock Contention
After exhausting the conventional scaling up options, we moved on to the most urgent problem: when the Trino cluster was overloaded and work wasn’t progressing, what was happening? By analyzing metrics output to Datadog, we were able to identify a few situations that would arise.
One problem we identified was that the Trino cluster’s queued work would continue to increase, but no queries or splits were being dispatched. In this situation, we noticed that the Trino coordinator (the server that handles incoming queries) was running, but it stopped outputting metrics for minutes at a time. We originally assumed that this was due to CPU load on the coordinator (those metrics were also unavailable). However, after logging into the coordinator’s host and looking at the CPU usage, we saw that the coordinator wasn’t busy enough to explain why it couldn’t report statistics. We proceeded to capture and analyze multiple stack traces and determined that the issue wasn’t an overloaded CPU, but lock contention against the Internal Resource Group object from all the active queries and tasks.
We set hardConcurrencyLimit to 60 in our root resource group to limit the number of running parallel queries and reduce the lock contention on the coordinator.
"rootGroups": [
{
"hardConcurrencyLimit": "60",
Resource group configuration
This setting is a bit of a balancing act between allowing enough queries to run to fully utilize the cluster, and capping the amount running to limit the lock contention on the coordinator.
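The trade-off can be illustrated with a toy admission model. This is not Trino's resource group implementation, only a sketch of what a hard concurrency limit does: queries beyond the limit wait in a queue and are promoted as running queries finish. The `RootGroup` class and its method names are hypothetical.

```python
from collections import deque
from typing import Optional


class RootGroup:
    """Toy model of a resource group with a hard concurrency limit."""

    def __init__(self, hard_concurrency_limit: int) -> None:
        self.hard_concurrency_limit = hard_concurrency_limit
        self.running = set()
        self.queued = deque()

    def submit(self, query_id: str) -> str:
        """Admit the query if there is room, otherwise queue it."""
        if len(self.running) < self.hard_concurrency_limit:
            self.running.add(query_id)
            return "RUNNING"
        self.queued.append(query_id)
        return "QUEUED"

    def finish(self, query_id: str) -> Optional[str]:
        """Mark a query as done and promote the oldest queued query, if any."""
        self.running.remove(query_id)
        if self.queued:
            promoted = self.queued.popleft()
            self.running.add(promoted)
            return promoted
        return None


group = RootGroup(hard_concurrency_limit=60)
states = [group.submit(f"q{i}") for i in range(61)]
# The first 60 queries run; the 61st waits until one of them finishes.
```

A lower limit means fewer queries contend for the coordinator at once but more of them wait in the queue; a higher limit does the opposite.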
JVM Recompilation Settings
After the coordinator lock contention was reduced, we noticed that we would have a reasonable number of running queries, but the cluster throughput would still be lower than expected. This caused queries to eventually start queuing up. Datadog metrics showed that a single worker’s CPU was running at 100%, but most of the others were basically idle.
We investigated this behaviour by doing some profiling of the Trino process with jvisualvm while the issue was occurring. What we found was that almost all the CPU time was spent either:
- Doing GCM AES decryption of the data coming from GCS.
- JSON deserialization of that data.
What was curious to us was that the datasets the affected workers were processing were no different from those of the other workers. Why were these workers using more CPU time to do the same work?
After some trial and error, we found that setting the following JVM options prevented our workers from getting into this state:
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
JVM settings
It’s worth noting that these settings were added to the recommended JVM options in a later version of Trino than we were running at the time. There’s a good discussion about those settings in the Trino GitHub repo. It seems that we were hitting a condition that caused the JVM to stop attempting to compile some methods, so they ran in the JVM interpreter rather than as compiled code, which is much, much slower.
In the graph below, the CPU of the workers is more aligned without the ‘long tail’ of the single worker running at 100 percent.
Amount of Splits Per Stage Per Worker
In the process of investigating the performance of queries, we happened to come across an interesting query via the Trino Web UI:
What we found was that one query had a massive number of running splits: approximately 29,000. This was interesting because, at that time, our cluster only had 18,000 available worker threads, and our Datadog graphs showed a maximum of 18,000 concurrent running splits. We’ll chalk that up to an artifact of the Web UI. Doing some testing with this query, we discovered that a single query could monopolize the entire Trino cluster, starving out all the other queries.
After hunting around the Slack and forum archives, we came across an undocumented configuration option: `task.max-drivers-per-task`. This configuration enabled us to limit the maximum number of splits that can be scheduled per stage, per query, per worker. We set this to 16, which limited this query to around 7,200 active splits.
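The arithmetic behind that cap can be worked through as below. A Trino task is one stage of one query running on one worker, so the cap on a query's running splits is the per-task driver limit multiplied by its number of tasks. The post doesn't state how many workers or stages were involved, so the task count here is only what the quoted figures imply, and since "around 7,200" is approximate, so is the result. The function names are hypothetical.

```python
def max_running_splits(max_drivers_per_task: int, tasks_for_query: int) -> int:
    """Upper bound on concurrently running splits for a single query."""
    return max_drivers_per_task * tasks_for_query


def implied_task_count(observed_splits: int, max_drivers_per_task: int) -> int:
    """Number of (stage, worker) tasks implied by an observed split cap."""
    return observed_splits // max_drivers_per_task


MAX_DRIVERS_PER_TASK = 16        # value of task.max-drivers-per-task from the post
OBSERVED_SPLITS = 7_200          # "around 7,200" active splits, from the post
CLUSTER_WORKER_THREADS = 18_000  # available worker threads, from the post

tasks = implied_task_count(OBSERVED_SPLITS, MAX_DRIVERS_PER_TASK)
cluster_share = OBSERVED_SPLITS / CLUSTER_WORKER_THREADS

print(f"implied tasks for the query: {tasks}")
print(f"share of worker threads one query can hold: {cluster_share:.0%}")
```

With these figures the query went from being able to occupy every worker thread to holding well under half of them, which leaves room for other queries to make progress.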
The Results and What’s Next
Without leveraging the storage upgrade and by tapping into cluster node sizing, cluster classes, Trino configs, and JVM tuning, we managed to bring down our execution latency to 30 seconds and provide a stable environment for our users. The charts below present the final outcome:
The changes in the distribution of queries across execution-time bins show that we managed to move more queries into the zero to five second bucket and (most importantly) limited how long the heaviest queries took to run. Our execution time trendline speaks for itself, and as we’re writing this blog, our P95 query execution time is under 30 seconds.
By creating separate clusters, lowering the number of concurrent queries, ensuring the recommended JVM recompilation settings were applied, and limiting the maximum number of drivers per query task, we were able to scale our interactive query infrastructure.
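For readers less familiar with the metric, a P95 figure like the one above can be computed from a list of query durations as follows. This sketch uses the nearest-rank definition of a percentile, which is an assumption: monitoring tools may interpolate differently, and the post doesn't say which method was used. The function name and the sample durations are illustrative.

```python
import math
from typing import Sequence


def percentile_nearest_rank(values: Sequence[float], pct: float) -> float:
    """Smallest value such that at least `pct` percent of values are <= it."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


# Illustrative durations in seconds: nineteen fast queries and one slow one.
durations = [2.0] * 19 + [90.0]
p95 = percentile_nearest_rank(durations, 95)
```

Because P95 ignores the slowest five percent of queries, a single very slow query in a sample of twenty does not move it; a target on P95 is therefore a statement about the bulk of queries, not the worst case.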
While addressing the infrastructure was an important step to speed up our query execution, it’s not our only step. We still think there is room for improvement and are working to make Trino our primary interactive query engine. We’re planning to put further efforts into:
- Making our storage more performant (JSON -> Parquet).
- Introducing an Alluxio cache layer.
- Creating load profiling tooling.
- Enhancing our statistics to improve the ability of the Trino query optimizer to choose the most optimal query execution strategy, not just the overall performance of user queries.
- Improving our Trino Gateway by rolling out Shopify Trino Conductor (a Shopify specific gateway), improving UI/infrastructure, and introducing weighted query routing.
Matt Bruce: Matt is a four-year veteran at Shopify serving as Staff Data Developer for the Foundations and Orchestration team. He’s previously helped launch many open source projects at Shopify, including Apache Druid and Apache Airflow, as well as migrating Shopify’s Hadoop and Presto infrastructure from physical data centers into cloud-based services.
Bruno Deszczynski: Bruno is a Data Platform EPM working with the Foundations team. He is obsessed with making Trino execute interactive analytics queries (P95) below five seconds at Shopify.