Eventual consistency — a further manifestation in Amazon SQS and its solution
March 2, 2010
A cloud is a large distributed system, and its design requires tradeoffs among competing goals. Notably, the tradeoff is governed by the CAP theorem, conjectured by Eric Brewer, a professor at UC Berkeley and the founder of Inktomi. The Amazon cloud is designed to trade off consistency in favor of availability and tolerance to network partitions, and it has adopted a consistency model called "eventual consistency". Following my earlier article on manifestations of eventual consistency, I will describe another manifestation that we observed in Amazon SQS and the techniques we used to get around the problem.
The manifestation is around reading from Amazon SQS. On the surface, this is surprising because a read is normally not associated with consistency problems. To understand the problem, we have to look at what a read in SQS does. Amazon SQS has a feature called "visibility timeout". When you read a message, the message disappears from the queue for the period specified by the visibility timeout, and if you do not explicitly delete the message, it reappears in the queue after the timeout. This feature is designed for fault tolerance: even if a reader of a message dies in the middle of processing that message, another reader can take over the processing later. Because a read must hide the message for a period, it has to modify the queue's state; thus, a read is also a "write", and a consistency conflict can arise.
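To make the "a read is also a write" point concrete, here is a minimal in-memory sketch of the visibility-timeout semantics. This is a toy model, not the real SQS API; the class and method names are my own, and a `now` parameter stands in for the passage of time.

```python
import time


class VisibilityQueue:
    """Toy in-memory model of SQS visibility-timeout semantics (not the real API)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.messages = {}         # msg_id -> body
        self.invisible_until = {}  # msg_id -> timestamp when it reappears

    def send(self, msg_id, body):
        self.messages[msg_id] = body

    def receive(self, now=None):
        now = time.time() if now is None else now
        for msg_id, body in self.messages.items():
            if self.invisible_until.get(msg_id, 0) <= now:
                # Reading hides the message: the read mutates queue state,
                # which is why it behaves like a "write".
                self.invisible_until[msg_id] = now + self.visibility_timeout
                return msg_id, body
        return None  # nothing visible right now

    def delete(self, msg_id):
        self.messages.pop(msg_id, None)
        self.invisible_until.pop(msg_id, None)
```

A message that is received but never deleted becomes visible again once the timeout elapses, which is exactly the window in which a second reader can pick it up.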
A concrete manifestation is that when two readers read from the same SQS queue at the same time, both may receive the same message. If you read serially, it is safe to assume that you will read each message only once. Unfortunately, when you read in parallel, you have to handle duplicate messages in your application. How you handle the duplicates depends on your application. In Cloud MapReduce, we handle them in three different ways depending on the application requirement, all using SimpleDB, or another central data store, to resolve the conflict. I believe the techniques we used are general enough that they can be used in other applications as well.
Case 1: Duplicate processing is ok.
If a message could be processed by two readers independently, then the solution is very easy: just do nothing. You may waste some computation cycles, but you do not need any special handling. In Cloud MapReduce, we read the map task messages from the input queue. Since a map task can safely be processed twice by two different readers, we do not do anything special.
Even if duplicate processing is ok, you may not want to see duplicate results coming from the two independent processing runs, so you may want to filter the results to remove the duplicates. How to filter depends on your application; it may be as easy as sorting the output and removing the duplicates. In Cloud MapReduce, we write a commit message for each map task processing, and the consumers of the map output (the reducers) use the commit messages to filter out duplicate results.
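The commit-message idea can be sketched as follows. The record shapes here are hypothetical (the actual Cloud MapReduce formats differ): each output record carries the task id and an attempt id, and the commit records name the one attempt per task whose output counts.

```python
def filter_duplicates(outputs, commits):
    """Keep only outputs from the committed attempt of each map task.

    `outputs` is a list of (task_id, attempt_id, value) records;
    `commits` maps task_id -> the attempt_id that committed.
    Records from any other attempt of the same task are dropped.
    """
    return [value for task_id, attempt_id, value in outputs
            if commits.get(task_id) == attempt_id]
```

The key property is that a consumer never needs to see both duplicates side by side; it only needs the commit records to decide which attempt's output is authoritative.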
Case 2: One reader per queue.
Even if you are sure that a queue will only be processed by one reader, there is still a possibility that the reader may receive the same message twice. It happens when the reader uses multiple threads to read in parallel, a common technique to hide the long latency of SQS access. The solution is to tag each message when writing it into the queue; the reader then keeps track of the tags it has seen. If a duplicate arrives, the reader can easily tell that it has seen the same tag twice. In Cloud MapReduce, all reduce queues are processed by one reader only, and this technique is exactly what we used to handle the message duplication problem.
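A minimal sketch of the seen-tag bookkeeping, assuming the writer has already attached a unique tag to each message. Since the duplicates arise from multiple reader threads, the set of seen tags must be guarded by a lock:

```python
import threading


class DedupReader:
    """One logical reader backed by multiple threads; drops messages
    whose tag has already been seen by any thread."""

    def __init__(self):
        self._seen = set()
        self._lock = threading.Lock()

    def accept(self, tag):
        """Return True if the tag is new and the message should be processed."""
        with self._lock:
            if tag in self._seen:
                return False  # duplicate delivery; skip it
            self._seen.add(tag)
            return True
```

Each thread calls `accept` with the message's tag before processing; only the first thread to present a given tag gets a green light.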
Case 3: No duplicate processing allowed.
For some applications, it is simply not ok for two readers to process the same message. This is the case for the reduce task messages from the master reduce queue in Cloud MapReduce, since a reduce task has to be processed by one and only one reader. The solution is to use a central data store to resolve the conflict. Each reader writes to the central store stating that it is processing a message, and it then reads back to see who else is processing the same message. If a conflict is found, a deterministic resolution protocol is run to determine who should be responsible for the message. The resolution protocol has to be deterministic because two readers may run it independently, and they need to arrive at the same answer.
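The write-then-read-back protocol can be sketched like this. A plain dict stands in for the central store (SimpleDB in Cloud MapReduce), and "lowest reader id wins" is one example of a deterministic rule; the function and parameter names are mine, not the actual Cloud MapReduce code.

```python
def claim_and_resolve(store, message_id, reader_id):
    """Sketch of conflict resolution through a central data store.

    Each reader records its claim, reads back all claims, and applies
    the same deterministic rule (here: lexicographically smallest
    reader id wins), so all readers independently reach the same
    answer. Returns True if this reader should process the message.
    """
    store.setdefault(message_id, set()).add(reader_id)  # write the claim
    claimants = store[message_id]                       # read back all claims
    winner = min(claimants)                             # deterministic rule
    return winner == reader_id
```

Note that an early claimant may first see itself as the winner and later be demoted once a second claim appears; that is acceptable precisely because, as in Case 1's spirit, the rule guarantees all parties converge on the same single winner.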
Even though a conflict happens rarely, the conflict resolution is quite expensive, as it involves writing to and reading from a data store. It helps to know when a conflict may be happening, in order to reduce the number of times a reader needs to invoke conflict resolution. In Cloud MapReduce, we detect duplicate reduce messages by checking how many readers are working on the same queue. We keep track of how many messages are in each reduce queue. If two readers are working on the same reduce queue, neither can process all the messages; thus, when a reader processes fewer messages than the recorded count, we know there is potentially a conflict.
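The cheap pre-check boils down to comparing a count recorded up front against what the reader actually managed to process; the helper below is an illustrative sketch with hypothetical names, not the actual implementation.

```python
def possible_conflict(expected_count, processed_count):
    """If this reader drained the queue but processed fewer messages
    than the recorded queue size, another reader must be consuming
    from the same queue, so the expensive resolution protocol is
    worth invoking. Otherwise it can be skipped entirely."""
    return processed_count < expected_count
```

Only when this returns True does the reader pay for the round trips to the central store described above.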