This part of the course is dedicated to the use of distributed systems (no shared memory) in R.
There are essentially two packages that provide some direct access to MPI under R, Rmpi and pbdMPI. Both are intended to allow one to program an MPI cluster with R and thus do not build complex abstractions over it. Programming in R for MPI is arguably simpler than in C because, in the latter, one has to define e.g. types for messages. In addition, part of the communication is handled automatically by the R wrappers.
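For instance, arbitrary R objects can be exchanged without declaring message types. A minimal sketch of point-to-point communication, assuming a spawned-slaves session like the one below (mpi.send.Robj, mpi.recv.Robj and mpi.any.source are Rmpi functions):
## on a slave: send an arbitrary R object to the master (rank 0);
## the object is serialized automatically, no message type is declared
mpi.send.Robj(list(rank = mpi.comm.rank(), data = runif(3)), dest = 0, tag = 1)
## on the master: receive an object coming from any slave with tag 1
msg <- mpi.recv.Robj(source = mpi.any.source(), tag = 1)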
A very basic example with Rmpi is given below:
library(Rmpi)
## starts 8 R processes that will execute in parallel
mpi.spawn.Rslaves(nslaves=8)
## ask each slave to execute some code;
## mpi.comm.rank() returns the rank of the calling slave
mpi.remote.exec(mpi.comm.rank()^2)
## shut down the slaves, then terminate MPI and quit R
mpi.close.Rslaves()
mpi.quit()
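For comparison, pbdMPI follows an SPMD style: instead of spawning slaves from a master session, the same script is launched on every rank, typically with mpirun -np 8 Rscript script.R. A minimal sketch of the same computation with pbdMPI:
library(pbdMPI)
## initialize the MPI environment; every rank runs this same script
init()
## compute on each rank; comm.rank() returns the rank of the process
r <- comm.rank()
## print the result from all ranks, not only from rank 0
comm.print(r^2, all.rank = TRUE)
## terminate the MPI environment cleanly
finalize()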
As seen in the Rmpi example, the Rmpi API provides a simple way to execute code on the remote processes, and therefore a foreach-like approach is always possible.
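Indeed, Rmpi provides apply-like helpers that distribute a loop over the slaves. A minimal sketch, assuming slaves have been spawned as in the example above (mpi.parSapply is an Rmpi function; the loop body is arbitrary):
## distribute the evaluation over the slaves, like a parallel sapply
squares <- mpi.parSapply(1:100, function(i) i^2)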
In fact, there is a doMPI backend for foreach that allows one to leverage an MPI cluster directly with foreach, without modifying the R code of the loop itself. However, because of the operational complexity of MPI, some additional code is needed: registerDoMPI must be given an explicitly created cluster object, and the MPI layer must be shut down cleanly at the end. One needs to perform the following initialization and cleanup code:
library(doMPI)
cluster <- startMPIcluster(count=8)
registerDoMPI(cluster)
## the main code goes here
closeCluster(cluster)
mpi.finalize()
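Once the backend is registered, the main code is an ordinary foreach loop. A minimal sketch of what could replace the placeholder above (the bounds and the loop body are arbitrary):
## a standard foreach loop, dispatched to the MPI workers by %dopar%
results <- foreach(i = 1:100, .combine = c) %dopar% {
  sqrt(i)
}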
There are several ways to use a Hadoop cluster from R, notably RHIPE and rmr (part of the RHadoop project). The interface provided by RHadoop is arguably of a slightly higher level than the one of RHIPE. In particular, rmr uses standard R functions rather than expressions. The expected interface for map functions is of the form function(k, v) (for a key k and a value v), while the one for reduce functions is of the form function(k, vv), again for a key k but with a list of values vv. The function keyval is used to output results from the map and reduce functions. A simple example:
library(rmr2)
results <- mapreduce(input = "/user/fabrice/census-income.data",
                     input.format = make.input.format(format = "csv",
                                                      sep = ",",
                                                      dec = "."),
                     map = function(k, v) keyval(v[, 1], v[, 6]),
                     reduce = function(k, vv) keyval(k, mean(vv)))
Note that none of those packages helps with administering the Hadoop cluster.
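The value returned by mapreduce is a reference to data stored on the distributed file system, not the data itself. A minimal sketch of how one could inspect it from the R session, assuming the result is small enough to fit in memory (from.dfs, keys and values are rmr2 helpers):
## fetch the result of the job into the local R session
out <- from.dfs(results)
## out is a key/value structure: the keys and the per-key means
keys(out)
values(out)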
Spark has an official R interface, provided by the SparkR package (and associated tools). SparkR has its own version of the data frame concept, DataFrame, which represents an RDD in R. In the SparkR context, R scripts consist mainly of transformation operations on DataFrames, using operations that are comparable to the ones available in Spark.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, mydataframe)
# accessing a column
select(df, "colonne")
# filtering the data frame
filter(df, df$bidule < 20)
# computing on columns
df$truc <- df$bidule * df$machin
# aggregating
avg(groupBy(df, "departement"))
SparkR also exposes a lower-level API that operates directly on RDDs, illustrated below by the classical word count example:
sc <- sparkR.init()
textFile <- textFile(sc, "data.txt")
## split each line into individual words
words <- flatMap(textFile,
                 function(line) {
                   strsplit(line, " ")[[1]]
                 })
## associate each word with a count of 1
wordCount <- lapply(words,
                    function(word) {
                      list(word, 1L)
                    })
## sum the counts per word, using 2 partitions
counts <- reduceByKey(wordCount, "+", 2L)
## bring the final counts back into the R session
output <- collect(counts)
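As in this word count example, DataFrame operations are lazy as well and their results stay on the cluster until explicitly retrieved. A minimal sketch reusing df from the first SparkR example (collect and head are SparkR operations):
## bring a (small) filtered DataFrame back as a regular R data.frame
local <- collect(filter(df, df$bidule < 20))
head(local)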