Guest User

Untitled

a guest
Feb 22nd, 2018
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.19 KB | None | 0 0
  1. type Consumer interface {
  2. func Start(context.Context) (<-chan interface{}, <-chan error)
  3. }
  4.  
  5. func (c *someConcreteImplementation) Start(ctx context.Context) (<-chan interface{}, <- chan error) {
  6. iChan := make(chan interface{})
  7. eChan := make(chan error)
  8. go func() {
  9. defer close(iChan)
  10. defer close(eChan)
  11. for {
  12. select {
  13. case <-ctx.Done():
  14. // do stuff to try to gracefully terminate
  15. return
  16. default:
  17. msg, err := getSomeThing()
  18. if err != nil {
  19. eChan <- err
  20. } else {
  21. iChan <- msg
  22. }
  23. }
  24. }
  25. }()
  26. return iChan, eChan
  27. }
  28.  
  29. ctx, cancel := context.WithCancel()
  30. defer cancel()
  31. c, e := myConcreteConsumerImplementation.Start(ctx)
  32. for {
  33. select {
  34. case msg, ok := <- c:
  35. if !ok {
  36. return // the channel has been closed
  37. }
  38. // .. do something with msg
  39. case err := <- e:
  40. fmt.Println(err)
  41. cancel() // probably do better stuff than this in the real world
  42. }
  43. }
Add Comment
Please, Sign In to add comment