Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"os"
	"runtime"
	"sync"
	"time"

	"github.com/valyala/bytebufferpool"
)
// bufferPool recycles the 4096-byte scratch slices used as the destination
// of each Read call. It stores *[]byte rather than []byte so that Get and
// Put move a single pointer instead of boxing a slice header on every call.
var bufferPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 4096)
		return &b
	},
}
- func main() {
- //print infinite current number of goroutines
- go func() {
- for {
- time.Sleep(time.Second * 2)
- fmt.Println(runtime.NumGoroutine())
- }
- }()
- //spawn infinite goroutines of receive
- for {
- go func() {
- bb := bytebufferpool.Get()
- defer bytebufferpool.Put(bb)
- err := Receive(bb, []byte("#"), time.Second*2)
- if err != nil {
- fmt.Println(err)
- }
- }()
- time.Sleep(time.Second * 4)
- }
- runtime.Goexit()
- }
- func Receive(bb *bytebufferpool.ByteBuffer, delimiter []byte, timeout time.Duration) (err error) {
- rx := os.Stdin
- rx.SetDeadline(time.Now().Add(time.Second * 4))
- donec := make(chan error, 1)
- cancelc := make(chan struct{}, 1)
- go func() {
- donec <- receive(cancelc, rx, bb, delimiter)
- }()
- select {
- case err = <-donec:
- break
- case <-time.After(timeout):
- cancelc <- struct{}{}
- return errors.New("timeout")
- }
- return err
- }
- //receive reads from s till the delimiter is reached
- func receive(cancelc chan struct{}, s io.Reader, bb *bytebufferpool.ByteBuffer, delimiter []byte) error {
- buf := bufferPool.Get().(*[]byte)
- defer bufferPool.Put(buf)
- b := *buf
- for {
- select {
- case <-cancelc:
- return errors.New("timeout")
- default:
- n, err := s.Read(b) // read block infinite if no data is ever send
- //How to stop it?
- if n > 0 {
- bb.Write(b[:n])
- if err != nil {
- if err == io.EOF {
- if i := bytes.Index(bb.Bytes(), delimiter); i != -1 {
- bb.Set(bb.Bytes()[:i])
- return nil
- }
- }
- return err
- }
- if i := bytes.Index(bb.Bytes(), delimiter); i != -1 {
- bb.Set(bb.Bytes()[:i])
- return nil
- }
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement