Guest User

Untitled

a guest
Dec 17th, 2017
140
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.98 KB | None | 0 0
  1. package mapreduce
  2.  
  3. import (
  4. "fmt"
  5. "log"
  6. "sync"
  7. )
  8.  
  9. //
  10. // schedule() starts and waits for all tasks in the given phase (Map
  11. // or Reduce). the mapFiles argument holds the names of the files that
  12. // are the inputs to the map phase, one per map task. nReduce is the
  13. // number of reduce tasks. the registerChan argument yields a stream
  14. // of registered workers; each item is the worker's RPC address,
  15. // suitable for passing to call(). registerChan will yield all
  16. // existing registered workers (if any) and new ones as they register.
  17. //
  18. func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
  19. var ntasks int
  20. var n_other int // number of inputs (for reduce) or outputs (for map)
  21. switch phase {
  22. case mapPhase:
  23. ntasks = len(mapFiles)
  24. n_other = nReduce
  25. case reducePhase:
  26. ntasks = nReduce
  27. n_other = len(mapFiles)
  28. }
  29.  
  30. fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
  31.  
  32. // All ntasks tasks have to be scheduled on workers, and only once all of
  33. // them have been completed successfully should the function return.
  34. // Remember that workers may fail, and that any given worker may finish
  35. // multiple tasks.
  36. //
  37. // TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO TODO
  38. //
  39.  
  40. var cond sync.WaitGroup
  41. cond.Add(ntasks)
  42.  
  43. done := make(chan bool)
  44. tasks := make(chan DoTaskArgs, ntasks)
  45. for i := 0; i < ntasks; i++ {
  46. tasks <- DoTaskArgs{
  47. jobName,
  48. mapFiles[i],
  49. phase,
  50. i,
  51. n_other,
  52. }
  53. }
  54.  
  55. runEachWorker := func(workerAddr string) {
  56. for {
  57. select {
  58. case t := <-tasks:
  59. success := call(workerAddr, "Worker.DoTask", t, nil)
  60. if success {
  61. cond.Done()
  62. } else {
  63. // worker is down, give back task for re-scheduling
  64. tasks <- t
  65. return
  66. }
  67. case <-done:
  68. return
  69. }
  70. }
  71. }
  72.  
  73. go func() {
  74. for {
  75. select {
  76. case worker := <-registerChan:
  77. go runEachWorker(worker)
  78. case <-done:
  79. return
  80. }
  81. }
  82. }()
  83.  
  84. cond.Wait()
  85. close(done)
  86. log.Printf("Schedule: %v phase done\n", phase)
  87. }
Add Comment
Please, Sign In to add comment