Open Source Cloud Stack

Sunday Nov. 15 2009

Potentially interesting links



  • Michael Kozuch (gave the introduction), PE with Intel Labs Pittsburgh.
  • Milind Bhandarkar (Hadoop)
  • Michael Ryan (Tashi)
  • Richard Gass (Zoni)

Why a cloud?

  1. Efficiency: admin costs aggregated
  2. Scalability: from 1 to 1000 servers in 10 seconds

Big data

  • Computing comes to the data: it is harder to move huge data sets to the computing center than to move the computing to the data.


Intel, HP, and Yahoo! are working with UIUC. Also:
  • KIT (Germany, Karlsruhe Institute of Technology)
  • ISPRAS (Russian Academy of Sciences)
  • ETRI (Korea)
  • IDA (Singapore)
  • MIMOS (Malaysia)

Federation of heterogeneous datacenters. Each site is managed independently.

I am not sure exactly where each of these is located.

User access organized around research projects, led by a principal investigator (PI). PIs apply to each site independently.

Not suitable for "traditional" HPC applications; useful for investigating techniques for large data sets.

  1. Uses HDFS (storage system under Hadoop) to access data.
  2. Virtual machine allocation (AWS-compatible: Tashi or Eucalyptus)
  3. Application services: Hadoop

Kozuch provided a general overview of the suite (or stack) of tools and technologies. The components do not depend on each other, but they work nicely together.

Theme for the day: moving computation to the data, rather than moving the data to the machine that can do computing. The technologies discussed here are appropriate when this is the right solution for the problem at hand.

Relatively fast computations running on very large datasets are appropriate. But what if these computations produce very large outputs?

And what about slow calculations that require large computational resources (not just large data sets)?


Milind Bhandarkar

The speaker was very difficult to understand: I lost about half of his sentences.

Hadoop is not Ian Foster's style of grid.

Hadoop has run on clusters up to 4000 nodes.

It doesn't have a good security system (coming "early next year"). Right now, separate clusters are used to isolate different security realms.

Milind presented several examples of what gets programmed in Hadoop. What they had in common is that each calculation was individually very simple, and none returned a large amount of data.


Google's "breakthrough" with MapReduce was finding out how to use it for almost all of their calculations. The technique itself has existed in Lisp and other functional languages since the 1970s.

  1. Map: apply a function to each element of a list. Embarrassingly parallel.
  2. Reduce: apply a function to the entire resulting list, to give the result. Inherently sequential, unless processing multiple lists. Group the data to produce multiple lists!

With this technique, one can program a large number of machines to process data in parallel, given only two sequential functions.
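The map / group-by-key / reduce idea fits in a few lines of single-process Python. This toy is not Hadoop's API. The names `map_reduce`, `word_mapper` and `sum_reducer` are made up for illustration. Grouping by key is what turns one "inherently sequential" reduction into many independent ones. Sorting the groups also shows why keys need to be comparable.

```python
from collections import defaultdict

def map_reduce(records, mapper, reducer):
    """Toy, single-process MapReduce: map every record, group by key, reduce each group."""
    grouped = defaultdict(list)
    # Map: each record is processed independently (embarrassingly parallel).
    for record in records:
        for key, value in mapper(record):
            grouped[key].append(value)  # group values by key: many small lists
    # Reduce: each key's list can be reduced independently of the others.
    return {key: reducer(key, values) for key, values in sorted(grouped.items())}

def word_mapper(line):
    # Emit (word, 1) for every word in the line.
    for word in line.split():
        yield word.lower(), 1

def sum_reducer(key, counts):
    # Collapse the list of counts for one word into a single total.
    return sum(counts)

lines = ["the quick brown fox", "The lazy dog", "the end"]
print(map_reduce(lines, word_mapper, sum_reducer))
# {'brown': 1, 'dog': 1, 'end': 1, 'fox': 1, 'lazy': 1, 'quick': 1, 'the': 3}
```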

"Chunking" the data appropriately is important in order to reduce the overhead. Shipping the computation to the data is also important (as noted in the introduction). Dealing with individual process failures is also important.

Latest version: 0.20.x; stable is 0.18.x. Since they are still below version 1.0, they have not yet reached API stability. Version 1.0 is intended to come out "later next year". There are many subprojects. Possibly of interest is Pig, a language that drives MapReduce jobs over large data sets. See the Hadoop home page for details.

Bandwidth to data: the filesystem interface lets the system discover which data are local to each node, so that the right processing job can be sent to the right data. This allows parallel reading across the ensemble of machines. Each machine uses its local disk bandwidth, which is greater than the network bandwidth. But what if our network is fast? Can it be fast enough for network I/O speed not to matter?
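One back-of-envelope way to frame that question: reading from local disks scales with the number of nodes, while reading over the network is capped by whatever aggregate bandwidth the network can deliver. The sketch below assumes a perfectly balanced scan and ignores seeks, CPU and replication. The function names are hypothetical, and any parameter values you plug in are placeholders, not measurements.

```python
def local_scan_seconds(data_bytes, nodes, disk_bytes_per_s):
    """Each node reads its own share of the data from local disk, all in parallel."""
    return data_bytes / (nodes * disk_bytes_per_s)

def remote_scan_seconds(data_bytes, nodes, disk_bytes_per_s, network_bytes_per_s):
    """Data must cross the network: the smaller of total disk rate and network capacity wins."""
    return data_bytes / min(nodes * disk_bytes_per_s, network_bytes_per_s)

def network_is_bottleneck(nodes, disk_bytes_per_s, network_bytes_per_s):
    """The network stops mattering (for bandwidth) once it can carry all disks' output at once."""
    return network_bytes_per_s < nodes * disk_bytes_per_s
```

Within this simple model, the answer is "yes, in principle." Once the network's aggregate bandwidth exceeds nodes × per-node disk rate, locality no longer limits bandwidth. Note that this threshold grows with the cluster size.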

On giant clusters, failures are routine. Jobs must be fault tolerant: the failure of one computation should not cause the failure of the whole job. Hadoop supports a fault-tolerant store to protect against this. Common cases of failure don't destroy results or lose data. Of course, if half a cluster goes away, the calculation will not continue: common cases are dealt with, not every case. And the program will run more slowly if some of the resources fail. Reasonable (not perfect) guarantees are provided.

Hadoop gives up some performance (e.g., by being written in Java) to gain ease of development.

HDFS provides the data blocking; the default block size is 64 MB. HDFS can report where a specific data block is located (on which machine), so that the job can be sent to the data. It is aware of rack organization, so it avoids putting replicas of a block on the same rack as the original. Hadoop uses three-fold redundant storage of data.

HDFS namenodes keep track of the mapping of files (inodes) to blocks, and of blocks to their locations. They keep all of this metadata in memory. They have to be robust, because they form a single point of failure for the system. Hadoop also operates backup namenodes in case the main namenode fails. Namenodes know to place more files on faster disks and fewer files on slower disks.

Datanodes hold the actual data and have direct access to it. They send "heartbeats" and block reports back to the namenodes, and they also report data corruption. The client API lets the client discover that data have been corrupted and find a second datanode where the data are stored. Data are write once, read many. There is not even an append mode (which is coming in Hadoop version 0.21).
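Two of the HDFS ideas above, fixed-size blocks and rack-aware replica placement, can be sketched as follows. `place_replicas` is a simplified, deterministic version of the default policy described in the HDFS architecture guide:

- first replica on the writer's node;
- second replica on a node in another rack;
- third replica on a different node in that same remote rack.

Real HDFS does not simply take the first node in sorted order. The function names and the rack-to-nodes dictionary are hypothetical, and the sketch assumes the remote rack has at least two nodes.

```python
BLOCK_SIZE = 64 * 1024 * 1024  # the 64 MB default block size quoted in the talk

def num_blocks(file_size, block_size=BLOCK_SIZE):
    """Blocks a file occupies (integer ceiling; the last block may be partial)."""
    return (file_size + block_size - 1) // block_size

def place_replicas(writer, topology):
    """Pick 3 nodes for a block. topology maps rack name -> list of node names."""
    rack_of = {node: rack for rack, nodes in topology.items() for node in nodes}
    local_rack = rack_of[writer]
    # Replica 1: the writer's own node (cheap, local write).
    replicas = [writer]
    # Replica 2: a node on a different rack, so losing one rack cannot lose the block.
    remote_rack = next(rack for rack in sorted(topology) if rack != local_rack)
    second = sorted(topology[remote_rack])[0]
    replicas.append(second)
    # Replica 3: another node on that same remote rack, limiting cross-rack traffic.
    replicas.append(next(n for n in sorted(topology[remote_rack]) if n != second))
    return replicas

topology = {"rack1": ["n1", "n2"], "rack2": ["n3", "n4"]}
print(num_blocks(200 * 1024 * 1024), place_replicas("n1", topology))
```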

(Aside) Yahoo! sees about 1% node failure per month, so on their clusters, they see about 8 nodes failing per day!
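Taken together, the two figures imply the total node count. This assumes a 30-day month; the speaker did not give the cluster size.

```latex
\[
\frac{N \times 0.01}{30\ \text{days}} = 8\ \text{nodes/day}
\quad\Longrightarrow\quad
N = \frac{8 \times 30}{0.01} = 24\,000\ \text{nodes}
\]
```

That is, on the order of 24,000 nodes across their clusters.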

(Aside) Data block size of 64MB by default: for CMS, this would be 30-200 events. For mu2e, how many events would this be? This is clearly a large block for the storage of "user ntuples". Can this replace the need for user ntuples?

File replication is dynamic. The default replication factor is 3, but "hot" data can be replicated more to allow more parallel access, and "stale" data are replicated fewer times. Their backup solution is to have another HDFS system that replicates everything. This has been less expensive than trying to back up a multi-petabyte data set on tape!

HDFS is not a POSIX-compliant filesystem. But there is a command-line interface, and the ability to use FUSE to provide a write-once, read-many filesystem. The native method for accessing HDFS is through a Java API.

Hadoop MapReduce

Hadoop MapReduce processes lists of elements (Hadoop records). Records are key/value pairs; keys must be comparable and serializable, and values must be serializable.

Data flow: input -> map -> shuffle -> reduce -> output.

Example task:

    cat /var/log/auth.log* | \
        grep "session opened" | cut -d' ' -f10 | \
        sort | \
        uniq -c > \
        ~/userlist
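The same pipeline maps naturally onto MapReduce:

- `grep` plus `cut` is the map step;
- `sort` plays the role of the shuffle;
- `uniq -c` is the reduce.

A minimal Python sketch follows; the function names are hypothetical. Which space-separated field holds the user name depends on the log format, so the sketch simply mirrors `-f10`. The sample log lines are synthetic.

```python
from itertools import groupby
from operator import itemgetter

def session_mapper(line, field=10):
    """Map: like `grep "session opened" | cut -d' ' -f10`, emitting (user, 1)."""
    if "session opened" in line:
        # cut -d' ' splits on every single space (runs of spaces give empty fields).
        fields = line.rstrip("\n").split(" ")
        # cut prints an empty field when a line has fewer fields than requested.
        yield (fields[field - 1] if len(fields) >= field else ""), 1

def count_sessions(lines):
    """Map every line, sort by key (the shuffle), then count each run of equal keys."""
    pairs = [pair for line in lines for pair in session_mapper(line)]
    pairs.sort(key=itemgetter(0))  # `sort`: brings identical keys together
    # `uniq -c`: one (key, count) per run of identical keys
    return [(user, sum(n for _, n in run)) for user, run in groupby(pairs, key=itemgetter(0))]

log = [
    "Nov 15 10:00:01 host su[42]: session opened for user alice by root",
    "Nov 15 10:05:00 host su[43]: session opened for user bob by root",
    "Nov 15 10:09:30 host su[44]: session opened for user alice by root",
    "Nov 15 10:10:00 host sshd[7]: Accepted password for carol",
]
print(count_sessions(log))  # [('alice', 2), ('bob', 1)]
```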

Input to the map processes comes from HDFS. Output from the map step is grouped by key and given to the reducers; this is the step they call the shuffle. The reduce output typically goes back to HDFS. Intermediate files are kept on local storage and are not available at the end of the job.

A common performance bottleneck is the shuffle step. If the output of the map step is large, the shuffle phase needs to be tuned to give good performance. Hadoop works to balance the load on the reducers; this is what the shuffle is doing, and it has many tunable parameters. Because the shuffle sends data over the network, it is often expensive.

Each reduce task is given a set of key -> List(value) pairs. For each key, it reduces the list of values to one summary value, emitting key -> value pairs.

In the example tasks (looking for the most common words in web pages, of course), map tasks commonly seem to be trivial: e.g., a map that turns (word, freq) into (freq, word).
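The (word, freq) -> (freq, word) map is typically the first half of a second job, chained after the word count. Making the frequency the key lets the shuffle's sort do the ranking. Below is a single-process sketch with hypothetical names. Hadoop's default key sort is ascending, so a real job would need a descending sort comparator or a final pass; the sketch just sorts descending directly.

```python
def invert_mapper(word, freq):
    """Map step of the second job: make the frequency the key."""
    yield freq, word

def most_common(word_counts, n):
    """Second 'job' after word counting: invert, sort by frequency, keep the top n."""
    inverted = [pair for word, freq in word_counts.items() for pair in invert_mapper(word, freq)]
    # Highest frequency first; ties broken alphabetically by word.
    inverted.sort(key=lambda pair: (-pair[0], pair[1]))
    return inverted[:n]

print(most_common({"the": 3, "fox": 1, "dog": 2, "cat": 2}, 3))
# [(3, 'the'), (2, 'cat'), (2, 'dog')]
```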

If a few keys have a very large number of values, one needs to provide one's own partitioner to help balance the load; Hadoop's default partitioner does not handle this well. If the reducer can work with only part of the values for a given key, Hadoop can deal with this situation automatically.
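Here is a sketch of the skew problem, with hypothetical names.

- `default_partition` follows the same rule as Hadoop's default hash partitioner: hash of the key modulo the number of reducers. `zlib.crc32` stands in for Java's `hashCode`, because Python's built-in string hash is randomized per process.
- `skew_aware_partition` is one possible custom policy, not a Hadoop class. It spreads a known hot key across all reducers.

Spreading a hot key only works under the condition noted above: each reducer then sees part of that key's values, so the partial results must be mergeable.

```python
import zlib
from collections import Counter

def default_partition(key, num_reducers):
    # Same rule as Hadoop's default hash partitioner: hash(key) mod number of reducers.
    # crc32 stands in for Java's hashCode(); Python's str hash is randomized per process.
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def skew_aware_partition(key, value, num_reducers, hot_keys):
    # Hypothetical custom policy: spread each known hot key over all reducers
    # by hashing (key, value); every other key keeps the default behaviour.
    if key in hot_keys:
        return zlib.crc32(f"{key}\x00{value}".encode("utf-8")) % num_reducers
    return default_partition(key, num_reducers)

def count_with_partitioner(pairs, num_reducers, hot_keys):
    """Route pairs to reducers, count per reducer, then merge the partial counts."""
    per_reducer = [Counter() for _ in range(num_reducers)]
    for key, value in pairs:
        per_reducer[skew_aware_partition(key, value, num_reducers, hot_keys)][key] += 1
    # A hot key's values were split across reducers, so the partial counts must be merged.
    totals = Counter()
    for partial in per_reducer:
        totals.update(partial)
    return per_reducer, totals
```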

Each reducer writes to its own file. The output of a typical job is many files written out to HDFS.

Hadoop streaming

Hadoop is written in Java, but it allows the use of non-Java code. Hadoop streaming supports this. As long as your application reads from stdin, and writes to stdout, streaming can deal with it. People use Python and R and shell (among others). This is slower than Java because of the additional streaming overhead.
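A minimal word-count sketch for Hadoop streaming in Python follows. The file name `wc.py` and its `map`/`reduce` arguments are hypothetical. The script relies on two streaming conventions:

- by default, the text before the first tab on each line is the key;
- the reducer receives mapper output sorted by key as plain lines, so it has to detect key boundaries itself.

```python
#!/usr/bin/env python3
"""Word count for Hadoop streaming (sketch). Run as `wc.py map` or `wc.py reduce`."""
import sys
from itertools import groupby

def mapper(lines):
    # Emit one "word<TAB>1" line per word; the text before the tab is the key.
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def reducer(lines):
    # Input arrives sorted by key, but as plain lines: group consecutive equal keys.
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"

if __name__ == "__main__" and len(sys.argv) > 1:
    steps = {"map": mapper, "reduce": reducer}
    for out in steps[sys.argv[1]](sys.stdin):
        print(out)
```

You can test this without Hadoop by imitating the shuffle with `sort`:

    cat input.txt | python3 wc.py map | sort | python3 wc.py reduce

With Hadoop, the two commands are passed to the streaming jar via its `-mapper` and `-reducer` options. The jar's location varies by version.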

Page 47 in S06-2-of-4.pdf (slides from this Hadoop talk) gives an example, using shell scripts.

The contract of Hadoop reducers is that all the values for one key have to be processed by the same reducer. Thus reducers cannot finish until all mappers are done. However, reducers can start before the mappers are done; in fact, they are started as soon as mapper output is available.

MR architecture

The system runs a set of daemons that control jobs. No user code runs in any of the daemons. These daemons launch the user-supplied mappers and reducers and manage them, e.g., by handling failures. They make sure the right tasks are launched on the right machines. The job tracker polls to discover when jobs are done. The execution engine tries to put tasks that need the same files onto the same machine, and onto a machine that has that data. For example, it tries to land reduce tasks on the nodes that are running the map tasks that produce their input. The task tracker does the communication and organization to make this all happen.
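The preference for data-local placement can be sketched as a greedy assignment. This is a simplification with hypothetical names. The real job tracker hands out tasks as task trackers report in over heartbeats, and it tries rack-local placement before going fully remote. Here, a task simply runs on any free node when no replica holder has a free slot.

```python
def assign_map_tasks(block_locations, free_slots):
    """Greedily give each block's map task to a node, preferring one that stores a replica.

    block_locations: block id -> nodes holding a replica (what HDFS can report).
    free_slots: node -> number of free map slots (hypothetical bookkeeping).
    Returns block id -> (node, ran_data_local).
    """
    slots = dict(free_slots)
    assignment = {}
    for block, replicas in sorted(block_locations.items()):
        local = [node for node in replicas if slots.get(node, 0) > 0]
        if local:
            node, data_local = local[0], True
        else:
            # No replica holder is free: run elsewhere and pull the block over the network.
            others = [node for node, free in sorted(slots.items()) if free > 0]
            if not others:
                raise RuntimeError("no free slots; a real scheduler would wait")
            node, data_local = others[0], False
        slots[node] -= 1
        assignment[block] = (node, data_local)
    return assignment
```

Being greedy, this can miss better placements. In the test case, giving `b1` to `n2` instead would have let `b2` run locally on `n1`.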

(Aside) User code can go berserk and cause havoc on the system, making the infrastructure fail. Virtualization should help solve this problem; we should be hearing about this later.

The job tracker notifies reduce tasks whenever a map task finishes; this notification is what drives the data transfer. A map task's output is written out as soon as the task emits enough output to have its buffer flushed (a disk spill). When the map task is done, it will have produced an output file with as many partitions as there are reducers. There is thus some pipeline parallelism between the mappers and the reducers.
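The partitioned map output and the reducer-side fetch can be sketched as follows. The names are hypothetical, and in-memory lists stand in for spill files. The sketch also shows why a reducer cannot finish early: its input is the merge of one sorted partition from every map task, so it is incomplete until the last map is done.

```python
import heapq
import zlib
from itertools import groupby
from operator import itemgetter

def partition(key, num_reducers):
    # Hash-mod partitioning, so every occurrence of a key lands in the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def map_output(pairs, num_reducers):
    """One map task's output: one key-sorted partition per reducer."""
    parts = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        parts[partition(key, num_reducers)].append((key, value))
    return [sorted(part, key=itemgetter(0)) for part in parts]

def reduce_input(map_outputs, reducer_id):
    """Fetch this reducer's partition from every map task, merge the sorted runs, group by key."""
    runs = [output[reducer_id] for output in map_outputs]
    merged = heapq.merge(*runs, key=itemgetter(0))
    return [(key, [value for _, value in group]) for key, group in groupby(merged, key=itemgetter(0))]

maps = [map_output([("a", 1), ("b", 1)], 2), map_output([("a", 1), ("c", 1)], 2)]
for r in range(2):
    print(r, reduce_input(maps, r))
```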

Pig (the language and execution engine)

Pig is a system for processing large semi-structured data sets using Hadoop. It consists of:

  1. Pig Latin: the procedural language (syntax similar to SQL's, but quite a different language)
  2. Pig Engine: parser, optimizer and distributed query execution

Pig "really shines in scan-centric processing tasks", and in batch mode.

It is easier to program in Pig than directly in Hadoop, and Pig insulates the user from Hadoop's complexity (especially from the instability of the Hadoop API). Pig also handles job chains, which are the most common way of performing real tasks with map/reduce. Pig can reduce a several-hundred-line Hadoop program (in Java) to 10 or so lines of script. The result typically runs about 2 times slower than a direct implementation in Hadoop; the advantage is dramatically reduced development time for specific jobs. It is easy to plug in user code.

(Aside) Yahoo! considers a set of 100-200 million user ids and ages as a "small data set".

Pig data types:

  • tuple (ordered set of fields; can be nested)
  • bag (collection of tuples; entire data sets are typically bags)
  • map (set of key/value pairs; values can be tuples)
  • int, long, float, double, chararray (UTF-8), bytearray (blob)
  • null (same as SQL null: unknown or non-existent data)

Pig comes with a variety of data loaders for reading files of specified types. Users can write their own data loaders.

The example showed again that each step in a process is often very simple. The speaker avoids having one step perform two operations. This seems silly in the very small examples given, but is claimed to be critical for very large samples. Also, Pig has the intelligence to figure out how many map/reduce jobs it has to run to get the output needed; it does not necessarily execute a job for every step in the processing.

The Pig translator and executor perform some optimizations. The result is not perfect, because the generated map/reduce plan is not as clever as one a human expert would write. Translation is done first to a "logical plan", and then to a "physical plan". The logical plan is a DAG of operators (nodes) and data flows (edges). Pig can also stream records through a user-supplied binary.

The logical plan can be turned into a physical plan for local-mode execution (for debugging), or into a Hadoop map/reduce physical plan (for real execution). When the logical plan is translated to a physical plan, Pig detects where the map/reduce boundaries need to be (group, cogroup, order, distinct ... operators), and the physical plan parallelizes these operations. Most other operators map 1:1 from the logical plan. Pig then creates a Job.jar file and submits it to the JobControl facility of Hadoop. Execution is lazy: nothing gets generated until output is requested, so the Pig optimizer can do in-memory pipelining and filter re-ordering across multiple logical commands. On the map side, Pig figures out what parallelism is possible. On the reduce side, Pig needs help (supplied by the parallel keyword) to know how much to parallelize.
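The boundary detection described above can be sketched for the simplest case, a linear chain of operators. This is a hypothetical, much-simplified compiler, not Pig's code; real Pig handles general DAGs and does more work per operator. Operators before the first blocking operator are pipelined into the map phase. Each blocking operator (group, cogroup, order, distinct) forces a shuffle, and a second blocking operator starts a new job.

```python
BLOCKING = {"GROUP", "COGROUP", "ORDER", "DISTINCT"}  # operators that force a shuffle

def compile_to_jobs(plan):
    """Split a linear logical plan into MapReduce jobs at shuffle boundaries (simplified)."""
    jobs = []
    current = {"map": [], "reduce": []}
    for op in plan:
        if op in BLOCKING:
            if current["reduce"]:  # this job already has a shuffle: start a new job
                jobs.append(current)
                current = {"map": [], "reduce": []}
            current["reduce"].append(op)
        elif current["reduce"]:
            current["reduce"].append(op)  # runs after the shuffle, inside the reducer
        else:
            current["map"].append(op)  # pipelined into the map phase
    jobs.append(current)
    return jobs

for job in compile_to_jobs(["LOAD", "FILTER", "GROUP", "FOREACH", "ORDER", "STORE"]):
    print(job)
```

In the second job, the empty map list stands for an identity map that reads the first job's output.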


Michael Ryan, Intel

An infrastructure through which service providers are able to build applications that harness cluster computing resources to efficiently access repositories of Big Data.

Note: the slides for this talk, in PDF form, are rather poor; the live presentation had quite a bit of animation not in the PDF. Also, the speaker had added quite a few slides not present in the distributed PDFs. Much of this talk was buzzword-laden, and the speaker assumed that the audience was familiar with a wide variety of VM technologies and names. I did not feel that the speaker ever gave a clear description of what Tashi was for. The advertising blurb above is what he provided. The introductory speaker explained it better in one sentence: Tashi is for virtual machine allocation.

Tashi supports Xen, KVM, and Qemu virtualization packages. Xen does better with network bandwidth; KVM is faster for page-table manipulation.

Look into Tashi's concept of an instance, and the difference between persistent and non-persistent instances. One can have many non-persistent instances, but only one persistent instance, of a given image.
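The instance rule above can be modelled as a small bookkeeping class. It is hypothetical and not Tashi's API. One plausible motivation is that a persistent instance writes its changes back to the image, so two of them would overwrite each other, whereas non-persistent instances discard their changes. That motivation is my assumption, not something stated in the talk.

```python
class ImageInstances:
    """Hypothetical model of the rule (not Tashi's API): an image may back any number
    of non-persistent instances but at most one persistent instance."""

    def __init__(self):
        self._instances = {}            # instance name -> (image, persistent)
        self._persistent_by_image = {}  # image -> name of its persistent instance

    def create(self, name, image, persistent=False):
        if name in self._instances:
            raise ValueError(f"instance {name!r} already exists")
        if persistent and image in self._persistent_by_image:
            raise ValueError(f"image {image!r} already has a persistent instance")
        self._instances[name] = (image, persistent)
        if persistent:
            self._persistent_by_image[image] = name

    def destroy(self, name):
        # Releasing a persistent instance frees the image for a new persistent one.
        image, persistent = self._instances.pop(name)
        if persistent:
            del self._persistent_by_image[image]
```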

The speaker spent some time talking about performance measurements, showing DFS (distributed file system?) performance. But these slides were not in the PDF, and the graphs on the screen were unintelligible.

There are many slides giving installation and configuration instructions. This seems to be an extremely complicated thing to do: there are many configuration settings to specify. There are many quirks, like using the Python debug console to interact with the pickled data that holds resource allocations (or one can use MySQL). As usual for this talk, the complexities are not well explained, and this part of the talk was even less coherent than the rest.

After the talk, and after the break, the speaker provided the URL for updated slides; this is:


Richard Gass, Intel

Note: as with the previous speaker, the slides provided on the memory stick are out-of-date and not at all what the speaker used.

Zoni is a "Physical Resource Set (PRS) service".

Tashi lets you set up a virtual cluster; Zoni lets you manage the real hardware that is used to run Tashi (or other systems, like Eucalyptus). It allows the provisioning of isolated "mini-datacenters". The goal is to reduce the complexity of allocating physical resources, and thus make the use of the resources more efficient and more fluid.

On the systems where Intel is trying this, there has been a problem with students "squatting" on resources: it takes weeks to allocate resources, so students don't want to give them up when they get them. Zoni is intended to make this better, and to stop the "squatting".

Zoni can use IPMI to control the physical platform.

"Everybody knows what PXE is, right?". (see

Zoni and Tashi are both clearly immature. Both "will be" open source; they're in "Apache incubator" status now. Zoni code is written in "the most recent version" of Python, except that it is Python 2.5, not 3.1 (or even 3.0, or 2.6). Python 2.5 was released on Sept. 19, 2006; Python 2.6 on Oct. 1, 2008; and Python 3.0 on Dec. 3, 2008.