Guest User

Untitled

a guest
Mar 22nd, 2018
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.07 KB | None | 0 0
  1. func (n *natsEventSource) allEvents(callback func(eventWithTs) error) {
  2. errChan := make(chan error, 1)
  3. tick := make(chan struct{})
  4. timeout := time.Duration(n.config.messageTimeout) * time.Second
  5. msgHandler := func(m *stan.Msg) {
  6. evt, err := unmarshalEventWithTs(m.Data)
  7. if err != nil {
  8. errChan <- errors.Wrap(err, fmt.Sprintf("unmarshalling msg with seqno: %d", m.Sequence))
  9. return
  10. }
  11. if err := callback(*evt); err != nil {
  12. errChan <- err
  13. return
  14. }
  15. tick <- struct{}{}
  16. }
  17. sub, err := n.conn.QueueSubscribe(n.config.subject, n.config.qgroup, msgHandler, stan.DurableName(n.config.clientID), stan.StartAt(pb.StartPosition_First), stan.MaxInflight(stan.DefaultMaxInflight))
  18. if err != nil {
  19. n.err = err
  20. _ = sub.Close()
  21. return
  22. }
  23. lastMessage := time.NewTimer(timeout)
  24. LOOP:
  25. for {
  26. select {
  27. case <-tick:
  28. if !lastMessage.Stop() {
  29. <-lastMessage.C
  30. }
  31. lastMessage.Reset(timeout)
  32. case e := <-errChan:
  33. n.err = e
  34. _ = sub.Close()
  35. break LOOP
  36. case <-lastMessage.C:
  37. err = sub.Close()
  38. if n.err == nil {
  39. n.err = err
  40. }
  41. break LOOP
  42. }
  43. }
  44. }
Add Comment
Please, Sign In to add comment