Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- func (n *natsEventSource) allEvents(callback func(eventWithTs) error) {
- errChan := make(chan error, 1)
- tick := make(chan struct{})
- timeout := time.Duration(n.config.messageTimeout) * time.Second
- msgHandler := func(m *stan.Msg) {
- evt, err := unmarshalEventWithTs(m.Data)
- if err != nil {
- errChan <- errors.Wrap(err, fmt.Sprintf("unmarshalling msg with seqno: %d", m.Sequence))
- return
- }
- if err := callback(*evt); err != nil {
- errChan <- err
- return
- }
- tick <- struct{}{}
- }
- sub, err := n.conn.QueueSubscribe(n.config.subject, n.config.qgroup, msgHandler, stan.DurableName(n.config.clientID), stan.StartAt(pb.StartPosition_First), stan.MaxInflight(stan.DefaultMaxInflight))
- if err != nil {
- n.err = err
- _ = sub.Close()
- return
- }
- lastMessage := time.NewTimer(timeout)
- LOOP:
- for {
- select {
- case <-tick:
- if !lastMessage.Stop() {
- <-lastMessage.C
- }
- lastMessage.Reset(timeout)
- case e := <-errChan:
- n.err = e
- _ = sub.Close()
- break LOOP
- case <-lastMessage.C:
- err = sub.Close()
- if n.err == nil {
- n.err = err
- }
- break LOOP
- }
- }
- }
Add Comment
Please, Sign In to add comment