Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package media
- import "time"
- type PipeLineStageEffect func([]byte) error
- type PipeLineStage struct {
- effect PipeLineStageEffect
- outputBufferSize int
- }
- // The pipe head channel MUST be managed by the caller. Closing the channel is the
- // only reliable method to tear down the pipeline. Closing the head channel causes all
- // pipeline stages to return via a cascade of channel closures. After the final stage returns,
- // the pipe tail will be closed.
- //
- // The returned error channel should be used in a select statement with the pipe tail, if an
- // error is received, the caller MUST close the input channel or the pipeline stages will keep
- // spinning until the source is exhausted. An error in one of the stages will not close
- // the pipe tail channel, rather the errored pipeline stage will become a sink, and the tail will
- // stop receiving data after subsequent buffers are emptied.
- //
- // When a pipeline stage errors, it enters a "sink mode" where it continues
- // to consume input data at a throttled rate but does not do work or output data.
- // - Prevents deadlocks by ensuring the pipeline can drain
- // - Maintains backpressure to avoid overwhelming upstream stages
- // - Minimizes resource usage during error state
- //
- // tldr; The caller must always close the pipe head input channel to ensure a proper and timely cleanup.
- func NewPipeLine(pipeHead <-chan []byte, stages ...PipeLineStage) (<-chan []byte, <-chan error) {
- var pipeTail chan []byte
- cherror := make(chan error, 1)
- // just forward to output if no stages were added.
- if len(stages) == 0 {
- pipeTail = make(chan []byte, 1)
- go func() {
- defer close(pipeTail)
- for {
- b, ok := <-pipeHead
- if !ok {
- return
- }
- pipeTail <- b
- }
- }()
- return pipeTail, cherror
- }
- in := pipeHead
- for idx, stage := range stages {
- out := make(chan []byte, stage.outputBufferSize)
- if idx+1 == len(stages) {
- pipeTail = out
- }
- go func(in <-chan []byte, out chan<- []byte, f PipeLineStageEffect) {
- defer close(out)
- for {
- buf, ok := <-in
- if !ok { // closed
- return
- }
- err := f(buf)
- if err != nil {
- cherror <- err
- break
- }
- out <- buf
- }
- // sink data here until input is closed
- // has the effect of stalling further reads from the pipe tail
- // after subsequent buffers have been emptied.
- for {
- if _, ok := <-in; !ok {
- return
- }
- time.Sleep(15 * time.Millisecond)
- }
- }(in, out, stage.effect)
- in = out
- }
- return pipeTail, cherror
- }
Advertisement
Add Comment
Please, Sign In to add comment