Advertisement
korsasupreme

Untitled

Apr 4th, 2020
1,012
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.98 KB | None | 0 0
  1. func serve(ctx context.Context, addr string, port int) <-chan Message {
  2.     type session struct {
  3.         id           int
  4.         conn         *net.UDPAddr
  5.         len          int64
  6.         countMessage int64
  7.         expiration   int64
  8.         buffer       chan []byte
  9.         run          func(wg *sync.WaitGroup, in chan []byte, ip string)
  10.     }
  11.  
  12.     var (
  13.         out         = make(chan Message, 64)
  14.         done        = make(chan error, 1)
  15.         wg          sync.WaitGroup
  16.         localAddres = &net.UDPAddr{IP: net.ParseIP(addr), Port: port}
  17.         bufPool     = sync.Pool{New: func() interface{} { return make([]byte, bufferSize) }}
  18.         sessions    []*session
  19.  
  20.         // TODO может будет не линейный поиск, пока посмотрим так
  21.         getSession = func(addr *net.UDPAddr) (*session, bool) {
  22.             for _, s := range sessions {
  23.                 if reflect.DeepEqual(s.conn, addr) {
  24.                     return s, true
  25.                 }
  26.             }
  27.             return nil, false
  28.         }
  29.         addSession = func(s *session) {
  30.             fmt.Printf("Add new session %s:%d id: %d \n", s.conn.IP.String(), s.conn.Port, s.id)
  31.             sessions = append(sessions, s)
  32.         }
  33.         removeSession = func(sess *session) {
  34.             for i, s := range sessions {
  35.                 if s.id == sess.id {
  36.                     fmt.Printf("Remove session %s:%d id: %d \n", s.conn.IP.String(), s.conn.Port, s.id)
  37.                     sessions = sessions[:i+copy(sessions[i:], sessions[i+1:])]
  38.                 }
  39.             }
  40.         }
  41.         gc = func() {
  42.             for {
  43.                 <-time.After(time.Duration(3 * time.Second))
  44.                 for _, s := range sessions {
  45.                     if time.Now().UnixNano() > s.expiration && s.expiration > 0 {
  46.                         removeSession(s)
  47.                     }
  48.                 }
  49.             }
  50.         }
  51.     )
  52.     go gc()
  53.     go func() {
  54.         pc, err := net.ListenUDP("udp", localAddres)
  55.         if err != nil {
  56.             done <- err
  57.         }
  58.         defer pc.Close()
  59.         go func() {
  60.             for {
  61.                 buff := bufPool.Get().([]byte)
  62.                 size, addr, err := pc.ReadFromUDP(buff[0:])
  63.                 if err != nil {
  64.                     done <- err
  65.                     return
  66.                 }
  67.                 switch s, ok := getSession(addr); ok {
  68.                 case true:
  69.                     s.buffer <- buff[0:size]
  70.                     bufPool.Put(buff)
  71.                     s.expiration = time.Now().Add(time.Duration(time.Second * 10)).UnixNano()
  72.                     atomic.AddInt64(&s.countMessage, 1)
  73.                     atomic.AddInt64(&s.len, int64(size))
  74.                 case false:
  75.                     s := &session{
  76.                         id:         rand.Int(),
  77.                         conn:       addr,
  78.                         expiration: time.Now().UnixNano(),
  79.                         buffer:     make(chan []byte, 64),
  80.                         run: func(wg *sync.WaitGroup, in chan []byte, ip string) {
  81.                             for b := range in {
  82.                                 var m Message
  83.                                 err := json.Unmarshal(b, &m)
  84.                                 if err != nil {
  85.                                     log.Fatal(err)
  86.                                     continue
  87.                                 }
  88.                                 m.Device_ip = ip
  89.                                 out <- m
  90.                             }
  91.                         },
  92.                     }
  93.                     wg.Add(1)
  94.                     s.buffer <- buff[0:size]
  95.                     bufPool.Put(buff)
  96.                     atomic.AddInt64(&s.countMessage, 1)
  97.                     atomic.AddInt64(&s.len, int64(size))
  98.                     go s.run(&wg, s.buffer, s.conn.IP.String())
  99.                     addSession(s)
  100.                 }
  101.             }
  102.         }()
  103.         select {
  104.         case <-ctx.Done():
  105.             wg.Wait()
  106.             log.Println("cancelled")
  107.             err = ctx.Err()
  108.         case err = <-done:
  109.             panic(err)
  110.         }
  111.     }()
  112.  
  113.     return out
  114. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement