Guest User

Untitled

a guest
May 9th, 2024
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 3.22 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4.     "encoding/base64"
  5.     "flag"
  6.     "fmt"
  7.     "io"
  8.     "net"
  9.     "strings"
  10.     "sync"
  11. )
  12.  
  13. var Config struct {
  14.     host      string
  15.     port      string
  16.     replicaOf string
  17. }
  18.  
  19. var RDB = "UkVESVMwMDEx+glyZWRpcy12ZXIFNy4yLjD6CnJlZGlzLWJpdHPAQPoFY3RpbWXCbQi8ZfoIdXNlZC1tZW3CsMQQAPoIYW9mLWJhc2XAAP/wbjv+wP9aog=="
  20.  
  21. func handleConnection(conn net.Conn, kv *Kv) {
  22.     defer conn.Close()
  23.     kv.Clients[conn.RemoteAddr().String()] = conn
  24.  
  25.     for {
  26.         r := NewResp(conn)
  27.         value, err := r.Read()
  28.         if err != nil {
  29.             if err == io.EOF {
  30.                 fmt.Println("Client disconnected: ", conn.RemoteAddr().String())
  31.             } else {
  32.                 fmt.Println("ERR IS", err)
  33.             }
  34.             return
  35.         } else {
  36.             fmt.Println("Client connected: ", conn.RemoteAddr().String())
  37.         }
  38.  
  39.         if value.Typ != "array" {
  40.             fmt.Println("Invalid request, expected array")
  41.             continue
  42.         }
  43.  
  44.         if len(value.Array) == 0 {
  45.             fmt.Println("Invalid request, expected array length > 0")
  46.             continue
  47.         }
  48.  
  49.         command := strings.ToUpper(value.Array[0].Bulk)
  50.         args := value.Array[1:]
  51.  
  52.         writer := NewWriter(conn)
  53.  
  54.         handler, ok := Handlers[command]
  55.         if !ok {
  56.             fmt.Println("Invalid command: ", command)
  57.             err := writer.Write(Value{Typ: "string", Str: "Invalid command" + command})
  58.             if err != nil {
  59.                 fmt.Println("Error writing invalid command message:", err)
  60.                 break
  61.             }
  62.             continue
  63.         }
  64.  
  65.         result := handler(args, kv)
  66.         fmt.Println(kv.Info.Role, "DB VALUE:", kv.SETs)
  67.         writer.Write(result)
  68.  
  69.         if kv.Info.Role == "master" {
  70.  
  71.             switch command {
  72.             case "SET":
  73.                 wg := sync.WaitGroup{}
  74.                 for _, conn := range kv.Slaves {
  75.                     wg.Add(1)
  76.                     go propagateToSlave(conn, value, &wg)
  77.                 }
  78.                 wg.Wait()
  79.  
  80.             case "PSYNC":
  81.                 decodedStr, _ := base64.StdEncoding.DecodeString(RDB)
  82.                 writer.Write(Value{Typ: "bulk", Bulk: string(decodedStr), NoCRLF: true})
  83.                 if kv.Info.Role == "master" {
  84.  
  85.                     kv.Slaves = append(kv.Slaves, writer.writer)
  86.  
  87.                 }
  88.             default:
  89.                 // do nothing
  90.             }
  91.         }
  92.  
  93.     }
  94. }
  95.  
  96. func propagateToSlave(conn io.Writer, value Value, wg *sync.WaitGroup) {
  97.     fmt.Println("replicating command")
  98.     defer wg.Done()
  99.     writer := NewWriter(conn)
  100.     writer.Write(value)
  101. }
  102.  
  103. func main() {
  104.     flag.StringVar(&Config.port, "port", "6379", "port for the redis server")
  105.     flag.StringVar(&Config.host, "host", "0.0.0.0", "port for the redis server")
  106.     flag.StringVar(&Config.replicaOf, "replicaof", "master", "replicate and give the slave role")
  107.     flag.Parse()
  108.  
  109.     kv := NewKv()
  110.     kv.Info.Host = Config.host
  111.     kv.Info.Port = Config.port
  112.  
  113.     if Config.replicaOf == "master" {
  114.         kv.Info.Role = "master"
  115.     } else {
  116.         kv.Info.Role = "slave"
  117.         kv.Info.MasterHost = Config.replicaOf
  118.         kv.Info.MasterPort = flag.Args()[len(flag.Args())-1]
  119.         kv.Info.MasterConn = masterHandshake(kv.Info.MasterHost, kv.Info.MasterPort, kv.Info.Port)
  120.    
  121.         go handleConnection(kv.Info.MasterConn, kv)
  122.     }
  123.    
  124.     // Create a new server
  125.     l, err := net.Listen("tcp", fmt.Sprintf("%s:%s", kv.Info.Host, kv.Info.Port))
  126.     if err != nil {
  127.         fmt.Println(err)
  128.         return
  129.     }
  130.     fmt.Println("Listening on port:", kv.Info.Port)
  131.  
  132.     defer l.Close()
  133.  
  134.     for {
  135.         // Listen for connections
  136.         conn, err := l.Accept()
  137.         if err != nil {
  138.             fmt.Println(err)
  139.             return
  140.         }
  141.  
  142.         go handleConnection(conn, kv)
  143.     }
  144. }
  145.  
Add Comment
Please, Sign In to add comment