SHARE
TWEET

Untitled

a guest Feb 14th, 2020 84 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package mr
  2.  
  3. import (
  4.     "encoding/json"
  5.     "fmt"
  6.     "hash/fnv"
  7.     "io/ioutil"
  8.     "log"
  9.     "net/rpc"
  10.     "os"
  11.     "sort"
  12.     "time"
  13. )
  14.  
  15. // for sorting by key.
  16. type ByKey []KeyValue
  17.  
  18. // for sorting by key.
  19. func (a ByKey) Len() int           { return len(a) }
  20. func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
  21. func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
  22.  
  23. //
  24. // Map functions return a slice of KeyValue.
  25. //
  26. type KeyValue struct {
  27.     Key   string
  28.     Value string
  29. }
  30.  
  31. //
  32. // use ihash(key) % NReduce to choose the reduce
  33. // task number for each KeyValue emitted by Map.
  34. //
  35. func ihash(key string) int {
  36.     h := fnv.New32a()
  37.     h.Write([]byte(key))
  38.     return int(h.Sum32() & 0x7fffffff)
  39. }
  40.  
  41. func generateFileNames(reduceTaskNo int, nMap int) []string {
  42.     result := []string{}
  43.     for i := 0; i < nMap; i++ {
  44.         filename := fmt.Sprintf("mr-%d-%d", i, reduceTaskNo)
  45.         result = append(result, filename)
  46.     }
  47.     return result
  48. }
  49.  
  50. //
  51. // main/mrworker.go calls this function.
  52. //
  53. func Worker(mapf func(string, string) []KeyValue,
  54.     reducef func(string, []string) string) {
  55.  
  56.     // Your worker implementation here.
  57.     for {
  58.         task := RequestTask()
  59.         // fmt.Println(task)
  60.  
  61.         if task.TaskType == Reduce {
  62.             // this is a reduce task
  63.  
  64.             var kva []KeyValue
  65.             filenames := generateFileNames(task.TaskNumber, task.NMap)
  66.  
  67.             for _, filename := range filenames {
  68.                 file, err := os.Open(filename)
  69.  
  70.                 if err != nil {
  71.                     log.Fatalf("cannot load file %v", filename)
  72.                 }
  73.  
  74.                 dec := json.NewDecoder(file)
  75.                 for {
  76.                     var kv KeyValue
  77.                     if err := dec.Decode(&kv); err != nil {
  78.                         break
  79.                     }
  80.                     kva = append(kva, kv)
  81.                 }
  82.                 file.Close()
  83.             }
  84.  
  85.             sort.Sort(ByKey(kva))
  86.  
  87.             oname := fmt.Sprintf("mr-out-%d", task.TaskNumber)
  88.             ofile, err := os.Create(oname)
  89.  
  90.             if err != nil {
  91.                 log.Fatalf("cannot create reduce output file %v", oname)
  92.             }
  93.  
  94.             i := 0
  95.             for i < len(kva) {
  96.                 j := i + 1
  97.                 for j < len(kva) && kva[j].Key == kva[i].Key {
  98.                     j++
  99.                 }
  100.                 values := []string{}
  101.                 for k := i; k < j; k++ {
  102.                     values = append(values, kva[k].Value)
  103.                 }
  104.                 output := reducef(kva[i].Key, values)
  105.                 // fmt.Println("Reducef  completed ")
  106.  
  107.                 // this is the correct format for each line of Reduce output.
  108.                 fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
  109.  
  110.                 i = j
  111.             }
  112.             ofile.Close()
  113.             taskDone := TaskDone{}
  114.  
  115.             call("Master.ReduceTaskDoneHandler", &task, &taskDone)
  116.  
  117.         } else if task.TaskType == Map {
  118.             //this is a mapTask
  119.             content := GetFileContents(task.Filename)
  120.             kva := mapf(task.Filename, string(content))
  121.  
  122.             // sort.Sort(ByKey(kva))
  123.  
  124.             reduceTaskToMapResults := make(map[int][]KeyValue)
  125.  
  126.             for i := 1; i < task.NReduce+1; i++ {
  127.                 reduceTaskToMapResults[i] = []KeyValue{}
  128.             }
  129.  
  130.             for _, kv := range kva {
  131.                 reduceTaskNo := (ihash(kv.Key) % task.NReduce) + 1
  132.                 // fmt.Println(reduceTaskNo)
  133.                 if _, ok := reduceTaskToMapResults[reduceTaskNo]; ok {
  134.                     reduceTaskToMapResults[reduceTaskNo] = append(reduceTaskToMapResults[reduceTaskNo], kv)
  135.                 } else {
  136.                     reduceTaskToMapResults[reduceTaskNo] = []KeyValue{kv}
  137.                 }
  138.             }
  139.  
  140.             for reduceTaskNo, result := range reduceTaskToMapResults {
  141.                 mapTaskNumber := int(task.TaskNumber)
  142.                 oname := fmt.Sprintf("mr-%d-%d", mapTaskNumber, reduceTaskNo)
  143.                 ofile, _ := os.Create(oname)
  144.                 enc := json.NewEncoder(ofile)
  145.  
  146.                 // sort.Sort(ByKey(result))
  147.  
  148.                 for _, kv := range result {
  149.                     err := enc.Encode(&kv)
  150.                     if err != nil {
  151.                         log.Fatalf("Trouble writing to file %v", task.Filename)
  152.                     }
  153.                 }
  154.             }
  155.             taskDone := TaskDone{}
  156.             call("Master.MapTaskDoneHandler", &task, &taskDone)
  157.  
  158.         } else {
  159.             return
  160.         }
  161.  
  162.         // //Exit loop by getting exit task from Master
  163.         time.Sleep(time.Second)
  164.     }
  165.  
  166.     // uncomment to send the Example RPC to the master.
  167.     // CallExample()
  168.  
  169. }
  170.  
  171. func GetFileContents(filename string) []byte {
  172.     file, err := os.Open(filename)
  173.     if err != nil {
  174.         log.Fatalf("cannot open %v", filename)
  175.     }
  176.     content, err := ioutil.ReadAll(file)
  177.     if err != nil {
  178.         log.Fatalf("cannot read %v", filename)
  179.     }
  180.     file.Close()
  181.  
  182.     return content
  183. }
  184.  
  185. // RPC call to Master to request a task
  186. func RequestTask() Task {
  187.     task := Task{}
  188.     taskRequest := TaskRequest{}
  189.  
  190.     call("Master.RequestTaskHandler", &taskRequest, &task)
  191.  
  192.     return task
  193. }
  194.  
  195. //
  196. // send an RPC request to the master, wait for the response.
  197. // usually returns true.
  198. // returns false if something goes wrong.
  199. //
  200. func call(rpcname string, args interface{}, reply interface{}) bool {
  201.     // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
  202.     c, err := rpc.DialHTTP("unix", "mr-socket")
  203.     if err != nil {
  204.         log.Fatal("dialing:", err)
  205.     }
  206.     defer c.Close()
  207.  
  208.     err = c.Call(rpcname, args, reply)
  209.     if err == nil {
  210.         return true
  211.     }
  212.  
  213.     fmt.Println(err)
  214.     return false
  215. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Top