Guest User

Untitled

a guest
Apr 15th, 2025
1,841
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 5.84 KB | None | 0 0
  1. package media
  2.  
  3. import (
  4.     "context"
  5.     "errors"
  6.     "time"
  7.  
  8.     "golang.org/x/time/rate"
  9. )
  10.  
  11. var ErrPipeLineClosing = errors.New("pipeline closing")
  12.  
  13. type PipeLineStage interface {
  14.     // A function that modifies a byte slice's contents and returns any error encountered.
  15.     //  - The Effect function SHOULD interrupt any blocking calls upon ctx cancellation.
  16.     //  - If the context is cancelled, the Effect function SHOULD return an errors that 'Is'
  17.     //    context.Cancelled
  18.     Effect(context.Context, []byte) error
  19.  
  20.     // The channel buffer size for this stage's output. If this is the final stage,
  21.     // this is the buffer size of the tail of the pipe-line.
  22.     OutputBufferSize() int
  23. }
  24.  
  25. type PipeLineThrottler struct {
  26.     limiter *rate.Limiter
  27. }
  28.  
  29. func (p *PipeLineThrottler) Effect(ctx context.Context, _ []byte) error {
  30.     return p.limiter.Wait(ctx)
  31. }
  32. func (p *PipeLineThrottler) OutputBufferSize() int {
  33.     return 0
  34. }
  35. func (p *PipeLineThrottler) SetLimit(limit rate.Limit) {
  36.     p.limiter.SetLimit(rate.Limit(limit))
  37. }
  38. func (p *PipeLineThrottler) SetBurst(burst uint16) {
  39.     p.limiter.SetBurst(int(burst))
  40. }
  41.  
  42. // Makes a pipeline stage that can throttle the output of the previous stage to a certain freq.
  43. // the throughput at any instant may fall below this rate, or may burst if burst > 1
  44. func NewPipeLineThrottler(ratePerSecond rate.Limit, burst uint16) *PipeLineThrottler {
  45.     return &PipeLineThrottler{
  46.         limiter: rate.NewLimiter(ratePerSecond, int(burst)),
  47.     }
  48. }
  49.  
  50. type PipeLinePauser struct {
  51.     *PipeLineThrottler
  52. }
  53.  
  54. func (p *PipeLinePauser) SetPaused(isPaused bool) {
  55.     if isPaused {
  56.         p.SetLimit(0)
  57.     } else {
  58.         p.SetLimit(rate.Inf)
  59.     }
  60. }
  61.  
  62. // A pipeline pauser is a throttler where the rate can be toggled
  63. // between 0 and inf by the set paused function.
  64. // The default state is paused (a rate of 0).
  65. func NewPipeLinePauser() *PipeLinePauser {
  66.     return &PipeLinePauser{
  67.         PipeLineThrottler: NewPipeLineThrottler(0, 1),
  68.     }
  69. }
  70.  
  71. // Creates a concurrent pipeline that autoamtically manages all goroutines and
  72. // channels needed. The caller is only responsible for closing the pipe-head — it will
  73. // never automatically close as a result of calling this function. The caller MUST
  74. // tear down the pipe-line by either cancelling its context, or by closing the pipe-head.
  75. //
  76. // The function returns a channel representing the tail of the pipe-line, and a channel
  77. // that will send errors from any pipe-line stage effect.
  78. //
  79. // If a stage effect encounters any error, it will first send the error to the error
  80. // channel and begin a teardown of its subsequent pipe-line stages by closing its
  81. // own output channel. Next, the errored stage will enter "sink" mode. The pipe-line
  82. // will continue to pull data from the pipe-head and sink it at the errored stage
  83. // until the pipe-head is closed OR the pipe-line context is cancelled. The stage will
  84. // continue to consume input data from the previous stage at a throttled rate but
  85. // does not do work or output data.
  86. //   - Prevents deadlocks by ensuring the pipeline can drain
  87. //   - Maintains backpressure to avoid overwhelming upstream stages
  88. //   - Minimizes resource usage during error state
  89. func NewPipeLine(ctx context.Context, pipeHead <-chan []byte, stages ...PipeLineStage) (<-chan []byte, <-chan error) {
  90.     var pipeTail chan []byte
  91.     channelError := make(chan error, 1+len(stages)) // every stage MUST (including stage "-1") be able to error without blocking
  92.  
  93.     nextInput := make(chan []byte, 1)
  94.  
  95.     if len(stages) == 0 {
  96.         pipeTail = make(chan []byte, 1)
  97.         nextInput = pipeTail
  98.     }
  99.  
  100.     // pipeline-specific cancel to interrupt blocked pipeline effects
  101.     pipeLineContext, pipeLineContextCancel := context.WithCancelCause(ctx)
  102.  
  103.     // stage "-1" just cancels the context if the input is closed to reach stopped stages.
  104.     go func(head <-chan []byte, next chan<- []byte, ctx context.Context, cancelStages context.CancelCauseFunc) {
  105.         defer close(next)
  106.  
  107.         for {
  108.             select {
  109.             case <-ctx.Done():
  110.                 cancelStages(ErrPipeLineClosing)
  111.                 channelError <- errors.Join(context.Cause(ctx), ctx.Err(), ErrPipeLineClosing)
  112.                 return
  113.             default:
  114.             }
  115.  
  116.             buf, ok := <-head
  117.  
  118.             // Unblock stage effects upon pipe-head closure to prevent deadlock.
  119.             // This has the effect of causing all stage effects that honor
  120.             // context cancellation to enter "sink" mode while waiting for their
  121.             // previous stage to close.
  122.             if !ok {
  123.                 cancelStages(ErrPipeLineClosing)
  124.                 channelError <- ErrPipeLineClosing
  125.                 return
  126.             }
  127.  
  128.             next <- buf
  129.         }
  130.     }(pipeHead, nextInput, ctx, pipeLineContextCancel)
  131.  
  132.     // stages [0, n]
  133.     for i, stage := range stages {
  134.         currentOutput := make(chan []byte, stage.OutputBufferSize())
  135.  
  136.         if i+1 == len(stages) {
  137.             pipeTail = currentOutput
  138.         }
  139.  
  140.         go func(ctx context.Context, in <-chan []byte, out chan<- []byte, plStage PipeLineStage) {
  141.             defer close(out)
  142.  
  143.         pumpData:
  144.             for {
  145.                 select {
  146.                 case <-ctx.Done():
  147.                     break pumpData
  148.                 default:
  149.                 }
  150.  
  151.                 buf, ok := <-in
  152.  
  153.                 // We don't have to enter sink mode if previous stage
  154.                 // just got torn down. Immediately continue tearing down
  155.                 // the pipe-line from this stage onward.
  156.                 if !ok {
  157.                     return
  158.                 }
  159.  
  160.                 err := plStage.Effect(ctx, buf)
  161.  
  162.                 if err != nil {
  163.                     if !errors.Is(err, errors.Join(context.Canceled, ErrPipeLineClosing)) { // caller should already know that the ctx cancelled.
  164.                         channelError <- err // SHOULD be an error originating in the Effect func, not from cancellation.
  165.                     }
  166.                     break pumpData
  167.                 }
  168.  
  169.                 out <- buf
  170.             }
  171.  
  172.             close(out) // this won't be used anymore, begin teardown from stage i+1
  173.  
  174.             // sink data from previous buffer here until input is closed.
  175.             for {
  176.                 if _, ok := <-in; !ok {
  177.                     return
  178.                 }
  179.                 time.Sleep(15 * time.Millisecond)
  180.             }
  181.  
  182.         }(pipeLineContext, nextInput, currentOutput, stage)
  183.  
  184.         nextInput = currentOutput
  185.     }
  186.  
  187.     return pipeTail, channelError
  188. }
  189.  
Advertisement
Add Comment
Please, Sign In to add comment