Guest User

Untitled

a guest
Apr 21st, 2018
97
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.76 KB | None | 0 0
# Progress information for parallel processes (that share the same filesystem).
#
# The master function sets up a tempfile for each process, spawns the
# processes, and passes the corresponding tempfile location to each; each
# process writes its progress information into its tempfile; the master
# function polls those files for the progress information and prints it to the
# screen, overwriting the previous line as a progress bar would.
  8. library (future)
  9.  
  10. # an environment to stash file info in, to hack around scoping issues. A package
  11. # namespace could be used instead, but there's probably a more elegant solution.
  12. mock_namespace <- new.env()
  13. mock_namespace$file <- ""
  14.  
  15. # users can insert this in their code to send out progress information. Ideally
  16. # this would be replaced with a progress bar.
  17. update_parallel_progress <- function (i, n) {
  18. progress_text <- sprintf("%i%%\n", round(100 * i / n))
  19. cat(progress_text, file = mock_namespace$file)
  20. }
  21.  
  22. run_job <- function (job_info) {
  23. # use a mock namespace to make the communication file visible to
  24. # update_parallel_progress on this run
  25. mock_namespace$file <- job_info$file
  26. eval(job_info$expression)
  27. }
  28.  
  29. all_resolved <- function (futures) {
  30. each_resolved <- vapply(futures, resolved, FALSE)
  31. all(each_resolved)
  32. }
  33.  
  34. # replicate expr in parallel across n_cores processes, and print live progress
  35. # information for all the processes
  36. future_replicate <- function (n, expr, simplify = "array") {
  37.  
  38. jobs <- seq_len(n)
  39.  
  40. # create tempfiles for communication and populate them with something
  41. files <- replicate(n, tempfile())
  42.  
  43. lapply(files,
  44. function(file) {
  45. cat("0%\n", file = file)
  46. })
  47.  
  48. # dispatch the jobs
  49. futures <- list()
  50. for (job in jobs) {
  51. mock_namespace$file <- files[[job]]
  52. expression <- substitute(expr)
  53. futures[[job]] <- future(eval.parent(expression))
  54. }
  55.  
  56. # poll the files until all the jobs are complete
  57. while (!all_resolved(futures)) {
  58.  
  59. # get and print the progress information
  60. job_text <- paste("job", jobs)
  61. progress_text <- vapply(files, readLines, "")
  62. all_text <- paste0(job_text, ": ", progress_text,
  63. collapse = " ")
  64. cat("\r", all_text)
  65. flush.console()
  66.  
  67. }
  68.  
  69. cat("\n")
  70.  
  71. # get the values optionally simplify, and return
  72. results <- lapply(futures, value)
  73.  
  74. if (!identical(simplify, FALSE) && length(results) > 0) {
  75. results <- simplify2array(results,
  76. higher = (simplify == "array"))
  77. }
  78.  
  79. results
  80.  
  81. }
  82.  
# # Demo: a user-defined function that reports parallel progress information.
#
# library(future)
#
# foo <- function (n) {
#   for (i in seq_len(n)) {
#     update_parallel_progress(i, n)
#     Sys.sleep(runif(1))
#   }
#   "success!"
# }
#
# # forked workers (the `multiprocess` strategy originally used here is deprecated)
# plan(multicore)
# future_replicate(4, foo(30))
Add Comment
Please, Sign In to add comment