Advertisement
Guest User

Untitled

a guest
Sep 9th, 2019
178
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 6.28 KB | None | 0 0
  1. package kinesis
  2.  
  3. import (
  4.     "context"
  5.     "fmt"
  6.     "os"
  7.     "time"
  8.  
  9.     envelope "git.worten.net/wise/message-envelope"
  10.     "git.worten.net/wise/messaging/internal/compression"
  11.     "git.worten.net/wise/messaging/middlewares"
  12.     "github.com/aws/aws-sdk-go/aws"
  13.     "github.com/aws/aws-sdk-go/aws/session"
  14.     "github.com/aws/aws-sdk-go/service/kinesis"
  15.     "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
  16.     "github.com/pkg/errors"
  17.     log "go.worten.net/digital/packages/json-log"
  18. )
  19.  
  20. const (
  21.     messagesPerSecond   = 1000
  22.     maxPayloadPerSecond = 1000000 // MiB
  23.     perSecond           = 1 * time.Second
  24. )
  25.  
// produceMessages is a single Produce call queued for the publisher goroutine.
type produceMessages struct {
    // ctx is the caller's context; the publisher passes it on to produce.
    ctx      context.Context
    // messages is the batch the caller asked to publish.
    messages []envelope.Message
    // done receives the outcome of the request from the publisher.
    done     chan error
}
  31.  
  32. // Producer is used to send messages to a kinesis stream.
  33. type Producer struct {
  34.     stream                                string
  35.     client                                kinesisiface.KinesisAPI
  36.     middlewares                           middlewares.Middlewares
  37.     compressEnvelope                      bool
  38.     logger                                log.Logger
  39.     produceMessages                       chan produceMessages
  40.     currentBatchSize, currentSentMessages int
  41.     maxBatchSize, maxMessages             int
  42.     sentAt                                time.Time
  43. }
  44.  
