R: an unparalleled success

R as a data science tool has surpassed its alphabetical and reptilian competitors across many domains. However, its performance can suffer tremendously because R code does not run in parallel out of the box. Although some packages (e.g., foreach and doParallel) have gone a long way toward providing parallel computing for R, these plug-and-play parallelization packages have some key disadvantages. First, they often use special syntax that may require heavy revision of already drafted code. Second, without extreme attention to detail, they tend to trigger scoping errors when the code involves layered dependencies. Third, they rely on frameworks that do not necessarily scale to large multi-core clusters.
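
As a point of reference, the snippet below sketches the kind of special syntax these packages require (here, foreach with a doParallel backend registered on two cores). It is purely illustrative and not part of the pipeline described in this article.

library(doParallel)

# Register a parallel backend with two workers
registerDoParallel(cores = 2)

# Note the %dopar% operator and the .combine argument: a familiar
# for-loop must be restructured to fit this idiom
squares <- foreach(i = 1:10, .combine = c) %dopar% {
  i^2
}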

In this article, we introduce another way to parallelize: through batch processing. With batch processing, instead of executing a large task as one multi-threaded process, we break up the task into multiple single-threaded processes called jobs. We then pass each job to a batch processing job manager, which holds it in a queue until resources become available. This allows each single-threaded process to eventually get its own node for analysis. Although job managers are most often used in high-performance computing as a convenient way to share a single resource among multiple users, it is possible to set up a job manager like TORQUE on a personal computer. Since TORQUE lacks an easy installation process, I put together this run-at-your-own-risk script to install it on Ubuntu 16.04.

Although a complete review of batch processing is beyond the scope of this article (and frankly beyond the scope of my expertise), this tutorial will hopefully show you how easy it is to harness TORQUE to parallelize R scripts. However, keep in mind that the method discussed here relies on the R system function to execute bash commands in the OS console. As such, this pipeline may not work outside of the Linux environment. Also, if you ever encounter a connection error, take note that you may need to reconfigure the /etc/hosts file with an updated public IP address.

From cluster to cluster

Parallelization of R through batch processing involves two steps. First, we sidestep the burden of writing a parallel-process script by instead writing a script that writes single-process scripts. Second, we deliver each new script to the batch processing queue, where it waits to get executed on an unoccupied node.

In this endeavor, we make use of two helper functions, writeR and qsub, that simplify these two steps, respectively. I have made these available through the miSciTools package, an R library that bundles miscellaneous software tools to expedite scientific analyses. We can install miSciTools directly from GitHub using the devtools package.

devtools::install_github("tpq/miSciTools")
library(miSciTools)

Next, to show the logic behind batch parallelization, we create a computationally expensive for-loop that clusters 10 large mock datasets. In this example, parallelization is easy enough to implement using an R package like foreach. However, for illustration purposes, we will instead parallelize this task through batch processing. Consider the single-threaded loop below.

for(i in 1:10){
  
  N <- 2000
  someData <- matrix(rnorm(N^2), N, N)
  hc <- hclust(dist(someData))
  cuts <- cutree(hc, 10)
  write.csv(data.frame(cuts),
            file = paste0("cluster-", i, ".csv"))
}

During each pass through the loop, a large dataset is created, clustered, and labelled. In a more realistic scenario, each iteration might import and pre-process a different dataset. Since computing the pairwise Euclidean distances among 2,000 rows carries a high computational burden, we will almost certainly benefit here from parallelization.
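
To get a feel for that cost before committing to the full loop, you could time a single iteration of the expensive step. The exact numbers will depend on your machine, so treat this as a rough sanity check.

# Time one pass of the expensive distance calculation
N <- 2000
someData <- matrix(rnorm(N^2), N, N)
system.time(dist(someData))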

R writing R

In this example, we break up the task into 10 parallel parts by writing a single script that writes 10 separate scripts. Each script will generate a random matrix, cluster the data into 10 groups, and then save the cluster labels. We simplify this task by using writeR, which creates an R script from any number of “free text” representations of R code with intermingled variables from the parent environment (i.e., the environment where the new script is written). To generate a preview of how the R script will appear when saved, we toggle the argument preview = TRUE. The excerpt below will hopefully clarify the behavior of this function in a way that plain English cannot. Keep in mind that extraneous space does not impact R code in any way.

outsideCode <- "something else from outside"
file <- writeR('
          this = "the first line of code"
          and = "then, there is a second line of code"
          that = paste("you can also add", "', outsideCode, '",
                       "as long as you remember punctuation")'
  , preview = TRUE)
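
The preview printed by this call should look roughly like the following. The exact path of the saved environment will differ on your system; the one shown here is only a placeholder.

load("/tmp/RtmpXXXXXX/parent.RData")   # placeholder path to the saved parent environment

this = "the first line of code"
and = "then, there is a second line of code"
that = paste("you can also add", "something else from outside",
             "as long as you remember punctuation")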

We see in this preview an additional line of code that we did not provide explicitly. The load() command is added to each new script so that it automatically loads the parent environment. This ensures that any variables (or functions or packages) not explicitly passed to the new script remain available to the code inside it. Note that the parent environment is saved at the time the script is written, which you can exploit to create a unique child environment for each script.

The writeR function also accepts the optional argument file, which allows the user to change the location and name of the new R script. By default, all new R scripts (along with the .RData file from the parent environment) are saved in a temporary directory. Note that this directory, along with its contents, gets deleted when the parent R session terminates.
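
For example, to keep a generated script beyond the lifetime of the current session, you could point file at a location of your own choosing. The path below is just an illustration.

# Save the generated script somewhere permanent instead of the
# session's temporary directory (the path is illustrative)
file <- writeR('
  message("hello from a batch job")
', file = "~/hello-job.R")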

Finally, keep in mind that writeR generates the new file using escaped text (e.g., try running cat("\"Quotes\" and Tabs?\n\tYes.") in the R console). Since you need to wrap the “free text” R code within a set of quotes, you may need to escape quotations if you use more than one quotation style within your code (i.e., both double quotes and single quotes). To avoid having to escape quotations, stick to one style when writing R code for batch processing. Note also that new lines and blank space included in the “free text” R code get written to the new file verbatim.
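
As a small, hypothetical illustration of the quoting pitfall: when the free text is wrapped in single quotes, any single quote inside the code must be escaped, while double quotes pass through untouched.

# The free text below is delimited by single quotes, so the single
# quotes inside it must be escaped with a backslash
file <- writeR('
  fine <- "double quotes pass through untouched"
  awkward <- \'single quotes must be escaped here\'
', preview = TRUE)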

R bashing R

To use qsub, we can provide either a “free text” bash command or the file location of an R script. The function will then deliver this command (or script) to the batch processing queue. In the latter case, this function pipes an R CMD BATCH bash command to the qsub bash command.

To test that qsub works properly on your machine, try the following function call. If successful, a new process should appear in the queue. You can view the queue from the OS console using the qstat bash command.

qsub("sleep 30")

Now, we can use qsub to put writeR to work. Note that we splice i into the free text from outside. However, because each new script also imports the parent environment at the time it is written, we could just as well have referenced i directly inside the free text and named the output files with file = paste0("cluster-", i, ".csv").

for(i in 1:10){
  
  cmd <- writeR('
  N <- 2000
  someData <- matrix(rnorm(N^2), N, N)
  hc <- hclust(dist(someData))
  cuts <- cutree(hc, 10)
  write.csv(data.frame(cuts),
            file = paste0("cluster-", ', i, ', ".csv"))
  ')
  
  qsub(cmd)
}

A place in the queue

When this for-loop completes, 10 single-process R scripts will have joined the queue. From the console, you can check the TORQUE queue using the qstat bash command. In addition, you can use the qdel bash command to remove a queued job and the qrun bash command to force the execution of a queued job.
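
Since this pipeline already leans on the R system function, you can also poll the queue without leaving R. A minimal sketch:

# Print the current TORQUE queue from within R
system("qstat")

# Or capture the queue listing as a character vector for further use
queue <- system("qstat", intern = TRUE)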

In this example, each script saves the cluster labels as a comma-delimited file in the working directory. By default, the working directory of batch processed R scripts is the user's home directory. To combine the results from the parallelized processes, we could write a simple loop that reads in each .csv file and joins the contents.

files <- vector("list", 10)
for(i in 1:10){
  
  files[[i]] <- read.csv(paste0("cluster-", i, ".csv"))
}

do.call(rbind, files)

Scaling to an HPC

When using a job manager on a high-performance computer (HPC), system administrators often request that users provide additional parameters that help guide optimal resource utilization. This includes, for example, the anticipated run time or expected RAM overhead. The qsub function for R will pass along any number of specified TORQUE parameters to the OS console: simply provide them as additional arguments. For example, to replicate the TORQUE command qsub -M thom@tpq.me [someBashCmd], call instead the R function qsub(someBashCmd, M = "thom@tpq.me").
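
For instance, assuming the wrapper forwards each named argument as the matching single-letter qsub flag (as with -M above), a resource request might look roughly like this. The walltime and memory values are placeholders; check with your system administrator for sensible defaults.

# Hypothetical sketch: request two hours of walltime and 4 GB of RAM
# for the cmd script from the loop above, assuming named arguments
# map onto the matching qsub flags (-l, -M)
qsub(cmd, l = "walltime=02:00:00,mem=4gb", M = "thom@tpq.me")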

Depending on how you access the HPC, you may not have an R IDE like RStudio. Instead, you may need to use the OS console to run the master script-that-writes-scripts. To do this, simply call R CMD BATCH script-that-writes-scripts.R from the OS console. This will execute the R script and pass the individual processes to the job manager.
