Jonathan Dursi
http://github.com/ljdursi/beyond-single-core-R
Today will look something like this:
Hardware:
Processes and threads:
Interpreted languages: generally you can only directly work with processes
Can call libraries that invoke threads (BLAS/LAPACK)
One turns to parallel computing to solve one of three problems:
My program is too slow.
Perhaps using more cores — e.g., all cores on my desktop — will make things faster.
My problem is too big.
Perhaps splitting the problem up onto multiple computers in a cluster will give it access to enough memory to run effectively.
There are too many computations to do - one task runs in a reasonable amount of time, but I have to run thousands!
Perhaps splitting the tasks up across multiple computers in a cluster will let the whole collection finish in a reasonable amount of time.
For more cores/nodes to help, there has to be something for them to do.
Find largely independent computations to occupy them.
Classic example of this is a parameter study, or set of simulations with different seeds:
“More” case
In this example, no individual task runs any faster with more processors, but the workload as a whole can.
What matters is how long it takes to process the N tasks you want done.
Throughput: how many tasks/time.
For completely independent tasks, P processors can increase throughput by factor P!
How a problem scales: how throughput behaves as processor number increases.
In this case, the throughput scales linearly with the number of processors.
This is the best case: “Perfect scaling”
Another way to look at it: time it takes to get some fixed amount of work done
More usual (and more important!)
Perfect scaling: time to completion ~ 1/P
P processors - P times faster
Finally, how efficient is the scaling - when you throw 10 processors at the problem, are you getting 10 times the processing?
Perfect scaling - efficiency = 100%
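For instance, given some hypothetical timings for a fixed-size job (the numbers here are invented for illustration), speedup and efficiency fall right out:
t1 <- 100                  # serial time
P  <- c(2, 4, 8)           # processor counts
tP <- c(52, 28, 16)        # hypothetical parallel timings
t1 / tP                    # speedup: 1.92 3.57 6.25
(t1 / tP) / P              # efficiency: 0.96 0.89 0.78; perfect scaling would be 1.0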
Split, apply, combine: popularized by Hadley Wickham, this has become a model for thinking about data analysis in R in the tidyverse.
Split the data set up into relevant sub-sets; apply some analysis to it; combine the results.
This is exactly the way to think about scalable data analysis. Split the data - or tasks on that data - up between computing elements; do the analyses; then combine the results somehow.
The details depend a great deal on the analyses (and the nature of the data.)
“Split” and “Combine” aren't free!
Partitioning the work, and assembling the final results from the partial results, represents some overhead - some fraction of the work that must be done in serial.
Splitting the work and distributing it over a network takes even more time.
Amdahl's Law: if a fraction \( f \) of the work must be done serially, then on \( P \) processors \[ T(P) \approx T(1) \left ( f + \frac{1 - f}{P} \right ) \]
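A quick sketch of what that implies - amdahl.speedup is a throwaway helper, and f is the assumed serial fraction:
amdahl.speedup <- function(P, f) 1 / (f + (1 - f)/P)
amdahl.speedup(c(2, 4, 8, 16, 1e6), f=0.1)
# approximately 1.82 3.08 4.71 6.40 10.0 - with 10% serial work, 10x is the ceiling no matter how many processors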
What we've described has been finding concurrency by analyzing different chunks of data the same way
Also possible is to identify different tasks that must be done and perform those in parallel: multiple fits, summary analysis + generating several plots, etc.
Generally more manual but can work very well.
Dependencies limit parallelism
It's important to realize that many fundamental routines as well as higher-level packages come with some degree of scalability and parallelism “baked in”.
Running top (or glances, or …) while executing the following in R:
n <- 4*1024
A <- matrix( rnorm(n*n), ncol=n, nrow=n )   # two 4096 x 4096 random matrices
B <- matrix( rnorm(n*n), ncol=n, nrow=n )
C <- A %*% B                                # matrix multiply: the BLAS does the work
One R process using 458% of a processor.
R can be built using high performance threaded libraries for math in general, and linear algebra — which underlies many data analysis algorithms — in particular.
Here the single R process has launched several threads of execution - all of which are part of the same process, and so can see the same memory, e.g., the large matrices.
For a complete list, see
http://cran.r-project.org/web/views/HighPerformanceComputing.html .
Plus packages that use linear algebra or other expensive math operations which can be implicitly multithreaded.
When at all possible, don't do the hard work yourself — look to see if a package already exists which will do your analysis at scale.
Caret is a widely-used machine learning package that uses foreach (which we'll learn about) to parallelize things like CV folds, etc.:
data(iris)
library(caret)
control <- trainControl(method="repeatedcv", number=10, repeats=3)
metric <- "Accuracy"
system.time(fit.lda.ser <- train(Species~., data=iris, method="svmRadial", metric=metric, trControl=control))
user system elapsed
2.007 0.046 2.065
library(doParallel)
registerDoParallel(4)
system.time(fit.lda.par <- train(Species~., data=iris, method="svmRadial", metric=metric, trControl=control))
user system elapsed
1.211 0.271 1.065
stopImplicitCluster()
Since R 2.14.0 (late 2011), the parallel package has been part of core R.
It incorporates - and mostly supersedes - two other packages:
multicore: for using all processors on a single machine. Not available on Windows.
snow: for using any group of processors, possibly across a cluster.
Many packages which use parallelism use one of these two, so they're worth understanding.
Both create new processes (not threads) to run on different processors; but in importantly different ways.
Multicore creates new processes by forking - cloning - the original process.
That means the new processes start off seeing a copy of exactly the same data as the original. E.g., the first process can read a file and then fork two new processes - each will see a copy of the file.
Not shared memory; changes in one process will not be reflected in others.
Windows doesn't have fork(), so windows can't use these routines.
Performance Tip: Modern OSs are lazy - the copy of memory isn't made unless it has to be, and it doesn't have to be until one process or the other writes to the memory.
That copy is slow, and takes new memory.
So in multicore, don't overwrite old variables if possible.
Snow creates entirely new R processes to run the jobs.
A downside is that you need to explicitly copy over any needed data and functions.
But the upsides are that spawning a new process can be done on a remote machine, not just current machine. So you can in principle use entire clusters.
In addition, the flipside of the downside: the new processes don't carry along any unneeded data.
The simplest use of the multicore package is the pair of functions mcparallel() and mccollect().
mcparallel() forks a task to run a given function; the task then runs in the background. mccollect() waits for and gets the result.
Let's pick an example: reading the airlines data set, we want - for a particular month - to know both the total number of planes in the data (by tail number) and the median elapsed flight time. These are two independent calculations, and so can be done independently.
We start the two tasks with mcparallel, and collect the answers with mccollect:
library(parallel, quiet=TRUE)
source("data/airline/read_airline.R")
jan2010 <- read.airline("data/airline/airOT201001.csv.gz")
unique.planes <- mcparallel( length( unique( sort(jan2010$TAIL_NUM) ) ) )
median.elapsed <- mcparallel( median( jan2010$ACTUAL_ELAPSED_TIME, na.rm=TRUE ) )
ans <- mccollect( list(unique.planes, median.elapsed) )
ans
$`11420`
[1] 4555
$`11421`
[1] 110
We get a list of answers, with each element “named” by the process ID that ran the job. We find that there are 4555 planes in the data set, and the median flight in the data set is 110 minutes in the air.
Does this save any time? Let's do some independent fits to the data. Let's try to see what the average in-flight speed is by fitting time in the air to distance flown; and let's see how the arrival delay correlates with the departure delay. (Do planes, on average, make up some time in the air, or do delays compound?)
system.time(fit1 <- lm(DISTANCE ~ AIR_TIME, data=jan2010))
user system elapsed
0.345 0.036 0.381
system.time(fit2 <- lm(ARR_DELAY ~ DEP_DELAY, data=jan2010))
user system elapsed
0.298 0.026 0.324
So the time to beat is about 0.7s:
parfits <- function() {
pfit1 <- mcparallel(lm(DISTANCE ~ AIR_TIME, data=jan2010))
pfit2 <- mcparallel(lm(ARR_DELAY ~ DEP_DELAY, data=jan2010))
mccollect( list(pfit1, pfit2) )
}
system.time( parfits() )
user system elapsed
3.013 0.147 1.207
Clearly, forking the processes and waiting for them to rejoin takes some time itself.
This overhead means that we want to launch jobs that take a significant length of time to run - much longer than the overhead (hundredths to tenths of seconds for fork().)
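One quick way to get a feel for that overhead on your own machine is to fork a trivial task and time the round trip (a sketch; your timings will differ):
library(parallel, quiet=TRUE)
system.time({ p <- mcparallel(1+1); res <- mccollect(p) })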
Typically we want to do more than an itemized list of independent tasks - we have a list of similar tasks we want to perform.
mclapply is the multicore equivalent of lapply - apply a function to a list, get a list back.
Let's say we want to see what similarities there are between delays at O'Hare airport in Chicago in 2010. Clustering methods attempt to uncover “similar” rows in a dataset by finding points that are near each other in some \( p \)-dimensional space, where \( p \) is the number of columns.
\( k \)-Means is a particularly simple, randomized, method; it picks \( k \) cluster centre-points at random, finds the rows closest to them, assigns them to the cluster, then moves the cluster centres towards the centre of mass of their cluster, and repeats.
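To make that concrete, here is a toy sketch of a single k-means iteration on invented 2-d data; kmeans() does all of this for you, iterated to convergence:
set.seed(1)
pts <- matrix(rnorm(200), ncol=2)          # invented data: 100 points in the plane
k <- 2
centres <- pts[sample(nrow(pts), k), ]     # pick k random points as initial centres
# assign each point to its nearest centre, then move centres to their cluster means
nearest <- apply(pts, 1, function(p) which.min(colSums((t(centres) - p)^2)))
centres <- apply(pts, 2, function(col) tapply(col, nearest, mean))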
Quality of result depends on number of random trials.
Let's try that with our subset of data:
# columns listing various delay measures
delaycols <- c(18, 28, 40, 41, 42, 43, 44)
air2010 <- readRDS("data/airline/airOT2010.RDS")
ord.delays <- air2010[(air2010$ORIGIN=="ORD"), delaycols]
rm(air2010)
ord.delays <- ord.delays[(ord.delays$ARR_DELAY_NEW > 0),]
ord.delays <- ord.delays[complete.cases(ord.delays),]
system.time( serial.res <- kmeans(ord.delays, centers=2, nstart=40) )
user system elapsed
2.954 0.163 3.124
serial.res$withinss
[1] 125489706 254304505
Running 40 random trials is the same as running 10 random trials 4 times. Let's try that approach with lapply:
do.n.kmeans <- function(n) { kmeans(ord.delays, centers=2, nstart=n) }
system.time( list.res <- lapply( rep(10,4), do.n.kmeans ) )
user system elapsed
7.974 0.292 8.340
res <- sapply( list.res, function(x) x$tot.withinss )
lapply.res <- list.res[[which.min(res)]]
lapply.res$withinss
[1] 254304505 125489706
We get the same answer, but it takes a little longer - a bit of overhead from splitting it up and starting the process four times. We could make the overhead less important by using more trials, which would be better anyway.
mclapply works the same way as lapply, but forks off the processes (as with mcparallel):
system.time( list.res <- mclapply( rep(10,4), do.n.kmeans, mc.cores=4 ) )
user system elapsed
5.240 0.861 2.524
res <- sapply( list.res, function(x) x$tot.withinss )
mclapply.res <- list.res[[which.min(res)]]
mclapply.res$tot.withinss
[1] 379794211
Note what the output of top looks like when this is running:
There are four separate processes running - not one process using multiple CPUs via threads.
Looks good! Let's take a look at the list of results:
res
[1] 379794211 379794211 379794211 379794211
What happened here?
Depending on what you are doing, it may be very important to have different (or the same!) random numbers generated in each process.
Here, we definitely want them different - the whole point is to generate different random realizations.
parallel has a good RNG suitable for parallel work, based on the work of Pierre L'Ecuyer in Montréal:
RNGkind("L'Ecuyer-CMRG")
mclapply( rep(1,4), rnorm, mc.cores=4, mc.set.seed=TRUE)
[[1]]
[1] -0.6316222
[[2]]
[1] -0.262412
[[3]]
[1] -0.5085514
[[4]]
[1] 0.8361448
Let's say that, instead of running multiple random trials to find the best given a set of clusters, we were unsure of how many clusters we wanted to run:
do.kmeans.nclusters <- function(n) { kmeans(ord.delays, centers=n, nstart=10) }
time.it <- function(n) { system.time( res <- do.kmeans.nclusters(n)) }
lapply(1:4, time.it)
[[1]]
user system elapsed
1.642 0.044 1.689
[[2]]
user system elapsed
1.897 0.035 1.935
[[3]]
user system elapsed
2.546 0.044 2.592
[[4]]
user system elapsed
3.151 0.043 3.197
More clusters take longer to fit. If we were to mclapply these four tasks on 2 CPUs, the first CPU would get the two short tasks, and the second CPU would get the two longer tasks - bad load balance.
Normally, we want to hand multiple tasks of work off to each processor and only hear back when they're completely done - minimal overhead. But that works best when all tasks have similar lengths of time.
If you don't know that this is true, you can do dynamic scheduling - give each processor one task, and when they're done they can ask for another task.
More overhead, but better distribution of work.
system.time( res <- mclapply(1:4, time.it, mc.cores=2) )
user system elapsed
4.676 0.414 5.989
system.time( res <- mclapply(1:4, time.it, mc.cores=2, mc.preschedule=FALSE) )
user system elapsed
11.783 1.404 5.504
So far we've seen splitting the tasks; let's consider splitting the dataset.
Let's make a histogram of the times flights took off - say, binned by the hour.
get.hour <- function(timeInt) timeInt %/% 100   # e.g., 1425 -> hour 14
count.hours <- function(range) {
counts <- rep(0,24)
hours <- sapply(jan2010$DEP_TIME[range], get.hour)
hist <- rle( sort(hours) )            # run lengths of the sorted hours: a count per hour
for (i in 1:length(hist$values)) {
j <- hist$values[i] + 1
if (j == 25) j = 1                    # hour 24 (a 2400 departure) wraps to hour 0
counts[j] <- hist$lengths[i]
}
counts
}
We can count up all flight hours like so:
system.time(scounts <- count.hours(1:nrow(jan2010)))
user system elapsed
0.525 0.023 0.547
scounts
[1] 23 383 127 40 205 12321 30203 31435 34210 32344 31989
[12] 34075 31938 33802 32555 31020 33869 33731 30760 29542 19592 14260
[23] 7470 2574
Can we split this up between tasks? Let's try this:
nr <- nrow(jan2010)
ncores <- 4
chunks <- split(1:nr, rep(1:ncores, each=nr/ncores))
system.time(counts <- mclapply( chunks, count.hours, mc.cores=ncores) )
user system elapsed
2.940 0.340 0.314
That was definitely faster - how do the answers look?
str(counts)
List of 4
$ 1: num [1:24] 8 204 75 32 54 ...
$ 2: num [1:24] 9 121 49 4 78 ...
$ 3: num [1:24] 4 49 2 2 64 ...
$ 4: num [1:24] 2 9 1 2 9 ...
Reduce("+", counts)
[1] 23 383 127 40 205 12321 30203 31435 34210 32344 31989
[12] 34075 31938 33802 32555 31020 33869 33731 30760 29542 19592 14260
[23] 7470 2574
To be fair, we'd have to include the reduction time in the total time - but that's just the sum of four short vectors, probably not a big deal.
For the simple and common case of applying a function to each element of a vector and returning a vector, the parallel package has a simplified version of mclapply called pvec.
fx <- function(x) x^5-x^3+x^2-1
maxn <- 1e6
system.time( res <- sapply(1:maxn, fx) )
user system elapsed
2.236 0.027 2.263
system.time( res <- vapply(1:maxn, fx, 0.) )
user system elapsed
1.440 0.029 1.469
system.time( res <- pvec(1:maxn, fx, mc.cores=2) )
user system elapsed
0.874 0.522 0.104
system.time( res <- pvec(1:maxn, fx, mc.cores=4) )
user system elapsed
0.070 0.043 0.080
system.time( res <- mclapply(1:maxn, fx, mc.cores=4) )
user system elapsed
0.801 0.271 0.899
Using the entire 2010 dataset, and the examples above, examine one of the following questions:
What is the effect of the split() and the Reduce() on the serial-vs-parallel timings for this histogram? Is there a better way of doing the splitting?
The mc* routines in parallel work particularly well on a single machine: mcparallel works very well for task parallelism; mclapply for data parallelism.
Things to watch for: mc.cores is a lie. It's the number of tasks, not cores. You can easily oversubscribe cores, explicitly or implicitly.
The other half of parallel - routines that were in the still-active snow package - allows you to again launch new R processes: by default, on the current computer, but also on any computer you have access to. (SNOW stands for “Simple Network of Workstations”, which was the original use case.)
The recipe for doing computations with snow looks something like:
library(parallel)
cl <- makeCluster(nworkers,...)
results1 <- clusterApply(cl, ...)
results2 <- clusterApply(cl, ...)
stopCluster(cl)
Other than the makeCluster()/stopCluster(), it looks very much like multicore and mclapply.
Let's try starting up a “cluster” (e.g., a set of workers) and generating some random numbers from each:
library(parallel)
cl <- makeCluster(4)
clusterCall(cl, rnorm, 5)
[[1]]
[1] -1.31973090 0.50914581 -0.07566584 0.13727399 -1.84968866
[[2]]
[1] 0.7409130 -0.3559644 1.1822262 -0.9311078 0.5177474
[[3]]
[1] -0.3316341 1.0436526 0.5964421 -1.3712068 0.3503339
[[4]]
[1] 0.5945040 -0.2918959 0.8942675 0.1000171 1.3576120
stopCluster(cl)
clusterCall() runs the same function (here, rnorm, with argument 5) on all workers in the cluster. A related helper function is clusterEvalQ(), which is handier for some setup tasks - e.g.,
clusterEvalQ(cl, {library(parallel); library(foreach); NULL} )
Emboldened by our success so far, let's try re-doing our \( k \)-means calculations:
delaycols <- c(18, 28, 40, 41, 42, 43, 44)
source("data/airline/read_airline.R")
jan2010 <- read.airline("data/airline/airOT201001.csv.gz")
jan2010 <- jan2010[,delaycols]
jan2010 <- jan2010[complete.cases(jan2010),]
do.n.kmeans <- function(n) { kmeans(jan2010, centers=4, nstart=n) }
library(parallel)
cl <- makeCluster(4)
res <- clusterApply(cl, rep(5,4), do.n.kmeans)
stopCluster(cl)
Error in checkForRemoteErrors(val) :
  4 nodes produced errors; first error: object 'jan2010' not found
Ah! Failure.
Recall that we aren't forking here; we are creating processes from scratch. These processes, new to this world, are not familiar with our ways, customs, or datasets. We actually have to ship the data out to the workers:
cl <- makeCluster(4)
system.time(clusterExport(cl, "jan2010"))
user system elapsed
0.128 0.027 0.574
system.time(cares <- clusterApply(cl, rep(5,4), do.n.kmeans))
user system elapsed
0.357 0.039 11.064
stopCluster(cl)
system.time( mcres <- mclapply(rep(5,4), do.n.kmeans, mc.cores=4) )
user system elapsed
29.584 1.655 10.165
Note that shipping data back and forth, and creating the processes from scratch, is relatively costly - but this is the price we pay for being able to spawn the processes anywhere.
(And if our computations take hours to run, we don't really care about several-second delays.)
The default cluster is a sockets-based cluster; you can run on multiple machines by specifying their hostnames when making the cluster:
hosts <- c( rep("localhost",8), rep("192.168.0.10", 8) )
cl <- makePSOCKcluster(names=hosts)
clusterCall(cl, rnorm, 5)
clusterCall(cl, system, "hostname")
stopCluster(cl)
Once it is done, you have successfully run random number generators across multiple hosts.
parallel has several different cluster types:
Because of the need to send (possibly large) data to the workers, the scheduling of workers is even more important than with multicore.
The snow library has very nice visualization tools for timing that are inexplicably absent from parallel; so let's temporarily use snow:
library(snow,quiet=TRUE)
do.kmeans.nclusters <- function(n) { kmeans(jan2010, centers=n, nstart=10) }
cl <- makeCluster(2)
clusterExport(cl,"jan2010")
tm <- snow.time( clusterApply(cl, 1:6, do.kmeans.nclusters) )
plot(tm)
tm.lb <- snow.time(clusterApplyLB(cl, 1:6, do.kmeans.nclusters))
plot(tm.lb)
stopCluster(cl)
The default clusterApply sends off one task to each worker, waits until they're both done, then sends off another. (Question: why?)
clusterApplyLB does something more like mc.preschedule=FALSE; it fires off tasks to each worker as needed.
Sending off one task at a time can be inefficient if there is a lot of communication involved. But it allows flexibility in scheduling, which is vitally important if the tasks are of widely varying durations.
Of course, for some applications, we don't need to send the entire data structure across. Let's consider the departure-time histogram again. This time, we're only going to send across the data that's actually going to be computed on:
jan2010 <- read.airline("data/airline/airOT201001.csv.gz")
jan2010 <- jan2010[complete.cases(jan2010),]
get.hour <- function(timeInt) timeInt %/% 100
count.hours <- function(timesInt) {
counts <- rep(0,24)
hours <- sapply(timesInt, get.hour)
hist <- rle( sort(hours) )
for (i in 1:length(hist$values)) {
j <- hist$values[i] + 1
if (j == 25) j = 1
counts[j] <- hist$lengths[i]
}
counts
}
This time, rather than exporting the entire data set, we'll just send across the bits we need:
cl <- makeCluster(2)
clusterExport(cl,"get.hour") # have to export _functions_, too.
datapieces <- clusterSplit(cl,jan2010$DEP_TIME)
str(datapieces)
List of 2
$ : int [1:253778] 1425 1228 1053 1047 1753 1755 1846 1859 1752 1757 ...
$ : int [1:253779] 1450 1459 1458 1455 723 704 659 658 701 702 ...
ans <- clusterApply(cl, datapieces, count.hours)
Reduce("+", ans)
[1] 23 381 126 40 205 12279 30069 31268 34042 32198 31881
[12] 33936 31843 33686 32437 30907 33750 33644 30686 29455 19522 14184
[23] 7434 2556
To look a little more closely at some communications and load balance issues, I'm going to split the data up into more pieces than workers, and distribute them:
stopCluster(cl)
cl <- makeCluster(6)
datapieces <- clusterSplit(cl,jan2010$DEP_TIME)
stopCluster(cl)
cl <- makeCluster(2)
clusterExport(cl,"get.hour") # have to export _functions_, too.
str(datapieces)
List of 6
$ : int [1:84255] 1425 1228 1053 1047 1753 1755 1846 1859 1752 1757 ...
$ : int [1:84762] 1337 2045 658 1423 1645 1923 853 1613 2048 755 ...
$ : int [1:84761] 2048 2037 2031 2039 2034 2035 2036 1634 1840 1639 ...
$ : int [1:84762] 1450 1459 1458 1455 723 704 659 658 701 702 ...
$ : int [1:84762] 723 723 825 919 912 726 722 723 725 727 ...
$ : int [1:84255] 1705 1036 1242 942 1653 2135 1330 734 1831 1253 ...
tm <- snow.time( ans <- clusterApply(cl, datapieces, count.hours) )
plot(tm)
tm <- snow.time( ans <- parLapply(cl, datapieces, count.hours) )
plot(tm)
stopCluster(cl)
If the list you are operating on consists of big chunks of data, the relevant piece is sent to the worker for its task.
Sometimes that's exactly what you want - it's what makes the flexible scheduling of clusterApplyLB possible.
But if it's not necessary, it adds a delay to the task. If you know ahead of time that the tasks are of similar duration, use parLapply to chunk up the data for you and send all the data for one task at once.
detach("package:snow", unload=TRUE)
The cluster routines in parallel are good if you know you will eventually have to move to using multiple computers (nodes in a cluster, or desktops in a lab) for a single computation.
Use clusterExport for functions and data that will be needed by everyone.
Use clusterApplyLB if the tasks vary greatly in runtime.
Use clusterApply if each task requires an enormous amount of data.
Use parLapply if tasks are of similar duration and data from multiple tasks will fit in memory.
snow::snow.time is great for understanding performance.
Use makePSOCKcluster for small clusters; consider makeMPIcluster for larger (but see the pbdR section online).
On a set of workstations (or AWS instances) you have access to, try a session with two nodes, and set up a PSOCK cluster across both nodes.
Call unlist(clusterCall(cl, system, "hostname")) to make sure that you have workers on both nodes.
Load the 2010 data and break it up by month (look up the split command) and see which month had the highest fraction of cancelled flights.
Then split the data up by airline and see which airline had the highest fraction of cancelled flights.
There are two big downsides with how we are doing this: the master is doing a huge amount of the work by doing the pre-splitting, and the whole data set has to be in memory. Tackle one or the other of them:
The “master/worker” approach that parallel enables works extremely well for moderately sized problems, and isn't that difficult to use. It is all based on one form of R iteration, apply, which is well understood.
However, going from serial to parallel requires some re-writing, and even going from one method of parallelism to another (e.g., multicore-style to snow-style) requires some modification of code.
The foreach package is based on another style of iterating through data - a for loop - and is designed so that one can go from serial to several forms of parallel relatively easily. There are then a number of tools one can use in the library to improve performance.
The standard R for loop looks like this:
for (i in 1:3) print(sqrt(i))
[1] 1
[1] 1.414214
[1] 1.732051
The foreach operator looks similar, but returns a list of the iterations:
library(foreach)
foreach (i=1:3) %do% sqrt(i)
[[1]]
[1] 1
[[2]]
[1] 1.414214
[[3]]
[1] 1.732051
The foreach function creates an object, and the %do% operator operates on the code (here just one statement, but it can be multiple lines between braces, as with a for loop) and the foreach object.
Foreach works with a variety of backends to distribute computation - doParallel, which allows snow- and multicore-style parallelism, and doMPI (not covered here).
Switching the above loop to parallel just requires registering a backend and using %dopar% rather than %do%:
library(doParallel)
registerDoParallel(3) # use multicore-style forking
foreach (i=1:3) %dopar% sqrt(i)
[[1]]
[1] 1
[[2]]
[1] 1.414214
[[3]]
[1] 1.732051
stopImplicitCluster()
One can also use a PSOCK cluster:
cl <- makePSOCKcluster(3)
registerDoParallel(cl) # use the just-made PSOCK cluster
foreach (i=1:3) %dopar% sqrt(i)
[[1]]
[1] 1
[[2]]
[1] 1.414214
[[3]]
[1] 1.732051
stopCluster(cl)
While returning a list is the default, foreach has a number of ways to combine the individual results:
foreach (i=1:3, .combine=c) %do% sqrt(i)
[1] 1.000000 1.414214 1.732051
foreach (i=1:3, .combine=cbind) %do% sqrt(i)
result.1 result.2 result.3
[1,] 1 1.414214 1.732051
foreach (i=1:3, .combine="+") %do% sqrt(i)
[1] 4.146264
foreach (i=1:3, .multicombine=TRUE, .combine="sum") %do% sqrt(i)
[1] 4.146264
Most of these are self-explanatory. .multicombine is worth mentioning: by default, foreach will combine each new item into the final result one at a time. If .multicombine=TRUE, then you are saying that you're passing a function which will do the right thing even if foreach gives it a whole whack of new results as a list or vector - e.g., a whole chunk at a time.
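For example - bigsum here is just a throwaway combiner that accepts any number of arguments, and .maxcombine caps how many results are handed over per call:
bigsum <- function(...) sum(...)
foreach (i=1:10, .combine=bigsum, .multicombine=TRUE, .maxcombine=5) %do% sqrt(i)
# [1] 22.46828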
There's one more operator: %:%. This lets you compose or nest foreach objects:
foreach (i=1:3, .combine="c") %:%
foreach (j=1:3, .combine="c") %do% {
i*j
}
[1] 1 2 3 2 4 6 3 6 9
And you can also filter items, using when():
foreach (a=rnorm(25), .combine="c") %:%
when(a >= 0) %do%
sqrt(a)
[1] 0.5598735 0.7148421 0.3468935 0.8949200 1.1892010 0.4669242 0.6889375
[8] 0.3459393 1.2330996 0.6494213 0.8319499 0.7037915 0.6425140 0.6792320
[15] 1.3340716 0.4545752 1.1219641 0.5707225 0.4611031
Let's consider our hour histogram again:
system.time(
foreach (i=1:2000, .combine="+") %do% {
hrs <- rep(0,24)
hr <- get.hour(jan2010$DEP_TIME[i])
hrs[hr+1] = hrs[hr+1] + 1
hrs
}
)
user system elapsed
0.815 0.031 0.846
Note: as with a function, we have to make sure the value we want to return is the last line (or explicitly returned).
What's more, this automatically works in parallel:
cl <- makePSOCKcluster(3)
registerDoParallel(cl,cores=3)
system.time(
foreach (i=1:2000, .combine="+") %dopar% {
hrs <- rep(0,24)
hr <- get.hour(jan2010$DEP_TIME[i])
hrs[hr+1] = hrs[hr+1] + 1
hrs
}
)
user system elapsed
1.432 0.181 2.756
stopCluster(cl)
Which is actually sort of magic; PSOCK clusters don't share memory! foreach does a good job of exporting necessary variables; if something isn't automatically exported, it can be exported explicitly in the foreach line with, e.g., foreach(..., .export=c("jan2010")).
But this is incredibly slow:
system.time(
ans <- foreach (i=1:2000, .combine="+") %do% {
hrs <- rep(0,24)
hr <- get.hour(jan2010$DEP_TIME[i])
hrs[hr+1] = hrs[hr+1] + 1
hrs
}
)
user system elapsed
0.799 0.001 0.801
system.time(ans <- count.hours(jan2010$DEP_TIME[1:2000]))
user system elapsed
0.002 0.000 0.003
Mainly because it's not vectorized; by looping over the data one item at a time we've avoided using our lovely fast vector routines. Plus we're allocating a 24-element vector per item!
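For comparison, a vectorized sketch of the same count - count.hours.vec is a made-up name, and tabulate() bins an entire vector of hours in one shot:
count.hours.vec <- function(timesInt) {
hours <- (timesInt %/% 100) %% 24    # 2400 wraps to hour 0, as before
tabulate(hours + 1, nbins=24)        # NAs and out-of-range values are silently ignored
}
system.time( ans <- count.hours.vec(jan2010$DEP_TIME[1:2000]) )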
Another problem - we've created a vector 1:2000 which in general is the same size as the data set we're working on. For large data sets, that's big memory.
Foreach has iterators that can iterate through an object without creating something the size of the object. For instance, icount(n) counts up to n without ever instantiating the vector 1:n - the same difference as between Python 2.x's range and xrange:
cl <- makePSOCKcluster(3)
registerDoParallel(cl,cores=3)
system.time(
ans <- foreach (i=icount(2000), .combine="+") %do% {
hrs <- rep(0,24)
hr <- get.hour(jan2010$DEP_TIME[i])
hrs[hr+1] = hrs[hr+1] + 1
hrs
}
)
But that doesn't help with the performance issue here.
We do a little bit better by avoiding the intermediate index; we don't care about \( i \) at all, all we care about is the data. We can implicitly create an iterator on the object with foreach (time=jan2010$DEP_TIME[1:2000], ...) or explicitly, setting the chunk size to distribute between tasks:
system.time(
ans <- foreach (time=iter(jan2010$DEP_TIME[1:2000],chunksize=500), .combine="+") %do% {
hrs <- rep(0,24)
hr <- get.hour(time)
hrs[hr+1] = hrs[hr+1] + 1
hrs
}
)
user system elapsed
0.761 0.007 0.769
ans
[1] 0 0 0 0 0 86 53 166 154 107 117 102 221 138 109 96 137
[18] 142 141 59 95 66 7 4
As you can tell by the chunking, foreach can adjust the iteration scheduling in a number of ways. Chunking is one of them.
The underlying back-end obviously has a lot to do with the scheduling. For multicore, for instance, one can pass familiar options to multicore if we are using a multicore “cluster”:
foreach( ..., .options.multicore=list(preschedule=FALSE,set.seed=TRUE))
Performance Tip: If you don't care about the order that the results come back in, specifying .inorder=FALSE gives the scheduler more flexibility in sending out tasks. Otherwise, you're guaranteed that the first result back is from the first iteration, etc.
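As a sketch, with any registered backend - the combined sum is the same whatever order the results arrive in:
foreach (i=1:8, .combine="+", .inorder=FALSE) %dopar% sqrt(i)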
But really, we want to work on entire slices of the data at once. For objects like matrices or data frames, you can send out a row, column, etc. at a time; we can re-cast the data as a matrix and send it out one row at a time:
jan.matrix = matrix(jan2010$DEP_TIME[1:2000], ncol=500)
system.time(
ans <- foreach (times=iter(jan.matrix,by="row"), .combine="+") %do% {
count.hours(times)
}
)
user system elapsed
0.007 0.000 0.007
ans
[1] 0 0 0 0 0 86 53 166 154 107 117 102 221 138 109 96 137
[18] 142 141 59 95 66 7 4
And this works in parallel as well:
cl <- makePSOCKcluster(3)
registerDoParallel(cl,cores=3)
jan.matrix = matrix(jan2010$DEP_TIME[1:2000], ncol=500)
system.time(
ans <- foreach (times=iter(jan.matrix,by="row"), .combine="+") %dopar% {
count.hours(times)
}
)
user system elapsed
0.014 0.000 0.029
stopCluster(cl)
ans
[1] 0 0 0 0 0 86 53 166 154 107 117 102 221 138 109 96 137
[18] 142 141 59 95 66 7 4
If we want each task to only work on some subset of the data, the isplit iterator will split the data at the master, and send off the partitioned data to workers:
ans <- foreach (byAirline=isplit(jan2010$DEP_TIME, jan2010$UNIQUE_CARRIER),
.combine=cbind) %do% {
df <- data.frame(count.hours(byAirline$value)); colnames(df) <- byAirline$key; df
}
ans$UA
[1] 2 4 0 0 0 957 1595 1817 2598 1401 1713 1774 1509 1907
[15] 1442 1230 1510 1888 1775 1311 964 783 785 268
ans$OH
[1] 2 2 0 0 0 185 654 469 674 679 572 682 843 763 699 671 839
[18] 777 507 764 467 186 130 20
In data/stocks/stocks.csv, we have daily closing prices for 419 stocks going back to 2000 (3654 prices each). For stocks, it's often useful to deal with “log returns”, rather than absolute price numbers. We use:
stocks <- read.csv("data/stocks/stocks.csv")
log.returns <- function(values) { nv <- length(values); log(values[2:nv]/values[1:(nv-1)]) }
How would we parallelize this with foreach? (Imagine we had thousands of stocks and decades of data, which isn't implausible.)
registerDoParallel(4)
mat.log <-
foreach(col=iter(stocks[,-c(1,2)],by="col"), .combine="cbind") %dopar%
log.returns(col)
stopImplicitCluster()
stocks.log <- as.data.frame(mat.log)
colnames(stocks.log) <- colnames(stocks)[-c(1,2)]
stocks.log$date <- stocks$date[-1] # get rid of the first day; no "return" for then
A quantity we might be interested in is the correlation between the log returns of various stocks: we can use R's cor() function to do this.
nstocks <- 419
cors <- matrix(rep(0,nstocks*nstocks), nrow=nstocks, ncol=nstocks)
system.time(
for (i in 1:419) {
for (j in 1:419) {
cors[i,j] <- cor(stocks.log[[i]],stocks.log[[j]])
}
}
)
user system elapsed
15.543 0.044 15.593
Foreach is a wrapper for the other parallel methods we've seen, so it inherits some of the advantages and drawbacks of each.
Use foreach if:
Your analysis is already expressed as for-style iteration; the transition is easy.
You don't know in advance what kind of parallel use you'll need (or other kinds, like batch jobs): you can switch just by registering a different backend!
Note that you can have portions of your analysis code use foreach with parallel, and portions using the backend with apply-style parallelism; it doesn't have to be all one or the other.
Parallelize the stock correlation matrix calculation with foreach. You should get a proper speedup here. Try working on just the first 10 stocks until you get things working.
Note: you can nest foreach() loops using the %:% operator:
foreach(...) %:%
foreach(...)
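One possible sketch for the first 10 stocks - not the only approach; iterating over columns with iter(), as above, would also work:
library(doParallel)
registerDoParallel(4)
n <- 10
cors.par <- foreach (i=1:n, .combine=rbind) %:%
    foreach (j=1:n, .combine=c) %dopar%
        cor(stocks.log[[i]], stocks.log[[j]])
stopImplicitCluster()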
When you're done that, take a look at a random year's airline data. Of the flights that have a departure delay, is the arrival delay (on average) less than or greater than the departure delay? Is: “This is the captain: Sorry for the delay folks, but we'll make it up in the air” a lie?
How would you use foreach to loop over the various years' data?
There's no one-size-fits-all approach to taking advantage of multiple processors or nodes (or accelerators).
Check to see if there already exist packages for doing your analysis in parallel.
It's almost always easier to efficiently take advantage of coarse-grained parallelism.
E.g. running each cross-validation fold on a different processor will almost certainly work better than running them sequentially but with parts of the training method parallelized.
Amdahl's Law!
For throwing more cores at the problem to help, the calculation has to have been limited by compute power.
I/O bound analyses, or network-bound analyses, or even memory-bandwidth-constrained analyses can easily have worse performance with more cores, not better.
Compute bound calculations slow down if something else is sharing the processor with them.
Except for debugging, you generally don't want to run (say) doParallel with more cores than you actually have on your computer.
Sometimes using the number of pseudo-cores/hyperthreads/hardware threads is helpful if memory bandwidth is a limiting factor, or not if not - you can test.
Because you only want one task per core, be careful about accidentally using parallelism!
E.g., a threaded BLAS/LAPACK is great for interactive use, but if you then launch several R worker processes that each call the threaded library, you're competing with yourself for your own laptop's cores.
R comes with an increasingly rich set of tools for taking advantage of more compute power:
Keep in mind what we talked about in terms of overhead, and:
Because R frequently needs to make temporary copies (R works best as a functional programming language), hitting the memory limit frequently becomes a problem.
Avoiding hitting that limit too early requires some caution.
You need to know how R handles variables and memory.
Like a lot of dynamic languages, R relies on garbage collection to limit memory usage.
“Every so often”, a garbage collection task runs and deletes variables that won't be used any more.
You can force the garbage collection to run at any given time by calling gc(), but this almost never fixes anything significant. How can the GC know that you're not going to use that big variable in the next line? The GC needs your help to be effective.
gc(verbose=TRUE), or just gc(TRUE), shows how much memory is being used.
ls() lists the variables in your workspace.
object.size() gives a variable's size; pass it get("variablename") (e.g., quoted) and it will get that variable and print its size.
rm() deletes variables you no longer need.
To see all your variables sorted by size: sort( sapply( ls(), function(x) { object.size(get(x))} ), decreasing=TRUE )
Let's play with object.size() and gc(TRUE):
gc(TRUE)
used (Mb) gc trigger (Mb) max used (Mb)
Ncells 2402596 128.4 4703850 251.3 4703850 251.3
Vcells 31199516 238.1 79492131 606.5 273190741 2084.3
old.mem <- gc(TRUE)[,c(1:2,5:6)]
x <- rep(0.,(16*1024)**2)
xsize <- object.size(x)
xsize
2147483688 bytes
print(xsize,units="MB")
2048 Mb
new.mem <- gc(TRUE)[,c(1:2,5:6)]
new.mem-old.mem
used (Mb) max used (Mb)
Ncells 576 0 0 0.0
Vcells 268436146 2048 26530385 202.4
Now let's delete the object and see how system memory behaves:
rm(x)
final.mem <- gc(TRUE)[,c(1:2,5:6)]
final.mem
used (Mb) max used (Mb)
Ncells 2403174 128.4 4703850 251.3
Vcells 31200229 238.1 299721126 2286.7
final.mem-old.mem
used (Mb) max used (Mb)
Ncells 568 0 0 0.0
Vcells 684 0 26530385 202.4
… but if you use functions, a function's local variables are freed once it returns:
trunc.gc <- function() { gc(TRUE)[,c(1:2,5:6)] }
rnorm.sum <- function(n) {
x <- rnorm(n)
sum(x)
}
orig.gc <- trunc.gc()
rnorm.sum(16*1024*1024)
[1] 3121.226
after.gc <- trunc.gc()
after.gc - orig.gc
used (Mb) max used (Mb)
Ncells 15 0 0 0
Vcells 14 0 0 0
Some problems require doing fairly simple analysis on a data set too large to fit into memory.
In that case, one processor may be enough; you just want a way to not overrun memory.
Out of Core, or External Memory computation leaves the data on disk, bringing in to memory only what is needed/fits at any given point.
For some computations, this works well (but note: disk access is always much slower than memory access.)
The bigmemory package defines a generalization of a matrix class, big.matrix, which can be file-backed - that is, it can exist primarily on disk, with parts brought into memory when necessary.
This approach works fairly well when one's data access involves passing through the entire data set once or a very small number of times, either combining data or extracting a subset.
Packages like bigalgebra or biganalytics (not covered here) build on bigmemory.
In data/idealgas, we have a set of synthetic data files describing an ideal gas experiment - setting temperature, amount of material, and volume, and measuring the pressure. Simple data sets:
small.data <- read.csv("data/idealgas/ideal-gas-fixedT-small.csv")
small.data[1:2,]
X pres vol n temp
1 1 99000 0.02036345 0.8 300
2 2 99250 0.02018306 0.8 300
Row name, pressure (Pa), volume (m^3), N (moles), and temperature (K).
A larger data set consisting of 124M rows, 5.8GB, is sitting in ideal-gas-fixedT-large.csv, and we'd like to do some analysis of this data set. But the size is a problem.
Let's consider the humble .csv file:
$ ls -sh1 airOT2010.*
151M airOT2010.RDS
151M airOT2010.Rdata
1.4G airOT2010.csv
$ Rscript timeexamples.R
[1] "Reading Rdata file"
user system elapsed
11.697 0.616 12.319
[1] "Reading RDS file"
user system elapsed
11.041 0.644 11.694
[1] "Reading CSV file"
user system elapsed
140.640 3.352 144.142
CSV - or really, any text-based format - is the worst possible format for quantitative data. It manages the trifecta of being big, slow, and inexact.
Converting floating point numbers back and forth between internal representations and strings is slow and prone to truncation.
Use binary formats whenever possible. .Rdata is a bit prone to change; .RDS is modestly better. Portable file formats like HDF5 (for data frames) or NetCDF4 (for matrices and arrays) are compact, accurate, fast (not as fast as .Rdata/.RDS), and can be read by tools other than R.
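A minimal sketch of the binary round trip, assuming jan2010 is still loaded (the filename is arbitrary):
saveRDS(jan2010, "jan2010-copy.RDS")          # compact binary; exact values
jan2010.again <- readRDS("jan2010-copy.RDS")
identical(jan2010, jan2010.again)             # TRUE - nothing truncated on the round trip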
We've already created a big.matrix file from this data set, using
data <- read.big.matrix("data/idealgas/ideal-gas-fixedT-large.csv",
header=TRUE,
backingfile="data/idealgas/ideal-gas-fixedT-large.bin",
descriptorfile="ideal-gas-fixedT-large.desc")
This reads in the .csv file and outputs a binary equivalent (the “backingfile”) and a descriptor (in the “descriptorfile”) which contains all of the information which describes the binary blob.
You can read the descriptorfile: more ideal-gas-fixedT-large.desc
This has been done for you, since the initial conversion takes 12 minutes for this set - kind of boring.
Note: this converts the data into a matrix, which is a less flexible data type than a data frame - it's homogeneous in type. Here, we'll use all numeric columns.
Let's do some simple analysis on the data set and see how memory behaves.
library(bigmemory, quiet=TRUE)
orig.gc <- trunc.gc()
data <- attach.big.matrix("data/idealgas/ideal-gas-fixedT-large.desc")
trunc.gc()-orig.gc
used (Mb) max used (Mb)
Ncells 136 0 0 0
Vcells 11 0 0 0
data[1:2,]
pres vol n temp
[1,] 90000 0.00565073 0.5 280
[2,] 90004 0.01952750 0.5 280
system.time(min.p <- min(data[,"pres"]))
user system elapsed
0.018 0.013 0.031
trunc.gc()-orig.gc
used (Mb) max used (Mb)
Ncells 1218 0.1 0 0
Vcells 1091 0.0 0 0
That only took ~7 seconds to scan through 124M records to find a minimum. Let's try a few other calculations:
min.p
[1] 90000
system.time(max.p <- max(data[,"pres"]))
user system elapsed
0.012 0.002 0.014
system.time(mean.t <- mean(data[,"temp"]))
user system elapsed
0.014 0.007 0.021
Going through the same column a second time was faster, because some of the data was cached; going through a new column was about the same speed as the first. What has that done to memory?
trunc.gc()-orig.gc
used (Mb) max used (Mb)
Ncells 1176 0.1 0 0
Vcells 909 0.0 0 0
Let's try something more complicated: we know that averaged over our data, we should have \[ p V = n R T. \] Let's try to infer the gas constant \( R \):
system.time(sum.pv <- sum(data[,"pres"]*data[,"vol"]))
user system elapsed
0.025 0.021 0.046
system.time(sum.nt <- sum(data[,"n"]*data[,"temp"]))
user system elapsed
0.027 0.012 0.039
sum.pv/sum.nt
[1] 8.316193
And we're still not using that much memory.
trunc.gc()-orig.gc
used (Mb) max used (Mb)
Ncells 1193 0.1 0 0
Vcells 974 0.0 0 0
Let's extract a subset of the data and analyze it. The mwhich command in bigmemory lets us search through the data for multiple conditions, and extract that data:
system.time(subset.data <- data[mwhich(data, cols=c("n", "pres"),
vals=list(c(1,1.1), c(99000,101000)),
comps=list(c("ge","le"),c("ge","le")),
op="AND"),])
user system elapsed
0.058 0.001 0.059
class(subset.data)
[1] "matrix"
fit <- lm(vol ~ temp, data=as.data.frame(subset.data))
summary(fit)
Call:
lm(formula = vol ~ temp, data = as.data.frame(subset.data))
Residuals:
Min 1Q Median 3Q Max
-0.041951 -0.006885 0.000003 0.006825 0.040519
Coefficients:
Estimate Std. Error t value Pr(>|t|)
(Intercept) 1.553e-04 1.269e-03 0.122 0.903
temp 8.698e-05 4.228e-06 20.574 <2e-16 ***
---
Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
Residual standard error: 0.01013 on 40998 degrees of freedom
Multiple R-squared: 0.01022, Adjusted R-squared: 0.01019
F-statistic: 423.3 on 1 and 40998 DF, p-value: < 2.2e-16
object.size(subset.data)
1312632 bytes
trunc.gc()-orig.gc
used (Mb) max used (Mb)
Ncells 8935 0.5 0 0
Vcells 586030 4.5 0 0
Other options:
morder or mpermute allow you to sort the data in memory or on disk.
head and tail let you get the start/end rows.
mwhich allows all sorts of slicing and dicing.
sub.big.matrix lets you extract contiguous regions of the matrix.
If you just have a data file much larger than memory that you have to crunch, and the amount of actual computation is not a bottleneck, the bigmemory and related packages may be all you need.
Works best if:
lm() doesn't work natively on a big.matrix - but we can write our own.
If we have an OLS model \[ y_i = a x_i + b + \epsilon_i, \] we can fit it with
\[ b = \bar{y} - a \bar{x} \]
\[ a = \frac{\sum_i{ x_i y_i } - n \bar{x} \bar{y}}{\sum_i{x_i^2} - n \bar{x}^2} \]
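A minimal sketch of such a fit, assuming data is the attached big.matrix from above; big.ols is a made-up name, and only the two requested columns are ever pulled into memory:
big.ols <- function(data, xcol, ycol) {
x <- data[, xcol]; y <- data[, ycol]     # one column at a time, not the whole matrix
n <- length(x)
xbar <- mean(x); ybar <- mean(y)
a <- (sum(x*y) - n*xbar*ybar) / (sum(x^2) - n*xbar^2)
b <- ybar - a*xbar
c(slope=a, intercept=b)
}
big.ols(data, "temp", "vol")             # e.g., fit volume against temperature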
Using the examples above, fit a couple of columns of the ideal gas data set. Do the results make sense? (Once it's working, try fitting \( pV \propto nT \).) How much memory is used?
(Note: there is a biglm package.)
We've looked at some of the standard scalable computing packages for R.
Here are two somewhat more advanced packages, that solve very different problems.
While it's generally true that processes can't peer into each other's memory, there is an exception.
Processes can explicitly make a window of memory shared - visible to other processes.
This isn't necessary for threads within a process; but it is necessary for multiple processes working on the same data.
Only works on-node; can't share memory across a network.
Rdsm allows you to share a matrix across processes on a node - for reading and for writing.
Normally, when we split a data structure up across tasks, we make copies (PSOCK), or we use read-only (multicore/fork).
If output is also going to be large, we now have 2-3 copies of the data structure floating around.
Rdsm allows (on-node) cluster tasks to collaboratively make a large output without copies.
Simple example - let's create a shared matrix, and have everyone fill it.
Create PSOCK cluster, an Rdsm instance, shared matrix, and a barrier:
library(parallel)
library(Rdsm)
nrows <- 7
cl <- makePSOCKcluster(3) # form 3-process PSOCK (share-nothing) cluster
init <- mgrinit(cl) # initialize Rdsm
mgrmakevar(cl,"m",nrows,nrows) # make a 7x7 shared matrix
bar <- makebarr(cl)
Everyone gets their task id, and which rows are “theirs”;
# at each thread, set id to Rdsm built-in ID variable for that thread
clusterEvalQ(cl,myid <- myinfo$id)
[[1]]
[1] 1
[[2]]
[1] 2
[[3]]
[1] 3
clusterExport(cl,c("nrows"))
dmy <- clusterEvalQ(cl,myidxs <- getidxs(nrows))
dmy <- clusterEvalQ(cl, m[myidxs,1:nrows] <- myid)
dmy <- clusterEvalQ(cl,"barr()")
…then fills it with their id.
Now, print the results.
print(m[,])
[,1] [,2] [,3] [,4] [,5] [,6] [,7]
[1,] 1 1 1 1 1 1 1
[2,] 1 1 1 1 1 1 1
[3,] 2 2 2 2 2 2 2
[4,] 2 2 2 2 2 2 2
[5,] 2 2 2 2 2 2 2
[6,] 3 3 3 3 3 3 3
[7,] 3 3 3 3 3 3 3
stoprdsm(cl) # stops cluster
Allows collaborative use of a single pool of memory.
Avoids performance and memory problems of making copies to send back and forth.
Works well when:
The master-worker approach that all the methods we've used so far take works very well for interactive work, is easy to load-balance, and is easy to understand.
But there's a fairly narrow range of number of workers where master-worker works well.
For a small number of total processors (2-4, say), it really hurts to have one processor doing nothing except some small amount of coordination.
For a very large number of processors (hundreds or more, depending on the size of each task), the workers can easily overwhelm the master, meaning all of the workers sit around waiting while the master catches up.
At scale, the idea of a single master isn't helpful.
Better: Coordinating peers.
Rather than a single master parcelling out work, the workers themselves decide which part of the problem they should be working on, and combine their results cooperatively.
More efficient and can scale better; Downsides:
In pbd/mpi-histogram.R we have a script that does the hour histogram for eight full years of data, sifting through 40 million flights, in about a minute:
$ time mpirun -np 8 Rscript mpi-histogram.R
COMM.RANK = 0
[1] 4081 118767 27633 7194 9141 194613 2235007 2902703 3003510
[10] 2649823 2373934 2473105 2757256 2772498 2362334 2485699 2503423 2794298
[19] 2626931 2282125 2074739 1386485 649392 344257
COMM.RANK = 0
[1] 41038948
real 1m15.357s
user 9m39.943s
sys 0m10.910s
What sorcery is this?
# count.hours and get.hour definitions...
start.year <- 1990
init()
rank <- comm.rank()
my.year <- start.year + rank
myfile <- paste0("data/airline/airOT",as.character(my.year),".RDS")
data <- readRDS(myfile); data <- data$DEP_TIME
myhrs <- count.hours(data)
hrs <- allreduce( myhrs, op="sum" )
comm.print( hrs )
comm.print( sum(hrs) )
finalize()
Let's take a look at the first few lines
# count.hours and get.hour definitions...
start.year <- 1990
init()
rank <- comm.rank()
my.year <- start.year + rank
myfile <- paste0("data/airline/airOT",as.character(my.year),".RDS")
data <- readRDS(myfile); data <- data$DEP_TIME
In this case, each task decides which year's data to work on. First (“zeroth”) task works on 1990, next on 1991, etc.
Every task has to call the init() routine when starting, and the finalize() routine when done.
Then reads in the file.
data <- readRDS(myfile); data <- data$DEP_TIME
myhrs <- count.hours(data)
hrs <- allreduce( myhrs, op="sum" )
comm.print( hrs )
comm.print( sum(hrs) )
finalize()
Once the file is read, we use our trusty count.hours routine again to work on the entire vector.
Then an allreduce function sums each worker's hours, and returns the sum to all processors. We then print it out.
Rather than only the master running the main program and handing off bits to workers, every task runs this identical program; the only difference is the value of comm.rank().
Reductions are one way of combining results, and they're very powerful:
init()
rank <- comm.rank()
my.year <- start.year + rank
myfile <- paste0("../data/airline/airOT",as.character(my.year),".RDS")
data <- readRDS(myfile); data <- data$CRS_ELAPSED_TIME
data <- data[!is.na(data)]
data.median <- pbd.quantile(data,0.5)
data.min <- allreduce(min(data), op="min")
data.max <- allreduce(max(data), op="max")
data.N <- allreduce(length(data), op="sum")
data.mean <- allreduce(sum(data), op="sum")/data.N
comm.print(data.min)
comm.print(data.median)
comm.print(data.mean)
comm.print(data.max)
finalize()
$ mpirun -np 4 Rscript ./min-median-mean-max.R
COMM.RANK = 0
[1] -70
COMM.RANK = 0
[1] 93.00004
COMM.RANK = 0
[1] 112.8207
COMM.RANK = 0
[1] 1613
R's higher-level functions plus reductions are very powerful ways to do otherwise tricky distributed problems - like median of distributed data:
pbd.quantile <- function( data, q=0.5 ) {
if (q < 0 | q > 1) {
stop("q should be between 0 and 1.")
}
N <- allreduce(length(data), op="sum")
data.max <- allreduce(max(data), op="max")
data.min <- allreduce(min(data), op="min")
f.quantile <- function(x, prob=0.5) {
allreduce(sum(data <= x), op="sum" )/N - prob
}
uniroot(f.quantile, c(data.min, data.max), prob=q)$root
}
pbd also has its parallel apply functions, but it's important to realize that these aren't being farmed out by some master task; the tasks themselves decide which ones in the list are “theirs”.
pbd/histogram-pbdsapply.R
year.hours <- function(my.year) {
myfile <- paste0("data/airline/airOT",as.character(my.year),".RDS")
data <- readRDS(myfile)$DEP_TIME
count.hours(data)
}
init()
years <- 1990:1993
all.hours.list <- pbdLapply(years, year.hours)
all.hours <- Reduce("+", all.hours.list)
comm.print( all.hours )
comm.print( sum(all.hours) )
finalize()
pbd has a couple of ways of distributing data.
What we've used before is their so-called “GBD” distribution - globally distributed data. It's split up by rows.
However, for linear algebra computations, a block-cyclic distribution is much more useful.
pbdR comes with several packages for reading a data file and distributing it as a ddmatrix:
read.csv.ddmatrix() for reading from csv.
nc_get_dmat() to read from a NetCDF4 file.
gbd2dmat() for conversions from row-oriented data to a ddmatrix.
And the reason that you'd use a ddmatrix is that several operations defined on regular R matrices also work transparently on ddmatrix: lm, solve, chol.
pbd-lm.R:
init.grid()
rank <- comm.rank()
my.year <- start.year + rank
data <- cleandata(my.year)
Y <- data[[1]]
X <- as.matrix(data[,-1])
X.dm <- gbd2dmat(X)
Y.dm <- gbd2dmat(Y)
fit <- lm(Y ~ X)
comm.print(summary(fit))
finalize()
$ mpirun -np 4 Rscript pbd-lm.R
Using 2x2 for the default grid size
COMM.RANK = 0
Call:
lm(formula = Y ~ X)
Residuals:
Min 1Q Median 3Q Max
-1307.62 -6.03 -2.29 3.53 1431.70
Coefficients: (6 not defined because of singularities)
Estimate Std. Error t value Pr(>|t|)
(Intercept) 1.152e+01 9.616e-02 119.77 <2e-16 ***
XORIGIN_AIRPORT_ID -1.895e-04 5.193e-06 -36.50 <2e-16 ***
XDEST_AIRPORT_ID -2.257e-04 5.213e-06 -43.29 <2e-16 ***
XDEP_TIME -3.382e-04 1.724e-05 -19.61 <2e-16 ***
XDEP_DELAY_NEW 1.426e+00 9.594e-03 148.68 <2e-16 ***
...
---
Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
Residual standard error: 13.43 on 2741063 degrees of freedom
Multiple R-squared: 0.7809, Adjusted R-squared: 0.7809
F-statistic: 1.628e+06 on 6 and 2741063 DF, p-value: < 2.2e-16