Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- func (s *Subscriber) SubscribeToRidsStatuses(
- ctx context.Context, wg *sync.WaitGroup, processor registrar.RidsStatusesProcessor,
- ) error {
- log := s.log.
- AddName("receiving_rids_statuses").
- AddName(ctxparam.GoID(ctx)).
- AddData(logs.XRequestID, ctxparam.RequestID(ctx))
- messages, err := s.kafka.Subscribe(ctx, s.config.SridsStatusesTopic)
- if err != nil {
- return fmt.Errorf("kafka subscribing: %w", err)
- }
- wg.Add(1)
- go func() {
- defer wg.Done()
- for m := range messages {
- var err error
- pbUpdate := &pb.EventRIDStatusUpdate{}
- err = protojson.Unmarshal(m.Payload, pbUpdate)
- if err != nil {
- log.Errorf("unmarshalling %q rid status update: %s", m.UUID, err)
- continue
- }
- if err := validateRIDStatusUpdate(pbUpdate); err != nil {
- log.Errorf("validation rid status update: %s", err)
- continue
- }
- msg := registrar.NewMessage(pbUpdate.Srid.Value, int(pbUpdate.Status.Value))
- wg.Add(1)
- go func() {
- defer wg.Done()
- processor(ctxparam.WithNewGoID(ctx), msg)
- }()
- }
- }()
- return nil
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement