Advertisement
Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
# Benchmark setup: load packages, build seeded synthetic data, start workers.
# foreach() and makePSOCKcluster() are used below without loading foreach or
# parallel explicitly; this relies on doParallel attaching them.
suppressMessages(library(doParallel))
library(itertools)   # isplitIndices()
library(data.table)

# Fixed seed so the data (and the result hash printed later) is reproducible.
set.seed(107)
n <- 1000000   # number of rows
m <- 10000     # ids are drawn from 1..m; not every id is guaranteed to appear
td <- data.frame(val = rnorm(n), id = sample(m, n, replace = TRUE))

# PSOCK cluster with a fixed worker count. It holds external R processes and
# should be released with stopCluster(cl) once the parallel work is done.
cl <- makePSOCKcluster(4)
registerDoParallel(cl)
workers <- getDoParWorkers()
# Element-wise sum of numeric vectors, folded left to right.
#
# Used as the foreach `.combine` function. Each task returns the full `means`
# column, which is 0 for groups the task does not own. Summing the partial
# results therefore reassembles the complete answer.
#
# a:   first vector; the starting value of the sum
# ...: further vectors added to `a` in order
# Returns `a` unchanged when no further vectors are supplied.
vadd <- function(a, ...) {
  Reduce(`+`, list(...), init = a)
}
# Mean of `v` when the current group belongs to this worker, otherwise 0.
#
# R evaluates function arguments lazily, so `v` is only computed when `mine`
# is TRUE. The caller passes an expensive subset expression for `v`; groups
# this worker does not own never evaluate it.
#
# v:    numeric vector to average (evaluated only if `mine` is TRUE)
# mine: single logical; TRUE when the group is assigned to this worker
ownermean <- function(v, mine) {
  if (mine) {
    mean(v)
  } else {
    0
  }
}
# Timed run: for every id group, compute the mean of `val` over all rows
# outside that group. The group numbers are split into one chunk per worker.
start <- proc.time()[["elapsed"]]
nuniq <- length(unique(td$id))
DT <- data.table(td)
res <- foreach(grps = isplitIndices(nuniq, chunks = workers),
               .combine = vadd,
               .multicombine = TRUE,
               # Partial results are summed, so completion order is irrelevant.
               .inorder = FALSE,
               .packages = "data.table") %dopar% {
  # .I holds the current group's row numbers, so DT[-.I, val] is every other
  # row. ownermean() only evaluates it for groups whose .GRP is in this
  # task's chunk; other groups get 0.
  DT[, means := ownermean(DT[-.I, val], .GRP %in% grps), by = id]
  DT$means
}
elapsed <- proc.time()[["elapsed"]] - start

# Release the PSOCK worker processes (outside the timed region) and fall back
# to sequential execution so no later %dopar% targets a stopped cluster.
stopCluster(cl)
registerDoSEQ()

library(digest)
cat(sprintf("foreach/doParallel/PSOCK/data.table with %d workers:\n", workers))
cat(sprintf("Elapsed time: %f, MD5 hash: %s\n", elapsed, digest(res)))
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement