@jajabin

Как более правильно определять сессии UDP соединения?

Прослушивая сообщения с udp сокета,хотелось бы,как-то определять откуда приходят пакеты и раскидывать по сессиям, чтобы получать более детальный отчёт по пришедшим данным, я сделал просто в лоб, поиском текущей сессии и записи в её канал, хотелось бы узнать есть ли более элегантные методы ?
func serve(ctx context.Context, addr string, port int) <-chan Message {
	type session struct {
		conn         *net.UDPAddr
		expiration   int64
		buffer       chan []byte
		run          func()
	}

	var (
		out         = make(chan Message, 64)
		done        = make(chan error, 1)
		wg          sync.WaitGroup
		localAddres = &net.UDPAddr{IP: net.ParseIP(addr), Port: port}
		bufPool     = sync.Pool{New: func() interface{} { return make([]byte, bufferSize) }}
		sessions    []*session

		// TODO может будет не линейный поиск, пока посмотрим так
		getSession = func(addr *net.UDPAddr) (*session, bool) {
			for _, s := range sessions {
				if reflect.DeepEqual(s.conn, addr) {
					return s, true
				}
			}
			return nil, false
		}
		addSession = func(s *session) {
			sessions = append(sessions, s)
		}
		removeSession = func(sess *session) {
			for i, s := range sessions {
				if s.id == sess.id {
					sessions = sessions[:i+copy(sessions[i:], sessions[i+1:])]
				}
			}
		}
		gc = func() {
			for {
				<-time.After(time.Duration(3 * time.Second))
				for _, s := range sessions {
					if time.Now().UnixNano() > s.expiration && s.expiration > 0 {
						removeSession(s)
					}
				}
			}
		}
	)
	go gc()
	go func() {
		pc, err := net.ListenUDP("udp", localAddres)
		if err != nil {
			done <- err
		}
		defer pc.Close()
		go func() {
			for {
				buff := bufPool.Get().([]byte)
				size, addr, err := pc.ReadFromUDP(buff[0:])
				if err != nil {
					done <- err
					return
				}
				switch s, ok := getSession(addr); ok {
				case true:
					s.buffer <- buff[0:size]
					bufPool.Put(buff)
					s.expiration = time.Now().Add(time.Duration(time.Second * 10)).UnixNano()
					atomic.AddInt64(&s.countMessage, 1)
					atomic.AddInt64(&s.len, int64(size))
				case false:
					s := &session{
						id:         rand.Int(),
						conn:       addr,
						expiration: time.Now().UnixNano(),
						buffer:     make(chan []byte, 64),
						run: func() {//working},
					}
					wg.Add(1)
          go s.run()
					addSession(s)
					s.buffer <- buff[0:size]
					bufPool.Put(buff)
					
				}
			}
		}()
		select {
		case <-ctx.Done():
			wg.Wait()
			err = ctx.Err()
		case err = <-done:
			panic(err)
		}
	}()

	return out
}
  • Вопрос задан
  • 225 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы