Eventual consistency — a further manifestation in Amazon SQS and its solution

A cloud is a large distributed system whose design requires tradeoffs among competing goals. Notably, the CAP theorem, conjectured by Eric Brewer (a professor at UC Berkeley and the founder of Inktomi), governs the tradeoff. The Amazon cloud is designed to trade off consistency in favor of availability and partition tolerance, and it has adopted a consistency model called “eventual consistency”. Following my earlier article on manifestations of eventual consistency, I will describe another manifestation that we observed in Amazon SQS and the techniques we used to get around the problem.

The manifestation is around reading from Amazon SQS. On the surface, this is surprising, because a read is normally not associated with consistency problems. To understand the problem, we have to look at what a read in SQS does. Amazon SQS has a feature called “visibility timeout”. When you read a message, the message disappears from the queue for the period specified by the visibility timeout, and if you do not explicitly delete the message, it reappears in the queue after the timeout. This feature is designed for fault tolerance: even if a reader dies in the middle of processing a message, another reader can take over the processing later. Because a read must hide the message for a period, it has to modify the queue’s state; thus, a read is also a “write”, and a potential consistency conflict can arise.
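To make the mechanics concrete, here is a minimal sketch of the read/hide/delete cycle, written against the boto3 SDK (which postdates this post); the queue URL and the process function are placeholders:

    import boto3

    sqs = boto3.client("sqs", region_name="us-east-1")
    queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue"  # placeholder

    def process(body):
        print("processing:", body)  # stand-in for real work

    # Receiving a message does not remove it; it only hides it from other
    # readers for VisibilityTimeout seconds.
    resp = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,
        VisibilityTimeout=60,
    )

    for msg in resp.get("Messages", []):
        process(msg["Body"])
        # Only an explicit delete removes the message for good. If the reader
        # dies before this call, the message reappears after the timeout and
        # another reader can take over.
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])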

A concrete manifestation is that when two readers read from the same SQS queue at the same time, both may get the same message. If you read serially, it is safe to assume that you will read each message only once. Unfortunately, when you read in parallel, you have to handle duplicate messages in your application. How you handle duplicates depends on your application. In Cloud MapReduce, we handle them in three different ways depending on the application requirement, all using SimpleDB (or another central data store) to resolve the conflict. I believe the techniques we used are general enough that they can be used in other applications as well.

Case 1: Duplicate processing is ok.

If a message can be processed by two readers independently, then the solution is very easy: just do nothing. You may waste some computation cycles, but you do not need any special handling. In Cloud MapReduce, we read the map task messages from the input queue. Since a map task can safely be processed twice by two different readers, we do not do anything special.

Even if duplicate processing is ok, you may not want to see duplicate results coming from the two independent runs, so you may want to filter the results to remove the duplicates. How to filter depends on your application; it may be as easy as sorting the output and removing the duplicates. In Cloud MapReduce, we write a commit message for each map task run, and the consumers of the map output (the reducers) use the commit messages to filter out duplicate results.
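As an illustration of the commit-message idea, here is a sketch in Python; the record layout and names are illustrative, not the actual Cloud MapReduce code. Each map attempt tags its outputs with a (task, attempt) pair, and exactly one attempt per task is committed:

    def filter_duplicates(outputs, commits):
        """outputs: iterable of (task_id, attempt_id, value) records.
        commits: dict mapping task_id -> the committed attempt_id."""
        for task_id, attempt_id, value in outputs:
            # Keep a record only if it came from the committed attempt.
            if commits.get(task_id) == attempt_id:
                yield value

    # Example: attempt-a committed for map-7, so attempt-b's output is dropped.
    commits = {"map-7": "attempt-a"}
    outputs = [
        ("map-7", "attempt-a", "result-1"),
        ("map-7", "attempt-b", "result-1"),  # duplicate run of the same task
    ]
    print(list(filter_duplicates(outputs, commits)))  # ['result-1']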

Case 2: One reader per queue.

Even if you are sure that a queue will be processed by only one reader, there is still a possibility that the reader may receive the same message twice. It happens when the reader uses multiple threads to read in parallel, a common technique to hide the long latency of SQS access. The solution is to tag each message when writing it into the queue; then, as the reader reads, it keeps track of the tags it has seen. If a duplicate arrives, the reader can easily tell, because it sees the same tag twice. In Cloud MapReduce, each reduce queue is processed by only one reader, and the above technique is exactly what we use to handle the message duplication problem, as sketched below.
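A minimal sketch of the tagging technique, again using boto3 with placeholder names; the tag is carried as an SQS message attribute:

    import threading
    import uuid

    import boto3

    sqs = boto3.client("sqs")
    queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/reduce-queue-0"  # placeholder

    # Writer side: attach a unique tag to each message.
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody="some reduce input",
        MessageAttributes={"tag": {"DataType": "String",
                                   "StringValue": str(uuid.uuid4())}},
    )

    # Reader side: many threads poll in parallel, but they share one set of
    # seen tags, so a message delivered twice is processed only once.
    seen_tags = set()
    seen_lock = threading.Lock()

    def handle(msg):
        tag = msg["MessageAttributes"]["tag"]["StringValue"]
        with seen_lock:
            if tag in seen_tags:
                return  # duplicate delivery; skip it
            seen_tags.add(tag)
        # ... process msg["Body"] ...

    def reader_thread():
        while True:
            resp = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=10,
                MessageAttributeNames=["tag"],  # ask SQS to return the tag
            )
            for msg in resp.get("Messages", []):
                handle(msg)
                sqs.delete_message(QueueUrl=queue_url,
                                   ReceiptHandle=msg["ReceiptHandle"])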

Case 3: No duplicate processing allowed.

For some applications, it is simply not ok for two readers to process the same message. This is the case for the reduce task messages from the master reduce queue in Cloud MapReduce, since a reduce task has to be processed by one and only one reader. The solution is to use a central data store to resolve the conflict. Each reader writes to the central store stating that it is processing a message, and it then reads back to see who else is processing the same message. If a conflict is found, a deterministic resolution protocol is run to decide who is responsible for processing the message. The resolution protocol has to be deterministic because two readers may run it independently, and they need to arrive at the same answer independently.
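A sketch of the claim-and-resolve protocol follows. Here store stands in for a hypothetical central data store client (put and get_all are made-up names; in Cloud MapReduce the store is SimpleDB), since the exact API is beside the point:

    def claim_and_resolve(store, message_id, reader_id):
        """Return True if this reader is responsible for the message."""
        # Step 1: announce that we are processing this message.
        store.put(item="claim-%s-%s" % (message_id, reader_id),
                  attributes={"message": message_id, "reader": reader_id})

        # Step 2: read back every claim on the same message.
        claimants = sorted(c["reader"] for c in store.get_all(message=message_id))

        # Step 3: deterministic resolution. Every reader sorts the claimant
        # list and picks the same winner (here, the smallest reader id), so
        # all readers independently arrive at the same answer.
        return claimants[0] == reader_id

Any deterministic rule works in step 3; picking the smallest id after sorting is just the simplest one that gives all readers the same winner.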

Even though conflicts happen rarely, conflict resolution is quite expensive, as it involves writing to and reading from a data store. It would be helpful to know when a conflict may be happening, in order to reduce the number of times a reader needs to invoke conflict resolution. In Cloud MapReduce, we detect duplicate reduce messages by checking how many readers are working on the same queue. We keep track of how many messages are in each reduce queue. If two readers are working on the same reduce queue, neither can process all of its messages; when a reader comes up short, we know there is potentially a conflict.
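A check along these lines (names illustrative, not the actual Cloud MapReduce code) lets a reader skip the expensive resolution protocol unless the counts disagree:

    def maybe_conflict(processed_by_me, expected, queue_is_empty):
        # The queue has drained, yet this reader did not see every one of the
        # `expected` messages recorded for the queue: another reader must be
        # holding some of them, so a conflict is possible and the resolution
        # protocol should run.
        return queue_is_empty and processed_by_me < expected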

4 Responses to Eventual consistency — a further manifestation in Amazon SQS and its solution

  1. Tim Freeman says:

    I agree with the statement on page 6 of your “Cloud Mapreduce” technical report at http://code.google.com/p/cloudmapreduce/ that this behavior contradicts Amazon’s documentation. One of those documents is point 3 under “Amazon SQS Message Lifecycle” at http://aws.amazon.com/sqs/#details. Have you reported the bug? If so, what did Amazon say?

    It is not obvious to me that eventual consistency requires this to happen. Since SQS never promises to give you back all of the queue elements on a schedule, they’re already sacrificing availability, so it’s not obvious to me that they must also sacrifice consistency by sometimes reporting the same element to multiple readers.

    In particular, when it comes time to read a queue element, the element will be found on some set of machines. A transaction can be attempted among all of those machines to mark the queue element as recently read. If the transaction fails, that element was not successfully read, so it should not be returned from ReceiveMessage. If this algorithm is correct, then it is definitely a bug for Amazon to return the same queue element to multiple readers.

    • huanliu says:

      I do not think this is a bug, so I have not reported it. Keep in mind that they explicitly state that they are giving up consistency in favor of availability. To keep high availability and low latency, they cannot use any locking mechanism; rather, they rely on reconciliation at a later time to bring the system back into consistency. In the read scenario you mentioned, they return the read data as soon as they find one copy on one machine. Even though they have to update the state to hide the message, they perform the update in the background so that they can return the data right away.

      The definition of “availability” is the most confusing part of the CAP theorem. By “availability”, I believe they mean whether you return anything at all, whether right or wrong. The only time the service is not available is when you call it and it returns a 4xx error. But this concept does not capture the much richer dimensions of tradeoffs involving persistence, latency, and throughput, which give rise to a lot of confusion around this topic.

  2. Tim Freeman says:

    I was unable to use Google to immediately find where Amazon states that they are giving up consistency in favor of availability for SQS. There were numerous claims of this on the web, but they all seemed to be quoting your Cloud MapReduce tech report. Can you point at where Amazon says this?

    • huanliu says:

      Werner Vogels, Amazon’s CTO, has several articles (e.g., on his blog, one of which also appeared in Communications of the ACM) talking about eventual consistency. In those articles, you will see the arguments for high availability, and thus for adopting a weaker consistency model. In terms of SQS specifically, I do not believe they talk about the underlying implementation (they never discuss specific implementations). What I stated (e.g., no locking, return the first copy as soon as possible) is my hypothesis on how they implemented SQS, which is not only consistent with their eventual consistency model, but also matches what we observe in practice.
