serialize or turn a large parallel R job into smaller chunks for use with SGE

I use the snow package in R with OpenMPI and SGE quite often for my simulation studies; I’ve outlined how this can be done in the past.

The ease of these methods makes it tempting for me to just specify the maximum number of cores available all the time. However, unless you own a dedicated cluster, you are most likely sharing the resources with many others at your institution, and the resources are managed by a scheduler (SGE). The scheduler is nice in that your job automatically starts whenever the resources are available, and it tries to be fair with everyone. Other people's smaller jobs will most likely be running, so unless no one is using the cluster, my job usually waits in limbo when I request a lot of cores (say, 100). I've outlined how to determine the number of free nodes in the cluster to help with the core specification. However, what if, after your job starts, other jobs finish and more cores become available? Are you stuck waiting on a job that's barely using 5 CPUs?

My system admin has always advised me to break my job into smaller chunks and request a small number of cores for each, to be more efficient (getting CPU time through the scheduler whenever it becomes available). I finally got around to thinking about how to implement this, and it's quite easy. For random number generation, I just specify a different seed for each chunk as described here; this isn't the ideal solution for random number generation, but it suffices for now. I always write a wrapper function that gets called repeatedly when I use snow anyway, so adapting it is quite easy. Here is a quick toy example.

sim.R, with my simulation function:

<pre class="src src-sh"><span style="color: #ff4500;">#</span><span style="color: #ff4500;">! /bin/</span><span style="color: #00ffff;">env</span><span style="color: #ff4500;"> Rscript</span>

## get arguments: “seed <- 100″ “iter.start <- 1″ “iter.end <- 100″ arguments <- commandArgs(trailingOnly=TRUE) for(i in 1:length(arguments)) { eval(parse(text=arguments[i])) }

set.seed(seed) n <- 100 beta0 <- 1 beta1 <- 1

simulate <- function(iter=1) { x <- rnorm(n) epsilon <- rnorm(n) y <- beta0 + beta1*x + epsilon fit <- lm(y ~ x) return(list(iter=iter, beta=coef(fit), varcov=vcov(fit))) }

result <- lapply(iter.start:iter.end, simulate) dput(result, paste(“SimResult”, iter.start, “.Robj”, sep=“”))
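Since the arguments are parsed with eval(parse()), the script can be smoke-tested from the command line before involving the scheduler; the values here are just illustrative:

Rscript sim.R 'seed=123' 'iter.start=1' 'iter.end=5'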

submit.R, submitting many smaller chunks to SGE:

<pre class="src src-sh"><span style="color: #ff4500;">#</span><span style="color: #ff4500;">! /bin/</span><span style="color: #00ffff;">env</span><span style="color: #ff4500;"> Rscript</span>

job.name <- “MySim” sim.script <- “sim.R” Q <- “12hour_cluster.q” set.seed(111) n.chunk <- 2 n.chunk.each <- 10 my.seeds <- runif(n.chunk, max=100000000) dput(my.seeds, “my.seeds.Robj”) for(current.chunk in 1:n.chunk) { seed <- my.seeds[current.chunk] iter.start <- current.chunk * n.chunk.each – n.chunk.each + 1 iter.end <- current.chunk * n.chunk.each current.job <- paste(job.name, current.chunk, sep=“”) current.job.files <- paste(current.job, c(“”, “.stdout”, “.stderr”), sep=“”) submit <- paste(“qsub -q “, Q, ” -S /usr/bin/Rscript -N “, current.job.files[1], ” -o “, current.job.files[2], ” -e “, current.job.files[3], ” -M email@institution -m beas “, sim.script, ” ‘seed=”, seed, “‘ ‘iter.start=”, iter.start, “‘ ‘iter.end=”, iter.end, “‘”,sep=“”) ## read sim.R directly ## qsub -q 12hour_cluster.q -S /usr/bin/Rscript -N MySim1 -o MySim1.stdout -e MySim1.stderr -M email@institution -m beas sim.R ‘seed=123′ ‘iter.start=1′ ‘iter.end=50′ system(submit)

#### OR USE FOLLOWING METHOD ## submit <- paste(“qsub -q “, Q, ” -S /bin/bash -N “, current.job.files[1], ” -o “, current.job.files[2], ” -e “, current.job.files[3], ” -M email@institution -m beas “, sep=””) ## command <- paste(“Rscript “, sim.script, ” ‘seed=”, seed, “‘ ‘iter.start=”, iter.start, “‘ ‘iter.end=”, iter.end, “‘”, sep=””) ## job.script <- paste(job.name, current.chunk, “.sh”, sep=””)

## sink(job.script) ## cat(“#! /bin/env bashn”) ## cat(“module load R/2.12.1n”) ## cat(command, “n”) ## sink() ## system(paste(submit, job.script)) ## ## qsub -q 12hour_cluster.q -S /bin/bash -N MySim1 -o MySim1.stdout -e MySim1.stderr -M email@institution -m beas MySim1.sh ## ## MySim1.sh: Rscript sim.R ‘seed=123′ ‘iter.start=1′ ‘iter.end=50′ }


Now I can just specify some parameters in submit.R, and multiple smaller jobs will be submitted to SGE. I just have to write another script (aggregate.R, sketched below) to put the results together and compute the information I need. The nice thing is that OpenMPI or other third-party software isn't even required.
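Here is a minimal sketch of what aggregate.R might look like, assuming the SimResult*.Robj files produced by sim.R above; the chunk settings must match those in submit.R, and the summary at the end is just an example:

## aggregate.R: collect the chunked results into one list
n.chunk <- 2
n.chunk.each <- 10
result <- list()
for(current.chunk in 1:n.chunk) {
  iter.start <- current.chunk * n.chunk.each - n.chunk.each + 1
  ## each chunk was written by sim.R via dput()
  result <- c(result, dget(paste("SimResult", iter.start, ".Robj", sep="")))
}
## example summary: mean of the slope estimates across all iterations
mean(sapply(result, function(r) r$beta["x"]))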

Scheduled Parallel Computing with R: R + Rmpi + OpenMPI + Sun Grid Engine (SGE)

Recently I've learned how to do parallel computing in R on a cluster of machines, thanks to the R packages snowfall, snow, and Rmpi. I've been using the SOCKET method with snowfall, since together they make things simple. With these tools, I can reduce day- or week-long jobs to hours or a day by spreading them across many (say, 100) cores/CPUs.

However, the system admins would prefer me to do things through the Sun Grid Engine (SGE) or another job scheduler, since clusters are usually a shared resource, and having "rogue" jobs like mine hog all the resources isn't really a good thing. Aside from scheduling jobs, another great thing about SGE is that it determines which (idle) nodes to use, so I don't have to come up with the list of nodes myself.

Luckily, people have attacked this problem already. First, Revolution Computing has an internal document that gives instructions on how to install R, Rmpi, OpenMPI, and SGE so that they work together. If you email them and ask for it, they are more than willing to share it. The document is "sge-snow.pdf." After things are installed, here is how to get everything to work.

Rmpi with OpenMPI and SGE via qsub:

First, copy the content of the Rprofile file that is packaged with Rmpi into ~/.Rprofile (one way to locate it is sketched at the end of this subsection). Place the following in a shell script to be submitted by qsub (an example script is at the end):

mpirun -np 51 R --no-save -q < SGEtest.R > SGEtest.Rout

NOTE: 51 is the number of cores/cpus to use, 1 master + 50 slaves. Inside the R script, do not use anything that belongs to snow or snowfall. Just use Rmpi’s functions. Also, by using mpirun, we do NOT need to spawn slaves as they are spawned in the mpirun command. We also do not need to call library(Rmpi). Put the following in the R script (SGEtest.R) to see that things are running:

mpi.remote.exec(paste("I am",mpi.comm.rank(),"of",mpi.comm.size()))
mpi.remote.exec(paste("I am",Sys.info(),"of",mpi.comm.size()))

snow with OpenMPI and SGE via qsub:

First, put the location of the RMPISNOW executable from the snow package on your PATH (or use the full path wherever you see RMPISNOW on the command line); a way to find this location is sketched below. DO NOT put the Rprofile from Rmpi into ~/.Rprofile this time. Place the following in the shell script to be submitted by qsub:

mpirun -np 21 RMPISNOW < SGEtest2.R > SGEtest2.Rout

In the R script, use only snow functions (not Rmpi or snowfall). No need to call library(snow). Put the following in the R script (SGEtest2.R) to test:

cl <- makeCluster()
print(clusterCall(cl, function() Sys.info()))
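To find the directory that needs to go on PATH, one can ask R where snow is installed, since RMPISNOW lives at the top level of the package directory (an assumption that holds for standard installs):

## from an R session: directory containing the RMPISNOW script
system.file(package = "snow")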

snow with OpenMPI and SGE via qrsh (interactive):

Similar to case 2 above, but run

qrsh -V -q int64 mpirun -np 9 RMPISNOW

instead of the qsub command to get an interactive session.

Sample script for SGE

For the first two cases, a sample openMPI_R.sh script is:

#!/bin/bash

# here's the SGE directives
# ------------------------------------------
#$ -q longbat-adc # <- the name of the Q you want to submit to
#$ -pe openmpi 51 # <- load the openmpi parallel env with 51 slots
#$ -S /bin/bash # <- run the job under bash
#$ -N MPI-SGE # <- name of the job in the qstat output
#$ -o MPI-SGE.out # <- name of the output file.
#$ -e MPI-SGE.stderr # <- name of the stderr file.

module load R/2.10.0
echo "calling mpirun now"
## mpirun -np 51 R --no-save -q < SGEtest.R > SGEtest.Rout
mpirun -np 21 RMPISNOW < SGEtest2.R > SGEtest2.Rout
## call via: qsub openMPI_R.sh

Finally, I would like to point out that snowfall does not currently work with SGE because it requires a call to sfInit(), which conflicts with the cluster created by mpirun. This made me learn some functions from snow, which aren't all that different from snowfall's; see the mapping sketched below.
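For reference, here is a rough mapping from the snowfall calls I was used to onto their snow counterparts (a sketch; it assumes the cluster was launched via RMPISNOW as above, and that rlecuyer is installed for the RNG setup):

cl <- makeCluster()                 # snowfall: sfInit()
clusterEvalQ(cl, library(cmprsk))   # snowfall: sfLibrary(cmprsk)
x <- rnorm(10)
clusterExport(cl, "x")              # snowfall: sfExport("x")
clusterSetupRNG(cl)                 # snowfall: sfClusterSetupRNG(); needs rlecuyer
result <- clusterApply(cl, 1:4, function(i) mean(x) + i)  # snowfall: sfLapply()
stopCluster(cl)                     # snowfall: sfStop()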

Also, there is the Rsge package for R that seems to work too.

UPDATE 1/25/2010

  1. We don’t need to specify -np 51 in the mpirun command. If we omit it, SGE passes this information directly to OpenMPI.
  2. I tried installing this myself. A few things to note:
     a. Compile OpenMPI with the --with-sge flag.
     b. Place the bin directory of OpenMPI in PATH if it is installed in a non-standard place. Also remember to place the directory where RMPISNOW resides in PATH as well.
     c. Install Rmpi: R CMD INSTALL Rmpi_0.5-8.tar.gz --lib=~/Rlib --configure-args="--with-mpi=/home/vqnguyen/openmpi-1.4.1-vqn/", or specify the MPI_ROOT environment variable as /home/vqnguyen/openmpi-1.4.1-vqn.
     d. Place export LD_LIBRARY_PATH=/path/to/libmpifolder:$LD_LIBRARY_PATH in .bashrc if the variable does not already include it; this is required for library(Rmpi) to work. Also place .libPaths("~/Rlib") in RMPISNOWprofile so it can find where my Rmpi is.
     e. Set up a parallel environment in SGE, either with qmon or on the command line with:

$ qconf -Ap openmpi.config

where the config file is as follows:

openmpi configuration:
===============================
pe_name openmpi
slots 666
user_lists arusers
xuser_lists NONE
start_proc_args /bin/true
stop_proc_args /bin/true
allocation_rule $round_robin
control_slaves TRUE
job_is_first_task FALSE
urgency_slots min
accounting_summary TRUE
===============================

You can name the PE anything and set the number of slots. Make sure the user list includes you. Also, make sure you add the queues you want to work with to this PE.
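For example, I believe a queue can be attached to the PE from the command line along these lines (the queue name is illustrative; qmon works too):

$ qconf -aattr queue pe_list openmpi 12hour_cluster.q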

  3. Specifying an outfile in the makeCluster() command in RMPISNOW doesn't do anything, since the cluster is created when RMPISNOW is invoked. If we look at the RMPISNOWprofile, we see that the output is sunk to /dev/null. I tried a few ways to get the workers' output: calling sink() on each worker via clusterEvalQ, and setting the OUT or R_SNOW_OUTFILE variables (see RMPInode.R and RMPInode.sh). How I got it to work was with:
clusterEvalQ(cl, sinkWorkerOutput("nodes.out"))
  4. Of course, make sure you have passwordless ssh enabled. If you get host key messages (i.e., you have to type yes to accept the key) and your job doesn't run, put

StrictHostKeyChecking no

in ~/.ssh/config according to this page. Check the stderr of your SGE log.

Parallel computing in R: snowfall/snow

I finally found the time to try parallel computing in R using snowfall/snow, thanks to this article in the first issue of the R Journal (the replacement of R News). I hadn't tried parallel computing before because I didn't have a good toy example, and it seemed like a steep learning curve. snow and snowfall are perfect for "embarrassingly parallel" jobs, e.g., a simulation study, a bootstrap, or cross-validation. I do simulation studies a lot, e.g., assessing the properties of a statistical methodology, so parallel computing will be very useful.

I got the toy example to work, but only in parallel on a single computer with multiple cores. Thanks to Michael Zeller, I got it to work on multiple machines. When using multiple nodes, make sure passwordless ssh is enabled (a typical setup is sketched below).
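A typical way to set that up, assuming OpenSSH and a home directory that is shared across the nodes (a sketch; details vary by cluster):

ssh-keygen -t rsa                                  # accept defaults, empty passphrase
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys    # works because $HOME is shared
chmod 600 ~/.ssh/authorized_keys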

Credit for getting snowfall to work on the BDUC servers (uci-nacs) goes to Harry Mangalam.

Here is a script, with a few examples:

 <pre class="src src-sh"><span style="color: #ff4500;">## </span><span style="color: #ff4500;">Example 1 - Multi-core on a single computer</span>

sink(‘SnowFallExample.Rout’, split=TRUE) .Platform .Machine R.version Sys.info()

library(snowfall) # 1. Initialisation of snowfall. # (if used with sfCluster, just call sfInit()) sfInit(parallel=TRUE, cpus=2)

# 2. Loading data. require(mvna) data(sir.adm) # 3. Wrapper, which can be parallelised. wrapper <- function(idx) { # Output progress in worker logfile cat( “Current index: “, idx, “\n” ) index <- sample(1:nrow(sir.adm), replace=TRUE) temp <- sir.adm[index, ] fit <- crr(temp$time, temp$status, temp$pneu) return(fit$coef) } # 4. Exporting needed data and loading required # packages on workers. sfExport(“sir.adm”) sfLibrary(cmprsk)

# 5. Start network random number generator # (as “sample” is using random numbers). sfClusterSetupRNG() # 6. Distribute calculation

start <- Sys.time(); result <- sfLapply(1:1000, wrapper) ; Sys.time()-start # Result is always in list form. mean(unlist(result)) # 7. Stop snowfall sfStop()

## Example 2 – Multiple nodes on a cluster (namely, the family-guy cluster at uci-ics) sink(‘SnowFallExample.Rout’, split=TRUE) .Platform .Machine R.version Sys.info()

library(snowfall) # 1. Initialisation of snowfall. # (if used with sfCluster, just call sfInit()) sfInit(socketHosts=rep(c(‘peter-griffin.ics.uci.edu’,‘stewie-griffin.ics.uci.edu’, ‘neil-goldman.ics.uci.edu’, ‘mort-goldman.ics.uci.edu’,‘lois-griffin.ics.uci.edu’),each=2), cpus=10,type=‘SOCK’,parallel=T)

# 2. Loading data. require(mvna) data(sir.adm) # 3. Wrapper, which can be parallelised. wrapper <- function(idx) { # Output progress in worker logfile cat( “Current index: “, idx, “\n” ) index <- sample(1:nrow(sir.adm), replace=TRUE) temp <- sir.adm[index, ] fit <- crr(temp$time, temp$status, temp$pneu) return(fit$coef) } # 4. Exporting needed data and loading required # packages on workers. sfExport(“sir.adm”) sfLibrary(cmprsk)

# 5. Start network random number generator # (as “sample” is using random numbers). sfClusterSetupRNG() # 6. Distribute calculation

start <- Sys.time(); result <- sfLapply(1:1000, wrapper) ; Sys.time()-start # Result is always in list form. mean(unlist(result)) # 7. Stop snowfall sfStop()

## Example 3 – Multiple nodes on a cluster (namely, the BDUC servers of uci-ics) ## ssh to bduc, then ssh to one of their claws (the head node is 32bit whereas the other wones are 64) ## put something like ## export LD_LIBRARY_PATH=/home/vqnguyen/lib:/usr/local/lib:/usr/lib:/lib:/sge62/lib/lx24-x86 in .bashrc ## or ## Sys.setenv(LD_LIBRARY_PATH=”/home/vqnguyen/lib:/usr/local/lib:/usr/lib:/lib:/sge62/lib/lx24-x86″) ## in an R session. Note: modify path to your home directory ## might have to install required packages elsewhere, like ~/Rlib, and use .libPaths() to add library path. Put this in .Rprofile sink(‘SnowFallExample.Rout’, split=TRUE) .Platform .Machine R.version Sys.info()

# 1. Initialisation of snowfall. # (if used with sfCluster, just call sfInit()) library(snowfall) sfInit(socketHosts=rep(c(‘claw1′, ‘claw2′),each=4), cpus=8,type=‘SOCK’,parallel=T)

# 2. Loading data. require(mvna) data(sir.adm) # 3. Wrapper, which can be parallelised. wrapper <- function(idx) { # Output progress in worker logfile cat( “Current index: “, idx, “\n” ) index <- sample(1:nrow(sir.adm), replace=TRUE) temp <- sir.adm[index, ] fit <- crr(temp$time, temp$status, temp$pneu) return(fit$coef) } # 4. Exporting needed data and loading required # packages on workers. sfExport(“sir.adm”) sfLibrary(cmprsk)

# 5. Start network random number generator # (as “sample” is using random numbers). sfClusterSetupRNG() # 6. Distribute calculation

start <- Sys.time(); result <- sfLapply(1:1000, wrapper) ; Sys.time()-start # Result is always in list form. mean(unlist(result)) # 7. Stop snowfall sfStop()

This is a good general reference for snowfall. The next thing to try is getting rpvm (PVM) to work with snowfall!