
One Year of DynamoDB at Timehop


2,675,812,470

That’s the number of rows in our largest database table. Or it was, one year ago today. Since then, it’s grown to over 60 billion rows. On average, that’s roughly 160mm inserts per day.

When I think about it, that seems like quite a lot. But because the database we use is Amazon’s DynamoDB, I rarely have to think about it at all. In fact, when Timehop’s growth spiked from 5,000 users/day to more than 55,000 users/day in a span of eight weeks, it was DynamoDB that saved our butts.

What is DynamoDB?

I don’t intend to explain the ins and outs of what DynamoDB is or what it’s intended for; there’s plenty of documentation online for that. But I will mention the most salient point: DynamoDB is consistent, fast, and scales without limit. And when I say “scales without limit,” I literally mean there is no theoretical boundary to be made aware of. That isn’t to say that all of DynamoDB’s features behave identically at every scale, but in terms of consistency and speed it absolutely does. More on that below. First, some history.

The Ex I Worry About Seeing in My Timehop is Mongo

Timehop currently uses DynamoDB as its primary data store. Every historical tweet, photo, checkin, and status is persisted to DynamoDB as time-series data. It wasn’t always this way. Back in ye olden days, we used a combination of Postgres and Mongo. So why the switch?

Listen, Mongo is great. I mean, I’m sure it’s pretty good. I mean, there’s probably people out there who are pretty good with it. I mean, I don’t think my soul was prepared for using Mongo at scale.

When we were small, Mongo was a joy to use. We didn’t have to think too hard about the documents we were storing, and adding an index wasn’t very painful. Once we hit the 2TB mark, however, we started to see severe and unpredictable spikes in query latency. Now, this might have been due to our hosting provider and had nothing to do with Mongo itself (we were using a managed service through Heroku at the time). Or, had we rolled our own, we might have solved things with sharding. But to be honest, none of us at Timehop are DBAs, and a hosted database has always been a more attractive option (as long as it didn’t break the bank!). So we started to look at alternatives. Our friends at GroupMe had some nice things to say about DynamoDB, and it seemed pretty cost-effective too, so we gave it a go.

Early Mistakes with DynamoDB

As is often the case with new technologies, DynamoDB first seemed unwieldy. The biggest hurdle for us right out of the gate was the lack of a mature client library written in Go, our primary language (Things have changed since then). But once we nailed down the subset of API queries we actually needed to understand, things started to move along. That’s when we started making real mistakes.

Mistake #1: Disrespecting Throughput Errors

This one should have been obvious, as the error handling docs are quite clear. DynamoDB has two main types of HTTP errors: retryable and non-retryable. The most common type of retryable error is a throughput exception. The way DynamoDB’s pricing model works is that you pay for query throughput, where reads and writes are configured independently. And the way Amazon enforces throughput capacity is to throttle any queries that bust it. It’s up to the client to handle these errors and retry requests with an exponential backoff. Our early implementations assumed this would be a rare occurrence, when in reality you will likely incur some amount of throttling at any level of usage. Make sure you handle this frequent type of error.
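
For illustration, here’s a minimal sketch of the shape of that retry loop. The do and retryable callbacks are stand-ins for your actual DynamoDB call and whatever your client exposes for detecting a throughput exception; this is not a specific SDK’s API.

import "time"

// retryWithBackoff runs do() and retries with exponential backoff for as long
// as retryable(err) reports the failure as a throttling (retryable) error.
func retryWithBackoff(do func() error, retryable func(error) bool, maxRetries int) error {
  backoff := 50 * time.Millisecond
  var err error
  for attempt := 0; attempt <= maxRetries; attempt++ {
    if err = do(); err == nil {
      return nil
    }
    if !retryable(err) {
      return err // non-retryable: fail immediately
    }
    time.Sleep(backoff)
    backoff *= 2 // double the wait before the next attempt
  }
  return err
}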

Mistake #2: Not Understanding How Partitions Affect Throughput

All DynamoDB tables are partitioned under the covers and each partition gets a sliver of your provisioned (purchased) capacity. The equation is simple:

Total Provisioned Throughput / Partitions = Throughput Per Partition

What this means is that if your query patterns are not well distributed across hash keys, you may only achieve a fraction of your provisioned throughput. Not understanding this point can lead to serious headaches.
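
To make that concrete with purely illustrative numbers: a table provisioned for 1,000 writes/second that has been spread across 10 partitions can sustain at most roughly 1,000 / 10 = 100 writes/second against any single partition, no matter how much total capacity you’ve purchased.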

It’s also important to note that the number of partitions is not directly exposed to you (however, there is now a guide to help you estimate the number of partitions you’ve accrued). A single table may have 1 partition or 1,000, which makes it difficult to predict exactly how hot a key can become. And the larger your table grows, the more partitions will be allocated to it, which means you may not even notice hot-key issues until you are at scale. The best thing you can do is fully understand how to design and use a well-distributed hash key.

Mistake #3: Poorly Designed Hash Keys

DynamoDB tables can be configured with just a hash key, or with a composite hash and range key. Timehop’s data is in time-series, so for us a range key is a necessity. But because hash keys always map to a single virtual node in a partition, a large set of range keys per hash key can lead to hot-key problems. I dug into my old emails and found a few write capacity graphs that illustrate this point:

Write capacity graph, March 2015 (top)

Write capacity graph, March 2014 (bottom)

The red lines here indicate provisioned throughput, ie: the most you could ever use. The blue lines represent actual throughput achieved.

The top graph represents our current schema, which has a well-designed hash key based on a combination of user id and calendar date.

The bottom graph shows our first stab last March, where our hash key was just the user id. Notice the difference? Yeah, it was bad. It took us about 3 attempts at loading and destroying entire tables before we got it right.

Advanced Lessons

Once we stopped making the above mistakes we found further ways to optimize our DynamoDB usage patterns and reduce costs. These points are actually covered in the developer guidelines, but we had to make our own mistakes to reach the same conclusions.

Lesson #1: Partition Your Hash Keys

Every hash key will consistently hit a single virtual node in DynamoDB. That means that every range key associated with a hash key also hits that same node. If a Timehop user has 5000 items of content (a rough average) and most of that content is written at around the same time, that can result in a very hot hash key. Anything you can do to split the data across multiple hash keys will improve this. The pros and cons of doing so are:

  • Pro: Hash key distribution increases linearly with each hash key partition; Throughput errors decrease.
  • Con: The number of network queries required to write the same amount of data increases linearly with each hash key partition; the likelihood of i/o errors and poor network latency increases.

For Timehop, our hash key partition strategy was a natural function of our use case: we partition all user data by date. This gives us a significant increase in hash key distribution. As an example, a user with 5000 items of content across 7 years of social media activity sees roughly a 2,500x increase in distribution:

Without partitioning (ie: hash key = “<user_id>”):

5000 items per hash key on average

With partitioning (ie: hash key = “<date>:<user_id>”):

5000 / 365 days / 7 years = about 2 items per hash key on average
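
In code, building that partitioned hash key is trivial. A quick sketch (the date format and ID type here are illustrative, not our exact production scheme):

import (
  "fmt"
  "time"
)

// hashKey builds a "<date>:<user_id>" style key as described above.
func hashKey(userID int64, day time.Time) string {
  return fmt.Sprintf("%s:%d", day.Format("2006-01-02"), userID)
}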

Lesson #2: Temporally Distribute Your Write Activity

As I stated earlier, a table that uses a range key can expect at least some throttling to occur. How many errors you experience is directly tied to how well distributed your hash keys are. However, if you really want to maximize your throughput, you should also consider temporally distributing the writes you make against a single hash key.

Our writes happen asynchronously when a user signs up and connects their social media accounts. At any given moment, the write requests are heavily skewed towards the subset of hash keys that represent the most recent new users. To encourage well-distributed writes, we built a simple write buffer that sits in front of DynamoDB (essentially a managed Redis set). A separate process drains random elements from the buffer and issues batch write requests to DynamoDB. The more users that sign up together, the more data ends up in the buffer. The key benefit of this architecture pattern is that the more data there is in the buffer, the more randomized the drain becomes, and therefore the more distributed the hash keys in our batch write requests are. So increasing scale actually improves our write performance. Yay!
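
For the curious, the drain side of that buffer looks roughly like the sketch below. This is a simplification, not our production code: it uses a go-redis client’s SPOP to pull random members off the buffer set, and the write callback stands in for the actual DynamoDB BatchWriteItem call.

import (
  "context"
  "time"

  "github.com/redis/go-redis/v9"
)

// drainBuffer pops up to 25 random members at a time (the BatchWriteItem limit)
// from the Redis set acting as the write buffer and hands them to write, which
// stands in for the real DynamoDB batch write. Because SPOP returns random
// members, each batch naturally mixes hash keys from many different users.
func drainBuffer(ctx context.Context, rdb *redis.Client, key string,
  write func(ctx context.Context, items []string) error) {
  for {
    select {
    case <-ctx.Done():
      return
    default:
    }
    items, err := rdb.SPopN(ctx, key, 25).Result()
    if err != nil || len(items) == 0 {
      time.Sleep(time.Second) // empty buffer or transient error: back off briefly
      continue
    }
    if werr := write(ctx, items); werr != nil {
      // Simplified error handling: push the items back so nothing is lost.
      for _, it := range items {
        rdb.SAdd(ctx, key, it)
      }
    }
  }
}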

Scaling Without Limit

So now that we’ve got our usage patterns stabilized, what’s the real benefit? The most explicit win we saw from using DynamoDB was during our most significant period of user growth, which just happened to coincide with our final draft of a DynamoDB strategy.

During the 8 week period between March and May of 2014 our user growth started to spike upwards from roughly 5k signups/day to more than 55k signups/day. As you can imagine we were hyper-focused on our infrastructure during this time.

There were many, many fires. Each dip in the graph probably corresponds to some outage or backend service falling over due to load. But what the dips definitely do not represent is an outage or performance problem with DynamoDB. In fact, for the last year, no matter how large our table grew or how many rows we wrote to it (currently on the order of hundreds of millions per day), the query latency has always been single-digit milliseconds, usually 4–6.

I can’t stress enough how nice it was not to have to think about DynamoDB during this time (aside from a few throughput increases here and there). This was a new experience for Timehop Engineering as in the past the database was always the choke point during periods of growth. The stability and performance of DynamoDB allowed us to focus our efforts on improving the rest of our infrastructure and gave us confidence that we can handle the next large spike in user signups.

AWS describes DynamoDB as providing “single-digit millisecond latency at any scale” (emphasis mine). We’ve asked the DynamoDB team what the theoretical limits are, and the answer is that, barring the physical limitations of allocating enough servers, there are none. As evidence of this, our table is quite large now, about 100TB, and the performance is the same as it was on day one.


Impedance (mis)matching


In this post we’ll focus on the architecture of our push notification subsystem, covering some of the decisions made, talking about our open source Go library for sending pushes through the Apple Push Notification System, and wrapping up with an anecdote on how we ended up DDoS’ing ourselves with what we’ve built.

Sunsetting the Rails Monolith

Over the past year at Timehop, we broke our big monolithic Rails app into a service based architecture, written almost entirely in Go.

Breaking a big system down into smaller parts makes it far more modular and, when done right, more available and fault-tolerant.

We ended up with more services but fewer single points of failure. There are now more potential points of failure but none of them can — or should — cause a complete halt.

One of the side effects of dividing to conquer is that communication becomes explicit. Functionality and error handling are now spread across multiple processes, over the network — which also makes them less reliable.

Impedance matching: buffering and throttling

At Timehop, we put a lot of effort into making sure that all communication between our systems is correctly buffered and/or throttled, so as to avoid internally DDoS’ing ourselves.

Whenever a system can offload some work for deferred processing by other systems, we use message queues as buffers. As those queues often grow to hold millions of records in a short amount of time, we keep a close eye on them through an extensive set of alarms.

Whenever a system needs a real-time response from another (e.g. an HTTP call or some other form of RPC), we use aggressive timeouts on the requesting side and throttling on the serving side. It’s all designed to fail fast; the requester won’t wait longer than a few seconds for a response and the server will immediately return an error if too many requests are already being served.
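
As a flavor of what that looks like in Go (a simplified sketch, not our actual service code): the requesting side uses a client with a hard timeout, and the serving side sheds load with a semaphore once too many requests are in flight.

import (
  "net/http"
  "time"
)

// Requesting side: never wait more than a couple of seconds for a response.
var client = &http.Client{Timeout: 2 * time.Second}

// Serving side: a buffered channel acts as a semaphore. Once maxInFlight
// requests are being served, new ones fail fast with a 503.
func throttled(maxInFlight int, next http.Handler) http.Handler {
  sem := make(chan struct{}, maxInFlight)
  return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    select {
    case sem <- struct{}{}:
      defer func() { <-sem }()
      next.ServeHTTP(w, r)
    default:
      http.Error(w, "too busy, try again", http.StatusServiceUnavailable)
    }
  })
}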

We would rather fail fast and keep the median response times low, even if it comes at a small cost in percentage of successful requests served:

  • From an infrastructure perspective, we’re in control of the internal resource usage. We decide when the system should stop responding vs let it grind itself to a halt.
  • From a UX perspective, we can often silently mask errors. When we cannot, we believe it’s still preferable to quickly show the user something went wrong over keeping her indefinitely waiting for that spinner.
  • From a service perspective, a degraded experience for some is better than no experience for all.

The push notification subsystem

We call it salt-n-pepa. I would have personally gone with static-x.

Whenever we need to send out a push notification to a Timehopper, we load up all her device tokens (one per phone) and then hit Google’s GCM or Apple’s APNS.

If you’ve never dealt with push notification systems, a device token is what Apple and Google use to uniquely identify your device(s) so that we can send you notifications.

With our monolithic system, we kept all these tokens in a PostgreSQL database, which was hidden behind the niceties of Rails’ ActiveRecord. Grabbing the Apple device tokens for a user was as easy as calling a method on a User object — user.valid_apns_tokens.

As the need arose to perform the same tasks from multiple parts of our shiny new (but incredibly lean and minimalist) Go stack, multiple problems became apparent:

  • Duplicate code in different languages: higher effort to maintain two codebases.
  • Tight coupling to the database: some systems required a connection to the database for the sole purpose of loading tokens to send pushes.
  • Harder cache management: if cache invalidation on its own is often very tricky, then distributed cache invalidation is a nightmare.
  • Difficult upgrades: whenever the logic changed, we’d have to upgrade not only the different codebases but all the different systems using that code. The more independent moving parts you have, the harder this procedure is.

To solve those problems, we created a black-box service, salt-n-pepa, that has message queues as entry points. Messages (or tasks) in these queues are JSON documents whose most notable fields are a target user ID, some content, and, optionally, a delivery time (so that it supports scheduling for future delivery as well as immediate sends).
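
For illustration only, the shape of such a task could be expressed as a Go struct like this (the field names are hypothetical, not our exact schema):

// PushTask mirrors the JSON documents we enqueue. Field names are illustrative.
type PushTask struct {
  UserID  int64  `json:"user_id"`
  Content string `json:"content"`
  // Optional: zero means send immediately; otherwise deliver at this Unix time.
  DeliverAt int64 `json:"deliver_at,omitempty"`
}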

The moving parts

Internally, the push system has multiple components, each with a single, very well defined responsibility.

  • The Demuxer: The entry point into the push system. It reads push notification jobs (the messages covered above) off a queue, loads all of the user’s valid device tokens for both APNS and GCM, and queues each one to be sent immediately by the appropriate Pusher. If the push is scheduled for future delivery, it instead puts the entries in a timestamp-based set so the Deschedulers can move them to the appropriate Pusher queue when the time comes. A single push notification job may generate multiple pushes if the user has Timehop installed on more than one device.
  • The APNS & GCM Deschedulers: At the right time, transfer pushes scheduled for future delivery to the appropriate Pusher queue (APNS or GCM).
  • The APNS Pusher: Converts the contents of a message into APNS format and sends it down Apple’s Push Notification System. This is a fire-and-forget system, with no synchronous feedback on message delivery status. This process uses our open source Go APNS library, which we’ll cover below.
  • The GCM Pusher: Converts the contents of a message into GCM format and sends it down Google’s Cloud Messaging platform. This system is synchronous in the sense that for every request that hits GCM, we know whether the push was successfully scheduled or whether the token is invalid. When a token is invalid, the GCM Pusher queues an invalidation for the GCM Invalidator.

Aside from these, there are also a few other components related to token registration and invalidation.

  • The APNS Invalidator: Periodically connects to APNS to download a list of invalid tokens and update our Apple device token records.
  • The GCM Invalidator: Reads off of the GCM token invalidation queue (populated by the GCM Pusher) and updates our GCM device token records.
  • The Registrar: Reads off of the device token registration queue (populated by other subsystems that want to register new device tokens for users) and updates the device token records for the user.

With this system we send, on average, 25 million push notifications every day.

Timehop’s Go APNS library

One of the hardest parts of this whole system was writing the actual code that talks to APNS to send the pushes.

Whereas with GCM you perform an HTTP request and immediately know the result, Apple took a less common approach in which you have to open a TLS connection and speak their binary protocol. You write bytes to a socket instead of making HTTP POSTs to a web server. And to gather feedback on which tokens have become invalid, you have to open up a separate connection to receive that information.

As we looked for good libraries, we realized the landscape was grim so we decided to roll our own, which features:

  • Long Lived Clients: Apple’s documentation states that you should hold a persistent connection open as opposed to creating a new one for every payload.
  • Use of v2 Protocol: Apple came out with v2 of their API with support for variable length payloads. This library uses that protocol.
  • Robust Send Guarantees: APNS reports delivery errors asynchronously, which means that any pushes written after a failed one (but before the error arrives) can be lost forever. Our library records the last N pushes, detects errors, and is able to resend the pushes that could have been lost. You can learn more about this here.

So head on to the GitHub project page and give it a spin!

How we DDoS’ed ourselves with pushes

Every day, the system that prepares your next Timehop day (briefly discussed in this other article) enqueues about 15 million push notifications to be sent shortly before 9am in your local timezone. This scheduling is randomized within a 30 minute window so that, for every timezone, we get an evenly distributed traffic pattern, as opposed to a massive influx of traffic when everyone opens the app at the exact same time.

All this is performed far in advance of the actual push being sent so we end up queueing plenty of messages, which the de-schedulers will then move on to the appropriate queues to be sent immediately when the time comes. It’s normal to have a few million entries scheduled for later delivery.

The actual sending on the APNS side is pretty fast. It takes about 2ms to run a full cycle — pop a notification from the queue and send it to Apple’s Push servers. Rinse and repeat.

We run a single process, in a single machine, with 50 workers (each in its own goroutine). It’s so fast that its queue never backs up, no matter what we throw at it.

It’s one of those things that has been so reliable for so long that you kind of forget about it when there are other fires to put out. So reliable and fast, in fact, that we forgot to put alarms in place for when its queue starts backing up.

And then it got fun.

What goes around, comes around

We never really put thought into limiting the outbound rate of our pushes — as long as Apple could handle it, we’d hammer them.

What we naively overlooked was the fact that pretty much every push we send causes an indirect hit on our client-facing API, as the users open the app.

The morning push: nobody can resist opening the app after one of these.

The higher the volume of immediate pushes sent, the higher the potential volume of hits on our API.

A week ago, due to a certificate problem, each of the 50 workers running on the APNS Pusher slowly started to die. We didn’t really notice anything since, even with just a couple of workers left, we were still keeping up with the rate at which pushes were being generated.

Then, the last worker died. No more APNS pushes were sent.

While we did not have an alarm in place, the unusually low morning traffic that our dashboards were showing was not a good sign — that and the fact that we didn’t get our own morning pushes either.

As we investigated and reached the natural conclusion that the APNS Pusher was dead — at that point, the queue had over 6 million pushes and growing — we restarted it.

Within 30 minutes, our client-facing API error rates went up by 60% and our inbound traffic nearly tripled. When we looked at the push queue, it was empty: over 6 million pushes sent in under 40 minutes. Most of those turned into people actually opening Timehop and hitting our servers.

An incredibly simple rate limiter

All it took for this to never happen again were a few lines of code. The algorithm is pretty simple:

  • Each worker, running on its own goroutine, has access to a rate limiter
  • Whenever they’re about to begin a cycle, they query the rate limiter
  • If the rate limiter is over the threshold, the worker sleeps for a bit
  • If the rate limiter is under the threshold, the worker performs a work cycle
  • Every minute, another worker resets the rate limiter

Kinda like pushing the button in LOST.

Here’s what it looks like:

import "sync/atomic"

func NewLimiter(limit int64) *Limiter {
  return &Limiter{limit: limit}
}

type Limiter struct {
  limit   int64
  counter int64
}

// Atomically increments the underlying counter
// and returns whether the new value of counter
// is under the limit, i.e. whether the caller should
// proceed or abort.
func (s *Limiter) Increment() bool {
  return atomic.AddInt64(&s.counter, 1) <= s.limit
}

// Atomically resets the value of the counter to 0.
func (s *Limiter) Clear() {
  atomic.StoreInt64(&s.counter, 0)
}

The Limiter is then shared across all the workers (goroutines), and whenever they’re about to begin a new cycle, they simply test whether they can proceed:

func (s *apnsWorker) workCycle() bool {
  if !s.limiter.Increment() {
    return false
  }
  // ...
}

Lastly, another goroutine calls Clear() on this shared Limiter every minute, which allows the workers to begin sending pushes again.
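
That reset loop is about as small as it sounds. A sketch, assuming a shared *Limiter named limiter and the standard time package:

go func() {
  for range time.Tick(time.Minute) {
    limiter.Clear() // open the floodgates again for the next minute
  }
}()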

A final note

When going distributed you’ll invariably run into throughput impedance mismatches. Make sure you dedicate some time to understand how every part of your system will affect the next and how you can use different techniques, such as the ones we talked about in this article, to help mitigate the effects.

Oh, and always keep an eye out for how outbound traffic can get back at you so you don’t end up nuking yourself like we did! 😬


Why Timehop Chose Go to Replace Our Rails App


Here at Timehop, we’ve been running Go in production for over a year and a half and have fallen in love with it. We’ve gone from a median response time of 700ms with our Rails app to a 95th percentile response time of 70ms. We do this while serving over six million unique users every single day.

I thought I’d share the highlights of our conversation in hopes that it would be useful to other engineering teams considering Go.

What prompted you to first consider Go?

We originally built Timehop as, like many startups, a Rails app. Then we started growing extremely quickly and what we built in Rails couldn’t keep up.

A lot of what we do lends itself to being parallelized. Whenever a user opens the app, we gather data from all the years they have content. The queries we issue to our database are independent of each other, which makes them very easily parallelized. We tried making it work with Ruby, but ultimately, Ruby doesn’t have true multithreading and the abstractions to work around that limitation felt brittle.
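
To give a flavor of what that fan-out looks like in Go (a sketch, where fetchYear stands in for one of those independent database queries):

import "sync"

type Item struct{} // placeholder for a piece of historical content

// fetchAllYears runs one query per year concurrently and collects the results.
func fetchAllYears(years []int, fetchYear func(year int) []Item) [][]Item {
  results := make([][]Item, len(years))
  var wg sync.WaitGroup
  for i, y := range years {
    wg.Add(1)
    go func(i, y int) {
      defer wg.Done()
      results[i] = fetchYear(y) // each goroutine writes only to its own slot
    }(i, y)
  }
  wg.Wait()
  return results
}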

When we set out to explore new languages we had three things at the top of our wish list: easy concurrency, true parallelism, and performance.

Why did Go win compared to the other languages you considered?

We looked at a few other languages (Node.js primarily), but ultimately Go sold us on three major things:

  • Performance — Go code is compiled down to machine code. There is no VM or interpreter that adds overhead.
  • Static typing — Turns out computers are way better than humans at catching a whole class of errors when they know what type a variable is. Who knew?
  • Sane, readable concurrency — Goroutines and channels make concurrent code easy to read and reason about. Those constructs also make it easy to write safe concurrent code without explicit locks. Also, no callback spaghetti.

Those were the initial points that really sold us on it. As time went on, we added to the list:

  • Dead-simple deployment — it compiles down to a single binary with all of its dependencies. More on that later.
  • Amazing toolchain — Go also includes tons of amazing tools, not the least of which is the code formatter `go fmt`. It has eliminated code formatting debates, and with it, an untold amount of wasted developer-hours.
  • Extremely robust standard library — we’ve found that we haven’t needed a ton of third party libraries. The libraries provided by the language are extremely well thought out.

Go checked off all of the boxes for our requirements — and then some. The additional benefits made it a clear winner in our book.

Were there any surprises — positive or negative — once you started using Go?

We were all hesitant about losing productivity in the switch. Ruby is a very expressive programming language, which allowed us to write a lot of code quickly. We were concerned we’d lose that by switching to a type-safe, compiled language.

We didn’t need to be concerned. Very quickly after the switch we found ourselves writing Go code just as fast and a lot safer. Go’s type safety prevented a lot of the fat fingered mistakes that are all too common in Ruby.

Compiling our code also turned out not to be an issue — our largest Go app compiles in ~2.5 seconds at worst.

How did the team ramp up? What’s your advice to help teams go through this process smoothly?

TL;DR: the Tour of Go, Effective Go, and reading the standard library.

What are Go’s weaknesses?

Dependency management.

Go has a convenient import scheme where you include packages by their location, ie.

import "github.com/timehop/golog/log"

You can pull down the code with a simple `go get` command in your terminal. While convenient, it can also be a deployment headache because it pulls the HEAD from the repo. This can get in the way of shipping a feature because someone else’s code changed locations or had breaking API changes.

The most popular dependency management tool right now is Godep. At a high level, Godep pulls down all of your dependencies and then vendors them — essentially copying their code into your project so that you always have a working copy of your dependencies.

It is worth mentioning that this weakness is by design. The creators of Go deliberately avoided building a dependency management system because they didn’t know what a general solution that worked for everyone would look like.

What Go libraries are critical to deployment on the modern web?

When we first started writing Go, we googled around for “Rails for Go.” We quickly realized that was overkill for building out a JSON API.

All of our web services simply use the standard net/http library and Gorilla mux for routing. Others seem to do the same.
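
A minimal example of that setup (the route and payload are made up, but the net/http and gorilla/mux calls are the real ones):

package main

import (
  "encoding/json"
  "log"
  "net/http"

  "github.com/gorilla/mux"
)

func main() {
  r := mux.NewRouter()
  r.HandleFunc("/users/{id}", func(w http.ResponseWriter, req *http.Request) {
    id := mux.Vars(req)["id"] // path parameter extracted by gorilla/mux
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]string{"id": id})
  }).Methods("GET")

  log.Fatal(http.ListenAndServe(":8080", r))
}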

What are options for hosting? How does deployment work?

We started on Heroku because our Rails app was hosted there as well. It was simple to deploy using the Go buildpack.

We eventually migrated to EC2, and deployment was just as easy. Go apps compile down to a single binary, so deployment can be as simple as scp-ing the binary to a box and running it.

Our current process is:

  1. Push code to Github
  2. Run tests via Travis
  3. On success, build the binaries, tar them, and upload to S3

The app servers simply have to pull down the tar, unpack it, and run it (we do this via Chef).

We haven’t needed to use this, but it also makes compiling binaries for different architectures as easy as:

$ GOOS=linux GOARCH=arm go build main.go

That means you could easily build your app for many types of operating systems and embedded devices.

Is the language suited for building APIs?

Yes. The Go encoding libraries make writing APIs stupidly simple. Take, for example, a User struct (which is similar to a class, but distinctly not):

type User struct {
  FirstName string `json:"first_name"`
  LastName  string `json:"last_name"`
  Password  string `json:"-"`
}

Those tags after those fields define how they’re going to be serialized out to JSON. No need for custom serialization functions.

To output an instance of a User, it is simply:

u := User{FirstName: "Abe", LastName: "Dino", Password: "p4ssw0rd"}
jsonBytes, _ := json.Marshal(u)
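
The resulting JSON looks like this (note that the Password field is omitted entirely thanks to the `json:"-"` tag):

{"first_name":"Abe","last_name":"Dino"}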

How does the language deal with polymorphism and modularization?

Go isn’t an object-oriented language — it doesn’t have any type hierarchy. While it initially took some getting used to, it quickly became a positive. Because of the lack of inheritance, it naturally encourages composition.

It also has a really nice interface system. Unlike other languages, where a class has to explicitly declare that it implements an interface, in Go a type satisfies an interface simply by having the right methods. It’s all very philosophical. We’ve found it to be one of the most powerful features of the language. You can read more about why here.
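
A quick example of what that implicit satisfaction looks like (the types here are hypothetical, but the mechanics are the language’s own):

type Notifier interface {
  Notify(userID int64, message string) error
}

// APNSNotifier never declares that it implements Notifier; because it has a
// Notify method with the matching signature, it simply does.
type APNSNotifier struct{}

func (a APNSNotifier) Notify(userID int64, message string) error {
  // ... send the push via APNS ...
  return nil
}

// send accepts anything that satisfies Notifier.
func send(n Notifier, userID int64, message string) error {
  return n.Notify(userID, message)
}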

How important is Google’s involvement in the project?

It’s huge — having Google release and continue to invest in the language is great for everyone. It’s clear they’re using it internally and that can only mean continued improvement to the language, tools, and ecosystem.

In addition to the backing of Google, the community has grown by an insane amount since we started. It’s been amazing to see the community come alive, which has been a boon to the amount of open source code and content now available in Go.

Hopefully, this was helpful for teams trying to decide whether Go is worth a look. There are lots of questions whenever you consider a new language, and hopefully we’ve answered some of those here. We love writing Go and have had a ton of success doing so.