This part of the course is dedicated to the use of distributed systems (no shared memory) in R.
There are essentially two packages that provide direct access to MPI from R: Rmpi and pbdMPI. Both are intended to let one program for an MPI cluster in R and thus do not build complex abstractions over it. Programming for MPI in R is arguably simpler than in C because, in the latter, one has to define, e.g., types for messages. In addition, part of the communication is handled automatically by the R wrappers.
A very basic example with Rmpi is given below:
library(Rmpi)
## starts 8 R processes that will execute in parallel
mpi.spawn.Rslaves(nslaves=8)
## ask each slave to execute some code
## mpi.comm.rank() returns the number of the slave
mpi.remote.exec(mpi.comm.rank()^2)
## quit R after shutting down the slaves
mpi.quit()
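For comparison, pbdMPI follows an SPMD style: every process runs the same script, which is launched through the MPI launcher rather than spawned from an interactive R session. The following is only a minimal sketch of that style (the launch command depends on the local MPI installation):
library(pbdMPI)
init()
## every process runs this same script; comm.rank() identifies it
r <- comm.rank()
comm.print(r^2, all.rank = TRUE)
finalize()
## launched, e.g., with: mpiexec -np 8 Rscript spmd-example.R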
As seen in this simple example, the Rmpi API provides a simple way to execute code on the remote processes, and therefore a foreach-like approach is always possible.
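For instance, once the slaves have been spawned as above, a data-parallel computation can be expressed with Rmpi's apply-style helpers. The sketch below uses mpi.parSapply to distribute the elements of a vector over the slaves (the computation itself is arbitrary):
library(Rmpi)
mpi.spawn.Rslaves(nslaves=8)
## distribute the elements of 1:100 over the slaves and apply the function
squares <- mpi.parSapply(1:100, function(x) x^2)
mpi.close.Rslaves()
mpi.quit()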
In fact, there is a doMPI backend for foreach that makes it possible to leverage an MPI cluster directly with foreach without modifying the R code. However, because of the operational complexity of MPI, additional code is needed: registering the backend is not a single self-contained call as with other backends. One needs, for instance, to perform the following initialization and cleanup code:
library(doMPI)
## spawn 8 MPI worker processes and register them as the foreach backend
cluster <- startMPIcluster(count=8)
registerDoMPI(cluster)
## the main code goes here
## shut down the workers and finalize MPI before quitting R
closeCluster(cluster)
mpi.finalize()
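With the backend registered, the "main code" is an ordinary foreach loop; each iteration is then executed on one of the MPI workers. A minimal sketch (the body of the loop is arbitrary), to be placed where the comment above says "the main code goes here":
results <- foreach(i = 1:100, .combine = c) %dopar% {
  sqrt(i)
}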
There are several ways to use a Hadoop cluster from R, notably RHIPE and the rmr package from the RHadoop project. The interface provided by RHadoop is arguably of a slightly higher level than the one from RHIPE. In particular, rmr uses standard R functions rather than expressions. The expected interface for map functions is of the form function(k,v) (for a key k and a value v), while the one for reduce functions is of the form function(k,vv), again for a key k but with a list of values vv. The function keyval is used to output results from the map and reduce functions. A simple example:
library(rmr2)
results <- mapreduce(input = "/user/fabrice/census-income.data",
input.format =
make.input.format(format="csv",
sep=",",
dec="."),
map = function(k,v) keyval(v[,1],v[,6]),
reduce = function(k,vv) keyval(k,mean(vv)))
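The mapreduce call returns a reference to its output on HDFS rather than a local R object. Assuming the result is small enough to fit in memory, it can be read back into the session with from.dfs, for instance:
## read the (key, value) pairs produced by the job back into R
local.results <- from.dfs(results)
## keys() and values() extract the two sides of the key/value pairs
keys(local.results)
values(local.results)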
Notice that none of those packages helps with administering the Hadoop cluster.
Spark has an official R interface, provided by the SparkR package (and associated tools). SparkR has its own version of the data frame concept, DataFrame, which represents an RDD in R. In the SparkR context, R scripts consist mainly of transformation operations on DataFrames, using operations comparable to the ones available in Spark.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
## create a distributed DataFrame from a local R data frame
df <- createDataFrame(sqlContext, mydataframe)
# accessing a column
select(df, "colonne")
# filtering the data frame
filter(df, df$bidule < 20)
# computing on columns
df$truc <- df$bidule * df$machin
# aggregating by group
agg(groupBy(df, "departement"), avg(df$truc))
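The classic word count can also be written with SparkR's lower-level, RDD-style operations (textFile, flatMap, reduceByKey):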
sc <- sparkR.init()
## read the text file as an RDD of lines
textFile <- textFile(sc, "data.txt")
## split each line into words
words <- flatMap(textFile,
                 function(line) {
                   strsplit(line, " ")[[1]]
                 })
## emit a (word, 1) pair for each word
wordCount <- lapply(words,
                    function(word) {
                      list(word, 1L)
                    })
## sum the counts per word (using 2 partitions) and fetch the result
counts <- reduceByKey(wordCount, "+", 2L)
output <- collect(counts)
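The collect call brings the counts back as an ordinary R list; the Spark context can then be released with sparkR.stop():
## inspect a few (word, count) pairs and shut Spark down
head(output)
sparkR.stop()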