Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- func serve(ctx context.Context, addr string, port int) <-chan Message {
- type session struct {
- id int
- conn *net.UDPAddr
- len int64
- countMessage int64
- expiration int64
- buffer chan []byte
- run func(wg *sync.WaitGroup, in chan []byte, ip string)
- }
- var (
- out = make(chan Message, 64)
- done = make(chan error, 1)
- wg sync.WaitGroup
- localAddres = &net.UDPAddr{IP: net.ParseIP(addr), Port: port}
- bufPool = sync.Pool{New: func() interface{} { return make([]byte, bufferSize) }}
- sessions []*session
- // TODO может будет не линейный поиск, пока посмотрим так
- getSession = func(addr *net.UDPAddr) (*session, bool) {
- for _, s := range sessions {
- if reflect.DeepEqual(s.conn, addr) {
- return s, true
- }
- }
- return nil, false
- }
- addSession = func(s *session) {
- fmt.Printf("Add new session %s:%d id: %d \n", s.conn.IP.String(), s.conn.Port, s.id)
- sessions = append(sessions, s)
- }
- removeSession = func(sess *session) {
- for i, s := range sessions {
- if s.id == sess.id {
- fmt.Printf("Remove session %s:%d id: %d \n", s.conn.IP.String(), s.conn.Port, s.id)
- sessions = sessions[:i+copy(sessions[i:], sessions[i+1:])]
- }
- }
- }
- gc = func() {
- for {
- <-time.After(time.Duration(3 * time.Second))
- for _, s := range sessions {
- if time.Now().UnixNano() > s.expiration && s.expiration > 0 {
- removeSession(s)
- }
- }
- }
- }
- )
- go gc()
- go func() {
- pc, err := net.ListenUDP("udp", localAddres)
- if err != nil {
- done <- err
- }
- defer pc.Close()
- go func() {
- for {
- buff := bufPool.Get().([]byte)
- size, addr, err := pc.ReadFromUDP(buff[0:])
- if err != nil {
- done <- err
- return
- }
- switch s, ok := getSession(addr); ok {
- case true:
- s.buffer <- buff[0:size]
- bufPool.Put(buff)
- s.expiration = time.Now().Add(time.Duration(time.Second * 10)).UnixNano()
- atomic.AddInt64(&s.countMessage, 1)
- atomic.AddInt64(&s.len, int64(size))
- case false:
- s := &session{
- id: rand.Int(),
- conn: addr,
- expiration: time.Now().UnixNano(),
- buffer: make(chan []byte, 64),
- run: func(wg *sync.WaitGroup, in chan []byte, ip string) {
- for b := range in {
- var m Message
- err := json.Unmarshal(b, &m)
- if err != nil {
- log.Fatal(err)
- continue
- }
- m.Device_ip = ip
- out <- m
- }
- },
- }
- wg.Add(1)
- s.buffer <- buff[0:size]
- bufPool.Put(buff)
- atomic.AddInt64(&s.countMessage, 1)
- atomic.AddInt64(&s.len, int64(size))
- go s.run(&wg, s.buffer, s.conn.IP.String())
- addSession(s)
- }
- }
- }()
- select {
- case <-ctx.Done():
- wg.Wait()
- log.Println("cancelled")
- err = ctx.Err()
- case err = <-done:
- panic(err)
- }
- }()
- return out
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement