Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import socket
- import threading
- import re
- import time
- server = 'irc.retronode.org'
- port = 6667
- channel = '#downgrade'
- botnick = 'PollBot'
- poll = {
- 'question': '',
- 'options': [],
- 'votes': {},
- 'active': False,
- 'start_time': None,
- 'duration': None,
- 'maxvotes': 1,
- 'maxselect': 1
- }
- def send_msg(sock, msg):
- sock.send((msg + "\r\n").encode('utf-8'))
- def send_privmsg(sock, target, msg):
- send_msg(sock, f"PRIVMSG {target} :{msg}")
- def parse_poll_command(text):
- return re.findall(r'"(.*?)"', text)
- def poll_results():
- tally = [0] * len(poll['options'])
- for user_votes in poll['votes'].values():
- for v in user_votes:
- if 1 <= v <= len(tally):
- tally[v - 1] += 1
- total = sum(tally)
- results = []
- for i, count in enumerate(tally):
- percent = (count / total * 100) if total > 0 else 0
- results.append(f"{i+1}. {poll['options'][i]} - {percent:.1f}%")
- return results
- def handle_vote(user, message):
- if not poll['active']:
- return "No active poll."
- try:
- selections = list(map(int, message.strip().split()))
- if len(selections) > poll['maxselect']:
- return f"You can only select {poll['maxselect']} options per vote."
- for v in selections:
- if v < 1 or v > len(poll['options']):
- return f"Invalid option: {v}. Valid options are 1 to {len(poll['options'])}."
- existing_votes = poll['votes'].get(user, [])
- remaining_votes = poll['maxvotes'] - len(existing_votes)
- if remaining_votes <= 0:
- return f"You have already used all {poll['maxvotes']} of your votes."
- allowed_votes = selections[:remaining_votes]
- poll['votes'].setdefault(user, []).extend(allowed_votes)
- if len(allowed_votes) < len(selections):
- return f"Partial vote registered: only {remaining_votes} votes allowed. Others ignored."
- return "Vote registered."
- except:
- return "Invalid vote format. Use numbers like: 1 or 2 3"
- def end_poll(sock, target):
- poll['active'] = False
- results = poll_results()
- send_privmsg(sock, target, "Poll ended. Results:")
- for r in results:
- send_privmsg(sock, target, r)
- def poll_timer(sock):
- while True:
- if poll['active'] and poll['duration'] is not None:
- if time.time() - poll['start_time'] > poll['duration']:
- end_poll(sock, channel)
- time.sleep(1)
- def extract_nick(prefix, message):
- if prefix.startswith(":DMconnect_Bridge!"):
- match = re.match(r"<([^>]+)>\s+(.*)", message)
- if match:
- return match.group(1), match.group(2)
- else:
- return "unknown_bridge_user", message
- else:
- return prefix.split('!')[0][1:], message
- def main():
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- sock.connect((server, port))
- except ConnectionRefusedError:
- print("Connection refused — check if the IRC server is reachable.")
- return
- send_msg(sock, f"NICK {botnick}")
- send_msg(sock, f"USER {botnick} 0 * :Poll Bot")
- threading.Thread(target=poll_timer, args=(sock,), daemon=True).start()
- while True:
- resp = sock.recv(2048).decode("utf-8", errors='ignore')
- for line in resp.strip().split("\r\n"):
- print(line)
- if line.startswith("PING"):
- send_msg(sock, "PONG " + line.split()[1])
- continue
- parts = line.split()
- if len(parts) < 2:
- continue
- if parts[1] == "001":
- send_msg(sock, f"JOIN {channel}")
- elif parts[1] == "PRIVMSG":
- prefix = line.split(' ')[0]
- target = parts[2]
- message = ' '.join(parts[3:])[1:].strip()
- user, message = extract_nick(prefix, message)
- is_private = target == botnick
- reply_target = user if is_private else channel
- if message.startswith("!poll "):
- items = parse_poll_command(message)
- if len(items) >= 2:
- poll['question'] = items[0]
- poll['options'] = items[1:]
- poll['votes'] = {}
- poll['active'] = False
- poll['duration'] = None
- send_privmsg(sock, reply_target, f"Poll prepared: {poll['question']}")
- for i, opt in enumerate(poll['options']):
- send_privmsg(sock, reply_target, f"{i+1}. {opt}")
- send_privmsg(sock, reply_target, "Use !start to begin the poll.")
- else:
- send_privmsg(sock, reply_target, "Format: !poll \"Question?\" \"Option 1\" \"Option 2\" ...")
- elif message.startswith("!set "):
- parts = message.split()
- if len(parts) == 3:
- key, val = parts[1], parts[2]
- if key in ['duration', 'maxvotes', 'maxselect']:
- if val.isdigit():
- poll[key] = int(val)
- send_privmsg(sock, reply_target, f"{key} set to {val}.")
- else:
- send_privmsg(sock, reply_target, "Value must be a number.")
- else:
- send_privmsg(sock, reply_target, "Allowed keys: duration, maxvotes, maxselect.")
- else:
- send_privmsg(sock, reply_target, "Usage: !set <duration|maxvotes|maxselect> <number>")
- elif message.strip() == "!start":
- if poll['question'] and poll['options']:
- poll['active'] = True
- poll['start_time'] = time.time()
- send_privmsg(sock, channel, f"Poll started: {poll['question']}")
- for i, opt in enumerate(poll['options']):
- send_privmsg(sock, channel, f"{i+1}. {opt}")
- send_privmsg(sock, channel, f"Vote by typing numbers. Max votes per user: {poll['maxvotes']}, max options per vote: {poll['maxselect']}")
- else:
- send_privmsg(sock, reply_target, "Poll not defined yet.")
- elif message.strip() == "!results":
- if poll['active']:
- end_poll(sock, reply_target)
- else:
- send_privmsg(sock, reply_target, "No active poll to end.")
- elif poll['active'] and re.fullmatch(r"(\d+\s*)+", message.strip()):
- result = handle_vote(user, message)
- send_privmsg(sock, reply_target, result)
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement