Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package recorder
- import (
- "fmt"
- "io"
- "net/http"
- "net/url"
- "os"
- "os/signal"
- "strings"
- "syscall"
- "time"
- "github.com/dustin/go-humanize"
- "github.com/grafov/m3u8"
- "github.com/hako/durafmt"
- lru "github.com/hashicorp/golang-lru"
- log "github.com/sirupsen/logrus"
- "github.com/wmw9/rekoda/internal/config"
- "github.com/wmw9/twitchpl"
- )
- var (
- channels = &config.ConfigStruct.Channels
- //streamsDir = &config.ConfigStruct.StreamsDir
- USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0"
- client = &http.Client{Timeout: 2 * time.Second}
- channelsOnline = Online{}
- backoffSchedule = []time.Duration{1 * time.Second, 2 * time.Second, 3 * time.Second}
- )
// Download describes one media segment queued for download.
//
// The field order is significant: values are built with positional
// composite literals elsewhere in this file.
type Download struct {
	// URI is the address the segment is fetched from.
	URI string
	// totalDuration is the running total of segment durations queued so far,
	// this segment included.
	totalDuration time.Duration
}
// Online is the set of channels that currently have a recording in progress.
//
// Its methods perform no locking of their own.
type Online struct {
	// Channels holds the channel names, in the order they were added.
	Channels []string
}
- func Start() {
- ctxLog := log.WithField("general", "REC")
- ctxLog.Info("Recorder starting")
- if i := config.GetRecAmountChannels(); i < 1 {
- ctxLog.Error("There's nothing to record, try to add and enable channels for recording. Type 'rekoda channel' for more info.")
- return
- }
- // Capture <Ctrl>+<C>
- c := make(chan os.Signal)
- signal.Notify(c, os.Interrupt, syscall.SIGTERM)
- go func() {
- <-c
- cleanup()
- os.Exit(1)
- }()
- for {
- ctxLog.Tracef("Channels being recorded right now: %v", channelsOnline)
- for _, u := range *channels {
- ctxLog.Tracef("Checking %v", u.User)
- if u.Enabled && !channelsOnline.Has(u.User) {
- //if u.Enabled {
- cLog := ctxLog.WithField("channel", u.User)
- cLog.Debugf("Checking '%v' channel. Trying to get m3u8 live playlist", u.User)
- pl, err := twitchpl.Get(u.User)
- if err != nil {
- cLog.Debug("Channel is offline or banned.")
- continue
- }
- cLog.Info("Went online! 🤩")
- switch u.Quality {
- case "best":
- cLog.Info("Opening stream: best quality")
- hlsURL, err := pl.Best()
- if err != nil {
- ctxLog.Error(err)
- }
- cLog.Debugf("URL: %v", hlsURL)
- record(cLog, u.User, hlsURL)
- case "worst":
- cLog.Info("Opening stream: worst quality")
- hlsURL, err := pl.Worst()
- if err != nil {
- ctxLog.Error(err)
- }
- cLog.Debugf("URL: %v", hlsURL)
- record(cLog, u.User, hlsURL)
- case "audio_only":
- cLog.Info("Opening stream: audio_only")
- hlsURL, err := pl.Audio()
- if err != nil {
- ctxLog.Error(err)
- }
- cLog.Debugf("URL: %v", hlsURL)
- record(cLog, u.User, hlsURL)
- }
- }
- time.Sleep(1 * time.Second)
- }
- ctxLog.Tracef("Sleep for 1 minute")
- time.Sleep(1 * time.Minute)
- }
- //forever := make(chan bool)
- //<-forever
- }
- func record(log *log.Entry, channel string, hlsURL string) {
- // Get local time
- now, err := TimeIn(time.Now(), "Local")
- if err != nil {
- log.Error(err)
- }
- log.Tracef("Local time: %v", now)
- // Define file name
- fname := fmt.Sprintf("%v_%v.ts", channel, now.Format("2006-01-02_15-04-05"))
- fLog := log.WithField("file", fname)
- // Create channel directory
- sep := string(os.PathSeparator)
- log.Trace("Trying to create channel directory")
- channelDir := config.ConfigStreamsDir + sep + channel
- if err := os.MkdirAll(channelDir, 0777); err != nil {
- fLog.Error(err)
- return
- }
- filepath := channelDir + sep + fname
- fLog.Debugf("Stream directory: '%s'", channelDir)
- fLog.Infof("Writing stream to file: %v", filepath)
- dlc := make(chan *Download, 1024)
- go getPlaylist(fLog, channel, hlsURL, dlc)
- go downloadSegment(fLog, filepath, dlc)
- }
- func downloadSegment(log *log.Entry, filepath string, dlc chan *Download) {
- ctxLog := log.WithField("status", "DOWNLOAD").WithField("func", "SEG")
- var totalBytes uint64 = 0
- var bytes int64
- var err error
- var req *http.Request
- out, err := os.Create(filepath)
- if err != nil {
- ctxLog.Error(err)
- }
- defer out.Close()
- for v := range dlc {
- req, err = http.NewRequest("GET", v.URI, nil)
- if err != nil {
- ctxLog.Info(err)
- }
- resp, err := doRequestWithRetries(req)
- if err != nil {
- ctxLog.Error(err)
- continue
- }
- //defer resp.Body.Close()
- if resp.StatusCode != 200 {
- ctxLog.Errorf("Received HTTP %v for %v\n", resp.StatusCode, v.URI)
- continue
- }
- bytes, err = io.Copy(out, resp.Body)
- if err != nil {
- ctxLog.Fatal(err)
- }
- resp.Close = true
- resp.Body.Close()
- totalBytes += uint64(bytes)
- duration := durafmt.Parse(v.totalDuration).LimitFirstN(1).String()
- //speed := uint64(v.totalDuration * time.Millisecond)
- ctxLog.Infof("Written %v (%v)", humanize.Bytes(totalBytes), duration)
- }
- }
- func getPlaylist(log *log.Entry, channel, urlStr string, dlc chan *Download) {
- channelsOnline.Add(channel)
- defer channelsOnline.Remove(channel)
- ctxLog := log.WithField("status", "DOWNLOAD").WithField("func", "GET")
- var recDuration time.Duration = 0
- var req *http.Request
- cache, _ := lru.New(1024)
- playlistUrl, err := url.Parse(urlStr)
- if err != nil {
- ctxLog.Fatal(err)
- }
- for {
- req, err = http.NewRequest("GET", urlStr, nil)
- if err != nil {
- ctxLog.Fatal(err)
- }
- ctxLog.Tracef("URL: %v", urlStr)
- resp, err := doRequestWithRetries(req)
- if err != nil {
- ctxLog.Error(err)
- }
- //defer resp.Body.Close()
- playlist, listType, err := m3u8.DecodeFrom(resp.Body, true)
- if err != nil {
- ctxLog.Fatal(err)
- }
- resp.Close = true
- resp.Body.Close()
- if listType == m3u8.MEDIA {
- mpl := playlist.(*m3u8.MediaPlaylist)
- for _, v := range mpl.Segments {
- if v != nil {
- var msURI string
- if strings.HasPrefix(v.URI, "http") {
- msURI, err = url.QueryUnescape(v.URI)
- if err != nil {
- ctxLog.Fatal(err)
- }
- } else {
- msUrl, err := playlistUrl.Parse(v.URI)
- if err != nil {
- ctxLog.Print(err)
- continue
- }
- msURI, err = url.QueryUnescape(msUrl.String())
- if err != nil {
- ctxLog.Fatal(err)
- }
- }
- _, hit := cache.Get(msURI)
- if !hit {
- cache.Add(msURI, nil)
- recDuration += time.Duration(int64(v.Duration * 1000000000))
- dlc <- &Download{msURI, recDuration}
- }
- }
- }
- if mpl.Closed {
- ctxLog.Info("Stream ended")
- close(dlc)
- return
- } else {
- time.Sleep(time.Duration(int64(mpl.TargetDuration * 1000000000)))
- }
- } else {
- ml := playlist.(*m3u8.MasterPlaylist)
- log.Printf("%v", ml)
- log.Fatal("Not a valid media playlist")
- }
- }
- }
// TimeIn returns t expressed in the time zone called name.
//
// name is handed to time.LoadLocation. When the location cannot be loaded,
// t is returned unchanged together with the error.
func TimeIn(t time.Time, name string) (time.Time, error) {
	loc, err := time.LoadLocation(name)
	if err != nil {
		return t, err
	}
	return t.In(loc), nil
}
- func doRequestWithRetries(req *http.Request) (*http.Response, error) {
- ctxLog := log.WithField("general", "GET")
- var err error
- var resp *http.Response
- //req.Close = true
- //req.Header.Set("Connection", "close") // prevent 'too many open files' error
- req.Header.Set("User-Agent", USER_AGENT)
- for _, backoff := range backoffSchedule {
- resp, err = client.Do(req)
- if err == nil {
- break
- }
- ctxLog.Errorf("Request error: '%v' Retrying in %v", err, backoff)
- time.Sleep(backoff)
- }
- if err != nil {
- return resp, err
- }
- return resp, err
- }
- func doRequest(c *http.Client, req *http.Request) (*http.Response, error) {
- req.Close = true
- req.Header.Set("Connection", "close") // prevent 'too many open files' error
- req.Header.Set("User-Agent", USER_AGENT)
- resp, err := c.Do(req)
- return resp, err
- }
- func cleanup() {
- log.WithField("general", "CLI").Info("Interrupted! SIGTERM signal. <Ctrl>+<C> pressed. Graceful shutdown...")
- }
- func (l *Online) Has(channel string) bool {
- for _, v := range l.Channels {
- if v == channel {
- return true
- }
- }
- return false
- }
- func (l *Online) Add(u string) {
- l.Channels = append(l.Channels, u)
- }
- func (l *Online) Remove(u string) {
- for i, v := range l.Channels {
- if v == u {
- l.Channels = append(l.Channels[:i], l.Channels[i+1:]...)
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement