The only way to deal with large to big data is to use some form of parallel processing. There is simply no computational unit that is able to deliver the number of flops that is needed for modern data handling. Unfortunately, parallel processing is difficult. I discuss first the easiest situation, the case of shared memory parallel programming (see HerlihyShavit2008ArtOfMultiprocessorProgramming for an excellent textbook on efficient concurrent data structures).

Shared memory hardware structure

A computer is made of processing units (PU) and of memory (among other things). Current computers (even phones) have multiple PU (the cores) that all use the same memory. This is the standard shared memory structure. Sharing is done at the hardware level which guarantees good performances, at least in theory. Software level shared memory is also available, but it comes with a higher programming cost and with lower performances when there is no direct hardware support.

Threads, processes and memory

The simplest way to use shared memory is via the thread model. The general unit for a program is a process which regroups several execution contexts (the threads) and a unique memory context. Each process runs under its unique virtual memory space, which is mapped by the Memory Management Unit (MMU) to the physical memory. This is completely transparent for the programmer. Threads have access to the full virtual memory space, leading to a fully shared memory structure with hardware support.

MMU and caches

The MMU is responsible off the virtual to real mapping. It also handles its caches (the Translation lookaside buffer). This cache and the CPU caches are crucial to reach high performances because the cheap big memory has enormous latency (somewhat mitigated by a very high bandwidth). I cannot cover the details in this course (see Drepper2007Memory and my course in French), but even getting single threaded high performance code is difficult because of the caches. Some numbers, the typical latency of a memory request at each level of the cache (intel):

  • register (core): 1 cycle
  • L1 (core): 3 cycles
  • L2 (core): 12 cycles
  • L3 (CPU): 38 cycles
  • local RAM (CPU): 65 ns (130 cycles at least)
  • non local RAM (NUMA, see below): 105 ns (40ns from QPI)

Non-uniform memory access

There is a limit to the number of cores that can be integrated on a single CPU (18 for the current Intel architecture). When a server needs more PU, it has to integrate several CPU. In order to maintain the shared memory principle in this case, two solutions exist.

The simplest one consists in having a unique shared memory for all the processors and interconnection bus to which the processors send requests for memory access. This does not scale well to a large number of CPU as the bus becomes a contention point.

A more advanced solution the Non-uniform memory access (NUMA) principle. The main idea is that each CPU has its own memory. When a process uses more memory than the one attached directly to its preferential CPU, it will use memory from the other CPUs transparently (via the MMU). Of course, access to the distant memory is slightly less efficient than access to the local memory. This explains the idea of a preferred CPU.

Programming difficulties

There are essentially two issues with parallel programming on shared memory architectures:

  1. the correction issue: is a parallel program really producing the same results as its sequential version?
  2. the efficiency issue: does the parallel program make a good use of the hardware by reducing the user time needed to perform a task?

The first issue is very complex and cannot be solved without hardware guarantees, such as locks and related tools (in particular compare and set primitives). The key point is to enable an a priori unbounded number of threads to agree on a common value.

The second issue is somehow even more annoying in the sense that solving the first issue with obvious solutions (such as locks) leads frequently to inefficient solutions: they do work but the user time (the wall clock time) is not significantly reduced compared to a sequential program, while in theory one could expect the wall clock time to be divided by the number of threads (as long as each thread maps to a core).

While those two problems are extremely important, they should be overemphasized in the context of machine learning algorithms, for instance. Many machine learning algorithms have "embarrassingly parallel" structures in the sense that the result is obtained by combining a series of almost independent calculations (this is the case of resampling methods, for instance). In fact, the difficulties outlined above are a consequence of coupling between the calculations, either via some shared data structures (a database) or because of important communication needs.

Let consider for instance the base problem of computing a distance matrix between N points in \(\mathbb{R}^P\). The square distance between two points is calculated as the sum of squared differences and involves \(3P\) operations. This has to be done for \(M=\frac{N(N-1)}{2}\) pairs, so the total sequential cost is \(\frac{3PN(N-1)}{2}\). Obviously, distances can be computed independently. If we number the distances from 1 to \(M=\frac{N(N-1)}{2}\), we can then split the calculation on \(T\) threads which will be responsible of compute \(M/T\) distances. No communication will be needed between the threads and we can expect the wall clock time to be divided by the number of threads.

Standard API

In order to program for shared memory systems, one needs some Application Programming Interface (API) that allows to either to manipulate threads and locks (low level API) or to express that some parts of the program can be executed concurrently (high level API). High level languages such as Java and C# provide both levels, including very high level tools that mask to some extent the difficulties outlined above.

In particular the very popular Fork-Join paradigm is readily available. In this paradigm a master thread forks into several tasks (which are mapped transparently to threads), wait for them to terminate, and collects the results in a sequential joint phase. For instance in the distance calculation above, the master thread would load the data set and forks on the pairwise distance calculations themselves.

For C/C++, the standard solution is OpenMP. This is an API integrated into compilers and their runtime libraries. It is based on pragmas of the form #pragma (this is a standard way to introduce compiler level extensions in C). Examples of pragmas include:

#pragma omp parallel
this creates additional threads to execute the current construct
#pragma omp parallel for
this will execute the subsequent for loop over multiple threads
#pragma omp barrier
this corresponds to the join part of the fork join paradigm, all threads will wait for each other before passing the pragma

OpenMP is very declarative in the sense that explicit fine management of the underlying threads is not really needed (and is not possible). An example from Tim Mattson follows: the program computes \(\pi\) by integration.

Sequential program

static long num_steps = 100000;
double step;
int main ()
    int i; double x, pi, sum = 0.0;
    step = 1.0/(double) num_steps;
    for (i = 0; i < num_steps; i++){
	x = (i + 0.5) * step;
	sum = sum + 4.0 / (1.0 + x * x);
    pi = step * sum;

A parallel version in OpenMP

#include <stdio.h>
#include <omp.h>
static long num_steps = 100000000;
double step;
#define NUM_THREADS 4
int main ()
    double pi;
    int nthreads;
    step = 1.0/(double) num_steps;
#pragma omp parallel
	int i, id,nthrds; double x, sum;
	id = omp_get_thread_num();
	nthrds = omp_get_num_threads();
	if (id == 0) nthreads = nthrds;
	for (i = id, sum = 0.0; i < num_steps; i = i + nthreads){
	    x = (i + 0.5) * step;
	    sum += 4.0 / (1.0 + x * x);
	sum = sum*step;
#pragma atomic
	pi += sum;
    return 0;

Implicit parallel programming in R

The simplest way to use parallel processing in R is to rely on implicit parallelism, that is to leverage libraries/packages that have been designed to use multiple cores. The main example of this is linear algebra (matrix and vector).

Because of the importance of linear algebra in mathematics, standards have been designed in order to provide efficient API and associated implementations. The standard low level API is BLAS. It provides the elementary operations around matrices and vectors (dot product, matrix vector product, etc.). Numerous efficient implementations of BLAS are available. Most of them are designed for multicore CPU and can reach the theoretical peak performances of those CPU. Two open source very efficient implementations are well known:

  1. ATLAS
  2. GotoBLAS

R can be configured to use any BLAS implementation instead of its integrated one. This can bring tremendous improvement in computation time for programs that make heavy use of linear algebra. For instance consider the very simple following code:

a <- matrix(rnorm(5000*5000), 5000, 5000)
b <- matrix(rnorm(5000*5000), 5000, 5000)
c <- a%*%b

It runs in roughly 80 seconds on a Xeon CPU E3-1271 v3 @ 3.60GHz using the single threaded BLAS from R (this is a 4 cores CPU). With Atlas in single threaded mode, the program runs in around 20 seconds. With parallel Atlas the program runs in 6 seconds. The theoretical peak is at 28.8 \(10^9\) flops while the computational load is of 250 \(10^9\) operations, leading to an observed value of almost 42 \(10^9\) flops. Larger matrices lead to even better flops.

Explicit parallel programming in R

When a program does not rely heavily on linear algebra, implicit parallel programming is not sufficient and one has to rely on explicit parallel programming. The core facility for this is provided by the parallel package. However, parallel is rather low level and its simpler to use the foreach package that provides a programming interface that is quite different from the fork-join principle. The foreach principle belongs to the data parallelism model (same code different data) rather than to the task parallelism model (different codes).

The \(\pi\) example: base R code

Let's first translate the C code into R:

## loop version, very slow
compute.pi.loop <- function(nsteps) {
    sum <- 0
    step <- 1.0/nsteps
    for(i in 1:nsteps) {
	x <- (i-0.5)*step
	sum <- sum + 4/(1+x^2)
    step * sum

This is not idiomatic (and in addition quite slow), so we move on to a more idiomatic version.

## idiomatic version much faster
compute.pi.idiomatic <- function(nsteps) {

This version is very fast (by R standard) but be warned that it allocates a lot of memory. We obtain the following running time.

nstep \(10^7\) \(10^8\)
loop 8s 83s
idiomatic 0.11s 1.2s

Foreach version

The foreach loop construct in R is quite different from the standard for loop in the sense that it does not iterate over things but rather apply a block of code to different values. For instance the result of

foreach(i=1:5) %do% i^2

is a list containing the following values:

1 4 9 16 25

The values of variables in the block are set via the parameters of the foreach call while the results are aggregated via a combination function, for instance as in:

foreach(i=1:5,.combine=sum) %do% i^2

with returns 55. Then the \(\pi\) calculation example can be done with foreach as follows:

compute.pi.foreach.loop <- function(nsteps) {
    step <- 1.0/nsteps
    sum <- foreach(i=1:nsteps,.combine=sum) %do% {
    step * sum

Unfortunately, this is amazingly slow: 2.4 seconds for nstep = \(10^4\). This is because foreach is optimized for large tasks while here each task is a very basic calculation. The correct solution consists in wrapping the idiomatic calculation into a foreach loop, for instance as follows:

compute.pi.foreach.idiomatic <- function(nsteps,ntimes) {
    step <- 1.0/nsteps
    foreach(i=1:ntimes,.combine=sum) %do% {

Then the effects of the foreach remain very small as long as ntimes is also small, as shown in this timing table.

\texttt{nstep} \(10^7\) \(10^8\)
\texttt{ntimes} = 1 0.37s 3.76s
\texttt{ntimes} = 10 0.37s 4.2s
\texttt{ntimes} = 100 0.42s 4.1s
\texttt{ntimes} = 1000 0.62s 3.8s

Parallel foreach

The main interest of foreach is that its calculation can be done in parallel. One has only to replace the %do% construction by a %dopar% construction and to load a parallel backend. So the modified code is

compute.pi.foreach.idiomatic.parallel <- function(nsteps,ntimes) {
    step <- 1.0/nsteps
    foreach(i=1:ntimes,.combine=sum) %dopar% {

The standard parallel backend is doMC which is registered as follows:

# load the library
# register the backend

Then, one can test the parallel version. The natural value for ntimes is here the number of cores, but it can be interesting to use higher counts. Indeed with nstep = \(10^8\) and times = 4 (on a 4 cores CPU), we get a run time around 1.4s slightly larger than the sequential code. But the sequential code cannot run with nstep = \(10^{10}\) because it allocates too much memory (75 GB!) and thus the calculation has to be cut in several parts. This is provided by foreach in the idiomatic design which can be run with nstep = \(10^{10}\) and e.g. times = 100. This takes approximately 116s (as expected).

Foreach do and don't

The foreach package is very useful to bring concurrency to R in a quite simple way. It has additional features that we will study latter in the course. However, it can also be misused as shown above.

  • things to avoid
    1. do not use foreach for short tasks
    2. do not nest foreach loops without using the specialized feature
  • things to do
    1. optimize the R code before going to foreach
    2. test the foreach code with a sequential backend first
    3. test different numbers of cores
    4. leverage the combine facility when possible

Take home message

Even in the case of shared memory, parallel programming is very difficult: getting quickly correct results is very challenging. The problem revolves around communications between parts of the program: it's easy to lock everything (to get correct results) but then efficiency suffers a lot. Fortunately, many problems can be handled with some simplified approaches: the fork-join principle that underlines OpenMP and the data parallelism that appears in foreach. The latter principle is particularly adapted to many learning tasks, for example.