Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- """
- Moderation Log Analyzer
- Analyzes Reddit moderation logs to track automod effectiveness
- and identify when human moderators are undoing automod actions.
- """
- import json
- import re
- from datetime import datetime
- from collections import defaultdict, Counter
- from dataclasses import dataclass
- from typing import List, Dict, Tuple, Optional
@dataclass
class ModAction:
    """One moderation-log entry, normalized for analysis.

    Attributes:
        time: Timestamp string exactly as it appeared in the log entry.
        moderator: Name of the moderator (human or bot) that acted.
        action_type: Content category of the entry (the analyzer compares
            it against ``'Comments'``).
        action: Name of the action that was taken.
        content: Raw content text of the log entry.
        username: Username the action applies to, extracted from ``content``.
        parsed_time: ``time`` converted to a ``datetime`` for sorting.
    """

    # Field order is part of the positional constructor; keep it stable.
    time: str
    moderator: str
    action_type: str
    action: str
    content: str
    username: str
    parsed_time: datetime
class ModerationAnalyzer:
    """Analyze a moderation log to measure AutoModerator effectiveness.

    Entries are loaded from a JSON file into ``self.actions`` (sorted by
    time). AutoModerator actions are then paired with later, opposing
    actions taken by human moderators on the same username, and the
    results are rendered as plain-text reports.
    """

    # Substring that marks an action as performed by AutoModerator.
    AUTOMOD_NAME = 'AutoModerator'
    # Case-sensitive substrings of the action name. 'Unspam' does not
    # contain 'Spam' (capital S), so the two groups do not overlap on it.
    REMOVAL_KEYWORDS = ('Remove', 'Spam')
    APPROVAL_KEYWORDS = ('Approve', 'Unspam')
    # Timestamp layouts tried in order: 12-hour clock first, then a
    # 24-hour clock that still carries an AM/PM marker.
    TIME_FORMATS = ('%I:%M %p %b %d, %Y', '%H:%M %p %b %d, %Y')
    # Username patterns tried in order against the entry content.
    USERNAME_PATTERNS = (
        re.compile(r'^u/(\S+)', re.IGNORECASE),  # non-whitespace run after u/
    )

    def __init__(self, json_file_path: str):
        """Initialize the analyzer and load the log.

        Args:
            json_file_path: Path to the JSON moderation log file.
        """
        self.actions: List["ModAction"] = []
        self.load_data(json_file_path)

    def load_data(self, json_file_path: str) -> None:
        """Load log entries from ``json_file_path`` into ``self.actions``.

        Entries are read from the top-level ``'JSONdata'`` list; entries
        that ``parse_action`` rejects are skipped. The resulting list is
        sorted chronologically.

        Raises:
            FileNotFoundError: If the file does not exist.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        with open(json_file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        for entry in data.get('JSONdata', []):
            action = self.parse_action(entry)
            if action is not None:
                self.actions.append(action)

        # Sort by time for chronological analysis.
        self.actions.sort(key=lambda a: a.parsed_time)

    def parse_action(self, entry: Dict) -> Optional["ModAction"]:
        """Convert one raw log entry into a ``ModAction``.

        Args:
            entry: Mapping with the keys ``'Content'``, ``'Time'``,
                ``'Moderator'``, ``'Type'`` and ``'Action'`` (each optional).

        Returns:
            The parsed action, or ``None`` when no username can be
            extracted from the content or the entry is malformed.
        """
        try:
            content = entry.get('Content', '')
            username = self.extract_username(content)
            if username == 'unknown':
                return None

            time_str = entry.get('Time', '')
            return ModAction(
                time=time_str,
                moderator=entry.get('Moderator', ''),
                action_type=entry.get('Type', ''),
                action=entry.get('Action', ''),
                content=content,
                username=username,
                parsed_time=self.parse_time(time_str),
            )
        except (AttributeError, TypeError, ValueError) as e:
            # Best effort: a malformed entry (not a mapping, non-string
            # content, ...) is reported and skipped, never fatal.
            print(f"Error parsing entry: {e}")
            return None

    def extract_username(self, content: str) -> str:
        """Extract the username an entry refers to.

        Returns:
            ``'system'`` for empty or very short content (fewer than three
            non-blank characters), the name following a leading ``u/`` with
            trailing punctuation stripped, or ``'unknown'`` when no pattern
            yields a name of at least two characters.
        """
        if not content or len(content.strip()) < 3:
            return 'system'

        for pattern in self.USERNAME_PATTERNS:
            match = pattern.search(content)
            if match:
                username = match.group(1).rstrip('.,!?:;')
                if len(username) >= 2:
                    return username
        return 'unknown'

    def parse_time(self, time_str: str) -> datetime:
        """Parse a log timestamp such as ``'03:15 PM Jan 05, 2024'``.

        Each layout in ``TIME_FORMATS`` is tried in order. When none
        matches (or ``time_str`` is not a string) the current local time
        is returned as a fallback, so such entries sort near the end.
        """
        for fmt in self.TIME_FORMATS:
            try:
                return datetime.strptime(time_str, fmt)
            except (ValueError, TypeError):
                # Only parse failures are expected here; anything else
                # (e.g. KeyboardInterrupt) must propagate.
                continue
        return datetime.now()  # Fallback

    @classmethod
    def _is_automod(cls, action) -> bool:
        """Return True when the action was performed by AutoModerator."""
        return cls.AUTOMOD_NAME in action.moderator

    @classmethod
    def _is_removal(cls, action) -> bool:
        """Return True when the action name contains a removal keyword."""
        return any(word in action.action for word in cls.REMOVAL_KEYWORDS)

    @classmethod
    def _is_approval(cls, action) -> bool:
        """Return True when the action name contains an approval keyword."""
        return any(word in action.action for word in cls.APPROVAL_KEYWORDS)

    @staticmethod
    def _percent(part: int, whole: int) -> float:
        """Return ``part`` as a percentage of ``whole`` (0.0 if ``whole`` is 0)."""
        return part / whole * 100 if whole else 0.0

    def identify_undos(self) -> List[Tuple["ModAction", "ModAction"]]:
        """Find AutoModerator actions later reversed by a human moderator.

        Actions are grouped by username. Within each group, every
        AutoModerator action is paired with the first later non-AutoModerator
        action that opposes it (see ``is_undo_pair``). A human action is
        used for at most one pair, so a single approval is never counted as
        undoing several AutoModerator actions.

        Returns:
            ``(automod_action, human_action)`` tuples.
        """
        # Group all actions by username only.
        user_actions = defaultdict(list)
        for action in self.actions:
            user_actions[action.username].append(action)

        undos = []
        for actions in user_actions.values():
            ordered = sorted(actions, key=lambda a: a.parsed_time)
            claimed = set()  # indices of human actions already paired

            for i, automod_action in enumerate(ordered):
                if not self._is_automod(automod_action):
                    continue

                for j in range(i + 1, len(ordered)):
                    candidate = ordered[j]
                    if j in claimed or self._is_automod(candidate):
                        continue
                    if self.is_undo_pair(automod_action, candidate):
                        undos.append((automod_action, candidate))
                        claimed.add(j)
                        break  # Only match first undo for this action
        return undos

    def is_undo_pair(self, action1: "ModAction", action2: "ModAction") -> bool:
        """Return True when ``action2`` opposes ``action1``.

        That is either removal/spam followed by approve/unspam, or the
        reverse (approve/unspam followed by removal/spam). Only the action
        names are compared; the caller is responsible for matching users
        and ordering.
        """
        if self._is_removal(action1) and self._is_approval(action2):
            return True
        # Reverse case (less common in an AutoMod context).
        return self._is_approval(action1) and self._is_removal(action2)

    def generate_summary_report(self) -> str:
        """Build the overall summary of moderation activity.

        Covers action totals, per-action and per-moderator counts, and the
        undo analysis. An empty log yields 0.0% shares instead of failing.
        """
        total_actions = len(self.actions)
        automod_actions = sum(1 for a in self.actions if self._is_automod(a))
        human_actions = total_actions - automod_actions
        automod_share = self._percent(automod_actions, total_actions)
        human_share = self._percent(human_actions, total_actions)

        report = [
            "=" * 60,
            "MODERATION LOG ANALYSIS SUMMARY",
            "=" * 60,
            "\nBASIC STATISTICS:",
            f"Total Actions: {total_actions}",
            f"AutoModerator Actions: {automod_actions} ({automod_share:.1f}%)",
            f"Human Moderator Actions: {human_actions} ({human_share:.1f}%)",
        ]

        report.append("\nACTION BREAKDOWN:")
        action_counts = Counter(a.action for a in self.actions)
        for action, count in action_counts.most_common():
            report.append(f" {action}: {count}")

        report.append("\nMODERATOR ACTIVITY:")
        mod_counts = Counter(a.moderator for a in self.actions)
        for mod, count in mod_counts.most_common():
            report.append(f" {mod}: {count}")

        undos = self.identify_undos()
        report.append("\nUNDO ANALYSIS:")
        report.append(f"Total Undo Actions Found: {len(undos)}")
        if undos:
            automod_undone = [u for u in undos if self._is_automod(u[0])]
            # identify_undos only pairs AutoModerator originals, so this
            # remainder is currently always zero; the line is kept so the
            # report layout stays stable.
            human_undone_count = len(undos) - len(automod_undone)
            report.append(
                f"AutoModerator Actions Undone by Humans: {len(automod_undone)}"
            )
            report.append(f"Human Actions Undone: {human_undone_count}")
            if automod_actions > 0:
                undo_rate = self._percent(len(automod_undone), automod_actions)
                report.append(f"AutoMod Undo Rate: {undo_rate:.2f}%")

        return "\n".join(report)

    def get_undoable_actions(self, actions: List["ModAction"]) -> List["ModAction"]:
        """Return the actions that an approval could undo.

        Only actions whose name contains a removal keyword (Remove/Spam)
        qualify.
        """
        return [action for action in actions if self._is_removal(action)]

    def _content_type_section(self, label: str, actions, undone_pairs) -> List[str]:
        """Render the effectiveness section for one content type.

        Args:
            label: ``'Comment'`` or ``'Post'``; used in the headings.
            actions: Undoable AutoModerator actions of this content type.
            undone_pairs: Undo pairs whose original belongs to ``actions``.
        """
        total = len(actions)
        undone_count = len(undone_pairs)
        effectiveness = self._percent(total - undone_count, total)
        undo_rate = self._percent(undone_count, total)
        lines = [
            f"\n{label.upper()} MODERATION EFFECTIVENESS:",
            f"Total AutoMod {label} Actions: {total}",
            f"{label} Actions Undone: {undone_count}",
            f"{label} Effectiveness Rate: {effectiveness:.1f}%",
            f"{label} Undo Rate: {undo_rate:.1f}%",
        ]

        # Per-action breakdown, in first-seen order.
        by_action = Counter(a.action for a in actions)
        undone_by_action = Counter(orig.action for orig, _ in undone_pairs)
        lines.append(f"\n {label} Actions Breakdown:")
        for name, count in by_action.items():
            undone = undone_by_action[name]
            rate = self._percent(count - undone, count)
            lines.append(f" {name}:")
            lines.append(
                f" Total: {count}, Undone: {undone}, Effectiveness: {rate:.1f}%"
            )
        return lines

    def generate_effectiveness_report(self) -> str:
        """Build the AutoModerator effectiveness report.

        Only undoable (Remove/Spam) AutoModerator actions are counted, and
        only undos of those actions count against them, so every rate stays
        within 0-100%. Results are given overall and split into comments
        (``action_type == 'Comments'``) versus everything else.
        """
        all_automod_actions = [a for a in self.actions if self._is_automod(a)]
        # Only count actions that could potentially be undone.
        automod_actions = self.get_undoable_actions(all_automod_actions)
        # Keep numerator and denominator consistent: an AutoMod approval
        # later removed by a human is not an undone removal.
        automod_undone = [
            pair for pair in self.identify_undos()
            if self._is_automod(pair[0]) and self._is_removal(pair[0])
        ]

        report = ["=" * 60, "AUTOMOD EFFECTIVENESS ANALYSIS", "=" * 60]

        if not automod_actions:
            if all_automod_actions:
                report.append(
                    "AutoModerator performed actions, but none were undoable removal/spam actions."
                )
                report.append(
                    f"Total AutoMod actions: {len(all_automod_actions)} (sticky, distinguish, etc.)"
                )
            else:
                report.append("No AutoModerator actions found in log.")
            return "\n".join(report)

        # Show both totals for transparency.
        report.append(f"AutoMod total actions: {len(all_automod_actions)}")
        report.append(
            f"AutoMod undoable actions (Remove/Spam): {len(automod_actions)}"
        )

        total_automod = len(automod_actions)
        undone_count = len(automod_undone)
        effectiveness = self._percent(total_automod - undone_count, total_automod)
        undo_rate = self._percent(undone_count, total_automod)
        report.append("\nOVERALL EFFECTIVENESS (Undoable Actions Only):")
        report.append(f"Total AutoMod Undoable Actions: {total_automod}")
        report.append(f"Actions Undone by Humans: {undone_count}")
        report.append(f"Effectiveness Rate: {effectiveness:.1f}%")
        report.append(f"Undo Rate: {undo_rate:.1f}%")

        # Separate by content type: comments versus posts/other.
        comment_actions = [a for a in automod_actions if a.action_type == 'Comments']
        post_actions = [a for a in automod_actions if a.action_type != 'Comments']
        comment_undone = [u for u in automod_undone if u[0].action_type == 'Comments']
        post_undone = [u for u in automod_undone if u[0].action_type != 'Comments']

        if comment_actions:
            report.extend(
                self._content_type_section('Comment', comment_actions, comment_undone)
            )
        if post_actions:
            report.extend(
                self._content_type_section('Post', post_actions, post_undone)
            )

        return "\n".join(report)
def main():
    """Run the analysis from the command line.

    Expects exactly one argument, the path to the JSON moderation log, and
    prints the summary report followed by the effectiveness report.

    Exits with status 1 on a usage error or when the analysis fails, so
    shells and scripts can detect the failure. Failure messages are written
    to stderr to keep them out of redirected report output.
    """
    import sys

    if len(sys.argv) != 2:
        print("Usage: python mod_analyzer.py <json_file>")
        sys.exit(1)

    json_file = sys.argv[1]
    try:
        analyzer = ModerationAnalyzer(json_file)
        print(analyzer.generate_summary_report())
        print("\n\n")
        print(analyzer.generate_effectiveness_report())
        print("\n\n")
    except FileNotFoundError:
        # sys.exit(<str>) prints the message to stderr and exits with 1.
        sys.exit(f"Error: File '{json_file}' not found.")
    except json.JSONDecodeError:
        sys.exit(f"Error: Invalid JSON in file '{json_file}'.")
    except Exception as e:
        # Top-level boundary: report any other failure instead of a traceback.
        sys.exit(f"Error analyzing file: {e}")


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment