Advertisement
k98kurz

chatv3

Jul 10th, 2023 (edited)
3,420
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 11.28 KB | Source Code | 0 0
  1. package main
  2.  
  3. import (
  4.     "bufio"
  5.     "encoding/binary"
  6.     "fmt"
  7.     "net"
  8.     "os"
  9.     "strconv"
  10.     "strings"
  11.     "sync"
  12.     "time"
  13. )
  14.  
  15. type Client struct {
  16.     socket   net.Conn
  17.     username string
  18.     inbox    chan string
  19. }
  20.  
  21. type ActiveClients struct {
  22.     mu      sync.RWMutex
  23.     clients map[string]*Client
  24. }
  25.  
  26. func (ac *ActiveClients) add(c *Client) bool {
  27.     ac.mu.Lock()
  28.     defer ac.mu.Unlock()
  29.     if ac.clients == nil {
  30.         ac.clients = make(map[string]*Client, 0)
  31.     }
  32.     if c.inbox == nil {
  33.         c.inbox = make(chan string, 100)
  34.     }
  35.  
  36.     for username := range ac.clients {
  37.         if username == c.username {
  38.             return false
  39.         }
  40.     }
  41.     ac.clients[c.username] = c
  42.     return true
  43. }
  44.  
  45. func (ac *ActiveClients) remove(c Client) bool {
  46.     ac.mu.Lock()
  47.     defer ac.mu.Unlock()
  48.     debug("Removing %v\n", c.username)
  49.  
  50.     if ac.clients == nil {
  51.         return false
  52.     }
  53.  
  54.     _, ok := ac.clients[c.username]
  55.  
  56.     if ok {
  57.         delete(ac.clients, c.username)
  58.     }
  59.     return ok
  60. }
  61.  
  62. func (ac *ActiveClients) forwardMessage(message string, from string) {
  63.     ac.mu.RLock()
  64.     defer ac.mu.RUnlock()
  65.     msgWithFrom := from + ": " + message
  66.  
  67.     for username, c := range ac.clients {
  68.         if username != from {
  69.             debug("Attempting to forward to %v...", username)
  70.             select {
  71.             case c.inbox <- msgWithFrom:
  72.                 debug("Forwarding message to %v\n", username)
  73.             default:
  74.                 // if buffer is full, drop the message
  75.                 debug("Message dropped because %v inbox is full.\n", c.username)
  76.             }
  77.         }
  78.     }
  79. }
  80.  
  81. var wg sync.WaitGroup
  82. var shouldDebug bool = false
  83.  
  84. func debug(message string, vals ...any) {
  85.     if shouldDebug {
  86.         fmt.Printf(message, vals...)
  87.     }
  88. }
  89.  
  90. func uint16ToBytes(i uint16) []byte {
  91.     // return []byte{byte(i >> 8), byte(i & 0xFF)}
  92.     b := make([]byte, 2)
  93.     binary.BigEndian.PutUint16(b, i)
  94.     return b
  95. }
  96.  
  97. func bytesToUint16(b []byte) uint16 {
  98.     return binary.BigEndian.Uint16(b[:2])
  99. }
  100.  
  101. func sendMessage(message string, connection net.Conn, milliseconds int) (int, error) {
  102.     var timeout time.Duration
  103.     msg_bytes := []byte(message)
  104.     msg_size := len(msg_bytes)
  105.     msg := append(uint16ToBytes(uint16(msg_size)), msg_bytes...)
  106.     if milliseconds > 0 {
  107.         timeout = time.Millisecond * time.Duration(milliseconds)
  108.         connection.SetWriteDeadline(time.Now().Add(timeout))
  109.         defer connection.SetWriteDeadline(time.Time{})
  110.     }
  111.     size, err := connection.Write(msg)
  112.     if err != nil {
  113.         if !strings.Contains(err.Error(), "i/o timeout") {
  114.             fmt.Println(err)
  115.         }
  116.     }
  117.     debug("Sent %v bytes to %v\n", size, connection.RemoteAddr())
  118.     return size, err
  119. }
  120.  
  121. func receiveMessage(connection net.Conn, milliseconds int) (string, int, error) {
  122.     var msg_size [2]byte
  123.     var msg []byte
  124.     var timeout time.Duration
  125.     if milliseconds > 0 {
  126.         timeout = time.Millisecond * time.Duration(milliseconds)
  127.         connection.SetReadDeadline(time.Now().Add(timeout))
  128.         defer connection.SetReadDeadline(time.Time{})
  129.     }
  130.     size, err := connection.Read(msg_size[:])
  131.     if err != nil {
  132.         if !strings.Contains(err.Error(), "i/o timeout") {
  133.             fmt.Println(err)
  134.         }
  135.     }
  136.     if err != nil || size == 0 {
  137.         return "", size, err
  138.     }
  139.     size = int(bytesToUint16(msg_size[:]))
  140.     msg = make([]byte, size)
  141.     size, err = connection.Read(msg)
  142.     if err != nil {
  143.         if !strings.Contains(err.Error(), "i/o timeout") {
  144.             fmt.Println(err)
  145.         }
  146.     }
  147.     if err != nil || size == 0 {
  148.         return "", size, err
  149.     }
  150.  
  151.     debug("Received message (%v bytes) from %v\n", size+2, connection.RemoteAddr())
  152.     return string(msg), size, err
  153. }
  154.  
  155. func server(numberOfClients int, hostInterface string) {
  156.     shouldDebug = true
  157.     users := ActiveClients{}
  158.     listener, err := net.Listen("tcp", hostInterface)
  159.     if err != nil {
  160.         fmt.Println(err)
  161.         os.Exit(1)
  162.     }
  163.  
  164.     for i := 0; i < numberOfClients; i++ {
  165.         wg.Add(1)
  166.         go listen(listener, &users)
  167.     }
  168.  
  169.     wg.Wait()
  170. }
  171.  
  172. func listen(listener net.Listener, users *ActiveClients) {
  173.     defer wg.Done()
  174.     var username string
  175.     var thisClient Client
  176.     var lg sync.WaitGroup
  177.     signalShouldEnd := make(chan bool, 1)
  178.  
  179. loop:
  180.     connection, err := listener.Accept()
  181.     if err != nil {
  182.         debug(err.Error() + "\n")
  183.         return
  184.     }
  185.  
  186.     debug("Received connection from %v\n", connection.RemoteAddr())
  187.  
  188.     hasSetUsername := false
  189.     lg.Add(2)
  190.     go func() {
  191.         defer lg.Done()
  192.         for {
  193.             // wait up to 100ms for the client to send a message
  194.             message, size, err := receiveMessage(connection, 100)
  195.             if err != nil {
  196.                 if strings.Contains(err.Error(), "i/o timeout") {
  197.                     // timed out, so go back to start of loop
  198.                     // fmt.Println("i/o timeout")
  199.                     continue
  200.                 }
  201.                 debug(err.Error())
  202.                 select {
  203.                 case signalShouldEnd <- true:
  204.                     if users.remove(thisClient) {
  205.                         users.forwardMessage("Goodbye "+thisClient.username, "server")
  206.                     }
  207.                 default:
  208.                 }
  209.                 return
  210.             }
  211.  
  212.             if size > 0 {
  213.                 if !hasSetUsername {
  214.                     username = message
  215.                     if username == "server" {
  216.                         sendMessage("server: error: username is invalid; please send new username", connection, 0)
  217.                         break
  218.                     }
  219.                     thisClient = Client{
  220.                         socket:   connection,
  221.                         username: username,
  222.                     }
  223.                     ok := users.add(&thisClient)
  224.                     if !ok {
  225.                         sendMessage("server: error: username is invalid; please send new username", connection, 0)
  226.                         break
  227.                     } else {
  228.                         users.forwardMessage("Welcome "+username, "server")
  229.                     }
  230.                     hasSetUsername = true
  231.                 } else {
  232.                     users.forwardMessage(message, username)
  233.                 }
  234.             }
  235.  
  236.             select {
  237.             case <-signalShouldEnd:
  238.                 return
  239.             default:
  240.             }
  241.         }
  242.     }()
  243.  
  244.     go func() {
  245.         defer lg.Done()
  246.         for {
  247.             select {
  248.             case incoming := <-thisClient.inbox:
  249.                 debug("Message found in inbox; sending to %v\n", thisClient.username)
  250.                 _, err := sendMessage(incoming, thisClient.socket, 0)
  251.                 if err != nil {
  252.                     if !strings.Contains(err.Error(), "i/o timeout") {
  253.                         debug(err.Error() + "\n")
  254.                         select {
  255.                         case signalShouldEnd <- true:
  256.                             if users.remove(thisClient) {
  257.                                 users.forwardMessage("Goodbye "+thisClient.username, "server")
  258.                             }
  259.                         default:
  260.                         }
  261.                         return
  262.                     }
  263.                     continue
  264.                 }
  265.             case <-signalShouldEnd:
  266.                 return
  267.             default:
  268.                 time.Sleep(time.Millisecond * 10)
  269.             }
  270.         }
  271.     }()
  272.  
  273.     lg.Wait()
  274.     debug("Recycling server socket for new client\n")
  275.     goto loop
  276. }
  277.  
  278. func client(host string, username string) {
  279.     connection, err := net.Dial("tcp", host+":1337")
  280.     endSignals := []string{"bye", "quit", "exit"}
  281.  
  282.     if err != nil {
  283.         debug("%v\n", err)
  284.         return
  285.     }
  286.  
  287.     go func() {
  288.         for {
  289.             message, size, err := receiveMessage(connection, 100)
  290.  
  291.             if err != nil {
  292.                 if strings.Contains(err.Error(), "i/o timeout") {
  293.                     continue
  294.                 }
  295.                 debug("%v\n", err)
  296.                 os.Exit(0)
  297.             }
  298.  
  299.             if size == 0 {
  300.                 continue
  301.             }
  302.  
  303.             // fmt.Printf("Received %v bytes: ", size)
  304.             fmt.Println(message)
  305.         }
  306.     }()
  307.  
  308.     fmt.Println("Connected to server.")
  309.  
  310.     _, err = sendMessage(username, connection, 0)
  311.     if err != nil {
  312.         fmt.Println(err)
  313.         return
  314.     }
  315.  
  316.     for {
  317.         message := askInput("")
  318.         if contains(endSignals, message) {
  319.             return
  320.         }
  321.         _, err := sendMessage(message, connection, 0)
  322.         if err != nil {
  323.             fmt.Println(err)
  324.             return
  325.         }
  326.     }
  327. }
  328.  
  329. func benchmark(host string, numberOfClients int, numberOfMessages int, baseMessage string) {
  330.     wg.Add(numberOfClients)
  331.     for i := 0; i < numberOfClients; i++ {
  332.         go func(i int) {
  333.             defer wg.Done()
  334.             var bg sync.WaitGroup
  335.             username := "benchmark" + strconv.Itoa(i)
  336.             connection, err := net.Dial("tcp", host+":1337")
  337.             if err != nil {
  338.                 fmt.Println(err)
  339.                 return
  340.             }
  341.             // current limitation: must wait for username to process
  342.             _, _ = sendMessage(username, connection, 100)
  343.             time.Sleep(time.Millisecond * 100)
  344.  
  345.             bg.Add(numberOfMessages)
  346.             for d := 0; d < numberOfMessages; d++ {
  347.                 go func(message string) {
  348.                     defer bg.Done()
  349.                     _, err = sendMessage(message, connection, 100)
  350.                     if err != nil {
  351.                         fmt.Println(err)
  352.                         return
  353.                     }
  354.                 }(baseMessage + strconv.Itoa(d))
  355.             }
  356.  
  357.             bg.Add(1)
  358.             go func() {
  359.                 defer bg.Done()
  360.                 for {
  361.                     message, size, err := receiveMessage(connection, 2000)
  362.                     if err != nil {
  363.                         fmt.Println(err)
  364.                         return
  365.                     }
  366.                     if size == 0 {
  367.                         fmt.Println("aborting from empty receive")
  368.                         return
  369.                     }
  370.                     fmt.Println(message)
  371.                 }
  372.             }()
  373.  
  374.             bg.Wait()
  375.         }(i)
  376.     }
  377.     wg.Wait()
  378. }
  379.  
  380. func askInput(query string) string {
  381.     if query != "" {
  382.         fmt.Printf("%v ", query)
  383.     }
  384.     in := bufio.NewReader(os.Stdin)
  385.     data, _ := in.ReadString('\n')
  386.     data, _, _ = strings.Cut(data, "\n")
  387.     data, _, _ = strings.Cut(data, "\r")
  388.     return data
  389. }
  390.  
  391. func contains[T comparable](list []T, query T) bool {
  392.     // contains function adapted from https://stackoverflow.com/a/10485970
  393.     for _, item := range list {
  394.         if item == query {
  395.             return true
  396.         }
  397.     }
  398.     return false
  399. }
  400.  
  401. func usage(programName string) {
  402.     fmt.Printf("usage: %v server numerOfThreads [hostInterface]\n", programName)
  403.     fmt.Println("\thostInterface is of form hostname:port or IP:port")
  404.     fmt.Println("\tuse 0.0.0.0:port to listen on every available IP interface")
  405.     fmt.Println("\tdefault is localhost:1337")
  406.     fmt.Printf("usage: %v client hostIP username\n", programName)
  407.     fmt.Printf("usage: %v benchmark hostIP numberOfClients numberOfMessages baseMessage\n", programName)
  408.     fmt.Println("arguments must be in order shown")
  409. }
  410.  
  411. func main() {
  412.     programName := os.Args[0]
  413.     nameParts := strings.Split(programName, "\\")
  414.     programName = nameParts[len(nameParts)-1]
  415.  
  416.     if len(os.Args) < 2 {
  417.         usage(programName)
  418.         return
  419.     }
  420.  
  421.     mode := os.Args[1]
  422.  
  423.     switch mode {
  424.     case "server", "serve":
  425.         var numberOfClients int
  426.         if len(os.Args) < 3 {
  427.             usage(programName)
  428.             return
  429.         }
  430.         n, _ := strconv.ParseInt(os.Args[2], 10, 16)
  431.         numberOfClients = int(n)
  432.         var hostInterface string
  433.         if len(os.Args) > 3 {
  434.             hostInterface = os.Args[3]
  435.         } else {
  436.             hostInterface = "localhost:1337"
  437.         }
  438.         server(numberOfClients, hostInterface)
  439.     case "client", "connect":
  440.         var host string
  441.         var username string
  442.         if len(os.Args) < 4 {
  443.             usage(programName)
  444.             return
  445.         }
  446.         host = os.Args[2]
  447.         username = os.Args[3]
  448.         client(host, username)
  449.     case "benchmark":
  450.         var host string
  451.         var nClients string
  452.         var numberOfClients int
  453.         var nMsgs string
  454.         var numberOfMessages int
  455.         var message string
  456.         if len(os.Args) < 6 {
  457.             usage(programName)
  458.             return
  459.         }
  460.         host = os.Args[2]
  461.         nClients = os.Args[3]
  462.         nMsgs = os.Args[4]
  463.         message = os.Args[5]
  464.         n, _ := strconv.ParseInt(nClients, 10, 16)
  465.         numberOfClients = int(n)
  466.         n, _ = strconv.ParseInt(nMsgs, 10, 16)
  467.         numberOfMessages = int(n)
  468.         benchmark(host, numberOfClients, numberOfMessages, message)
  469.     default:
  470.         usage(programName)
  471.     }
  472. }
  473.  
  474. // ISC License
  475.  
  476. // Copyleft (c) 2023 k98kurz
  477.  
  478. // Permission to use, copy, modify, and/or distribute this software
  479. // for any purpose with or without fee is hereby granted, provided
  480. // that the above copyleft notice and this permission notice appear in
  481. // all copies.
  482.  
  483. // THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
  484. // WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
  485. // WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
  486. // AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
  487. // CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
  488. // OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  489. // NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
  490. // CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  491.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement