Advertisement
ddsKrtm

Untitled

Sep 17th, 2019
127
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 4.12 KB | None | 0 0
  1. package client
  2.  
  3. import (
  4.     "context"
  5.     "encoding/json"
  6.     "errors"
  7.     "github.com/satori/go.uuid"
  8.     "google.golang.org/grpc"
  9.     "io"
  10.     "log"
  11.     "pds-sample/service"
  12. )
  13.  
  14. // Buffer size for preloaded task
  15. const _bufferSize = 100
  16.  
  17. type PDSAgent interface {
  18.     Start() error
  19.     GetTaskChannel() chan<- map[string]interface{} // OLD code
  20.     GetReceiveChannel() chan int32
  21.     ChangeErrorRate(value float64) error
  22. }
  23.  
  24. type agent struct {
  25.     RcTask chan map[string]interface{}
  26.     // inner entities
  27.     authConf  *AuthConf
  28.     pdsConf   *PDSConf
  29.     client    service.PDSClient
  30.     rcErrRate chan float64
  31.     done      chan interface{}
  32.     ReceiveCh chan int32
  33. }
  34.  
  35. func NewConn(hostCfg *HostConf) (*grpc.ClientConn, error) {
  36.     var additionalDealOpt = make([]grpc.DialOption, 0)
  37.     if !hostCfg.UseTls {
  38.         additionalDealOpt = append(additionalDealOpt, grpc.WithInsecure())
  39.     }
  40.     conn, err := grpc.Dial(hostCfg.GetAddr(), additionalDealOpt...)
  41.     if err != nil {
  42.         return nil, err
  43.     }
  44.     return conn, err
  45. }
  46.  
  47. func NewAgent(conn *grpc.ClientConn, authConf *AuthConf, pdsConf *PDSConf) (PDSAgent, error) {
  48.     if pdsConf == nil {
  49.         return nil, errors.New("invalid pdsConf argument")
  50.     }
  51.     if err := pdsConf.Validate(); err != nil {
  52.         return nil, err
  53.     }
  54.     c := service.NewPDSClient(conn)
  55.  
  56.     res := &agent{
  57.         client:    c,
  58.         pdsConf:   pdsConf,
  59.         authConf:  authConf,
  60.         RcTask:    make(chan map[string]interface{}, _bufferSize),
  61.         rcErrRate: make(chan float64),
  62.         done:      make(chan interface{}),
  63.         ReceiveCh: make(chan int32),
  64.     }
  65.     return res, nil
  66. }
  67.  
  68. func (c *agent) GetTaskChannel() chan<- map[string]interface{} {
  69. //func (c *agent) GetTaskChannel() chan map[string]interface{} {
  70.     return c.RcTask
  71. }
  72.  
  73. func (c *agent) GetReceiveChannel() chan int32 {
  74.     return c.ReceiveCh
  75. }
  76.  
  77. func (c *agent) ChangeErrorRate(value float64) error {
  78.     if value <= 0.0 || value >= 1.0 {
  79.         return errors.New("error rate should be greater then 0 but less then 1")
  80.     }
  81.     if value > 0.5 {
  82.         log.Println("WARNING: error rate is very high:", value)
  83.     }
  84.     c.rcErrRate <- value
  85.     return nil
  86. }
  87.  
  88. func (c *agent) Stop() {
  89.     close(c.done)
  90. }
  91.  
  92. func (c *agent) Start() error {
  93.     initConf := service.RequestMessage{
  94.         Type: service.RequestMessage_INIT,
  95.         Init: &service.Init{
  96.             InitStat: &service.Statistic{
  97.                 AvgTimeTalkSec:    c.pdsConf.AvgTimeTalkSec,
  98.                 PercentSuccessful: c.pdsConf.PercentSuccessful,
  99.             },
  100.             AccountId:        c.authConf.AccountID,
  101.             ApiKey:           c.authConf.ApiKey,
  102.             RuleId:           c.pdsConf.RuleID,
  103.             ReferenceIp:      c.pdsConf.ReferenceIP,
  104.             QueueId:          c.pdsConf.QueueID,
  105.             MaximumErrorRate: c.pdsConf.MaximumErrorRate,
  106.             SessionId:        c.pdsConf.SessionID,
  107.         },
  108.     }
  109.  
  110.     cntx := context.Background()
  111.     stream, err := c.client.Start(cntx)
  112.     if err != nil {
  113.         return err
  114.     }
  115.     err = stream.Send(&initConf)
  116.     if err != nil {
  117.         return err
  118.     }
  119.  
  120.     waitc := make(chan error)
  121.     go func() {
  122.         defer close(waitc)
  123.         for {
  124.             in, err := stream.Recv()
  125.             if err == io.EOF {
  126.                 return
  127.             }
  128.             if err != nil {
  129.                 waitc <- err
  130.                 return
  131.             }
  132.             log.Println("Receive message:", in)
  133.             switch in.Type {
  134.             case service.ServiceMessage_INIT_RESPONSE:
  135.                 log.Println("success init ...")
  136.             case service.ServiceMessage_GET_TASK:
  137.                 log.Println("get tasks ... ", in.Request.Count)
  138.                 toConsume := in.Request.Count
  139.  
  140.                 if toConsume == 0 {
  141.                     continue
  142.                 }
  143.  
  144.                 // Подаем в канал количество которое пришло
  145.                 c.ReceiveCh <- in.Request.Count
  146.  
  147.                 for customData := range c.RcTask {
  148.  
  149.                     //fmt.Printf("%#v\n", c.RcTask); _OLEG_
  150.                     toConsume--
  151.                     b, _ := json.Marshal(customData)
  152.                     s := string(b)
  153.  
  154.                     //log.Println("customData:  ", s); _OLEG_
  155.  
  156.                     cd := service.PutTask{
  157.                         CustomData: []byte(s),
  158.                         TaskUUID:   uuid.NewV4().String(),
  159.                     }
  160.  
  161.                     err := stream.Send(&service.RequestMessage{
  162.                         Type: service.RequestMessage_PUT_TASK,
  163.                         Task: &cd,
  164.                     })
  165.                     if err != nil {
  166.                         waitc <- err
  167.                         return
  168.                     }
  169.                     if toConsume == 0 {
  170.                         break
  171.                     }
  172.                 }
  173.             }
  174.         }
  175.     }()
  176.     select {
  177.     case err := <-waitc:
  178.         return err
  179.     case <-c.done:
  180.         stream.CloseSend()
  181.     }
  182.     return nil
  183. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement