Guest User

Untitled

a guest
Apr 15th, 2025
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.56 KB | None | 0 0
  1. package media
  2.  
  3. import "time"
  4.  
  5. type PipeLineStageEffect func([]byte) error
  6.  
  7. type PipeLineStage struct {
  8.     effect           PipeLineStageEffect
  9.     outputBufferSize int
  10. }
  11.  
  12. // The pipe head channel MUST be managed by the caller. Closing the channel is the
  13. // only reliable method to tear down the pipeline. Closing the head channel causes all
  14. // pipeline stages to return via a cascade of channel closures. After the final stage returns,
  15. // the pipe tail will be closed.
  16. //
  17. // The returned error channel should be used in a select statement with the pipe tail, if an
  18. // error is received, the caller MUST close the input channel or the pipeline stages will keep
  19. // spinning until the source is exhausted. An error in one of the stages will not close
  20. // the pipe tail channel, rather the errored pipeline stage will become a sink, and the tail will
  21. // stop receiving data after subsequent buffers are emptied.
  22. //
  23. // When a pipeline stage errors, it enters a "sink mode" where it continues
  24. // to consume input data at a throttled rate but does not do work or output data.
  25. // - Prevents deadlocks by ensuring the pipeline can drain
  26. // - Maintains backpressure to avoid overwhelming upstream stages
  27. // - Minimizes resource usage during error state
  28. //
  29. // tldr; The caller must always close the pipe head input channel to ensure a proper and timely cleanup.
  30. func NewPipeLine(pipeHead <-chan []byte, stages ...PipeLineStage) (<-chan []byte, <-chan error) {
  31.     var pipeTail chan []byte
  32.     cherror := make(chan error, 1)
  33.  
  34.     // just forward to output if no stages were added.
  35.     if len(stages) == 0 {
  36.         pipeTail = make(chan []byte, 1)
  37.  
  38.         go func() {
  39.             defer close(pipeTail)
  40.             for {
  41.                 b, ok := <-pipeHead
  42.  
  43.                 if !ok {
  44.                     return
  45.                 }
  46.  
  47.                 pipeTail <- b
  48.             }
  49.         }()
  50.         return pipeTail, cherror
  51.     }
  52.  
  53.     in := pipeHead
  54.     for idx, stage := range stages {
  55.         out := make(chan []byte, stage.outputBufferSize)
  56.  
  57.         if idx+1 == len(stages) {
  58.             pipeTail = out
  59.         }
  60.  
  61.         go func(in <-chan []byte, out chan<- []byte, f PipeLineStageEffect) {
  62.             defer close(out)
  63.  
  64.             for {
  65.                 buf, ok := <-in
  66.  
  67.                 if !ok { // closed
  68.                     return
  69.                 }
  70.  
  71.                 err := f(buf)
  72.  
  73.                 if err != nil {
  74.                     cherror <- err
  75.                     break
  76.                 }
  77.  
  78.                 out <- buf
  79.             }
  80.  
  81.             // sink data here until input is closed
  82.             // has the effect of stalling further reads from the pipe tail
  83.             // after subsequent buffers have been emptied.
  84.             for {
  85.                 if _, ok := <-in; !ok {
  86.                     return
  87.                 }
  88.                 time.Sleep(15 * time.Millisecond)
  89.             }
  90.  
  91.         }(in, out, stage.effect)
  92.  
  93.         in = out
  94.     }
  95.  
  96.     return pipeTail, cherror
  97. }
  98.  
Advertisement
Add Comment
Please, Sign In to add comment