Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import admin.admin as auth
- auth.startConnection("elevatedgaming.net", "user", "pass", "db name")
- import logger.logger as log
- import threading
- import time
- import codecs
- import datetime
- import pymysql
- import re
- from listeners import on_tick_listener_manager, OnLevelShutdown
- from core import AutoUnload
- from events import Event
- from players.helpers import index_from_userid, index_from_steamid
- from players.entity import Player
- from filters.players import PlayerIter
- from path import Path
- from plugins.info import PluginInfo
- from messages import HintText, SayText2
- from filters.recipients import RecipientFilter
- from commands.client import ClientCommand
- from commands.say import SayCommand
- from commands import CommandReturn
- from engines.sound import Sound, SOUND_FROM_WORLD
- from menus import PagedMenu, SimpleMenu, PagedOption, SimpleOption
- from events.hooks import PreEvent
# Directory this plugin lives in, and the module name derived from it.
base_path = Path(__file__).parent
MODNAME = base_path.namebase

# Feedback sounds: one for refused commands, three for menu interaction.
DENIEDSOUND = Sound("buttons/weapon_cant_buy.wav")
NAVSOUND = Sound('buttons/button14.wav')
EXITSOUND = Sound('common/wpn_hudoff.wav')
SELECTSOUND = Sound('common/talk.wav')

# Plugin metadata (currently disabled).
#info = PluginInfo()
#info.author = "Shen Li"
#info.basename = MODNAME
#info.description = "Implements a CT Ban Module"
#info.name = "EgN's CT Ban Mod"
#info.version = "1.1.0"
#info.url = "http://elevatedgaming.net"

# Module-level scratch dictionary; nothing in this file reads or writes it.
datadict = {}
# Unit suffix -> (whole seconds, fractional scale).  The scale is kept apart
# so the arithmetic stays ``amount * seconds * scale`` and whole-number units
# never pass through float.  Suffixes are case-sensitive: "m" is minutes,
# "M" is months.
_TIME_UNITS = {
    "s": (1, 1),
    "m": (60, 1),
    "min": (60, 1),
    "h": (60 * 60, 1),
    "d": (60 * 60 * 24, 1),
    "w": (60 * 60 * 24 * 7, 1),
    "W": (60 * 60 * 24 * 7, 1),
    "mon": (60 * 60 * 24, 30.44),
    "M": (60 * 60 * 24, 30.44),
    "b": (60 * 60 * 24, 30.44),
    "y": (60 * 60 * 24, 365.24),
    "Y": (60 * 60 * 24, 365.24),
}

# One "<number><unit>" token; whitespace between the two is tolerated.
_TIME_TOKEN = re.compile(r"(\d+)\s*([A-Za-z]+)")


def timeparser(timestr):
    """Convert a duration string such as ``"1h30m"`` into whole seconds.

    A string made only of digits is taken as a number of seconds.  Otherwise
    every ``<number><unit>`` token is converted and the results are summed,
    so repeated units (``"1h1h"``) and spaced input (``"1h 30m"``) both work.

    Args:
        timestr: The duration text to parse.

    Returns:
        The duration in seconds, truncated to an ``int``.

    Raises:
        ValueError: If the text holds no number, or uses a unit suffix that
            is not in the table.  Callers treat ``0`` as a permanent ban, so
            unparseable input must not quietly turn into ``0``.
    """
    tokens = _TIME_TOKEN.findall(timestr)
    if not tokens:
        bare = timestr.strip()
        if bare.isdigit():
            return int(bare)
        raise ValueError("cannot parse duration {0!r}".format(timestr))

    total = 0
    for amount, unit in tokens:
        try:
            seconds, scale = _TIME_UNITS[unit]
        except KeyError:
            raise ValueError(
                "unknown time unit {0!r} in {1!r}".format(unit, timestr)
            ) from None
        total += int(amount) * seconds * scale
    return int(total)
def steamid_from_networkid(networkid):
    """Return the ``STEAM_0:A:B`` form of a player's network id.

    Ids in the ``[U:1:N]`` form are converted with ``N = 2*B + A``.  Ids that
    already start with ``STEAM_`` are returned untouched, and so is anything
    that cannot be parsed (for example ``"BOT"``), so callers can pass any
    player's id without crashing.

    Args:
        networkid: The id string to normalise.

    Returns:
        The normalised id, or ``networkid`` itself when no conversion applies.
    """
    if networkid[:6] == "STEAM_":
        return networkid
    try:
        # Third colon-separated field, minus the closing bracket.
        account = int(networkid.split(':')[2][:-1])
    except (IndexError, ValueError):
        return networkid
    high, low_bit = divmod(account, 2)
    return "STEAM_0:{0}:{1}".format(low_bit, high)
class AudiblePagedMenu(PagedMenu):
    """Paged menu that plays a sound on every key press.

    Two optional callbacks, each called as ``callback(ply_index, choice)``,
    run before the regular ``PagedMenu`` handling:

    * ``nav_callback`` for choices 8 and 9,
    * ``exit_callback`` for choice 0.

    Any remaining positional/keyword arguments go to ``PagedMenu``.
    """

    def __init__(self, nav_callback=None, exit_callback=None, *args, **kwargs):
        self.nav_callback = nav_callback
        self.exit_callback = exit_callback
        super().__init__(*args, **kwargs)

    def _select(self, ply_index, choice):
        """Play the matching sound, run the callback, then defer to the base."""
        if choice in (8, 9):
            NAVSOUND.play(RecipientFilter(ply_index))
            # Skip absent callbacks explicitly; catching TypeError instead
            # would also hide TypeErrors raised inside a real callback.
            if callable(self.nav_callback):
                self.nav_callback(ply_index, choice)
        elif choice == 0:
            EXITSOUND.play(RecipientFilter(ply_index))
            if callable(self.exit_callback):
                self.exit_callback(ply_index, choice)
        else:
            SELECTSOUND.play(RecipientFilter(ply_index))
        return super()._select(ply_index, choice)
class StoppableSPThread(threading.Thread, AutoUnload):
    """Thread that calls :meth:`do` repeatedly until :meth:`stop` is called.

    Subclasses override :meth:`do`.  An exception raised by ``do`` is printed
    and the loop carries on with the next iteration.

    Args:
        accuracy: Seconds to wait between two ``do`` calls.
        *args, **kwargs: Forwarded to ``threading.Thread``.
    """

    def __init__(self, accuracy=1, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.accuracy = accuracy
        # Not named ``_stop``: threading.Thread has an internal ``_stop()``
        # method, and shadowing it with an Event breaks join()/is_alive().
        self._stop_event = threading.Event()
        self._finished = threading.Event()
        # NOTE(review): the no-op tick listener is presumably there so the
        # game server keeps giving Python threads time to run - confirm.
        on_tick_listener_manager.register_listener(self._tick)

    def run(self):
        """Loop over ``do`` until stopped, then unregister the tick listener."""
        import traceback

        while not self.stopped:
            try:
                self.do()
            except Exception:
                # Best effort: report the failure and keep the loop alive.
                traceback.print_exc()
            # Same delay as a sleep, but returns at once when stop() fires.
            self._stop_event.wait(self.accuracy)
        on_tick_listener_manager.unregister_listener(self._tick)
        self._finished.set()

    def do(self):
        """Perform one unit of work; must be overridden."""
        raise NotImplementedError("Some exception to indicate that this should be overridden")

    def _tick(self):
        """Tick listener; intentionally does nothing."""
        pass

    def stop(self):
        """Ask the loop in :meth:`run` to finish."""
        self._stop_event.set()

    @property
    def stopped(self):
        """True once :meth:`stop` has been called."""
        return self._stop_event.is_set()

    @property
    def finished(self):
        """True once :meth:`run` has left its loop and cleaned up."""
        return self._finished.is_set()

    # AutoUnload hook: unloading an instance simply stops the thread.
    _unload_instance = stop
class DBThread(StoppableSPThread):
    """One-shot worker that downloads the ``ctbans`` table into ``bans``.

    ``do`` stops the thread after the first successful download.  If the
    download raises, the base class prints the error and tries again after
    ``accuracy`` seconds.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def prime(self, *args, **kwargs):
        """Re-run the base initialiser with new arguments."""
        super().__init__(*args, **kwargs)

    @staticmethod
    def _decode_name(raw):
        """Undo ``unicode_escape`` encoding of a stored name.

        Falls back to the raw value when it cannot be decoded, so that one
        odd name does not abort the whole download.
        """
        try:
            # The decoder returns (text, consumed_length); keep the text.
            return codecs.getdecoder('unicode_escape')(raw)[0]
        except (UnicodeError, TypeError):
            return raw

    def do(self):
        """Fetch all rows, publish them through ``bans.update`` and stop."""
        # NOTE(review): connection settings are hard-coded here and repeated
        # in BanQuery - consider moving them to a single configuration spot.
        conn = pymysql.connect(
            host='elevatedgaming.net', port=3306, user='user', passwd="passwd",
            db='sloppy_forums', charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        # Release cursor and connection even when the query fails.
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT * FROM ctbans")
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        banlist = [
            {
                "index": row["index"],
                "name": self._decode_name(row["name"]),
                "steamid": row["steamid"],
                "time": int(row["time"]),
                "duration": int(row["duration"]),
                "admin": row["admin"],
                "comment": row["comment"],
                "unban": row["unban"],
            }
            for row in rows
        ]
        #log.Write("Downloaded {0} CT bans from the database.".format(str(len(banlist))))
        bans.update(banlist)
        self.stop()
class BanQuery(threading.Thread):
    """Background thread that stores one CT ban and announces it.

    Args:
        steamid: Steam id of the banned player.
        name: Display name of the banned player.
        admin: Object exposing ``name`` and ``steamid`` of the issuing admin.
        duration: Ban length in seconds; values <= 0 are announced as
            permanent.
        comment: Ban reason; an empty or false value is stored as SQL NULL.
    """

    def __init__(self, steamid, name, admin, duration, comment):
        threading.Thread.__init__(self)
        self.steamid = steamid
        self.name = name
        self.admin = admin
        self.duration = duration
        # Keep the raw text; the database driver does the quoting.
        self.comment = comment or None

    def run(self):
        """Insert the ban row, broadcast the result and refresh the cache."""
        admin_id = auth.getID(self.admin.steamid)
        issued_at = int(time.time())

        conn = pymysql.connect(
            host='elevatedgaming.net', port=3306, user='user', passwd="passwd",
            db='sloppy_forums', charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        try:
            cursor = conn.cursor()
            try:
                # Player names and reasons are untrusted input, so the values
                # are passed as parameters instead of being formatted into
                # the statement.
                cursor.execute(
                    "INSERT INTO `ctbans` "
                    "(`name`, `steamid`, `time`, `duration`, `admin`, `comment`) "
                    "VALUES (%s, %s, %s, %s, %s, %s)",
                    (self.name, self.steamid, issued_at, self.duration,
                     admin_id, self.comment),
                )
            finally:
                cursor.close()
            # PyMySQL connections do not autocommit by default.
            conn.commit()
        finally:
            conn.close()

        if self.duration > 0:
            SayText2(message="\x05EgN Jailbreak| \x04{0} \x01CT banned player \x04{1} \x01until {2}.".format(
                self.admin.name, self.name,
                datetime.datetime.fromtimestamp(int(time.time()) + self.duration).strftime('%Y-%m-%d %H:%M:%S'),
            )).send(RecipientFilter())
        else:
            SayText2(message="\x05EgN Jailbreak| \x04{0} \x01CT banned player \x04{1} \x01permanently.".format(
                self.admin.name, self.name,
            )).send(RecipientFilter())
        log.Write("Executed ban: {0} {1} {2} {3}".format(self.steamid, self.name, self.duration, admin_id))
        # Pull the new row into the in-memory ban list.
        bans.refresh()
class Bans(object):
    """In-memory copy of the CT ban table plus the ban entry point.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self):
        # List of ban dicts as produced by DBThread.
        self.bans = list()
        # Becomes True after the first update() call.
        self.piped = False

    def update(self, banlist):
        """Replace the cached ban list with ``banlist``."""
        self.bans = banlist
        self.piped = True

    def refresh(self):
        """Start a background download of the ban table."""
        DBThread().start()

    def player(self, steamid):
        """Return ``(count, entries)`` for every cached ban of ``steamid``."""
        steamid = steamid_from_networkid(steamid)
        info = [entry for entry in self.bans if entry["steamid"] == steamid]
        return (len(info), info)

    @staticmethod
    def ban(player, admin, dur=0, comment=False):
        """CT-ban ``player`` on behalf of ``admin``.

        Called on the class (``Bans.ban(...)``); it uses the module-level
        ``bans`` instance for the duplicate check.

        Args:
            player: Player object to ban, or ``None`` if the lookup failed.
            admin: Player index of the admin, or an existing ``Player``.
            dur: Ban length in seconds; ``0`` means permanent.
            comment: Ban reason, or a false value for none.

        Returns:
            ``True`` when the ban was handed to a ``BanQuery`` thread,
            ``False`` when it was refused or could not be started.
        """
        target = player
        # Callers pass either an index or an already-built Player.
        if not isinstance(admin, Player):
            admin = Player(admin)
        if target is None:
            SayText2(message="\x05EgN Jailbreak| \x01No matching player was found.").send(RecipientFilter(admin.index))
            return False
        steamid = steamid_from_networkid(target.steamid)
        if bans.isbanned(steamid):
            SayText2(message="\x05EgN Jailbreak| \x01That player is already CT banned.").send(RecipientFilter(admin.index))
            return False
        # NOTE(review): despite its name, auth.isImmune is used throughout
        # this file as "admin may target player" - confirm in admin module.
        if not auth.isImmune(admin.steamid, target.steamid):
            SayText2(message="\x05EgN Jailbreak| \x01Target is immune.").send(RecipientFilter(admin.index))
            return False
        SayText2(message="\x05EgN Jailbreak| \x01Your CT ban is being processed.").send(RecipientFilter(admin.index))
        try:
            BanQuery(steamid, target.name, admin, dur, comment).start()
        except Exception:
            log.Write("Couldn't execute ban '{0}' '{1}' '{2}'".format(steamid, admin, dur))
            return False
        # Move the player off CT immediately; the DB write runs in background.
        target.team = 2
        HintText(message="You have been banned from CT!").send(RecipientFilter(target.index))
        return True

    def isbanned(self, steamid):
        """Return True if ``steamid`` has a ban that is active right now.

        Entries with a truthy ``unban`` are ignored.  A ``duration`` of 0
        never expires; otherwise the ban lasts until ``time + duration``.
        """
        _count, entries = self.player(steamid)
        now = time.time()
        for entry in entries:
            if entry["unban"]:
                continue
            if entry["duration"] == 0 or entry["time"] + entry["duration"] > now:
                return True
        return False
# Shared ban cache for the whole module; start the first download right away.
bans = Bans()
bans.refresh()
def players(index):
    """List the players the admin at ``index`` may act on.

    Bots and players rejected by ``auth.isImmune`` are left out.

    Returns:
        A list of ``(banned, player)`` tuples, where ``banned`` tells whether
        the player currently has an active CT ban.
    """
    admin = Player(index)
    return [
        (bans.isbanned(candidate.steamid), candidate)
        for candidate in PlayerIter()
        if "BOT" not in candidate.steamid
        and auth.isImmune(admin.steamid, candidate.steamid)
    ]
class PlayerMenu(AudiblePagedMenu):
    """Menu listing the players the viewing admin can CT-ban.

    Players that are already banned are shown without highlight and cannot
    be picked again.
    """

    def __init__(self):
        super().__init__(
            select_callback=self.select_callback,
            exit_callback=self.exit_callback,
            top_separator=" " * 40,
            bottom_separator=" ",
        )

    def _select(self, ply_index, choice):
        """Send the player back to the main menu on choices 0 and 10."""
        if choice in (0, 10):
            EXITSOUND.play(ply_index)
            MainMenu().send(ply_index)
            return None
        return super()._select(ply_index, choice)

    def send(self, index):
        """Rebuild the player list for ``index`` and show the menu."""
        self.description = "Select a Player to CT Ban:"
        # Keep the Player as the option value so the selection does not have
        # to be re-resolved from the display name, which is ambiguous when
        # one name is a substring of another.
        self[:] = [
            PagedOption(player.name, value=player, highlight=not banned)
            for banned, player in players(index)
        ]
        super().send(index)

    def select_callback(self, self2, index, option):
        """Open the length menu for the chosen player, or refuse if banned."""
        if option.highlight:
            LengthMenu(option.value).send(index)
        else:
            SayText2(message="\x05EgN Jailbreak| \x01That player is already CT banned.").send(RecipientFilter(index))

    def exit_callback(self, index, *args):
        """Show this menu again; ``send`` needs the player index."""
        self.send(index)
class LengthMenu(AudiblePagedMenu):
    """Menu that asks how long ``player`` should be CT-banned.

    Args:
        player: The player selected for the ban.
    """

    def __init__(self, player):
        self.player = player
        super().__init__(
            select_callback=self.select_callback,
            exit_callback=self.exit_callback,
            top_separator=" " * 40,
            bottom_separator=" ",
        )

    def _select(self, ply_index, choice):
        """Send the player back to the main menu on choices 0 and 10."""
        if choice in (0, 10):
            EXITSOUND.play(ply_index)
            MainMenu().send(ply_index)
            return None
        return super()._select(ply_index, choice)

    def send(self, index):
        """Fill in the fixed list of ban lengths and show the menu."""
        self.description = "Select the length of the Ban:"
        self[:] = [
            PagedOption(label, highlight=True)
            for label in ["30m", "1h", "12h", "1d", "3d", "1w", "Permanent"]
        ]
        super().send(index)

    def select_callback(self, self2, index, option):
        """Convert the chosen label to seconds and open the reason menu."""
        # Equality, not ``in``: ``in`` on a string is a substring test.
        if option.text == "Permanent":
            duration = 0
        else:
            duration = timeparser(option.text)
        ReasonMenu(self.player, duration).send(index)

    def exit_callback(self, index, *args):
        """Show this menu again; ``send`` needs the player index."""
        self.send(index)
class ReasonMenu(AudiblePagedMenu):
    """Menu that asks for the ban reason and then issues the ban.

    Args:
        player: The player selected for the ban.
        length: Ban length in seconds (0 for permanent).
    """

    def __init__(self, player, length):
        self.player = player
        self.length = length
        super().__init__(
            select_callback=self.select_callback,
            exit_callback=self.exit_callback,
            top_separator=" " * 40,
            bottom_separator=" ",
        )

    def _select(self, ply_index, choice):
        """Send the player back to the main menu on choices 0 and 10."""
        if choice in (0, 10):
            EXITSOUND.play(ply_index)
            MainMenu().send(ply_index)
            return None
        return super()._select(ply_index, choice)

    def send(self, index):
        """Fill in the fixed list of reasons and show the menu."""
        self.description = "Select the reason for the Ban:"
        self[:] = [
            PagedOption(reason, highlight=True)
            for reason in [
                "Gun Plant", "Freekill", "Camping Secrets", "Camping Armory",
                "Freeshooting", "False Orders", "Gunspam",
            ]
        ]
        super().send(index)

    def select_callback(self, self2, index, option):
        """Ban the stored player with the chosen reason, then re-show the menu."""
        Bans.ban(player=self.player, admin=index, dur=self.length, comment=option.text)
        self.send(index)

    def exit_callback(self, index, *args):
        """Show this menu again; ``send`` needs the player index."""
        self.send(index)
class MainMenu(SimpleMenu):
    """Entry menu of the plugin with a single "CT Ban a Player" option."""

    def __init__(self):
        super().__init__(
            select_callback=self.select_callback,
            build_callback=self.build_callback,
        )

    def build_callback(self, *args):
        """Rebuild the menu lines from scratch."""
        self.clear()
        self.extend(["CTBans", " "])
        self.append(SimpleOption(1, 'CT Ban a Player'))
        self.extend([" ", "0. Exit"])

    def select_callback(self, self2, index, option, *args):
        """Play the select sound and open the player list."""
        SELECTSOUND.play(index)
        PlayerMenu().send(index)
@OnLevelShutdown
def listen_new_map():
    """Re-download the ban list whenever the current level shuts down."""
    bans.refresh()
@Event("round_end")
def round_event(event):
    """At round end, move CT-banned players from team 3 to team 2."""
    for player in PlayerIter():
        # Cheap check first: only players on team 3 can need moving.
        if player.team != 3:
            continue
        # Bots have no Steam id to look up; skip them the same way
        # players() does, instead of failing inside the id conversion.
        if "BOT" in player.steamid:
            continue
        if bans.isbanned(player.steamid):
            player.team = 2
- #@ClientCommand('jointeam')
- #def jointeam_callback(command, index):
- # player = Player(index)
- # if int(command[1]) == 3 and bans.isbanned(player.steamid):
- # player.team = 2
- # SayText2(message="\x05EgN Jailbreak| \x01You are banned from joining the Counter Terrorist team.").send(RecipientFilter(index))
- # HintText(message="You are banned from CT!").send(RecipientFilter(index))
- # DENIEDSOUND.play(RecipientFilter(index))
- # return CommandReturn.BLOCK
def get_player_from_name(name):
    """Find a connected player by (partial) name, ignoring case.

    An exact name match wins over a partial one, so searching for "Bob"
    does not return "Bobby" when both are online.  Without an exact match
    the first player whose name contains ``name`` is returned.

    Args:
        name: Full or partial player name.

    Returns:
        The matching player, or ``None`` if ``name`` is empty or nobody
        matches.
    """
    wanted = name.lower()
    if not wanted:
        # An empty string is a substring of every name; refuse to guess.
        return None
    partial = None
    for player in PlayerIter():
        current = player.name.lower()
        if current == wanted:
            return player
        if partial is None and wanted in current:
            partial = player
    return partial
def command_callback(command, index):
    """Shared handler behind the chat and client ``ctban`` commands.

    Without arguments the main menu is opened.  Otherwise the arguments are
    read as ``<name> <duration> [reason...]`` and the ban is issued
    directly.  Players without the "ctban" flag are refused.

    Args:
        command: The command object; iterating it yields the command name
            followed by its arguments.
        index: Player index of the caller.
    """
    caller = RecipientFilter(index)
    admin = Player(index)
    if not auth.hasFlag(admin.steamid, "ctban"):
        DENIEDSOUND.play(RecipientFilter(index))
        SayText2(message="\x05EgN Jailbreak| \x01You do not have permission to execute this command.").send(caller)
        return

    args = tuple(command)[1:]
    if not args:
        MainMenu().send(index)
        return
    if len(args) < 2:
        # A name without a duration used to fail with an IndexError.
        SayText2(message="\x05EgN Jailbreak| \x01Usage: ctban <name> <duration> [reason]").send(caller)
        return

    player = get_player_from_name(args[0])
    if player is None:
        SayText2(message="\x05EgN Jailbreak| \x01No matching player was found.").send(caller)
        return
    try:
        duration = timeparser(args[1])
    except (ValueError, IndexError):
        SayText2(message="\x05EgN Jailbreak| \x01Invalid ban duration.").send(caller)
        return
    reason = " ".join(args[2:])
    # Pass the index, not the Player: Bans.ban builds the Player itself.
    Bans.ban(player=player, admin=index, dur=duration, comment=reason)
@SayCommand('!ctban')
def say_command(command, index, teamonly):
    """Chat trigger ``!ctban``: run the shared handler."""
    command_callback(command, index)
@SayCommand('/ctban')
def say_command(command, index, teamonly):
    """Chat trigger ``/ctban``: run the shared handler, then block the command."""
    command_callback(command, index)
    return CommandReturn.BLOCK
@ClientCommand("egn_ctban")
def say_command(command, index):
    """Client command ``egn_ctban``: run the shared handler."""
    command_callback(command, index)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement