Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import re
- from vkbottle import Bot, Message
- from vkbottle.rule import AbstractMessageRule
- from vkbottle.types.events.community.events_objects import WallPostNew
- from units import *
# Long-poll bot for the community, plus a synchronous vk_api session that the
# worker threads in this file use for sending messages.
# NOTE(review): token, group_id, users and vk_api are not defined in this
# file; presumably they come from `from units import *` - confirm.
bot = Bot(token=token, group_id=group_id, debug=False)
vk = vk_api.VkApi(token=token)

# Load the saved subscriber ids, one integer per line. Duplicates are
# collapsed by the set, and blank lines are skipped so a stray empty line in
# the file does not abort start-up with ValueError from int('').
with open("users.txt") as f:
    users.extend({int(line) for line in f if line.strip()})
class Permission(AbstractMessageRule):
    """Message rule that accepts or rejects a message depending on ``mode``.

    Only the ``'user'`` mode has a branch here: it accepts messages whose
    text starts with one of the help/menu keywords (case-insensitive).
    Every other mode rejects the message.
    """

    def __init__(self, mode: str):
        """Remember which permission mode this rule instance checks."""
        self.mode = mode

    def check(self, msg: Message):
        """Return True when ``msg`` passes the rule, False otherwise."""
        # Compare with ``==``: ``is`` tests object identity, which only
        # happens to work for equal strings when the interpreter interns them.
        if self.mode == 'user':
            return bool(re.match(r"(?i)(помощь|меню|хелп)", msg.text.lower()))
        # NOTE(review): handlers in this file also use the modes 'keyboard'
        # and 'cmd'; they have no branch here, so those rules never match.
        # Confirm whether that is intended.
        return False
@bot.on.message(Permission('keyboard'), text="/test")
async def on_test(ans: Message):
    """Reply to ``/test`` with a keyboard holding a single start button."""
    start_button = {
        'text': 'Начать',
        'color': 'primary',
        'payload': {'command': 'start'},
    }
    # One row containing one button.
    markup = keyboard_gen([[start_button]])
    await ans("Клавиатура отправлена", keyboard=markup)
@bot.on.chat_message(Permission('cmd'), text="/рассылка <attach> <text>")
async def on_mailing(ans: Message, attach, text):
    """Announce the mailing, then run ``start_mailing`` on its own thread.

    ``attach`` and ``text`` are the values captured from the command pattern
    and are handed to ``start_mailing`` unchanged.
    """
    recipients = len(users)
    await ans(f"Запускаю рассылку для {recipients} пользователей.")
    worker = Thread(target=start_mailing, args=(vk, text, attach))
    worker.start()
@bot.on.message(Permission('user'), text="<arg>")
@bot.on.message.payload({"command": "start"})
async def on_start(ans: Message, arg=None):
    """Queue the greeting message together with the main-menu keyboard.

    The handler is registered twice: for text messages matching the
    ``<arg>`` pattern and for the ``{"command": "start"}`` button payload.

    ``arg`` is the text captured by the ``<arg>`` pattern and is not used.
    It defaults to None because the payload registration has no pattern to
    capture it from, so a required parameter would presumably make that
    path fail with TypeError.
    """
    name = await get_name(bot, ans.peer_id)
    events.append({
        'peer_id': ans.peer_id,
        'message': for_start.format(name),
        'keyboard': keyboard_gen(main_menu, one_time=False),
    })
@bot.on.message.payload({'command': 'main'})
async def on_main(ans: Message):
    """Queue the action prompt with the keyboard built by ``on_keyboard``."""
    peer = ans.peer_id
    markup = await on_keyboard(peer)
    reply = {'peer_id': peer, 'message': 'Выберите действие', 'keyboard': markup}
    events.append(reply)
@bot.on.message.payload({'act': 'back'})
async def to_main(ans: Message):
    """Queue the "left mailing management" notice with the main-menu keyboard."""
    reply = dict(
        peer_id=ans.peer_id,
        message='Выход из управления рассылкой',
        keyboard=keyboard_gen(main_menu),
    )
    events.append(reply)
@bot.on.message.payload({'act': 'add'})
async def on_subscribe(ans: Message):
    """Subscribe the sender via ``to_file``, or say they already are subscribed."""
    peer = ans.peer_id
    if peer in users:
        # Already on the list: only queue the notice.
        events.append({
            'peer_id': peer,
            'message': 'Вы и так подписаны на рассылку',
        })
        return
    await to_file(peer, 'add')
@bot.on.message.payload({'act': 'remove'})
async def unsubscribe(ans: Message):
    """Unsubscribe the sender via ``to_file``, or say they are not subscribed."""
    peer = ans.peer_id
    if peer not in users:
        # Not on the list: only queue the notice.
        events.append({
            'peer_id': peer,
            'message': 'Вы и так отписаны от рассылки.',
        })
        return
    await to_file(peer, 'remove')
@bot.on.event.wall_post_new()
async def wrapper(event: WallPostNew):
    """Start a mailing about a new wall post unless its type is "suggest"."""
    print(event.post_type)
    if event.post_type == "suggest":
        return
    announcement = '🔔 Новый пост в нашей группе!'
    # Passed to start_mailing as the attachment: the post's wall reference.
    attachment = f'wall-{group_id}_{event.id}'
    worker = Thread(target=start_mailing, args=(vk, announcement, attachment))
    worker.start()
def events_parser():
    """Forever drain the shared ``events`` queue into ``event_patcher``.

    Runs as an endless loop and never returns; it is meant to be the target
    of a background thread.
    """
    import time

    while True:
        if not events:
            # Nothing queued: yield the CPU instead of spinning at 100 %.
            time.sleep(0.05)
            continue
        # Snapshot what is queued right now and remove exactly those items.
        # Handlers append at the end, so anything queued while the batch is
        # being sent stays in ``events`` for the next pass instead of being
        # wiped by a blanket clear().
        batch = events[:]
        del events[:len(batch)]
        event_patcher(batch)
def event_patcher(data: list):
    """Send every queued message in ``data`` with ``messages.send``.

    Each item is a dict of ``messages.send`` parameters; ``random_id`` is
    set to 0 on every call. The items are sent in groups of at most 25, one
    ``VkRequestsPool`` per group. An empty ``data`` sends nothing.
    """
    # NOTE(review): 25 is presumably the per-request call limit of the VK
    # ``execute`` method that the pool relies on - confirm.
    batch_size = 25
    for start in range(0, len(data), batch_size):
        with vk_api.VkRequestsPool(vk) as pool:
            for params in data[start:start + batch_size]:
                pool.method("messages.send", {**params, 'random_id': 0})
if __name__ == '__main__':
    # events_parser loops forever, so run it as a daemon thread; a regular
    # thread would keep the process alive after run_polling() stops.
    # NOTE(review): assumes Thread is threading.Thread - confirm.
    Thread(target=events_parser, daemon=True).start()
    bot.run_polling()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement