Advertisement
FredMSloniker

Analyze Playlist.py

May 24th, 2022 (edited)
149
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 17.80 KB | None | 0 0
  1. # Given the URL of a playlist and the name of a CSV file (which will be created if necessary), update the contents of the CSV file to reflect the contents of the playlist and any changes in, for instance, accessibility.
  2.  
  3. ytdlp = "yt-dlp.exe" # The path to your copy of YT-DLP. Relative paths are supported, but you need the extension.
  4.  
  5. from json import loads
  6. from os import path, replace
  7. from re import finditer, sub
  8. from shutil import get_terminal_size
  9. from subprocess import run
  10. from sys import argv, exit
  11.  
  12. # Version history:
  13.     # (no version number): initial release.
  14.     # (still no version number): added reporting of number of changes to CSV.
  15.     # (version number next time seriously): fixed bug in previous feature.
  16.     # 2022-May-28: added flag support ('-nsp' and '-nkr').
  17.     # 2022-Jun-06: a video's status changing from 'New' to 'Ok' is no longer noteworthy.
  18.     # 2022-Jun-30: if the list of URLs in the CSV doesn't match the list of URLs in the playlist, the program reports how many URLs are new (in the playlist but not the CSV), old (in the CSV but not the playlist), and common (in both).
  19.     # 2022-Jul-01: changed the format of the previous report to be more clear.
  20.     # 2022-Jul-02: added a 'Description' column (saving the video's description) between the video name and channel ID. Because this can be spammy, also added the '-nsd' flag.
  21. arg_flags = {}
  22. valid_arg_flags = dict.fromkeys(["-nsp", "-nkr", "-nsd"], True)
  23. okay_go = True
  24. if len(argv) >= 3: # we have enough arguments
  25.     if len(argv) > 3: # more than enough
  26.         for i in range(1, len(argv) - 2): # are they valid?
  27.             if valid_arg_flags.get(argv[i], False):
  28.                 arg_flags[argv[i]] = True
  29.             else:
  30.                 okay_go = False
  31. else:
  32.     okay_go = False
  33. if not okay_go:
  34.     exit("\n".join([
  35.         "Usage: %s [flags] [playlist URL] [CSV file]" % argv[0],
  36.         "",
  37.         "Valid flags:",
  38.         "    -nsd: do Not Save the video's Description to the CSV.",
  39.         "          (Does not change existing description information.)",
  40.         "    -nsp: do Not Save Playlist number changes to the CSV.",
  41.         "    -nkr: do Not Keep data on videos Removed from the playlist.",
  42.         "          (Only data already in the CSV will be removed.)",
  43.     ]))
  44. last_terminal_width = 0
  45. ellipses = " ... "
  46. left_slice = 0
  47. right_slice = 0
  48.  
  49. def nice_print(s = ""):
  50.     "Nicely print a string on the screen, taking into account its width."
  51.     global last_terminal_width, left_slice, right_slice
  52.     terminal_width = get_terminal_size().columns - 1
  53.     if last_terminal_width != terminal_width:
  54.         last_terminal_width = terminal_width
  55.         left_slice = round((terminal_width - len(ellipses)) / 2)
  56.         right_slice = terminal_width - len(ellipses) - left_slice
  57.     print("\r" + (" " * terminal_width), end = "")
  58.     if len(s) <= terminal_width:
  59.         print("\r" + s, end = "")
  60.     else:
  61.         print("\r" + s[0:left_slice] + ellipses + s[-right_slice:], end = "")
  62.  
  63. def what_is_it(s):
  64.     "Given a potentially incomplete path, return whether the item in question is a file, a directory, or nothing."
  65.     if path.exists(s):
  66.         if path.isfile(s):
  67.             return "file"
  68.         elif path.isdir(s):
  69.             return "dir"
  70.         else: # I don't think this should ever happen, but...
  71.             exit("Error: %s exists, but is neither a file nor a directory? Wha?" % s)
  72.     else:
  73.         return False
  74.  
  75. ytdlp = path.abspath(ytdlp)
  76. if what_is_it(ytdlp) != "file":
  77.     exit("Error: could not find YT-DLP at %s. Double-check the path and edit the header of this file." % ytdlp)
  78.  
  79. def pluralize(n, s1 = "", s2 = "s"):
  80.     "Return the proper pluralization string for a number."
  81.     if n == 1: # Because Python has weird ideas about what constitutes falseness.
  82.         return s1
  83.     else:
  84.         return s2
  85.  
  86. def entryies(n):
  87.     return pluralize(n, "y", "ies")
  88.  
  89. def csv_to_list(csv):
  90.     "Given the contents of a CSV file, return a list of the rows in that file, which are lists of the columns in those rows."
  91.     csv = csv.strip() # remove any blank bits at beginning and end
  92.     output = []
  93.     row = []
  94.     stops = [n.start() for n in finditer('[",\n]', csv)] # Commas, double quotes, and newlines are where interesting stuff happens.
  95.     stops.append(len(csv)) # We also want to check the end of the contents.
  96.     element = ""
  97.     starting_element = True # Have we just started an element?
  98.     quoted_element = False # Are we processing a quoted element?
  99.     cursor = 0 # Where are we in the CSV?
  100.     quote_lock = 0 # Are we potentially processing an escaped double quote (i.e., is our 'quote lock' on?)
  101.     for n in stops:
  102.         symbol = csv[n: n + 1]
  103.         if starting_element: # Did we just start an element?
  104.             if n == cursor: # And did we hit a stop immediately?
  105.                 if symbol == '"': # This is the start of a quoted element.
  106.                     starting_element, quoted_element = False, True
  107.                 else: # This is the end of an empty element.
  108.                     row.append('')
  109.                     if symbol != ',': #And the end of the row.
  110.                         output.append(row)
  111.                         row = []
  112.                 cursor = cursor + 1 # Regardless, we want to do this.
  113.             else: # No? Well, what did we hit?
  114.                 if symbol == '"': # This is a double quote in the middle of an unquoted element.
  115.                     pass # We don't care about it.
  116.                 else: # This is the end of an unquoted element.
  117.                     row.append(csv[cursor: n])
  118.                     if symbol != ',': # And the end of the row.
  119.                         output.append(row)
  120.                         row = []
  121.                     starting_element, cursor = True, n + 1
  122.         else: # We didn't just start an element, so... is it quoted or not?
  123.             if quoted_element: # It's a quoted element.
  124.                 if quote_lock: # It's a quoted element with the quote lock on.
  125.                     if quote_lock + 1 == n: # It's a quoted element where the quote lock was JUST turned on.
  126.                         if symbol == '"': # We've hit an escaped double quote.
  127.                             element = element + csv[cursor: n] # Technically including the escaping double quote, not the escaped one, but hey.
  128.                             cursor, quote_lock = n + 1, False
  129.                         else: # We've hit the end of a quoted element.
  130.                             row.append(element + csv[cursor: n - 1]) # Leaving off the closing double quote of the element.
  131.                             element, starting_element, quoted_element, quote_lock = "", True, False, False
  132.                             cursor = n + 1
  133.                             if symbol != ',': # And the end of the row.
  134.                                 output.append(row)
  135.                                 row = []
  136.                     else: # There was an unescaped double quote in the middle of a quoted element!
  137.                         exit("Error: Malformed CSV, line %d: double quote was not escaped with another double quote!" % len(output) + 1)
  138.                 else: # It's a quoted element without the quote lock on.
  139.                     if symbol == '"': # This could be either an escaping double quote or the end of a quoted element. We don't know yet.
  140.                         quote_lock = n
  141.                     elif symbol != '': # We don't care about commas or newlines in the middle of a quoted element.
  142.                         pass
  143.                     else: # The line has ended without closing the quoted element!
  144.                         exit("Error: Malformed CSV, line %d: last element is quoted, but quote is not closed!" % len(output) + 1)
  145.             else: # It's not a quoted element.
  146.                 if symbol == '"': # We don't care about double quotes in the middle of an unquoted element.
  147.                     pass
  148.                 else: # This is the end of the element.
  149.                     row.append(element + csv[cursor: n])
  150.                     if symbol != ',': # And the end of the row.
  151.                         output.append(row)
  152.                         row = []
  153.                     element, starting_element = "", True
  154.                     cursor = n + 1
  155.     return output
  156.  
  157. def list_to_csv(list_of_lists):
  158.     "Given a list of lists of values, return a CSV-encoded string."
  159.     output = []
  160.     for list_line in list_of_lists:
  161.         row = []
  162.         for element in list_line:
  163.             row.append('"%s"' % sub('"', '""', str(element)))
  164.         output.append(",".join(row))
  165.     return "\n".join(output)
  166.  
  167. def load_csv(filename):
  168.     "Given a filename, load and parse its (presumably) CSV contents. Returns False if it is unable to do so."
  169.     try:
  170.         with open(filename, encoding = "utf-8") as handle:
  171.             return csv_to_list(handle.read())
  172.     except (FileNotFoundError, PermissionError):
  173.         return False
  174.  
  175. csv_header = {"#": "playlist_index", "URL": "url", "Video Name": "title", "Description": "description", "Channel ID": "channel_id", "Status": True, "Changed?": True, "Auto Notes": True, "User Notes": True}
  176. def load_url_data_from_csv(csv_filename):
  177.     "Given a filename, attempt to open it and import its CSV data. (If it doesn't exist, create blank data to import.)  Perform some sanity checks to make sure it's valid and that there are no duplicate URLs. Then return a dictionary whose keys are URLs and whose values are dictionaries whose keys are the column names and whose values are the contents."
  178.     # Does the file exist? If so, can we read it?
  179.     csv_filetype = what_is_it(csv_filename)
  180.     if csv_filetype:
  181.         if csv_filetype == "file":
  182.             csv_url_data = load_csv(csv_filename)
  183.             if csv_url_data == False:
  184.                 exit("Error: couldn't read the contents of %s!" % csv_filename)
  185.         else:
  186.             exit("Error: %s is a directory!" % csv_filename)
  187.     else:
  188.         print("Warning: CSV file %s not found. A new one will be created." % csv_filename)
  189.         csv_url_data = [[]]
  190.         for s in csv_header:
  191.             csv_url_data[0].append(s)
  192.     # A simple check that the data is the correct size and has the correct header.
  193.     if len(csv_url_data) == 0:
  194.         exit("Error: CSV file %s exists, but contains no data!" % csv_filename)
  195.     for i in range(len(csv_url_data)):
  196.         if len(csv_url_data[i]) != len(csv_header):
  197.             exit("Error: CSV file %s, line %s, contains %d entr%s (should contain %d)!" % (csv_filename, i + 1, len(csv_url_data[i]), entryies(len(csv_url_data[i])), len(csv_header)))
  198.     for a in csv_url_data[0]:
  199.         if not csv_header.get(a, False):
  200.             exit("Error: CSV file %s header not valid!" % csv_filename)
  201.     # Okay, we got this far. Now we start constructing the dictionary.
  202.     output = {}
  203.     for i in range(1, len(csv_url_data)):
  204.         entry = {}
  205.         for j, s in enumerate(csv_header):
  206.             entry[s] = csv_url_data[i][j]
  207.         if output.get(entry["URL"], False):
  208.             exit("Error: URL %s appears more than once in CSV file %s!" % (entry["URL"], csv_filename))
  209.         output[entry["URL"]] = entry
  210.     if len(output) > 0:
  211.         print("CSV file loaded. %d entr%s found." % (len(output), entryies(len(output))))
  212.     return output
  213.  
  214. def load_url_data_from_playlist(url):
  215.     "Given the playlist URL, return a dictionary in the same format as load_url_data_from_csv. Display any errors."
  216.     thingy = run([ytdlp, "-j", "--flat-playlist", url], capture_output = True, text = True)
  217.     s = thingy.stderr.strip()
  218.     if s:
  219.         print("Errors detected during playlist scan:\n%s" % s)
  220.     entries = thingy.stdout.split("\n")
  221.     output = {}
  222.     for entry in entries:
  223.         if entry:
  224.             line = {}
  225.             jthingy = loads(entry)
  226.             for header_name, jthingy_key in csv_header.items():
  227.                 line[header_name] = str(jthingy.get(jthingy_key, "")) # Leave the fields we don't get from YT-DLP blank
  228.             output[jthingy["url"]] = line
  229.     return output
  230.  
  231. def save_csv(filename, contents):
  232.     "Given a list of lists of values, save it in CSV format ATOMICALLY, i.e. without causing a corrupt file to exist if it's overwriting an existing file."
  233.     target_file_path = path.abspath(filename)
  234.     output = list_to_csv(contents)
  235.     with open(filename + ".tmp", "w", encoding = "utf-8") as f:
  236.         f.write(output)
  237.     replace(filename + ".tmp", target_file_path)
  238.  
  239. def save_url_data(filename, contents):
  240.     "Given a dictionary of the format produced by the load_url_data_from* functions (q.v.), save it in CSV format using save_csv."
  241.     output = []
  242.     for URL, entry in contents.items():
  243.         line = []
  244.         for s in csv_header:
  245.             line.append(entry[s])
  246.         output.append(line)
  247.     output.sort(key = lambda entry: entry[1]) # Secondary sort by URL
  248.     output.sort(key = lambda entry: float(entry[0] or "inf")) # Primary sort by playlist number
  249.     output.insert(0, [])
  250.     for s in csv_header:
  251.         output[0].append(s)
  252.     save_csv(filename, output)
  253.  
  254. # Load the URL data from the CSV file, if any.
  255. csv_filename = path.abspath(argv[len(argv) - 1])
  256. print("Loading CSV file %s..." % csv_filename)
  257. csv_url_data = load_url_data_from_csv(csv_filename)
  258.  
  259. # If -nkr is given, remove any 'removed' items.
  260. if arg_flags.get("-nkr", False):
  261.     zz = len(csv_url_data)
  262.     for entry in list(csv_url_data.keys()):
  263.         if csv_url_data[entry]["Status"] == "Removed":
  264.             del csv_url_data[entry]
  265.     if zz != len(csv_url_data):
  266.         save_url_data(csv_filename, csv_url_data)
  267.         print("%d item%s no longer in the playlist removed from the CSV." % (zz - len(csv_url_data), pluralize(zz - len(csv_url_data))))
  268.  
  269. # Load the URL data from the playlist.
  270. print("Getting video data from %s..." % argv[len(argv) - 2])
  271. playlist_url_data = load_url_data_from_playlist(argv[len(argv) - 2])
  272. if len(playlist_url_data) == 0:
  273.     sys.exit("No videos in the playlist! (Did you mistype the URL?)")
  274. print("Playlist data loaded. %d entr%s found." % (len(playlist_url_data), entryies(len(playlist_url_data))))
  275.  
  276. def prepare_set():
  277.     global urls_to_check
  278.     "Compare the two sets of URLs, prepare the set to check (the union of the two), and report on the similarities and differences. (Mainly a function to keep some variables local.)"
  279.     csv_urls, playlist_urls = set(s for s in csv_url_data), set(s for s in playlist_url_data)
  280.     urls_to_check = playlist_urls | csv_urls
  281.     new_urls, old_urls = playlist_urls - csv_urls, csv_urls - playlist_urls
  282.     parenthetical = []
  283.     if len(new_urls) > 0:
  284.         parenthetical.append("%d not in CSV" % len(new_urls))
  285.     if len(old_urls) > 0:
  286.         parenthetical.append("%d not in playlist" % len(old_urls))
  287.     if len(parenthetical) > 0:
  288.         parenthetical = " (%s)" % (", ".join(parenthetical))
  289.     else:
  290.         parenthetical = ""
  291.     print("%d URL%s total to analyze%s." % (len(urls_to_check), pluralize(len(urls_to_check)), parenthetical))
  292. prepare_set()
  293.  
  294. changed_entries = 0
  295. for i, url_to_check in enumerate(urls_to_check):
  296.     nice_print("Checking URL %d/%d (%.1f%%), %s..." % (i + 1, len(urls_to_check), (i + 1) * 100 / len(urls_to_check), url_to_check))
  297.     video_data = run([ytdlp, "-j", url_to_check], capture_output = True, text = True)
  298.     couldaccess = video_data.stdout.split("\n")[0].strip() # this is either a wodge of JSON (which will test true), in which case we could access it, or an empty string (which will test false), in which case we couldn't.
  299.     entry = {}
  300.     for key in csv_header: # Initialize the entries in, well, entry for convenience later.
  301.         entry[key] = ""
  302.     csv_entry, playlist_entry = csv_url_data.get(url_to_check, False), playlist_url_data.get(url_to_check, False)
  303.     # The description of the video is in the data from the video URL, not the playlist URL. So if we're getting it, we have to parse couldaccess, if possible.
  304.     if couldaccess and (not arg_flags.get("-nsd")):
  305.         playlist_entry["Description"] = loads(couldaccess)["description"]
  306.     if csv_entry:
  307.         # We already have an entry for this URL in the CSV. Do we have one in the playlist?
  308.         if playlist_entry: # We need to merge the data.
  309.             new_auto_notes = []
  310.             if csv_entry["Auto Notes"]:
  311.                 new_auto_notes.append(csv_entry["Auto Notes"])
  312.             for key, value in playlist_entry.items():
  313.                 entry[key] = value
  314.             # if we aren't saving description info, keep the old info, whatever it is. Otherwise, note changes, if any.
  315.             if arg_flags.get("-nsd"):
  316.                 entry["Description"] = csv_entry["Description"]
  317.             else:
  318.                 if (entry["Description"] != csv_entry["Description"]) and (csv_entry["Description"] != ""):
  319.                     new_auto_notes.append("Previous description:\n%s" % csv_entry["Description"])
  320.             if (entry["#"] != csv_entry["#"]) and not arg_flags.get("-nsp"):
  321.                 new_auto_notes.append("Was previously playlist entry %s." % csv_entry["#"])
  322.             if entry["Video Name"] != csv_entry["Video Name"]:
  323.                 new_auto_notes.append("Was previously named %s." % csv_entry["Video Name"])
  324.             if entry["Channel ID"] != csv_entry["Channel ID"]:
  325.                 new_auto_notes.append("Was previously channel ID %s." % csv_entry["Channel ID"])
  326.             # 'Status', 'Auto Notes', and 'User Notes' were initialized to '' in entry earlier and won't have been overwritten.
  327.             # Valid statuses are 'New' (added to the playlist since the last time the program was run), 'Removed' (no longer in the playlist), 'Error' (couldn't access the file), and 'Ok' (no problems accessing the file).
  328.             if couldaccess:
  329.                 entry["Status"] = "Ok"
  330.                 if csv_entry["Status"] == "Removed":
  331.                     new_auto_notes.append("Was re-added to playlist.")
  332.                 if csv_entry["Status"] == "Error":
  333.                     new_auto_notes.append("Was able to access the file this time.")
  334.             else:
  335.                 entry["Status"] = "Error"
  336.                 sanitized_error = video_data.stderr.strip()
  337.                 if csv_entry["Status"] == "Removed":
  338.                     new_auto_notes.append("Was re-added to playlist, but cannot access.\n%s" % sanitized_error)
  339.                 elif csv_entry["Status"] != "Error":
  340.                     new_auto_notes.append("Was unable to access file.\n%s" % sanitized_error)
  341.                 else: # not sure if needed, but...
  342.                     pass
  343.             entry["Changed?"] = (entry["Status"] != csv_entry["Status"]) and "Y" or "N"
  344.             if (entry["Status"] == "Ok") and (csv_entry["Status"] == "New"): # New to Ok status change unremarkable.
  345.                 entry["Changed?"] = "N"
  346.             entry["User Notes"] = csv_entry["User Notes"]
  347.             entry["Auto Notes"] = "\n".join(new_auto_notes)
  348.         else: # The URL has been removed from the playlist. We won't touch it other than to update the status and remove the playlist ID.
  349.             for key, value in csv_entry.items():
  350.                 entry[key] = value
  351.             entry["Changed?"] = (entry["Status"] == "Removed") and "N" or "Y"
  352.             entry["Status"] = "Removed"
  353.             entry["#"] = ""
  354.     else: # This is a new entry, so we need to add the data.
  355.         for key, value in playlist_entry.items():
  356.             entry[key] = value
  357.         if arg_flags.get("-nsd"):
  358.             entry["Description"] = ""
  359.         if couldaccess:
  360.             entry["Status"] = "New"
  361.             entry["Auto Notes"] = ""
  362.         else:
  363.             entry["Status"] = "Error"
  364.             entry["Auto Notes"] = "Was unable to access file.\n%s" % video_data.stderr.strip()
  365.         entry["Changed?"] = "Y"
  366.         entry["User Notes"] = ""
  367.     if entry["Changed?"] == "Y":
  368.         changed_entries = changed_entries + 1
  369.     csv_url_data[url_to_check] = entry
  370.     save_url_data(csv_filename, csv_url_data)
  371. nice_print()
  372. print("Processing complete. %d URL%s checked.%s" % (len(urls_to_check), pluralize(len(urls_to_check)), changed_entries and (" %d entr%s changed." % (changed_entries, entryies(changed_entries))) or ""))
  373.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement