Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
suppressMessages(library(doParallel))
library(itertools)
library(data.table)
library(digest)
# Command-line configuration (all positional, all optional):
#   args[1]  n         number of rows to simulate        (default 1000000)
#   args[2]  m         upper bound of the sampled ids    (default ceiling(n / 100))
#   args[3]  nworkers  number of PSOCK worker processes  (default 4)
args <- commandArgs(trailingOnly = TRUE)
n <- if (length(args) > 0) as.integer(args[1]) else 1000000
m <- if (length(args) > 1) as.integer(args[2]) else ceiling(n / 100)
nworkers <- if (length(args) > 2) as.integer(args[3]) else 4L

# as.integer() returns NA for non-numeric input; fail here with a clear
# message instead of deep inside rnorm()/sample()/makePSOCKcluster().
if (anyNA(c(n, m, nworkers)) || n < 1 || m < 1 || nworkers < 1) {
  stop(
    "usage: Rscript <script> [n >= 1] [m >= 1] [workers >= 1]",
    call. = FALSE
  )
}

# Fixed seed so the simulated table is reproducible from run to run.
set.seed(107)
td <- data.table(val = rnorm(n), id = sample(m, n, replace = TRUE))

# Start the worker processes and register them as the %dopar% backend.
cl <- makePSOCKcluster(nworkers)
registerDoParallel(cl)
workers <- getDoParWorkers()
# Sum `a` and every additional argument with `+`, folding left to right.
#
# Arguments:
#   a    first operand; returned unchanged when no further arguments are given.
#   ...  zero or more further operands, each added to the running total.
# Returns: the accumulated sum `a + ..1 + ..2 + ...`.
#
# Referenced by name as the foreach `.combine` function in this script, where
# `.multicombine = TRUE` means it can receive several results in one call.
vadd <- function(a, ...) {
  Reduce(`+`, list(...), a)
}
# Conditional mean: mean(v) when `mine` is TRUE, otherwise 0.
#
# Arguments:
#   v     values to average.
#   mine  scalar logical; whether the caller wants the mean computed.
# Returns: mean(v), or the constant 0 when `mine` is FALSE.
#
# The 0 acts as a neutral placeholder: the parallel loop in this script adds
# the per-task result vectors together, so skipped groups contribute nothing.
cmean <- function(v, mine) {
  if (mine) mean(v) else 0
}
# Parallel computation: for every row, the mean of `val` over all rows that
# belong to a *different* id group (td$val[-.I] drops the current group's rows).
#
# The group indices 1..nuniq are split into one chunk per worker. Each task
# fills in the means only for the groups in its chunk (.GRP %in% grps) and 0
# for all other rows, so adding the task vectors with vadd() gives the full
# per-row result.
start <- proc.time()[["elapsed"]]
nuniq <- length(unique(td$id))
res <- tryCatch({
  sums <- foreach(grps = isplitIndices(nuniq, chunks = workers),
                  .combine = "vadd",
                  .multicombine = TRUE,
                  .inorder = FALSE,
                  .packages = "data.table") %dopar% {
    td[, means := cmean(td$val[-.I], .GRP %in% grps), by = id]
    td$means
  }
  # Take the timing before the cluster is shut down so that teardown is not
  # counted as part of the measured work.
  elapsed <- proc.time()[["elapsed"]] - start
  sums
}, finally = stopCluster(cl))  # stop the workers even if the loop fails

# Report the run parameters, the wall-clock time, and a hash of the result
# that can be compared between runs or implementations.
cat(sprintf("foreach/doParallel/PSOCK/data.table with %d workers:\n", workers))
cat(sprintf("Rows: %d, Unique IDs: %d\n", nrow(td), nuniq))
cat(sprintf("Elapsed time: %f\n", elapsed))
cat(sprintf("MD5 hash: %s\n", digest(res)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement