Advertisement
Guest User

internal/recorder/recorder.go

a guest
Aug 26th, 2021
168
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 12.19 KB | None | 0 0
  1. package recorder
  2.  
  3. import (
  4.         "fmt"
  5.         "io"
  6.         "net/http"
  7.         "net/url"
  8.         "os"
  9.         "os/signal"
  10.         "strings"
  11.         "syscall"
  12.         "time"
  13.  
  14.         "github.com/dustin/go-humanize"
  15.         "github.com/grafov/m3u8"
  16.         "github.com/hako/durafmt"
  17.         lru "github.com/hashicorp/golang-lru"
  18.         log "github.com/sirupsen/logrus"
  19.         "github.com/wmw9/rekoda/internal/config"
  20.         "github.com/wmw9/twitchpl"
  21. )
  22.  
  23. var (
  24.         channels = &config.ConfigStruct.Channels
  25.         //streamsDir = &config.ConfigStruct.StreamsDir
  26.         USER_AGENT      = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0"
  27.         client          = &http.Client{Timeout: 2 * time.Second}
  28.         channelsOnline  = Online{}
  29.         backoffSchedule = []time.Duration{1 * time.Second, 2 * time.Second, 3 * time.Second}
  30. )
  31.  
  32. type Download struct {
  33.         URI           string
  34.         totalDuration time.Duration
  35. }
  36.  
  37. type Online struct {
  38.         Channels []string
  39. }
  40.  
  41. func Start() {
  42.         ctxLog := log.WithField("general", "REC")
  43.         ctxLog.Info("Recorder starting")
  44.  
  45.         if i := config.GetRecAmountChannels(); i < 1 {
  46.                 ctxLog.Error("There's nothing to record, try to add and enable channels for recording. Type 'rekoda channel' for more info.")
  47.                 return
  48.         }
  49.  
  50.         // Capture <Ctrl>+<C>
  51.         c := make(chan os.Signal)
  52.         signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  53.         go func() {
  54.                 <-c
  55.                 cleanup()
  56.                 os.Exit(1)
  57.         }()
  58.  
  59.         for {
  60.                 ctxLog.Tracef("Channels being recorded right now: %v", channelsOnline)
  61.                 for _, u := range *channels {
  62.                         ctxLog.Tracef("Checking %v", u.User)
  63.                         if u.Enabled && !channelsOnline.Has(u.User) {
  64.                                 //if u.Enabled {
  65.                                 cLog := ctxLog.WithField("channel", u.User)
  66.                                 cLog.Debugf("Checking '%v' channel. Trying to get m3u8 live playlist", u.User)
  67.                                 pl, err := twitchpl.Get(u.User)
  68.                                 if err != nil {
  69.                                         cLog.Debug("Channel is offline or banned.")
  70.                                         continue
  71.                                 }
  72.                                 cLog.Info("Went online! 🤩")
  73.                                 switch u.Quality {
  74.                                 case "best":
  75.                                         cLog.Info("Opening stream: best quality")
  76.                                         hlsURL, err := pl.Best()
  77.                                         if err != nil {
  78.                                                 ctxLog.Error(err)
  79.                                         }
  80.                                         cLog.Debugf("URL: %v", hlsURL)
  81.                                         record(cLog, u.User, hlsURL)
  82.                                 case "worst":
  83.                                         cLog.Info("Opening stream: worst quality")
  84.                                         hlsURL, err := pl.Worst()
  85.                                         if err != nil {
  86.                                                 ctxLog.Error(err)
  87.                                         }
  88.                                         cLog.Debugf("URL: %v", hlsURL)
  89.                                         record(cLog, u.User, hlsURL)
  90.                                 case "audio_only":
  91.                                         cLog.Info("Opening stream: audio_only")
  92.                                         hlsURL, err := pl.Audio()
  93.                                         if err != nil {
  94.                                                 ctxLog.Error(err)
  95.                                         }
  96.                                         cLog.Debugf("URL: %v", hlsURL)
  97.                                         record(cLog, u.User, hlsURL)
  98.  
  99.                                 }
  100.                         }
  101.                         time.Sleep(1 * time.Second)
  102.                 }
  103.                 ctxLog.Tracef("Sleep for 1 minute")
  104.                 time.Sleep(1 * time.Minute)
  105.         }
  106.  
  107.         //forever := make(chan bool)
  108.         //<-forever
  109.  
  110. }
  111.  
  112. func record(log *log.Entry, channel string, hlsURL string) {
  113.         // Get local time
  114.         now, err := TimeIn(time.Now(), "Local")
  115.         if err != nil {
  116.                 log.Error(err)
  117.         }
  118.         log.Tracef("Local time: %v", now)
  119.  
  120.         // Define file name
  121.         fname := fmt.Sprintf("%v_%v.ts", channel, now.Format("2006-01-02_15-04-05"))
  122.         fLog := log.WithField("file", fname)
  123.  
  124.         // Create channel directory
  125.         sep := string(os.PathSeparator)
  126.         log.Trace("Trying to create channel directory")
  127.         channelDir := config.ConfigStreamsDir + sep + channel
  128.         if err := os.MkdirAll(channelDir, 0777); err != nil {
  129.                 fLog.Error(err)
  130.                 return
  131.         }
  132.         filepath := channelDir + sep + fname
  133.         fLog.Debugf("Stream directory: '%s'", channelDir)
  134.         fLog.Infof("Writing stream to file: %v", filepath)
  135.  
  136.         dlc := make(chan *Download, 1024)
  137.         go getPlaylist(fLog, channel, hlsURL, dlc)
  138.         go downloadSegment(fLog, filepath, dlc)
  139. }
  140.  
  141. func downloadSegment(log *log.Entry, filepath string, dlc chan *Download) {
  142.         ctxLog := log.WithField("status", "DOWNLOAD").WithField("func", "SEG")
  143.         var totalBytes uint64 = 0
  144.         var bytes int64
  145.         var err error
  146.         var req *http.Request
  147.  
  148.         out, err := os.Create(filepath)
  149.         if err != nil {
  150.                 ctxLog.Error(err)
  151.         }
  152.         defer out.Close()
  153.  
  154.         for v := range dlc {
  155.                 req, err = http.NewRequest("GET", v.URI, nil)
  156.                 if err != nil {
  157.                         ctxLog.Info(err)
  158.                 }
  159.                 resp, err := doRequestWithRetries(req)
  160.                 if err != nil {
  161.                         ctxLog.Error(err)
  162.                         continue
  163.                 }
  164.                 //defer resp.Body.Close()
  165.  
  166.                 if resp.StatusCode != 200 {
  167.                         ctxLog.Errorf("Received HTTP %v for %v\n", resp.StatusCode, v.URI)
  168.                         continue
  169.                 }
  170.                 bytes, err = io.Copy(out, resp.Body)
  171.                 if err != nil {
  172.                         ctxLog.Fatal(err)
  173.                 }
  174.                 resp.Close = true
  175.                 resp.Body.Close()
  176.  
  177.                 totalBytes += uint64(bytes)
  178.                 duration := durafmt.Parse(v.totalDuration).LimitFirstN(1).String()
  179.                 //speed := uint64(v.totalDuration * time.Millisecond)
  180.  
  181.                 ctxLog.Infof("Written %v (%v)", humanize.Bytes(totalBytes), duration)
  182.         }
  183. }
  184.  
  185. func getPlaylist(log *log.Entry, channel, urlStr string, dlc chan *Download) {
  186.         channelsOnline.Add(channel)
  187.         defer channelsOnline.Remove(channel)
  188.  
  189.         ctxLog := log.WithField("status", "DOWNLOAD").WithField("func", "GET")
  190.  
  191.         var recDuration time.Duration = 0
  192.         var req *http.Request
  193.  
  194.         cache, _ := lru.New(1024)
  195.  
  196.         playlistUrl, err := url.Parse(urlStr)
  197.         if err != nil {
  198.                 ctxLog.Fatal(err)
  199.         }
  200.         for {
  201.                 req, err = http.NewRequest("GET", urlStr, nil)
  202.                 if err != nil {
  203.                         ctxLog.Fatal(err)
  204.                 }
  205.                 ctxLog.Tracef("URL: %v", urlStr)
  206.  
  207.                 resp, err := doRequestWithRetries(req)
  208.                 if err != nil {
  209.                         ctxLog.Error(err)
  210.                 }
  211.                 //defer resp.Body.Close()
  212.  
  213.                 playlist, listType, err := m3u8.DecodeFrom(resp.Body, true)
  214.                 if err != nil {
  215.                         ctxLog.Fatal(err)
  216.                 }
  217.                 resp.Close = true
  218.                 resp.Body.Close()
  219.  
  220.                 if listType == m3u8.MEDIA {
  221.                         mpl := playlist.(*m3u8.MediaPlaylist)
  222.                         for _, v := range mpl.Segments {
  223.                                 if v != nil {
  224.                                         var msURI string
  225.  
  226.                                         if strings.HasPrefix(v.URI, "http") {
  227.                                                 msURI, err = url.QueryUnescape(v.URI)
  228.                                                 if err != nil {
  229.                                                         ctxLog.Fatal(err)
  230.                                                 }
  231.                                         } else {
  232.                                                 msUrl, err := playlistUrl.Parse(v.URI)
  233.                                                 if err != nil {
  234.                                                         ctxLog.Print(err)
  235.                                                         continue
  236.                                                 }
  237.                                                 msURI, err = url.QueryUnescape(msUrl.String())
  238.                                                 if err != nil {
  239.                                                         ctxLog.Fatal(err)
  240.                                                 }
  241.                                         }
  242.                                         _, hit := cache.Get(msURI)
  243.                                         if !hit {
  244.                                                 cache.Add(msURI, nil)
  245.                                                 recDuration += time.Duration(int64(v.Duration * 1000000000))
  246.                                                 dlc <- &Download{msURI, recDuration}
  247.                                         }
  248.                                 }
  249.                         }
  250.                         if mpl.Closed {
  251.                                 ctxLog.Info("Stream ended")
  252.                                 close(dlc)
  253.                                 return
  254.                         } else {
  255.                                 time.Sleep(time.Duration(int64(mpl.TargetDuration * 1000000000)))
  256.                         }
  257.                 } else {
  258.                         ml := playlist.(*m3u8.MasterPlaylist)
  259.                         log.Printf("%v", ml)
  260.                         log.Fatal("Not a valid media playlist")
  261.                 }
  262.         }
  263. }
  264.  
  265. func TimeIn(t time.Time, name string) (time.Time, error) {
  266.         loc, err := time.LoadLocation(name)
  267.         if err == nil {
  268.                 t = t.In(loc)
  269.         }
  270.         return t, err
  271. }
  272.  
  273. func doRequestWithRetries(req *http.Request) (*http.Response, error) {
  274.         ctxLog := log.WithField("general", "GET")
  275.         var err error
  276.         var resp *http.Response
  277.  
  278.         //req.Close = true
  279.         //req.Header.Set("Connection", "close") // prevent 'too many open files' error
  280.         req.Header.Set("User-Agent", USER_AGENT)
  281.  
  282.         for _, backoff := range backoffSchedule {
  283.                 resp, err = client.Do(req)
  284.                 if err == nil {
  285.                         break
  286.                 }
  287.                 ctxLog.Errorf("Request error: '%v' Retrying in %v", err, backoff)
  288.                 time.Sleep(backoff)
  289.  
  290.         }
  291.         if err != nil {
  292.                 return resp, err
  293.         }
  294.         return resp, err
  295. }
  296.  
  297. func doRequest(c *http.Client, req *http.Request) (*http.Response, error) {
  298.         req.Close = true
  299.         req.Header.Set("Connection", "close") // prevent 'too many open files' error
  300.         req.Header.Set("User-Agent", USER_AGENT)
  301.         resp, err := c.Do(req)
  302.         return resp, err
  303. }
  304.  
  305. func cleanup() {
  306.         log.WithField("general", "CLI").Info("Interrupted! SIGTERM signal. <Ctrl>+<C> pressed. Graceful shutdown...")
  307. }
  308.  
  309. func (l *Online) Has(channel string) bool {
  310.         for _, v := range l.Channels {
  311.                 if v == channel {
  312.                         return true
  313.                 }
  314.         }
  315.         return false
  316. }
  317.  
  318. func (l *Online) Add(u string) {
  319.         l.Channels = append(l.Channels, u)
  320. }
  321.  
  322. func (l *Online) Remove(u string) {
  323.         for i, v := range l.Channels {
  324.                 if v == u {
  325.                         l.Channels = append(l.Channels[:i], l.Channels[i+1:]...)
  326.                 }
  327.         }
  328. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement