Eventual consistency — a further manifestation in Amazon SQS and its solution

A cloud is a large distributed system, and its design requires tradeoffs among competing goals. Notably, the CAP theorem, conjectured by Eric Brewer (a professor at UC Berkeley and the founder of Inktomi), governs the tradeoff. The Amazon cloud is designed to trade off consistency in favor of availability and tolerance to network partitions, and it has adopted a consistency model called "eventual consistency". Following up on my earlier article on manifestations of eventual consistency, I will describe another manifestation that we observed in Amazon SQS and the techniques we used to get around the problem.

The manifestation is around reading from Amazon SQS. On the surface, this is surprising, because a read is normally not associated with consistency problems. To understand the problem, we have to look at what a read in SQS does. Amazon SQS has a feature called "visibility timeout". When you read a message, the message disappears from the queue for the period specified by the visibility timeout, and if you do not explicitly delete it, it reappears in the queue after the timeout expires. This feature is designed for fault tolerance: even if a reader dies in the middle of processing a message, another reader can take over the processing later. Because a read must hide the message for a period, it has to modify the queue's state; thus, a read is also a "write", and consistency conflicts can arise.
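To make the read mechanics concrete, here is a minimal sketch of the receive-then-delete cycle. It uses Python with boto3 purely for illustration (the SDK postdates this post), the queue URL is a placeholder, and process() is a hypothetical stand-in for your application's work:

import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://queue.amazonaws.com/123456789012/example-queue"  # placeholder

# Receiving a message hides it from every other reader for VisibilityTimeout seconds.
resp = sqs.receive_message(
    QueueUrl=QUEUE_URL,
    MaxNumberOfMessages=1,
    VisibilityTimeout=120,  # the message stays invisible for two minutes
)

for msg in resp.get("Messages", []):
    process(msg["Body"])  # hypothetical application-level processing
    # Only an explicit delete removes the message for good. If the reader
    # crashes before this call, the message reappears after the timeout
    # and another reader can take over.
    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])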

A concrete manifestation is that when two readers read from the same SQS queue at the same time, both may receive the same message. If you read serially, it is safe to assume that you will read each message only once; but when you read in parallel, you have to handle duplicate messages in your application. How you handle the duplicates depends on your application. In Cloud MapReduce, we handle them in three different ways depending on the application requirement, all using SimpleDB, or another central data store, to resolve the conflict. I believe the techniques we used are general enough that they can be used in other applications as well.

Case 1: Duplicate processing is ok.

If a message can be processed by two readers independently, then the solution is very easy: just do nothing. You may waste some computation cycles, but you do not need any special handling. In Cloud MapReduce, we read the map task messages from the input queue. Since a map task can safely be processed twice by two different readers, we do not do anything special.

Even if duplicate processing is ok, you may not want to see duplicate results coming from the two independent runs. So you may want to filter the results to remove the duplicates. How to filter depends on your application; it may be as easy as sorting the output and removing the duplicates. In Cloud MapReduce, we write a commit message for each map task run, and the consumers of the map output (the reducers) use the commit messages to filter out duplicate results.
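The commit-message format in Cloud MapReduce is not shown here; the sketch below only illustrates the filtering idea, under the assumption that each map output record carries the id of the map-task attempt that produced it and that commit messages are (task id, attempt id) pairs:

def winning_attempts(commit_messages):
    """Pick one committed attempt per map task (here, the first commit seen)."""
    winners = {}
    for task_id, attempt_id in commit_messages:
        winners.setdefault(task_id, attempt_id)
    return set(winners.values())

def filter_duplicates(output_records, commit_messages):
    """Yield only the outputs produced by each map task's winning attempt."""
    winners = winning_attempts(commit_messages)
    for attempt_id, key, value in output_records:
        if attempt_id in winners:
            yield key, value  # outputs from the losing duplicate run are dropped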

Case 2: One reader per queue.

Even if you are sure that a queue will be processed by only one reader, there is still a possibility that the reader receives the same message twice. It happens when the reader uses multiple threads to read in parallel, a common technique to hide the long latency of SQS access. The solution is to tag each message when writing it into the queue; the reader then keeps track of the tags it has seen. If a duplicate arrives, the reader can easily tell, because it sees the same tag twice. In Cloud MapReduce, each reduce queue is processed by only one reader, and this is exactly the technique we use to handle duplicate messages.
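Here is a minimal sketch of the idea, again in Python with boto3 for illustration. The JSON envelope, the uuid-based tag, the placeholder queue URL and the handle() function are assumptions made for the example, not Cloud MapReduce's actual message format:

import json
import threading
import uuid

import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://queue.amazonaws.com/123456789012/reduce-queue-0"  # placeholder

def send_tagged(body):
    # Producer side: attach a unique tag so the reader can recognize duplicates.
    sqs.send_message(QueueUrl=QUEUE_URL,
                     MessageBody=json.dumps({"tag": str(uuid.uuid4()), "body": body}))

seen_tags = set()
seen_lock = threading.Lock()

def reader_thread():
    # One of several threads reading the same queue to hide SQS latency.
    while True:
        resp = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=10)
        for msg in resp.get("Messages", []):
            payload = json.loads(msg["Body"])
            with seen_lock:
                duplicate = payload["tag"] in seen_tags
                seen_tags.add(payload["tag"])
            if not duplicate:
                handle(payload["body"])  # hypothetical per-message work
            # Delete in either case: the message has been accounted for.
            sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])

Because all threads belong to the same reader process, an in-memory set protected by a lock is enough; no central store is needed for this case.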

Case 3: No duplicate processing allowed.

For some applications, it is simply not ok for two readers to process the same message. This is the case for the reduce task messages from the master reduce queue in Cloud MapReduce, since a reduce task has to be processed by one and only one reader. The solution is to use a central data store to resolve the conflict. Each reader writes to the central store stating that it is processing a message, and it then reads back to see who else is processing the same message. If a conflict is found, a deterministic resolution protocol is run to determine who should be responsible for the message. The resolution protocol has to be deterministic because two readers may run it independently, and they need to arrive at the same answer independently.
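Below is a minimal sketch of this announce-then-check pattern against SimpleDB, again with boto3. The domain name and the "smallest reader id wins" rule are illustrative assumptions rather than Cloud MapReduce's actual protocol; a production version would also re-check before committing results, because one reader's announcement can arrive after another reader has already read back:

import boto3

sdb = boto3.client("sdb")
DOMAIN = "conflict_resolution"  # assumed SimpleDB domain, created beforehand

def should_process(message_id, reader_id):
    # Announce that this reader intends to process the message:
    # one attribute value per reader under the item for this message.
    sdb.put_attributes(
        DomainName=DOMAIN,
        ItemName=message_id,
        Attributes=[{"Name": "reader", "Value": reader_id, "Replace": False}],
    )
    # Read back every reader that has announced so far.
    resp = sdb.get_attributes(
        DomainName=DOMAIN,
        ItemName=message_id,
        AttributeNames=["reader"],
        ConsistentRead=True,
    )
    readers = [a["Value"] for a in resp.get("Attributes", [])]
    # Deterministic rule: every contender computes the same winner,
    # here the lexicographically smallest reader id.
    return reader_id == min(readers)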

Even though a conflict happens rarely, conflict resolution is quite expensive, as it involves writing to and reading from a data store. It helps to know when a conflict may be happening, in order to reduce the number of times a reader invokes conflict resolution. In Cloud MapReduce, we detect duplicate reduce messages by checking whether more than one reader is working on the same queue. We keep track of how many messages are in each reduce queue. If two readers are working on the same reduce queue, neither can process all of its messages; a reader that comes up short therefore knows there is potentially a conflict.
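As a sketch, assuming the expected number of messages in the queue is known (for example, from per-queue counts kept in a central store), a reader could check cheaply before paying for the resolution protocol:

import boto3

sqs = boto3.client("sqs")

def drain_and_check(queue_url, expected_count, handle):
    # Pull messages until the queue looks empty, then report whether
    # a conflict with another reader is possible.
    seen = 0
    while True:
        resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
        msgs = resp.get("Messages", [])
        if not msgs:
            break
        for m in msgs:
            handle(m["Body"])  # hypothetical per-message work
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"])
            seen += 1
    # If we could not account for every expected message, another reader may be
    # holding the rest under its visibility timeout (or the queue is only
    # transiently empty); only then invoke the expensive resolution protocol.
    return seen < expected_count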

Open sourcing Cloud MapReduce

After a lengthy review process, I finally received approval to open source Cloud MapReduce, an implementation of MapReduce on top of the Amazon cloud Operating System (OS). It was developed as part of a research project at Accenture Technology Labs. This shows that Accenture is not only committed to using open source technology, but also to continuing our contributions to the community.

MapReduce was invented by Google in 2003 to cope with the challenge of processing an exponentially growing amount of data. In the same year the technology was invented, Google's production index system was converted to MapReduce. Since then, it has quickly proven applicable to a wide range of problems. For example, roughly 10,000 MapReduce programs had been written inside Google by June 2007, and there were 2,217,000 MapReduce job runs in the month of September 2007 alone.

MapReduce has enjoyed wide adoption outside of Google too. Many enterprises are increasingly facing the same challenge of dealing with a large amount of data. They want to analyze and act on their data quickly to gain a competitive advantage, but their existing technology cannot keep up with the workload. MapReduce could be the perfect answer to this challenge.

There are already several open source implementations of MapReduce. The most popular one is Hadoop, which has recently gained a lot of traction in the market. Even Amazon is offering an Elastic MapReduce service, which provides Hadoop on demand. However, even after three years of many engineers' dedication, Hadoop still has many limitations. For example, Hadoop is still based on a master/slave architecture, where the master node is not only a scalability bottleneck but also a single point of failure. The reason is that implementing a fully distributed system is very difficult.

Cloud MapReduce is not just another implementation; it is not a clone of Hadoop. Instead, it is based on a totally different concept. Hadoop is complex and inefficient because it is designed to run on bare-bones hardware; as a result, Hadoop has to implement many functions itself to make a cluster of servers appear as a single big server. In comparison, Cloud MapReduce is built on top of the Amazon cloud Operating System (OS), using cloud services such as S3, SQS and SimpleDB. Even though a cloud service could be running on many servers behind the scenes, Amazon presents a single-big-server abstraction, which greatly simplifies a MapReduce implementation.

By building on the Amazon cloud OS, Cloud MapReduce achieves three key advantages over Hadoop.

  • It is faster. In one case, it is 60 times faster than Hadoop (the actual speedup depends on the application and the input data).
  • It is more scalable and more resilient to failure. It is fully distributed, with no single bottleneck and no single point of failure.
  • It is dramatically simpler. It has only 3,000 lines of code, two orders of magnitude fewer than Hadoop.

All these advantages translate directly into lower cost, higher reliability and faster turnaround, helping enterprises gain a competitive advantage.

On the surface, it looks surprising that a simple implementation like Cloud MapReduce could outperform Hadoop. However, if you count the effort of the hundreds of Amazon engineers behind the cloud services it builds on, it is natural that we are able to develop a more scalable and higher-performance system. Cloud MapReduce demonstrates the power of leveraging cloud services for application design.

Cloud MapReduce has an ambitious vision, so there are many areas where we are looking for help from the community. Even though Cloud MapReduce was initially developed on the Amazon cloud OS, we envision it running on many cloud services in the future. For example, it could be ported to Windows Azure, filling a gap in Azure, which currently has no large-scale processing framework at all (Hadoop does not run on Azure). The ultimate goal is to run Cloud MapReduce inside a private cloud. We envision enterprises deploying similar cloud services behind the firewall, so that Cloud MapReduce can build directly on top of them. There are already open source projects working toward that vision, such as project Voldemort for storage and ActiveMQ for queuing.

Check out the Cloud MapReduce project. We welcome your contributions. Please join the Cloud MapReduce discussions to share your thoughts.

Eventual consistency, its manifestation in Amazon Cloud and how to design your applications around it

A cloud is such a large-scale system that its implementation requires tradeoffs among its design goals. The Amazon cloud has adopted a weaker consistency model called eventual consistency, which has implications for how we architect and design applications. This post talks about the problems this weaker consistency model exposes and how we designed our applications around them.

Eric Brewer, a professor at UC Berkeley and the founder of Inktomi, conjectured that a large system such as a cloud can only achieve two out of three properties: data consistency, system availability and tolerance to network partition. Being a public infrastructure, a cloud cannot sacrifice system availability. Due to its scale and performance requirements, a cloud has to rely on a distributed implementation, so it cannot sacrifice tolerance of network partitions either. Therefore, a cloud has to trade off data consistency, and that is exactly what the Amazon cloud has done.

Eventual consistency means that when you write to and then immediately read from the cloud, the returned result may not be exactly what you expect. However, the system guarantees that it will "eventually" return what you expect; the time it takes, though, is not deterministic.

In the Amazon implementation, the effects of eventual consistency are noticeable on only two fronts, both in the SQS (Simple Queue Service) implementation.

  • When two clients ask SQS to create the same queue at the same time, both could be notified that the queue has been created successfully. Normally, if you create the same queue twice at different times, the second attempt gets an error message stating that the queue already exists. This could be a problem if your application relies on the fact that a queue can only be created once. In one of the applications we developed, we had to completely re-architect it to get around the problem. Since the solution is application specific, I will not elaborate on how we solved it.
  • When a client reads from an SQS queue, especially when the queue is getting close to empty, the queue may respond that it is empty even though there are still messages in it. This could be a problem if your application assumes the queue is empty when it is not. The Amazon documentation says that in most cases you only need to wait "a few seconds", but it is unclear how long "a few" is, since it depends on many factors, such as the SQS load, the data distribution and the degree of replication.

We devised a solution to the second problem. The solution is more widely applicable, so we describe it here in the hope that it is helpful to others.

The idea is simple: a message producer counts how many messages it has generated for a particular queue, so that the consumers of that queue know how many messages to expect. In the solution we used for one application, we host this count data in Amazon SimpleDB. Since there are many producers for each queue, we require each producer to write a separate row in SimpleDB, and all counts for a queue fall under the same column. A sample SimpleDB table is shown below:

             queue1    queue2    queue3    ...
producer1    c_11      c_12      c_13
producer2    c_21      c_22      c_23
...

When the consumers start to process (they may need a separate status flag to know that all producers have finished), they can run a query against SimpleDB, such as

select queue1 from domain_name

The query returns a list of items. The consumers just need to iterate through the list and sum up all the numbers under the same column (e.g., queue1). The sum is the number of messages the consumers should expect. If the number of messages dequeued so far is less than the sum, the consumers should continue polling SQS. This solution avoids setting an arbitrary wait time, which either wastes time idling or risks missing messages.
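Here is a minimal sketch of both sides of the scheme in Python with boto3, for illustration only; the domain name, the item names and the handle() function are assumptions, and pagination of SimpleDB select results (NextToken) is omitted for brevity:

import time

import boto3

sdb = boto3.client("sdb")
sqs = boto3.client("sqs")
DOMAIN = "queue_counts"  # assumed SimpleDB domain, created beforehand

def record_count(producer_id, queue_name, count):
    # Producer side: one row (item) per producer, one column (attribute) per queue.
    sdb.put_attributes(
        DomainName=DOMAIN,
        ItemName=producer_id,
        Attributes=[{"Name": queue_name, "Value": str(count), "Replace": True}],
    )

def expected_messages(queue_name):
    # Consumer side: sum the per-producer counts for one queue.
    resp = sdb.select(SelectExpression="select %s from %s" % (queue_name, DOMAIN))
    total = 0
    for item in resp.get("Items", []):
        for attr in item.get("Attributes", []):
            if attr["Name"] == queue_name:
                total += int(attr["Value"])
    return total

def consume_all(queue_url, queue_name, handle):
    # Keep polling until every expected message has been seen, even if
    # SQS transiently claims the queue is empty.
    remaining = expected_messages(queue_name)
    while remaining > 0:
        resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
        msgs = resp.get("Messages", [])
        if not msgs:
            time.sleep(1)  # the queue may only look empty; keep polling
            continue
        for msg in msgs:
            handle(msg["Body"])  # hypothetical per-message work
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
            remaining -= 1

Note that each producer writes only its own row, so producers never contend on the same item; the consumers do all the aggregation at read time.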

Hope this solution is helpful. Let me know if there are other symptoms of eventual consistency that I have missed.