Introduction

When the data cannot be handled efficiently on a single computer, one has to rely on a distributed system, which consists of several computers interconnected by a network. The main difference between a distributed system and a multi-core/CPU system is that in the former, the memory is not shared directly between threads in a single process. Instead, several processes are running (on different computers) and communicate with messages (even if this communication can be implicit when using a high level API). Notice that the term distributed computing is sometimes reserved for tightly coupled machines (with a very fast network for instance) and opposed to cluster computing, where the machines can be loosely coupled. I tend to avoid this distinction.

The main advantage of distributed systems over shared memory systems is the reduced risk of inconsistent behavior, induced by the explicit aspect of sharing (which is done via message passing). However, this is probably the only advantage, as distributed systems are very difficult to master and have a significant overhead compared to SMP systems. As such, numerous APIs have been proposed in order to ease programming (to some extent).

At the low level, programming for a distributed system means having some way of knowing the structure of the system (e.g., how many computing nodes are available) and some means of communication. At a higher level, one can hope to describe a complex task as a set of simple tasks with dependencies and then let a framework turn this description into a fast distributed program (this could also apply to SMP systems).
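
To make the higher level view more concrete, here is a minimal Python sketch (not taken from any of the frameworks discussed in this document) of a task described as simple tasks with dependencies. The task names are hypothetical. The standard graphlib module is only used to group the tasks into waves of mutually independent tasks, which is the information a framework would need in order to run them in parallel.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task graph: each task is mapped to the set of tasks it depends on.
dependencies = {
    "load_a": set(),
    "load_b": set(),
    "clean_a": {"load_a"},
    "clean_b": {"load_b"},
    "merge": {"clean_a", "clean_b"},
    "report": {"merge"},
}


def schedule_in_waves(deps):
    """Group tasks into waves; the tasks of a wave do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to get a stable order
        waves.append(ready)
        sorter.done(*ready)  # pretend the whole wave has been executed
    return waves


waves = schedule_in_waves(dependencies)
for number, wave in enumerate(waves):
    print(number, wave)
```

A real framework would of course also have to decide where each task runs and how its inputs are moved, which this sketch ignores entirely.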

Some API and programming models

  • API:
    Parallel Virtual Machine (PVM)
    PVM is the ancestor of the parallel programming APIs designed for distributed systems. It was created in the late 1980s and is based on message passing. It is still used but has been somewhat superseded by MPI.
    Message Passing Interface (MPI)
    MPI is a standard created in the 1990s. It focuses on message passing and can be seen as a communication protocol.
  • programming paradigm:
    Map Reduce
    Map Reduce is a quite simple paradigm which was popularized by Google in 2004 (DeanGhemawat2004MapReduce). It was initially the main computational model for the Hadoop framework. It suffers from major performance problems in many situations.
    Spark
    Spark is one of the recent paradigms that address Map Reduce limitations.

Message Passing Interface

MPI (Message Passing Interface) is the current standard for high performance computing. It has commercial and open source implementations that are generally optimized for specific hardware. Interestingly, MPI implementations are sufficiently optimized to also be used on shared memory computers. In fact, the first version of MPI was limited to message passing, but starting from the second version (in the late 1990s), distributed memory operations (one-sided remote memory access) are available. On a shared memory computer, they can easily be implemented as shared memory operations.

The API of MPI is very rich and quite low level. It can be seen as the distributed equivalent of the low level thread APIs for SMP systems. An MPI program consists of several processes that are mapped to possibly different computers. The API allows processes to communicate in various ways:

  • point to point direct communication
  • broadcasting (one to all)
  • reducing (all to one)
  • etc.

Communication is used to synchronize processes via rendez-vous (as part of point to point communication) and barrier (global rendez-vous). Communications can be organized in a quite convenient way with some form of virtual topology.
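
The communication patterns listed above can be illustrated without an MPI installation. The following Python sketch is only a toy simulation on a single machine, under the assumption that each thread plays the role of one process and exchanges data exclusively through its mailbox. The class ToyComm and its methods are hypothetical names and are not part of any MPI binding.

```python
import queue
import threading


class ToyComm:
    """Toy communicator: one mailbox per rank, no other data exchange."""

    def __init__(self, size):
        self.size = size
        self._mailboxes = [queue.Queue() for _ in range(size)]

    def send(self, dest, message):
        """Point to point: put a message in the mailbox of rank dest."""
        self._mailboxes[dest].put(message)

    def recv(self, rank):
        """Blocks until a message is available for this rank."""
        return self._mailboxes[rank].get()

    def bcast(self, rank, value, root=0):
        """One to all: the root sends its value to every other rank."""
        if rank == root:
            for dest in range(self.size):
                if dest != root:
                    self.send(dest, value)
            return value
        return self.recv(rank)

    def reduce_sum(self, rank, value, root=0):
        """All to one: the root adds up the values sent by the other ranks."""
        if rank == root:
            total = value
            for _ in range(self.size - 1):
                total += self.recv(rank)
            return total
        self.send(root, value)
        return None


def worker(comm, rank, answers):
    n = comm.bcast(rank, 10 if rank == 0 else None)  # only rank 0 knows n
    partial = sum(i * i for i in range(rank, n, comm.size))
    total = comm.reduce_sum(rank, partial)
    if rank == 0:
        answers.append(total)  # only used to get the result out of the simulation


comm = ToyComm(size=4)
answers = []
threads = [threading.Thread(target=worker, args=(comm, rank, answers))
           for rank in range(comm.size)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(answers)
```

The blocking recv plays the role of the synchronization discussed above: a rank cannot proceed before the message it waits for has been sent.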

The API corresponds to a library with which some runtime tools are associated. As in the case of thread-based programs (e.g. OpenMP), one starts a unique program that is replicated on the different machines that form the cluster. The runtime configuration can be quite complex as the user must have access to all the machines used in the distributed system. Some form of distributed identification has therefore to be in place. In addition, having a shared file system is recommended. Finally, firewalls can introduce a lot of problems and it is recommended to avoid using filtering inside the cluster.

MPI and R

There are essentially two packages that provide some direct access to MPI under R: Rmpi and pbdMPI. Both are intended to allow one to program for an MPI cluster with R and thus do not build complex abstractions over it. Programming in R for MPI is arguably simpler than in C because in the latter one has to define, e.g., types for messages. In addition, part of the communication is automatically handled by the R wrappers.

A very basic example with Rmpi is given below:

library(Rmpi)
## start 8 R processes that will execute in parallel
mpi.spawn.Rslaves(nslaves=8)
## ask each slave to execute some code
## mpi.comm.rank() returns the rank (number) of the slave
mpi.remote.exec(mpi.comm.rank()^2)
## shut down the slaves, then quit R
mpi.close.Rslaves()
mpi.quit()

As seen in this simple example, the Rmpi API provides a simple way to execute code on the remote processes and therefore a foreach-like approach is always possible.

In fact, there is a doMPI backend for foreach that makes it possible to leverage an MPI cluster directly with foreach without modifying the R code. However, because of the operational complexity of MPI, additional code is needed: registerDoMPI cannot be called on its own, as it expects a cluster object. One needs for instance to perform the following initialization and cleanup code:

library(doMPI)
cluster <- startMPIcluster(count=8)
registerDoMPI(cluster)
## the main code goes here
closeCluster(cluster)
mpi.finalize()

Map reduce

Map reduce is a simple parallel processing paradigm proposed by Google in 2004 (DeanGhemawat2004MapReduce). The novelty does not lie in the idea of separating computation between map operations and reduce operations (this is fairly standard and was available in 1995 in MPI) but rather in the underlying implementation.

High level principles

The base idea of Map Reduce is to decompose a computational task into map operations and reduce operations:

map
the map operation is applied to some local data by each computational node
reduce
the reduce operation is applied to groups of data produced by the map operations

More specifically, map reduce operates on indexed data. Each data point consists of a pair (key, value). The map operation takes one pair and outputs a list of pairs. The reduce operation takes one key and the set of values associated with that key, resulting from the map operations. It outputs a list of values (with no key).

The classical example is the word counting one:

  • the map operation takes a document as input (with the key of the document, for instance its title) and outputs a list of pairs (word, 1);
  • the reduce operation takes as input a word (keys from the map step) and all associated values and outputs the sum of the obtained values.

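This can obviously be improved by having the map operation count the words of each document itself, so that it emits one pair per distinct word rather than one pair per occurrence. This can be somewhat automated by the Combiner functionality in Google's map reduce.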
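
The word counting example can be sketched in plain Python as follows. This is only a sequential stand-in for a map reduce framework, written under the assumption that the grouping by key fits in memory. The function names and the two small documents are made up for the illustration. The second map function counts words inside each document before emitting, which is the improvement discussed above.

```python
from collections import Counter, defaultdict


def map_word_count(title, text):
    """Map: one (title, text) pair gives a list of (word, 1) pairs."""
    return [(word, 1) for word in text.lower().split()]


def map_with_local_count(title, text):
    """Map variant: count the words of the document before emitting."""
    return list(Counter(text.lower().split()).items())


def reduce_sum(word, values):
    """Reduce: one key and its values give a list of values (a single sum)."""
    return [sum(values)]


def run_map_reduce(records, mapper, reducer):
    """Sequential stand-in for the framework: map, group by key, reduce."""
    groups = defaultdict(list)
    emitted = 0  # number of pairs that would have to be transferred
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            groups[out_key].append(out_value)
            emitted += 1
    results = {key: reducer(key, values) for key, values in groups.items()}
    return results, emitted


# Hypothetical input: (title, content) pairs.
documents = [
    ("doc1", "the cat and the dog"),
    ("doc2", "the dog barks"),
]

naive, naive_pairs = run_map_reduce(documents, map_word_count, reduce_sum)
local, local_pairs = run_map_reduce(documents, map_with_local_count, reduce_sum)
print(naive["the"], naive_pairs, local_pairs)
```

Both variants give the same counts, but the second one emits fewer intermediate pairs, which matters when those pairs have to travel over the network.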

Some examples

Co-buying
assume we have shopping carts (orders) given as sets of items. The goal is to count how many times each pair of items is bought together (for correlation analysis). Two solutions:
  1. simple pairwise:
    • map each set to a series of 1 keyed by item pairs
    • reduce on pairs
  2. hashmap based:
    • map each set to a series of hashmaps. Each hashmap maps items to 1 and is keyed by another item
    • reduce emits pairs
Conditional statistics
assume we have records with two variables: one integer (or nominal) variable \(X\) and one numerical variable \(Y\). The goal is to compute some statistics on \(Y\) conditionally on \(X\). It could be for instance the average revenue based on age. A possible solution:
  • map each record to the value of \(Y\) keyed by \(X\)
  • reduce on \(X\) and compute the statistics
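
Both examples can be sketched in plain Python. As before, this is a sequential illustration under the assumption that everything fits in memory, not a distributed implementation. The carts, the records and all function names are hypothetical. In the hashmap based solution, each hashmap only contains the items that come after its key in sorted order, a choice made here so that every unordered pair is counted once.

```python
from collections import Counter, defaultdict
from itertools import combinations
from statistics import mean


def group_by_key(pairs):
    """Stand-in for the shuffle: gather the values emitted for each key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


# Co-buying, solution 1: a series of 1 keyed by item pairs.
def map_pairs(cart):
    return [(pair, 1) for pair in combinations(sorted(cart), 2)]


def cobuying_pairwise(carts):
    emitted = [kv for cart in carts for kv in map_pairs(cart)]
    return {pair: sum(ones) for pair, ones in group_by_key(emitted).items()}


# Co-buying, solution 2: one hashmap per item, keyed by that item.
def map_hashmaps(cart):
    items = sorted(cart)
    return [(item, {other: 1 for other in items[i + 1:]})
            for i, item in enumerate(items[:-1])]


def reduce_hashmaps(item, hashmaps):
    merged = Counter()
    for hashmap in hashmaps:
        merged.update(hashmap)  # adds the counts key by key
    return {(item, other): count for other, count in merged.items()}


def cobuying_hashmap(carts):
    emitted = [kv for cart in carts for kv in map_hashmaps(cart)]
    counts = {}
    for item, hashmaps in group_by_key(emitted).items():
        counts.update(reduce_hashmaps(item, hashmaps))
    return counts


# Conditional statistics: map each record to Y keyed by X, reduce with the mean.
def conditional_mean(records):
    emitted = [(x, y) for x, y in records]
    return {x: mean(ys) for x, ys in group_by_key(emitted).items()}


carts = [{"bread", "butter", "jam"}, {"bread", "butter"}, {"butter", "jam"}]
records = [(30, 2000.0), (30, 3000.0), (45, 4000.0)]  # (age, revenue)

pair_counts = cobuying_pairwise(carts)
hashmap_counts = cobuying_hashmap(carts)
revenue_by_age = conditional_mean(records)
print(pair_counts)
print(revenue_by_age)
```

The hashmap based solution emits fewer, larger values than the pairwise one, which is the usual trade-off between the two approaches.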

Design principle

While Map Reduce is frequently presented as a general purpose paradigm for distributed systems (something that it is not), its main interest resides in the features of its implementations, which are quite different from what is expected for a typical MPI cluster, for instance. In addition, its design is quite specific and in a way takes a very different path from the one taken by MPI. One could say that MPI targets computationally intensive tasks while map reduce is more adapted to data intensive tasks. In particular, map reduce is adapted to situations where each datum is processed once, while MPI is well suited for iterative algorithms, for instance. Map reduce is also designed to take advantage of local storage, while MPI is generally built on the idea of a distributed storage.

In practice, map reduce is designed to be run on a very large number of machines that provide a distributed storage. Each machine should be aware of what it stores locally. When a map reduce job is run on the cluster, maps are performed locally: each machine is given a copy of the map task and runs it on the data it stores. Then a shuffle operation allows partial results to be shared and transferred from machine to machine in an optimized way. Reduce operations are handled by idle machines using the shuffling facility.
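
The execution scheme described above (local maps, shuffle, reduce) can be mimicked in a few lines of Python. This is a single-process simulation, under the assumption that a dictionary stands for the storage of each machine. The machine names, the data and the partitioning rule based on crc32 are choices made for this sketch and do not describe any actual implementation.

```python
import zlib
from collections import defaultdict

# Hypothetical cluster: each machine only knows the lines it stores.
local_storage = {
    "machine0": ["a b a", "c"],
    "machine1": ["b b"],
    "machine2": ["a c c"],
}


def partition(key, n_reducers):
    """Assign a key to a reducer (crc32 is stable across runs, unlike hash())."""
    return zlib.crc32(key.encode("utf-8")) % n_reducers


def local_map(lines):
    """Map task executed by a machine on its own data, with local counting."""
    counts = defaultdict(int)
    for line in lines:
        for word in line.split():
            counts[word] += 1
    return counts


def shuffle(map_outputs, n_reducers):
    """Route every (key, value) pair to the reducer in charge of the key."""
    inboxes = [defaultdict(list) for _ in range(n_reducers)]
    for output in map_outputs:
        for key, value in output.items():
            inboxes[partition(key, n_reducers)][key].append(value)
    return inboxes


def reduce_inbox(inbox):
    """Reduce task: sum the partial counts received for each key."""
    return {key: sum(values) for key, values in inbox.items()}


map_outputs = [local_map(lines) for lines in local_storage.values()]
inboxes = shuffle(map_outputs, n_reducers=2)
partial_results = [reduce_inbox(inbox) for inbox in inboxes]
word_counts = {key: value for part in partial_results for key, value in part.items()}
print(word_counts)
```

Only the outputs of the map tasks go through the shuffle. The original lines never leave the machine that stores them, which is the locality advantage discussed in this section.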

It should be noted that in theory, map reduce could be run on computers that do not implement a distributed storage. However, the main advantage of map reduce is the automated leveraging of locality, which cannot be done if the computational backend is not aware of the fine details of the storage backend.

As work distribution and communications are handled by the map reduce framework, a typical implementation can be fault tolerant. The original Google use case targeted a loosely integrated cluster of standard computers with a standard network. In such a cluster, machine failure is common, a fact that motivated this fault tolerance feature.

Hadoop MapReduce

Hadoop is an open source framework for distributed storage and distributed processing. It provides in particular:

Hadoop Distributed File System (HDFS)
distributed storage component
Hadoop MapReduce
distributed processing

As explained above, Map reduce is only efficient if it has some knowledge of data location. This is why the execution engine of Hadoop is tightly coupled with its storage engine.

Hadoop is implemented in Java and therefore provides a native Java API for MapReduce. This is basically done by implementing a Mapper interface and a Reducer interface. The main way to use other languages instead of Java is via the Hadoop Streaming approach, which provides a basic text-based interface.
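
To give an idea of what a text-based interface means, here is a hedged Python sketch of a word count written in the streaming style. It assumes the usual Hadoop Streaming conventions: the mapper and the reducer exchange lines of text, the key and the value are separated by a tabulation, and the reducer receives its lines sorted by key. The function names and the sample lines are hypothetical, and the pipeline is simulated locally rather than run on a cluster.

```python
from itertools import groupby


def mapper(lines):
    """Turn each input line into 'word<TAB>1' lines."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reducer(sorted_lines):
    """Sum the counts of consecutive lines that share the same key."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


# Local simulation of the pipeline: map, sort by key, reduce.
sample = ["the cat\n", "the dog\n"]
simulated = list(reducer(sorted(mapper(sample))))
print(simulated)
```

In an actual streaming job, each function would live in its own script that reads sys.stdin and prints its output lines, the sorting step being performed by Hadoop between the two scripts.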

R and Map Reduce

There are several ways to use a Hadoop cluster from R:

RHIPE
this package belongs to the Tessera framework (which contains 2 other packages, datadr and Trelliscope). The interface of RHIPE is very similar to the original Java interface from Hadoop. It provides utility functions to get inputs from the Hadoop cluster and to send results to it. The central piece of code is provided by a map expression (written in R) and a reduce expression (which can be complemented by some pre and post processing expressions).
RHadoop
this is a collection of five R packages which provide an interface to Hadoop. The map reduce package is rmr (rmr2 in the example below). The interface provided by RHadoop is arguably of a slightly higher level than the one from RHIPE. In particular, rmr uses standard R functions rather than expressions. The expected interface for map functions is of the form function(k,v) (for key k and value v), while the one for reduce functions is of the form function(k,vv), again for key k but for a list of values vv. The function keyval is used to output results from the map and reduce functions. A simple example:
library(rmr2)
results <- mapreduce(input = "/user/fabrice/census-income.data",
		     input.format =
			 make.input.format(format="csv",
					   sep=",",
					   dec="."),
		     map = function(k,v) keyval(v[,1],v[,6]),
		     reduce = function(k,vv) keyval(k,mean(vv)))
HadoopStreaming
this package offers a much more bare-bones interface to Hadoop map reduce. It eases the development of R scripts that can then be used in a Hadoop cluster through the streaming facility (the one that allows using languages other than Java to write map reduce jobs). No abstraction of map reduce is actually included in this package.

Notice that none of those packages helps with administering the Hadoop cluster.

Spark

Spark is an advanced processing paradigm that overcomes many of the limitations of map reduce. One of the main ideas of Spark is the introduction of the notion of a working set of data on which operations are conducted. This eases the implementation of both iterative methods and interactive analyses, while greatly improving their performance over a map reduce implementation.

Spark is implemented in Scala, a very high level functional and object oriented programming language. Spark also has Java and Python interfaces that are quite close to the Scala one. It also offers an official R interface.

Resilient Distributed Datasets (RDD)

The concept of working set is implemented by the Resilient Distributed Dataset (RDD). An RDD is a read only collection of objects that are distributed on the underlying cluster. In general, a Spark program starts by getting an RDD from a distributed storage and then produces a series of RDDs via transformation operators. A very interesting aspect of RDDs is that they can be kept in the main memory of the computers of the cluster (this is called caching), which generally improves performance (a lot).

The term resilient refers to the fact that RDDs can be recomputed from their starting point in case of a machine failure.
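
The idea of recomputing lost data from its starting point can be illustrated by a toy Python class. This is a sketch and not the way Spark is implemented: the class ToyDataset, its methods and the simulated failure are all hypothetical, and every computed partition is kept in memory here, whereas caching has to be requested in Spark.

```python
class ToyDataset:
    """Read only partitioned collection that remembers how to rebuild itself."""

    def __init__(self, n_partitions, compute_partition):
        self.n_partitions = n_partitions
        self._compute_partition = compute_partition  # recipe: index -> list
        self._cache = {}        # partitions currently held in memory
        self.computations = 0   # number of partitions computed so far

    def map(self, f):
        """New dataset whose partitions are derived from the ones of self."""
        return ToyDataset(self.n_partitions,
                          lambda i: [f(x) for x in self.partition(i)])

    def partition(self, i):
        """Return partition i, rebuilding it from its recipe if needed."""
        if i not in self._cache:
            self._cache[i] = self._compute_partition(i)
            self.computations += 1
        return self._cache[i]

    def collect(self):
        return [x for i in range(self.n_partitions) for x in self.partition(i)]

    def lose_partition(self, i):
        """Simulate the failure of the machine that holds partition i."""
        self._cache.pop(i, None)


stored = [[1, 2, 3], [4, 5]]  # stands for data kept in a distributed storage
source = ToyDataset(2, lambda i: list(stored[i]))
squares = source.map(lambda x: x * x)

first = squares.collect()
squares.lose_partition(1)   # one machine fails
second = squares.collect()  # only the lost partition is recomputed
print(first, second, squares.computations)
```

Only the lost partition is rebuilt after the simulated failure, and the result is unchanged, which is possible because the datasets are read only.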

Operations on RDD

A Spark program consists in performing operations on RDDs. Typical operations include:

mapping
apply a transformation to each object of the RDD to build another object (in another RDD)
flatMap
similar to map in map reduce (map one object to possibly many objects)
join
the join operation from data bases

None of those operations is actually performed on the fly during the program execution. On the contrary, they are only recorded. The actual computation takes place when an action is performed on a RDD. This enables optimizing the series of operations. Actions include:

count
the size of a RDD
collect
brings the content of an RDD back to the main program, which allows one to iterate on it
reduce
accumulation function
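
The distinction between recorded operations and actions can be mimicked in Python. The class below is a toy and has nothing to do with the actual Spark implementation. Its name and its methods are hypothetical and only loosely follow the operations listed above. The list calls is there to show when the user function is really executed.

```python
from functools import reduce as fold


class LazyCollection:
    """Toy collection: transformations are recorded, actions execute them."""

    def __init__(self, source, operations=()):
        self._source = source
        self._operations = tuple(operations)  # recorded, not executed

    # Transformations: return a new object, compute nothing.
    def map(self, f):
        return LazyCollection(self._source, self._operations + (("map", f),))

    def flat_map(self, f):
        return LazyCollection(self._source, self._operations + (("flat_map", f),))

    def _apply(self, item, step):
        """Push one item through the recorded operations, starting at step."""
        if step == len(self._operations):
            yield item
            return
        kind, f = self._operations[step]
        outputs = [f(item)] if kind == "map" else f(item)
        for output in outputs:
            yield from self._apply(output, step + 1)

    def _run(self):
        for item in self._source:
            yield from self._apply(item, 0)

    # Actions: trigger the computation.
    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())

    def reduce(self, f):
        return fold(f, self._run())


calls = []  # records each execution of the user function


def split_words(line):
    calls.append(line)
    return line.split()


lines = LazyCollection(["ab c", "def"])
word_lengths = lines.flat_map(split_words).map(len)
calls_before_action = len(calls)  # nothing has been computed yet

n_words = word_lengths.count()
lengths = word_lengths.collect()
total = word_lengths.reduce(lambda a, b: a + b)
print(calls_before_action, n_words, lengths, total, len(calls))
```

Each action reruns the whole chain in this sketch, since nothing is kept in memory between actions. This is the kind of situation in which the caching of RDDs mentioned earlier becomes useful.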

A word count example:

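val lines = sc.textFile("data.txt")
// split each line into words (one line gives possibly many words)
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(w => (w, 1))
val counts = pairs.reduceByKey((a, b) => a + b)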

R and Spark

Spark has an official R interface, provided by the SparkR package (and associated tools). SparkR has its own version of the data frame concept, DataFrame, which represents an RDD in R. In the SparkR context, R scripts consist mainly of transformation operations on DataFrame objects, using operations that are comparable to the ones available in Spark.

library(SparkR)
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, mydataframe)
# accessing a column
select(df, "colonne")
# filtering the data frame
filter(df, df$bidule < 20)
# computing on columns
df$truc <- df$bidule * df$machin
# aggregating
avg(groupBy(df,"departement"))

The word count example can be written with SparkR as follows:

sc <- sparkR.init()
textFile <- textFile(sc,"data.txt")
words <- flatMap(textFile,
		 function(line) {
		     strsplit(line, " ")[[1]]
		 })
wordCount <- lapply(words,
		    function(word){
			list(word, 1L)
		    })
counts <- reduceByKey(wordCount, "+", 2L)
output <- collect(counts)

Bibliography