// Config holds the settings used to build kinesis clients.
type Config struct {
    // Config is the AWS configuration used to create the session; the
    // producer refuses to initialize when it is nil.
    Config           *aws.Config
    // Stream is the kinesis stream name; the producer refuses to initialize
    // when it is empty.
    Stream           string
    // CompressEnvelope makes the producer compress every marshalled message.
    CompressEnvelope bool
    // ConsumerName is not read by the producer code shown here.
    ConsumerName     string
    // RiskFactor scales the per-shard budgets used by the producer; zero
    // selects the default of 0.8.
    RiskFactor       float64
    // Checkpoint is not read by the producer code shown here.
    Checkpoint       checkpoint
    // Logger is the logger handed to the producer; nil selects an
    // error-level logger writing to stdout.
    Logger           log.Logger
}
  55.  
  56. // NewProducer returns a configured and initialized producer.
  57. func NewProducer(conf Config) (*Producer, error) {
  58.     p := Producer{
  59.         stream:           conf.Stream,
  60.         logger:           conf.Logger,
  61.         compressEnvelope: conf.CompressEnvelope,
  62.         produceMessages:  make(chan produceMessages),
  63.     }
  64.  
  65.     if err := p.init(conf); err != nil {
  66.         return nil, err
  67.     }
  68.  
  69.     go p.publisher()
  70.     return &p, nil
  71. }
  72.  
  73. // PreProduce chains the given middlewares before producing
  74. func (p *Producer) PreProduce(mws ...middlewares.Middleware) {
  75.     p.middlewares = middlewares.Middlewares(mws)
  76. }
  77.  
  78. // Produce a batch of messages to kinesis stream.
  79. func (p *Producer) Produce(ctx context.Context, data []envelope.Message) error {
  80.     request := produceMessages{
  81.         ctx:      ctx,
  82.         messages: data,
  83.         done:     make(chan error),
  84.     }
  85.     p.produceMessages <- request
  86.     err := <-request.done
  87.  
  88.     close(request.done)
  89.     return err
  90. }
  91.  
  92. func (p *Producer) publisher() {
  93.     for {
  94.         request := <-p.produceMessages
  95.         if !p.sentAt.IsZero() && time.Since(p.sentAt) >= perSecond {
  96.             p.currentBatchSize = 0
  97.             p.currentSentMessages = 0
  98.         }
  99.  
  100.         err := p.produce(request.ctx, request.messages)
  101.         request.done <- err
  102.     }
  103. }
  104.  
  105. func (p *Producer) produce(ctx context.Context, messages []envelope.Message) error {
  106.     var (
  107.         currentBatchSize    = p.currentBatchSize
  108.         currentSentMessages = p.currentSentMessages
  109.         batch               = make([]*kinesis.PutRecordsRequestEntry, 0, len(messages))
  110.     )
  111.     for _, message := range messages {
  112.         message, err := p.middlewares.PreProcess(ctx, message)
  113.         if err != nil {
  114.             return errors.Wrap(err, "error while pre processing the message")
  115.         }
  116.  
  117.         payload, err := p.payload(message)
  118.         if err != nil {
  119.             return err
  120.         }
  121.  
  122.         currentBatchSize += messageSize(message.Stream.Key, payload)
  123.         currentSentMessages++
  124.         if currentBatchSize < p.maxBatchSize || currentSentMessages < p.maxMessages {
  125.             batch = append(batch, &kinesis.PutRecordsRequestEntry{
  126.                 PartitionKey: aws.String(message.Stream.Key),
  127.                 Data:         payload,
  128.             })
  129.             continue
  130.         }
  131.  
  132.         if !p.sentAt.IsZero() && time.Since(p.sentAt) < perSecond {
  133.             time.Sleep(perSecond - time.Since(p.sentAt))
  134.         }
  135.  
  136.         if err := p.sendBatch(ctx, batch); err != nil {
  137.             return err
  138.         }
  139.  
  140.         batch = make([]*kinesis.PutRecordsRequestEntry, 0, len(messages))
  141.         batch = append(batch, &kinesis.PutRecordsRequestEntry{
  142.             PartitionKey: aws.String(message.Stream.Key),
  143.             Data:         payload,
  144.         })
  145.         currentBatchSize = messageSize(message.Stream.Key, payload)
  146.         currentSentMessages = 1
  147.     }
  148.  
  149.     p.currentBatchSize = currentBatchSize
  150.     p.currentSentMessages = currentSentMessages
  151.     return p.sendBatch(ctx, batch)
  152. }
  153.  
  154. func (p *Producer) sendBatch(ctx context.Context, records []*kinesis.PutRecordsRequestEntry) error {
  155.     if len(records) == 0 {
  156.         return nil
  157.     }
  158.  
  159.     result, err := p.client.PutRecordsWithContext(ctx, &kinesis.PutRecordsInput{
  160.         Records:    records,
  161.         StreamName: aws.String(p.stream),
  162.     })
  163.     if err != nil {
  164.         return errors.Wrap(err, "error while sending batch to kinesis stream")
  165.     }
  166.  
  167.     if *result.FailedRecordCount != 0 {
  168.         return fmt.Errorf("failed to send %d of %d messages", *result.FailedRecordCount, len(records))
  169.     }
  170.  
  171.     p.sentAt = time.Now()
  172.     return nil
  173. }
  174.  
  175. func (p *Producer) payload(message envelope.Message) ([]byte, error) {
  176.     messageRaw, err := message.Marshal()
  177.     if err != nil {
  178.         return nil, errors.Wrap(err, "error while marshalling the message")
  179.     }
  180.  
  181.     if !p.compressEnvelope {
  182.         return messageRaw, nil
  183.     }
  184.  
  185.     payload, err := compression.Compress(messageRaw)
  186.     if err != nil {
  187.         return nil, errors.Wrap(err, "error while compress payload")
  188.     }
  189.  
  190.     return payload, nil
  191. }
  192.  
  193. // Init check if the struct has everything needed to execute.
  194. func (p *Producer) init(conf Config) error {
  195.     config := conf.Config
  196.     if config == nil {
  197.         return errors.New("missing configurations")
  198.     }
  199.  
  200.     if p.stream == "" {
  201.         return errors.New("missing stream")
  202.     }
  203.  
  204.     session, err := session.NewSession(config)
  205.     if err != nil {
  206.         return errors.Wrap(err, "failed to create a new session for kinesis")
  207.     }
  208.  
  209.     p.client = kinesis.New(session)
  210.     describe, err := p.client.DescribeStream(&kinesis.DescribeStreamInput{
  211.         StreamName: aws.String(p.stream),
  212.         Limit:      aws.Int64(100),
  213.     })
  214.     if err != nil {
  215.         return errors.Wrap(err, "could not obtain stream description")
  216.     }
  217.  
  218.     if conf.RiskFactor == 0 {
  219.         conf.RiskFactor = 0.8
  220.     }
  221.  
  222.     shardCount := len(describe.StreamDescription.Shards)
  223.     p.maxMessages = int(float64(messagesPerSecond*shardCount) * conf.RiskFactor)
  224.     p.maxBatchSize = int(float64(maxPayloadPerSecond*shardCount) * conf.RiskFactor)
  225.     if p.logger == nil {
  226.         p.logger = log.New(log.Conf{
  227.             Writer: os.Stdout,
  228.             Level:  log.ErrorLevel,
  229.         })
  230.     }
  231.  
  232.     return nil
  233. }
  234.  
  235. func messageSize(streamKey string, payload []byte) int {
  236.     return len(payload) + len(streamKey)
  237. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement