Serialize a large parallel R job into smaller chunks for use with SGE

Category: R
Author: Vinh Nguyen
Published: June 16, 2011

I use the snow package in R with OpenMPI and SGE quite often for my simulation studies; I've outlined how this can be done in the past.

The ease of these methods makes it tempting to always specify the maximum number of cores available. However, unless you own a dedicated cluster, you are most likely sharing resources with many others at your institution, and those resources are managed by a scheduler (SGE). The scheduler is nice in that your job starts automatically whenever the resources become available, and it tries to be fair to everyone. Other people's smaller jobs will most likely be running, so unless no one else is using the cluster, my job usually sits in limbo when I request a lot of cores (say, 100). I've outlined how to determine the number of free nodes in the cluster to help with the core specification. But what if, after your job starts, other jobs finish and more cores become available? Are you stuck waiting on a job that's only using 5 CPUs?

My system admin has always advised me to break my job into smaller chunks, each requesting a small number of cores, to be more efficient (grabbing CPU time through the scheduler whenever it becomes available). I finally got around to thinking about how to implement this, and it's quite easy. For random number generation, I just specify a different seed for each chunk as described here; this isn't the ideal solution for random number generation, but it suffices for now. I always write a wrapper function that gets called repeatedly when I use snow anyway, so adapting it is quite easy. Here is a quick toy example.

sim.R, with my simulation function:

#!/usr/bin/env Rscript

## get command-line arguments, e.g., 'seed=100' 'iter.start=1' 'iter.end=100'
arguments <- commandArgs(trailingOnly=TRUE)
for(i in seq_along(arguments)) {
  eval(parse(text=arguments[i]))  ## assign each argument, e.g., seed <- 100
}


set.seed(seed)  ## chunk-specific seed passed from submit.R
n <- 100        ## sample size per iteration
beta0 <- 1      ## true intercept
beta1 <- 1      ## true slope

## one simulation iteration: draw data, fit OLS, return the estimates
simulate <- function(iter=1) {
  x <- rnorm(n)
  epsilon <- rnorm(n)
  y <- beta0 + beta1*x + epsilon
  fit <- lm(y ~ x)
  return(list(iter=iter, beta=coef(fit), varcov=vcov(fit)))
}

result <- lapply(iter.start:iter.end, simulate)
## save this chunk's results; the file is named by the chunk's starting iteration
dput(result, paste("SimResult", iter.start, ".Robj", sep=""))
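
To sanity-check a single chunk from the shell before involving the scheduler (an illustrative call, assuming Rscript is on your PATH; this one writes SimResult1.Robj):

Rscript sim.R 'seed=100' 'iter.start=1' 'iter.end=10'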

submit.R, submitting many smaller chunks to SGE:

#!/usr/bin/env Rscript

job.name <- "MySim"
sim.script <- "sim.R"
Q <- "12hour_cluster.q"  ## the SGE queue to submit to
set.seed(111)
n.chunk <- 2             ## number of chunks (jobs) to submit
n.chunk.each <- 10       ## iterations per chunk
my.seeds <- runif(n.chunk, max=100000000)  ## one seed per chunk
dput(my.seeds, "my.seeds.Robj")            ## save seeds for reproducibility
for(current.chunk in 1:n.chunk) {
  seed <- my.seeds[current.chunk]
  iter.start <- (current.chunk - 1) * n.chunk.each + 1
  iter.end <- current.chunk * n.chunk.each
  current.job <- paste(job.name, current.chunk, sep="")
  current.job.files <- paste(current.job, c("", ".stdout", ".stderr"), sep="")
  ## run sim.R directly through the scheduler
  submit <- paste("qsub -q ", Q, " -S /usr/bin/Rscript -N ", current.job.files[1],
                  " -o ", current.job.files[2], " -e ", current.job.files[3],
                  " -M email@institution -m beas ", sim.script,
                  " 'seed=", seed, "' 'iter.start=", iter.start,
                  "' 'iter.end=", iter.end, "'", sep="")
  ## qsub -q 12hour_cluster.q -S /usr/bin/Rscript -N MySim1 -o MySim1.stdout -e MySim1.stderr -M email@institution -m beas sim.R 'seed=123' 'iter.start=1' 'iter.end=10'
  system(submit)

  #### OR USE THE FOLLOWING METHOD
  ## submit <- paste("qsub -q ", Q, " -S /bin/bash -N ", current.job.files[1],
  ##                 " -o ", current.job.files[2], " -e ", current.job.files[3],
  ##                 " -M email@institution -m beas ", sep="")
  ## command <- paste("Rscript ", sim.script, " 'seed=", seed, "' 'iter.start=", iter.start, "' 'iter.end=", iter.end, "'", sep="")
  ## job.script <- paste(job.name, current.chunk, ".sh", sep="")

  ## sink(job.script)
  ## cat("#!/usr/bin/env bash\n")
  ## cat("module load R/2.12.1\n")
  ## cat(command, "\n")
  ## sink()
  ## system(paste(submit, job.script))
  ## ## qsub -q 12hour_cluster.q -S /bin/bash -N MySim1 -o MySim1.stdout -e MySim1.stderr -M email@institution -m beas MySim1.sh
  ## ## MySim1.sh: Rscript sim.R 'seed=123' 'iter.start=1' 'iter.end=10'
}
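
Running the submit script queues each chunk as its own job (MySim1, MySim2, ...); qstat then shows where they sit in the queue. A quick illustration, assuming Rscript is on your PATH:

Rscript submit.R
qstat -u $USER    ## list your jobs and their states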


Now, I can just specify some parameters in submit.R and multiple smaller jobs will be submitted to SGE. I just have to write another script (aggregate.R) to put the results together and compute the information I need. The nice thing is that OpenMPI or other third-party software isn't even required.
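
For illustration, here is a minimal sketch of what aggregate.R might look like, assuming the chunk files follow the SimResult*.Robj naming scheme from sim.R; the mean of the slope estimates below is just a stand-in for whatever summary your simulation actually calls for:

#!/usr/bin/env Rscript

## aggregate.R: gather the per-chunk results written by sim.R
files <- Sys.glob("SimResult*.Robj")           ## one file per chunk
results <- do.call(c, lapply(files, dget))     ## flatten chunks into one list
beta1.hat <- sapply(results, function(r) r$beta["x"])
cat("iterations completed:", length(results), "\n")
cat("mean of the slope estimates:", mean(beta1.hat), "\n")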