Apache Beam for Search: Getting Started by Hacking Time

To create relevant search, processing clickstream data is key: you frequently want to promote search results that are being clicked on and purchased, and demote those things users don’t love.

Typically search systems think of processing clickstream data as a batch job run over historical data, perhaps using a system like Spark. But on Shopify’s Discovery team, we ask the question: What if we could auto-tune relevance in real-time as users interact with search results—not having to wait days for a large batch job to run?

At Shopify—this is what we’re doing! We’re using streaming data processing systems that can process both real-time and historic data to enable real-time use cases ranging from simple auto boosting or down boosting of documents, to computing aggregate click popularity statistics, building offline search evaluation sets, and on to more complex reinforcement learning tasks.

But this article is introducing you to the streaming system themselves. In particular, to Apache Beam. And the most important thing to think about is time with those streaming systems. So let’s get started!

What Exactly is Apache Beam?

Apache Beam is a unified batch and stream processing system. This lets us potentially unify historic and real-time views of user search behaviors in one system. Instead of a batch system, like Spark, to churn over months of old data, and a separate streaming system, like Apache Storm, to process the live user traffic, Beam hopes to keep these workflows together.

For search, this is rather exciting. It means we can build search systems that both rely on historic search logs while perhaps being able to live-tune the system for our users’ needs in various ways.

Let’s walk through an early challenge everyone faces with Beam: that of time! Beam is a kind of time machine that has to reorder events in their right spot after getting annoyingly delayed by lots of intermediate processing and storage step. This is one of the core complications of a streaming system - how long do we wait? How do we deal with late or out of order data?

So to get started with Beam, the first thing you’ll need to do is Hack Time!

The Beam Time Problem

At the core of Apache Beam are pipelines. They connect a source through various processing steps to finally a sink.  

Data flowing through a pipeline is timestamped. When you consider a streaming system, this makes sense. We have various delays as events flow from browsers, through APIs, and other data systems. Finally the events arrive at our Beam pipeline. They can easily be out-of-order or delayed. Beam source APIs, like the one for Kafka, maintain a moving view of the event data to emit well-ordered events known as a watermark.

If we don’t give our Beam source good information on how to build a timestamp, we’ll drop events or receive them in the wrong order. But even more importantly for search, we likely must combine different streams of data to build a single view on a search session or query, like below:

combine different streams of data to build a single view on a search session or query, like below

Joining (a Beam topic for another day!) needs to look back over each source’s watermark and ensure they’re aligned in time before deciding that sufficient time has elapsed before moving on. But before you get to the complexities of streaming joins, replaying with accurate timestamps is the first milestone on your Beam-for-clickstream journey.

Configuring the Timestamp Right at the Source

Let’s set up a simple Beam pipeline to explore Beam. Here we’ll use Kafka in Java as an example. You can see the full source code in this gist.

Here we’ll set up a Kafka source, the start of a pipeline producing a custom SearchQueryEvent stored in a search_queries_topic.

You’ll notice we have information on the topic/servers to retrieve the data, along with how to deserialize the underlying binary data. We might add further processing steps to transform or process our SearchQueryEvents, eventually sending the final output to another system.

But nothing about time yet. By default, the produced SearchQueryEvents will use Kafka processing time. That is, when they’re read from Kafka. This is the least interesting for our purposes. We care about when users actually searched and clicked on results.

More interesting is when the event was created in a Kafka client. Which we can add here:

.withCreateTime(Duration.standardMinutes(5))

You’ll notice above, when we use create time below, we need to give the source’s Watermark a tip for how out of order event times might be. For example, below we instruct the Kafka source to use create time, but with a possible 5 minutes of discrepancy. 

Appreciating The Beam Time Machine

Let’s reflect on what such a 5 minute possible delay actually means from the last snippet. Beam is kind of a time machine… How Beam bends space-time is where your mind can begin to hurt.

As you might be picking up, event time  is quite different from processing time! So in the code snippet above, we’re *not* telling the computer to wait for 5 minutes of execution time for more data. No, the event time might be replayed from historical data, where 5 minutes of event time is replayed through our pipeline in mere milliseconds. Or it could be event time is really now, and we’re actively streaming live data for processing. So we DO indeed wait 5 real minutes! 

Let’s take a step back and use a silly example to understand this. It’s really crucial to your Beam journey. 

Imagine we’re super-robot androids that can watch a movie at 1000X speed. Maybe like Star Trek The Next Generation’s Lt Commander Data. If you’re unfamiliar, he could process input as fast as a screen could display! Data might say “Hey look, I want to watch the classic 80s movie, The Goonies, so I can be a cultural reference for the crew of the Enterprise.” 

Beam is like watching a movie in super-fast forward mode with chunks of the video appearing possibly delayed or out of order relative to other chunks in movie time. In this context we have two senses of time:

  • Event Time: the timestamp in the actual 1h 55 minute runtime of The Goonies aka movie time.
  • Processing Time: the time we actually experience The Goonies (perhaps just a few minutes if we’re super-robot androids like Data).

So Data tells the Enterprise computer “Look, play me The Goonies as fast as you can recall it from your memory banks.” And the computer has various hiccups where certain frames of the movie aren’t quite getting to Data’s screen to keep the movie in order. 

Commander Data can tolerate missing these frames. So Data says “Look, don’t wait more than 5 minutes in *movie time* (aka event time) before just showing me what you have so far of that part of the movie. This lets Data watch the full movie in a short amount of time, dropping a tolerable number of movie frames.

This is just what Beam is doing with our search query data. Sometimes it’s replaying days worth of historic search data in milliseconds, and other times we’re streaming live data where we truly must wait 5 minutes for reality to be processed. Of course, the right delay might not be 5 minutes, it might be something else appropriate to our needs. 

Beam has other primitives such as windows which further inform, beyond the source, how data should be buffered or collected in units of time. Should we collect our search data in daily windows? Should we tolerate late data? What does subsequent processing expect to work over? Windows also work with the same time machine concepts that must be appreciated deeply to work with Beam.

Incorporating A Timestamp Policy

Beam might know a little about Kafka, but it really doesn’t know anything about our data model. Sometimes we need even more control over the definition of time in the Beam time machine.

For example, in our previous movie example, movie frames perhaps have some field informing us of how they should be arranged in movie time. If we examine our SearchQueryEvent, we also see a specific timestamp embedded in the data itself:

public class SearchQueryEvent {

   public final String queryString;

   public final Instant searchTimestamp;

}

Well Beam sources can often be configured to use a custom event time like our searchTimestamp. We just need to make a TimestampPolicy. We simply provide a simple function-class that takes in our record (A key-value of Long->SearchQueryEvent) and returns a timestamp:

We can use this to create our own timestamp policy:

Here, we’ve passed in our own function, and we’ve given the same allowed delay (5 minutes). This is all wrapped up in a factory class TimestampPolicyFactory SearchQueryTimestampPolicyFactory (now if that doesn’t sound like a Java class name, I don’t know what does ;) )

We can add our timestamp policy to the builder:

.withTimestampPolicyFactory(new SearchQueryTimestampPolicyFactory())

Hacking Time!

Beam is about hacking time, I hope you’ve appreciated this walkthrough of some of Beam’s capabilities. If you’re interested in joining me on building Shopify’s future in search and discovery, please check out these great job postings!

Doug Turnbull is a Sr. Staff Engineer in Search Relevance at Shopify. He is known for writing the book “Relevant Search”, contributing to “AI Powered Search”, and creating relevance tooling for Solr and Elasticsearch like Splainer, Quepid, and the Elasticsearch Learning to Rank plugin. Doug’s team at Shopify helps Merchants make their products and brands more discoverable. If you’d like to work with Doug, send him a Tweet at @softwaredoug!