Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package kinesis
- import (
- "context"
- "fmt"
- "os"
- "time"
- envelope "git.worten.net/wise/message-envelope"
- "git.worten.net/wise/messaging/internal/compression"
- "git.worten.net/wise/messaging/middlewares"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/kinesis"
- "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
- "github.com/pkg/errors"
- log "go.worten.net/digital/packages/json-log"
- )
const (
	// messagesPerSecond is the Kinesis per-shard ingest limit, in records
	// per second.
	messagesPerSecond = 1000
	// maxPayloadPerSecond is the per-shard payload budget in bytes.
	// NOTE(review): the original comment said "MiB", but 1000000 is a byte
	// count (~0.95 MiB) — presumably a conservative take on the 1 MiB/s
	// per-shard Kinesis limit; confirm the intended unit.
	maxPayloadPerSecond = 1000000
	// perSecond is the length of one rate-limiting window.
	perSecond = 1 * time.Second
)
// produceMessages is a request handed from Produce to the publisher
// goroutine: the batch of messages to send plus a channel on which the
// publisher reports the outcome.
type produceMessages struct {
	// ctx is the caller's context, forwarded to the AWS calls.
	// NOTE(review): storing a context in a struct is usually discouraged;
	// here it only ferries the value across the channel to the publisher.
	ctx      context.Context
	messages []envelope.Message
	// done receives exactly one error (or nil) per request.
	done chan error
}
// Producer is used to send messages to a kinesis stream.
// The rate-limiting fields (currentBatchSize, currentSentMessages, sentAt)
// are read and written only by the single publisher goroutine started in
// NewProducer, which is what keeps them race-free without a mutex.
type Producer struct {
	stream           string
	client           kinesisiface.KinesisAPI
	middlewares      middlewares.Middlewares
	compressEnvelope bool
	logger           log.Logger
	// produceMessages carries requests from Produce to the publisher loop.
	produceMessages chan produceMessages
	// Bytes and record count accumulated in the current one-second window.
	currentBatchSize, currentSentMessages int
	// Per-window budgets derived in init from shard count and RiskFactor.
	maxBatchSize, maxMessages int
	// sentAt is when the last batch was successfully sent.
	sentAt time.Time
}
// Config configuration for Kinesis
type Config struct {
	// Config is the base AWS configuration; required.
	Config *aws.Config
	// Stream is the name of the Kinesis stream; required.
	Stream string
	// CompressEnvelope, when true, compresses each serialized message
	// via the internal compression package before sending.
	CompressEnvelope bool
	// ConsumerName identifies a consumer. NOTE(review): not read by the
	// producer code visible here — presumably used by the consumer side.
	ConsumerName string
	// RiskFactor is the fraction of the theoretical shard throughput to
	// use for batching budgets; defaults to 0.8 when left at zero.
	RiskFactor float64
	// Checkpoint stores consumer progress; unused by the producer here.
	Checkpoint checkpoint
	// Logger is optional; an error-level stdout logger is created if nil.
	Logger log.Logger
}
- // NewProducer returns a configured and initialized producer.
- func NewProducer(conf Config) (*Producer, error) {
- p := Producer{
- stream: conf.Stream,
- logger: conf.Logger,
- compressEnvelope: conf.CompressEnvelope,
- produceMessages: make(chan produceMessages),
- }
- if err := p.init(conf); err != nil {
- return nil, err
- }
- go p.publisher()
- return &p, nil
- }
- // PreProduce chains the given middlewares before producing
- func (p *Producer) PreProduce(mws ...middlewares.Middleware) {
- p.middlewares = middlewares.Middlewares(mws)
- }
- // Produce a batch of messages to kinesis stream.
- func (p *Producer) Produce(ctx context.Context, data []envelope.Message) error {
- request := produceMessages{
- ctx: ctx,
- messages: data,
- done: make(chan error),
- }
- p.produceMessages <- request
- err := <-request.done
- close(request.done)
- return err
- }
// publisher is the single background loop that serializes produce requests.
// Funneling every send through one goroutine keeps the rate-limiting
// counters free of data races without locking.
// NOTE(review): this loop has no stop signal, so a Producer can never be
// shut down cleanly — the goroutine lives for the life of the process.
func (p *Producer) publisher() {
	for {
		request := <-p.produceMessages
		// A full window has passed since the last successful batch, so the
		// per-second throughput counters start over.
		if !p.sentAt.IsZero() && time.Since(p.sentAt) >= perSecond {
			p.currentBatchSize = 0
			p.currentSentMessages = 0
		}
		err := p.produce(request.ctx, request.messages)
		request.done <- err
	}
}
- func (p *Producer) produce(ctx context.Context, messages []envelope.Message) error {
- var (
- currentBatchSize = p.currentBatchSize
- currentSentMessages = p.currentSentMessages
- batch = make([]*kinesis.PutRecordsRequestEntry, 0, len(messages))
- )
- for _, message := range messages {
- message, err := p.middlewares.PreProcess(ctx, message)
- if err != nil {
- return errors.Wrap(err, "error while pre processing the message")
- }
- payload, err := p.payload(message)
- if err != nil {
- return err
- }
- currentBatchSize += messageSize(message.Stream.Key, payload)
- currentSentMessages++
- if currentBatchSize < p.maxBatchSize || currentSentMessages < p.maxMessages {
- batch = append(batch, &kinesis.PutRecordsRequestEntry{
- PartitionKey: aws.String(message.Stream.Key),
- Data: payload,
- })
- continue
- }
- if !p.sentAt.IsZero() && time.Since(p.sentAt) < perSecond {
- time.Sleep(perSecond - time.Since(p.sentAt))
- }
- if err := p.sendBatch(ctx, batch); err != nil {
- return err
- }
- batch = make([]*kinesis.PutRecordsRequestEntry, 0, len(messages))
- batch = append(batch, &kinesis.PutRecordsRequestEntry{
- PartitionKey: aws.String(message.Stream.Key),
- Data: payload,
- })
- currentBatchSize = messageSize(message.Stream.Key, payload)
- currentSentMessages = 1
- }
- p.currentBatchSize = currentBatchSize
- p.currentSentMessages = currentSentMessages
- return p.sendBatch(ctx, batch)
- }
- func (p *Producer) sendBatch(ctx context.Context, records []*kinesis.PutRecordsRequestEntry) error {
- if len(records) == 0 {
- return nil
- }
- result, err := p.client.PutRecordsWithContext(ctx, &kinesis.PutRecordsInput{
- Records: records,
- StreamName: aws.String(p.stream),
- })
- if err != nil {
- return errors.Wrap(err, "error while sending batch to kinesis stream")
- }
- if *result.FailedRecordCount != 0 {
- return fmt.Errorf("failed to send %d of %d messages", *result.FailedRecordCount, len(records))
- }
- p.sentAt = time.Now()
- return nil
- }
- func (p *Producer) payload(message envelope.Message) ([]byte, error) {
- messageRaw, err := message.Marshal()
- if err != nil {
- return nil, errors.Wrap(err, "error while marshalling the message")
- }
- if !p.compressEnvelope {
- return messageRaw, nil
- }
- payload, err := compression.Compress(messageRaw)
- if err != nil {
- return nil, errors.Wrap(err, "error while compress payload")
- }
- return payload, nil
- }
- // Init check if the struct has everything needed to execute.
- func (p *Producer) init(conf Config) error {
- config := conf.Config
- if config == nil {
- return errors.New("missing configurations")
- }
- if p.stream == "" {
- return errors.New("missing stream")
- }
- session, err := session.NewSession(config)
- if err != nil {
- return errors.Wrap(err, "failed to create a new session for kinesis")
- }
- p.client = kinesis.New(session)
- describe, err := p.client.DescribeStream(&kinesis.DescribeStreamInput{
- StreamName: aws.String(p.stream),
- Limit: aws.Int64(100),
- })
- if err != nil {
- return errors.Wrap(err, "could not obtain stream description")
- }
- if conf.RiskFactor == 0 {
- conf.RiskFactor = 0.8
- }
- shardCount := len(describe.StreamDescription.Shards)
- p.maxMessages = int(float64(messagesPerSecond*shardCount) * conf.RiskFactor)
- p.maxBatchSize = int(float64(maxPayloadPerSecond*shardCount) * conf.RiskFactor)
- if p.logger == nil {
- p.logger = log.New(log.Conf{
- Writer: os.Stdout,
- Level: log.ErrorLevel,
- })
- }
- return nil
- }
// messageSize returns the number of bytes a record contributes to a batch:
// the partition key plus the serialized message payload.
func messageSize(streamKey string, payload []byte) int {
	return len(streamKey) + len(payload)
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement