Serialize or turn a large parallel R job into smaller chunks for use with SGE

I use the snow package in R with OpenMPI and SGE quite often for my simulation studies; I’ve outlined how this can be done in the past.

The ease of these methods makes it tempting to just specify the maximum number of cores available all the time. However, unless you own a dedicated cluster, you are most likely sharing the resources with many others at your institution, and the resources are managed by a scheduler (SGE). The scheduler is nice in that your job starts automatically whenever the resources become available, and it tries to be fair to everyone. Other people’s smaller jobs will most likely be running, and unless no one else is using the cluster, my job usually waits in limbo when I request a lot of cores (say, 100). I’ve outlined how to determine the number of free nodes in the cluster to help with the core specification. However, what if, after your job starts, other jobs finish and more cores become available? Are you stuck waiting on a job that’s using barely 5 CPUs?

My system admin always advised me to break my job into smaller chunks, each requesting a small number of cores, to be more efficient (getting CPU time through the scheduler whenever it is available). I finally got around to thinking about how to implement this, and it’s quite easy. For random number generation, I just specify a different seed for each chunk as described here; this isn’t the ideal solution for random number generation, but it suffices for now. I always write a wrapper function that gets called repeatedly when I use snow anyway, so adapting it is quite easy. Here is a quick toy example.

sim.R, with my simulation function:

#! /bin/env Rscript

## get arguments: "seed <- 100" "iter.start <- 1" "iter.end <- 100"
arguments <- commandArgs(trailingOnly=TRUE)
for(i in 1:length(arguments)) {
  eval(parse(text=arguments[i]))
}

set.seed(seed)
n <- 100
beta0 <- 1
beta1 <- 1

simulate <- function(iter=1) {
  x <- rnorm(n)
  epsilon <- rnorm(n)
  y <- beta0 + beta1*x + epsilon
  fit <- lm(y ~ x)
  return(list(iter=iter, beta=coef(fit), varcov=vcov(fit)))
}

result <- lapply(iter.start:iter.end, simulate)
dput(result, paste("SimResult", iter.start, ".Robj", sep=""))
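sim.R can be tested locally before submitting anything, e.g. (this mirrors the qsub invocation shown in submit.R below):

Rscript sim.R 'seed=123' 'iter.start=1' 'iter.end=100'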

submit.R, submitting many smaller chunks to SGE:

#! /bin/env Rscript

job.name <- "MySim"
sim.script <- "sim.R"
Q <- "12hour_cluster.q"
set.seed(111)
n.chunk <- 2
n.chunk.each <- 10
my.seeds <- runif(n.chunk, max=100000000)
dput(my.seeds, "my.seeds.Robj")
for(current.chunk in 1:n.chunk) {
  seed <- my.seeds[current.chunk]
  iter.start <- current.chunk * n.chunk.each - n.chunk.each + 1
  iter.end <- current.chunk * n.chunk.each
  current.job <- paste(job.name, current.chunk, sep="")
  current.job.files <- paste(current.job, c("", ".stdout", ".stderr"), sep="")
  submit <- paste("qsub -q ", Q, " -S /usr/bin/Rscript -N ", current.job.files[1],
                  " -o ", current.job.files[2], " -e ", current.job.files[3],
                  " -M email@institution -m beas ", sim.script,
                  " 'seed=", seed, "' 'iter.start=", iter.start,
                  "' 'iter.end=", iter.end, "'", sep="")
  ## reads sim.R directly, e.g.:
  ## qsub -q 12hour_cluster.q -S /usr/bin/Rscript -N MySim1 -o MySim1.stdout -e MySim1.stderr -M email@institution -m beas sim.R 'seed=123' 'iter.start=1' 'iter.end=50'
  system(submit)

  #### OR USE THE FOLLOWING METHOD:
  ## submit <- paste("qsub -q ", Q, " -S /bin/bash -N ", current.job.files[1],
  ##                 " -o ", current.job.files[2], " -e ", current.job.files[3],
  ##                 " -M email@institution -m beas ", sep="")
  ## command <- paste("Rscript ", sim.script, " 'seed=", seed, "' 'iter.start=", iter.start,
  ##                  "' 'iter.end=", iter.end, "'", sep="")
  ## job.script <- paste(job.name, current.chunk, ".sh", sep="")
  ## sink(job.script)
  ## cat("#! /bin/env bash\n")
  ## cat("module load R/2.12.1\n")
  ## cat(command, "\n")
  ## sink()
  ## system(paste(submit, job.script))
  ##
  ## equivalent to: qsub -q 12hour_cluster.q -S /bin/bash -N MySim1 -o MySim1.stdout -e MySim1.stderr -M email@institution -m beas MySim1.sh
  ## where MySim1.sh contains: Rscript sim.R 'seed=123' 'iter.start=1' 'iter.end=50'
}


Now, I can just specify some parameters in submit.R and multiple smaller jobs will be submitted to SGE. I just have to write another script (aggregate.R) to put the results together and compute the information I need; a sketch follows. The nice thing is that OpenMPI or other third-party software isn’t even required.
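A minimal sketch of such an aggregate.R, assuming the SimResult*.Robj file naming and the result-list structure from the toy example above:

## aggregate.R: gather the per-chunk result files written by sim.R
files <- Sys.glob("SimResult*.Robj")               ## one file per chunk
result <- do.call(c, lapply(files, dget))          ## dget() reads what dput() wrote
beta.hat <- t(sapply(result, function(r) r$beta))  ## stack the coefficient estimates
colMeans(beta.hat)                                 ## e.g., Monte Carlo means of the betas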

Determining number of nodes or cores available in an SGE Queue

To determine the status of a queue in SGE, one can issue the command qstat -g c to get information such as the number of CPUs available and the current CPU and memory load. However, this information can be misleading when nodes are cross-listed in multiple Q’s: a Q can report X nodes as unused when in reality they are in use through a different Q. Consequently, a submitted parallel job asking for X cores can wait in limbo for quite some time depending on the cluster’s load. The following sgeQload.R script uses some commands explained in the cheat sheet to output the number of cores really available:

#! /bin/env Rscript

## This script shows me the number of cores available for each Q.
## Since many Q's on BDUC contain overlapping nodes, information from
## "qstat -g c" could be misleading and lead to submitted jobs that are waiting...
## This script utilizes R and qconf.
##
## References:
## http://moo.nac.uci.edu/~hjm/bduc/sge-quick-reference_v3_cheatsheet.pdf
## http://www.troubleshooters.com/codecorn/littperl/perlreg.htm

qstatgc <- system("qstat -g c", intern=TRUE)
qstatgc.list <- strsplit(qstatgc, split="\\s+", perl=TRUE)
## remove the --- line and all.q; "CLUSTER QUEUE" in the header is one thing -> QUEUE
qstatgc.list[[1]] <- qstatgc.list[[1]][-1]
qstat <- t(sapply(qstatgc.list[-1], function(x) as.numeric(x[-1])))
colnames(qstat) <- qstatgc.list[[1]][-1]
rownames(qstat) <- sapply(qstatgc.list[-1], function(x) x[1])
qstat <- cbind(qstat, NCPU=NA, LOAD=NA, AVAILABLE=NA)

for(Q in rownames(qstat)){
  ## host groups listed for this Q
  host.list <- strsplit(grep("hostlist", system(paste("qconf -sq", Q), intern=TRUE), value=TRUE),
                        split="\\s+", perl=TRUE)[[1]][-1]
  host.vec <- NULL
  for(host in host.list){
    ## expand each host group into its member hosts
    host.vec <- c(host.vec,
                  strsplit(strsplit(gsub("\\", "",
                                         paste(system(paste("qconf -shgrp", host, sep=" "), intern=TRUE), collapse=" "),
                                         fixed=TRUE),
                                    "hostlist", fixed=TRUE)[[1]][2],
                           "\\s+", perl=TRUE)[[1]])
  }
  host.vec <- unique(host.vec)
  host.vec <- host.vec[host.vec != ""]
  host.vec <- gsub(".bduc", "", host.vec, fixed=TRUE)

  ## per-host CPU counts and loads from qhost
  qhost <- system("qhost", intern=TRUE)
  qhost.matrix <- do.call(rbind, strsplit(qhost[-1], "\\s+", perl=TRUE))
  colnames(qhost.matrix) <- strsplit(qhost[1], "\\s+", perl=TRUE)[[1]]
  NCPU <- sum(as.numeric(qhost.matrix[qhost.matrix[, "HOSTNAME"] %in% host.vec, "NCPU"]))
  LOAD <- sum(as.numeric(qhost.matrix[qhost.matrix[, "HOSTNAME"] %in% host.vec, "LOAD"]))
  qstat[Q, "NCPU"] <- NCPU
  qstat[Q, "LOAD"] <- LOAD
  qstat[Q, "AVAILABLE"] <- NCPU - LOAD
}

qstat

Note that this script is specific to the cluster I use and should be modified for other clusters; it does not work out of the box on another cluster I have access to.

Scheduled Parallel Computing with R: R + Rmpi + OpenMPI + Sun Grid Engine (SGE)

Recently I’ve learned how to do parallel computing in R on a cluster of machines thanks to the R packages snowfall, snow, and Rmpi. I’ve been using the SOCKET method with snowfall since together they make things simple. With these tools, I can reduce day- or week-long jobs to hours or a day by running across many (100) cores/CPUs.

However, system admins would prefer me to use the Sun Grid Engine (SGE) or another of their job schedulers, since clusters are usually a shared resource, and “rogue” jobs like mine that hog all the resources aren’t really a good thing. Aside from scheduling jobs, another great thing about SGE is that it determines which (idle) nodes to use, so I don’t have to specify the list of nodes myself.

Luckily, people have attacked this problem already. First, Revolutions Computing has an internal document that gives instructions on how to install R, Rmpi, OpenMPI, and SGE to get them to work together. If you email them and ask for it, they are more than willing to share it. The document is “sge-snow.pdf.” After things are installed, here is how to get things to work.

Rmpi with OpenMPI and SGE via qsub:

First, copy the content of Rprofile that is packaged in Rmpi into ~/.Rprofile. Place the following in a shell script to be submitted by qsub (an example script is at the end):

mpirun -np 51 R --no-save -q < SGEtest.R > SGEtest.Rout

NOTE: 51 is the number of cores/CPUs to use, 1 master + 50 slaves. Inside the R script, do not use anything that belongs to snow or snowfall; just use Rmpi’s functions. Also, by using mpirun, we do NOT need to spawn slaves ourselves, as they are spawned by the mpirun command, and we do not need to call library(Rmpi). Put the following in the R script (SGEtest.R) to see that things are running:

mpi.remote.exec(paste("I am",mpi.comm.rank(),"of",mpi.comm.size()))
mpi.remote.exec(paste("I am",Sys.info(),"of",mpi.comm.size()))

snow with OpenMPI and SGE via qsub:

First, add the location of the RMPISNOW executable from the snow package to the PATH variable (or use the full path wherever RMPISNOW appears in the commands below). DO NOT put the Rprofile from Rmpi into ~/.Rprofile. Place the following in the shell script to be submitted by qsub:

mpirun -np 21 RMPISNOW < SGEtest2.R > SGEtest2.Rout

In the R script, use only snow functions (not Rmpi or snowfall). No need to call library(snow). Put the following in the R script (SGEtest2.R) to test:

cl <- makeCluster()
print(clusterCall(cl, function() Sys.info()))
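A minimal sketch of doing actual work in this setup, using only snow functions:

out <- parSapply(cl, 1:100, function(i) mean(rnorm(100)))  ## distribute iterations across workers
stopCluster(cl)                                            ## shut the workers down when done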

snow with OpenMPI and SGE via qrsh (interactive)

Similar to the previous case, but run

qrsh -V -q int64 mpirun -np 9 RMPISNOW

instead of the qsub command to get an interactive session.

Sample script for SGE

For the first two cases, a sample openMPI_R.sh script is:

#!/bin/bash

# here's the SGE directives
# ------------------------------------------
#$ -q longbat-adc # <- the name of the Q you want to submit to
#$ -pe openmpi 51 # <- load the openmpi parallel env w/ 51 slots
#$ -S /bin/bash # <- run the job under bash
#$ -N MPI-SGE # <- name of the job in the qstat output
#$ -o MPI-SGE.out # <- name of the output file.
#$ -e MPI-SGE.stderr # <- name of the stderr file.

module load R/2.10.0
echo "calling mpirun now"
## mpirun -np 51 R --no-save -q < SGEtest.R > SGEtest.Rout
mpirun -np 21 RMPISNOW < SGEtest2.R > SGEtest2.Rout
## call via: qsub openMPI_R.sh

Finally, I would like to point out that snowfall does not yet work with SGE because it requires a call to sfInit(), which conflicts with the cluster started by mpirun. This made me learn some functions from snow, which aren’t all that different from snowfall’s; the rough correspondence is sketched below.
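For anyone translating between the two packages, the correspondence I relied on (a sketch; check each package’s documentation):

## snowfall                          ## snow
## sfInit(parallel=TRUE, cpus=4)     cl <- makeCluster()      (under RMPISNOW)
## sfLapply(x, fun)                  parLapply(cl, x, fun)
## sfExport("obj")                   clusterExport(cl, "obj")
## sfStop()                          stopCluster(cl)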

Also, there is an rsge package for R that seems to work too.

UPDATE 1/25/2010

  1. We don’t need to specify -np 51 in the mpirun command. If we omit it, SGE passes the slot count directly to OpenMPI.
  2. I tried installing this myself. A few things to note:
     a. Compile OpenMPI with the --with-sge flag.
     b. Place the bin directory of OpenMPI in PATH if it is installed in a non-standard place. Also remember to place the directory where RMPISNOW resides into PATH.
     c. Install Rmpi: R CMD INSTALL Rmpi_0.5-8.tar.gz --lib=~/Rlib --configure-args="--with-mpi=/home/vqnguyen/openmpi-1.4.1-vqn/", OR specify the MPI_ROOT environment variable as /home/vqnguyen/openmpi-1.4.1-vqn.
     d. Place "export LD_LIBRARY_PATH=/path/to/libmpifolder:$LD_LIBRARY_PATH" in .bashrc if the variable does not already include it; this is required for library(Rmpi) to work. Also place .libPaths("~/Rlib") in RMPISNOWprofile so RMPISNOW can find my Rmpi.
     e. Set up a parallel environment in SGE, either with qmon or on the command line with:

$ qconf -Ap openmpi.config

where the config file is as follows:

openmpi configuration:
===============================
pe_name openmpi
slots 666
user_lists arusers
xuser_lists NONE
start_proc_args /bin/true
stop_proc_args /bin/true
allocation_rule $round_robin
control_slaves TRUE
job_is_first_task FALSE
urgency_slots min
accounting_summary TRUE
===============================

You can name the PE anything and set the number of slots. Make sure the user list has you in it. Also, make sure you add the Q’s you want to work with to this PE.

  3. Specifying an outfile in the makeCluster() command in RMPISNOW doesn’t do anything, since the cluster is created when RMPISNOW is invoked. If we look at the RMPISNOWprofile, we see that the output is sunk to /dev/null. I tried a few ways to get the workers’ output out, via sink() on each worker via clusterEvalQ or setting the OUT or R_SNOW_OUTFILE variables (see RMPInode.R and RMPInode.sh). How I got it to work was with:
clusterEvalQ(cl, sinkWorkerOutput("nodes.out"))
  4. Of course, make sure you have passwordless SSH enabled. If you get host-key messages (i.e., you must type yes to accept the key) and your job doesn’t run, put

StrictHostKeyChecking no

in ~/.ssh/config according to this page. Check the stderr of your SGE log.