Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python
- LICENCE = "WTFPL", "http://www.wtfpl.net/about/"
- VERSION = "v0.4.0a"
- import atexit
- import logging
- import pickle
- import praw
- import prawcore
- import random
- import shelve
- import string
- import sys
- import unicodedata
- from datetime import datetime, timedelta
- from math import ceil
- from time import sleep
- from random import randint
- from statistics import mean
- ## Constants
- FLAIR_UPDATE_INTERVAL = timedelta(minutes=15)
- USER_UPDATE_INTERVAL = timedelta(hours=6)
- # TODO: add allies
- ALLIES = []
- # The Book of Grudges
- GRUDGE_REDPILL = ["theredpill", "marriedredpill", "asktrp", "askmrp", "rpchristians"]
- GRUDGE_STEPFORD = ["redpillwomen", "redpillwives", "femininenotfeminist"]
- GRUDGE_MGTOW = ["mgtow"]
- GRUDGE_INCEL = ["braincels"]
- GRUDGE_LOBSTER = ["jordanpeterson", "jordan_peterson_memes"]
- GRUDGE_EDGELORD = ["the_donald", "cringeanarchy", "milliondollarextreme", "4chan"] #TODO: "Russian Agent" for the_donald?
- GRUDGE_GENERAL = ["mensrights", "pussypassdenied", "feministpassdenied", "whereareallthegoodmen", "kotakuinaction", "tumblrinaction", "sjwhate"]
- BOOK_OF_GRUDGES = GRUDGE_REDPILL + GRUDGE_STEPFORD + GRUDGE_MGTOW + GRUDGE_INCEL + GRUDGE_LOBSTER + GRUDGE_EDGELORD + GRUDGE_GENERAL
- # Flairs
- BAD_FLAIRS = { "vex-terp": "VEXATIOUS LIFTER",
- "vex-terp-flaired": "VEXATIOUS ROYALTY",
- "vex-stepford": "STEPFORD WIFE",
- "vex-mgtow": "LOST MGTOW",
- "vex-incel": "VOLUNTARY ASSHOLE",
- "vex-lobster": "ROGUE LOBSTER",
- "vex-edgy": "VEXATIOUS EDGELORD",
- "vexatious": "VEXATIOUS LITIGANT"
- }
- BAD_FLAIR_LIST = list(BAD_FLAIRS.keys())
- GOOD_FLAIR_LIST = ["endorsedflair", "vanguard", "alpha", "betaasfuck", "feeemale", "robot"]
- IMMUTABLE_FLAIRS = BAD_FLAIR_LIST + GOOD_FLAIR_LIST + ["purged"]
- # Threat levels
- THREAT_LEVELS = {"tlsevere":"Severe", "tlhigh":"High", "tlelevated":"Elevated", "tlguarded":"Guarded", "tllow":"Low"}
- THREAT_MATRIX = { "tllow": BAD_FLAIR_LIST + ["hb{0}".format(x) for x in range(1,7)],
- "tlguarded": BAD_FLAIR_LIST + ["hb{0}".format(x) for x in range(1,5)],
- "tlelevated": BAD_FLAIR_LIST + ["hb{0}".format(x) for x in range(1,3)],
- "tlhigh": BAD_FLAIR_LIST,
- "tlsevere": []
- }
- # Special Snowflake users
- FLAIR_EXEMPT = []
- BETA_TESTERS = ["FailAShitTestToday", "Black_m0ngoose", "MrPerfidy", "frankwashere44", "movielover2018", "SufficientGlass"]
- ## Functions
- delay = 2
- last_fuck_up = None
- def sleep_it_off():
- # The sloppy, lazy way of making a static function in Python
- global delay
- global last_fuck_up
- log.error("Reddit API error: waiting {0} seconds".format(delay))
- try:
- if delay > 128 or datetime.now() > last_fuck_up + timedelta(minutes=2): delay = 2
- except TypeError:
- delay = 2
- sleep(delay)
- delay *= 2
- last_fuck_up = datetime.now()
- @atexit.register
- def sync_data():
- # Hopefully this will keep VirtueTron from destroying its data files when it crashes
- global_data.close()
- user_data.close()
- def drop_user(name):
- log.info("User: {0} is deleted or suspended, dropping from userlist".format(name))
- try:
- del users[name]
- del user_data[name]
- except KeyError:
- log.warn("Tried to delete nonexistant user: {0}".format(name))
- def ban_user(name, ban_reason, ban_message):
- tbp.banned.add(name, ban_reason=ban_reason, ban_message=ban_message)
- set_flair(user.name, "PURGED", "purged")
- del user.data["comment_eater"]["scrabble"]
- user.data["flair"] = "purged"
- user_data[user.name] = user.data
- def get_flair(name):
- while True:
- try:
- for flair in tbp.flair(redditor=name):
- return flair["flair_css_class"]
- except prawcore.exceptions.ServerError:
- sleep_it_off()
- except (prawcore.exceptions.Forbidden, prawcore.exceptions.NotFound):
- # It looks like Reddit always returns Forbidden if you try to get a dead user's flair, but better safe than sorry
- return False
- def set_flair(name, flair_text, flair_css):
- while True:
- try:
- tbp.flair.set(name, flair_text, flair_css)
- return True
- except prawcore.exceptions.ServerError:
- sleep_it_off()
- # We won't get errors for trying to set flair on a dead user, so there's no point trying to catch Forbidden/NotFound here
- def set_link_flair(submission, flair_text, flair_css):
- while True:
- try:
- submission.mod.flair(css_class=flair_css, text=flair_text)
- return
- except prawcore.exceptions.ServerError:
- sleep_it_off()
- def add_approved(name):
- while True:
- try:
- tbp.contributor.add(name)
- return True
- except praw.exceptions.APIException as e:
- if e.error_type == "USER_DOESNT_EXIST":
- drop_user(name)
- return False
- except prawcore.exceptions.ServerError:
- sleep_it_off()
- def del_approved(name):
- while True:
- try:
- tbp.contributor.remove(name)
- return True
- except praw.exceptions.APIException as e:
- if e.error_type == "USER_DOESNT_EXIST":
- drop_user(name)
- return False
- else:
- log.error("Unhandled API exception: {0}".format(e.error_type))
- raise e
- except prawcore.exceptions.ServerError:
- sleep_it_off()
- ## Shit tests
- def scrabble_get_draw(scrabble):
- BURN_RATE = 3
- VOWELS = ['u', 'a', 'i', 'o', 'e']
- consonants = [letter for letter in scrabble["safe_letters"] if letter not in VOWELS]
- log.debug("Safe letters are {0}, consonants are: {1}".format(scrabble["safe_letters"], consonants))
- if len(consonants) <= BURN_RATE:
- if len(scrabble["safe_letters"]) <= BURN_RATE:
- return False
- else:
- scrabble["banned_letters"] += scrabble["safe_letters"]
- scrabble["safe_letters"] = []
- return scrabble
- new_len = len(scrabble["banned_letters"]) + BURN_RATE
- log.debug("Should end up with up to {0} letters in banned_letters".format(new_len))
- letter = random.choice(scrabble["safe_letters"])
- log.debug("Picked '{0}'".format(letter))
- scrabble["safe_letters"].remove(letter)
- try:
- consonants.remove(letter)
- except ValueError:
- pass
- scrabble["banned_letters"].append(letter)
- if letter not in VOWELS:
- while len(scrabble["banned_letters"]) < new_len:
- letter = random.choice(consonants)
- log.debug("Picked '{0}'".format(letter))
- scrabble["safe_letters"].remove(letter)
- scrabble["banned_letters"].append(letter)
- scrabble["banned_letters"] = sorted(scrabble["banned_letters"])
- return scrabble
- def scrabble_test(user, comment):
- # Start the game if the user isn't already playing scrabble
- if "scrabble" not in user.data["comment_eater"].keys():
- scrabble = {"start": datetime.now(), "end": datetime.now() + timedelta(weeks=1), "banned_letters": [], "safe_letters": list(string.ascii_lowercase), "eaten": []}
- scrabble_get_draw(scrabble)
- reply = comment.reply("As an unmutual member of our community, you've been chosen for a shit test:\n\n Your comments are no longer allowed to contain the letters: {0}. If your comment contains forbidden letters, it will be removed and bonus forbidden letters will be added. If you manage to run out of letters within a week, you will be hard nexted.\n\n**Good luck!**\n\n*NOTE: This is an automated action, but this account is also attached to a human operator if you have any concerns. We appreciate your continued business!*".format(scrabble["banned_letters"]))
- reply.mod.distinguish()
- log.info("User: {0} now playing scrabble! Banned letters are {1}".format(user.name, scrabble["banned_letters"]))
- user.data["comment_eater"]["scrabble"] = scrabble
- user_data[user.name] = user.data
- return
- # Stop the game if the time limit's up
- scrabble = user.data["comment_eater"]["scrabble"]
- if datetime.now() > scrabble["end"]:
- reply = comment.reply("You've successfully passed your shit test! Congratulations! As recompense you will receive a token SMV boost on r/TheBluePill.\n\n**Thanks for playing!**")
- reply.mod.distinguish()
- user.data["manual_score_bias"] += 1
- del user.data["comment_eater"]["scrabble"]
- user_data[user.name] = user.data
- log.warn("User: {0} successfully passed scrabble!".format(user.name))
- return
- # Continue the game
- body = unicodedata.normalize("NFKD", comment.body.lower()) # Decompose accented characters and flatten case to catch cheaters
- if comment.id not in scrabble["eaten"] and any(letter in body for letter in scrabble["banned_letters"]):
- scrabble = scrabble_get_draw(scrabble)
- if scrabble == False: # Out of letters to draw, test has been failed
- mesg = "Oh, no! It looks like you've run out of letters. This game is over.\n\n**Thanks for playing!**"
- reply = comment.reply(mesg)
- reply.mod.distinguish()
- ban_user(user.name, ban_reason="AUTOBANNED: failed at scrabble", ban_message=mesg)
- log.warn("User: {0} is purged (failed at scrabble)".format(user.name))
- return
- if any([unicodedata.category(char) == "Mn" for char in body]):
- insert = " Putting accent marks on letters doesn't make them different letters, but nice try at cheating!"
- else:
- insert = ""
- reply = comment.reply("Whoops! It looks like [you](https://www.reddit.com/u/{0}) used a banned letter.{1} Your banned letters are now: {2}\n\n**Better luck next time!**".format(user.name, insert, scrabble["banned_letters"]))
- reply.mod.distinguish()
- if not comment.approved:
- scrabble["eaten"].append(comment.id)
- comment.mod.remove()
- log.info("Removing comment by user: {0} ({1}) [id: {2}] on submission '{3}' [id: {4}], failed scrabble test. Banned letters are now {5}".format(user.name, user.data["flair"], comment.id, comment.submission.title, comment.submission.id, scrabble["banned_letters"]))
- user.data["comment_eater"]["scrabble"] = scrabble
- user_data[user.name] = user.data
- def brevity_test(user, comment):
- # Start the game if the user isn't already playing brevity
- if "brevity" not in user.data["comment_eater"].keys():
- brevity = {"start": datetime.now(), "end": datetime.now() + timedelta(weeks=1), "length": 160, "eaten": []}
- reply = comment.reply("As an unmutual member of our community, you've been chosen for a shit test:\n\n Your comments are no longer allowed to be longer than 160 characters. If your comment is too long, it will be removed and the limit will be halved. If the limit hits 1 character within a week, you will be hard nexted.\n\n**Good luck!**\n\n*NOTE: This is an automated action, but this account is also attached to a human operator if you have any concerns. We appreciate your continued business!*")
- user.data["comment_eater"]["brevity"] = brevity
- user_data[user.name] = user.data
- log.info("User: {0} now playing brevity! Max comment length 160".format(user.name))
- return
- # Stop the game if the time limit's up
- brevity = user.data["comment_eater"]["brevity"]
- if datetime.now() > brevity["end"]:
- reply = comment.reply("You've successfully passed your shit test! Congratulations! As recompense you will receive a token SMV boost on r/TheBluePill.\n\n**Thanks for playing!**")
- reply.mod.distinguish()
- user.data["manual_score_bias"] += 1
- del user.data["comment_eater"]["brevity"]
- user_data[user.name] = user.data
- return
- # Continue the game
- if len(comment.body) > brevity["length"]:
- brevity["length"] /= 2
- if brevity["length"] < 2:
- mesg = "Oh, no! It looks like you've failed to master the virtue of brevity. This game is over.\n\n**Thanks for playing!**"
- reply = comment.replay(mesg)
- reply.mod.distinguish()
- ban_user(user.name, ban_reason="AUTOBANNED: failed at brevity", ban_message=mesg)
- return
- else:
- reply = comment.reply("Whoops! It looks like that comment was too long. [Your](https://www.reddit.com/u/{0}) comments can't be longer than {1} characters now.\n\n**Good luck!**\n\n*NOTE: This is an automated action, but this account is also attached to a human operator if you have any concerns. We appreciate your continued business!*".format(user.name, brevity["length"]))
- reply.mod.distinguish()
- if not comment.approved:
- brevity["eaten"].append(comment.id)
- comment.mod.remove()
- log.info("Removing comment by user: {0} ({1}) [id: {2}] on submission '{3}' [id: {4}], failed brevity test. Max comment length is now {5}".format(user.name, user.data["flair"], comment.id, comment.submission.title, comment.submission.id, brevity["length"]))
- user.data["comment_eater"]["brevity"] = brevity
- user_data[user.name] = user.data
- SHIT_TESTS = {"scrabble": scrabble_test, "brevity": brevity_test}
- # If I use a class that makes this object oriented, right?
- class Tracker:
- SCAN_INTERVAL = timedelta(minutes=10)
- SCAN_DEPTH = 16
- DROP_THRESHOLD = 16
- def __init__(self, name, new_user=True):
- self.name = name
- if new_user:
- self.data = { "name": name,
- "scan_count": 0,
- "last_seen": None,
- "next_scan": None,
- "smv": 0,
- "flair": get_flair(name),
- "good_score": [],
- "ally_score": [],
- "bad_score": [],
- "manual_score_bias": 0,
- "best_comment": { "score": 0, "id": None },
- "worst_comment": { "score": 0, "id": None },
- "comment_eater": {},
- "zero_count": 0
- }
- self.update()
- log.info("Adding new user: {0}".format(name))
- else:
- self.data = user_data[name]
- @property
- def good_score(self):
- if len(self.data["good_score"]) > 0:
- try:
- return mean(self.data["good_score"]) + mean(self.data["ally_score"]) / 2 + self.data["manual_score_bias"]
- except KeyError:
- return mean(self.data["good_score"]) + self.data["manual_score_bias"]
- else:
- log.warn("User: {0} has no tracked good score?".format(self.name))
- return 0
- @property
- def bad_score(self):
- if len(self.data["bad_score"]) > 0:
- return mean(self.data["bad_score"])
- else:
- log.warn("User: {0} has no tracked bad score?".format(self.name))
- return 0
- @property
- def score(self):
- return self.good_score + self.bad_score
- def _tally(self):
- good, ally, bad = 0, 0, 0
- count = { "good": 0,
- "ally": 0,
- "vex-terp": 0,
- "vex-stepford": 0,
- "vex-mgtow": 0,
- "vex-incel": 0,
- "vex-lobster": 0,
- "vex-edgy": 0,
- "vexatious": 0,
- "total_bad": 0
- }
- for comment in reddit.redditor(self.name).comments.new(limit=100):
- sub = comment.subreddit.display_name.lower()
- karma = comment.score - 1
- if sub == "thebluepill":
- good += karma
- count["good"] += 1
- stats = global_data["stats"]
- dirty = False
- # Per user best/worst comment stats
- try:
- if karma > self.data["best_comment"]["score"]:
- self.data["best_comment"] = {"score": karma, "id": comment.id}
- dirty = True
- except (TypeError, KeyError):
- self.data["best_comment"] = {"score": karma, "id": comment.id}
- dirty = True
- try:
- if karma < self.data["worst_comment"]["score"]:
- self.data["worst_comment"] = {"score": karma, "id": comment.id}
- dirty = True
- except (TypeError, KeyError):
- self.data["worst_comment"] = {"score": karma, "id": comment.id}
- dirty = True
- if dirty:
- user_data[self.name] = self.data
- dirty = False
- # Global best/worst comment stats
- try:
- if karma > stats["best_comment"]["score"]:
- stats["best_comment"] = {"score": karma, "id": comment.id, "name": self.name}
- dirty = True
- except (TypeError, KeyError):
- stats["best_comment"] = {"score": karma, "id": comment.id, "name": self.name}
- dirty = True
- try:
- if karma < stats["worst_comment"]["score"]:
- stats["worst_comment"] = {"score": karma, "id": comment.id, "name": self.name}
- dirty = True
- except (TypeError, KeyError):
- stats["worst_comment"] = {"score": karma, "id": comment.id, "name": self.name}
- dirty = True
- if dirty:
- global_data["stats"] = stats
- elif sub in ALLIES:
- count["ally"] += 1
- ally += karma
- elif sub in BOOK_OF_GRUDGES and karma > 0:
- bad -= karma
- if sub in GRUDGE_REDPILL:
- count["vex-terp"] += 1
- count["total_bad"] += 1
- elif sub in GRUDGE_STEPFORD:
- count["vex-stepford"] += 1
- count["total_bad"] += 1
- elif sub in GRUDGE_MGTOW:
- count["vex-mgtow"] += 1
- count["total_bad"] += 1
- elif sub in GRUDGE_INCEL:
- count["vex-incel"] += 1
- count["total_bad"] += 1
- elif sub in GRUDGE_LOBSTER:
- count["vex-lobster"] += 1
- count["total_bad"] += 1
- elif sub in GRUDGE_EDGELORD:
- count["vex-edgy"] += 1
- count["total_bad"] += 1
- else:
- count["vexatious"] += 1
- count["total_bad"] += 1
- if count["good"] > 0:
- good /= count["good"]
- good *= count["good"] / 10 # Every 20 posts in r/TheBluePill doubles your score
- if count["total_bad"] > 0:
- bad /= count["total_bad"]
- bad *= count["total_bad"] / 5 # Every 10 posts in NAUGHTY_LIST doubles your bad score
- return good, bad, ally, count
- def update(self):
- try:
- if self.data["last_seen"] < self.data["next_scan"]:
- log.debug("Skipping user: {0} (next scan on {1:%Y/%m/%d %I:%M:%S%p})".format(self.name, self.data["next_scan"]))
- return True
- except TypeError:
- pass
- tally = False
- while not tally:
- try:
- good, bad, ally, count = self._tally()
- tally = True
- except (prawcore.exceptions.NotFound, prawcore.exceptions.Forbidden):
- # Dead user
- return False
- except prawcore.exceptions.ServerError:
- sleep_it_off()
- # Are they vexatious? (Of course they are!)
- if count["good"] == 0:
- try:
- self.data["zero_count"] += 1
- except KeyError:
- self.data["zero_count"] = 1
- if self.data["zero_count"] > Tracker.DROP_THRESHOLD:
- log.info("Dropping user: {0} (zero posts on TBP in the last {1} scans".format(self.name, Tracker.DROP_THRESHOLD))
- return False
- bad_flairs = list(BAD_FLAIRS.keys())
- bad_flairs.remove("vexatious") # Transitional, specific vexatious flairs are new
- vexatious_count = dict(count)
- log.debug("Vexatious score: {0}".format(vexatious_count))
- del vexatious_count["good"]
- del vexatious_count["ally"]
- del vexatious_count["total_bad"]
- if self.data["flair"] not in bad_flairs and count["total_bad"] > 10 and abs(bad) > good:
- vexatious_type = sorted(vexatious_count.keys(), key=lambda x: vexatious_count[x], reverse=True)[0]
- #Disabling this for now -- do I need moderator access on a subreddit to read a user's flair? It seems like it.
- #if vexatious_type == "vex-terp":
- #for flair in reddit.subreddit("theredpill").flair(redditor=self.name):
- # terp_flair = flair
- #if flair != None:
- # log.debug("User: {0} has a flair ('{1}') on r/TheRedPill".format(self.name, terp_flair))
- # vexatious_type = "vex-terp-flaired"
- # vexatious_count["vex-terp-flaired"] = vexatious_count["vex-terp"]
- # vexatious_count["vex-terp"] = 0
- log.info("User: {0} is a '{1}' ({2} of {3} posts in The Book of Grudges)".format(self.name, BAD_FLAIRS[vexatious_type], vexatious_count[vexatious_type], count["total_bad"]))
- if not self.data["flair"] == vexatious_type:
- set_flair(self.name, BAD_FLAIRS[vexatious_type], vexatious_type)
- self.data["flair"] = vexatious_type
- else:
- log.debug("User: {0} already assigned flair '{1}'".format(self.name, vexatious_type))
- self.data["good_score"].insert(0, good)
- if len(self.data["good_score"]) > Tracker.SCAN_DEPTH:
- self.data["good_score"].pop()
- try:
- self.data["ally_score"].insert(0, ally)
- except KeyError: # Transitional, wasn't tracking this separately before
- self.data["ally_score"] = [ally]
- if len(self.data["ally_score"]) > Tracker.SCAN_DEPTH:
- self.data["ally_score"].pop()
- self.data["bad_score"].insert(0, bad)
- if len(self.data["bad_score"]) > Tracker.SCAN_DEPTH:
- self.data["bad_score"].pop()
- self.data["scan_count"] += 1
- self.data["next_scan"] = datetime.now() + Tracker.SCAN_INTERVAL
- log.info("Scanned user: {0} (scanned {1} times), good_karma: {2:.2f} ({3} comments), bad_karma: {4:.2f} ({5} comments)".format(self.name, self.data["scan_count"], good, count["good"] + count["ally"], bad, count["total_bad"]))
- user_data[self.name] = self.data
- return True
- # Sorting hat
- def calc_sjw_market():
- log.info("Recalculating SJW marketplace")
- sjw_market = [list() for i in range(10)]
- i = 0
- neg_count = 0
- zero_count = 0
- pos_count = 0
- rank_lowest = 0
- rank_highest = 0
- last_smv = -1
- in_negative = True
- in_positive = False
- for user in users.values():
- if user.score < 0:
- neg_count += 1
- elif user.score == 0:
- zero_count += 1
- else:
- pos_count += 1
- log.info("TBP has {0} tracked users: {1} negative, {2} at zero, {3} positive".format(len(users), neg_count, zero_count, pos_count))
- for user in sorted(users.values(), key=lambda user: user.score):
- if in_negative:
- if i > neg_count:
- log.debug("Out of negative")
- in_negative = False
- i = 1
- smv = 2
- else:
- i += 1
- smv = ceil((i / neg_count) * 2) - 1
- elif in_positive:
- i += 1
- smv = ceil((i / pos_count) * 7) + 2
- else:
- if i > zero_count:
- log.debug("Out of zero")
- in_positive = True
- i = 1
- smv = ceil((i / pos_count) * 7) + 2
- else:
- smv = 2
- i += 1
- if smv > last_smv:
- if last_smv != -1:
- log.info("{0} Users at rank hb{1} (lowest: {2} at {3:.2f}, highest: {4} at {5:.2f}), now processing rank hb{6}".format(len(sjw_market[last_smv]), last_smv, sjw_market[last_smv][0], rank_lowest, sjw_market[last_smv][-1], rank_highest, smv))
- last_smv = smv
- rank_lowest = user.score
- rank_highest = user.score
- sjw_market[smv].append(user.name)
- log.info("{0} Users at rank hb{1} (lowest: {2} at {3:.2f}, highest: {4} at {5:.2f})".format(len(sjw_market[smv]), smv, sjw_market[smv][0], rank_lowest, sjw_market[smv][-1], rank_highest))
- return sjw_market
- def update_flairs():
- SPECIAL_THRESHOLD = 20
- # I'm so fuckin' dumb, we should've been doing it this way from the beginning
- # (grabbing the entire lists of purged/approved users at the start and checking locally instead of asking Reddit if individual users are purged/approved)
- got_lists = False
- log.info("Retrieving purged/approved lists from Reddit.")
- while not got_lists:
- try:
- purged_users = list(tbp.banned(limit=None))
- approved_users = list(tbp.contributor(limit=None))
- got_lists = True
- except prawcore.exceptions.ServerError:
- sleep_it_off()
- log.info("{0} purged users, {1} approved contributors".format(len(purged_users), len(approved_users)))
- smv = 0
- for rank in calc_sjw_market():
- smv += 1
- log.debug("On rank {0}".format(smv))
- i = 0
- for name in rank:
- i += 1
- user = users[name]
- dirty = False
- flair = user.data["flair"]
- # Check if user should be marked/unmarked as purged
- if flair != "purged" and user.name in purged_users:
- log.info("Marking user: {0} purged".format(user.name))
- set_flair(user.name, "PURGED", "purged")
- flair = "purged"
- user.data["flair"] = "purged"
- dirty = True
- elif flair == "purged" and user.name not in purged_users:
- if get_flair(name) == "purged":
- log.info("User: {0} was unbanned, correcting flair".format(user.name))
- set_flair(user.name, "", "")
- flair = None
- user.data["flair"] = None
- dirty = True
- if not flair:
- log.debug("No cached flair for user: {0}, checking Reddit".format(user.name))
- flair = get_flair(user.name)
- if flair == False:
- drop_user(user.name)
- continue
- if flair != None: # Don't update tracked flair if user isn't wearing a flair
- user.data["flair"] = flair
- log.debug("Reddit says user: {0}'s flair is {1}".format(user.name, flair))
- dirty = True
- log.debug("User: {0}, SMV: {1}, score: {2:.2f} (current flair {3}) ".format(user.name, smv, user.score, flair))
- # Update flairs
- if smv == 3:
- new_flair = "hb3"
- elif i < SPECIAL_THRESHOLD:
- new_flair = "hb{0}minus".format(smv)
- elif i + SPECIAL_THRESHOLD > len(rank):
- new_flair = "hb{0}plus".format(smv)
- else:
- new_flair = "hb{0}".format(smv)
- log.debug("Computed flair for user: {0} is '{1}'".format(user.name, new_flair))
- if flair in IMMUTABLE_FLAIRS:
- if flair in BAD_FLAIRS.keys() and smv > 2 and user.good_score > abs(user.bad_score):
- log.warn("Should user: {0} be vexatious? SMV: {1}, good_score: {2:.2f}, bad_score: {3:.2f}".format(user.name, smv, user.good_score, user.bad_score))
- else:
- log.debug("Not changing user: {0} (immutable flair {1})".format(user.name, flair))
- elif flair != new_flair:
- if set_flair(user.name, "Hβ{0}".format(smv), new_flair) == False:
- drop_user(user.name)
- continue
- log.info("Updating user: {0} flair from {1} to {2}".format(user.name, flair, new_flair))
- flair = new_flair
- user.data["flair"] = flair
- user.data["smv"] = smv
- dirty = True
- else:
- log.debug("User: {0} still a {1}".format(user.name, flair))
- # Add/remove approved contributors
- if ((smv > 7 or flair in GOOD_FLAIR_LIST) and flair not in BAD_FLAIR_LIST) and user.name not in approved_users:
- add_approved(user.name)
- log.info("Adding approved contributor: {0}".format(user.name))
- elif ((smv < 4 or flair in BAD_FLAIR_LIST) and flair not in GOOD_FLAIR_LIST) and user.name in approved_users:
- del_approved(user.name)
- log.info("Removing approved contributor: {0}".format(user.name))
- if dirty:
- user_data[user.name] = user.data
- global_data["next_flair_update"] = datetime.now() + FLAIR_UPDATE_INTERVAL
- log.info("Flair update complete (next update due on/after: {0:%Y/%m/%d %I:%M:%S%p})".format(global_data["next_flair_update"]))
- def botloop():
- try:
- seen_comments = global_data["seen_comments"]
- except pickle.UnpicklingError:
- seen_comments = []
- # Loop over comment stream indefinitely
- for comment in tbp.stream.comments():
- if datetime.now() > global_data["next_dead_user_update"]:
- dead_user_count = 0
- user_count = len(users)
- for user in list(users.values()): # Python gets cranky if the size of a iterable changes while it's getting iterated
- if not user.update():
- dead_user_count += 1
- drop_user(user.name)
- # If they're getting shit tested and haven't been seen since the last user update, extend the end time out by USER_UPDATE_INTERVAL
- test = None
- for eater in user.data["comment_eater"]:
- if eater in SHIT_TESTS.keys():
- test = eater
- break
- if test and datetime.now() > user.data["last_seen"] + USER_UPDATE_INTERVAL:
- try:
- user.data["comment_eater"][test]["end"] += USER_UPDATE_INTERVAL
- except KeyError:
- user.data["comment_eater"][test]["end"] = user.data["comment_eater"][test]["start"] + timedelta(weeks=1) + USER_UPDATE_INTERVAL
- log.info("Dead user scan complete: {0} of {1} tracked users dropped (next update due on/after: {2:%Y/%m/%d %I:%M:%S%p})".format(dead_user_count, user_count, global_data["next_dead_user_update"]))
- global_data["next_dead_user_update"] = datetime.now() + USER_UPDATE_INTERVAL
- if datetime.now() > global_data["next_flair_update"]:
- update_flairs()
- # Don't bother processing the comment if it's already been looked at
- if comment.id in seen_comments:
- log.debug("Skipping comment '{0}' on submission '{1}' [id: {2}], already seen.".format(comment.id, comment.submission.title, comment.submission.id))
- continue
- # Grab submission link flair, assign one randomly if blank
- threat_level = comment.submission.link_flair_css_class
- if threat_level == None:
- if randint(0,1) == 0: # 50-50 shot of getting "Severe"
- threat_selector = 0
- else:
- threat_selector = randint(1, len(THREAT_LEVELS) - 3) # Won't randomly assign the last two link flairs
- threat_level = list(THREAT_LEVELS.keys())[threat_selector]
- set_link_flair(comment.submission, THREAT_LEVELS[threat_level], threat_level)
- log.info("Set threat level for submission '{0}' [id: {1}] to {2}".format(comment.submission.title, comment.submission.id, THREAT_LEVELS[threat_level]))
- else:
- log.debug("Threat level for submission '{0}' [id: {1}] is {2})".format(comment.submission.title, comment.submission.id, THREAT_LEVELS[threat_level]))
- if not comment.author: # Skip over comments from deleted users
- log.debug("Skipping comment {0} on submission '{1}' [id: {2}], author no longer exists".format(comment.id, comment.submission.title, comment.submission.id))
- continue
- dirty = False
- name = comment.author.name
- author_flair = comment.author_flair_css_class
- # Update tracker
- if not name in users.keys():
- users[name] = Tracker(name)
- else:
- if not users[name].update():
- drop_user(name)
- continue
- users[name].data["last_seen"] = datetime.now()
- dirty = True
- # Update tracked flair if comment flair doesn't match tracked flair
- user_flair = users[name].data["flair"]
- if author_flair and user_flair != author_flair:
- dirty = True
- log.info("Updating tracked flair for user: {0} -- was {1}, now {2}".format(name, user_flair, author_flair))
- user_flair = author_flair
- users[name].data["flair"] = user_flair
- # Handle unflaired users and their comments
- if not author_flair and users[name].data["scan_count"] > 3:
- log.debug("User: {0} should have flair: {1}, but comment flair is {2}".format(name, user_flair, author_flair))
- if "unflaired" not in users[name].data["comment_eater"].keys() and users[name].data["smv"] < 4:
- users[name].data["comment_eater"]["unflaired"] = {"start":datetime.now(), "eaten": []}
- reply = comment.reply("Apologies for the inconvenience, but user flairs are mandatory on /r/TheBluePill. Please enable your user flair at your earliest convenience:\n\n- If you're on mobile Reddit, you'll need to first flip back to the desktop site (Burger Menu -> Desktop Site), then:\n- If you're on classic Reddit, you'll need to disable this Subreddit's custom CSS (preferences -> display options -> allow subreddits to show me custom themes)\n- If you're using an app to access Reddit, check the app's documentation, but you'll probably need to access the desktop site from a browser.\n\nYour comments will only be automatically removed if your SMV is too low, or the comment has negative karma. Once you're properly flaired again, these automatically removed comments will be restored.\n\nThanks for choosing r/TheBluePill!\n\netc. etc.,\n\n-Management\n\n*NOTE: This is an automated action, but this account is also attached to a human operator if you have any concerns. We appreciate your continued business!*")
- reply.mod.distinguish()
- log.info("Warned user: {0}, flairs are mandatory (except when they aren't).".format(name))
- dirty = True
- else:
- unflaired = users[name].data["comment_eater"]["unflaired"]
- if name in FLAIR_EXEMPT:
- log.info("Not removing comment {0} by unflaired user: {1} (exempt)".format(comment.id, name))
- elif datetime.now() < unflaired["start"] + timedelta(minutes=15):
- log.info("Not removing comment {0} by unflaired user: {1} (still in grace period after warning)".format(comment.id, name))
- elif users[name].data["smv"] > 3 or comment.score > 0 or user_flair in GOOD_FLAIR_LIST:
- log.info("Not removing comment {0} by unflaired user: {1} (too high value)".format(comment.id, name))
- elif comment.approved:
- log.info("Not removing comment {0} by unflaired user: {1} (manually approved)".format(comment.id, name))
- elif comment.id in unflaired["eaten"] or comment.removed:
- log.info("Not removing comment {0} by unflaired user: {1} (already removed)".format(comment.id, name))
- else:
- unflaired["eaten"].append(comment.id)
- comment.mod.remove()
- log.info("Removed comment by user {0} ({1}) [id: {2}] on submission {3} [id: {4}], not wearing flair!".format(name, author_flair, comment.id, comment.submission.title, comment.submission.id))
- dirty = True
- elif author_flair and "unflaired" in users[name].data["comment_eater"].keys() and len(users[name].data["comment_eater"]["unflaired"]["eaten"]) > 0:
- for comment in users[name].data["comment_eater"]["unflaired"]["eaten"]:
- reddit.comment(id=comment).approve()
- log.info("Restoring comment {0} for user: {1} (is wearing flair now)".format(comment, name))
- users[name].data["comment_eater"]["unflaired"]["eaten"] = []
- dirty = True
- # Remove comments if user's SMV is too low for submission's threat level
- if author_flair in THREAT_MATRIX[threat_level] and name != comment.submission.author and not comment.approved:
- try:
- users[name].data["comment_eater"]["low_smv"]["eaten"].append(comment.id)
- except KeyError:
- users[name].data["comment_eater"]["low_smv"] = {"eaten": [comment.id]}
- comment.mod.remove()
- log.info("Removing comment by user: {0} ({1}) [id: {2}] on submission '{3}' [id: {4}] ({5}), SMV too low!".format(name, author_flair, comment.id, comment.submission.title, comment.submission.id, THREAT_LEVELS[threat_level]))
- dirty = True
- #Shit tests
- if name in BETA_TESTERS:
- test = None
- for eater in users[name].data["comment_eater"]:
- if eater in SHIT_TESTS.keys():
- test = eater
- break
- if test:
- log.debug("Shit testing comment by user: {0} ({1}) [id: {2}] on submission '{3}' [id: {4}]".format(name, author_flair, comment.id, comment.submission.title, comment.submission.id))
- SHIT_TESTS[test](users[name], comment)
- else:
- scrabble_test(users[name], comment)
- # Quick and dirty quarantine
- if name == "movielover2018" and comment.submission.id != "91xefy":
- mesg = "I GAVE YOU ONE FUCKING RULE AND YOU DEFY ME."
- reply = comment.reply(mesg)
- reply.mod.distinguish()
- tbp.banned.add(name, ban_reason="AUTOBANNED: tried to escape quarantine", ban_message=mesg)
- set_flair(name, "PURGED", "purged")
- try:
- del users[name].data["comment_eater"]["scrabble"]
- except KeyError:
- pass
- users[name].data["flair"] = "purged"
- user_data[name] = users[name].data
- log.warn("GET FUCKED MOVIELOVER2018")
- if dirty:
- user_data[name] = users[name].data
- seen_comments.insert(0, comment.id)
- if len(seen_comments) > 250:
- seen_comments.pop()
- global_data["seen_comments"] = seen_comments
- global_data.sync()
- user_data.sync()
- # Set up logging facilities
- log = logging.getLogger("VirtueTron")
- log.setLevel(logging.DEBUG)
- formatter = logging.Formatter(fmt="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y/%m/%d %I:%M:%S%p")
- file_log = logging.FileHandler(filename="VirtueTron.log", mode="a")
- file_log.setLevel(logging.INFO)
- file_log.setFormatter(formatter)
- log.addHandler(file_log)
- console_log = logging.StreamHandler(stream=sys.stdout)
- console_log.setLevel(logging.DEBUG)
- console_log.setFormatter(formatter)
- log.addHandler(console_log)
- log.info("VirtueTron® 9000™ {0} © CrashARuntimeToday@outlook.com".format(VERSION))
- # Connect to Reddit
- credentials = pickle.load(open("credentials.pickle", "rb")) # { client_id: "VirtueTron9000", client_secret: "🤖🤡🍆💯™", username: "SignalAVirtueToday", password: "https://youtu.be/RCVJ7bujnSc"}
- credentials["user_agent"] = "VirtueTron 9000 {0}".format(VERSION)
- reddit = praw.Reddit(**credentials)
- tbp = reddit.subreddit("TheBluePill")
- log.info("Hello, Reddit!")
- # Get to work
- with shelve.open("user_data") as user_data, shelve.open("global_data") as global_data:
- users = {}
- if "seen_comments" not in global_data.keys():
- global_data["seen_comments"] = []
- if "stats" not in global_data.keys():
- stats = {}
- stats["best_comment"] = {"name": None, "score": None, "id": None}
- stats["worst_comment"] = {"name": None, "score": None, "id": None}
- global_data["stats"] = stats
- if "next_flair_update" not in global_data.keys():
- global_data["next_flair_update"] = datetime.now() + FLAIR_UPDATE_INTERVAL
- if "next_dead_user_update" not in global_data.keys():
- global_data["next_dead_user_update"] = datetime.now() + USER_UPDATE_INTERVAL
- for user in user_data.keys():
- users[user] = Tracker(user, False) # Change this to "True" the next time you crash the bot and wipe out user_data you stupid fuckin' hack
- try:
- botloop()
- except prawcore.PrawcoreException:
- sleep_it_off()
- botloop()
- except KeyboardInterrupt:
- log.info("VirtuteTron going off-line")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement