Guest User

Untitled

a guest
Apr 16th, 2025
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 5.82 KB | None | 0 0
  1. package bpipes
  2.  
  3. import (
  4.     "context"
  5.     "errors"
  6.     "time"
  7.  
  8.     "golang.org/x/time/rate"
  9. )
  10.  
  11. var ErrClosing = errors.New("pipeline closing")
  12. var ErrHeadClosed = errors.New("pipeline head channel closed")
  13.  
  14. type Stage interface {
  15.     // A function that modifies some data and returns any error encountered.
  16.     //  - The Effect function SHOULD interrupt any blocking calls upon ctx cancellation.
  17.     //  - If the context is cancelled, the Effect function SHOULD return an errors that 'Is'
  18.     //    context.Cancelled
  19.     Effect(context.Context, any) error
  20.  
  21.     // The channel buffer size for this stage's output. If this is the final stage,
  22.     // this is the buffer size of the tail of the pipe-line.
  23.     OutputBufferSize() int
  24. }
  25.  
  26. type ThrottlerStage struct {
  27.     limiter *rate.Limiter
  28. }
  29.  
  30. func (p *ThrottlerStage) Effect(ctx context.Context, _ any) error {
  31.     return p.limiter.Wait(ctx)
  32. }
  33. func (p *ThrottlerStage) OutputBufferSize() int {
  34.     return 0
  35. }
  36. func (p *ThrottlerStage) SetLimit(limit rate.Limit) {
  37.     p.limiter.SetLimit(rate.Limit(limit))
  38. }
  39. func (p *ThrottlerStage) SetBurst(burst uint16) {
  40.     p.limiter.SetBurst(int(burst))
  41. }
  42.  
  43. // Makes a pipeline stage that can throttle the output of the previous stage to a certain freq.
  44. // the throughput at any instant may fall below this rate, or may burst if burst > 1
  45. func NewPipeLineThrottler(ratePerSecond rate.Limit, burst uint16) *ThrottlerStage {
  46.     return &ThrottlerStage{
  47.         limiter: rate.NewLimiter(ratePerSecond, int(burst)),
  48.     }
  49. }
  50.  
  51. type PauserStage struct {
  52.     *ThrottlerStage
  53. }
  54.  
  55. func (p *PauserStage) SetPaused(isPaused bool) {
  56.     if isPaused {
  57.         p.SetLimit(0)
  58.     } else {
  59.         p.SetLimit(rate.Inf)
  60.     }
  61. }
  62.  
  63. // A pipeline pauser is a throttler where the rate can be toggled
  64. // between 0 and inf by the set paused function.
  65. // The default state is paused (a rate of 0).
  66. func NewPauserStage() *PauserStage {
  67.     return &PauserStage{
  68.         ThrottlerStage: NewPipeLineThrottler(0, 1),
  69.     }
  70. }
  71.  
  72. // Creates a concurrent pipeline that autoamtically manages all goroutines and
  73. // channels needed. The caller is only responsible for closing the pipe-head — it will
  74. // never automatically close as a result of calling this function. The caller MUST
  75. // tear down the pipe-line by either cancelling its context, or by closing the pipe-head.
  76. //
  77. // The function returns a channel representing the tail of the pipe-line, and a channel
  78. // that will send errors from any pipe-line stage effect.
  79. //
  80. // If a stage effect encounters any error, it will first send the error to the error
  81. // channel and begin a teardown of its subsequent pipe-line stages by closing its
  82. // own output channel. Next, the errored stage will enter "sink" mode. The pipe-line
  83. // will continue to pull data from the pipe-head and sink it at the errored stage
  84. // until the pipe-head is closed OR the pipe-line context is cancelled. The stage will
  85. // continue to consume input data from the previous stage at a throttled rate but
  86. // does not do work or output data.
  87. //   - Prevents deadlocks by ensuring the pipeline can drain
  88. //   - Maintains backpressure to avoid overwhelming upstream stages
  89. //   - Minimizes resource usage during error state
  90. func NewPipeline[T any](ctx context.Context, pipeHead <-chan T, stages ...Stage) (<-chan T, <-chan error) {
  91.     var pipeTail chan T
  92.     channelError := make(chan error, 1+len(stages)) // every stage MUST (including stage "-1") be able to error without blocking
  93.  
  94.     nextInput := make(chan T, 1)
  95.  
  96.     if len(stages) == 0 {
  97.         pipeTail = make(chan T, 1)
  98.         nextInput = pipeTail
  99.     }
  100.  
  101.     // pipeline-specific cancel to interrupt blocked pipeline effects
  102.     pipeLineContext, pipeLineContextCancel := context.WithCancelCause(ctx)
  103.  
  104.     // stage "-1" just cancels the context if the input is closed to reach stopped stages.
  105.     go func(head <-chan T, next chan<- T, ctx context.Context, cancelStages context.CancelCauseFunc) {
  106.         defer close(next)
  107.  
  108.         for {
  109.             select {
  110.             case <-ctx.Done():
  111.                 err := errors.Join(context.Cause(ctx), ctx.Err())
  112.                 cancelStages(err)
  113.                 channelError <- errors.Join(err, ErrClosing)
  114.                 return
  115.             default:
  116.             }
  117.  
  118.             buf, ok := <-head
  119.  
  120.             // Unblock stage effects upon pipe-head closure to prevent deadlock.
  121.             // This has the effect of causing all stage effects that honor
  122.             // context cancellation to enter "sink" mode while waiting for their
  123.             // previous stage to close.
  124.             if !ok {
  125.                 err := errors.Join(ErrClosing, ErrHeadClosed)
  126.                 cancelStages(err)
  127.                 channelError <- err
  128.                 return
  129.             }
  130.  
  131.             next <- buf
  132.         }
  133.     }(pipeHead, nextInput, ctx, pipeLineContextCancel)
  134.  
  135.     // stages [0, n]
  136.     for i, stage := range stages {
  137.         currentOutput := make(chan T, stage.OutputBufferSize())
  138.  
  139.         if i+1 == len(stages) {
  140.             pipeTail = currentOutput
  141.         }
  142.  
  143.         go func(ctx context.Context, in <-chan T, out chan<- T, plStage Stage) {
  144.             defer close(out)
  145.  
  146.         pumpData:
  147.             for {
  148.                 select {
  149.                 case <-ctx.Done():
  150.                     break pumpData
  151.                 default:
  152.                 }
  153.  
  154.                 data, ok := <-in
  155.  
  156.                 // We don't have to enter sink mode if previous stage
  157.                 // just got torn down. Immediately continue tearing down
  158.                 // the pipe-line from this stage onward; [i,n].
  159.                 if !ok {
  160.                     return
  161.                 }
  162.  
  163.                 err := plStage.Effect(ctx, data)
  164.  
  165.                 if err != nil {
  166.                     if !errors.Is(err, errors.Join(context.Canceled, ErrClosing, ErrHeadClosed)) { // stage -1 sends these errors already.
  167.                         channelError <- err // SHOULD be an error originating in the Effect func, not from cancellation / closing head.
  168.                     }
  169.                     break pumpData
  170.                 }
  171.  
  172.                 out <- data
  173.             }
  174.  
  175.             close(out) // this won't be used anymore, begin teardown of stages (i,n]
  176.  
  177.             // sink data from i-1 buffer here until output from i-1 is closed.
  178.             for {
  179.                 if _, ok := <-in; !ok {
  180.                     return
  181.                 }
  182.                 time.Sleep(15 * time.Millisecond)
  183.             }
  184.  
  185.         }(pipeLineContext, nextInput, currentOutput, stage)
  186.  
  187.         nextInput = currentOutput
  188.     }
  189.  
  190.     return pipeTail, channelError
  191. }
  192.  
Advertisement
Add Comment
Please, Sign In to add comment