Authors: John Martin and Adam Bellemare
Shopify’s data warehouse has gone through many iterations since the company's founding in 2004. Since then, the data warehouse has evolved and grown into a data lake, compromised of multiple storage mechanisms, systems, and consumers. Along the way, Shopify has moved to the cloud and adopted open-source big data tools such as Apache Spark, Presto, and dbt.
At its conception, Shopify’s data warehouse was implemented specifically for internal analytics. As it has grown, the platform has become the e-commerce backbone for many of our one million+ merchants. Many of these merchants require insight and access to the vast amount of data generated by their business on the Shopify platform.
Problems emerged as our company grew. The data lake couldn’t satisfy the ever-growing demand for new and enhanced merchant analytics. These limitations were in part due to the relatively slow query times of our OLAP databases, the limited support for serving structured datasets from our data lake, and the slow batch nature of our ingest pipelines. Our business requirements saw us needing faster ingest and semi-structured data with low-latency query response times, which led to us building the dedicated Merchant Analytics Platform. This brings us to today’s data platform architecture:
Internal Data Warehouse
- Extract: Batch-style, scheduled jobs using query-based data extraction. Change-events from different Shopify shards are written to Cloud Storage and denormalized into large Hive/Parquet tables.
- Transform: Apache Spark and dbt transformation jobs.
- Load: Data from the warehouse is loaded into a variety of sinks for innumerable different use-cases. Some sinks include BigQuery, Redshift, and custom applications for serving structured data in different Shopify services.
Merchant Analytics Platform
- Extract: A batch-query, always-on application using query-based data extraction. Change-events are written to Kafka.
- Transform: Some light-weight streaming transformations are done with this data, writing outputs back into Kafka.
- Load: Kafka data is streamed into Google Bigtable and made queryable from the merchant's admin page.
Splitting efforts into two different data platforms has enabled us to provide high-quality merchant analytics without interfering with dependencies required by Shopify’s data science team. But some problems arise from separate data platforms:
- Different sets of tools and standards: Creating a merchant dashboard within the Merchant Analytics platform requires creating a kafka streaming application to write to a new Bigtable table (all of this in Golang!). This is an entirely different set of skills than writing a batch-based Spark job.
- Separate data extraction tools and results: Two tools doing the same thing don’t always produce the same results. Factors like when a query ran can change the data extract results for any given table.
- Duplication of work: Two platforms means that developers/data scientists may need to create reports and analytics for each.
As Shopify’s product has grown from a merchants' storefront to a commerce backbone, adding offerings like Shopify Email, Capital, Balance, and Marketing have ballooned the merchant analytics requirements. With a growing number of data sources, the problems described above only worsen. Bridging the gap between the two data platforms necessitates a single unified data extraction pipeline. To achieve this, we must ensure that we fulfil three main requirements:
- We need access to data contained within Shopify’s business applications. Most of this is encapsulated within our main sharded monolith.
- We need to increase the freshness of our data such that it is available for general-purpose consumption as quickly as possible.
- The data is of the highest quality, accurately reflecting every change occurring within the upstream database.
The remainder of this blog post discusses how we achieve these requirements.
Acquiring Data Through Batch Queries
Obtaining data from our applications’ internal data stores has followed the age-old tactic of reaching into the data store, grabbing the required data, and writing it down outside for others to access. This has been the design basis of Longboat, Shopify’s internal data extraction service.
Longboat is a custom-built, query-based, change-data capture (CDC) application that operates via batch extraction jobs. Longboat periodically queries the source database based on the tables required and obtains all rows of a given table that have changed since the last Longboat query. The following figure shows Longboat querying the `users` table from multiple shards, writing the output to cloud storage.
Queries are written at the per-table level to minimize the impact to the datastore under query (no joins allowed). Longboat connects to our sharded core monolith’s read-only replicas to further minimize the querying impact, as some of these long-running queries can substantially impact the database’s performance.
A Longboat query for a specific table can only be run as often as once an hour due to the performance impacts on the sources. This maximum frequency imposes a limit on how fresh our data can be, particularly with querying time, processing time, and copying time included. In practice, our Longboat-sourced data is at a minimum 1 hour old.
The data obtained by Longboat is stored in cloud storage and eventually joined into a database “snapshot” for usage by downstream reporting, analytics, data science modeling, and other data-team use cases. This data is primarily used by the data team for analytics, reports, and other batch-based jobs and sparingly used as a general-purpose source for the rest of the company.
Query-Based Data Extraction
The simplest Longboat data extraction job is a simple selection statement:
SELECT * FROM `TABLE`.
While this query can certainly extract all information, it doesn’t scale to the size of data that we have at Shopify. Our largest tables have billions of rows. A query like this would be unable to complete in a reasonable amount of time, nor without unduly affecting other queries. Instead, Longboat queries use a table’s
"updated_at" to run sequential jobs, obtaining only data that has been updated since that last job run. The following figure illustrates the results from sequential queries on the
Longboat Issue: Unable to Capture Hard Delete Events
There are two types of deletion events: soft deletes and hard deletes.
A soft delete is simply a field indicating “deleted” status, though the actual record remains in the database indefinitely. The soft delete updates the row’s
"updated_at", ensuring the deletion event is noticed. Conversely, a hard delete fully removes the entire row from the database. This leaves no
"updated_at" for our job to fetch, since the entire row no longer exists.
Query-based capture will miss all hard-deletes, resulting in the absence of deletions in the data lake. The difference between the two deletion mechanisms is illustrated in the following figure.
Many tables at Shopify use the soft-deletion method as a consequence of our query-based data extraction. This can be a problem; soft deletes don’t scale well when a table regularly adds and “removes” unique entries, as a significant portion of the table will be made up of soft-deleted rows. This poses performance issues, as well as issues around ensuring data is ultimately deleted.
Longboat Issue: Relies on updated_at field
Reliance upon the
"updated_at" means that all queries must modify the field with the current time whenever an update is made. While this happens during most normal operations, there are scenarios where this may not be done. For one, large-scale data migrations typically avoid touching this field, lest Longboat incremental queries be overwhelmed by results. For another, coding errors may fail to update the field after changing the contents of the row. These updates will be completely missed by an extraction query. Using our earlier example, if Peyton’s name was updated to Eli, but the
"updated_at" was never updated, that update would be missed downstream, as shown in the following figure.
Longboat Issue: Misses Intermediate State of a Row Between Job Runs
A number of data modelling jobs require precise information on how a buyer or merchant operates within our platform. If this information is obtained using a query-based approach, any rows updated multiple times between queries will be missing the intermediate state. The extractor can only report the most recent state of a row, so any intermediate state is permanently lost.
Longboat Issue: Consistency
Imagine a sales report that joins sales and products lLongboat snapshots. The high-water mark (HWM) of the latest sales snapshot is 2:15. The HWM of the latest products snapshot is 2:30. What timestamp of data freshness do we communicate to stakeholders for the result of this join? This is a problem when communicating data freshness to merchants, and we’re excited by the opportunity for CDC to help resolve this.
Longboat Issue: Missed Records
There is a rare case where Longboat never “catches” the row because it’s constantly being updated/touched and always out of reach as something keeps touching the row and updating it. There isn’t a solution other than to stop making updates. Read to learn more.
Can’t We Just Make Longboat Go Faster?
Our business requirements are moving towards near real-time requirements. We need reports that accurately reflect what’s going on now. We need fresher information for making business decisions, to understand our merchants better, and to serve them real-time recommendations. Numerous products have been left wanting a source of comprehensive, reliable, and timely delivery of data. And unfortunately, we can’t simply make Longboat run more frequently to provide this.
Further increasing the frequency of Longboat’s queries causes a subsequent increase in the source data stores’ performance requirements. They are, in essence, pummelled to death by an incessant and comprehensive demand for data. There’s a point where it’s no longer technically feasible to execute those queries at such a rapid pace, and it’s safe to say that our current Longboat work is sitting at that edge.
A further limitation is that even with extremely rapid querying, each subsequent step in the pipeline is also batch-based, generally scheduled to start when the previous task stops. Any abnormal events (such as a query taking too long, resource starvation in the processing cluster, network connectivity issues, etc.) can result in erroneous processing results, creating further delays. A batch system is still a batch system, and it can only speed up so much before you must instead look at a streaming solution.
We Need a New Solution
While Longboat has provided us with the means to access data in a standardized, periodically updating, and mostly reliable way, new business requirements demand more than it can reasonably offer. Additionally, issues such as missing intermediate events and hard deletes have limited the scope in which this data can be used.
Both our retail merchants, along with our internal initiatives, require fast and comprehensive updates to our datasets. A failure to provide this will reduce our competitive advantage and reduce trust in the quality and integrity of the data we do provide. We need a system that can deliver comprehensive updates to each row in a table and provide near real-time updates without unduly affecting the source datastore.
What about Change-Data Capture and Event Streams?
Increasing batch frequency and performance can only go so far before we’re in the realm of event-streaming. And since we’re not looking to re-invent streaming mechanisms, what options do we have available?
Apache Kafka is already in use at Shopify, and it is arguably the de-facto leader of distributed event-brokers. Purpose-built tooling that captures database data and converts it into events, such as Debezium and Maxwell’s Daemon, already exist to serve these event sourcing needs. Additionally, moving to event streaming will enable us to unlock native event-driven applications. The need to support this pattern has arisen independently in a number of areas of the company which are, unfortunately, beyond the scope of this post. However, the business requirements coincide with those in the data platform: we need fresh data, we need it reliably, and we cannot pull it out of the database directly without undue hardship.
Shopify’s core product uses a replicated topology for its MySQL, with a primary instance and multiple read-replicas. MySQL’s internal replication is made possible by a binary log “that contains events that describe database changes such as table creation operations or changes to table data.” This same internal replication tool is used as the foundation of log-based change data capture. And so, we began the Longboat replacement project with the intention of building an entirely new platform based on proven event-streaming technology.
Building a Log-Based Change-Data Capture Platform
In mid-2019, a cross-functional team began exploring the feasibility of using a log-based change-data capture (CDC) system to consume from the 100+ MySQL shards that back Shopify’s core monolith. The team considered Debezium, Maxwell’s Daemon, Spinal Tap, and a DIY implementation similar to Netflix’s approach with DBLog. Of the open-source options available, Debezium emerged as the clear winner. It is by far the most active open-source CDC project. It has a strong group of core maintainers who are actively contributing and responsive. Debezium also has the ability to support a variety of different databases and datastores, which is important for our use-cases as there are a number of non-MySQL sources that may require future support.
While Longboat extracts data from MySQL and writes it to cloud storage, Debezium writes change records to Apache Kafka. As Kafka is already a large piece of Shopify’s infrastructure, using it to transmit and store CDC records aligns with our current infrastructure. It also enables a standard API for consumption, allowing us to leverage existing stream processing tooling.
Debezium out-of-the-box creates one topic per table per database instance, which isn’t ideal given that Shopify core is made up of 100+ independent (sharded) databases. For example, a Debezium connector with default configurations would write a topic for
shopify_shard_1.users, etc. This would require consumers to consume one topic per shard, instead of a single topic. Thus, we merge data from each shard together on a per-table basis, such that each logical table maps 1:1 to a topic. This topology is shown in the following figure.
We create a single Debezium connector per database shard. This connector will read the change-events for a subset of tables that we configure. Next, a RegexRouter (Kafka Connect transform) is applied to each connector. This configures the connector to write every event for a database shard into a single topic (
shopify_shard_1_events in the example). After receiving a shard’s change records into a single topic, a custom Kafka Streams app reads the sharded input topics and sifts through records, writing them to the new Kafka topic for each table.
All topics are partitioned by the source table’s primary key, such that all events for a specific key are published to the same topic partition. The final product is a kafka topic per logical table, such that the source database sharding is isolated from the consumers of the topic. This is shown in the following figure.
Another unique feature of the per-table output topics is their compaction-based retention. CDC records are keyed by the source table’s primary key, and a compacted Kafka topic keeps the most recent record of that key. The result of all of this is that CDC creates a compacted topic that is made up of the latest version for every key. These output topics become a tool for downstream consumers to locally initialize the entire state of the upstream table. Provided that topics are compacted, the required volume of storage will remain directly proportional to the domain of the dataset.
The system I’ve just described has been able to satisfy Shopify’s requirements of near-real time data ingestion. From MySQL insertion to the time it takes for that record to become available in Kafka, the p99 latency is less than 10 seconds. The median latency across database connectors is much lower than 10 seconds, but it can be affected by replication lag of the MySQL reader the Debezium Connector is reading from.
The finished CDC platform will provide a single, blessed route for database extraction throughout Shopify. Rather than having developers write one-off, batch extraction queries for individual databases, they only need to pass configuration about their database to Kafka Connect.
Each Debezium record looks something like this:
This example record represents a single insertion event from our “addresses” table.
Lessons Learned Scaling Our Platform
Debezium is an exciting project undergoing active development. Its 1.0 release only happened in early 2020. Shopify had no Kafka Connect uses until CDC, so we have had to develop expertise as the project has progressed. These two things combined presents plenty of opportunities to encounter bugs for the first time and create system reliability practices.
Schemas and Schema Changes
Debezium supports Apache Avro schemas out of the box. Avro schemas provide a mechanism for accurately representing the captured data, such as the field name, type (string, int, long, etc.), and default values. Schemas ensure a complete and accurate representation of the captured data and are an essential part of capturing data. Each table under capture has its own schema reflecting the table definition.
Schemas are derived directly from the source system’s internal data model, and as a consequence, are tightly coupled to all changes made to the table. Schema evolution rules can provide some flexibility in evolving the schema, such that additions, deletions, and modifications to table definitions can be made. Adhering to schema evolution enables decoupling between producers and consumers, such that consumers are only forced to update if they care about the new fields being defined or about old fields being removed. Not all schema changes are evolutionarily compatible; for instance, changing a source table column definition from “Integer” to “String” breaks compatibility, requiring all consumers to update immediately to handle these records. A failure to do so can result in erroneous processing.
Breaking schema changes are extremely disruptive for our downstream consumers, so we do our best as a company to avoid them whenever possible. In the cases where a breaking change must occur, we want as much forewarning as possible to identify and notify all consumers of the upcoming change. This is an area of active development for us, and we are currently exploring better options for minimizing impact to consumers.
One of the unfortunate aspects of CDC is the tight coupling of the external event consumers to the internal data model of the source. Breaking changes and data migrations can be a normal part of application evolution. By allowing external consumers to couple on these fields, the impact of breaking changes spreads beyond the single codebase into multiple consumer areas.
The Schema Registry
Confluent’s Kafka Schema Registry is the de-facto leader for schema registries in the Kafka ecosystem. It also seamlessly integrates with Debezium and our Kafka Consumers, making it an easy choice for integrating into our workflow. The schema registry makes for an easy place to store schemas and search for data, and it actually forms the basis for data discovery for CDC event streams.
The schema registry also provides a mechanism for sourcing event stream definitions. We adhere to a single domain definition per topic (can have multiple schemas, evolutionarily compatible), such that you can search through the registered schemas to determine which topics hold which data. When combined with features such as Client Identities and Access Control Lists, you can figure out which consumers are reading which data. This allows you to notify the service owners when a breaking schema change is impending, enables the generation of dependency graphs, and allows for lineage tracking of sensitive information through streaming systems.
Initial table snapshots
Debezium connectors will perform an initial table snapshot when starting a brand new connector. A snapshot is a MySQL query ("
SELECT * FROM TABLE") that sends the current contents of the table into Kafka. After the snapshot is complete, log-based CDC will begin on the database tables.
There are some significant limitations to this approach, however. This snapshotting mechanism will hold a read lock on the table it’s snapshotting for the duration of each table snapshot. Snapshots can take hours to complete, and so we run them against the MySQL read replicas. Even then, these locks cause contention on Shopify’s databases. One team member implemented a new snapshot mode for Debezium’s MySQL connector that allowed for a lock contention workaround.
Other major problems we encountered with Debeizum’s table snapshotting were:
- Debezium allows a configurable “allow.list” of tables to import. You can edit this “allow.list” after the initial snapshot, but you can only receive that table’s events from that point forward. This is a problem because all tables which were a part of the “allow.list” when the connector was first created will have been snapshotted, and downstream consumers expect to have the complete state of the table available. Tables added later will only contain change-data events from after that point on, no snapshot events.
- Debezium does not support snapshotting a table without blocking incoming binlog events. Some of our largest tables take hours to snapshot fully. In a data-loss event, where we may need to re-snapshot some tables, delaying incoming binlog event updates by the time it takes to snapshot is unacceptable for latency-sensitive consumers.
- There are several tables in Shopify’s Core monolith that are too big to snapshot in any reasonable time-frame.
These problems are not yet resolved with our platform, and we’ve had to communicate these limitations to early platform adopters. We’ve proposed an incremental snapshot tool we plan on building internally that will solve the issues described. I’ll discuss this briefly later on in this post.
Shopify has some workflows that store substantially large amounts of data within our main MySQL database, typically in the form of a text blob. In some cases, these records can reach many tens of MBs, which far exceeds Kafka’s default record limit of 1 MB. While we could increase the maximum record size on the brokers[x], doing so would negatively impact our overall performance. Additionally, this fails to solve the scenario where a future record is slightly larger than our new maximum value.
There are two main options we evaluated for handling arbitrarily large records.
- Split the record into multiple events on the producer side, and recombine them on the consumer side.
- Store the event in external storage, with the Kafka record providing a pointer to it.
Option 1: Split record into multiple events
In this option, the large record is split into many smaller pieces, each sent to the Kafka topic. Upon consumption, the consumer must recombine the multiple events into a single logical record for further processing.
- Can rely exclusively on Kafka for data storage and access controls
- Difficult to handle records that span many consumer batch boundaries
- Vanilla consumers will fail to correctly interpret records spread over multiple events.
This option appealed to us at first, but it became evident that we would need to ensure we have specialized consumers in each language we use. Basic consumers obtain events from Kafka, then hand them off as-is to the downstream business logic. We’d have to rework a substantial part of the consumer process to ensure all records are successfully recombined before passing them off downstream. Though doable, this seemed complex and error-prone.
Option 2: Store in external storage
In this option, only a single Kafka record is created, pointing to the large data stored in an external data store. For us, this data store is Google Cloud Storage (GCS), where we can store arbitrarily large records.
- Simple production and consumption
- Plain-old Kafka consumers can still consume the data, though they may find they cannot necessarily access the record directly (intentional - see next section)
- Reliance upon a third-party data store, including factors such as access control, connectivity, and accidentally losing data via human error
- Must ensure that records in GCS are cleaned up when they are no longer available through Kafka
We achieved this option by implementing a custom Kafka Serializer/Deserializer (Serde), which encodes records into Avro using the Confluent Avro Converter . Our custom code checks every record’s size and compresses any record over the default 1 MB limit. If compression is insufficient to reduce the record size, it is written to Cloud Storage (GCS), and a placeholder record is sent to Kafka. The path to the stored record in GCS in the placeholder records header. Consumers use that same custom Kafka Serde to deserialize records, deferring to the Serde for decompression and pulling the records from GCS.
The Impacts of Large Records on Consumers
Large records are a non-ideal outcome of historical decisions to use MySQL as a blob data store for certain applications. The mission of creating a CDC Kafka source requires that we support access to this data. Going forwards, however, we want to reduce reliance upon sending large records through Kafka. Events should be well-formed and focused on a specific domain. Seldomly are large blobs of data essential for communication with other systems.
We also need to ensure that GCS data is encrypted, that it’s inaccessible to people and systems lacking the corresponding credentials in Kafka itself (see Kafka ACLS), and that it’s destroyed promptly when the corresponding Kafka record is deleted. These factors add complexity and overhead, but are essential for ensuring the availability of the large record data and compliance with Shopify’s data-handling policies.
We’ve currently storing 400TB+ of CDC data in our Kafka cluster. The Kafka Connect cluster runs ~150 Debezium connectors across 12 Kubernetes pods. Over the Black Friday Cyber Monday weekend in 2020, we processed ~65,000 records/second on average, handling spikes up to 100,000 records/second. These numbers are based on the system consuming a subset of Shopify’s tables that will be expanded as downstream consumers develop.
With any new platform, users can take some time to develop. One of our fastest-growing consumers is an improved Marketing Engagements API. This stream-processing service enables merchants to view how buyers have been responding to marketing campaigns. Moving from the older batch warehouse model to Beam modeling of a CDC ingest stream reduced their average freshness from one day to one hour.
Numerous other internal customers are looking to move their batch-based applications from the Longboat-generated tables to the CDC streams. Teams are also exploring various stream-processing frameworks, including figuring out how to handle streaming joins and indefinite materialized state.
Here are a few areas that we’re focusing on in the short term.
Integrating CDC events into streaming
Moving Shopify’s CDC ingest to Kafka not only has the benefit of unifying our database extraction mechanisms, but of also leveraging Kafka as a single source of truth. Making CDC data available alongside already existing operational event streams has standardized our investment into Kafka infrastructure.
CDC has also enabled us to replace batch pipelines with stream processing pipelines. As a team, we are interested in Apache Beam, Apache Kafka Streams, Apache Flink, and Materialize for redeveloping important models into a streaming approach.
Integrating CDC into existing batch
Our data platform is moving its data sources from the old query-based engine onto the CDC event streams. This process includes consuming from the CDC event streams and resolving the events into a batch format that suits existing expectations. The intention is to create a resolved dataset that suits batch ingestion needs, such that all streaming and batch jobs effectively consume from the same source.
There is plenty of work remaining, such as the aforementioned need for incremental snapshotting. The details of this are unfortunately beyond the scope of this post, but in brief, incremental snapshots will enable us to backfill chunks of the existing MySQL tables without locking the table. By reconciling the incremental query results with the ongoing binlog events, we can dynamically build the current state of the table without expensive locking. This is particularly important due to the large size of a number of our tables and the current impact of locking these tables for snapshots.
Kafka and Debezium CDC has become a pivotal part of Shopify’s data platform modernization strategy. It has enabled us to standardize our data extraction tooling, move away from expensive and slow batch-based acquisition, and unify our batch and streaming data sources. We are able to capture each change made to the dataset, providing us with clearer insights and analytics. The delay between when a business event occurs and when we have the ability to act on it has substantially decreased, providing much-needed data freshness for our analysts, business partners, and event-driven application developers. Though there are some outstanding issues, we have already reaped significant benefits from change-data capture, and will continue to invest in it.
John Martin is a Senior Production Engineer working on Shopify’s Streaming Platform. He has specialized in Data and Streaming infrastructure at Shopify for the past 4 years. He's currently scaling the CDC platform to new heights so it will reliably underpin all of Shopify's reporting and critical applications for years to come. If you’d like to work with John, you can reach out on Linkedin.
Adam Bellemare is a Staff Engineer of Data Platform. He is an event-driven microservice enthusiast, having just completed his first book on the subject in 2020. Adam is also a contributor to both Apache Kafka and Avro, two technologies which have been pivotal to enabling event-driven applications.
Wherever you are, your next journey starts here! If building systems from the ground up to solve real-world problems interests you, our Engineering blog has stories about other challenges we have encountered. Intrigued? Visit our Engineering career page to find out about our open positions and learn about Digital by Design.