Scheduled Parallel Computing with R: R + Rmpi + OpenMPI + Sun Grid Engine (SGE)

Recently I’ve learned how to do parallel computing in R on a cluster of machines, thanks to the R packages snowfall, snow, and Rmpi. I’ve been using the SOCKET method with snowfall since it keeps things simple. With these tools, I can reduce day- or week-long jobs to hours or a day by spreading them across many (~100) cores/CPUs.

However, system admins would prefer that I use Sun Grid Engine (SGE) or another job scheduler, since clusters are usually a shared resource, and “rogue” jobs like mine that hog all the resources aren’t a good thing. Aside from scheduling jobs, another great thing about SGE is that it determines which (idle) nodes to run R on, so I don’t have to compile the list of nodes myself.

Luckily, people have attacked this problem already. First, REvolution Computing has an internal document that gives instructions on how to install R, Rmpi, OpenMPI, and SGE and get them to work together. If you email them and ask for it, they are more than willing to share it; the document is “sge-snow.pdf.” Once things are installed, here is how to get everything working.

Rmpi with OpenMPI and SGE via qsub:

First, copy the contents of the Rprofile file that ships with Rmpi into ~/.Rprofile. Then place the following in a shell script to be submitted via qsub (an example script is at the end):

mpirun -np 51 R --no-save -q < SGEtest.R > SGEtest.Rout

NOTE: 51 is the number of cores/CPUs to use, i.e., 1 master + 50 slaves. Inside the R script, do not use anything that belongs to snow or snowfall; use only Rmpi’s functions. Also, because the slaves are spawned by the mpirun command itself, we do NOT need to spawn them in the script, and we do not need to call library(Rmpi) (the .Rprofile takes care of that). Put the following in the R script (SGEtest.R) to see that things are running:

mpi.remote.exec(paste("I am",mpi.comm.rank(),"of",mpi.comm.size()))
mpi.remote.exec(paste("I am",Sys.info(),"of",mpi.comm.size()))

snow with OpenMPI and SGE via qsub:

First, put the directory containing the RMPISNOW script from the snow package on your PATH (or use the full path wherever you see RMPISNOW on the command line). DO NOT put the Rprofile from Rmpi into ~/.Rprofile. Place the following in the shell script to be submitted via qsub:

mpirun -np 21 RMPISNOW < SGEtest2.R > SGEtest2.Rout

In the R script, use only snow functions (not Rmpi or snowfall). There is no need to call library(snow); RMPISNOW takes care of that. Put the following in the R script (SGEtest2.R) to test:

cl <- makeCluster()
print(clusterCall(cl, function() Sys.info()))
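
And a sketch of a slightly fuller SGEtest2.R, again with a toy computation standing in for real work:

## Sketch of a fuller SGEtest2.R. makeCluster() takes no arguments here
## because RMPISNOW has already launched the MPI cluster.
cl <- makeCluster()
result <- parSapply(cl, 1:1000, function(i) mean(rnorm(10000)))  # toy parallel job
print(summary(result))
stopCluster(cl)  # shut the workers down before exiting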

snow with OpenMPI and SGE via qrsh (interactive):

Similar to the qsub case above, but run

qrsh -V -q int64 mpirun -np 9 RMPISNOW

instead of the qsub command to get an interactive session.

Sample script for SGE

For the first two cases, a sample openMPI_R.sh script is:

#!/bin/bash

# here's the SGE directives
# ------------------------------------------
#$ -q longbat-adc # <- the name of the Q you want to submit to
#$ -pe openmpi 51 # <- load the openmpi parallel env w/ 4 slots
#$ -S /bin/bash # <- run the job under bash
#$ -N MPI-SGE # <- name of the job in the qstat output
#$ -o MPI-SGE.out # <- name of the output file.
#$ -e MPI-SGE.stderr # <- name of the stderr file.

module load R/2.10.0
echo "calling mpirun now"
## mpirun -np 51 R --no-save -q < SGEtest.R > SGEtest.Rout
mpirun -np 21 RMPISNOW < SGEtest2.R > SGEtest2.Rout
## call via: qsub openMPI_R.sh

Finally, I would like to point out that snowfall does not currently work with SGE: it requires a call to sfInit(), which conflicts with the cluster already launched by mpirun. This made me learn some functions from snow, which aren’t all that different from snowfall’s.
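
For anyone making the same switch, here is a rough correspondence between the snowfall calls and their snow counterparts (a sketch, not an exhaustive mapping):

## snowfall call                      snow equivalent
## sfInit(parallel=TRUE, cpus=4)  ->  cl <- makeCluster(4, type="SOCK")
## sfExport("sir.adm")            ->  clusterExport(cl, "sir.adm")
## sfLibrary(cmprsk)              ->  clusterEvalQ(cl, library(cmprsk))
## sfLapply(1:1000, wrapper)      ->  parLapply(cl, 1:1000, wrapper)
## sfStop()                       ->  stopCluster(cl)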

Also, there is an Rsge package for R that seems to work too.

UPDATE 1/25/2010

  1. We don’t need to specify -np 51 in the mpirun command; if we omit it, SGE passes this information directly to OpenMPI, and the mpirun line simplifies as shown below.
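
With -np omitted, the earlier command becomes:

mpirun R --no-save -q < SGEtest.R > SGEtest.Rout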
  2. I tried installing this myself. A few things to note:

     a. Compile OpenMPI with the --with-sge flag.

     b. Put the bin directory of OpenMPI on PATH if it is installed in a non-standard place. Also remember to put the directory where RMPISNOW resides on PATH as well.

     c. Install Rmpi: R CMD INSTALL Rmpi_0.5-8.tar.gz --lib=~/Rlib --configure-args="--with-mpi=/home/vqnguyen/openmpi-1.4.1-vqn/" OR set the MPI_ROOT environment variable to /home/vqnguyen/openmpi-1.4.1-vqn.

     d. Put "export LD_LIBRARY_PATH=/path/to/libmpifolder:$LD_LIBRARY_PATH" in .bashrc if the variable does not already include that directory; this is required for library(Rmpi) to work. Also place .libPaths("~/Rlib") in RMPISNOWprofile so RMPISNOW can find where my Rmpi is installed.

     e. Set up a parallel environment in SGE, either with qmon or on the command line with:

$ qconf -Ap openmpi.config

where the config file is as follows:

openmpi configuration:
===============================
pe_name openmpi
slots 666
user_lists arusers
xuser_lists NONE
start_proc_args /bin/true
stop_proc_args /bin/true
allocation_rule $round_robin
control_slaves TRUE
job_is_first_task FALSE
urgency_slots min
accounting_summary TRUE
===============================

You can name the PE anything and set the number of slots as you like. Make sure the user list includes you. Also, make sure you add the queues you want to work with to this PE.

  3. Specifying an outfile in the makeCluster() command in RMPISNOW doesn’t do anything, since the cluster is created at RMPISNOW’s invocation. If we look at RMPISNOWprofile, we see that worker output is sunk to /dev/null. I tried a few ways to get the workers’ output out: calling sink() on each worker via clusterEvalQ, and setting the OUT or R_SNOW_OUTFILE variables (see RMPInode.R and RMPInode.sh). How I got it to work was with:

clusterEvalQ(cl, sinkWorkerOutput("nodes.out"))
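
In context, that call goes right after the cluster handle exists (a sketch; the filename is arbitrary):

cl <- makeCluster()
clusterEvalQ(cl, sinkWorkerOutput("nodes.out"))  # each worker redirects its output to this file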
  4. Of course, make sure you have passwordless ssh enabled. If you get host key messages (i.e., you must type yes to accept a key) and your job doesn’t run, put

StrictHostKeyChecking no

in ~/.ssh/config according to this page. Check the stderr of your SGE log.

Parallel computing in R: snowfall/snow

I finally found the time to try parallel computing in R using snowfall/snow, thanks to this article in the first issue of The R Journal (the replacement for R News). I hadn’t tried parallel computing before because I didn’t have a good toy example and it seemed like a steep learning curve. snow and snowfall are perfect for ‘embarrassingly parallel’ jobs, e.g., a simulation study, bootstrap, or cross-validation. I do simulation studies a lot, e.g., assessing the properties of a statistical methodology, so implementing parallel computing will be very useful.

I got the toy example to work, but only in parallel on a single computer with multiple cores. Thanks to Michael Zeller, I got it to work on multiple machines as well. When using multiple nodes, make sure passwordless ssh is enabled.

Credit for getting snowfall to work on the BDUC servers (uci-nacs) goes to Harry Mangalam.

Here is a script, with a few examples:

 <pre class="src src-sh"><span style="color: #ff4500;">## </span><span style="color: #ff4500;">Example 1 - Multi-core on a single computer</span>

sink(‘SnowFallExample.Rout’, split=TRUE) .Platform .Machine R.version Sys.info()

library(snowfall) # 1. Initialisation of snowfall. # (if used with sfCluster, just call sfInit()) sfInit(parallel=TRUE, cpus=2)

# 2. Loading data. require(mvna) data(sir.adm) # 3. Wrapper, which can be parallelised. wrapper <- function(idx) { # Output progress in worker logfile cat( “Current index: “, idx, “\n” ) index <- sample(1:nrow(sir.adm), replace=TRUE) temp <- sir.adm[index, ] fit <- crr(temp$time, temp$status, temp$pneu) return(fit$coef) } # 4. Exporting needed data and loading required # packages on workers. sfExport(“sir.adm”) sfLibrary(cmprsk)

# 5. Start network random number generator # (as “sample” is using random numbers). sfClusterSetupRNG() # 6. Distribute calculation

start <- Sys.time(); result <- sfLapply(1:1000, wrapper) ; Sys.time()-start # Result is always in list form. mean(unlist(result)) # 7. Stop snowfall sfStop()

## Example 2 – Multiple nodes on a cluster (namely, the family-guy cluster at uci-ics) sink(‘SnowFallExample.Rout’, split=TRUE) .Platform .Machine R.version Sys.info()

library(snowfall) # 1. Initialisation of snowfall. # (if used with sfCluster, just call sfInit()) sfInit(socketHosts=rep(c(‘peter-griffin.ics.uci.edu’,‘stewie-griffin.ics.uci.edu’, ‘neil-goldman.ics.uci.edu’, ‘mort-goldman.ics.uci.edu’,‘lois-griffin.ics.uci.edu’),each=2), cpus=10,type=‘SOCK’,parallel=T)

# 2. Loading data. require(mvna) data(sir.adm) # 3. Wrapper, which can be parallelised. wrapper <- function(idx) { # Output progress in worker logfile cat( “Current index: “, idx, “\n” ) index <- sample(1:nrow(sir.adm), replace=TRUE) temp <- sir.adm[index, ] fit <- crr(temp$time, temp$status, temp$pneu) return(fit$coef) } # 4. Exporting needed data and loading required # packages on workers. sfExport(“sir.adm”) sfLibrary(cmprsk)

# 5. Start network random number generator # (as “sample” is using random numbers). sfClusterSetupRNG() # 6. Distribute calculation

start <- Sys.time(); result <- sfLapply(1:1000, wrapper) ; Sys.time()-start # Result is always in list form. mean(unlist(result)) # 7. Stop snowfall sfStop()

## Example 3 – Multiple nodes on a cluster (namely, the BDUC servers of uci-ics) ## ssh to bduc, then ssh to one of their claws (the head node is 32bit whereas the other wones are 64) ## put something like ## export LD_LIBRARY_PATH=/home/vqnguyen/lib:/usr/local/lib:/usr/lib:/lib:/sge62/lib/lx24-x86 in .bashrc ## or ## Sys.setenv(LD_LIBRARY_PATH=”/home/vqnguyen/lib:/usr/local/lib:/usr/lib:/lib:/sge62/lib/lx24-x86″) ## in an R session. Note: modify path to your home directory ## might have to install required packages elsewhere, like ~/Rlib, and use .libPaths() to add library path. Put this in .Rprofile sink(‘SnowFallExample.Rout’, split=TRUE) .Platform .Machine R.version Sys.info()

# 1. Initialisation of snowfall. # (if used with sfCluster, just call sfInit()) library(snowfall) sfInit(socketHosts=rep(c(‘claw1′, ‘claw2′),each=4), cpus=8,type=‘SOCK’,parallel=T)

# 2. Loading data. require(mvna) data(sir.adm) # 3. Wrapper, which can be parallelised. wrapper <- function(idx) { # Output progress in worker logfile cat( “Current index: “, idx, “\n” ) index <- sample(1:nrow(sir.adm), replace=TRUE) temp <- sir.adm[index, ] fit <- crr(temp$time, temp$status, temp$pneu) return(fit$coef) } # 4. Exporting needed data and loading required # packages on workers. sfExport(“sir.adm”) sfLibrary(cmprsk)

# 5. Start network random number generator # (as “sample” is using random numbers). sfClusterSetupRNG() # 6. Distribute calculation

start <- Sys.time(); result <- sfLapply(1:1000, wrapper) ; Sys.time()-start # Result is always in list form. mean(unlist(result)) # 7. Stop snowfall sfStop()

This is a good general reference for snowfall. The next thing to try is getting rpvm (PVM) to work with snowfall!