Advertisement
Guest User

Untitled

a guest
Feb 14th, 2020
132
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.77 KB | None | 0 0
  1. package mr
  2.  
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "hash/fnv"
  7. "io/ioutil"
  8. "log"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "time"
  13. )
  14.  
  15. // for sorting by key.
  16. type ByKey []KeyValue
  17.  
  18. // for sorting by key.
  19. func (a ByKey) Len() int { return len(a) }
  20. func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  21. func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
  22.  
  23. //
  24. // Map functions return a slice of KeyValue.
  25. //
  26. type KeyValue struct {
  27. Key string
  28. Value string
  29. }
  30.  
  31. //
  32. // use ihash(key) % NReduce to choose the reduce
  33. // task number for each KeyValue emitted by Map.
  34. //
  35. func ihash(key string) int {
  36. h := fnv.New32a()
  37. h.Write([]byte(key))
  38. return int(h.Sum32() & 0x7fffffff)
  39. }
  40.  
  41. func generateFileNames(reduceTaskNo int, nMap int) []string {
  42. result := []string{}
  43. for i := 0; i < nMap; i++ {
  44. filename := fmt.Sprintf("mr-%d-%d", i, reduceTaskNo)
  45. result = append(result, filename)
  46. }
  47. return result
  48. }
  49.  
  50. //
  51. // main/mrworker.go calls this function.
  52. //
  53. func Worker(mapf func(string, string) []KeyValue,
  54. reducef func(string, []string) string) {
  55.  
  56. // Your worker implementation here.
  57. for {
  58. task := RequestTask()
  59. // fmt.Println(task)
  60.  
  61. if task.TaskType == Reduce {
  62. // this is a reduce task
  63.  
  64. var kva []KeyValue
  65. filenames := generateFileNames(task.TaskNumber, task.NMap)
  66.  
  67. for _, filename := range filenames {
  68. file, err := os.Open(filename)
  69.  
  70. if err != nil {
  71. log.Fatalf("cannot load file %v", filename)
  72. }
  73.  
  74. dec := json.NewDecoder(file)
  75. for {
  76. var kv KeyValue
  77. if err := dec.Decode(&kv); err != nil {
  78. break
  79. }
  80. kva = append(kva, kv)
  81. }
  82. file.Close()
  83. }
  84.  
  85. sort.Sort(ByKey(kva))
  86.  
  87. oname := fmt.Sprintf("mr-out-%d", task.TaskNumber)
  88. ofile, err := os.Create(oname)
  89.  
  90. if err != nil {
  91. log.Fatalf("cannot create reduce output file %v", oname)
  92. }
  93.  
  94. i := 0
  95. for i < len(kva) {
  96. j := i + 1
  97. for j < len(kva) && kva[j].Key == kva[i].Key {
  98. j++
  99. }
  100. values := []string{}
  101. for k := i; k < j; k++ {
  102. values = append(values, kva[k].Value)
  103. }
  104. output := reducef(kva[i].Key, values)
  105. // fmt.Println("Reducef completed ")
  106.  
  107. // this is the correct format for each line of Reduce output.
  108. fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
  109.  
  110. i = j
  111. }
  112. ofile.Close()
  113. taskDone := TaskDone{}
  114.  
  115. call("Master.ReduceTaskDoneHandler", &task, &taskDone)
  116.  
  117. } else if task.TaskType == Map {
  118. //this is a mapTask
  119. content := GetFileContents(task.Filename)
  120. kva := mapf(task.Filename, string(content))
  121.  
  122. // sort.Sort(ByKey(kva))
  123.  
  124. reduceTaskToMapResults := make(map[int][]KeyValue)
  125.  
  126. for i := 1; i < task.NReduce+1; i++ {
  127. reduceTaskToMapResults[i] = []KeyValue{}
  128. }
  129.  
  130. for _, kv := range kva {
  131. reduceTaskNo := (ihash(kv.Key) % task.NReduce) + 1
  132. // fmt.Println(reduceTaskNo)
  133. if _, ok := reduceTaskToMapResults[reduceTaskNo]; ok {
  134. reduceTaskToMapResults[reduceTaskNo] = append(reduceTaskToMapResults[reduceTaskNo], kv)
  135. } else {
  136. reduceTaskToMapResults[reduceTaskNo] = []KeyValue{kv}
  137. }
  138. }
  139.  
  140. for reduceTaskNo, result := range reduceTaskToMapResults {
  141. mapTaskNumber := int(task.TaskNumber)
  142. oname := fmt.Sprintf("mr-%d-%d", mapTaskNumber, reduceTaskNo)
  143. ofile, _ := os.Create(oname)
  144. enc := json.NewEncoder(ofile)
  145.  
  146. // sort.Sort(ByKey(result))
  147.  
  148. for _, kv := range result {
  149. err := enc.Encode(&kv)
  150. if err != nil {
  151. log.Fatalf("Trouble writing to file %v", task.Filename)
  152. }
  153. }
  154. }
  155. taskDone := TaskDone{}
  156. call("Master.MapTaskDoneHandler", &task, &taskDone)
  157.  
  158. } else {
  159. return
  160. }
  161.  
  162. // //Exit loop by getting exit task from Master
  163. time.Sleep(time.Second)
  164. }
  165.  
  166. // uncomment to send the Example RPC to the master.
  167. // CallExample()
  168.  
  169. }
  170.  
  171. func GetFileContents(filename string) []byte {
  172. file, err := os.Open(filename)
  173. if err != nil {
  174. log.Fatalf("cannot open %v", filename)
  175. }
  176. content, err := ioutil.ReadAll(file)
  177. if err != nil {
  178. log.Fatalf("cannot read %v", filename)
  179. }
  180. file.Close()
  181.  
  182. return content
  183. }
  184.  
  185. // RPC call to Master to request a task
  186. func RequestTask() Task {
  187. task := Task{}
  188. taskRequest := TaskRequest{}
  189.  
  190. call("Master.RequestTaskHandler", &taskRequest, &task)
  191.  
  192. return task
  193. }
  194.  
  195. //
  196. // send an RPC request to the master, wait for the response.
  197. // usually returns true.
  198. // returns false if something goes wrong.
  199. //
  200. func call(rpcname string, args interface{}, reply interface{}) bool {
  201. // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
  202. c, err := rpc.DialHTTP("unix", "mr-socket")
  203. if err != nil {
  204. log.Fatal("dialing:", err)
  205. }
  206. defer c.Close()
  207.  
  208. err = c.Call(rpcname, args, reply)
  209. if err == nil {
  210. return true
  211. }
  212.  
  213. fmt.Println(err)
  214. return false
  215. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement