Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- type Consumer interface {
- func Start(context.Context) (<-chan interface{}, <-chan error)
- }
- func (c *someConcreteImplementation) Start(ctx context.Context) (<-chan interface{}, <- chan error) {
- iChan := make(chan interface{})
- eChan := make(chan error)
- go func() {
- defer close(iChan)
- defer close(eChan)
- for {
- select {
- case <-ctx.Done():
- // do stuff to try to gracefully terminate
- return
- default:
- msg, err := getSomeThing()
- if err != nil {
- eChan <- err
- } else {
- iChan <- msg
- }
- }
- }
- }()
- return iChan, eChan
- }
- ctx, cancel := context.WithCancel()
- defer cancel()
- c, e := myConcreteConsumerImplementation.Start(ctx)
- for {
- select {
- case msg, ok := <- c:
- if !ok {
- return // the channel has been closed
- }
- // .. do something with msg
- case err := <- e:
- fmt.Println(err)
- cancel() // probably do better stuff than this in the real world
- }
- }
Add Comment
Please, Sign In to add comment