Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package mr
- import (
- "encoding/json"
- "fmt"
- "hash/fnv"
- "io/ioutil"
- "log"
- "net/rpc"
- "os"
- "sort"
- "time"
- )
- // for sorting by key.
- type ByKey []KeyValue
- // for sorting by key.
- func (a ByKey) Len() int { return len(a) }
- func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
- func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
// KeyValue is one key/value pair. Map functions return slices of these, and
// the worker stores them as JSON objects (fields "Key" and "Value") in the
// intermediate files.
type KeyValue struct {
	Key, Value string
}
// ihash hashes key with 32-bit FNV-1a and clears the top bit, so the result
// is always a non-negative int. The worker derives a key's reduce task
// number from ihash(key) % NReduce.
func ihash(key string) int {
	const signMask = 1<<31 - 1 // 0x7fffffff: keep the low 31 bits only

	hasher := fnv.New32a()
	hasher.Write([]byte(key)) // hash.Hash.Write never returns an error
	sum := hasher.Sum32()
	return int(sum & signMask)
}
// generateFileNames lists the intermediate files a reduce task must read:
// "mr-<m>-<reduceTaskNo>" for every map task number m in [0, nMap), in
// ascending m. A non-positive nMap yields an empty, non-nil slice.
func generateFileNames(reduceTaskNo int, nMap int) []string {
	count := nMap
	if count < 0 {
		count = 0
	}
	names := make([]string, count)
	for m := range names {
		names[m] = fmt.Sprintf("mr-%d-%d", m, reduceTaskNo)
	}
	return names
}
- //
- // main/mrworker.go calls this function.
- //
- func Worker(mapf func(string, string) []KeyValue,
- reducef func(string, []string) string) {
- // Your worker implementation here.
- for {
- task := RequestTask()
- // fmt.Println(task)
- if task.TaskType == Reduce {
- // this is a reduce task
- var kva []KeyValue
- filenames := generateFileNames(task.TaskNumber, task.NMap)
- for _, filename := range filenames {
- file, err := os.Open(filename)
- if err != nil {
- log.Fatalf("cannot load file %v", filename)
- }
- dec := json.NewDecoder(file)
- for {
- var kv KeyValue
- if err := dec.Decode(&kv); err != nil {
- break
- }
- kva = append(kva, kv)
- }
- file.Close()
- }
- sort.Sort(ByKey(kva))
- oname := fmt.Sprintf("mr-out-%d", task.TaskNumber)
- ofile, err := os.Create(oname)
- if err != nil {
- log.Fatalf("cannot create reduce output file %v", oname)
- }
- i := 0
- for i < len(kva) {
- j := i + 1
- for j < len(kva) && kva[j].Key == kva[i].Key {
- j++
- }
- values := []string{}
- for k := i; k < j; k++ {
- values = append(values, kva[k].Value)
- }
- output := reducef(kva[i].Key, values)
- // fmt.Println("Reducef completed ")
- // this is the correct format for each line of Reduce output.
- fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
- i = j
- }
- ofile.Close()
- taskDone := TaskDone{}
- call("Master.ReduceTaskDoneHandler", &task, &taskDone)
- } else if task.TaskType == Map {
- //this is a mapTask
- content := GetFileContents(task.Filename)
- kva := mapf(task.Filename, string(content))
- // sort.Sort(ByKey(kva))
- reduceTaskToMapResults := make(map[int][]KeyValue)
- for i := 1; i < task.NReduce+1; i++ {
- reduceTaskToMapResults[i] = []KeyValue{}
- }
- for _, kv := range kva {
- reduceTaskNo := (ihash(kv.Key) % task.NReduce) + 1
- // fmt.Println(reduceTaskNo)
- if _, ok := reduceTaskToMapResults[reduceTaskNo]; ok {
- reduceTaskToMapResults[reduceTaskNo] = append(reduceTaskToMapResults[reduceTaskNo], kv)
- } else {
- reduceTaskToMapResults[reduceTaskNo] = []KeyValue{kv}
- }
- }
- for reduceTaskNo, result := range reduceTaskToMapResults {
- mapTaskNumber := int(task.TaskNumber)
- oname := fmt.Sprintf("mr-%d-%d", mapTaskNumber, reduceTaskNo)
- ofile, _ := os.Create(oname)
- enc := json.NewEncoder(ofile)
- // sort.Sort(ByKey(result))
- for _, kv := range result {
- err := enc.Encode(&kv)
- if err != nil {
- log.Fatalf("Trouble writing to file %v", task.Filename)
- }
- }
- }
- taskDone := TaskDone{}
- call("Master.MapTaskDoneHandler", &task, &taskDone)
- } else {
- return
- }
- // //Exit loop by getting exit task from Master
- time.Sleep(time.Second)
- }
- // uncomment to send the Example RPC to the master.
- // CallExample()
- }
// GetFileContents returns everything stored in filename. If the file
// cannot be opened or read, it terminates the process via log.Fatalf.
func GetFileContents(filename string) []byte {
	f, openErr := os.Open(filename)
	if openErr != nil {
		log.Fatalf("cannot open %v", filename)
	}
	defer f.Close()

	data, readErr := ioutil.ReadAll(f)
	if readErr != nil {
		log.Fatalf("cannot read %v", filename)
	}
	return data
}
- // RPC call to Master to request a task
- func RequestTask() Task {
- task := Task{}
- taskRequest := TaskRequest{}
- call("Master.RequestTaskHandler", &taskRequest, &task)
- return task
- }
// call dials the master over the unix socket "mr-socket", sends the RPC
// rpcname with args, and waits for the result to be decoded into reply.
// It returns true on success. If the RPC itself fails, it prints the error
// and returns false. A failure to dial is fatal (log.Fatal).
func call(rpcname string, args interface{}, reply interface{}) bool {
	// TCP alternative: rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, dialErr := rpc.DialHTTP("unix", "mr-socket")
	if dialErr != nil {
		log.Fatal("dialing:", dialErr)
	}
	defer client.Close()

	if callErr := client.Call(rpcname, args, reply); callErr != nil {
		fmt.Println(callErr)
		return false
	}
	return true
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement