This part of the course is dedicated to the use of distributed systems (no shared memory) in R.

MPI and R

There are essentially two packages that provide some direct access to MPI under R, Rmpi and pdbMPI. Both are intended to allow one to program for a MPI cluster with R and thus do not build complex abstraction over it. Programming in R for MPI is arguably simpler than in C because in the latter one has to define e.g. types for messages. In addition part of the communications is automatically handled by the R wrappers.

A very basic example with Rpmi is given below:

library(Rmpi)
## starts 8 R processes that will execute in parallel
mpi.spawn.Rslaves(nslaves=8)
## ask each slave to execute some code
## mpi.comm.rank() returns the number of the slave
mpi.remote.exec(mpi.comm.rank()^2)
## quit R after shutting down the slaves
mpi.quit()

As seen in this simple example, the Rmpi API provides a simple way to execute code on the remote processes and therefore a foreach like approach is always possible.

In fact, there is a doMPI backend for foreach that allows to leverage a MPI cluster directly with foreach without modifying the R code. However, because of the operational complexity of MPI, additional code will be needed. For instance, there is not direct registerDoMPI function. One needs for instance to perform the following initialization and cleanup code:

library(doMPI)
cluster <- startMPIcluster(count=8)
registerDoMPI(cluster)
## the main code goes here
closeCluster(cluster)
mpi.finalize()

Map Reduce and R

There are several ways to use a Hadoop cluster from R:

RHIPE
this package belongs to the Tessera framework (which contains 2 other packages, datadr and Trelliscope). The interface of RHIPE is very similar to the original Java interface from Hadoop. It provides utility functions to get inputs from the Hadoop cluster and to send results to it. The central piece of code is provided by a map expression (written in R) and a reduce expression (which can be complemented by some pre and post processing expressions).
RHadoop
this is a collection of five R packages which provide an interface to Hadoop. The map reduce package uis rmr. The interface provided by RHadoop is arguably of a slightly higher level than the one from RHIPE. In particular, rmr uses standard R functions rather than expressions. The expected interface for map functions is of the form function(k,v) (for key k and value v), while the one for reduce function is of the form function(k,vv) again for key k but for a list of values vv. The function keyval is used to output results from the map and reduce functions. A simple example:
library(rmr2)
results <- mapreduce(input = "/user/fabrice/census-income.data",
		     input.format =
			 make.input.format(format="csv",
					   sep=",",
					   dec="."),
		     map = function(k,v) keyval(v[,1],v[,6]),
		     reduce = function(k,vv) keyval(k,mean(vv)))
HadoopStreaming
this package offers a much more bare bone interface to Hadoop map reduce. It eases the development of R scripts that can then be used in a Hadoop cluster through the streaming facility (the one that allows using something else than Java to write map reduce jobs). No abstraction of map reduce is actually included in this package.

Notice than none of those packages helps administering the Hadoop cluster.

Spark and R

Spark has an official R interface, provided by the SparkR package (and associated tools). SparkR has its own version of the data frame concept, DataFrame, which represents in R a RDD. In the SparkR context, R scripts consist mainly in transformation operations on DataFrame, using operations that are comparable to the one available in Spark.

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, mydataframe)
# accessing to a column
select(df, "colonne")
# filtering the data frame
filter(df, df$bidule < 20)
# computing on columns
df$truc <- df$bidule * df$machin
# aggregating
avg(groupBy(df,"departement"))
sc <- sparkR.init()
textFile <- textFile(sc,"data.txt")
words <- flatMap(textFile,
		 function(line) {
		     strsplit(line, " ")[[1]]
		 })
wordCount <- lapply(words,
		    function(word){
			list(word, 1L)
		    })
counts <- reduceByKey(wordCount, "+", 2L)
output <- collect(counts)