Shopify's Path to a Faster Trino Query Execution: Infrastructure

By Matt Bruce & Bruno Deszczynski

Driving down the amount of time data scientists are waiting for query results is a critical focus (and necessity) for every company with a large data lake. However, handling and analyzing high-volume data within split seconds is complicated. One of the biggest hurdles to speed is whether you have the proper infrastructure in place to efficiently store and query your data.

At Shopify, we use Trino to provide our data scientists with quick access to our data lake, via an industry standard SQL interface that joins and aggregates data across heterogeneous data sources. However, our data has scaled to the point where we’re handling 15 Gbps and over 300 million rows of data per second. With this volume, greater pressure was put on our Trino infrastructure, leading to slower query execution times and operational problems. We’ll discuss how we scaled our interactive query infrastructure to handle the rapid growth of our datasets, while enabling a query execution time of less than five seconds.

Our Interactive Query Infrastructure 

At Shopify, we use Trino and multiple client apps as our main interactive query tooling, where the client apps are the interface and Trino is the query engine. Trino is a distributed SQL query engine designed to query large data sets distributed over heterogeneous data sources. The main reason we chose Trino is the optionality it gives us in which database engines we use. However, it’s important to note that Trino isn’t a database itself, because it lacks a storage component. Rather, it’s optimized to perform queries across one or more large data sources.

Our architecture consists of two main Trino clusters:

  • Scheduled cluster: runs reports from Interactive Analytics apps configured on a fixed schedule.
  • Adhoc cluster: runs any on-demand queries and reports, including queries from our experiments platform.

We use a fork of Lyft’s Trino Gateway to route queries to the appropriate cluster by inspecting header information in the query. Each of the Trino clusters runs on top of Kubernetes (Google GKE) which allows us to scale the clusters and perform blue-green deployments easily.

While our Trino deployment managed to process an admirable amount of data, our users had to deal with inconsistent query times depending on the load of the cluster, and occasionally situations where the cluster became so bogged down that almost no queries could complete. We had to get to work to identify what was causing these slow queries, and speed up Trino for our users.

The Problem 

When it comes to querying data, Shopify data scientists (rightfully) expect to get results within seconds. However, we serve interactive analytics, A/B testing (experiments), and reporting all in one place. To improve our query execution times, we focused on speeding up Trino, because it has the largest influence on the final performance of queries, whichever SQL client they are executed from.

We wanted to achieve a P95 query latency of less than five seconds, which would be a significant decrease (approximately 30 times). That was a very ambitious target, as approximately five percent of our queries were running for around one to five minutes. To achieve this we started by analyzing these factors:
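To make the P95 target concrete, here is a minimal sketch of how a P95 latency could be computed from a list of query durations. It uses the nearest-rank method, which is one common definition and an assumption on our part; the function name `p95` and the sample data are hypothetical and not taken from Shopify's tooling.

```python
def p95(durations_s):
    """Nearest-rank 95th percentile of query durations, in seconds."""
    if not durations_s:
        raise ValueError("need at least one duration")
    ordered = sorted(durations_s)
    n = len(ordered)
    # Integer ceiling of 0.95 * n, avoiding floating point rounding.
    rank = (95 * n + 99) // 100
    return ordered[rank - 1]


# Hypothetical sample: 94 fast queries and 6 slow ones (durations in seconds).
sample = [2.0] * 94 + [150.0] * 6
print(p95(sample))        # P95 of the hypothetical sample
print(p95(sample) < 5.0)  # whether this sample meets a five second target
```

With this definition, the target is met only when at least 95 percent of queries finish in under five seconds, so a small tail of multi-minute queries is enough to miss it.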

  • Query volumes
  • Most often queried datasets
  • Queries consuming most CPU wall time
  • Datasets that are consuming the most resources
  • Failure scenarios.

When analyzing the factors above, we discovered that it’s not necessarily the query volume itself that was driving our performance problems. We noticed a correlation between certain types of queries and datasets consuming the most resources that was creating a lot of error scenarios for us. So we decided to zoom in and look into the errors.

We started looking at error classes in particular:

A dashboard showing 0.44% Query Execution Failure rate and a 0.35% Highly relevant error rate. The dashboard includes a breakdown of the types of Presto errors.
Trino failure types breakdown

Our resource relevant error rate (errors related to exceeding resource use) was around 0.35 percent, which was acceptable given the load profile executed against Trino. What was most interesting for us was the ability to identify the queries that were timing out or degrading the performance of our Trino cluster. At first it was hard for us to properly debug our load-specific problems, as we couldn’t recreate the state of Trino during the performance degradation scenarios. So, we created a Trino Query Replicator that allowed us to recreate any load from the past.
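The post doesn't describe how the Trino Query Replicator works internally. As a rough illustration of the idea of recreating a past load, the sketch below turns a hypothetical query log into a replay schedule that preserves the original relative timing. The `LoggedQuery` type, its fields, and the `speedup` parameter are all assumptions made for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LoggedQuery:
    started_at: float  # seconds since epoch, taken from a hypothetical query log
    sql: str


def replay_schedule(log, speedup=1.0):
    """Return (delay_seconds, sql) pairs that preserve the original relative timing.

    A speedup above 1.0 compresses the gaps between queries to apply more pressure.
    """
    if speedup <= 0:
        raise ValueError("speedup must be positive")
    ordered = sorted(log, key=lambda q: q.started_at)
    if not ordered:
        return []
    t0 = ordered[0].started_at
    return [((q.started_at - t0) / speedup, q.sql) for q in ordered]


# Hypothetical log entries, deliberately out of order.
log = [
    LoggedQuery(100.0, "SELECT 1"),
    LoggedQuery(90.0, "SELECT 2"),
    LoggedQuery(110.0, "SELECT 3"),
]
for delay, sql in replay_schedule(log, speedup=2.0):
    print(f"{delay:5.1f}s  {sql}")
```

A real replayer would submit each statement to a test cluster after its delay; that part is left out here because it depends on the client library in use.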

Recreating the state of Trino during performance degradation scenarios enabled us to drill down deeper on the classes of errors, and identify that the majority of our problems were related to:

  • Storage type: especially compressed JSON format of messages coming from Kafka.
  • Cluster Classes: using the ad-hoc server for everything, and not just what was scheduled.
  • CPU & Memory allocation: both on the coordinator and workers. We needed to scale up together with the number of queries and data.
  • JVM settings: we needed to tune our virtual machine options.
  • Dataset statistics: allowing for better query execution via cost based optimization available in Trino.

While we could write a full book diving into each problem, for this post we’ll focus on how we addressed problems related to JVM settings, CPU and Memory allocation, and cluster classes.

A line graph showing the P95 execution time over the month of December. The trend line shows that execution time was steadily increasing.
Our P95 Execution time and trend line charts before we fine tuned our infrastructure

The Solution

In order to improve Trino query execution times and reduce the number of errors caused by timeouts and insufficient resources, we first tried to “money scale” the current setup. By “money scale” we mean we scaled our infrastructure horizontally and vertically. We doubled the size of our worker pods to 61 cores and 220 GB of memory, while also increasing the number of workers we were running. Unfortunately, this alone didn’t yield stable results. For that reason, we dug deeper into the query execution logs, stack traces, and Trino codebase, and consulted Trino’s creators. From this exploration, we discovered that we could try the following:

  • Creating separate clusters for applications with predictable heavy compute requirements.
  • Lowering the number of concurrent queries to reduce coordinator lock contention.
  • Ensuring the recommended JVM recompilation settings are applied.
  • Limiting the maximum number of drivers per query task to prevent compute starvation.

Workload Specific Clusters

As outlined above, we initially had two Trino clusters: a Scheduled cluster and an Adhoc cluster. The shared cluster for users’ ad hoc queries and the experiment queries was causing frustration on both sides. The experiment queries were adding a lot of excess load, causing users’ queries to have inconsistent query times. A query that might take seconds to run could take minutes if there were experiment queries running. Correspondingly, the users’ queries were making the runtime of the experiment queries unpredictable. To make Trino better for everyone, we added a new cluster just for the experiment queries, leveraging our existing deployment of Trino Gateway to route experiment queries there based on an HTTP header.
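As a rough sketch of header-based routing, the function below picks a cluster from the request headers. The post doesn't say which header or values the gateway inspects, so the use of `X-Trino-Source`, the prefix matching rule, and the cluster URLs are all assumptions made for illustration.

```python
# Hypothetical cluster endpoints; the real hostnames are not given in the post.
CLUSTERS = {
    "scheduled": "http://trino-scheduled.example.internal",
    "experiments": "http://trino-experiments.example.internal",
    "adhoc": "http://trino-adhoc.example.internal",
}


def pick_cluster(headers, routing_header="X-Trino-Source"):
    """Choose a backend cluster from request headers (illustrative rule only)."""
    # HTTP header names are case-insensitive, so normalize before the lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    source = normalized.get(routing_header.lower(), "").lower()
    if source.startswith("experiments"):
        return CLUSTERS["experiments"]
    if source.startswith("scheduled"):
        return CLUSTERS["scheduled"]
    # Anything unrecognized falls back to the ad hoc cluster.
    return CLUSTERS["adhoc"]


print(pick_cluster({"x-trino-source": "experiments-platform"}))
print(pick_cluster({"X-Trino-Source": "some-sql-client"}))
```

Defaulting to the ad hoc cluster keeps unknown clients working while still isolating the heavy, predictable workloads.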

We also took this opportunity to write some tooling that allows users to create their own ephemeral clusters for temporary heavy-duty processing, or investigations with a single command (these are torn down automatically by an Airflow job after a defined TTL).

A system diagram showing the Trino infrastructure before changes. Mode and internal SQL clients feed into the Trino Gateway. The Gateway feeds into scheduled reports and adhoc queries.
Trino infrastructure before
A system diagram of the Trino infrastructure after changes. Mode and internal clients both feed into the Trino Gateway. The Gateway feeds into Scheduled Reports, Ad hoc queries, and experimental queries. In addition, the Internal SQL clients feed into Short-Term clusters
Trino infrastructure after

Lock Contention

After exhausting the conventional scaling up options, we moved on to the most urgent problem: when the Trino cluster was overloaded and work wasn’t progressing, what was happening? By analyzing metrics output to Datadog, we were able to identify a few situations that would arise.

One problem we identified was that the Trino cluster’s queued work would continue to increase, but no queries or splits were being dispatched. In this situation, we noticed that the Trino coordinator (the server that handles incoming queries) was running, but it stopped outputting metrics for minutes at a time. We originally assumed that this was due to CPU load on the coordinator (those metrics were also unavailable). However, after logging into the coordinator’s host and looking at the CPU usage, we saw that the coordinator wasn’t busy enough to explain the missing statistics. We proceeded to capture and analyze multiple stack traces and determined that the issue wasn’t an overloaded CPU, but lock contention against the Internal Resource Group object from all the active queries and tasks.

We set hardConcurrencyLimit to 60 in our root resource group to limit the number of running parallel queries and reduce the lock contention on the coordinator.

"rootGroups": [
    {
        "hardConcurrencyLimit": "60",

Resource group configuration

This setting is a bit of a balancing act between allowing enough queries to run to fully utilize the cluster, and capping the amount running to limit the lock contention on the coordinator.
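For context, here is a minimal sketch of what a complete resource group file with this limit might look like, generated from Python so the values are easy to vary. Only the `hardConcurrencyLimit` of 60 comes from our setup as described here; the group name, `maxQueued`, `softMemoryLimit`, and the catch-all selector are placeholder assumptions.

```python
import json


def resource_groups(hard_concurrency_limit=60, max_queued=1000, soft_memory_limit="100%"):
    """Build a minimal single-group resource group config (placeholder values)."""
    return {
        "rootGroups": [
            {
                "name": "global",  # assumed group name
                "softMemoryLimit": soft_memory_limit,  # assumed value
                "hardConcurrencyLimit": hard_concurrency_limit,  # 60 in the post
                "maxQueued": max_queued,  # assumed value
            }
        ],
        # A selector with no conditions matches every query.
        "selectors": [{"group": "global"}],
    }


print(json.dumps(resource_groups(), indent=2))
```

Queries beyond the concurrency limit wait in the queue rather than running, which is what keeps the number of active queries contending for the coordinator's locks bounded.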

A line graph showing Java Lang:System CPU load in percent over a period of 5 hours before the change. The graph highlights the spikes where metric dropouts happened six times.
Pre-change CPU graph showing Metrics dropouts due to lock contention
A line graph showing Java Lang:System CPU load in percent over a period of 5 hours after the change. The graph highlights there were no more metric dropouts.
Post change CPU graph showing no metrics dropouts

JVM Recompilation Settings

After the coordinator lock contention was reduced, we noticed that we would have a reasonable number of running queries, but the cluster throughput would still be lower than expected. This caused queries to eventually start queuing up. Datadog metrics showed that a single worker’s CPU was running at 100%, but most of the others were basically idle.

A line graph showing Java Lang:System CPU load by worker in percent over a period of 5 hours. It highlights that a single worker's CPU was running at 100%
CPU Load distribution by worker

We investigated this behaviour by profiling the Trino process with jvisualvm while the issue was occurring. What we found was that almost all the CPU time was spent on either:

  1. GCM AES decryption of the data coming from GCS.
  2. JSON deserialization of that data.

What was curious to us is that the datasets the affected workers were processing were no different from those of any of the other workers. Why were these using more CPU time to do the same work?

After some trial and error, we found that setting the following JVM options prevented our users from being put in this state:

-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000

JVM settings

It’s worth noting that these settings were added to the recommended JVM options in a later version of Trino than we were running at the time. There’s a good discussion about those settings in the Trino GitHub repo! It seems that we were hitting a condition that was causing the JVM to no longer attempt compilation of some methods, which caused them to run in the JVM interpreter rather than as compiled code, which is much, much slower.
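Because these flags are easy to lose during an upgrade or a config refactor, a small check like the one below can guard against regressions. It's a sketch that assumes the JVM options live in a `jvm.config` style file with one option per line; the function name and the sample content are hypothetical.

```python
REQUIRED_FLAGS = (
    "-XX:PerMethodRecompilationCutoff=10000",
    "-XX:PerBytecodeRecompilationCutoff=10000",
)


def missing_jvm_flags(jvm_config_text):
    """Return the required flags that are absent from the given config text."""
    # One option per line; surrounding whitespace is ignored.
    present = {line.strip() for line in jvm_config_text.splitlines()}
    return [flag for flag in REQUIRED_FLAGS if flag not in present]


# Hypothetical config content that is missing one of the two flags.
example = """\
-server
-XX:+UseG1GC
-XX:PerMethodRecompilationCutoff=10000
"""
print(missing_jvm_flags(example))
```

A check like this could run in CI or at deploy time and fail the rollout when the returned list is not empty.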

In the graph below, the CPU of the workers is more aligned without the ‘long tail’ of the single worker running at 100 percent.

CPU Load distribution by worker

Amount of Splits Per Stage Per Worker

In the process of investigating the performance of queries, we happened to come across an interesting query via the Trino Web UI:

A screenshot showing the details displayed on the Trino WebUI. It includes query name, execution time, size, etc.
Trino WebUI query details

What we found was that one query had a massive number of running splits: approximately 29,000. This was interesting because, at that time, our cluster only had 18,000 available worker threads, and our Datadog graphs showed a maximum of 18,000 concurrently running splits. We’ll chalk that up to an artifact of the WebUI. Doing some testing with this query, we discovered that a single query could monopolize the entire Trino cluster, starving out all the other queries.

After hunting around the Slack and forum archives, we came across an undocumented configuration option: `task.max-drivers-per-task`. This configuration enabled us to limit the maximum number of splits that can be scheduled per stage, per query, per worker. We set this to 16, which limited this query to around 7,200 active splits.
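The arithmetic behind that cap can be sketched as follows. Since the limit applies per stage, per query, per worker, the upper bound on a query's running splits is the limit multiplied by the number of (stage, worker) pairs the query occupies. The helper names are hypothetical, and the count of 450 pairs is only what the figures 7,200 and 16 imply; the actual stage and worker counts aren't stated here.

```python
def max_running_splits(max_drivers_per_task, stage_worker_pairs):
    """Upper bound on one query's concurrently running splits."""
    return max_drivers_per_task * stage_worker_pairs


def implied_stage_worker_pairs(observed_splits, max_drivers_per_task):
    """Number of (stage, worker) pairs implied by an observed split count."""
    return observed_splits // max_drivers_per_task


pairs = implied_stage_worker_pairs(7200, 16)
print(pairs)                          # (stage, worker) pairs implied by the figures
print(max_running_splits(16, pairs))  # bound with the limit set to 16
```

The same bound shows why the setting is a trade-off: a lower limit protects other queries from starvation, but it also caps how much of an otherwise idle cluster a single query can use.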

The Results and What’s Next

Without leveraging the storage upgrade, and by tapping into cluster node sizing, cluster classes, Trino configs, and JVM tuning, we managed to bring down our execution latency to 30 seconds and provide a stable environment for our users. The charts below present the final outcome:

A bar graph showing the large decrease in execution time before the change and after the change.
Using log scale binned results for execution time before and after
A line graph showing the P95 execution time over a 3 month period.  The trend line shows that execution time reduces.
P95 Execution time and trendline over 3 month period

The changes in the distribution of queries across bins show that we managed to move more queries into the zero to five second bucket and (most importantly) limited the execution time of the heaviest queries. Our execution time trendline speaks for itself, and as we’re writing this blog, our P95 query execution time is below 30 seconds.

By creating separate clusters, lowering the number of concurrent queries, ensuring the recommended JVM recompilation settings were applied, and limiting the maximum number of drivers per query task, we were able to scale our interactive query infrastructure.

While addressing the infrastructure was an important step to speed up our query execution, it’s not our only step. We still think there is room for improvement and are working to make Trino our primary interactive query engine. We’re planning to put further efforts into:

  • Making our storage more performant (JSON -> Parquet).
  • Introducing an Alluxio cache layer.
  • Creating load profiling tooling.
  • Enhancing our statistics to improve the ability of the Trino query optimizer to choose the most optimal query execution strategy, not just the overall performance of user queries.
  • Improving our Trino Gateway by rolling out Shopify Trino Conductor (a Shopify specific gateway), improving UI/infrastructure, and introducing weighted query routing.

Matt Bruce: Matt is a four-year veteran at Shopify serving as Staff Data Developer for the Foundations and Orchestration team. He’s previously helped launch many open source projects at Shopify, including Apache Druid and Apache Airflow, and helped migrate Shopify’s Hadoop and Presto infrastructure from physical data centers into cloud-based services.

Bruno Deszczynski: Bruno is a Data Platform EPM working with the Foundations team. He is obsessed with making Trino execute interactive analytics queries (P95) below five seconds at Shopify.