Beyond Single-Core R

Jonathan Dursi
http://github.com/ljdursi/beyond-single-core-R

Today's Outline

Today will look something like this:

  • How to think about scaling
  • Parallel Package
    • Multicore
      • mcparallel/mccollect/mclapply
      • parallel RNG
      • load balancing, chunking
      • pvec
    • Snow
      • makecluster/stopcluster/clusterExport
      • clusterSplit
  • Foreach
    • chunking, iterators
  • Scalable Data Analysis Best Practices

Extra material online

  • R and Memory
  • Data file formats
  • BigMemory
  • Rdsm
  • pbdR

Not Covered

  • R in other frameworks
    • SparkR (R + Apache Spark)
    • RHadoop
    • Cool but more about the other framework than R

Thinking about scaling

Some hardware terms

Hardware:

  • Node: A single motherboard, with possibly multiple sockets
  • Processor/Socket: the silicon containing likely multiple cores
  • Core: the unit of computation; often has hardware support for
  • Pseudo-cores: can appear to the OS as multiple cores but share much functionality between other pseudo-cores on the same core

Sockets, Cores, and Hardware therads

Some software terms

Processes and threads:

  • Process: Data and code in memory
  • One or more threads of execution within a process
  • Threads in the same process can see most of the same memory
  • Processes generally cannot peer into another processes memory

Interpreted languages: generally you can only directly work with processes

Can call libraries that invoke threads (BLAS/LAPACK)

Processes vs threads

Parallel computing: faster, bigger, more

One turns to parallel computing to solve one of three problems:

My program is too slow.

Perhaps using more cores — e.g., all cores on my desktop — will make things faster.

  • Compute bound.
  • Tools:
    • parallel/multicore
    • Rdsm
    • GPUs

Rack of Computers

Parallel computing: faster, bigger, more

My problem is too big.

Perhaps splitting the problem up onto multiple computers in a cluster will give it access to enough memory to run effectively.

  • Memory bound
  • Tools:
    • parallel/snow
    • pbdR

Rack of Computers

Parallel computing: faster, bigger, more

There are too many computations to do - one task runs in a reasonable amount of time, but I have to run thousands!

Perhaps splitting the problem up onto multiple computers in a cluster will give it access to enough memory to run.

  • Tools:
    • gnu-parallel
    • parallel
    • job queues…

Rack of Computers

Concurrency: Multiple Independant Computations

For more cores/nodes to help, there has to be something for them to do.

Find largely independent computations to occupy them.

Classic example of this is a parameter study, or set of simulations with different seeds:

“More” case

Parameter Study

Scaling of parameter study: Througput

In this example, no individual task runs any faster with more processors, but the workload as a whole can.

How long it takes to process the N tasks you want done

Throughput: how many tasks/time

For completely independent tasks, P processors can increase throughput by factor P!

Throughput

Scaling of parameter study with number of processors

How a problem scales: how throughput behaves as processor number increases.

In this case, the throughput scales linearly with the number of processors.

This is the best case: “Perfect scaling”

plot of chunk unnamed-chunk-1

Scaling of parameter study with number of processors

Another way to look at it: time it takes to get some fixed amount of work done

More usual (and more important!)

Perfect scaling: time to completion ~ 1/P

P procesors - P times faster

plot of chunk unnamed-chunk-2

Scaling of parameter study with number of processors

Another way to look at it: time it takes to get some fixed amount of work done

More usual (and more important!)

Perfect scaling: time to completion ~ 1/P

P procesors - P times faster

plot of chunk unnamed-chunk-3

Scaling of parameter study with number of processors

Finally, how efficient is the scaling - when you throw 10 processors at the problem, are you getting 10 times the processing?

Perfect scaling - efficiency = 100%

plot of chunk unnamed-chunk-4

Finding concurrency: Split, Apply, Combine

Popularized by Hadley Wickham, this has become a model for thinking about data analysis in R in the tidyverse.

Split the data set up into relevant sub-sets; apply some analysis to it; combine the results.

This is exactly the way to think about scalable data analysis. Split the data - or tasks on that data - up between computing elements; do the analyses; then combine the results somehow.

The details depend a great deal on the analyses (and the nature of the data.)

Split, Apply, Combine

Imperfect parallelism

“Split” and “Combine” aren't free!

Partitioning the work, and assembling the final results from the partial results, represents some overhead - some fraction of the work that must be done in serial.

Splitting the work and distributing it over a network takes even more time.

Amdal's Law: \[ T \approx \left ( f + \frac{1 - f}{P} \right ) \]

plot of chunk unnamed-chunk-5

Task vs data parallelism

What we've described has been finding concurrency by analyzing different chunks of data the same way

  • seeds for simulation
  • parameters for parameter sweep
  • subsets of data

Also possible is to identify different tasks that must be done and perform those in parallel: multiple fits, summary analysis + generating several plots, etc.

Generally more manual but can work very well.

Dependencies limit parallelism

Diagram showing dependencies

Existing parallelism (BLAS, package support)

Existing parallelism

It's important to realize that many fundamental routines as well as higher-level packages come with some degree of scalability and parallelism “baked in”.

Running top (or glances, or…) while executing the following in R:

n <- 4*1024
A <- matrix( rnorm(n*n), ncol=n, nrow=n )
B <- matrix( rnorm(n*n), ncol=n, nrow=n )
C <- A %*% B

Existing parallelism

Top while running matrix mult

One R process using 458% of a processor.

R can be built using high performance threaded libraries for math in general, and linear algebra — which underlies many data analysis algorithms — in particular.

Here the single R process has launched several threads of execution – all of which are part of the same process, and so can see the same memory, eg the large matrices.

Packages that explicitly use parallelism

For a complete list, see

http://cran.r-project.org/web/views/HighPerformanceComputing.html .

  • Biopara
  • BiocParallel for Bioconductor
  • bigrf - Random Forests
  • caret - cross-validation, bootstrap characterization of predictive models
  • GAMBoost - boosting glms

Plus packages that use linear algebra or other expensive math operations which can be implicitly multithreaded.

When at all possible, don't do the hard work yourself — look to see if a package already exists which will do your analysis at scale.

Caret

Caret is a widely-used machine learning package, that uses foreach (which we'll learn about) to parallelize things like CV-folds, etc:

data(iris)
library(caret)

control <- trainControl(method="repeatedcv", number=10, repeats=3)
metric <- "Accuracy"

system.time(fit.lda.ser <- train(Species~., data=iris, method="svmRadial", metric=metric, trControl=control)
)
   user  system elapsed 
  2.007   0.046   2.065 

Caret

library(doParallel)
registerDoParallel(4)

system.time(fit.lda.par <- train(Species~., data=iris, method="svmRadial", metric=metric, trControl=control)
)
   user  system elapsed 
  1.211   0.271   1.065 
stopImplicitCluster()

Packages for Implementing Parallel Workflows

The Parallel Package

Since R 2.14.0 (late 2011), the parallel package has been part of core R.

Incorporates - and mostly supersedes - two other packages:

  • multicore: for using all processors on a single processor. Not on windows.
  • snow: for using any group of processors, possibly across a cluster.

Many packages which use parallelism use one of these two, so worth understanding.

Both create new processes (not threads) to run on different processors; but in importantly different ways.

Multicore - forking

Multicore creates new processes by forking — cloning – the original process.

That means the new processes starts off seeing a copy of exactly the same data as the original. E.g., first process can read a file, and then fork two new processes - each will see copy of the file.

Not shared memory; changes in one process will not be reflected in others.

Windows doesn't have fork(), so windows can't use these routines.

Multicore: fork()

Multicore - forking

Performance Tip: Modern OSs are lazy - the copy of memory isn't made unless it has to be, and it doesn't have to be until one process or the other writes to the memory.

That copy is slow, and takes new memory.

So in multicore, don't overwrite old variables if possible.

Multicore: fork()

Snow - Spawning

Snow creates entirely new R processes to run the jobs.

A downside is that you need to explicitly copy over any needed data, functions.

But the upsides are that spawning a new process can be done on a remote machine, not just current machine. So you can in principle use entire clusters.

In addition, the flipside of the downside: new processes don't have any unneeded data

  • less total memory footprint.

SNOW: spawn()

mcparallel/mccollect

The simplest use of the multicore package is the pair of functions mcparallel() and mccollect().

mcparallel() forks a task to run a given function; it then runs in the background.
mccollect() waits for and gets the result.

Let's pick an example: reading the airlines data set, we want — for a particular month — to know both the total number of planes in the data (by tail number) and the median elapsed flight time. These are two independant calculations, and so can be done independantly.

mcparallel/mccollect

We start the two tasks with mcparallel, and collect the answers with mccollect:

library(parallel, quiet=TRUE)
source("data/airline/read_airline.R")
jan2010 <- read.airline("data/airline/airOT201001.csv.gz")
unique.planes <- mcparallel( length( unique( sort(jan2010$TAIL_NUM) ) ) ) 
median.elapsed <- mcparallel( median( jan2010$ACTUAL_ELAPSED_TIME, na.rm=TRUE ) )
ans <- mccollect( list(unique.planes, median.elapsed) )
ans
$`11420`
[1] 4555

$`11421`
[1] 110

We get a list of answers, with each element “named” by the process ID that ran the job. We find that there are 4555 planes in the data set, and the median flight in the data set is 110 minutes in the air.

mcparallel/mccollect

Does this save any time? Let's do some independent fits to the data. Let's try to see what the average in-flight speed is by fitting time in the air to distance flown; and let's see how the arrival delay correlates with the departure delay. (Do planes, on average, make up some time in the air, or do delays compound?)

system.time(fit1 <-  lm(DISTANCE ~ AIR_TIME, data=jan2010))
   user  system elapsed 
  0.345   0.036   0.381 
system.time(fit2 <-  lm(ARR_DELAY ~ DEP_DELAY, data=jan2010))
   user  system elapsed 
  0.298   0.026   0.324 

mcparallel/mccollect

So the time to beat is about 0.7s:

parfits <- function() {
  pfit1 <- mcparallel(lm(DISTANCE ~ AIR_TIME, data=jan2010))
  pfit2 <- mcparallel(lm(ARR_DELAY ~ DEP_DELAY, data=jan2010))
  mccollect( list(pfit1, pfit2) )
}
system.time( parfits() )
   user  system elapsed 
  3.013   0.147   1.207 

Clearly actually forking the processes and waiting for them to rejoin itself takes some time.

This overhead means that we want to launch jobs that take a significant length of time to run - much longer than the overhead (hundredths to tenths of seconds for fork().)

Clustering

Typically we want to do more than an itemized list of independent tasks - we have a list of similar tasks we want to perform.

mclapply is the multicore equivalent of lapply - apply a function to a list, get a list back.

Let's say we want to see what similarities there are between delays at O'Hare airport in Chicago in 2010. Clustering methods attempt to uncover “similar” rows in a dataset by finding points that are near each other in some \( p \)-dimensional space, where \( p \) is the number of columns.

\( k \)-Means is a particularly simple, randomized, method; it picks \( k \) cluster centre-points at random, finds the rows closest to them, assigns them to the cluster, then moves the cluster centres towards the centre of mass of their cluster, and repeats.

Quality of result depends on number of random trials.

Clustering

Let's try that with our subset of data:

# columns listing various delay measures
delaycols <- c(18, 28, 40, 41, 42, 43, 44)
air2010 <- readRDS("data/airline/airOT2010.RDS")
ord.delays <- air2010[(air2010$ORIGIN=="ORD"), delaycols]
rm(air2010)
ord.delays <- ord.delays[(ord.delays$ARR_DELAY_NEW > 0),]
ord.delays <- ord.delays[complete.cases(ord.delays),]

system.time( serial.res   <- kmeans(ord.delays, centers=2, nstart=40) )
   user  system elapsed 
  2.954   0.163   3.124 
serial.res$withinss
[1] 125489706 254304505

Clustering with lapply

Running 40 random trials is the same as running 10 random trials 4 times. Let's try that approach with lapply:

do.n.kmeans <- function(n) { kmeans(ord.delays, centers=2, nstart=n) }
system.time( list.res <- lapply( rep(10,4), do.n.kmeans ) )
   user  system elapsed 
  7.974   0.292   8.340 
res <- sapply( list.res, function(x) x$tot.withinss )
lapply.res <- list.res[[which.min(res)]]
lapply.res$withinss
[1] 254304505 125489706

Get the same answer, but a little longer - bit of overhead from splitting it up and starting the process four times. We could make the overhead less important by using more trials, which would be better anyway.

Clustering with mclapply

mclapply works the same way as lapply, but forking off the processes (as with mcparallel)

system.time( list.res <- mclapply( rep(10,4), do.n.kmeans, mc.cores=4 ) )
   user  system elapsed 
  5.240   0.861   2.524 
res <- sapply( list.res, function(x) x$tot.withinss )
mclapply.res <- list.res[[which.min(res)]]
mclapply.res$tot.withinss
[1] 379794211

Clustering with mclapply

Note what the output of top looks like when this is running:

top with mclapply

There are four separate processes running - not one process using multiple CPUs via threads.

Clustering with mclapply

Looks good! Let's take a look at the list of results:

res
[1] 379794211 379794211 379794211 379794211

What happened here?

Parallel RNG

Depending on what you are doing, it may be very important to have different (or the same!) random numbers generated in each process.

Here, we definitely want them different - the whole point is to generate different random realizations.

parallel has a good RNG suitable for parallel work based on the work of Pierre L'Ecuyer in Montréal:

RNGkind("L'Ecuyer-CMRG")
mclapply( rep(1,4), rnorm, mc.cores=4, mc.set.seed=TRUE)
[[1]]
[1] -0.6316222

[[2]]
[1] -0.262412

[[3]]
[1] -0.5085514

[[4]]
[1] 0.8361448

Load balancing

Let's say that, instead of running multiple random trials to find the best given a set of clusters, we were unsure of how many clusters we wanted to run:

do.kmeans.nclusters <- function(n) { kmeans(ord.delays, centers=n, nstart=10) }
time.it <- function(n) { system.time( res <- do.kmeans.nclusters(n)) }
lapply(1:4, time.it)
[[1]]
   user  system elapsed 
  1.642   0.044   1.689 

[[2]]
   user  system elapsed 
  1.897   0.035   1.935 

[[3]]
   user  system elapsed 
  2.546   0.044   2.592 

[[4]]
   user  system elapsed 
  3.151   0.043   3.197 

Load balancing

More clusters takes longer. If we were to mclapply these four tasks on 2 CPUs, the first CPU would get the two short tasks, and the second CPU would get the second, longer tasks - bad load balance.

Normally, we want to hand multiple tasks of work off to each processor and only hear back when they're completely done - minimal overhead. But that works best when all tasks have similar lengths of time.

If you don't know that this is true, you can do dynamic scheduling - give each processor one task, and when they're done they can ask for another task.

More overhead, but better distribution of work.

Load balancing

system.time( res <- mclapply(1:4, time.it, mc.cores=2) )
   user  system elapsed 
  4.676   0.414   5.989 
system.time( res <- mclapply(1:4, time.it, mc.cores=2, mc.preschedule=FALSE) )
   user  system elapsed 
 11.783   1.404   5.504 

Splitting the data set

So far we've seen splitting the tasks; let's consider splitting the dataset.

Let's make a histogram of the times flights took off - say, binned by the hour.

get.hour <- function(timeInt) timeInt %/% 100
count.hours <- function(range) {
  counts <- rep(0,24)
  hours <- sapply(jan2010$DEP_TIME[range], get.hour)
  hist <- rle( sort(hours) )
  for (i in 1:length(hist$values)) {
    j <- hist$values[i] + 1
    if (j == 25) j = 1
    counts[j] <- hist$lengths[i]
  }
  counts
}

Splitting the data set

We can count up all flight hours like so:

system.time(scounts <- count.hours(1:nrow(jan2010)))
   user  system elapsed 
  0.525   0.023   0.547 
scounts
 [1]    23   383   127    40   205 12321 30203 31435 34210 32344 31989
[12] 34075 31938 33802 32555 31020 33869 33731 30760 29542 19592 14260
[23]  7470  2574

Splitting the data set

Can we split this up between tasks? Let's try this:

nr <- nrow(jan2010)
ncores <- 4
chunks <- split(1:nr, rep(1:ncores, each=nr/ncores))
system.time(counts <- mclapply( chunks, count.hours, mc.cores=ncores) )
   user  system elapsed 
  2.940   0.340   0.314 

Splitting the data set

That was definitely faster - how do the answers look?

str(counts)
List of 4
 $ 1: num [1:24] 8 204 75 32 54 ...
 $ 2: num [1:24] 9 121 49 4 78 ...
 $ 3: num [1:24] 4 49 2 2 64 ...
 $ 4: num [1:24] 2 9 1 2 9 ...
Reduce("+", counts)
 [1]    23   383   127    40   205 12321 30203 31435 34210 32344 31989
[12] 34075 31938 33802 32555 31020 33869 33731 30760 29542 19592 14260
[23]  7470  2574

To be fair, we'd have to include the Reduction time in the total time - but that's just the sum of four short vectors, probably not a big deal.

pvec - simplified mclapply

For the simple and common case of applying a function to each element of a vector and returning a vector, the parallel package has a simplified version of mclapply called pvec.

fx <- function(x) x^5-x^3+x^2-1
maxn <- 1e6
system.time( res <- sapply(1:maxn, fx) )
   user  system elapsed 
  2.236   0.027   2.263 
system.time( res <- vapply(1:maxn, fx, 0.) )
   user  system elapsed 
  1.440   0.029   1.469 

pvec - simplified mclapply

system.time( res <- pvec(1:maxn, fx, mc.cores=2) )
   user  system elapsed 
  0.874   0.522   0.104 
system.time( res <- pvec(1:maxn, fx, mc.cores=4) )
   user  system elapsed 
  0.070   0.043   0.080 
system.time( res <- mclapply(1:maxn, fx, mc.cores=4) )
   user  system elapsed 
  0.801   0.271   0.899 

Test your skills: parallel/multicore

Using the entire 2010 dataset, and the examples above, examine one of the following questions:

  • In 2010, what airport (with more than say 10 outgoing flights) had the largest fraction of outgoing flights delayed?
  • For some given airport - what hour of the day had the highest relative fraction of delayed flights?
  • For all airports?
  • What is the effect of including the split() and the Reduce() on the serial-vs-parallel timings for this histogram? Is there a better way of doing the splitting?

Summary: parallel/multicore

The mc* routines in parallel work particularly well when:

  • You want to make full use of the processors on a single computer
  • Each task only reads from some big common data structure and produces modest-sized results
  • mcparallel works very well for task parallelism; the mclapply for data parallelism.

Things to watch for:

  • Modifying the big common data structure:
    • Won't be seen by other processes,
    • But will blow up the memory requirements
  • You can only use one machine's processors
  • Won't work on Windows (but what does?)
  • mc.cores is a lie. It's the number of tasks, not cores. Can easily oversubscribe cores explicitly or implicitly

Multiple computers with parallel/snow

Multiple computers with parallel/snow

The other half of parallel, routines that were in the still-active snow package, allow you to again launch new R processes — by default, on the current computer, but also on any computer you have access to. (SNOW stands for “Simple Network of Workstations”, which was the original use case).

The recipe for doing computations with snow looks something like:

library(parallel)
cl <- makeCluster(nworkers,...)
results1 <- clusterApply(cl, ...)
results2 <- clusterApply(cl, ...)
stopCluster(cl)

other than the makeCluster()/stopCluster(), it looks very much like multicore and mclapply.

Hello world

Let's try starting up a “cluster” (eg, a set of workers) and generating some random numbers from each:

library(parallel)
cl <- makeCluster(4)
clusterCall(cl, rnorm, 5)
[[1]]
[1] -1.31973090  0.50914581 -0.07566584  0.13727399 -1.84968866

[[2]]
[1]  0.7409130 -0.3559644  1.1822262 -0.9311078  0.5177474

[[3]]
[1] -0.3316341  1.0436526  0.5964421 -1.3712068  0.3503339

[[4]]
[1]  0.5945040 -0.2918959  0.8942675  0.1000171  1.3576120
stopCluster(cl)

Hello world

clusterCall() runs the same function (here, rnorm, with argument 5) on all workers in the cluster. A related helper function is clusterEvalQ() which is handier to use for some setup tasks - eg,

clusterEvalQ(cl, {library(parallel); library(foreach); NULL} )

Clustering on Clusters

Emboldened by our success so far, let's try re-doing our \( k \)-means calculations:

delaycols <- c(18, 28, 40, 41, 42, 43, 44)

source("data/airline/read_airline.R")
jan2010 <- read.airline("data/airline/airOT201001.csv.gz")
jan2010 <- jan2010[,delaycols]
jan2010 <- jan2010[complete.cases(jan2010),]
do.n.kmeans <- function(n) { kmeans(jan2010, centers=4, nstart=n) }
library(parallel)
cl <- makeCluster(4)
res <- clusterApply(cl, rep(5,4), do.n.kmeans)
stopCluster(cl)
 Error in checkForRemoteErrors(val) : 
-----------------------------
   4 nodes produced errors; first error: object 'jan2010' not found
-----------------------------

Ah! Failure.

Clustering on Clusters

Recall that we aren't forking here; we are creating processes from scratch. These processes, new to this world, are not familiar with our ways, customs, or datasets. We actually have to ship the data out to the workers:

cl <- makeCluster(4)
system.time(clusterExport(cl, "jan2010"))
   user  system elapsed 
  0.128   0.027   0.574 
system.time(cares <- clusterApply(cl, rep(5,4), do.n.kmeans))
   user  system elapsed 
  0.357   0.039  11.064 
stopCluster(cl)
system.time( mcres <- mclapply(rep(5,4), do.n.kmeans, mc.cores=4) )
   user  system elapsed 
 29.584   1.655  10.165 

Clustering on Clusters

Note that the costs of shipping out data back and forth, and creating the processes from scratch, is relatively costly - but this is the price we pay for being able to spawn the processes anywhere.

(And if our computations take hours to run, we don't really care about several-second delays.)

Running across machines

The default cluster is a sockets-based cluster; you can run on multiple machines by specifying them to a different call to makeCluster:

hosts <- c( rep("localhost",8), rep("192.168.0.10", 8) )
cl <- makePSOCKcluster(names=hosts)
clusterCall(cl, rnorm, 5)
clusterCall(cl, system, "hostname")
stopCluster(cl)

Once it is done, you have succcessfully run random number generators across multiple hosts.

Cluster types

parallel has several different cluster types:

  • PSOCK (Posix sockets): the default type
  • Fork workers: but if you're going to use this, you may as well just use multicore.
  • MPI: this is similar in a way to PSOCK clusters, but startup and communications can be much faster once you start going to large numbers (say >64) of hosts. We won't cover this today; using the MPI cluster type is conceptually identical to PSOCK clusters.

Work distribution and Load Balancing

Because of the need to send (possibly large) data to the workers, the scheduling of workers is even more important than with multicore.

The snow library has very nice visualization tools for timing that are inexplicably absent from parallel; so let's temporarily use snow:

library(snow,quiet=TRUE)

Work distribution and Load Balancing

do.kmeans.nclusters <- function(n) { kmeans(jan2010, centers=n, nstart=10) }

cl <- makeCluster(2)
clusterExport(cl,"jan2010")
tm <- snow.time( clusterApply(cl, 1:6, do.kmeans.nclusters) )

Work distribution and Load Balancing

plot(tm)

plot of chunk unnamed-chunk-34

Work distribution and Load Balancing

tm.lb <- snow.time(clusterApplyLB(cl, 1:6, do.kmeans.nclusters))
plot(tm.lb)

plot of chunk unnamed-chunk-35

stopCluster(cl)

Work distribution and Load Balancing

The default clusterApply sends off one task to each worker, waits until they're both done, then sends off another. (Question: why?)

clusterApplyLB does something more like mc.preschedule=FALSE; it fires off tasks to each worker as needed.

Sending off one task at a time can be inefficient if there is a lot of commnication involved. But it allows flexibility in scheduling, which is vitally important if the tasks are of widely varying durations.

clusterSplit and Hour Histogram

Of course, for some applications, we don't need to send the entire data structure across. Let's consider the departure-time histogram again. This time, we're only going to send across the data that's going t be computed:

jan2010 <- read.airline("data/airline/airOT201001.csv.gz")
jan2010 <- jan2010[complete.cases(jan2010),]

get.hour <- function(timeInt) timeInt %/% 100
count.hours <- function(timesInt) {
  counts <- rep(0,24)
  hours <- sapply(timesInt, get.hour)
  hist <- rle( sort(hours) )
  for (i in 1:length(hist$values)) {
    j <- hist$values[i] + 1
    if (j == 25) j = 1
    counts[j] <- hist$lengths[i]
  }
  counts
}

clusterSplit and Hour Histogram

This time, rather than exporting the entire data set, we'll just send across the bits we need:

cl <- makeCluster(2)
clusterExport(cl,"get.hour")  # have to export _functions_, too.
datapieces <- clusterSplit(cl,jan2010$DEP_TIME)
str(datapieces)
List of 2
 $ : int [1:253778] 1425 1228 1053 1047 1753 1755 1846 1859 1752 1757 ...
 $ : int [1:253779] 1450 1459 1458 1455 723 704 659 658 701 702 ...
ans <- clusterApply(cl, datapieces, count.hours)
Reduce("+", ans)
 [1]    23   381   126    40   205 12279 30069 31268 34042 32198 31881
[12] 33936 31843 33686 32437 30907 33750 33644 30686 29455 19522 14184
[23]  7434  2556

clusterSplit and Hour Histogram

To look a little more closely at some communciations and load balance issues, I'm going to split the data up into more pieces than workers, and distribute them:

stopCluster(cl)
cl <- makeCluster(6)
datapieces <- clusterSplit(cl,jan2010$DEP_TIME)
stopCluster(cl)

cl <- makeCluster(2)
clusterExport(cl,"get.hour")  # have to export _functions_, too.
str(datapieces)
List of 6
 $ : int [1:84255] 1425 1228 1053 1047 1753 1755 1846 1859 1752 1757 ...
 $ : int [1:84762] 1337 2045 658 1423 1645 1923 853 1613 2048 755 ...
 $ : int [1:84761] 2048 2037 2031 2039 2034 2035 2036 1634 1840 1639 ...
 $ : int [1:84762] 1450 1459 1458 1455 723 704 659 658 701 702 ...
 $ : int [1:84762] 723 723 825 919 912 726 722 723 725 727 ...
 $ : int [1:84255] 1705 1036 1242 942 1653 2135 1330 734 1831 1253 ...

clusterSplit and Hour Histogram

tm <- snow.time( ans <- clusterApply(cl, datapieces, count.hours) )
plot(tm)

plot of chunk unnamed-chunk-39

clusterSplit and Hour Histogram

tm <- snow.time( ans <- parLapply(cl, datapieces, count.hours) )
plot(tm)

plot of chunk unnamed-chunk-40

stopCluster(cl)

clusterSplit and Hour Histogram

If the list you are operating on consists of big chunks of data, the relevant piece is sent to the worker for its task.

Sometimes that's exactly what you want:

  • The chunks nearly fill up memory
  • You don't know which task will do which chunk (clusterApplyLB)

But if it's not necessary, it adds a delay to the task. If you know ahead of time the tasks are of similar duration:

  • clusterExport the whole data set (if everyone needs the whole data set)
  • Use clusterSplit to split the data set into exactly what each worker needs
  • or use parLapply to chunk up the data for you and send all the data for one task all at once.

Back to parallel

detach("package:snow", unload=TRUE)

Summary: parallel/snow

The cluster routines in parallel are good if you know you will eventually have to move to using multiple computers (nodes in a cluster, or desktops in a lab) for a single computation.

  • Use clusterExport for functions and data that will be needed by everyone.
  • Communicating data is slow, but much faster than having every worker read the same data from a file.
  • Use clusterApplyLB if the tasks vary greatly in runtime.
  • Use clusterApply if each task requires an enormous amount of data.
  • Use parLapply if tasks are similar duration and data from multiple tasks will fit in memory.
  • snow::snow.time is great for understanding performance.
  • Use makePSOCKcluster for small clusters; consider makeMPIcluster for larger (but see pbdR section online)

Test your skills: parallel/snow (1/2)

On a set of workstations (or AWS instances) you have access to, try a session with two nodes, and setup a PSOCK cluster across both nodes. Call unlist(clusterCall(cl, system, "hostname")) to make sure that you have workers on both nodes.

Load the 2010 data and break it up by month (look up the split command) and see which month had the highestfraction of cancelled flights.

Then split the data up by airline and see which airline had the highest fraction of cancelled flights.

Test your skills: parallel/snow (2/2)

There are two big downsides with how we are doing this: the master is doing a huge amount of the work by doing the pre-splitting, and the whole data set has to be in memory. Tackle one or the other of them:

  • Master doing too much work: Just partition the data into chunks, and let each worker do the splitting up and counting itself. For the combined results to be meaningful, the worker will need to know the full set of airlines (or the full set of months, which is somewhat easier.)
    How to do that?
  • Master having whole problem in memory: use bigmemory along with parallel.

foreach and doparallel

foreach and doparallel

The “master/worker” approach that parallel enables works extremely well for moderately sized problems, and isn't that difficult to use. It is all based on one form of R iteration, apply, which is well understood.

However, going from serial to parallel requires some re-writing, and even going from one method of parallelism to another (eg, multicore-style to snow-style) requires some modification of code.

The foreach package is based on another style of iterating through data - a for loop - and is designed so that one can go from serial to several forms of parallel relatively easily. There are then a number of tools one can use in the library to improve performance.

foreach - serial

The standard R for loop looks like this:

for (i in 1:3) print(sqrt(i))
[1] 1
[1] 1.414214
[1] 1.732051

The foreach operator looks similar, but returns a list of the iterations:

library(foreach)
foreach (i=1:3) %do% sqrt(i)
[[1]]
[1] 1

[[2]]
[1] 1.414214

[[3]]
[1] 1.732051

foreach - serial

library(foreach)
foreach (i=1:3) %do% sqrt(i)

The foreach function creates an object, and the %do% operator operates on the code (here just one statement, but it can be multiple lines between braces, as with a for loop) and the foreach object.

foreach + doParallel

Foreach works with a variety of backends to distribute computation - doParallel, which allows snow- and multicore-style parallelism, and doMPI (not covered here).

Switching the above loop to paralleljust requires registering a backend and using %dopar% rather than %do%:

library(doParallel)
registerDoParallel(3)  # use multicore-style forking
foreach (i=1:3) %dopar% sqrt(i)
[[1]]
[1] 1

[[2]]
[1] 1.414214

[[3]]
[1] 1.732051
stopImplicitCluster()

foreach + doParallel

One can also use a PSOCK cluster:

cl <- makePSOCKcluster(3)
registerDoParallel(cl)  # use the just-made PSOCK cluster
foreach (i=1:3) %dopar% sqrt(i)
[[1]]
[1] 1

[[2]]
[1] 1.414214

[[3]]
[1] 1.732051
stopCluster(cl)

Combining results

While returning a list is the default, foreach has a number of ways to combine the individual results:

foreach (i=1:3, .combine=c) %do% sqrt(i)
[1] 1.000000 1.414214 1.732051
foreach (i=1:3, .combine=cbind) %do% sqrt(i)
     result.1 result.2 result.3
[1,]        1 1.414214 1.732051
foreach (i=1:3, .combine="+") %do% sqrt(i)
[1] 4.146264
foreach (i=1:3, .multicombine=TRUE, .combine="sum") %do% sqrt(i)
[1] 4.146264

Combining results

Most of these are self explanatory.

multicombine is worth mentioning: by default, foreach will combine each new item into the final result one-at-a-time.

If .multicombine=TRUE, then you are saying that you're passing a function which will do the right thing even if foreach gives it a whole wack of new results as a list or vector - e.g., a whole chunk at a time.

Composing foreach Objects

There's one more operator: %:%. This lets you compose or nest foreach objects:

foreach (i=1:3, .combine="c") %:% 
  foreach (j=1:3, .combine="c") %do% {
    i*j
  }
[1] 1 2 3 2 4 6 3 6 9

Filtering Items

And you can also filter items, using when:

foreach (a=rnorm(25), .combine="c") %:%
  when(a >= 0) %do%
    sqrt(a)
 [1] 0.5598735 0.7148421 0.3468935 0.8949200 1.1892010 0.4669242 0.6889375
 [8] 0.3459393 1.2330996 0.6494213 0.8319499 0.7037915 0.6425140 0.6792320
[15] 1.3340716 0.4545752 1.1219641 0.5707225 0.4611031

Histogram

Let's consider our hour histogram again:

system.time(
  foreach (i=1:2000, .combine="+") %do% {
    hrs <- rep(0,24)
    hr <- get.hour(jan2010$DEP_TIME[i])
    hrs[hr+1] = hrs[hr+1] + 1
    hrs
  }
)
   user  system elapsed 
  0.815   0.031   0.846 

Note: like a function, we have to make sure the function we want to return is the last line (or explicitly returned).

Parallel Histogram

What's more, this automatically works in parallel:

cl <- makePSOCKcluster(3)
registerDoParallel(cl,cores=3)
system.time(
  foreach (i=1:2000, .combine="+") %dopar% {
    hrs <- rep(0,24)
    hr <- get.hour(jan2010$DEP_TIME[i])
    hrs[hr+1] = hrs[hr+1] + 1
    hrs
  }
)
   user  system elapsed 
  1.432   0.181   2.756 
stopCluster(cl)

Which is actually sort of magic; PSOCK clusters don't share memory! foreach does a good job of exporting necessary variables; if something isn't automatically exported, it can be exported explicitly in the foreach line with, eg, foreach(..., .export=c("jan2010")).

Histogram Performance

But this is incredibly slow:

system.time(
  ans <- foreach (i=1:2000, .combine="+") %do% {
    hrs <- rep(0,24)
    hr <- get.hour(jan2010$DEP_TIME[i])
    hrs[hr+1] = hrs[hr+1] + 1
    hrs
  }
)
   user  system elapsed 
  0.799   0.001   0.801 
system.time(ans <- count.hours(jan2010$DEP_TIME[1:2000]))
   user  system elapsed 
  0.002   0.000   0.003 

Mainly because it's not vectorized; by looping over the data one item at a time we've avoided using our lovely fast vector routines. Plus allocating a 24-hour-long vector per item!

Histogram Performance

Another problem - we've created a vector 1:2000 which in general is the same size as the data set we're working on. For large data sets, big memory.

Foreach has iterators that can iterate through an object without creating something the size of the object. For instance, icount() is like the difference between Python 2.x range and xrange:

cl <- makePSOCKcluster(3)
registerDoParallel(cl,cores=3)
system.time(
  ans <- foreach (i=icount(2000), .combine="+") %do% {
    hrs <- rep(0,24)
    hr <- get.hour(jan2010$DEP_TIME[i])
    hrs[hr+1] = hrs[hr+1] + 1
    hrs
  }
)

But that doens't help with the performance issue here.

Histogram Performance

We do a little bit better by avoiding the intermediate index; we don't care about \( i \) at all, all we care about is the data. We can implicitly create an iterator on the object with

 foreach (time=jan2010$DEP_TIME[1:2000],...

or explicitly, setting the chunk size to distribute between tasks:

system.time(
  ans <- foreach (time=iter(jan2010$DEP_TIME[1:2000],chunksize=500), .combine="+") %do% {
    hrs <- rep(0,24)
    hr <- get.hour(time)
    hrs[hr+1] = hrs[hr+1] + 1
    hrs
  }
)
   user  system elapsed 
  0.761   0.007   0.769 
ans
 [1]   0   0   0   0   0  86  53 166 154 107 117 102 221 138 109  96 137
[18] 142 141  59  95  66   7   4

Histogram Performance

As you can tell by the chunking, foreach can adjust the iteration scheduling in a number of ways. Chunking is one of them.

The underlying back-end obviously has a lot to do with the scheduling. For multicore, for instance, one can pass familiar options to multicore if we are using a multicore “cluster”:

foreach( ..., .options.multicore=list(preschedule=FALSE,set.seed=TRUE))

Performance Tip: If you don't care about the order that the results come back in, specifying .inorder=FALSE gives the scheduler more flexibility in sending out tasks. Otherwise, you're guaranteed that the first result back is from the first iteration, etc.

Histogram Performance

But really, we want to work on entire slices of the data at once. For objects like matricies or data frames, you can send out a row, column, etc at a time; we can re-cast the data as a matrix and send it out one row at a time:

jan.matrix = matrix(jan2010$DEP_TIME[1:2000], ncol=500)
system.time(
  ans <- foreach (times=iter(jan.matrix,by="row"), .combine="+") %do% {
    count.hours(times)
  }
)
   user  system elapsed 
  0.007   0.000   0.007 
ans
 [1]   0   0   0   0   0  86  53 166 154 107 117 102 221 138 109  96 137
[18] 142 141  59  95  66   7   4

Histogram Performance

And this works in parallel, as well

cl <- makePSOCKcluster(3)
registerDoParallel(cl,cores=3)
jan.matrix = matrix(jan2010$DEP_TIME[1:2000], ncol=500)
system.time(
  ans <- foreach (times=iter(jan.matrix,by="row"), .combine="+") %dopar% {
    count.hours(times)
  }
)
   user  system elapsed 
  0.014   0.000   0.029 
stopCluster(cl)
ans
 [1]   0   0   0   0   0  86  53 166 154 107 117 102 221 138 109  96 137
[18] 142 141  59  95  66   7   4

isplit

If we want each task to only work on some subset of the data, the isplit iterator will split the data at the master, and send off the partitioned data to workers:

ans <- foreach (byAirline=isplit(jan2010$DEP_TIME, jan2010$UNIQUE_CARRIER), 
                .combine=cbind) %do% {
  df <- data.frame(count.hours(byAirline$value)); colnames(df) <- byAirline$key; df
}
ans$UA
 [1]    2    4    0    0    0  957 1595 1817 2598 1401 1713 1774 1509 1907
[15] 1442 1230 1510 1888 1775 1311  964  783  785  268
ans$OH
 [1]   2   2   0   0   0 185 654 469 674 679 572 682 843 763 699 671 839
[18] 777 507 764 467 186 130  20

Stock prices example

In data/stocks/stocks.csv, we have 419 daily closing stock prices going back to 2000 (3654 prices). For stocks, it's often useful to deal with “log returns”, rather than absolute price numbers. We use:

stocks <- read.csv("data/stocks//stocks.csv")
log.returns <- function(values) { nv=length(values); log(values[2:nv]/values[1:nv-1]) }

How would we parallelize this with foreach? (Imagine we had thousands of stocks and decades of data, which isn't implausable.)

Stock Prices Example

registerDoParallel(4)
mat.log <- 
  foreach(col=iter(stocks[,-c(1,2)],by="col"), .combine="cbind")  %dopar% 
      log.returns(col)
stopImplicitCluster()

stocks.log <- as.data.frame(mat.log)
colnames(stocks.log) <- colnames(stocks)[-c(1,2)] 
stocks.log$date <- stocks$date[-1]   # get rid of the first day; no "return" for then

Stock Correlations

A quantity we might be interested in is the correlation between the log returns of various stocks: we can use R's cor() function to do this.

nstocks <- 419
cors <- matrix(rep(0,nstocks*nstocks), nrow=nstocks, ncol=nstocks)
system.time(
for (i in 1:419) {
  for (j in 1:419) {
    cors[i,j] <- cor(stocks.log[[i]],stocks.log[[j]])    
  }
}
)
   user  system elapsed 
 15.543   0.044  15.593 

Summary: foreach

Foreach is a wrapper for the other parallel methods we've seen, so it inherits some of the advantages and drawbacks of each.

Use foreach if:

  • Your code already relys on for-style iteration; transition is easy
  • You don't know if you want multicore vs. snow style parallel use (or other kinds, like batch jobs): you can switch just by registering a different backend!
  • You want to be able to incrementally improve the performance of your code.

Note that you can have portions of your analysis code use foreach with parallel and portions using the backend with apply-style parallelism; it doesn't have to be all one or the other.

Test your skills: Stock Correlations

Parallelize the stock correlation matrix calculation with foreach. You should get a proper speedup here. Try working on just the first 10 stocks until you get things working.

Note: you can nest foreach() loops using the '%:%' operator:

foreach(...) %:%
  foreach(...)

When you're done that, take a look at a random year's airline data. Of the flights that have a departure delay, is the arrival delay (on average) less than or greater than the departure delay? Is: “This is the captain: Sorry for the delay folks, but we'll make it up in the air” a lie?

How would you use foreach to loop over the various years' data?

Scalable Data Analysis: Best Practices

Best Practices: Don't reinvent wheels

There's no one-size-fits-all-analysis approach to taking advantage of multiple processors or nodes (or accellerators).

Check to see if there already exists packages for doing your analysis in parallel.

Best Practices: Big chunks better than little chunks

It's almost always easier to efficiently take advantage of coarse-grained parallelism.

E.g. running each cross-validation fold on a different processor will almost certainly work better than running them sequentially but with parts of the training method parallelized.

Amdahl's Law!

Best Practices: Parallelism gives you more _compute_

For throwing more cores at the problem to help, the calculation has to have been limited by compute power.

I/O bound analyses, or network-bound analyses, or even memory-bandwidth-constrained analyses can easily have worse performance with more cores, not better.

Best Practices: One task per core

Compute bound calculations slow down if something else is sharing the processor with them.

Except for debugging, you generally don't want to run (say) doParallel with more cores than you actually have on your computer.

Sometimes using the number of pseudo-cores/hyperthreads/hardware threads is helpful if memory bandwidth is a limiting factor, or not if not - you can test.

Best Practices: Don't trip over your own feet

Because you only want one task for core, be careful about accidentally using parallelism!

Eg, a threaded blas/lapack is great for interactive use, but:

If you're

  • doing four tasks at once with multicore on your laptop, and
  • Each of those tasks calls a matrix solve which uses a threaded BLAS to launch 4 threads

Now you're competing with yourself for your own laptop's cores.

Summary

R comes with an increasingly rich set of tools for taking advantage of more compute power:

  • parallel
  • foreach/doParallel

Keep in mind what we talked about in terms of overhead, and:

  • Don't reinvent wheels
  • Big chunks are better than little chunks
  • Parallelism gives you more compute, not I/O
  • One task per core
  • Don't trip over your own feet

Extra material

A Few Words on R and Memory

A Few Words on R and Memory

Because R frequently needs to make temporary copies (R works best as a functional programming language), hitting memory limit frequently becomes a problem.

Avoiding hitting that limit too early requires some caution.

Need to know how R handles variables and memory.

Garbage Collection

Like a lot of dynamic languages, R relies on garbage collection to limit memory usage.

“Every so often”, a garbage collection task runs and deletes variables that won't be used any more.

You can force the garbage collection to run at any given time by calling gc(), but this almost never fixes anything significant.

How can the gc know that you're not going to use that big variable in the next line?

Gc needs your help to be effective.

Image

Useful commands for memory management

  • gc(verbose=TRUE), or just gc(TRUE).
    • gc() alone probably won't help anything. This calls gc() - likely not very useful - but gives verbose output, returning current memory usage as a matrix.
  • ls()
    • Lists current variables
  • object.size()
    • Pass it a variable, it prints out its size. Pass it get("variablename") (eg, quoted) and it will get that variable and print its size.
  • rm()
    • Deletes a variable you're not going to use. Lets gc() go to work.
  • Fun little one-liner which prints out all variables by size in bytes:
sort( sapply( ls(), function(x) { object.size(get(x))} ),decreasing=TRUE )

Object.size() and gc(TRUE)

Let's play with object.size() and gc(TRUE):

gc(TRUE)
           used  (Mb) gc trigger  (Mb)  max used   (Mb)
Ncells  2402596 128.4    4703850 251.3   4703850  251.3
Vcells 31199516 238.1   79492131 606.5 273190741 2084.3
old.mem <- gc(TRUE)[,c(1:2,5:6)]
x <- rep(0.,(16*1024)**2)
xsize <- object.size(x)
xsize
2147483688 bytes

Object.size() and gc(TRUE)

Let's play with object.size() and gc(TRUE):

xsize
2147483688 bytes
print(xsize,units="MB")
2048 Mb
new.mem <- gc(TRUE)[,c(1:2,5:6)]
new.mem-old.mem
            used (Mb) max used  (Mb)
Ncells       576    0        0   0.0
Vcells 268436146 2048 26530385 202.4

Object.size() and gc(TRUE)

Now let's delete the object and see how system memory behaves:

rm(x)
final.mem <- gc(TRUE)[,c(1:2,5:6)]
final.mem
           used  (Mb)  max used   (Mb)
Ncells  2403174 128.4   4703850  251.3
Vcells 31200229 238.1 299721126 2286.7
final.mem-old.mem
       used (Mb) max used  (Mb)
Ncells  568    0        0   0.0
Vcells  684    0 26530385 202.4

Better to Use Functions

… but if you use functions:

  • Variables deleted as they fall out of scope
  • Code is more readable, maintable, reusable
trunc.gc <- function() { gc(TRUE)[,c(1:2,5:6)] }
rnorm.sum <- function(n) {
  x <- rnorm(n)
  sum(x)
}
orig.gc <- trunc.gc()
rnorm.sum(16*1024*1024)
[1] 3121.226
after.gc <- trunc.gc()
after.gc - orig.gc
       used (Mb) max used (Mb)
Ncells   15    0        0    0
Vcells   14    0        0    0

Out Of Core / External Memory Computation

Out Of Core / External Memory Computation

Some problems require doing fairly simple analysis on a data set too large to fit into memory.

  • Min/mean/max
  • Data cleaning
  • Even linear fitting is pretty simple

In that case, one processor may be enough; you just want a way to not overrun memory.

Out of Core, or External Memory computation leaves the data on disk, bringing in to memory only what is needed/fits at any given point.

For some computations, this works well (but note: disk access is always much slower than memory access.)

Bigmemory Package

The bigmemory package defines a generalization of a matrix class, big.matrix, which can be file-backed - that is, can exist primarily on disk, with parts brought into memory when necessary.

This approach works fairly well when one's data access involves passing through the entire data set once or a very small number of times, either combining data or extracting a subset.

Packages like bigalgebra or biganalytics (not covered here) build onbigmemory.

External Memory

Ideal gas Data Set

In data/idealgas, we have a set of synthetic data files describing an ideal gas experiment - setting temperature, amount of material, and volume, and measuring the pressure.

Simple data sets:

small.data <- read.csv("data/idealgas/ideal-gas-fixedT-small.csv")
small.data[1:2,]
  X  pres        vol   n temp
1 1 99000 0.02036345 0.8  300
2 2 99250 0.02018306 0.8  300

Row name, pressure (Pa), volume (m3), N (moles), and temperature (K).

A larger data set consisting of 124M rows, 5.8GB, is sitting in ideal-gas-fixedT-large.csv, and we'd like to do some analysis of this data set. But the size is a problem.

A Note on File Formats

Let's consider the humble .csv file:

$ ls -sh1 airOT2010.*
151M airOT2010.RDS
151M airOT2010.Rdata
1.4G airOT2010.csv

$ Rscript  timeexamples.R
[1] "Reading Rdata file"
   user  system elapsed
 11.697   0.616  12.319
[1] "Reading RDS file"
   user  system elapsed
 11.041   0.644  11.694
[1] "Reading CSV file"
   user  system elapsed
140.640   3.352 144.142

A Note on File Formats

CSV — or really, any text-based format — is the worst possible format for quantiative data. It manages the trifecta of being:

  • Slow to read
  • Huge
  • Inaccurate

Converting floating point numbers back and forth between internal representations and strings is slow and prone to truncation.

Use binary formats whenever possible. .Rdata is a bit prone to change; .RDS is modestly better. Portable file formats like HDF5 (for data frames) or NetCDF4 (for matrices and arrays) are compact, accurate, fast (not as fast as .Rdata/.RDS), and can be read by tools other than R.

Creating a file-backed big matrix

We've already created a big.matrix file from this data set, using

data <- read.big.matrix("data/idealgas/ideal-gas-fixedT-large.csv", 
                        header=TRUE,  
                        backingfile="data/idealgas/ideal-gas-fixedT-large.bin", 
                        descriptorfile="ideal-gas-fixedT-large.desc")

This reads in the .csv file and outputs a binary equivalent (the “backingfile”) and a descriptor (in the “descriptorfile”) which contains all of the information which describes the binary blob.

You can read the descriptorfile: more ideal-gas-fixedT-large.desc

Done for you since initial convertion takes 12 minutes for this set - kind of boring.

Note: converts into a matrix, which is a less flexible data type than a data frame; homogeneous type. Here, we'll use all numeric.

Using a big.matrix

Let's do some simple analysis on the data set and see how memory behaves.

library(bigmemory, quiet=TRUE)
orig.gc <- trunc.gc()
data <- attach.big.matrix("data/idealgas/ideal-gas-fixedT-large.desc")
trunc.gc()-orig.gc
       used (Mb) max used (Mb)
Ncells  136    0        0    0
Vcells   11    0        0    0

Using a big.matrix

Let's do some simple analysis on the data set and see how memory behaves.

data[1:2,]
      pres        vol   n temp
[1,] 90000 0.00565073 0.5  280
[2,] 90004 0.01952750 0.5  280
system.time(min.p <- min(data[,"pres"]))
   user  system elapsed 
  0.018   0.013   0.031 
trunc.gc()-orig.gc
       used (Mb) max used (Mb)
Ncells 1218  0.1        0    0
Vcells 1091  0.0        0    0

Using a big.matrix

That only took ~7 seconds to scan through 124M records to find a minimum. Let's try a few other calculations: Let's do some simple analysis on the data set and see how memory behaves.

min.p
[1] 90000
system.time(max.p <- max(data[,"pres"]))
   user  system elapsed 
  0.012   0.002   0.014 
system.time(mean.t <- mean(data[,"temp"]))
   user  system elapsed 
  0.014   0.007   0.021 

Using a big.matrix

Going through the same column a second time was faster, because some of the data was cached; going through a new column was about the same speed as the first. What has that done to memory?

trunc.gc()-orig.gc
       used (Mb) max used (Mb)
Ncells 1176  0.1        0    0
Vcells  909  0.0        0    0

Using a big.matrix

Let's try something more complicated: we know that averaged over our data, we should have \[ p V = n R T \]. Let's try to infer the gas constant$:

system.time(sum.pv <- sum(data[,"pres"]*data[,"vol"]))
   user  system elapsed 
  0.025   0.021   0.046 
system.time(sum.nt <- sum(data[,"n"]*data[,"temp"]))
   user  system elapsed 
  0.027   0.012   0.039 
sum.pv/sum.nt
[1] 8.316193

Using a big.matrix

And we're still not using that much memory.

trunc.gc()-orig.gc
       used (Mb) max used (Mb)
Ncells 1193  0.1        0    0
Vcells  974  0.0        0    0

Using a big.matrix

Let's extract a subset of the data and analyze it.

The mwhich command in bigmemory lets us search through the data for multiple conditions, and extract that data:

system.time(subset.data <- data[mwhich(data, cols=c("n", "pres"), 
                                       vals=list(c(1,1.1), c(99000,101000)),
                                       comps=list(c("ge","le"),c("ge","le")), 
                                       op="AND"),])
   user  system elapsed 
  0.058   0.001   0.059 
class(subset.data)
[1] "matrix"
fit <- lm(vol ~ temp, data=as.data.frame(subset.data))

Using a big.matrix

summary(fit)

Call:
lm(formula = vol ~ temp, data = as.data.frame(subset.data))

Residuals:
      Min        1Q    Median        3Q       Max 
-0.041951 -0.006885  0.000003  0.006825  0.040519 

Coefficients:
             Estimate Std. Error t value Pr(>|t|)    
(Intercept) 1.553e-04  1.269e-03   0.122    0.903    
temp        8.698e-05  4.228e-06  20.574   <2e-16 ***
---
Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1

Residual standard error: 0.01013 on 40998 degrees of freedom
Multiple R-squared:  0.01022,   Adjusted R-squared:  0.01019 
F-statistic: 423.3 on 1 and 40998 DF,  p-value: < 2.2e-16

Using a big.matrix

object.size(subset.data)
1312632 bytes
trunc.gc()-orig.gc
         used (Mb) max used (Mb)
Ncells   8935  0.5        0    0
Vcells 586030  4.5        0    0

Using a big.matrix

Other options:

  • morder or mpermute allow you to sort the data in memory or on disk
  • head and tail let you get the start/end rows
  • mwhich allows all sorts of slicing and dicing
  • sub.big.matrix lets you extract contiguous regions of the matrix

Summary: bigmemory

If you just have a data file much larger than memory that you have to crunch and the amount of actual computation is not a bottleneck, the bigmemory and related packages may be all you need.

Works best if:

  • Data is of homogeneous type - eg, all integer, all numeric, all string
  • Just need to work on a subset of data at a time, or
  • Just need to make one or two passes through the data to complete analysis

Test your skills: Bigmemory

lm() doesn't work natively on a big.matrix - but we can write our own.

If we have an OLS model \[ \hat{y_i} = a x_i + b + \epsilon \], we can fit it with

\[ b = \bar{y} - a \bar{x} \]

\[ a = \frac{\sum_i{ x_i y_i } - n \bar{x} \bar{y}}{\sum_i{x_i^2} - n \bar{x} \bar{x}} \]

Using the examples above, fit a couple of columns of the ideal gas data set. Do the results make sense? (Once it's working, try fitting \( pV \propto nT \).) How much memory is used?

(Note: there is a biglm package)

Advanced R: Rdsm, pbdR

Advanced R: Rdsm, pbdR

We've looked at some of the standard scalable computing packages for R.

Here are two somewhat more advanced packages, that solve very different problems.

  • Rdsm: Get the most (performance, memory) out of a single-computer computation by using shared memory.
  • pbdR: Get the most (performance, scale) out of a cluster computation by ditching master-worker, and using very large-scale distributed routines.

Rdsm

While it's generally true that processes can't peer into each other's memory, there is an exception.

Processes can explicitly make a window of memory shared - visible to other processes.

This isn't necessary for threads within a process; but it is necessary for multiple processes working on the same data.

Only works on-node; can't share memory across a network.

Shared Memory

Rdsm

Rdsm allows you to share a matrix across processes on a node - for reading and for writing.

Normally, when we split a data structure up across tasks, we make copies (PSOCK), or we use read-only (multicore/fork).

If output is also going to be large, we now have 2-3 copies of the data structure floating around.

Rdsm allows (on-node) cluster tasks to collaboratively make a large output without copies.

Rdsm

Simple example - let's create a shared matrix, and have everyone fill it.

Create PSOCK cluster, an Rdsm instance, shared matrix, and a barrier:

library(parallel)
library(Rdsm)

nrows <- 7

cl <- makePSOCKcluster(3)       # form 3-process PSOCK (share-nothing) cluster
init <- mgrinit(cl)             # initialize Rdsm
mgrmakevar(cl,"m",nrows,nrows)  # make a 7x7 shared matrix
bar <- makebarr(cl)

Rdsm

Everyone gets their task id, and which rows are “theirs”;

# at each thread, set id to Rdsm built-in ID variable for that thread
clusterEvalQ(cl,myid <- myinfo$id)
[[1]]
[1] 1

[[2]]
[1] 2

[[3]]
[1] 3
clusterExport(cl,c("nrows"))
dmy <- clusterEvalQ(cl,myidxs <- getidxs(nrows))
dmy <- clusterEvalQ(cl, m[myidxs,1:nrows] <- myid)
dmy <- clusterEvalQ(cl,"barr()")

…then fills it with their id.

Rdsm

Now, print the results.

print(m[,])
     [,1] [,2] [,3] [,4] [,5] [,6] [,7]
[1,]    1    1    1    1    1    1    1
[2,]    1    1    1    1    1    1    1
[3,]    2    2    2    2    2    2    2
[4,]    2    2    2    2    2    2    2
[5,]    2    2    2    2    2    2    2
[6,]    3    3    3    3    3    3    3
[7,]    3    3    3    3    3    3    3
stoprdsm(cl)  # stops cluster

Summary: Rdsm

Allows collaborative use of a single pool of memory.

Avoids performance and memory problems of making copies to send back and forth.

Works well when:

  • Outputs are as large/larger than inputs. (Correlation matrix of stocks).
  • Inputs are very large, and want to do transformation in-place (values to log-returns).

pbdR

The master-worker approach that all the methods we've used so far take works very well for interactive work, is easy to loadbalance, and is easy to understand.

But there's a fairly narrow range of number of workers where master-worker works well.

For a small number of total processors (2-4, say), it really hurts to have one processor doing nothing except some small amount of coordination.

For a very large number of processors (hundreds or more, depending on the size of each task), the worker scan easily overwhelm the master, meaning all of the workers are sitting around waiting while the master catches up.

Master Worker Imbalances

pbdR

At scale, idea of a single master isn't helpful.

Better: Coordinating peers.

Rather than a single master parcelling out work, the workers themselves decide which part of the problem they should be working on, and combine their results cooperatively.

More efficient and can scale better; Downsides:

  • Dynamic load-balancing is substantially trickier (but doable)
  • Can't really do this interactively; need to write a script

Workers control the means of data production

Departure Hour Histogram Example

In pbd/mpi-histogram.R we have a script that does hour histogram for eight full years of data, sifting through 40 million flights, in about a minute:

$ time mpirun -np 8 Rscript mpi-histogram.R
COMM.RANK = 0
 [1]    4081  118767   27633    7194    9141  194613 2235007 2902703 3003510
[10] 2649823 2373934 2473105 2757256 2772498 2362334 2485699 2503423 2794298
[19] 2626931 2282125 2074739 1386485  649392  344257
COMM.RANK = 0
[1] 41038948

real  1m15.357s
user    9m39.943s
sys 0m10.910s

Departure Hour Histogram Example

What sorcery is this?

# count.hours and get.hour definitions...
start.year <- 1990

init()
rank <- comm.rank()
my.year <- start.year + rank

myfile <- paste0("data/airline/airOT",as.character(my.year),".RDS")
data <- readRDS(myfile); data <- data$DEP_TIME
myhrs <- count.hours(data)

hrs <- allreduce( myhrs, op="sum" )
comm.print( hrs )
comm.print( sum(hrs) )

finalize()

Departure Hour Histogram Example

Let's take a look at the first few lines

# count.hours and get.hour definitions...
start.year <- 1990

init()
rank <- comm.rank()
my.year <- start.year + rank

myfile <- paste0("data/airline/airOT",as.character(my.year),".RDS")
data <- readRDS(myfile); data <- data$DEP_TIME

In this case, each task decides which year's data to work on. First (“zeroth”) task works on 1990, next on 1991, etc.

Every task has to call the init() routine when starting, and finalize() routine when done.

Then reads in the file.

Departure Hour Histogram Example

data <- readRDS(myfile); data <- data$DEP_TIME
myhrs <- count.hours(data)

hrs <- allreduce( myhrs, op="sum" )
comm.print( hrs )
comm.print( sum(hrs) )

finalize()

Once the file is read, we use our trusty count.hours routine again to work on the entire vector.

Then an allreduce function sums each workers hours, and returns the sum to all processors. We then print it out.

Rather than only the master running the main program and handing off bits to workers, every task runs this identical program; the only difference is the value of comm.rank().

Reductions

Reductions are one way of combining results, and they're very powerful:

init()
rank <- comm.rank()
my.year <- start.year + rank

myfile <- paste0("../data/airline/airOT",as.character(my.year),".RDS")
data <- readRDS(myfile); data <- data$CRS_ELAPSED_TIME
data <- data[!is.na(data)]

data.median <- pbd.quantile(data,0.5)
data.min <- allreduce(min(data), op="min")
data.max <- allreduce(max(data), op="max")
data.N <- allreduce(length(data), op="sum")
data.mean <- allreduce(sum(data), op="sum")/data.N

comm.print(data.min)
comm.print(data.median)
comm.print(data.mean)
comm.print(data.max)

finalize()

Reductions

$ mpirun -np 4 Rscript ./min-median-mean-max.R
COMM.RANK = 0
[1] -70
COMM.RANK = 0
[1] 93.00004
COMM.RANK = 0
[1] 112.8207
COMM.RANK = 0
[1] 1613

Median finding:

R's higher-level functions plus reductions are very powerful ways to do otherwise tricky distributed problems - like median of distributed data:

pbd.quantile <- function( data, q=0.5 ) {
    if (q < 0 | q > 1) {
        stop("q should be between 0 and 1.")
    }

    N <- allreduce(length(data), op="sum")
    data.max <- allreduce(max(data), op="max")
    data.min <- allreduce(min(data), op="min")

    f.quantile <- function(x, prob=0.5) {
        allreduce(sum(data <= x), op="sum" )/N - prob
    }

    uniroot(f.quantile, c(data.min, data.max), prob=q)$root
}

pbd*apply

pbd also has its parallel apply functions, but it's important to realize that these aren't being farmed out by some master task; the tasks themselves decide which ones in the list are “theirs”.

pbd/histogram-pbdsapply.R

year.hours <- function(my.year) {
    myfile <- paste0("data/airline/airOT",as.character(my.year),".RDS")
    data <- readRDS(myfile)$DEP_TIME
    count.hours(data)
}

init()
years <- 1990:1993
all.hours.list <- pbdLapply(years, year.hours)
all.hours <- Reduce("+", all.hours.list)

comm.print( all.hours )
comm.print( sum(all.hours) )

finalize()

pbd Data Distributions

pbd has a couple of ways of distributing data.

What we've used before is their so-called “GBD” distribution - globaly distributed data. It's split up by rows.

However, for linear algebra computations, a block-cyclic distribution is much more useful.

pbd Data Distributions

Reading a pbdR Ddmatrix

pbdR comes with several packages for reading a data file and distributing it as a ddmatrix:

  • read.csv.ddmatrix() for reading from csv
  • nc_get_dmat() to read from a NetCDF4 file
  • gbd2dmat() for conversions from row-oriented to a ddmatrix.

pbd lm

And the reason that you'd use a ddmatrix is that several operations defined on regular R matrices also work transparently on ddmatrix: lm, solve, chol.

pbd-lm.R:

init.grid()
rank <- comm.rank()
my.year <- start.year + rank

data <- cleandata(my.year)
Y <- data[[1]]
X <- as.matrix(data[,-1])

X.dm <- gbd2dmat(X)
Y.dm <- gbd2dmat(Y)

fit <- lm(Y ~ X)
comm.print(summary(fit))

finalize()

pbd lm

$ mpirun -np 4 Rscript pbd-lm.R
Using 2x2 for the default grid size

COMM.RANK = 0

Call:
lm(formula = Y ~ X)

Residuals:
     Min       1Q   Median       3Q      Max
-1307.62    -6.03    -2.29     3.53  1431.70

Coefficients: (6 not defined because of singularities)
                       Estimate Std. Error t value Pr(>|t|)
(Intercept)           1.152e+01  9.616e-02  119.77   <2e-16 ***
XORIGIN_AIRPORT_ID   -1.895e-04  5.193e-06  -36.50   <2e-16 ***
XDEST_AIRPORT_ID     -2.257e-04  5.213e-06  -43.29   <2e-16 ***
XDEP_TIME            -3.382e-04  1.724e-05  -19.61   <2e-16 ***
XDEP_DELAY_NEW        1.426e+00  9.594e-03  148.68   <2e-16 ***
...
---
Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1

Residual standard error: 13.43 on 2741063 degrees of freedom
Multiple R-squared:  0.7809,  Adjusted R-squared:  0.7809
F-statistic: 1.628e+06 on 6 and 2741063 DF,  p-value: < 2.2e-16