Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package encoders
- import (
- "errors"
- "fmt"
- "io"
- "strconv"
- "code.cloudfoundry.org/lager"
- "github.com/3d0c/gmf"
- "wmt/transcoder/db"
- "wmt/transcoder/types"
- )
- // FFMPEGEncode function is responsible for encoding the file
- func FFMPEGEncode(logger lager.Logger, dbInstance db.Storage, jobID string) error {
- var pts int64 = 0
- log := logger.Session("ffmpeg-encode")
- log.Info("started", lager.Data{"job": jobID})
- defer log.Info("finished")
- gmf.LogSetLevel(gmf.AV_LOG_FATAL)
- job, _ := dbInstance.RetrieveJob(jobID)
- // create input context
- inputCtx, err := gmf.NewInputCtx(job.LocalSource)
- if err != nil {
- log.Error("input-failed", err)
- return err
- }
- defer inputCtx.Free()
- // create output context
- outputCtx, err := gmf.NewOutputCtx(job.LocalDestination)
- if err != nil {
- log.Error("output-failed", err)
- return err
- }
- defer outputCtx.Free()
- job.Status = types.JobEncoding
- job.Progress = 0
- _, err = dbInstance.UpdateJob(job.ID, job)
- if err != nil {
- log.Error("job-failed", err)
- return err
- }
- streamMap, srcVideoStream, srcAudioStream, err := getAudioVideoStreamSource(inputCtx, outputCtx, job, log)
- if err != nil {
- log.Error("failed-get-stream", err)
- return err
- }
- log.Debug("stream-map", lager.Data{
- "data": streamMap,
- "srcVideoStream": srcVideoStream,
- "srcAudioStream": srcAudioStream,
- })
- var (
- pkt *gmf.Packet
- ist, ost *gmf.Stream
- streamIdx int
- flush = -1
- framesCount float64 = 0
- )
- totalFrames := float64(srcVideoStream.NbFrames() + srcAudioStream.NbFrames())
- for {
- if flush < 0 {
- pkt, err = inputCtx.GetNextPacket()
- if err != nil && err != io.EOF {
- if pkt != nil {
- pkt.Free()
- }
- log.Error("error getting next packet", err)
- } else if err != nil && pkt == nil {
- log.Debug("=== flushing")
- flush++
- break
- }
- }
- if flush == len(streamMap) {
- break
- }
- if flush < 0 {
- streamIdx = pkt.StreamIndex()
- } else {
- streamIdx = flush
- flush++
- }
- if _, ok := streamMap[streamIdx]; !ok {
- if pkt != nil {
- pkt.Free()
- }
- continue
- }
- ist, err = inputCtx.GetStream(streamIdx)
- if err != nil {
- if pkt != nil {
- pkt.Free()
- }
- log.Error("error getting stream", err)
- }
- ost, err = outputCtx.GetStream(streamMap[ist.Index()])
- if err != nil {
- if pkt != nil {
- pkt.Free()
- }
- log.Error("error getting stream", err)
- }
- frames, err := ist.CodecCtx().Decode(pkt)
- if err != nil {
- log.Error("error decoding - %s\n", err)
- }
- for _, frame := range frames {
- frame.SetPts(pts)
- pts++
- framesCount++
- percentage := framesCount / totalFrames * 100
- if percentage != job.Progress {
- job.Progress = percentage
- _, err = dbInstance.UpdateJob(job.ID, job)
- if err != nil {
- log.Error("error update job", err)
- return err
- }
- }
- }
- packets, err := ost.CodecCtx().Encode(frames, flush)
- for _, op := range packets {
- packet := configurePacket(op, ost)
- if err = outputCtx.WritePacket(packet); err != nil {
- break
- }
- op.Free()
- }
- for _, frame := range frames {
- if frame != nil {
- frame.Free()
- }
- }
- if pkt != nil {
- pkt.Free()
- }
- }
- for i := 0; i < inputCtx.StreamsCnt(); i++ {
- st, _ := inputCtx.GetStream(i)
- st.CodecCtx().Free()
- st.Free()
- }
- inputCtx.Free()
- outputCtx.WriteTrailer()
- for i := 0; i < outputCtx.StreamsCnt(); i++ {
- st, _ := outputCtx.GetStream(i)
- st.CodecCtx().Free()
- st.Free()
- }
- if job.Progress != 100 {
- job.Progress = 100
- _, err = dbInstance.UpdateJob(job.ID, job)
- if err != nil {
- log.Error("job-failed", err)
- return err
- }
- }
- return nil
- }
- func configurePacket(packet *gmf.Packet, outputStream *gmf.Stream) *gmf.Packet {
- if packet.Pts() != gmf.AV_NOPTS_VALUE {
- packet.SetPts(gmf.RescaleQ(packet.Pts(), outputStream.CodecCtx().TimeBase(), outputStream.TimeBase()))
- }
- if packet.Dts() != gmf.AV_NOPTS_VALUE {
- packet.SetDts(gmf.RescaleQ(packet.Dts(), outputStream.CodecCtx().TimeBase(), outputStream.TimeBase()))
- }
- packet.SetStreamIndex(outputStream.Index())
- return packet
- }
- func getAudioVideoStreamSource(inputCtx *gmf.FmtCtx, outputCtx *gmf.FmtCtx, job types.Job, log lager.Logger) (map[int]int, *gmf.Stream, *gmf.Stream, error) {
- streamMap := make(map[int]int, 0)
- // add video stream to streamMap
- srcVideoStream, err := inputCtx.GetBestStream(gmf.AVMEDIA_TYPE_VIDEO)
- if err != nil {
- return nil, nil, nil, errors.New("unable to get the best video stream inside the input context")
- }
- videoCodec := getVideoCodec(job)
- fmt.Println(videoCodec)
- inputIndex, outputIndex, err := addStream(job, videoCodec, outputCtx, srcVideoStream, log)
- if err != nil {
- return nil, nil, nil, err
- }
- streamMap[inputIndex] = outputIndex
- // add audio stream to streamMap
- srcAudioStream, err := inputCtx.GetBestStream(gmf.AVMEDIA_TYPE_AUDIO)
- if err != nil {
- return nil, nil, nil, errors.New("unable to get the best audio stream inside the input context")
- }
- audioCodec := getAudioCodec(job)
- fmt.Println(audioCodec)
- inputIndex, outputIndex, err = addStream(job, audioCodec, outputCtx, srcAudioStream, log)
- if err != nil {
- return nil, nil, nil, err
- }
- streamMap[inputIndex] = outputIndex
- if err := outputCtx.WriteHeader(); err != nil {
- return nil, nil, nil, err
- }
- return streamMap, srcVideoStream, srcAudioStream, nil
- }
- func addStream(job types.Job, codecName string, oc *gmf.FmtCtx, inputStream *gmf.Stream, log lager.Logger) (int, int, error) {
- log.Debug("add-stream", lager.Data{
- "job": job,
- })
- var (
- codecContext *gmf.CodecCtx
- outputStream *gmf.Stream
- )
- codec, err := gmf.FindEncoder(codecName)
- if err != nil {
- return 0, 0, err
- }
- if codecContext = gmf.NewCodecCtx(codec); codecContext == nil {
- return 0, 0, fmt.Errorf("unable to create codec context")
- }
- if oc.IsGlobalHeader() {
- codecContext.SetFlag(gmf.CODEC_FLAG_GLOBAL_HEADER)
- }
- if codec.IsExperimental() {
- codecContext.SetStrictCompliance(gmf.FF_COMPLIANCE_EXPERIMENTAL)
- }
- if codecContext.Type() == gmf.AVMEDIA_TYPE_AUDIO {
- bitrate, err := strconv.Atoi(job.Preset.Audio.Bitrate)
- if err != nil {
- return 0, 0, err
- }
- options := append(
- []gmf.Option{
- {Key: "time_base", Val: inputStream.CodecCtx().TimeBase().AVR()},
- {Key: "ar", Val: inputStream.CodecCtx().SampleRate()},
- {Key: "ac", Val: inputStream.CodecCtx().Channels()},
- {Key: "channel_layout", Val: codecContext.SelectChannelLayout()},
- },
- )
- codecContext.SetOptions(options)
- codecContext.SetSampleFmt(inputStream.CodecCtx().SampleFmt()).
- SetBitRate(bitrate).
- SelectSampleRate()
- }
- if codecContext.Type() == gmf.AVMEDIA_TYPE_VIDEO {
- codecContext.SetTimeBase(gmf.AVR{Num: 1, Den: 24})
- if job.Preset.Video.Codec == "h264" {
- profile := getProfile(job)
- codecContext.SetProfile(profile)
- }
- gop, err := strconv.Atoi(job.Preset.Video.GopSize)
- if err != nil {
- return 0, 0, err
- }
- width, height := getResolution(job, inputStream.CodecCtx().Width(), inputStream.CodecCtx().Height())
- bitrate, err := strconv.Atoi(job.Preset.Video.Bitrate)
- if err != nil {
- return 0, 0, err
- }
- codecContext.
- SetDimension(width, height).
- SetGopSize(gop).
- SetBitRate(bitrate).
- SetPixFmt(inputStream.CodecCtx().PixFmt())
- }
- if err := codecContext.Open(nil); err != nil {
- return 0, 0, err
- }
- par := gmf.NewCodecParameters()
- if err = par.FromContext(codecContext); err != nil {
- return 0, 0, fmt.Errorf("error creating codec parameters from context - %s", err)
- }
- defer par.Free()
- if outputStream = oc.NewStream(codec); outputStream == nil {
- return 0, 0, fmt.Errorf("unable to create new stream in output context")
- }
- err = outputStream.CopyCodecPar(par)
- if err != nil {
- return 0, 0, err
- }
- outputStream.SetCodecCtx(codecContext)
- return inputStream.Index(), outputStream.Index(), nil
- }
- func getProfile(job types.Job) int {
- profiles := map[string]int{
- "baseline": gmf.FF_PROFILE_H264_BASELINE,
- "main": gmf.FF_PROFILE_H264_MAIN,
- "high": gmf.FF_PROFILE_H264_HIGH,
- }
- if job.Preset.Video.Profile != "" {
- return profiles[job.Preset.Video.Profile]
- }
- return gmf.FF_PROFILE_H264_MAIN
- }
- func getVideoCodec(job types.Job) string {
- codecs := map[string]string{
- "h264": "libx264",
- "vp8": "libvpx",
- "vp9": "libvpx-vp9",
- "theora": "libtheora",
- "aac": "aac",
- }
- if codec, ok := codecs[job.Preset.Video.Codec]; ok {
- return codec
- }
- return "libx264"
- }
- func getAudioCodec(job types.Job) string {
- codecs := map[string]string{
- "aac": "aac",
- "vorbis": "vorbis",
- }
- if codec, ok := codecs[job.Preset.Audio.Codec]; ok {
- return codec
- }
- return "aac"
- }
- func getResolution(job types.Job, inputWidth int, inputHeight int) (int, int) {
- var width, height int
- if job.Preset.Video.Width == "" && job.Preset.Video.Height == "" {
- return inputWidth, inputHeight
- } else if job.Preset.Video.Width == "" {
- height, _ = strconv.Atoi(job.Preset.Video.Height)
- width = (inputWidth * height) / inputHeight
- } else if job.Preset.Video.Height == "" {
- width, _ = strconv.Atoi(job.Preset.Video.Width)
- height = (inputHeight * width) / inputWidth
- } else {
- width, _ = strconv.Atoi(job.Preset.Video.Width)
- height, _ = strconv.Atoi(job.Preset.Video.Height)
- }
- return width, height
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement