1 why?

1.1 embarassingly parallel tasks

These are computational tasks which involve many separate, independently executable calculations. Some common statistical examples of embarassingly parallel processes:

  • bootstrapping
  • cross-validation
  • simulating independent random variables (dorng)

In contrast, some sequential or non-parallel processes:

  • MCMC algorithms
  • several types of model selection (e.g.: step() or the LARS algorithm for LASSO)

for loops that do not explicitly involve dependent calculations are wasteful if we have multiple processors available. Perhaps even worse, the time cost of using such an approach can put some useful statistical tools beyond our reach!

1.2 options

  • Changing from a for loop to one of the apply() functions can help, but still doesn’t use multiple processors.
  • Use the parallel package (thanks, Miranda!).
  • Don’t use R.
  • Use the foreach package! (Analytics and Weston 2014)

1.3 why foreach?

We would like to find a way to make use of our whole computer, and make valuable tasks like bootstrapping available, but without having to invest large amounts of time in learning new programming languages. Enter foreach, which keeps the structure of a for loop, but allows us to drop two key assumptions:

  • sequentiality
  • single processor architecture

Our goal: We will begin with a simple chunk of R code involving a for loop and transform it into a foreach loop. Along the way, we’ll take a look at the equivalent computation done with an apply() function, and see that using foreach and multiple processors outperforms this.

2 example: data and research question

We are going to look at data from the New York City bikeshare program Citibike.

One of the costliest parts of operating a bike share program comes from the finiteness of the bicycle stations. A station can only hold so many bicycles, and a full (empty) station means customers cannot drop off (pick up) a bike. Thus, managers are forced to use trucks to manually redistribute bicycles.

We want to find a model which can offer good prediction, with the hope that this will inform our plans for future station locations/sizes. For this example, we start with a few plausible models and use K-fold cross validation to decide which one to use.

2.1 locations of our 7 sites

locations


2.2 data

data


3 fitting GLMs and extracting prediction error

We are considering three increasingly complex models of the arrival behavior. In order to compare three candidate models’ prediction error, we’ll use K-fold cross validation, where we use the same folds for all three models.

First, we the load the data and make our K-fold test sets (and implicitly, our training sets):

load(url("http://www.stat.colostate.edu/~scharfh/CSP_parallel/data/arrivals_subset.RData"))
K <- 50
N <- dim(arrivals.sub)[1]

## for convenience kill off 8 observations (we have 5208) and make cv test sets
set.seed(1985)
discarded <- sample(1:N, size = 8)
cv.test.sets <- matrix(sample((1:N)[-discarded], size = N - 8), ncol = K)

Next, we build a function to fit the data to the training sets and extract the corresponding estimate of prediciton error. This should still be familiar code, no new packages yet.

lq.loss <- function(y, y.hat, q = 1) {(abs(y - y.hat))^q}
get.errs <- function(test.set = NULL,
                     discarded = NULL,
                     q = 1) {
    sml.glm <- glm(arrivals ~
                   bs(hour, degree = 4)
                   + weekend
                   + as.factor(id),
                   data = arrivals.sub[-c(discarded, test.set), ],
                   family = "poisson")
    med.glm <- glm(arrivals ~
                   bs(hour, degree = 4)*weekend
                   + as.factor(id),
                   data = arrivals.sub[-c(discarded, test.set), ],
                   family = "poisson")
    big.glm <- glm(arrivals ~
                   bs(hour, degree = 4)*weekend
                   + bs(hour, degree = 4)*as.factor(id),
                   data = arrivals.sub[-c(discarded, test.set), ],
                   family = "poisson")
    sml.err <- mean(lq.loss(predict(object = sml.glm,
                                    newdata = arrivals.sub[test.set, -7],
                                    type = "response"),
                            arrivals.sub[test.set, 7],
                            q = q))
    med.err <- mean(lq.loss(predict(object = med.glm,
                                    newdata = arrivals.sub[test.set, -7],
                                    type = "response"),
                            arrivals.sub[test.set, 7],
                            q = q))
    big.err <- mean(lq.loss(predict(object = big.glm,
                                    newdata = arrivals.sub[test.set, -7],
                                    type = "response"),
                            arrivals.sub[test.set, 7],
                            q = q))
    return(c(sml.err, med.err, big.err))
}

The fits using all the data look like: fit.all

3.1 K-fold CV with a for loop

Using a naive for loop, we could implement this as:

err.for <- NULL
system.time(
    for (i in 1:K) {
        err.for <- cbind(err.for, get.errs(test.set = cv.test.sets[, i],
                                           discarded = discarded,
                                           q = 1))
        }
    )
##    user  system elapsed 
##  16.819   0.851  17.719

3.2 K-fold CV with an apply function

If you’re good with apply() functions you might upgrade (slightly) to

## apply version
system.time(
    err.apply <- sapply(X = 1:K, 
                        FUN = function(i) {
                            get.errs(test.set = cv.test.sets[, i],
                                     discarded = discarded,
                                     q = 1)
                            }
                        )
    )
##    user  system elapsed 
##  16.840   0.825  17.727

Neither of the first two methods take advantage of multiple processors. While the apply() functions avoid the inherently sluggish nature of for loops in R, they are still ignorant of the processor structure. We want to chop the job into halves, fourths, etc. and use the whole computer!

3.3 K-fold CV with a foreach loop

Here is the same computation written with a foreach loop

## foreach version
library(foreach)
library(doParallel)
registerDoParallel(cl = 2)
system.time(
    err.foreach <- foreach(i=1:K,
                           .inorder = FALSE,
                           .combine = "cbind",
                           .packages = "splines") %dopar% {
                               get.errs(test.set = cv.test.sets[, i],
                                        discarded = discarded,
                                        q = 1)
                               }
    )
##    user  system elapsed 
##   0.042   0.014   9.657

4 components of a foreach loop

Note that despite the syntactically similar structure, foreach() is a different animal than for. It is a function with several important arguments. The first is the object we iterate over and functions like the (i in 1:K) in the for loop. The rest of the structure is new:

5 results

fit.all

6 additional topics

6.1 iterators

Sometimes the list or vector that we are iterating over (in the above case, the vector 1:K) can be a very large object. In our case the vector is quite reasonable in size, but the object we are iterating over could instead be a multi-dimensional object with many values and a high level of precision. In this case, we’d be storing a complete copy of this massive object for each processor, which could potentially fill up our memory and slow things down. We could save memory by only keeping the piece of our list we need for a given calculation as they are computed, and dumping the rest. This is the idea behind the iterators package in R.

Here is our same example with an iterator instead of a list.

library(iterators)
registerDoParallel(cl = 2)
system.time(
    err.foreach.iter <- foreach(x = iter(cv.test.sets, by = "col"),
                               .inorder = FALSE,
                               .combine = "cbind",
                               .packages = "splines") %dopar% {
                                   get.errs(test.set = x,
                                            discarded = discarded,
                                            q = 1)
                                   }
    )
##    user  system elapsed 
##  27.629   1.531  10.630

Iterators can also be used to keep from ever having to store even a single copy of the object. For more on these, see Using the foreach package and Using the iterators package.

6.2 random numbers

When parallelizing a process which generates random numbers we need to be careful, since we aren’t guaranteed that foreach will handle this properly. We could wind up getting numbers that aren’t in fact random! Moreover, if we want to be able to reproduce an analysis which uses a random number generator, the usual set.seed() isn’t guaranteed to work.

Fortunately, there is doRNG. There are many ways to implement this package, the two easiest of which are:

library(doRNG)
## Loading required package: rngtools
## Loading required package: pkgmaker
## Loading required package: registry
## 
## Attaching package: 'pkgmaker'
## 
## The following object is masked from 'package:base':
## 
##     isNamespaceLoaded
registerDoParallel(cl = 2)
blah1 <- foreach(x = 1:10, 
                 .options.RNG = 1985,
                 .combine = 'c') %dorng% {
                     rnorm(1)
                     }

and

registerDoParallel(cl = 2)
registerDoRNG(seed = 1985)
blah2 <- foreach(x = 1:10,
                 .combine = 'c') %dopar% {
    rnorm(1)
    }

Note that this gives reproducible results!

sum(blah1 != blah2) 
## [1] 0

6.3 packages that support foreach

Some packages come with parallel functionality built in via foreach.

7 exercise: fix up the following code

The following calculation wouldn’t typically require parallelization because it isn’t a huge task, however we use it for practice’s sake.

Suppose we wish to figure out which day in May had the most combined arrivals across these seven stations. Here’s a function to get started, you’re welcome to scrap it for your own, or use it and fill in the gaps in what follows.

sum.arrivals <- function(date = NULL,
                         id = NULL){
    sum.arr <- sum(arrivals.sub$arrivals[arrivals.sub$date == date &
                                         arrivals.sub$id == id])
    return(sum.arr)
}

The following code needs some help. When nesting foreach loops (see this vignette), we need to use %:%. Fill in the missing parts to make the code work.

system.time(
    registerDoParallel(cl = 2)
    busiest <- foreach(date = ___,
                       ._______ = FALSE,
                       .combine = ___,
                       .packages = "_______") %:%
                           foreach(id = unique(arrivals.sub$id),
                                   .inorder = ______,
                                   .combine = ___) %_____% {
                                       sum.arrivals(date = date,
                                                    id = id)
                                       }                       
    )
which(busiest==max(busiest))    

For something this small, the overhead of managing the parallelization means a regular nested for loop is quicker.

sum.arrivals <- function(date = NULL,
                         id = NULL){
    sum.arr <- sum(arrivals.sub$arrivals[arrivals.sub$date == date &
                                         arrivals.sub$id == id])
    return(sum.arr)
}
busiest.for <- rep(0, 31)
system.time(
    for(date in 1:31){
        for(id in unique(arrivals.sub$id)){
            busiest.for[date] <- sum(busiest.for[date],
                                     sum.arrivals(date = date, id = id))
        }
    }
    )
##    user  system elapsed 
##   0.052   0.008   0.063
which(busiest.for==max(busiest.for))    
## [1] 20

parallel puppies

8 references

foreach

8.3 Other neat tools:

menu meters

Analytics, Revolution, and Steve Weston. 2014. Foreach: Foreach Looping Construct for R. http://CRAN.R-project.org/package=foreach.