Advertisement
SteveWeston

foreach/doParallel/PSOCK/data.table benchmark V2

Sep 16th, 2013
331
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
R 1.19 KB | None | 0 0
  suppressMessages(library(doParallel))
  library(itertools)
  library(data.table)
  library(digest)
  4.  
  # Benchmark setup: parse sizes from the command line, build the test table,
  # and start the PSOCK cluster that foreach will dispatch to.
  #
  # Usage: Rscript <script> [n [m]]
  #   n  number of rows to generate (default 1000000)
  #   m  upper bound of the ids sampled for the grouping column
  #      (default ceiling(n / 100))
  args <- commandArgs(trailingOnly = TRUE)
  n <- if (length(args) > 0) as.integer(args[1]) else 1000000
  m <- if (length(args) > 1) as.integer(args[2]) else ceiling(n / 100)

  # as.integer() yields NA for non-numeric text; fail here with a clear message
  # instead of letting rnorm()/sample() fail later.
  if (is.na(n) || n < 1 || is.na(m) || m < 1) {
    stop("usage: [n [m]] -- both arguments must be positive integers",
         call. = FALSE)
  }

  # Fixed seed so the generated data is reproducible from run to run.
  set.seed(107)
  td <- data.table(val = rnorm(n), id = sample(m, n, replace = TRUE))

  # Four local PSOCK workers, registered as the foreach parallel backend.
  cl <- makePSOCKcluster(4)
  registerDoParallel(cl)
  workers <- getDoParWorkers()
  14.  
  # Element-wise sum of `a` and every additional argument, folded left to
  # right: ((a + v1) + v2) + ...
  #
  # a    first addend (returned unchanged when no other arguments are given)
  # ...  further addends, combined with `+` in the order supplied
  #
  # Returns the accumulated sum.
  vadd <- function(a, ...) {
    Reduce(`+`, list(...), a)
  }
  # Conditional mean: mean(v) when `mine` is TRUE, otherwise 0.
  #
  # v     values to average
  # mine  scalar logical selecting whether the mean is computed
  #
  # `v` is only referenced in the TRUE branch, so R's lazy argument evaluation
  # means the expression passed as `v` is never evaluated when `mine` is FALSE.
  # Do not force `v` here.
  cmean <- function(v, mine) {
    if (mine) mean(v) else 0
  }
  21.  
  # Timed benchmark run and report.
  #
  # Everything runs inside tryCatch(..., finally = ) so the PSOCK workers are
  # shut down even if the parallel computation or the reporting fails.
  # The assignments below are evaluated in the calling environment, so `res`,
  # `elapsed`, etc. remain available afterwards.
  tryCatch({
    start <- proc.time()[["elapsed"]]
    nuniq <- length(unique(td$id))

    # One task per chunk of group indices. Within a task, `grps` holds the
    # group numbers it owns. For each id group, cmean() yields the mean of
    # `val` over all rows outside that group (`-.I` drops the group's own
    # rows) when the group number is in `grps`, and 0 otherwise. Each task
    # returns the full `means` column; vadd() sums the task results.
    res <- foreach(grps = isplitIndices(nuniq, chunks = workers),
                   .combine = 'vadd',
                   .multicombine = TRUE,
                   .inorder = FALSE,
                   .packages = 'data.table') %dopar% {
      td[, means := cmean(td$val[-.I], .GRP %in% grps), by = id]
      td$means
    }
    elapsed <- proc.time()[["elapsed"]] - start

    cat(sprintf("foreach/doParallel/PSOCK/data.table with %d workers:\n", workers))
    # `nuniq` was computed above from td$id, so reuse it rather than
    # recomputing unique().
    cat(sprintf("Rows: %d, Unique IDs: %d\n", nrow(td), nuniq))
    cat(sprintf("Elapsed time: %f\n", elapsed))

    # Hash of the result vector, printed so runs can be compared.
    cat(sprintf("MD5 hash: %s\n", digest::digest(res)))
  }, finally = stopCluster(cl))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement