How to run MapReduce in the Amazon EC2 spot market

If you often run large-scale MapReduce/Hadoop jobs in Amazon EC2, you have probably thought about using the spot market. The spot price for an EC2 instance is typically 60% or more below the on-demand price. For a large job, where you use many instances for many hours, a 60+% saving adds up to a substantial amount.

Unfortunately, using the spot market is not trivial. In exchange for the lower price, you explicitly agree that Amazon can terminate your instances at any time, which is a problem because you may lose all your work. A research paper from HotCloud last year showed that even adding more spot instances (not replacing existing nodes) could be detrimental to a running MapReduce job. In other words, you add more resources to your cluster, yet your running time could actually get longer.

Beyond lengthening your computation, the spot market could even cause you to lose your data. Existing MapReduce implementations, such as Google’s internal implementation and Hadoop, are already designed with failure in mind. However, the assumed scenario is hardware failure, i.e., a small fraction of nodes may go down at any time. This assumption does not hold in the spot market, where all nodes of a cluster may fail at the same time. You can not only lose all your state (when the master nodes go down), but also lose your data (when all nodes holding the replicas of a piece of data go down).

What about bidding a really high price for your spot instances and hoping that Amazon never raises the price that high? Unfortunately, there is no guarantee on how high the spot price can go. On several occasions last year, the spot price actually exceeded the on-demand price. This likely happened because some users were bidding above the on-demand price, and Amazon really needed to kill those instances to free up capacity.

While the naive approach of bidding at a high price may not work, I am happy to report that there is a new technique that can help you leverage the spot market to save money. We recently developed a MapReduce implementation that can tolerate large-scale node failures (e.g., when your bid price falls below Amazon’s spot price). Even if all nodes in your cluster are terminated, we can guarantee that no state is lost and that you can continue to make forward progress when your cluster comes back online (e.g., when your bid price is again higher than Amazon’s spot price).

Our implementation leverages two key observations. First, when Amazon terminates your instance, it is not a hard power-off. Instead, it is a soft OS shutdown, and you have a couple of minutes to execute your shutdown script. We modified our shutdown script to save the current progress and to generate a new task for the remaining work, so that another node can take over in the future. In other words, we use on-demand checkpointing to save state only when needed.
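To make the idea concrete, below is a minimal sketch of what such an on-demand checkpoint could look like in Java. The names (SpotWorker, TaskQueue, CheckpointStore) are hypothetical illustrations, not the actual Cloud MapReduce code; the point is simply that a JVM shutdown hook, triggered by the soft OS shutdown, has a short window in which to persist progress and enqueue a task covering only the remaining work.

```java
// Sketch of on-demand checkpointing during a spot termination.
// All names here are illustrative, not the actual Cloud MapReduce classes.
interface TaskQueue { void enqueue(String taskId, long startOffset); }
interface CheckpointStore { void save(String taskId, long offset); }

public class SpotWorker {
    private final TaskQueue taskQueue;          // e.g., backed by Amazon SQS
    private final CheckpointStore checkpoints;  // e.g., backed by S3 or SimpleDB
    private volatile String currentTaskId;      // task currently being processed
    private volatile long currentOffset;        // progress within the current input split

    public SpotWorker(TaskQueue taskQueue, CheckpointStore checkpoints) {
        this.taskQueue = taskQueue;
        this.checkpoints = checkpoints;
        // A spot termination is a soft OS shutdown, so JVM shutdown hooks
        // still get a short window in which to run.
        Runtime.getRuntime().addShutdownHook(new Thread(this::checkpointAndHandOff));
    }

    private void checkpointAndHandOff() {
        if (currentTaskId == null) {
            return; // nothing in flight, nothing to save
        }
        // 1. Save the progress made so far.
        checkpoints.save(currentTaskId, currentOffset);
        // 2. Enqueue a new task covering only the remaining work, so another
        //    node can pick it up when capacity comes back.
        taskQueue.enqueue(currentTaskId + "-remainder", currentOffset);
    }
}
```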

Second, we constantly push intermediate data off the node in order to minimize the volume of state we have to save in the shutdown phase. Our solution is built on Cloud MapReduce, which continuously streams intermediate data out of the local node. In comparison, other MapReduce implementations, such as Hadoop, save all intermediate data locally until a task finishes, which could leave too large a dataset to save during the short shutdown window.
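A rough sketch of this push-style output, again with hypothetical names (ReduceQueue, StreamingMapOutput): each intermediate pair is pushed to its reduce queue as soon as it is produced, so almost nothing accumulates on local disk for the shutdown hook to flush.

```java
// Hypothetical sketch: stream each intermediate pair out immediately instead
// of buffering it on local disk until the map task commits.
interface ReduceQueue { void push(String key, String value); }

class StreamingMapOutput {
    private final ReduceQueue[] reduceQueues;   // one queue per reduce partition

    StreamingMapOutput(ReduceQueue[] reduceQueues) {
        this.reduceQueues = reduceQueues;
    }

    // Called for every intermediate (key, value) pair the map function emits.
    void emit(String key, String value) {
        int partition = (key.hashCode() & 0x7fffffff) % reduceQueues.length;
        // Push the pair out right away; a later spot termination finds very
        // little local state left to checkpoint.
        reduceQueues[partition].push(key, value);
    }
}
```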

I will not belabor the details of our implementation, except to mention that it was published last week at the USENIX HotCloud conference. You can read the Spot Cloud MapReduce paper for the full details.

Introducing Cloud MapReduce on Appistry

When we introduced Cloud MapReduce (CMR), many people were very interested in the technology. However, some would not consider using it because it only runs in the Amazon environment. Even though the Amazon cloud is popular among startups and SMBs, large enterprises are often concerned with data security and privacy, and thus are not willing to put their data outside their firewalls.

For those not familiar with Cloud MapReduce, it is an implementation of the MapReduce programming model popularized by Google. CMR innovates by employing a different architecture, which results in a number of nice advantages over other implementations, such as Hadoop and Google’s own implementation.

Taking these clients’ feedback into consideration, we have been quietly working on the next version of Cloud MapReduce. I am happy to announce that we now have a version of Cloud MapReduce that runs inside an enterprise. This version was jointly developed with Appistry, a company focused on building cloud platforms for enterprises. You can read Appistry’s announcement, which was also covered by GigaOM and the New York Times. Appistry’s product provides a full stack of cloud services within an enterprise, just like what a public cloud offers. The capabilities provided by the Appistry platform include:

  • Compute service. This is similar to a cloud compute service such as Amazon EC2, but instead of getting a virtual machine, a user submits a task for execution in Appistry’s Fabric (their term for the cluster of servers under management). Unlike virtual-machine-based cloud offerings, the Appistry Fabric can guarantee that a task is successfully executed, through mirroring and checkpointing.
  • Storage service. Appistry offers two storage services:
    • Similar to Amazon S3, Appistry’s CloudIQ storage product offers reliable key-value pair storage. It transparently handles failure through replication.
    • In addition, Appistry also offers a tuple space-oriented storage called FAM (Fabric Accessible Memory). Although the data model is different, it can serve the same purpose as the Amazon SimpleDB service.
  • Communication service. Similar to Amazon SQS, Appistry also offers a queue service. The queue is stored entirely in memory; thus, the queue size is limited by the aggregate memory size across all servers in the Fabric. Even though it lives in memory, the queue is made reliable through replication.

You will notice that there is a one-to-one mapping from Amazon services to Appistry services, which makes it easy for us to port Cloud MapReduce to run on top of Appistry’s platform.
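The port essentially boils down to hiding each service behind a thin interface and swapping the binding. The sketch below is illustrative only; the interface names (QueueService, KeyValueStore) are hypothetical, not the actual CMR or Appistry APIs.

```java
// Hypothetical sketch of the thin service abstraction that makes the port easy.
interface QueueService {
    void send(String queueName, String message);
    String receive(String queueName);
}

interface KeyValueStore {
    void put(String key, byte[] value);
    byte[] get(String key);
}

// One binding targets the Amazon services (SQS for queues, S3/SimpleDB for storage) ...
class AmazonBinding { /* implementations of QueueService and KeyValueStore */ }

// ... and another targets the equivalent Appistry services (in-memory queues,
// CloudIQ storage, FAM). The rest of Cloud MapReduce is unchanged.
class AppistryBinding { /* same interfaces, different implementations */ }
```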

In addition to giving customers a choice of running CMR inside or outside their firewall, the latest CMR also offers several new capabilities:

  • Locality optimization. One concern with the early CMR is that it uses the network exclusively for I/O. Given the limited network bisection bandwidth in today’s data center networks, this could limit throughput. In Appistry, a server in the Fabric provides both the storage service and the compute service. By leveraging their affinity primitive, we try to place the computation on the server that holds the corresponding data, thus fully leveraging local hard disk bandwidth.
  • Streaming support. MapReduce is designed for batch applications, but with simple modifications it can support continuous streaming applications and incremental batch applications. We modified the original CMR to store the intermediate data in a key-value store instead of in the queues, and to keep only a pointer to each blob in the queues (see the sketch after this list). This change makes it really easy to keep intermediate data around when you want to do incremental MapReduce later. Our design is similar in concept to the HOP work, but it uses a different architecture and is implemented on a commercial-grade platform.
  • Rolling window support. Many time-series applications compute statistics over a rolling window, e.g., the average over the last 30 minutes. In the near future, we will support rolling window updates. Because we use a push iterator interface, we can easily take out results contributed by older data and add in results contributed by newer data.
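Here is a minimal sketch of the pointer-in-queue idea from the streaming support item above. It reuses the hypothetical QueueService and KeyValueStore interfaces sketched earlier; the class name IntermediateWriter is likewise illustrative.

```java
import java.util.UUID;

// Hypothetical sketch: intermediate data lives in a key-value store, and the
// queues carry only pointers (keys) to it. Reuses the QueueService and
// KeyValueStore interfaces from the earlier sketch.
class IntermediateWriter {
    private final KeyValueStore blobStore;   // e.g., CloudIQ storage or Amazon S3
    private final QueueService pointerQueue; // e.g., an Appistry queue or Amazon SQS

    IntermediateWriter(KeyValueStore blobStore, QueueService pointerQueue) {
        this.blobStore = blobStore;
        this.pointerQueue = pointerQueue;
    }

    void write(String reduceQueueName, byte[] intermediateData) {
        String blobKey = UUID.randomUUID().toString();
        blobStore.put(blobKey, intermediateData);     // the data itself is durable
        pointerQueue.send(reduceQueueName, blobKey);  // the queue carries only the pointer
        // Because the blobs outlive the queue messages, the same intermediate
        // data can be re-read later for an incremental MapReduce run.
    }
}
```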

With the Appistry support, you can now run CMR in several setups.

  • You can run CMR inside your firewall on top of the Appistry platform.
  • You can run CMR outside your firewall in any cloud, such as Amazon EC2 or GoGrid, on top of the Appistry platform. You benefit from Appistry’s management platform and the locality optimization we discussed above.
  • You can run CMR natively on Amazon EC2. You benefit from the native integration and Amazon’s cheap pay-per-use storage without running your own storage cluster.

We are in the process of writing up what we have developed as a research paper, where we will be able to discuss more technical details than we can in this blog. Stay tuned if you are technically inclined.

Eventual consistency — a further manifestation in Amazon SQS and its solution

A cloud is a large distributed system, whose design requires tradeoffs among competing goals. Notably, the CAP theorem, conjectured by Eric Brewer (a professor at UC Berkeley and the founder of Inktomi), governs the tradeoff. The Amazon cloud is designed to trade off consistency in favor of availability and tolerance to network partitions, and it has adopted a consistency model called “eventual consistency”. Following up on my earlier article on manifestations of eventual consistency, I will describe another manifestation that we observed in Amazon SQS and the techniques that we used to get around the problem.

The manifestation is around reading from Amazon SQS. On the surface, this is surprising because a read is normally not associated with consistency problems. To understand the problem, we have to look at what a read in SQS does. Amazon SQS has a feature called “visibility timeout”. When you read a message, the message disappears from the queue for the period specified by the visibility timeout; if you do not explicitly delete the message, it reappears in the queue after the timeout. This feature is designed for fault tolerance: even if a reader dies in the middle of processing a message, another reader can take over the processing later. Because a read must hide the message for a period, it has to modify state; thus, a read is also a “write”, and a potential consistency conflict could arise.
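For readers unfamiliar with the mechanics, here is a rough sketch of the receive/process/delete cycle using the AWS SDK for Java; the queue URL and the 120-second timeout are placeholders, and the exact client setup may differ depending on your SDK version.

```java
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

public class VisibilityTimeoutExample {
    public static void main(String[] args) {
        AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
        String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"; // placeholder

        // Receiving hides the message for 120 seconds; during that window other
        // readers should normally not see it (eventual consistency aside).
        ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                .withMaxNumberOfMessages(1)
                .withVisibilityTimeout(120);

        for (Message message : sqs.receiveMessage(request).getMessages()) {
            process(message.getBody());
            // Deleting acknowledges the message. If the reader dies before this
            // call, the message reappears after the timeout and another reader
            // can take over, which is exactly the fault-tolerance behavior
            // described above.
            sqs.deleteMessage(queueUrl, message.getReceiptHandle());
        }
    }

    private static void process(String body) {
        System.out.println("processing: " + body);
    }
}
```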

A concrete manifestation: when two readers read from the same SQS queue at the same time, both may get the same message. If you read serially, it is safe to assume that you will read each message only once; when you read in parallel, however, you have to handle duplicate messages in your application. How you handle the duplicates depends on your application. In Cloud MapReduce, we handle them in three different ways depending on the application requirement, using SimpleDB, or another central data store, to resolve conflicts where needed. I believe the techniques we used are general enough that they can be used in other applications as well.

Case 1: Duplicate processing is ok.

If a message can be processed by two readers independently, the solution is very easy: do nothing. You may waste some computation cycles, but you do not need any special handling. In Cloud MapReduce, we read the map task messages from the input queue. Since a map task can safely be processed twice by two different readers, we do not do anything special.

Even if duplicate processing is OK, you may not want duplicate results coming from the two independent processings, so you may want to filter the results to remove the duplicates. How to filter depends on your application; it may be as easy as sorting the output and removing the duplicates. In Cloud MapReduce, we write a commit message for each map task processing, and the consumers of the map output (the reducers) use the commit messages to filter out duplicate results.

Case 2: One reader per queue.

Even if you are sure that a queue will only be processed by one reader, there is still a possibility that the reader may receive the same message twice. It happens when the reader uses multiple threads to read in parallel, a common technique to hide the long latency of SQS access. The solution is to tag each message when writing it into the queue; when the reader reads messages, it keeps track of the tags it has seen. If a duplicate arrives, the reader can easily tell that it has seen the same tag twice. In Cloud MapReduce, all reduce queues are processed by one reader only, and this is exactly the technique we use to handle the message duplication problem.
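A minimal sketch of this tag-based filtering, assuming the writer embeds a unique tag (for example a UUID) in each message; the class name DedupingReader is hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of tag-based duplicate filtering for a single reader
// that polls the same queue from multiple threads.
public class DedupingReader {
    // Thread-safe set of tags this reader has already seen.
    private final Set<String> seenTags = ConcurrentHashMap.newKeySet();

    // Returns true the first time a tag is seen, false for duplicates.
    public boolean shouldProcess(String tag) {
        // add() returns false if the tag was already present, i.e., another
        // thread of this same reader has already received this message.
        return seenTags.add(tag);
    }
}
```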

Case 3: No duplicate processing allowed.

For some applications, it is simply not OK for the same message to be processed twice. This is the case for the reduce task messages from the master reduce queue in Cloud MapReduce, since a reduce task has to be processed by one and only one reader. The solution is to use a central data store to resolve the conflict. Each reader writes to a central store stating that it is processing a message, and it then reads back to see who else is processing the same message. If a conflict is found, a deterministic resolution protocol is run to determine who should be responsible for processing the message. The resolution protocol has to be deterministic because two readers may run it independently, and they need to arrive at the same answer independently.
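Below is a minimal sketch of this write-then-read-back arbitration, assuming a deterministic rule of picking the lexicographically smallest reader ID. ClaimStore stands in for SimpleDB or another central data store; all names are illustrative, not the actual Cloud MapReduce code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the deterministic conflict-resolution protocol.
interface ClaimStore {
    void recordClaim(String taskId, String readerId);
    List<String> readClaims(String taskId);
}

class ReduceTaskArbiter {
    private final ClaimStore store;   // e.g., backed by Amazon SimpleDB
    private final String myReaderId;  // unique ID of this reader

    ReduceTaskArbiter(ClaimStore store, String myReaderId) {
        this.store = store;
        this.myReaderId = myReaderId;
    }

    boolean shouldIProcess(String taskId) {
        // 1. Announce that this reader intends to process the task.
        store.recordClaim(taskId, myReaderId);

        // 2. Read back all claims, including those written by other readers.
        List<String> claimants = new ArrayList<>(store.readClaims(taskId));
        if (!claimants.contains(myReaderId)) {
            claimants.add(myReaderId); // our own write may not be visible yet
        }

        // 3. Deterministic resolution: every reader sorts the claimants and
        //    picks the same winner independently (here, the smallest ID).
        Collections.sort(claimants);
        return claimants.get(0).equals(myReaderId);
    }
}
```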

Even though conflicts happen rarely, conflict resolution is quite expensive because it involves writing to and reading from a data store. It is therefore helpful to know when a conflict may be happening, in order to reduce the number of times a reader needs to invoke conflict resolution. In Cloud MapReduce, we detect duplicate reduce messages by checking how many readers are working on the same queue. We keep track of how many messages are in each reduce queue; if two readers are working on the same reduce queue, neither can process all of the messages, so we know there is potentially a conflict.

Google’s MapReduce patent and its impact on Hadoop and Cloud MapReduce

It has been widely covered that Google finally received its patent on MapReduce, after several rejections. Derrick argued that Google would not enforce the patent because Google would not “risk the legal and monetary consequences of losing any hypothetical lawsuit”. Regardless of its business decision (whether to take that risk or not), I want to comment on the technical novelty aspects. Before I proceed, I have to disclaim that I am not a lawyer, and the following does not constitute legal advice. It is purely a personal opinion based on my years of experience as a consulting expert in patent litigation.

First of all, in my view, the patent is an implementation patent: it covers the Google implementation of the MapReduce programming model, but not the programming model itself. The independent claims 1 (system claim) and 9 (method claim) both describe in detail the Google implementation, including the processes used, how the operators are invoked, and how the processing is coordinated.

The reason Google did not get a patent on the programming model is that the model is not novel, at least in legal terms (which is probably why the patent took so long to be granted). First, it borrows ideas from functional programming, where the ideas of “map” and “reduce” have been around for a long time. As pointed out by the database community, MapReduce is a step backward partly because it is “not novel at all — it represents a specific implementation of well known techniques developed nearly 25 years ago”. Second, the User Defined Function (UDF) aspect is also a well-known idea in the database community, and had been implemented in several database products before Google’s invention.

Even though it is arguable whether the programming model is novel in legal terms, it is clear to me that the specific Google implementation is novel. For example, the fine-grained fault tolerance capability is clearly missing in other products. A recent debate on MapReduce vs. DBMS sheds light on which aspects of MapReduce are novel (see the CACM articles here and here), so I will not elaborate further.

Let us first talk about what the patent means to Cloud MapReduce. The answer is: Cloud MapReduce does not infringe. The independent claims 1 and 9 state that “the plurality of processes including a master process, for coordinating a data processing job for processing a set of input data, and worker processes”. Since Cloud MapReduce does not have any master node, it clearly does not infringe. Cloud MapReduce uses a totally different architecture from the one Google described in their MapReduce paper, so it only implements the MapReduce programming model, but does not copy the implementation.

For Hadoop, my personal opinion is that it infringes the patent, because Hadoop exactly copies the Google implementation as described in the Google paper. If Google enforces the patent, Hadoop can do several things. First, Hadoop can look for an invalidity argument, but I personally think that will be hard. The Google patent is narrow; it only covers the specific Google implementation of MapReduce. Given how widely MapReduce is known, if there were a similar prior system, we would have known about it by now. Second, Hadoop could change its implementation. The patent claim language includes many “wherein” clauses; if Hadoop does not meet any one of those “wherein” clauses, it could be off the hook. The downside, though, is that a change in implementation could introduce a lot of inefficiency. Last, Hadoop could adopt an architecture like Cloud MapReduce’s. Hadoop is already moving in this direction: the latest code base moved HDFS into a separate module, which is the right move toward separating out functions into independent cloud services. Now, if only Hadoop implemented a queue service, Cloud MapReduce could port right over :-).

Top five reasons you should adopt Cloud MapReduce

There are a lot of MapReduce implementations out there, including the popular Hadoop project. So why would you want to adopt a new implementation like Cloud MapReduce? I list the top five reasons here.

1. No single failure point.

Almost all other MapReduce implementations adopt a master/slave architecture, as described in Google’s MapReduce paper. The master node presents a single point of failure. Even though there are secondary nodes, failure recovery is a hassle at best. For example, in the Hadoop implementation, the secondary node only keeps a log. When the primary master fails, you have to bring the primary back up and then replay the log file in the secondary master. Many enterprise clients we work with simply cannot accept a single point of failure for their critical data.

2. Single storage location.

When running MapReduce in a cloud, most people store their data permanently in cloud storage (e.g., Amazon S3) and copy their data over to the Hadoop file system before they start the analysis. The copy stage not only wastes valuable time, but it is also a hassle to maintain two copies of the same data. In comparison, Cloud MapReduce stores everything in a single location (e.g., Amazon S3), and all accesses during analysis go directly to that storage location. In our tests, Amazon S3 sustains a high throughput and is not a bottleneck in the analysis.

3. No cluster configuration.

Unlike other MapReduce implementations, you do not have to set up a cluster first, e.g., set up a master and then add slaves. You simply launch a number of machines, and each will work away on the job. Further, there is no hassle when you need to dynamically reconfigure your cluster. If you feel the job progress is too slow, you can simply launch more machines, and they will join the computation right away. No complicated cluster reconfiguration is needed.

4. Simple to change.

Some applications do not fit the MapReduce programming model. One can try to change the application to fit the rigid programming model, which results either in inefficiency or in complicated changes to, or setup of, the framework (e.g., Hadoop). With Cloud MapReduce, you can easily change the framework to suit your needs. Since there are only 3,000 lines of code, it is easy to change.

5. Higher performance.

Cloud MapReduce is faster than Hadoop in our study. The exact speedup really depends on the application. In one representative case, we saw a 60x speedup. This is neither the maximum nor the minimum speedup you can get. We could have massaged the data (e.g., using even more and even smaller files) to show a much bigger speedup, but we decided to make the experiment more realistic (it uses the “reverse index” application, the application the MapReduce framework was designed for, and a public data set to enable easy replication). One may argue that the comparison is unfair because Hadoop is not designed to handle small files. It is true that we could apply a band-aid to Hadoop to close the gap, but the experiment is really a scaled-down version of a large-scale test with many large files and many slave nodes. The experiment highlights a bottleneck in the master/slave architecture that you will eventually encounter. Even without hitting the scalability bottleneck, Cloud MapReduce is faster than Hadoop. The detailed reasons are listed in the paper.

Open sourcing Cloud MapReduce

After a lengthy review process, I finally received approval to open source Cloud MapReduce, an implementation of MapReduce on top of the Amazon cloud Operating System (OS). It was developed as part of a research project we did at Accenture Technology Labs. This shows that Accenture is not only committed to using open source technology, but also committed to continuing to contribute to the community.

MapReduce was first invented by Google in 2003 to cope with the challenge of processing an exponentially growing amount of data. In the same year the technology was invented, Google’s production index system was converted to MapReduce. Since then, it has quickly proven to be applicable to a wide range of problems. For example, roughly 10,000 MapReduce programs had been written at Google by June 2007, and there were 2,217,000 MapReduce job runs in September 2007.

MapReduce has enjoyed wide adoption outside of Google too. Many enterprises are increasingly facing the same challenge of dealing with a large amount of data. They want to analyze and act on their data quickly to gain competitive advantages, but their existing technology cannot keep up with the workload. MapReduce could be the perfect answer to this challenge.

There are already several open source implementations of MapReduce; the most popular one is Hadoop. Recently, it has gained a lot of traction in the market. Even Amazon offers an Elastic MapReduce service, which provides Hadoop on demand. However, even after three years of many engineers’ dedication, Hadoop still has many limitations. For example, Hadoop is still based on a master/slave architecture, where the master node is not only a scalability bottleneck but also a single point of failure. The reason is that implementing a fully distributed system is very difficult.

Cloud MapReduce is not just another implementation; it is not a clone of Hadoop. Instead, it is based on a totally different concept. Hadoop is complex and inefficient because it is designed to run on bare-bones hardware; therefore, Hadoop has to implement many functionalities to make a cluster of servers appear as a single big server. In comparison, Cloud MapReduce is built on top of the Amazon cloud Operating System (OS), using cloud services such as S3, SQS and SimpleDB. Even though a cloud service could be running on many servers behind the scenes, Amazon presents a single-big-server abstraction, which greatly simplifies a MapReduce implementation.

By building on the Amazon cloud OS, Cloud MapReduce achieves three key advantages over Hadoop.

  • It is faster. In one case, it is 60 times faster than Hadoop (the actual speedup depends on the application and the input data).
  • It is more scalable and failure resistant. It is fully distributed, and there is no single point of bottleneck or single point of failure.
  • It is dramatically simpler. It has only 3,000 lines of code, two orders of magnitude smaller than Hadoop.

All these advantages directly translate into lower cost, higher reliability and faster turn-around for enterprises to gain competitive advantages.

On the surface, it looks surprising that a simple implementation like Cloud MapReduce could outperform Hadoop. However, if you count the efforts of the hundreds of Amazon engineers behind the cloud services, it is natural that we are able to develop a more scalable and higher-performance system. Cloud MapReduce demonstrates the power of leveraging cloud services for application design.

Cloud MapReduce has an ambitious vision, so there are many areas where we are looking for help from the community. Even though Cloud MapReduce was initially developed only on the Amazon cloud OS, we envision that it will run on many cloud services in the future. For example, it could be ported to Windows Azure, filling a gap in Azure, which currently has no large-scale processing framework at all (Hadoop does not run on Azure). The ultimate goal is to run Cloud MapReduce inside a private cloud. We envision that an enterprise would deploy similar cloud services behind the firewall, so that Cloud MapReduce can build right on top of them. There are already open source projects working toward that vision, such as project Voldemort for storage and ActiveMQ for queuing.

Check out the Cloud MapReduce project. We welcome your contributions. Please join the Cloud MapReduce discussions to share your thoughts.

Cloud MapReduce — MapReduce built on a cloud operating system

We have finally finished a cool project, Cloud MapReduce, that we have been working on on and off for almost the whole past year. It is a new MapReduce implementation built on top of a cloud operating system. I have described what a cloud operating system is before. We looked hard to understand how a cloud operating system (OS) differs from a traditional OS, and I think we have found the key difference: a cloud OS’s scalability. Unlike a traditional OS, a cloud OS has to be much more scalable because it must manage a large infrastructure (much bigger than a PC) and it must serve many customers. By exploiting a cloud OS’s scalability, Cloud MapReduce achieves three advantages over other MapReduce implementations, such as Hadoop:

Faster: Cloud MapReduce is faster than Hadoop, up to 60 times in one case.

More scalable: No single point of bottleneck, i.e., no single master node that coordinates everything. It is a fully distributed implementation.

Simpler: Only 3,000 lines of Java code, which means it is very easy to change to suit your needs. Have you ever thought about changing Hadoop? I got a headache even thinking about the 280K lines of code in Hadoop.

I encourage you to read the Cloud MapReduce technical report to learn more about what we have done.