Advertisement
Guest User

Untitled

a guest
Apr 5th, 2020
336
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 9.43 KB | None | 0 0
  1. package encoders
  2.  
  3. import (
  4.     "errors"
  5.     "fmt"
  6.     "io"
  7.     "strconv"
  8.  
  9.     "code.cloudfoundry.org/lager"
  10.  
  11.     "github.com/3d0c/gmf"
  12.     "wmt/transcoder/db"
  13.     "wmt/transcoder/types"
  14. )
  15.  
  16. // FFMPEGEncode function is responsible for encoding the file
  17. func FFMPEGEncode(logger lager.Logger, dbInstance db.Storage, jobID string) error {
  18.     var pts int64 = 0
  19.     log := logger.Session("ffmpeg-encode")
  20.     log.Info("started", lager.Data{"job": jobID})
  21.     defer log.Info("finished")
  22.  
  23.     gmf.LogSetLevel(gmf.AV_LOG_FATAL)
  24.     job, _ := dbInstance.RetrieveJob(jobID)
  25.  
  26.     // create input context
  27.     inputCtx, err := gmf.NewInputCtx(job.LocalSource)
  28.     if err != nil {
  29.         log.Error("input-failed", err)
  30.         return err
  31.     }
  32.     defer inputCtx.Free()
  33.  
  34.     // create output context
  35.     outputCtx, err := gmf.NewOutputCtx(job.LocalDestination)
  36.     if err != nil {
  37.         log.Error("output-failed", err)
  38.         return err
  39.     }
  40.     defer outputCtx.Free()
  41.  
  42.     job.Status = types.JobEncoding
  43.     job.Progress = 0
  44.     _, err = dbInstance.UpdateJob(job.ID, job)
  45.     if err != nil {
  46.         log.Error("job-failed", err)
  47.         return err
  48.     }
  49.  
  50.     streamMap, srcVideoStream, srcAudioStream, err := getAudioVideoStreamSource(inputCtx, outputCtx, job, log)
  51.     if err != nil {
  52.         log.Error("failed-get-stream", err)
  53.         return err
  54.     }
  55.  
  56.     log.Debug("stream-map", lager.Data{
  57.         "data":           streamMap,
  58.         "srcVideoStream": srcVideoStream,
  59.         "srcAudioStream": srcAudioStream,
  60.     })
  61.  
  62.     var (
  63.         pkt         *gmf.Packet
  64.         ist, ost    *gmf.Stream
  65.         streamIdx   int
  66.         flush               = -1
  67.         framesCount float64 = 0
  68.     )
  69.  
  70.     totalFrames := float64(srcVideoStream.NbFrames() + srcAudioStream.NbFrames())
  71.  
  72.     for {
  73.         if flush < 0 {
  74.             pkt, err = inputCtx.GetNextPacket()
  75.             if err != nil && err != io.EOF {
  76.                 if pkt != nil {
  77.                     pkt.Free()
  78.                 }
  79.                 log.Error("error getting next packet", err)
  80.             } else if err != nil && pkt == nil {
  81.                 log.Debug("=== flushing")
  82.                 flush++
  83.                 break
  84.             }
  85.         }
  86.  
  87.         if flush == len(streamMap) {
  88.             break
  89.         }
  90.  
  91.         if flush < 0 {
  92.             streamIdx = pkt.StreamIndex()
  93.         } else {
  94.             streamIdx = flush
  95.             flush++
  96.         }
  97.  
  98.         if _, ok := streamMap[streamIdx]; !ok {
  99.             if pkt != nil {
  100.                 pkt.Free()
  101.             }
  102.  
  103.             continue
  104.         }
  105.  
  106.         ist, err = inputCtx.GetStream(streamIdx)
  107.         if err != nil {
  108.             if pkt != nil {
  109.                 pkt.Free()
  110.             }
  111.             log.Error("error getting stream", err)
  112.         }
  113.  
  114.         ost, err = outputCtx.GetStream(streamMap[ist.Index()])
  115.         if err != nil {
  116.             if pkt != nil {
  117.                 pkt.Free()
  118.             }
  119.             log.Error("error getting stream", err)
  120.         }
  121.  
  122.         frames, err := ist.CodecCtx().Decode(pkt)
  123.         if err != nil {
  124.             log.Error("error decoding - %s\n", err)
  125.         }
  126.  
  127.         for _, frame := range frames {
  128.             frame.SetPts(pts)
  129.             pts++
  130.             framesCount++
  131.             percentage := framesCount / totalFrames * 100
  132.             if percentage != job.Progress {
  133.                 job.Progress = percentage
  134.                 _, err = dbInstance.UpdateJob(job.ID, job)
  135.                 if err != nil {
  136.                     log.Error("error update job", err)
  137.                     return err
  138.                 }
  139.             }
  140.         }
  141.  
  142.         packets, err := ost.CodecCtx().Encode(frames, flush)
  143.  
  144.         for _, op := range packets {
  145.             packet := configurePacket(op, ost)
  146.  
  147.             if err = outputCtx.WritePacket(packet); err != nil {
  148.                 break
  149.             }
  150.  
  151.             op.Free()
  152.         }
  153.  
  154.         for _, frame := range frames {
  155.             if frame != nil {
  156.                 frame.Free()
  157.             }
  158.         }
  159.  
  160.         if pkt != nil {
  161.             pkt.Free()
  162.         }
  163.     }
  164.  
  165.     for i := 0; i < inputCtx.StreamsCnt(); i++ {
  166.         st, _ := inputCtx.GetStream(i)
  167.         st.CodecCtx().Free()
  168.         st.Free()
  169.     }
  170.  
  171.     inputCtx.Free()
  172.  
  173.     outputCtx.WriteTrailer()
  174.  
  175.     for i := 0; i < outputCtx.StreamsCnt(); i++ {
  176.         st, _ := outputCtx.GetStream(i)
  177.         st.CodecCtx().Free()
  178.         st.Free()
  179.     }
  180.  
  181.     if job.Progress != 100 {
  182.         job.Progress = 100
  183.         _, err = dbInstance.UpdateJob(job.ID, job)
  184.         if err != nil {
  185.             log.Error("job-failed", err)
  186.             return err
  187.         }
  188.     }
  189.  
  190.     return nil
  191. }
  192.  
  193. func configurePacket(packet *gmf.Packet, outputStream *gmf.Stream) *gmf.Packet {
  194.     if packet.Pts() != gmf.AV_NOPTS_VALUE {
  195.         packet.SetPts(gmf.RescaleQ(packet.Pts(), outputStream.CodecCtx().TimeBase(), outputStream.TimeBase()))
  196.     }
  197.  
  198.     if packet.Dts() != gmf.AV_NOPTS_VALUE {
  199.         packet.SetDts(gmf.RescaleQ(packet.Dts(), outputStream.CodecCtx().TimeBase(), outputStream.TimeBase()))
  200.     }
  201.  
  202.     packet.SetStreamIndex(outputStream.Index())
  203.  
  204.     return packet
  205. }
  206.  
  207. func getAudioVideoStreamSource(inputCtx *gmf.FmtCtx, outputCtx *gmf.FmtCtx, job types.Job, log lager.Logger) (map[int]int, *gmf.Stream, *gmf.Stream, error) {
  208.     streamMap := make(map[int]int, 0)
  209.  
  210.     // add video stream to streamMap
  211.     srcVideoStream, err := inputCtx.GetBestStream(gmf.AVMEDIA_TYPE_VIDEO)
  212.     if err != nil {
  213.         return nil, nil, nil, errors.New("unable to get the best video stream inside the input context")
  214.     }
  215.     videoCodec := getVideoCodec(job)
  216.     fmt.Println(videoCodec)
  217.     inputIndex, outputIndex, err := addStream(job, videoCodec, outputCtx, srcVideoStream, log)
  218.     if err != nil {
  219.         return nil, nil, nil, err
  220.     }
  221.     streamMap[inputIndex] = outputIndex
  222.  
  223.     // add audio stream to streamMap
  224.     srcAudioStream, err := inputCtx.GetBestStream(gmf.AVMEDIA_TYPE_AUDIO)
  225.     if err != nil {
  226.         return nil, nil, nil, errors.New("unable to get the best audio stream inside the input context")
  227.     }
  228.     audioCodec := getAudioCodec(job)
  229.     fmt.Println(audioCodec)
  230.     inputIndex, outputIndex, err = addStream(job, audioCodec, outputCtx, srcAudioStream, log)
  231.     if err != nil {
  232.         return nil, nil, nil, err
  233.     }
  234.     streamMap[inputIndex] = outputIndex
  235.     if err := outputCtx.WriteHeader(); err != nil {
  236.         return nil, nil, nil, err
  237.     }
  238.  
  239.     return streamMap, srcVideoStream, srcAudioStream, nil
  240. }
  241.  
  242. func addStream(job types.Job, codecName string, oc *gmf.FmtCtx, inputStream *gmf.Stream, log lager.Logger) (int, int, error) {
  243.     log.Debug("add-stream", lager.Data{
  244.         "job": job,
  245.     })
  246.     var (
  247.         codecContext *gmf.CodecCtx
  248.         outputStream *gmf.Stream
  249.     )
  250.  
  251.     codec, err := gmf.FindEncoder(codecName)
  252.     if err != nil {
  253.         return 0, 0, err
  254.     }
  255.  
  256.     if codecContext = gmf.NewCodecCtx(codec); codecContext == nil {
  257.         return 0, 0, fmt.Errorf("unable to create codec context")
  258.     }
  259.  
  260.     if oc.IsGlobalHeader() {
  261.         codecContext.SetFlag(gmf.CODEC_FLAG_GLOBAL_HEADER)
  262.     }
  263.  
  264.     if codec.IsExperimental() {
  265.         codecContext.SetStrictCompliance(gmf.FF_COMPLIANCE_EXPERIMENTAL)
  266.     }
  267.  
  268.     if codecContext.Type() == gmf.AVMEDIA_TYPE_AUDIO {
  269.         bitrate, err := strconv.Atoi(job.Preset.Audio.Bitrate)
  270.         if err != nil {
  271.             return 0, 0, err
  272.         }
  273.  
  274.         options := append(
  275.             []gmf.Option{
  276.                 {Key: "time_base", Val: inputStream.CodecCtx().TimeBase().AVR()},
  277.                 {Key: "ar", Val: inputStream.CodecCtx().SampleRate()},
  278.                 {Key: "ac", Val: inputStream.CodecCtx().Channels()},
  279.                 {Key: "channel_layout", Val: codecContext.SelectChannelLayout()},
  280.             },
  281.         )
  282.         codecContext.SetOptions(options)
  283.         codecContext.SetSampleFmt(inputStream.CodecCtx().SampleFmt()).
  284.             SetBitRate(bitrate).
  285.             SelectSampleRate()
  286.     }
  287.  
  288.     if codecContext.Type() == gmf.AVMEDIA_TYPE_VIDEO {
  289.         codecContext.SetTimeBase(gmf.AVR{Num: 1, Den: 24})
  290.  
  291.         if job.Preset.Video.Codec == "h264" {
  292.             profile := getProfile(job)
  293.             codecContext.SetProfile(profile)
  294.         }
  295.  
  296.         gop, err := strconv.Atoi(job.Preset.Video.GopSize)
  297.         if err != nil {
  298.             return 0, 0, err
  299.         }
  300.  
  301.         width, height := getResolution(job, inputStream.CodecCtx().Width(), inputStream.CodecCtx().Height())
  302.  
  303.         bitrate, err := strconv.Atoi(job.Preset.Video.Bitrate)
  304.         if err != nil {
  305.             return 0, 0, err
  306.         }
  307.  
  308.         codecContext.
  309.             SetDimension(width, height).
  310.             SetGopSize(gop).
  311.             SetBitRate(bitrate).
  312.             SetPixFmt(inputStream.CodecCtx().PixFmt())
  313.     }
  314.  
  315.     if err := codecContext.Open(nil); err != nil {
  316.         return 0, 0, err
  317.     }
  318.  
  319.     par := gmf.NewCodecParameters()
  320.     if err = par.FromContext(codecContext); err != nil {
  321.         return 0, 0, fmt.Errorf("error creating codec parameters from context - %s", err)
  322.     }
  323.     defer par.Free()
  324.  
  325.     if outputStream = oc.NewStream(codec); outputStream == nil {
  326.         return 0, 0, fmt.Errorf("unable to create new stream in output context")
  327.     }
  328.  
  329.     err = outputStream.CopyCodecPar(par)
  330.     if err != nil {
  331.         return 0, 0, err
  332.     }
  333.  
  334.     outputStream.SetCodecCtx(codecContext)
  335.  
  336.     return inputStream.Index(), outputStream.Index(), nil
  337. }
  338.  
  339. func getProfile(job types.Job) int {
  340.     profiles := map[string]int{
  341.         "baseline": gmf.FF_PROFILE_H264_BASELINE,
  342.         "main":     gmf.FF_PROFILE_H264_MAIN,
  343.         "high":     gmf.FF_PROFILE_H264_HIGH,
  344.     }
  345.  
  346.     if job.Preset.Video.Profile != "" {
  347.         return profiles[job.Preset.Video.Profile]
  348.     }
  349.     return gmf.FF_PROFILE_H264_MAIN
  350. }
  351.  
  352. func getVideoCodec(job types.Job) string {
  353.     codecs := map[string]string{
  354.         "h264":   "libx264",
  355.         "vp8":    "libvpx",
  356.         "vp9":    "libvpx-vp9",
  357.         "theora": "libtheora",
  358.         "aac":    "aac",
  359.     }
  360.  
  361.     if codec, ok := codecs[job.Preset.Video.Codec]; ok {
  362.         return codec
  363.     }
  364.     return "libx264"
  365. }
  366.  
  367. func getAudioCodec(job types.Job) string {
  368.     codecs := map[string]string{
  369.         "aac":    "aac",
  370.         "vorbis": "vorbis",
  371.     }
  372.     if codec, ok := codecs[job.Preset.Audio.Codec]; ok {
  373.         return codec
  374.     }
  375.     return "aac"
  376. }
  377.  
  378. func getResolution(job types.Job, inputWidth int, inputHeight int) (int, int) {
  379.     var width, height int
  380.     if job.Preset.Video.Width == "" && job.Preset.Video.Height == "" {
  381.         return inputWidth, inputHeight
  382.     } else if job.Preset.Video.Width == "" {
  383.         height, _ = strconv.Atoi(job.Preset.Video.Height)
  384.         width = (inputWidth * height) / inputHeight
  385.     } else if job.Preset.Video.Height == "" {
  386.         width, _ = strconv.Atoi(job.Preset.Video.Width)
  387.         height = (inputHeight * width) / inputWidth
  388.     } else {
  389.         width, _ = strconv.Atoi(job.Preset.Video.Width)
  390.         height, _ = strconv.Atoi(job.Preset.Video.Height)
  391.     }
  392.     return width, height
  393. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement