Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/python
# Licence as a (name, URL) pair.
LICENCE = ("WTFPL", "http://www.wtfpl.net/about/")
# Bot version; interpolated into the start-up banner and the HTTP user agent.
VERSION = "v0.3.8b"
- import logging
- import pickle
- import praw
- import prawcore
- import shelve
- import sys
- from datetime import datetime, timedelta
- from math import ceil
- from time import sleep
- from random import randint
- from statistics import mean
# Bot-wide logger: everything (DEBUG and up) goes to stdout, INFO and up is
# also appended to VirtueTron.log.
log = logging.getLogger("VirtueTron")
log.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    fmt="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y/%m/%d %I:%M:%S%p",
)
file_log = logging.FileHandler(filename="VirtueTron.log", mode="a")
console_log = logging.StreamHandler(stream=sys.stdout)
# Both handlers share one formatter and differ only in their threshold.
for _handler, _level in ((file_log, logging.INFO), (console_log, logging.DEBUG)):
    _handler.setLevel(_level)
    _handler.setFormatter(formatter)
    log.addHandler(_handler)
log.info("Hello, Reddit!")
log.info("VirtueTron® 9000™ {0} © CrashARuntimeToday@outlook.com".format(VERSION))
# Credentials are kept out of the source in a local pickle, shaped like:
# { client_id: "VirtueTron9000", client_secret: "🤖🤡🍆💯™", username: "SignalAVirtueToday", password: "https://youtu.be/RCVJ7bujnSc"}
# NOTE: unpickling can execute arbitrary code -- only ever load a file you created yourself.
with open("credentials.pickle", "rb") as credentials_file:  # closes the handle once loaded
    credentials = pickle.load(credentials_file)
credentials["user_agent"] = "VirtueTron 9000 {0}".format(VERSION)
# Authenticated Reddit client and the subreddit this bot moderates.
reddit = praw.Reddit(**credentials)
tbp = reddit.subreddit("TheBluePill")
# Subreddits whose comment karma counts for (NICE) or against (NAUGHTY) a user.
NICE_LIST = []
NAUGHTY_LIST = [
    "theredpill", "marriedredpill", "mgtow", "braincels",
    "asktrp", "askmrp", "redpillwomen", "redpillwives",
    "cringeanarchy", "the_donald", "rpchristians", "pussypassdenied",
    "mensrights", "milliondollarextreme", "4chan", "whereareallthegoodmen",
]
# Flair CSS classes, grouped by how the bot treats them.
BAD_FLAIRS = ["purged", "vexatious"]
GOOD_FLAIRS = ["endorsedflair", "vanguard", "alpha", "betaasfuck", "feeemale"]
IMMUTABLE_FLAIRS = ["vanguard", "vexatious", "endorsedflair", "alpha", "betaasfuck", "feeemale", "purged"]
# Link-flair CSS class -> display text. The order matters: a random threat
# level is picked from this mapping by position.
TODAYS_THREAT_LEVEL = {
    "tlsevere": "Severe",
    "tlhigh": "High",
    "tlelevated": "Elevated",
    "tlguarded": "Guarded",
    "tllow": "Low",
}
# Threat level -> user flair classes that get listed for it: the bad flairs
# plus hb1..hbN, where N is the cutoff below.
_HB_CUTOFFS = (("tllow", 6), ("tlguarded", 4), ("tlelevated", 2))
THREAT_MATRIX = {
    level: BAD_FLAIRS + ["hb{0}".format(rank) for rank in range(1, cutoff + 1)]
    for level, cutoff in _HB_CUTOFFS
}
THREAT_MATRIX["tlhigh"] = BAD_FLAIRS
THREAT_MATRIX["tlsevere"] = []
# User names to treat as exempt from flair enforcement.
FLAIR_EXEMPT = []
# Named comment challenges: a user-facing message and a predicate over a comment.
SHIT_TESTS = {
    "no_e": {
        "message": "Your comments may no longer contain the letter 'e'.",
        "eval_func": lambda comment: "e" not in comment.body.lower(),
    },
}
# API back-off state: the current wait in seconds and when the last error occurred.
delay = 2
last_fuckup = None
def praw_fucked_up():
    """Sleep after a Reddit API error, doubling the wait on repeated errors.

    Reads and updates the module-level ``delay`` (seconds to wait) and
    ``last_fuckup`` (time of the previous error). The wait starts over at
    2 seconds when there was no previous error, when the previous error is
    more than two minutes old, or when the wait has grown past 128 seconds.
    """
    global delay
    global last_fuckup
    no_previous_error = last_fuckup is None
    if no_previous_error or delay > 128 or datetime.now() > last_fuckup + timedelta(minutes=2):
        delay = 2
    # Log only after the reset above so the message states the wait actually used.
    log.warning("Reddit API error: waiting {0} seconds".format(delay))
    sleep(delay)
    delay *= 2
    last_fuckup = datetime.now()
def get_flair(name):
    """Return the flair CSS class r/TheBluePill has on record for ``name``.

    Retries (after backing off) on PRAW core errors. Returns None when the
    flair listing yields no entry, or when the entry's CSS class is None.
    """
    while True:
        try:
            for flair in tbp.flair(redditor=name):
                return flair["flair_css_class"]
            # An empty listing used to fall through and re-query forever.
            return None
        except prawcore.PrawcoreException:
            praw_fucked_up()
def set_flair(name, flair_text, flair_css):
    """Set a user's flair text and CSS class on r/TheBluePill.

    Keeps retrying (with back-off) while PRAW core errors occur, then
    returns True.
    """
    applied = False
    while not applied:
        try:
            tbp.flair.set(name, flair_text, flair_css)
        except prawcore.PrawcoreException:
            praw_fucked_up()
        else:
            applied = True
    return True
def set_link_flair(submission, flair_text, flair_css):
    """Set a submission's link flair text and CSS class.

    Keeps retrying (with back-off) while PRAW core errors occur, then
    returns True.
    """
    while True:
        try:
            submission.mod.flair(text=flair_text, css_class=flair_css)
        except prawcore.PrawcoreException:
            praw_fucked_up()
            continue
        return True
def add_approved(name):
    """Add ``name`` as an approved contributor of r/TheBluePill.

    Returns True once the user was added, or once the user turned out not
    to exist (in which case they are dropped from ``users`` and
    ``user_data``). Returns False when Reddit rejects the request for any
    other reason. PRAW core errors are retried after backing off.
    """
    while True:
        try:
            tbp.contributor.add(name)
            return True
        except praw.exceptions.APIException as e:
            if e.error_type == "USER_DOESNT_EXIST":
                log.info("User: {0} is deleted or suspended, dropping from userlist".format(name))
                # pop() rather than del: the user may already be gone from either mapping.
                users.pop(name, None)
                user_data.pop(name, None)
                return True
            # Any other API error used to be retried immediately, forever.
            log.warning("Could not approve user: {0} ({1}), giving up".format(name, e.error_type))
            return False
        except prawcore.PrawcoreException:
            praw_fucked_up()
def del_approved(name):
    """Remove ``name`` from the approved contributors of r/TheBluePill.

    Returns True once the user was removed, or once the user turned out not
    to exist (in which case they are dropped from ``users`` and
    ``user_data``). Returns False when Reddit rejects the request for any
    other reason. PRAW core errors are retried after backing off.
    """
    while True:
        try:
            tbp.contributor.remove(name)
            # This return was missing, so a successful removal repeated forever.
            return True
        except praw.exceptions.APIException as e:
            if e.error_type == "USER_DOESNT_EXIST":
                log.info("User: {0} is deleted or suspended, dropping from userlist".format(name))
                # pop() rather than del: the user may already be gone from either mapping.
                users.pop(name, None)
                user_data.pop(name, None)
                return True
            # Any other API error used to be retried immediately, forever.
            log.warning("Could not unapprove user: {0} ({1}), giving up".format(name, e.error_type))
            return False
        except prawcore.PrawcoreException:
            praw_fucked_up()
class Tracker:
    """Per-user record of karma samples, flair and moderation state.

    ``self.data`` is the plain dict that gets persisted in ``user_data``
    under the user's name; the properties derive scores from it.
    """

    # Minimum time between two scans of the same user's comment history.
    SCAN_INTERVAL = timedelta(minutes=5)

    def __init__(self, name, new_user=True):
        """Create a tracker for ``name``.

        With ``new_user`` true, a blank record is created and the user is
        scanned straight away; otherwise the record is loaded from
        ``user_data``.
        """
        self.name = name
        if new_user:
            self.data = {
                "name": name,
                "scan_count": 0,
                "last_seen": None,
                "next_scan": None,
                "smv": 0,
                "flair": None,
                "good_score": [],
                "bad_score": [],
                "manual_score_bias": 0,
                "best_post": {"score": 0, "id": None},
                "worst_post": {"score": 0, "id": None},
                "comment_eater": {},
            }
            self.update()
            log.info("Adding new user: {0}".format(name))
        else:
            self.data = user_data[name]

    @property
    def good_score(self):
        """Mean of the stored good-karma samples plus the manual bias (0 with no samples)."""
        if len(self.data["good_score"]) > 0:
            return mean(self.data["good_score"]) + self.data["manual_score_bias"]
        return 0

    @property
    def bad_score(self):
        """Mean of the stored bad-karma samples (0 with no samples)."""
        if len(self.data["bad_score"]) > 0:
            return mean(self.data["bad_score"])
        return 0

    @property
    def score(self):
        """Sum of ``good_score`` and ``bad_score`` (bad samples are stored as negatives)."""
        return self.good_score + self.bad_score

    def _track_extremes(self, karma, comment_id):
        """Remember the best- and worst-scoring r/TheBluePill comment seen so far."""
        checks = (
            ("best_post", lambda previous: karma > previous),
            ("worst_post", lambda previous: karma < previous),
        )
        for key, beats in checks:
            try:
                replace = beats(self.data[key]["score"])
            except TypeError:
                # Stored value is not comparable (e.g. None): take this comment.
                replace = True
            if replace:
                self.data[key] = {"score": karma, "id": comment_id}

    def update(self):
        """Scan the user's 100 newest comments and store fresh karma samples.

        Returns True when the user was scanned or the scan was skipped
        because the previous one is less than ``SCAN_INTERVAL`` old, and
        False when Reddit answers NotFound/Forbidden for the user.
        """
        now = datetime.now()
        self.data["last_seen"] = now
        # The due time used to be stored under "next_refresh" but read from
        # "next_scan", so this throttle never fired; both now use "next_scan".
        next_scan = self.data.get("next_scan")
        if next_scan is not None and now < next_scan:
            log.debug("Skipping user: {0} (next scan on {1:%Y/%m/%d %I:%M:%S%p})".format(self.name, next_scan))
            return True
        try:
            comments = list(reddit.redditor(self.name).comments.new(limit=100))
        except (prawcore.exceptions.NotFound, prawcore.exceptions.Forbidden):
            log.debug("User: {0} is suspended or deleted".format(self.name))
            return False
        good, bad, good_count, bad_count = 0, 0, 0, 0
        for comment in comments:
            sub = comment.subreddit.display_name.lower()
            karma = comment.score - 1
            if sub == "thebluepill":
                good += karma
                good_count += 1
                self._track_extremes(karma, comment.id)
            elif sub in NICE_LIST:
                good += karma / 2
            elif sub in NAUGHTY_LIST and karma > 1:
                bad -= karma
                bad_count += 1
        # Average karma per counted comment, weighted by activity:
        # 10 r/TheBluePill comments weigh x1, 20 weigh x2, and so on.
        if good_count > 0:
            good /= good_count
            good *= good_count / 10
        # Same for NAUGHTY_LIST comments: 5 weigh x1, 10 weigh x2.
        if bad_count > 0:
            bad /= bad_count
            bad *= bad_count / 5
        # Are they vexatious? (Of course they are!)
        if self.data["flair"] != "vexatious" and bad_count > 10 and abs(bad) > good:
            log.info("User: {0} is vexatious ({1} posts in NAUGHTY_LIST)".format(self.name, bad_count))
            set_flair(self.name, "VEXATIOUS LITIGANT", "vexatious")
            self.data["flair"] = "vexatious"
        # Keep only the 8 most recent samples of each kind, newest first.
        self.data["good_score"].insert(0, good)
        if len(self.data["good_score"]) > 8:
            self.data["good_score"].pop()
            log.debug("Dropping oldest good karma sample for user: {0}".format(self.name))
        self.data["bad_score"].insert(0, bad)
        if len(self.data["bad_score"]) > 8:
            self.data["bad_score"].pop()
            log.debug("Dropping oldest bad karma sample for user: {0}".format(self.name))
        self.data["scan_count"] += 1
        self.data["next_scan"] = self.data["last_seen"] + Tracker.SCAN_INTERVAL
        log.info("Scanned user: {0} (scanned {1} times), good_karma: {2:.2f} ({3} comments), bad_karma: {4:.2f} ({5} comments)".format(self.name, self.data["scan_count"], good, good_count, bad, bad_count))
        user_data[self.name] = self.data
        return True
def update_flairs():
    """Re-rank all tracked users and sync their flair and approval status.

    Users are sorted by ascending score; a user's SMV is their rank scaled
    to 1..10. For each user this marks/unmarks the "purged" flair according
    to the subreddit ban list, assigns the matching ``hbN`` flair unless the
    current flair is immutable, and adds or removes approved-contributor
    status. Changed records are written back to ``user_data``.
    """
    log.info("Recalculating SMV")
    total = len(users)
    # Fetch both lists once and keep lower-cased names in sets, so the
    # per-user membership checks are O(1) and case-insensitive.
    purged_users = {str(redditor).lower() for redditor in tbp.banned(limit=None)}
    approved_users = {str(redditor).lower() for redditor in tbp.contributor(limit=None)}
    ranked = sorted(users.values(), key=lambda user: user.score)
    for i, user in enumerate(ranked, start=1):
        dirty = False
        flair = user.data["flair"]
        is_purged = user.name.lower() in purged_users
        is_approved = user.name.lower() in approved_users
        # Check if user should be marked/unmarked as purged
        if flair != "purged" and is_purged:
            log.info("Marking user: {0} purged".format(user.name))
            set_flair(user.name, "PURGED", "purged")
            flair = "purged"
            user.data["flair"] = "purged"
            dirty = True
        elif flair == "purged" and not is_purged:
            log.info("User: {0} was unbanned, correcting flair".format(user.name))
            set_flair(user.name, "", "")
            flair = None
            user.data["flair"] = None
            dirty = True
        # Calculate SMV
        smv = ceil((i / total) * 10)
        if not flair:
            log.debug("No cached flair for user: {0}, checking Reddit".format(user.name))
            flair = get_flair(user.name)
            user.data["flair"] = flair
            log.debug("Reddit says user: {0}'s flair is {1}".format(user.name, flair))
            dirty = True
        log.debug("User: {0}, SMV: {1}, score: {2:.2f} (current flair {3})".format(user.name, smv, user.score, flair))
        # Update flairs
        if flair in IMMUTABLE_FLAIRS:
            # This used to compare the literal "flair" with "vexatious" (always
            # False) and to format nonexistent good_karma/bad_karma attributes.
            if flair == "vexatious" and smv > 2 and user.good_score > abs(user.bad_score):
                log.warning("Should user: {0} be vexatious? SMV: {1}, good_score: {2:.2f}, bad_score: {3:.2f}".format(user.name, smv, user.good_score, user.bad_score))
            else:
                log.debug("Not changing user: {0} (immutable flair {1})".format(user.name, flair))
        elif flair != "hb{0}".format(smv):
            flair = "hb{0}".format(smv)
            user.data["flair"] = flair
            user.data["smv"] = smv
            set_flair(user.name, "Hβ{0}".format(smv), "hb{0}".format(smv))
            log.info("Updating user: {0} flair to hb{1}".format(user.name, smv))
            dirty = True
        else:
            log.debug("User: {0} still an Hβ{1}".format(user.name, smv))
        # Add/remove approved contributors
        if ((smv > 7 or flair in GOOD_FLAIRS) and flair not in BAD_FLAIRS) and not is_approved:
            add_approved(user.name)
            log.debug("Adding approved contributor: {0}".format(user.name))
        elif ((smv < 4 or flair in BAD_FLAIRS) and flair not in GOOD_FLAIRS) and is_approved:
            del_approved(user.name)
            log.info("Removing approved contributor: {0}".format(user.name))
        # add_approved/del_approved drop users that no longer exist; do not
        # write such a user straight back into the shelf.
        if dirty and user.name in users:
            user_data[user.name] = user.data
def drop_dead_users():
    """Drop tracked users whose Reddit account is deleted or suspended.

    Requests each tracked user's newest item; a NotFound or Forbidden
    response removes the user from both ``users`` and ``user_data``.
    """
    # Iterate over a snapshot: removing entries from ``users`` while looping
    # over users.values() raises RuntimeError.
    for user in list(users.values()):
        try:
            for comment in reddit.redditor(user.name).new(limit=1):
                log.debug("User: {0} still exists (most recent comment id is {1})".format(user.name, comment.id))
        except (prawcore.exceptions.NotFound, prawcore.exceptions.Forbidden):
            log.info("User: {0} is deleted or suspended, dropping from userlist".format(user.name))
            users.pop(user.name, None)
            user_data.pop(user.name, None)
def botloop(first_update=None):
    """Process the r/TheBluePill comment stream until interrupted.

    For every streamed comment this runs the periodic maintenance jobs when
    they are due, makes sure the submission carries a threat-level link
    flair, refreshes the author's tracker, enforces mandatory user flair,
    and removes comments whose author flair is listed in THREAT_MATRIX for
    the submission's threat level.

    Args:
        first_update: When the first flair recalculation is due; the first
            dead-user sweep is due one day later. Defaults to the time of
            the call.
    """
    DEAD_USER_INTERVAL = timedelta(days=1)
    FLAIR_UPDATE_INTERVAL = timedelta(minutes=15)
    # How long after the warning an unflaired user's comments are left alone.
    FLAIR_GRACE_PERIOD = timedelta(minutes=15)
    FLAIR_WARNING = "Apologies for the inconvenience, but user flairs are mandatory on /r/TheBluePill. Please enable your user flair at your earliest convenience:\n\n- If you're on mobile Reddit, you'll need to first flip back to the desktop site (Burger Menu -> Desktop Site), then:\n- If you're on classic Reddit, you'll need to disable this Subreddit's custom CSS (preferences -> display options -> allow subreddits to show me custom themes)\n- If you're using an app to access Reddit, check the app's documentation, but you'll probably need to access the desktop site from a browser.\n\nYour comments will only be automatically removed if your SMV is too low, or the comment has negative karma. Once you're properly flaired again, these automatically removed comments will be restored.\n\nThanks for choosing r/TheBluePill!\n\netc. etc.,\n\n-Management\n\n*NOTE: This is an automated action, but this account is also attached to a human operator if you have any concerns. We appreciate your continued business!*"
    # A datetime.now() default would be evaluated once, when the function is
    # defined, so the default is resolved here instead.
    if first_update is None:
        first_update = datetime.now()
    next_flair_update = first_update
    next_dead_user_update = first_update + DEAD_USER_INTERVAL
    # Loop over comment stream indefinitely
    for comment in tbp.stream.comments():
        if datetime.now() > next_dead_user_update:
            drop_dead_users()
            # Reschedule; otherwise the sweep would run on every comment
            # once the first deadline has passed.
            next_dead_user_update = datetime.now() + DEAD_USER_INTERVAL
        if datetime.now() > next_flair_update:
            update_flairs()
            next_flair_update = datetime.now() + FLAIR_UPDATE_INTERVAL
            log.debug("Flair update complete (next update due on/after: {0:%Y/%m/%d %I:%M:%S%p})".format(next_flair_update))
        # Grab submission link flair, assign one randomly if blank
        submission = comment.submission
        threat_level = submission.link_flair_css_class
        if threat_level is None:
            if randint(0, 1) == 0:  # 50-50 shot of getting "Severe"
                threat_selector = 0
            else:
                threat_selector = randint(1, len(TODAYS_THREAT_LEVEL) - 3)  # Won't randomly assign the last two link flairs
            threat_level = list(TODAYS_THREAT_LEVEL)[threat_selector]
            set_link_flair(submission, TODAYS_THREAT_LEVEL[threat_level], threat_level)
            log.info("Set threat level for submission '{0}' [id: {1}] to {2}".format(submission.title, submission.id, TODAYS_THREAT_LEVEL[threat_level]))
        else:
            # A link flair set by hand may use a CSS class this bot does not
            # know; fall back to the raw class instead of raising KeyError.
            log.debug("Threat level for submission '{0}' [id: {1}] is {2})".format(submission.title, submission.id, TODAYS_THREAT_LEVEL.get(threat_level, threat_level)))
        # Skip over comments from deleted users
        if not comment.author:
            log.debug("Skipping comment {0}, author no longer exists".format(comment.id))
            continue
        dirty = False
        name = comment.author.name
        author_flair = comment.author_flair_css_class
        # Update tracker
        if name not in users:
            users[name] = Tracker(name)
        elif not users[name].update():
            log.info("User: {0} is deleted or suspended, dropping from userlist".format(name))
            del users[name]
            del user_data[name]
            continue
        tracked = users[name].data
        # Update tracked flair if comment flair doesn't match tracked flair
        user_flair = tracked["flair"]
        if author_flair and user_flair != author_flair:
            dirty = True
            log.info("Updating tracked flair for user: {0} -- was {1}, now {2}".format(name, user_flair, author_flair))
            user_flair = author_flair
            tracked["flair"] = user_flair
        # Handle unflaired users and their comments
        eater = tracked["comment_eater"]
        if not author_flair and tracked["scan_count"] > 3:
            dirty = True
            log.debug("User: {0} should have flair: {1}, but comment flair is {2}".format(name, user_flair, author_flair))
            if "unflaired" not in eater:
                # First offence: start the grace period and warn the user.
                eater["unflaired"] = {"start": datetime.now(), "eaten": []}
                reply = comment.reply(FLAIR_WARNING)
                reply.mod.distinguish()
                log.info("Warned user: {0}, flairs are mandatory.".format(name))
            else:
                unflaired = eater["unflaired"]
                # This test was inverted ("not in"), which exempted everybody.
                exempt = name in FLAIR_EXEMPT
                countdown_not_exceeded = datetime.now() < unflaired["start"] + FLAIR_GRACE_PERIOD
                too_high_value = tracked["smv"] > 3 or comment.score > 0 or user_flair not in BAD_FLAIRS
                handled = comment.id in unflaired["eaten"] or comment.approved
                if exempt:
                    log.info("Not removing comment {0} by unflaired user: {1} (exempt)".format(comment.id, name))
                elif countdown_not_exceeded:
                    log.info("Not removing comment {0} by unflaired user: {1} (still in grace period after warning)".format(comment.id, name))
                elif too_high_value:
                    log.info("Not removing comment {0} by unflaired user: {1} (too high value)".format(comment.id, name))
                elif handled:
                    log.info("Not removing comment {0} by unflaired user: {1} (already removed or manually approved)".format(comment.id, name))
                else:
                    unflaired["eaten"].append(comment.id)
                    comment.mod.remove()
                    log.info("Removed comment {0} by unflaired user: {1}".format(comment.id, name))
        elif author_flair and "unflaired" in eater and len(eater["unflaired"]["eaten"]) > 0:
            dirty = True
            # The loop variable must not be called ``comment``: that would
            # replace the streamed comment with an id string for the code below.
            for eaten_id in eater["unflaired"]["eaten"]:
                reddit.comment(id=eaten_id).approve()
                log.info("Restoring comment {0} for user: {1} (is wearing flair now)".format(eaten_id, name))
            eater["unflaired"]["eaten"] = []
        # Remove comments if user's SMV is too low for submission's threat level
        barred_flairs = THREAT_MATRIX.get(threat_level, [])
        if author_flair in barred_flairs and name != submission.author and not comment.approved:
            dirty = True
            if "low_smv" not in eater:
                eater["low_smv"] = {"eaten": [comment.id]}
            else:
                eater["low_smv"]["eaten"].append(comment.id)
            comment.mod.remove()
            log.info("Removing comment by user: {0} ({1}) on submission '{2}' [id: {3}] ({4}), SMV too low!".format(name, author_flair, submission.title, comment.id, TODAYS_THREAT_LEVEL.get(threat_level, threat_level)))
        # TODO: Assign shit tests
        if dirty:
            user_data[name] = tracked
# Entry point: load every persisted user into a Tracker, then run the bot
# until it is interrupted from the keyboard.
with shelve.open("user_data") as user_data:
    users = {user: Tracker(user, False) for user in user_data.keys()}
    try:
        # Restart the loop after every API error; previously only a single
        # restart was attempted and a second error ended the bot.
        while True:
            try:
                botloop()
            except prawcore.PrawcoreException:
                praw_fucked_up()
            else:
                break
    except KeyboardInterrupt:
        log.info("VirtuteTron going off-line")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement