- # -*- coding: utf-8 -*-
- """
- Created on Sat Sep 6 2025 (v1)
- Updated on Sun Sep 7 2025 (v5)
- Updated on Mon Sep 8 2025 (v7.1)
- @author: https://chatgpt.com/ # (free: auto)
- @author: https://gemini.google.com/ # (free: 2.5 PRO, 2.5 Flash)
- """
- """
- /chat
- TWITCH CHAT DOWNLOADER: https://www.twitchchatdownloader.com/
- /emote
- TWITCH GLOBAL EMOTES: https://twitchemotes.com/
- TWITCH FORSEN EMOTES: https://twitchemotes.com/channels/22484632
- TWITCH SHARED EMOTES: https://twitch-tools.rootonline.de/emotes_search.php
- BTTV GLOBAL EMOTES: https://betterttv.com/emotes/global
- BTTV FORSEN EMOTES: https://betterttv.com/users/555943515393e61c772ee968
- BTTV SHARED EMOTES: https://betterttv.com/emotes/shared
- EMOJI TO IMAGE: https://jpeg-optimizer.com/emoji/
- /user
- TWITCH USER IMAGE CDN: https://static-cdn.jtvnw.net/jtv_user_pictures/{ID}-profile_image-{DIM}.png
- """
- import math
- import re
- import textwrap
- from collections import Counter
- from pathlib import Path
- from typing import List, Dict, Callable, Any, Tuple, Set
- import matplotlib.pyplot as plt
- import numpy as np
- import pandas as pd
- import requests
- import unicodedata
- from PIL import Image, ImageDraw, ImageFont, ImageSequence
- from bs4 import BeautifulSoup
- from matplotlib.figure import Figure
- from matplotlib.offsetbox import AnnotationBbox, OffsetImage
- from matplotlib.ticker import FuncFormatter
- #from matplotlib import rcParams
- #rcParams['font.family'] = 'Segoe UI Emoji'
- # --- INPUT --- # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
- CSV_NAME = "forsen_chat_08_09_2025" # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
- # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
- # --- Configuration ---
- BASE_DIR = Path(__file__).parent
- EMOTE_FOLDER = BASE_DIR / "emote" # Downloaded manually
- USER_FOLDER = BASE_DIR / "user" # Automatically downloaded
- OUTPUT_FOLDER = BASE_DIR / "chat_statistics_7"
- FILE_PATH = BASE_DIR / ("chat/forsen/" + CSV_NAME + ".csv")
- # **NEW**: Specify emotes/users for lifecycle plots.
- # An empty list falls back to an auto-picked top-10 combination; None disables the plots.
- LIFECYCLE_EMOTES = [] # e.g., ["LULE"]
- LIFECYCLE_USERS = None # e.g., ["forsen"]
- # --- FILTER ---
- NOT_EMOTES = ["", " ", "on", "the", "you", "to", "a", "for", "in", "is",\
- "chat", "it", "and", "i", "this", ".", "by", "so", "stream",\
- "forsen", "discord", "sure", "stay", "other", "following",\
- "typing", "subscribers", "instructions.", "forsenboys", "pleb",\
- "zone?", "free", "!discord", "https://twitter.com/forsen",\
- "up-to-date", "information:", "make", "follow", "@forsen",\
- "join", "!join", "looking", "no", "yes", "of", "game",\
- "twitter", "ta", "go", "let's"]
- # --- Constants ---
- EXPECTED_COLS = {"time", "user_name", "user_color", "message"}
- IMAGE_EXTENSIONS = [".gif", ".jpg", ".png", ".webp"]
- MAX_IMAGE_SIZE = 32
- IMAGE_PADDING_PIXELS = 6
- PLOT_WIDTH_EXTENSION_PIXELS = 79
- # --- Helper Functions ---
- def config_plot_figure(fig, ax):
- ax.spines['top'].set_visible(False)
- ax.spines['right'].set_visible(False)
- ax.spines['bottom'].set_visible(True) # keep x-axis line
- ax.spines['left'].set_visible(True) # keep y-axis line
- def strip_word(word: str) -> str:
- """Strips a leading '@' and a single trailing comma from a word."""
- if word.startswith("@"):
- word = word[1:]
- if word.endswith(","):
- word = word[:-1]
- return word
- def comma_format(x: float, pos: int) -> str:
- """Formats a number with commas for plot axes."""
- return f"{int(x):,}"
- def percent_format(x: float, pos: int) -> str:
- """Formats a number as a percentage string for plot axes."""
- return f"{int(x)}%"
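- # Illustrative usage with matplotlib's FuncFormatter(func) API, where func receives (value, tick_position):
- #   ax.xaxis.set_major_formatter(FuncFormatter(comma_format))   # 12345 -> "12,345"
- #   ax.xaxis.set_major_formatter(FuncFormatter(percent_format)) # 42.0  -> "42%"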
- def clean_text(text: Any) -> str:
- """Normalizes and cleans text content."""
- if pd.isna(text):
- return ""
- return unicodedata.normalize("NFKC", str(text)).strip()
- # --- Image Loading and Fetching ---
- def _load_image(filepath: Path) -> Image.Image | None:
- """Loads an image file, handling animated GIFs."""
- try:
- img = Image.open(filepath)
- if filepath.suffix == ".gif":
- # Use the middle frame of a GIF for a static representation
- frames = [frame.copy() for frame in ImageSequence.Iterator(img)]
- img = frames[len(frames)//2]
- return img.convert("RGBA")
- except Exception as e:
- print(f"Warning: Could not load image {filepath}. Reason: {e}")
- return None
- def load_emote_image(emote_name: str) -> Image.Image | None:
- """Loads a pre-downloaded emote image from the emote folder."""
- for ext in IMAGE_EXTENSIONS:
- filepath = EMOTE_FOLDER / f"{emote_name}{ext}"
- if filepath.exists():
- return _load_image(filepath)
- return None
- def create_combo_image(combo_string: str, drop_duplicates: bool = True) -> Image.Image | None:
- """
- Creates a single composite image from one or more emote images.
- """
- emote_names = combo_string.split()
- if not emote_names:
- return None
- # DEBUG
- # print("[DEBUG] create_combo_image:")
- # print(emote_names)
- # Handle special character emotes
- name_map = {
- "🔔": "emoji_bell",
- "🔇": "emoji_no-sound",
- "🔴": "emoji_red-circle",
- "🟠": "emoji_orange-circle",
- "🟡": "emoji_yellow-circle",
- "🟢": "emoji_green-circle",
- "🔵": "emoji_blue-circle",
- "🟣": "emoji_purple-circle",
- "🟤": "emoji_brown-circle",
- "⚫": "emoji_black-circle",
- "⚪": "emoji_white-circle",
- "💯": "emoji_100%",
- "100%": "emoji_100%"
- }
- emote_names = [name_map.get(name, name) for name in emote_names]
- images = [img for img in (load_emote_image(name) for name in emote_names) if img is not None]
- if drop_duplicates:
- unique_images = []
- for image in images:
- if image not in unique_images:
- unique_images.append(image)
- images = unique_images
- if not images:
- return None
- # If there's only one image, return it directly
- if len(images) == 1:
- return images[0]
- # Calculate dimensions for the composite image
- max_h = max(img.height for img in images)
- total_w = sum(img.width for img in images) + IMAGE_PADDING_PIXELS * (len(images) - 1)
- # Create a new blank image
- combo_img = Image.new('RGBA', (total_w, max_h), (0, 0, 0, 0))
- # Paste images with padding
- x_offset = 0
- for img in images:
- y_offset = int((max_h - img.height) / 2)
- combo_img.paste(img, (x_offset, y_offset))
- x_offset += img.width + IMAGE_PADDING_PIXELS
- # Resize the final composite image
- combo_img.thumbnail((MAX_IMAGE_SIZE * len(images), MAX_IMAGE_SIZE), Image.Resampling.LANCZOS)
- return combo_img
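- # Example sketch (assumes forsenE.png and Clap.gif have been downloaded to EMOTE_FOLDER):
- #   img = create_combo_image("forsenE Clap") # -> one RGBA strip with both emotes side by side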
- def fetch_user_profile_image(username: str, save_path: Path) -> bool:
- """
- Scrapes a Twitch user's page for their profile image URL and downloads it.
- """
- if save_path.exists():
- return True
- match = re.search(r"\((.*?)\)", username)
- sanitized_username = match.group(1).strip() if match else username.strip()
- try:
- url = f"https://www.twitch.tv/{sanitized_username}"
- response = requests.get(url, timeout=10)
- response.raise_for_status()
- soup = BeautifulSoup(response.text, "html.parser")
- img_url = None
- for meta_tag in soup.find_all("meta"):
- content = meta_tag.get("content", "")
- if "static-cdn.jtvnw.net" in content and "-profile_image-" in content:
- img_url = content
- break
- if not img_url:
- print(f"Warning: Could not find profile image URL for {username}.")
- return False
- img_data = requests.get(img_url, timeout=10).content
- save_path.parent.mkdir(parents=True, exist_ok=True)
- with open(save_path, "wb") as f:
- f.write(img_data)
- return True
- except requests.RequestException as e:
- print(f"Error fetching data for {username}: {e}")
- return False
- except Exception as e:
- print(f"An unexpected error occurred for {username}: {e}")
- return False
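- # Minimal usage sketch (hypothetical channel name):
- #   ok = fetch_user_profile_image("forsen", USER_FOLDER / "forsen.png")
- #   # True once a meta tag whose content points at static-cdn.jtvnw.net is found and saved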
- def load_user_image(username: str) -> Image.Image | None:
- """
- Loads a user's profile image. Checks local cache first, then fetches
- from Twitch if not found.
- """
- for ext in IMAGE_EXTENSIONS:
- filepath = USER_FOLDER / f"{username}{ext}"
- if filepath.exists():
- return _load_image(filepath)
- save_path = USER_FOLDER / f"{username}.png"
- if fetch_user_profile_image(username, save_path):
- return _load_image(save_path)
- return None
- # --- Data Loading and Analysis ---
- def load_and_clean_data(file_path: Path) -> pd.DataFrame:
- """Loads chat data from a CSV and performs initial cleaning."""
- df = pd.read_csv(file_path, encoding="utf-8", quotechar='"', on_bad_lines="skip")
- if not EXPECTED_COLS.issubset(df.columns):
- raise ValueError(f"CSV missing one of the expected columns: {EXPECTED_COLS}")
- df["message"] = df["message"].map(clean_text)
- return df
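- # Expected CSV layout (hypothetical rows; 'time' is seconds since stream start):
- #   time,user_name,user_color,message
- #   12,forsen,#FF0000,LULE
- #   13,someviewer,#00FF00,forsenE Clap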
- def analyze_top_words_by_freq(df: pd.DataFrame, top_n: int = 10) -> List[Tuple[str, int]]:
- """Calculates the most frequent words by total occurrences."""
- # UPDATED: Added a condition to ignore words in NOT_EMOTES
- words = [
- word for msg in df["message"]
- for word in msg.split()
- if word.lower() not in NOT_EMOTES and len(word) > 1
- ]
- return Counter(words).most_common(top_n)
- def analyze_popular_words(df: pd.DataFrame, top_n: int = 10) -> List[Tuple[str, int]]:
- """
- Finds the most popular words, measured by the number of unique messages they appear in.
- This is calculated over all words in the chat log for accuracy.
- """
- # Create a Series of words, keeping the original message index
- words_series = df['message'].str.split().explode().dropna()
- # Convert the Series to a DataFrame, naming the new word column 'word'
- words_df = words_series.reset_index(name='word')
- # UPDATED: Filter out any words that are in the NOT_EMOTES set
- words_df = words_df[~words_df['word'].str.lower().isin(NOT_EMOTES)]
- words_df = words_df[words_df['word'].str.len() > 1]
- # Get unique (message index, word) pairs
- unique_word_per_message = words_df.drop_duplicates(subset=['index', 'word'])
- # Count how many unique messages each word appeared in
- message_counts = unique_word_per_message['word'].value_counts()
- return list(message_counts.head(top_n).items())
- def analyze_consistent_words(df: pd.DataFrame, top_n: int = 10) -> List[Tuple[str, float]]:
- """
- Finds the most consistent words, measured by the percentage of total stream minutes
- in which they appeared at least once. This is calculated over all words for accuracy.
- """
- if 'time' not in df.columns or df['time'].max() == 0:
- return []
- total_stream_seconds = df['time'].max()
- total_minutes = math.ceil(total_stream_seconds / 60)
- if total_minutes == 0:
- return []
- # Create a working copy with a 'minute' column
- work_df = df[['time', 'message']].copy()
- work_df['minute'] = work_df['time'] // 60
- # Explode messages into words, keeping the 'minute' for each word
- words_in_minutes = work_df.drop(columns=['time'])
- words_in_minutes['word'] = words_in_minutes['message'].str.split()
- words_in_minutes = words_in_minutes.explode('word').dropna(subset=['word'])
- # UPDATED: Filter out any words that are in the NOT_EMOTES set
- words_in_minutes = words_in_minutes[~words_in_minutes['word'].str.lower().isin(NOT_EMOTES)]
- words_in_minutes = words_in_minutes[words_in_minutes['word'].str.len() > 1]
- # Find unique (word, minute) pairs
- unique_word_minute_pairs = words_in_minutes[['word', 'minute']].drop_duplicates()
- # Count the number of unique minutes each word appeared in
- minute_counts = unique_word_minute_pairs['word'].value_counts()
- # Calculate presence percentage
- presence_percentages = (minute_counts / total_minutes) * 100
- ### print(sorted(presence_percentages.items(), key=lambda x: x[1], reverse=True)[:20]) # debug
- # Get the top N and format for output
- top_consistent = presence_percentages.head(top_n)
- return sorted(top_consistent.items(), key=lambda x: x[1], reverse=True)
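- # Worked example: an emote seen in 90 distinct minutes of a 600-minute stream
- # scores 90 / 600 * 100 = 15% presence.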
- def analyze_emotes_by_users(df: pd.DataFrame, top_n: int = 10) -> List[Tuple[str, int]]:
- """
- Finds the top emotes based on the number of distinct users who used them.
- """
- # Use a dictionary where keys are emotes and values are sets of user names
- emote_to_users = {}
- # Iterate through each message
- for _, row in df.iterrows():
- user = row['user_name']
- # Split message into unique words to count a user only once per message for an emote
- words = set(str(row['message']).split())
- for word in words:
- # Basic filtering for non-emotes
- if word.lower() in NOT_EMOTES or len(word) <= 1:
- continue
- # Initialize the set if the emote is new
- if word not in emote_to_users:
- emote_to_users[word] = set()
- # Add the user to the set for that emote
- emote_to_users[word].add(user)
- # Count the number of unique users for each emote
- emote_distinct_user_counts = {
- emote: len(users) for emote, users in emote_to_users.items()
- }
- # Sort the emotes by the distinct user count in descending order
- sorted_emotes = sorted(
- emote_distinct_user_counts.items(),
- key=lambda item: item[1],
- reverse=True
- )
- return sorted_emotes[:top_n]
- def analyze_user_activity(df: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
- """Identifies the most active chatters and returns a DataFrame."""
- user_counts = df['user_name'].value_counts()
- top_chatters_df = user_counts.head(top_n).to_frame(name='message_count')
- top_chatters_df.index.name = 'user_name'
- return top_chatters_df
- def analyze_consistent_users(df: pd.DataFrame, top_n: int = 10) -> List[Tuple[str, float]]:
- """
- Finds the most consistent chatters, measured by the percentage of total stream minutes
- in which they sent at least one message. Calculated over all users for accuracy.
- """
- if 'time' not in df.columns or df['time'].max() == 0:
- return []
- total_stream_seconds = df['time'].max()
- total_minutes = math.ceil(total_stream_seconds / 60)
- if total_minutes == 0:
- return []
- # Create a working copy with a 'minute' column
- work_df = df[['time', 'user_name']].copy()
- work_df['minute'] = work_df['time'] // 60
- # Find unique (user, minute) pairs
- unique_user_minute_pairs = work_df[['user_name', 'minute']].drop_duplicates()
- # Count the number of unique minutes each user was active in
- minute_counts = unique_user_minute_pairs['user_name'].value_counts()
- # Calculate presence percentage
- presence_percentages = (minute_counts / total_minutes) * 100
- # Get the top N and format for output
- top_consistent = presence_percentages.head(top_n)
- return sorted(top_consistent.items(), key=lambda x: x[1], reverse=True)
- def analyze_original_chatters(df: pd.DataFrame, top_n: int = 10) -> Tuple[pd.Series, pd.DataFrame]:
- """Identifies users who post the most unique (single-occurrence) messages."""
- message_counts = df['message'].value_counts()
- unique_messages = message_counts[message_counts == 1].index
- unique_df = df[df['message'].isin(unique_messages)]
- top_original_chatters = unique_df['user_name'].value_counts().head(top_n)
- return top_original_chatters, unique_df
- def analyze_message_bins(user_msg_counts: pd.Series) -> Dict[str, int]:
- """Groups users into bins based on how many messages they've sent."""
- counts_by_num_messages = user_msg_counts.value_counts().sort_index()
- return {
- "1": counts_by_num_messages.get(1, 0),
- "2–10": counts_by_num_messages[(counts_by_num_messages.index >= 2) & (counts_by_num_messages.index <= 10)].sum(),
- "11–100": counts_by_num_messages[(counts_by_num_messages.index >= 11) & (counts_by_num_messages.index <= 100)].sum(),
- "101–1000": counts_by_num_messages[(counts_by_num_messages.index >= 101) & (counts_by_num_messages.index <= 1000)].sum(),
- "1000+": counts_by_num_messages[counts_by_num_messages.index > 1000].sum()
- }
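- # Example: per-user counts of [1, 1, 2, 50, 2000] yield
- # {"1": 2, "2–10": 1, "11–100": 1, "101–1000": 0, "1000+": 1}.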
- def merge_overlapping_chains(pairs_with_counts, conservative_merge: bool = False):
- """
- Merge overlapping sequences in a list of (sequence, count)
- Example input: [("x y", 5), ("y z", 3), ("a b", 2)]
- Output: [("x y z", 3), ("a b", 2)]
- Excludes last item if it is the same as first.
- """
- # Convert sequences to lists
- seqs = [(seq.split(), count) for seq, count in pairs_with_counts]
- merged = []
- while seqs:
- seq, count = seqs.pop(0)
- i = 0
- while i < len(seqs):
- other_seq, other_count = seqs[i]
- # Check if sequences overlap (end of seq == start of other_seq)
- if seq[-1] == other_seq[0]:
- seq = seq + other_seq[1:]
- if conservative_merge:
- count = min(count, other_count) # conservative count
- else:
- count = max(count, other_count) # optimistic count
- # else: average count ?
- seqs.pop(i)
- i = 0 # restart loop
- elif other_seq[-1] == seq[0]:
- seq = other_seq[:-1] + seq
- if conservative_merge:
- count = min(count, other_count) # conservative count
- else:
- count = max(count, other_count) # optimistic count
- seqs.pop(i)
- i = 0
- else:
- i += 1
- # Exclude last item if same as first
- if len(seq) > 1 and seq[-1] == seq[0]:
- seq = seq[:-1]
- merged.append((" ".join(seq), count))
- return merged
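- # Doctest-style sketch (optimistic counts, the default):
- #   merge_overlapping_chains([("x y", 5), ("y z", 3), ("a b", 2)])
- #   -> [("x y z", 5), ("a b", 2)]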
- def prune_top_n_subsets(combos: List[Tuple[List[str], int]], top_n: int = 10) -> List[Tuple[List[str], int]]:
- """
- Eliminates cyclical subsets from a list of combinations iteratively.
- This function prunes a list of (list_of_strings, integer_value) tuples.
- It identifies pairs where one list is a "cyclical subset" of another
- (i.e., its elements are a subset of the other's, order-agnostic).
- The pruning logic is applied iteratively within a "top_n" window:
- 1. The list is sorted by the integer value in descending order.
- 2. The top N items are compared against each other.
- 3. For any subset/superset pair, the one with the higher value is kept.
- 4. If values are equal, the one with the longer list is kept.
- 5. Inferior items are removed from the full list.
- 6. This process repeats until a pass over the top N items removes nothing.
- Args:
- combos: A list of tuples, where each tuple contains a list of strings
- and an associated integer value.
- top_n: The size of the window at the top of the sorted list to
- perform comparisons within.
- Returns:
- A pruned and sorted list of tuples.
- """
- # Use a mutable copy to work with
- pruned_combos = list(combos)
- while True:
- # 1. Sort the list by value (desc) and then length (desc) as a tie-breaker.
- # This sort is crucial for the "top_n" window logic and ensures stable processing.
- pruned_combos.sort(key=lambda x: (x[1], len(x[0])), reverse=True)
- # Optimization: if the list is smaller than the window, no need for a window
- window_size = min(top_n, len(pruned_combos))
- if window_size < 2:
- break
- window = pruned_combos[:window_size]
- # Pre-calculate sets for efficient comparison
- window_sets = [(frozenset(item[0]), item[1], i) for i, item in enumerate(window)]
- window_strs = [(str(item[0]), item[1], i) for i, item in enumerate(window)]
- indices_to_remove = set()
- # 2. Compare every item in the window against every other item
- for i in range(window_size):
- set_i, val_i, _ = window_sets[i]
- len_i = len(set_i)
- str_i, sal_i, _ = window_strs[i]
- sen_i = len(str_i)
- for j in range(window_size):
- if i == j:
- continue
- set_j, val_j, _ = window_sets[j]
- len_j = len(set_j)
- str_j, sal_j, _ = window_strs[j]
- sen_j = len(str_j)
- # Check for subset relationship
- if set_i.issubset(set_j):
- # i is a subset of j. Determine which one to remove.
- # The smaller set loses; on equal size, the lower value loses.
- if len_i < len_j: #(val_i, len_i) < (val_j, len_j):
- indices_to_remove.add(i)
- if len_i == len_j and val_i < val_j:
- indices_to_remove.add(i)
- # Note: If (val_j, len_j) < (val_i, len_i), j would be marked
- # for removal in its own inner loop iteration.
- # Check for substr relationship
- if str_i in str_j:
- # i is a substring of j. Determine which one to remove.
- # The shorter string loses; on equal length, the lower value loses.
- if sen_i < sen_j: #(val_i, len_i) < (val_j, len_j):
- indices_to_remove.add(i)
- if sen_i == sen_j and sal_i < sal_j:
- indices_to_remove.add(i)
- # Note: If (val_j, len_j) < (val_i, len_i), j would be marked
- # for removal in its own inner loop iteration.
- if not indices_to_remove:
- # 5. If no items were removed in this pass, the process is stable.
- break
- # 4. Rebuild the list, excluding the identified inferior items.
- # We get the actual items to remove from the original window list
- items_to_remove: Set[Tuple[List[str], int]] = {window[i] for i in indices_to_remove}
- # Create a frozenset of tuples for efficient lookup
- items_to_remove_set = { (tuple(sorted(item[0])), item[1]) for item in items_to_remove }
- pruned_combos = [
- item for item in pruned_combos
- if (tuple(sorted(item[0])), item[1]) not in items_to_remove_set
- ]
- return pruned_combos
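- # Sketch: given [("a b c".split(), 10), ("a b".split(), 9)], the set {a, b} is a
- # subset of {a, b, c} with a lower value, so only (["a", "b", "c"], 10) survives.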
- def canonical_repeating_words_segment(s: str) -> str:
- """
- Trim string s from left and right until a repeating substring is found.
- Trim process requires the substring of same set size as previous.
- """
- def srs(s: str) -> str: # smallest_repeating_substring
- n = len(s)
- for i in range(1, n + 1):
- # Check if prefix of length i can generate the whole string
- if n % i == 0: # substring length must divide total length
- candidate = s[:i]
- if candidate * (n // i) == s:
- return candidate
- return s # fallback (the whole string is the smallest unit)
- def tokenize(x: str) -> set[str]:
- return set(srs(x).split())
- words = s.split()
- best = srs(s)
- best_tokens = tokenize(s)
- # Try shrinking from both sides
- for direction in ("left", "right"):
- for k in range(1, len(words)):
- if direction == "left":
- candidate_str = " ".join(words[k:])
- else:
- candidate_str = " ".join(words[:-k])
- candidate_srs = srs(candidate_str)
- candidate_tokens = set(candidate_srs.split())
- # If set size is the same, update best
- if len(candidate_tokens) == len(best_tokens):
- best = candidate_srs
- best_tokens = candidate_tokens
- else:
- # stop shrinking further in this direction
- break
- return best
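- # Example: a copypasta spammed as "forsenE Clap forsenE Clap" shrinks to its
- # canonical repeating unit "forsenE Clap"; "LULE LULE LULE" shrinks to "LULE".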
- def analyze_top_emote_combos(
- df: pd.DataFrame,
- top_n: int = 10
- ) -> List[Tuple[str, int]]:
- combo_counter = Counter()
- for message in df['message']:
- message_sanitized = message.replace("\u2800", " ") # assumed target: the invisible braille-blank spacer (U+2800) common in Twitch copypastas
- combo = canonical_repeating_words_segment(message_sanitized)
- if len(set(combo.split())) > 1:
- combo_counter[combo] += 1
- results = [(key, count) for key, count in combo_counter.most_common(top_n)]
- return results[:top_n]
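- # Messages that reduce to the same unit are tallied together, e.g. both
- # "pepeD FeelsDankMan pepeD FeelsDankMan" and "pepeD FeelsDankMan" (hypothetical
- # emote names) count toward the combo "pepeD FeelsDankMan".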
- def analyze_first_time_chatters(df: pd.DataFrame) -> pd.Series:
- """Finds the timestamp of the first message for each unique user."""
- return df.groupby('user_name')['time'].min()
- def analyze_most_mentioned_chatters(df: pd.DataFrame, top_n: int = 10) -> List[Tuple[str, int]]:
- """
- Finds the most mentioned chatters in messages (words starting with @).
- """
- mentions = []
- for msg in df["message"]:
- for word in msg.split():
- if word.startswith("@") and len(word) > 1:
- word = strip_word(word)
- mentions.append(word.lower()) # lowercase for consistency
- return Counter(mentions).most_common(top_n)
- def analyze_emote_lifecycle(df: pd.DataFrame, emote_name: str) -> pd.Series:
- """Analyzes the usage of a specific emote over time."""
- emote_usage = df['message'].str.contains(rf"\b{re.escape(emote_name)}\b", regex=True, na=False)
- emote_df = df[emote_usage]
- if emote_df.empty:
- return pd.Series(dtype=float)
- bin_size = 60
- bins = np.arange(0, df['time'].max() + bin_size, bin_size)
- counts, edges = np.histogram(emote_df['time'], bins=bins)
- return pd.Series(counts, index=edges[:-1] / 60)
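- # The returned Series is indexed by minute into the stream: e.g. a value of 37
- # at index 95.0 means 37 emote-containing messages during stream minute 95.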
- def analyze_user_lifecycle(df: pd.DataFrame, user_name: str) -> pd.Series:
- """Analyzes the activity of a specific user over time."""
- user_df = df[df['user_name'] == user_name]
- if user_df.empty:
- return pd.Series(dtype=float)
- bin_size = 60
- bins = np.arange(0, df['time'].max() + bin_size, bin_size)
- counts, edges = np.histogram(user_df['time'], bins=bins)
- return pd.Series(counts, index=edges[:-1] / 60)
- # --- Plotting ---
- def add_images_to_bars(ax: plt.Axes, bars, labels: List[str], image_loader: Callable[[str], Image.Image | None]):
- """Adds images next to the bars of a horizontal bar chart."""
- fig = ax.figure
- renderer = fig.canvas.get_renderer()
- for bar, label in zip(bars, labels):
- img = image_loader(label)
- if img is not None:
- bbox = bar.get_window_extent(renderer=renderer)
- bar_height_pixels = bbox.height
- img_width, img_height = img.size
- zoom = bar_height_pixels / img_height # zoom = min(MAX_IMAGE_SIZE / max(img_width, img_height), bar_height_pixels / img_height)
- offset_img = OffsetImage(img, zoom=zoom)
- inv = ax.transData.inverted()
- px_offset = inv.transform((IMAGE_PADDING_PIXELS, 0))[0] - inv.transform((0, 0))[0]
- ab = AnnotationBbox(offset_img, (bar.get_width() + px_offset, bar.get_y() + bar.get_height() / 2), frameon=False, box_alignment=(0, 0.5))
- ax.add_artist(ab)
- def extend_plot_width(ax: plt.Axes, extra_pixels: int):
- """Extends the x-axis limit to make space for images and labels."""
- fig = ax.figure
- renderer = fig.canvas.get_renderer()
- xlim = ax.get_xlim()
- bbox = ax.get_window_extent(renderer=renderer)
- ratio = (xlim[1] - xlim[0]) / bbox.width
- ax.set_xlim(xlim[0], xlim[1] + extra_pixels * ratio)
- def create_horizontal_bar_chart(
- data: List[Tuple[str, int]] | pd.Series,
- title: str,
- color: str,
- image_loader: Callable[[str], Image.Image | None] | None = None,
- max_label_width: int = 30, # default chars per line
- label_fontsize: int = 10
- ) -> plt.Figure:
- """Generic function to create a styled horizontal bar chart with adaptive label wrapping."""
- if isinstance(data, pd.Series):
- labels, counts = data.index, data.values
- else:
- labels, counts = zip(*data)
- wrapped_labels = []
- line_counts = []
- for label in labels:
- wrapped = textwrap.wrap(label, width=max_label_width)
- # If label is more than 3 lines, allow wider wrapping
- if len(wrapped) > 3:
- wrapped = textwrap.wrap(label, width=int(max_label_width * 1.3))
- wrapped_labels.append("\n".join(wrapped))
- line_counts.append(len(wrapped))
- fig, ax = plt.subplots(figsize=(10, 6))
- bars = ax.barh(wrapped_labels, counts, color=color)
- ax.set_title(title)
- ax.invert_yaxis()
- ax.xaxis.set_major_formatter(FuncFormatter(comma_format))
- if image_loader:
- extend_plot_width(ax, PLOT_WIDTH_EXTENSION_PIXELS)
- add_images_to_bars(ax, bars, labels, image_loader)
- inv = ax.transData.inverted()
- text_offset_x = inv.transform((5, 0))[0] - inv.transform((0, 0))[0]
- for bar in bars:
- ax.text(
- text_offset_x,
- bar.get_y() + bar.get_height() / 2,
- f"{int(bar.get_width()):,}",
- va='center',
- ha='left',
- fontweight='bold'
- )
- # Adjust font size per-label depending on number of lines
- for tick_label, n_lines in zip(ax.get_yticklabels(), line_counts):
- if n_lines > 3:
- tick_label.set_fontsize(label_fontsize - 2) # shrink a bit
- else:
- tick_label.set_fontsize(label_fontsize)
- plt.tight_layout()
- config_plot_figure(fig, ax)
- return fig
- def create_presence_bar_chart(data: List[Tuple[str, float]], title: str, color: str, image_loader: Callable[[str], Image.Image | None] | None = None) -> Figure:
- """Creates a horizontal bar chart for presence percentage data."""
- if not data:
- fig, ax = plt.subplots(figsize=(10, 6))
- ax.set_title(title); ax.text(0.5, 0.5, "No data.", ha='center', va='center'); ax.set_xlim(0, 100); ax.set_yticks([]); return fig
- labels, percentages = zip(*data)
- fig, ax = plt.subplots(figsize=(10, 6))
- bars = ax.barh(labels, percentages, color=color)
- ax.set_title(title); ax.invert_yaxis(); ax.xaxis.set_major_formatter(FuncFormatter(percent_format)); ax.set_xlim(0, 105)
- if image_loader:
- extend_plot_width(ax, PLOT_WIDTH_EXTENSION_PIXELS)
- add_images_to_bars(ax, bars, labels, image_loader)
- inv = ax.transData.inverted()
- text_offset_x = inv.transform((5, 0))[0] - inv.transform((0, 0))[0]
- for bar in bars:
- ax.text(text_offset_x, bar.get_y() + bar.get_height() / 2, f"{bar.get_width():.1f}%", va='center', ha='left', fontweight='bold')
- plt.tight_layout()
- config_plot_figure(fig, ax)
- return fig
- def plot_user_bins(bins_data: Dict[str, int]) -> Figure:
- """Plots the number of users by message count bins."""
- fig, ax = plt.subplots(figsize=(10, 6))
- ax.bar(bins_data.keys(), bins_data.values(), color='mediumpurple')
- ax.set_title("Number of Chatters (Bins by Total Messages Sent)")
- for x, y in bins_data.items():
- ax.text(x, y + 0.5, f"{y:,}", ha='center', va='bottom', fontweight='bold')
- plt.tight_layout()
- config_plot_figure(fig, ax)
- return fig
- def plot_messages_over_time(df: pd.DataFrame) -> Figure:
- """Plots the volume of messages over time."""
- time_seconds = df['time']
- bins = np.arange(0, time_seconds.max() + 60, 60)
- counts, edges = np.histogram(time_seconds, bins=bins)
- minutes = edges[:-1] / 60
- fig, ax = plt.subplots(figsize=(10, 6))
- ax.fill_between(minutes, counts, color='royalblue', alpha=0.6)
- ax.plot(minutes, counts, color='royalblue', alpha=0.9)
- ax.set_title("Number of Messages Over Time (by Minute into Stream)")
- ax.grid(True, linestyle='--', alpha=0.5)
- plt.tight_layout()
- config_plot_figure(fig, ax)
- return fig
- def plot_first_time_chatters(first_message_times: pd.Series) -> Figure:
- """Plots when users sent their first message, styled like the messages-over-time plot."""
- time_seconds = first_message_times
- bins = np.arange(0, time_seconds.max() + 60, 60) # 1-min bins
- counts, edges = np.histogram(time_seconds, bins=bins)
- minutes = edges[:-1] / 60
- fig, ax = plt.subplots(figsize=(10, 6))
- ax.fill_between(minutes, counts, color='teal', alpha=0.6)
- ax.plot(minutes, counts, color='teal', alpha=0.9)
- ax.set_title("First-Time Chatters during this Stream (by Minute into Stream)")
- ax.grid(True, linestyle='--', alpha=0.5)
- plt.tight_layout()
- config_plot_figure(fig, ax)
- return fig
- def format_minutes_to_hhmm(minutes: float) -> str:
- """Convert minutes into HH:MM format (rounded)."""
- total_minutes = int(round(minutes))
- hours, mins = divmod(total_minutes, 60)
- return f"{hours:02d}:{mins:02d}"
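- # e.g. format_minutes_to_hhmm(135) -> "02:15"; format_minutes_to_hhmm(59.6) -> "01:00"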
- def add_peak_annotation(ax: plt.Axes, usage_data: pd.Series, label_name: str, color: str = "black"):
- """Adds a sideways arrow pointing to the peak value with HH:MM and descriptive annotation.
- label_name = emote or username (used in the description line).
- """
- if usage_data.empty or usage_data.max() == 0:
- return
- peak_idx = usage_data.idxmax()
- peak_val = usage_data.max()
- hhmm = format_minutes_to_hhmm(peak_idx)
- # Label text with time, count, and descriptive note
- label_text = "?"
- if "'" in label_name:
- label_text = f'{hhmm} ({peak_val} msgs)\nMost "{label_name}" moment of the stream.'
- else:
- label_text = f"{hhmm} ({peak_val} msgs)\nMost '{label_name}' moment of the stream."
- # Positioning logic
- xlim = ax.get_xlim()
- midpoint = (xlim[0] + xlim[1]) / 2
- x_offset = (xlim[1] - xlim[0]) * 0.08 # 8% of axis width as offset
- if peak_idx < midpoint:
- # Peak is in left half → place label to the right
- xytext = (peak_idx + x_offset, peak_val)
- ha = "left"
- else:
- # Peak is in right half → place label to the left
- xytext = (peak_idx - x_offset, peak_val)
- ha = "right"
- ax.annotate(
- label_text,
- xy=(peak_idx, peak_val),
- xytext=xytext,
- arrowprops=dict(
- facecolor=color,
- arrowstyle="->",
- lw=1.2
- ),
- va="center",
- ha=ha,
- fontsize=12,
- fontweight="bold",
- color=color,
- )
- def plot_emote_lifecycle(usage_data: pd.Series, emote: str) -> Figure:
- """Plots the usage of a specific emote over time, with peak annotation."""
- fig, ax = plt.subplots(figsize=(10, 6))
- ax.fill_between(usage_data.index, usage_data.values, color='darkviolet', alpha=0.6)
- ax.plot(usage_data.index, usage_data.values, color='darkviolet', alpha=0.9)
- ax.set_title(f"Emote Lifecycle: '{emote}' containing Messages (by Minute into Stream)")
- ax.grid(True, linestyle='--', alpha=0.5)
- # Annotate peak
- add_peak_annotation(ax, usage_data, emote, color="darkviolet")
- plt.tight_layout()
- config_plot_figure(fig, ax)
- return fig
- def plot_user_lifecycle(activity_data: pd.Series, user: str, color='darkcyan') -> Figure:
- """Plots the activity of a specific user over time, with peak annotation."""
- fig, ax = plt.subplots(figsize=(10, 6))
- ax.fill_between(activity_data.index, activity_data.values, color=color, alpha=0.6)
- ax.plot(activity_data.index, activity_data.values, color=color, alpha=0.9)
- ax.set_title(f"User Lifecycle: '{user}' sent Messages (by Minute into Stream)")
- ax.grid(True, linestyle='--', alpha=0.5)
- # Annotate peak
- add_peak_annotation(ax, activity_data, user, color=color)
- plt.tight_layout()
- config_plot_figure(fig, ax)
- return fig
- # --- Image Combination ---
- def create_title_banner(text: str, width: int, height: int = 80, bg_color=(30,30,30), text_color=(255,255,255)) -> Image.Image:
- """Creates a banner image with centered section title text."""
- img = Image.new("RGB", (width, height), color=bg_color)
- draw = ImageDraw.Draw(img)
- try:
- font = ImageFont.truetype("arial.ttf", 36)
- except IOError:
- font = ImageFont.load_default()
- # Pillow >= 10 uses textbbox
- try:
- bbox = draw.textbbox((0, 0), text, font=font)
- text_w, text_h = bbox[2] - bbox[0], bbox[3] - bbox[1]
- except AttributeError:
- text_w, text_h = draw.textsize(text, font=font)
- draw.text(((width - text_w) // 2, (height - text_h) // 2), text, fill=text_color, font=font)
- return img
- def combine_plots(items: List[Any], output_path: Path):
- """Combines matplotlib figures and section banners into one image."""
- if not items:
- print("No figures to combine.")
- return
- images = []
- # Convert figures to images first, so we know max width
- fig_images = []
- for item in items:
- if isinstance(item, Figure):
- item.canvas.draw()
- img = Image.fromarray(np.array(item.canvas.buffer_rgba())[:, :, :3])
- fig_images.append(img)
- plt.close(item)
- max_width = max(img.width for img in fig_images) if fig_images else 1000
- # Now rebuild with banners at correct width and add figures back
- item_iter = iter(items)
- fig_iter = iter(fig_images)
- for item in item_iter:
- if isinstance(item, Figure):
- images.append(next(fig_iter))
- elif isinstance(item, tuple) and item[1] == "banner":
- banner = create_title_banner(item[0], width=max_width, height=80)
- images.append(banner)
- separator = 4
- total_height = sum(img.height for img in images) + separator * (len(images) - 1)
- combined = Image.new("RGB", (max_width, total_height), color="white")
- draw = ImageDraw.Draw(combined)
- y_offset = 0
- for i, img in enumerate(images):
- combined.paste(img, (0, y_offset))
- y_offset += img.height
- if i < len(images) - 1:
- draw.rectangle([0, y_offset, max_width, y_offset + separator - 1], fill=(0,0,0))
- y_offset += separator
- output_path.parent.mkdir(parents=True, exist_ok=True)
- combined.save(output_path)
- print(f"\nCombined statistics image saved to: {output_path}")
- # --- Main Execution ---
- def plt_main(figure, file_name):
- """Shows a figure, then saves it under OUTPUT_FOLDER/CSV_NAME."""
- plt.show()
- plt_dir = OUTPUT_FOLDER / CSV_NAME
- plt_dir.mkdir(parents=True, exist_ok=True)
- figure.savefig(plt_dir / str(file_name))
- def main():
- """Main function to run the chat analysis and generate plots."""
- if not FILE_PATH.exists():
- print(f"Error: Input file not found at {FILE_PATH}"); return
- df = load_and_clean_data(FILE_PATH)
- figures = []
- # --- Meta Plots ---
- figures.append(("Copypastas", "banner"))
- print("0. Top Emote Combos...")
- top_emote_combos = analyze_top_emote_combos(df)
- if top_emote_combos:
- figure = create_horizontal_bar_chart(
- top_emote_combos,
- "Top 10 Emote Combos (by Messages Reducing To Them)",
- 'darkviolet',
- create_combo_image # Use the new combo image function here
- )
- figures.append(figure); plt_main(figure, 0)
- # --- Emote/Word Summary Plots ---
- figures.append(("Emotes", "banner"))
- print("--- Generating Emote/Word Summary Plots ---")
- print("1. Top Words (by Total Occurrences)...")
- top_words_freq = analyze_top_words_by_freq(df)
- if top_words_freq:
- figure = create_horizontal_bar_chart(top_words_freq, "Top 10 Used Emotes (by Total Occurrences)", 'lightgreen', load_emote_image)
- figures.append(figure); plt_main(figure, 1)
- print("2. Top Words (by Messages)...")
- top_popular_words = analyze_popular_words(df)
- if top_popular_words:
- figure = create_horizontal_bar_chart(top_popular_words, "Top 10 Popular Emotes (by Messages Containing Them)", 'skyblue', load_emote_image)
- figures.append(figure); plt_main(figure, 2)
- print("3. Top Words (by Presence)...")
- top_consistent_words = analyze_consistent_words(df)
- if top_consistent_words:
- figure = create_presence_bar_chart(top_consistent_words, "Top 10 Consistent Emotes (% of Minutes With At Least One)", 'mediumturquoise', load_emote_image)
- figures.append(figure); plt_main(figure, 3)
- print("4. Top Words (by Spread)...")
- top_widespread_words = analyze_emotes_by_users(df)
- if top_widespread_words:
- figure = create_horizontal_bar_chart(top_widespread_words, "Top 10 Widespread Emotes (by Unique Users Using Them)", "#6495ED", load_emote_image)
- figures.append(figure); plt_main(figure, 4)
- # --- User Summary Plots ---
- figures.append(("Chatters", "banner"))
- print("\n--- Generating User Summary Plots ---")
- print("5. Top Active Chatters...")
- top_10_chatters_df = analyze_user_activity(df)
- top_10_chatters_series = top_10_chatters_df['message_count']
- figure = create_horizontal_bar_chart(top_10_chatters_series, "Top 10 Active Chatters (Total Messages)", 'lightcoral', load_user_image)
- figures.append(figure); plt_main(figure, 5)
- print("6. Top Chatters (by Presence)...")
- top_consistent_users = analyze_consistent_users(df)
- if top_consistent_users:
- figure = create_presence_bar_chart(top_consistent_users, "Top 10 Consistent Chatters (% of Minutes Chatting)", 'plum', load_user_image)
- figures.append(figure); plt_main(figure, 6)
- print("7. Top Original Chatters...")
- top_original, unique_df = analyze_original_chatters(df)
- figure = create_horizontal_bar_chart(top_original, "Top 10 Original Chatters (Unique Messages)", 'goldenrod', load_user_image)
- figures.append(figure); plt_main(figure, 7)
- print("8. Most Mentioned Chatters...")
- most_mentioned = analyze_most_mentioned_chatters(df)
- if most_mentioned:
- figure = create_horizontal_bar_chart(
- most_mentioned,
- "Top 10 Mentioned Chatters (by @mentions)",
- 'deepskyblue',
- load_user_image
- )
- figures.append(figure)
- plt_main(figure, 8)
- # --- General Summary Plots ---
- figures.append(("Stream", "banner"))
- print("\n--- Generating General Summary Plots ---")
- print("9. Number of Chatters (by Bins)...")
- user_msg_counts = df['user_name'].value_counts()
- figure = plot_user_bins(analyze_message_bins(user_msg_counts))
- figures.append(figure); plt_main(figure, 9)
- print("10. Number of Messages (Over Time)...")
- figure = plot_messages_over_time(df)
- figures.append(figure); plt_main(figure, 10)
- # --- DISABLED ---
- #print("10. First-Time Chatter Rate...")
- #figures.append(plot_first_time_chatters(analyze_first_time_chatters(df))); plt_main(plt, 11)
- # --- General Moments Plots ---
- figures.append(("Moments", "banner"))
- # --- Detailed Emote Lifecycle Plots ---
- print("\n--- Generating Emote Lifecycle Plots ---")
- #top_word_labels = set([w for w, _ in top_words_freq]) \
- #| set([w for w, _ in top_popular_words]) \
- #| set([w for w, _ in top_consistent_words])
- #print("top emote set:", len(top_word_labels))
- def find_emote(combo):
- """Returns the first word in a combo with a local emote image, else the whole combo string."""
- emote_stems = {p.stem for p in EMOTE_FOLDER.iterdir()}
- for word in combo[0].split(" "):
- if word in emote_stems:
- return word
- return combo[0]
- # PICK MOMENTS from top emote lists
- def unique_append(l, x):
- if x not in l:
- l.append(x)
- return True
- return False
- moments_suggestion = []
- # Define the sources as lists of lists
- sources = [
- [_[0] for _ in top_words_freq],
- [_[0] for _ in top_popular_words],
- [_[0] for _ in top_consistent_words],
- [_[0] for _ in top_widespread_words],
- [find_emote(combo) for combo in top_emote_combos] # flatten emote combos
- ]
- print("moments candidates:", sources)
- for idx, source in enumerate(sources):
- appended = 0
- for item in source:
- success = unique_append(moments_suggestion, item)
- if success:
- appended += 1
- # For all sources except the last one, stop after the first append
- if idx < len(sources) - 1 and appended == 1:
- break
- # For the last source, stop after 2 appends
- if idx == len(sources) - 1 and appended == 2:
- break
- print("moments:", moments_suggestion)
- # Fall back to the auto-picked "moments" emotes when LIFECYCLE_EMOTES is an empty list; skip entirely when it is None
- emotes_to_plot = LIFECYCLE_EMOTES if LIFECYCLE_EMOTES else (list(moments_suggestion) if LIFECYCLE_EMOTES is not None else [])
- for idx, emote in enumerate(emotes_to_plot):
- print(f" - Plotting lifecycle for '{emote}'...")
- emote_data = analyze_emote_lifecycle(df, emote)
- if not emote_data.empty and emote_data.sum() > 0:
- figure = plot_emote_lifecycle(emote_data, emote)
- figures.append(figure); plt_main(figure, 100+idx)
- else:
- print(f" (No usage data for '{emote}')")
- # --- Detailed User Lifecycle Plots ---
- print("\n--- Generating User Lifecycle Plots ---")
- # Default to the top 10 most active chatters when LIFECYCLE_USERS is an empty list; skip entirely when it is None
- top_chatter_labels = top_10_chatters_df.index.tolist()
- users_to_plot = LIFECYCLE_USERS if LIFECYCLE_USERS else (top_chatter_labels if LIFECYCLE_USERS is not None else [])
- for idx, user in enumerate(users_to_plot):
- print(f" - Plotting lifecycle for '{user}'...")
- user_data = analyze_user_lifecycle(df, user)
- if not user_data.empty and user_data.sum() > 0:
- figure = plot_user_lifecycle(user_data, user)
- figures.append(figure); plt_main(figure, 200+idx)
- else:
- print(f" (No activity data for '{user}')")
- # --- Combine All Plots ---
- base_filename = FILE_PATH.stem
- output_file = OUTPUT_FOLDER / f"{base_filename}_statistics_full.png"
- combine_plots(figures, output_file)
- if __name__ == "__main__":
- main()