Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- import getopt
- import logging
- import os
- import shutil
- import subprocess
- import sys
- import time
- import config
- class KickRecorder:
- def __init__(self):
- self.ffmpeg_path = "ffmpeg"
- self.disable_ffmpeg = False
- self.refresh = 15
- self.root_path = config.root_path
- self.username = config.username
- self.quality = "best"
- def run(self):
- recorded_path = os.path.join(self.root_path, "recorded", self.username)
- processed_path = os.path.join(self.root_path, "processed", self.username)
- os.makedirs(recorded_path, exist_ok=True)
- os.makedirs(processed_path, exist_ok=True)
- if self.refresh < 15:
- logging.warning("Check interval should not be lower than 15 seconds.")
- self.refresh = 15
- # Fix any previously recorded files
- for f in os.listdir(recorded_path):
- full_path = os.path.join(recorded_path, f)
- if os.path.isfile(full_path):
- self.process_recorded_file(full_path, os.path.join(processed_path, f))
- logging.info("Checking Kick stream for %s every %s seconds", self.username, self.refresh)
- self.loop_check(recorded_path, processed_path)
- def is_kick_stream_live(self):
- try:
- result = subprocess.run(
- ["streamlink", f"https://kick.com/{self.username}"],
- capture_output=True,
- text=True,
- timeout=10
- )
- return "Available streams:" in result.stdout
- except Exception as e:
- logging.error("Error checking stream status: %s", e)
- return False
- def loop_check(self, recorded_path, processed_path):
- while True:
- if self.is_kick_stream_live():
- logging.info("%s is live. Starting recording.", self.username)
- filename = f"{self.username} - {datetime.datetime.now().strftime('%Y-%m-%d %Hh%Mm%Ss')}.mp4"
- filename = "".join(x for x in filename if x.isalnum() or x in [" ", "-", "_", "."])
- recorded_filename = os.path.join(recorded_path, filename)
- processed_filename = os.path.join(processed_path, filename)
- subprocess.call([
- "streamlink", f"https://kick.com/{self.username}", self.quality,
- "-o", recorded_filename
- ])
- if os.path.exists(recorded_filename):
- self.process_recorded_file(recorded_filename, processed_filename)
- else:
- logging.warning("Recording file not found after recording session.")
- else:
- logging.info("%s is offline. Checking again in %s seconds.", self.username, self.refresh)
- time.sleep(self.refresh)
- def process_recorded_file(self, recorded_filename, processed_filename):
- if self.disable_ffmpeg:
- logging.info("Moving %s to processed directory", recorded_filename)
- shutil.move(recorded_filename, processed_filename)
- else:
- logging.info("Fixing and processing %s", recorded_filename)
- self.ffmpeg_copy_and_fix_errors(recorded_filename, processed_filename)
- def ffmpeg_copy_and_fix_errors(self, recorded_filename, processed_filename):
- try:
- subprocess.call([
- self.ffmpeg_path, "-err_detect", "ignore_err", "-i", recorded_filename, "-c", "copy",
- processed_filename
- ])
- os.remove(recorded_filename)
- except Exception as e:
- logging.error("FFmpeg error: %s", e)
- def main(argv):
- kick_recorder = KickRecorder()
- usage = "kick-recorder.py -u <username> -q <quality>"
- logging.basicConfig(filename="kick-recorder.log", level=logging.INFO)
- logging.getLogger().addHandler(logging.StreamHandler())
- try:
- opts, _ = getopt.getopt(argv, "hu:q:l:", ["username=", "quality=", "log=", "logging=", "disable-ffmpeg"])
- except getopt.GetoptError:
- print(usage)
- sys.exit(2)
- for opt, arg in opts:
- if opt == "-h":
- print(usage)
- sys.exit()
- elif opt in ("-u", "--username"):
- kick_recorder.username = arg
- elif opt in ("-q", "--quality"):
- kick_recorder.quality = arg
- elif opt in ("-l", "--log", "--logging"):
- level = getattr(logging, arg.upper(), None)
- if not isinstance(level, int):
- raise ValueError(f"Invalid log level: {arg}")
- logging.basicConfig(level=level)
- elif opt == "--disable-ffmpeg":
- kick_recorder.disable_ffmpeg = True
- kick_recorder.run()
- if __name__ == "__main__":
- main(sys.argv[1:])
Advertisement
Add Comment
Please, Sign In to add comment