Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import firebase_admin, random, string, asyncio, os, socket, sys, http, interactions, datetime
- from firebase_admin import credentials, db
- from dateutil import parser as du_parser
- from datetime import datetime
- from dotenv import load_dotenv
- from twitchAPI.twitch import Twitch
- from twitchAPI.oauth import UserAuthenticator
- from twitchAPI.types import AuthScope
- from twitchAPI.oauth import refresh_access_token
- from twitchAPI.twitch import Twitch
- from twitchAPI.helper import first
- from twitchAPI.eventsub import EventSub
- load_dotenv()
- CLIENT_ID = os.getenv('CLIENT_ID')
- CLIENT_SECRET = os.getenv('CLIENT_SECRET')
- EVENTSUB_URL = "https://twitch.lawmixerscpf.tk"
- OAUTH_TOKEN = os.getenv('OAUTH_TOKEN')
- OAUTH_REFRESH_TOKEN = os.getenv('OAUTH_REFRESH_TOKEN')
- bot = interactions.Client(
- token=os.environ["DISCORD_TOKEN"],
- intents= interactions.Intents.ALL | interactions.Intents.GUILD_MESSAGE_CONTENT)
- bot.load("events.event")
- cred = credentials.Certificate("s")
- default_app = firebase_admin.initialize_app(cred, {
- 'databaseURL': "s"
- })
- async def stream_online(data: dict):
- #{'subscription': {'id': '3a775616-f352-48e1-9784-a3eb4441977c', 'status': 'enabled', 'type': 'stream.online', 'version': '1', 'condition': {'broadcaster_user_id': '128722026'}, 'transport': {'method': 'webhook', 'callback': 'https://twitch.lawmixerscpf.tk/callback'}, 'created_at': '2023-02-16T20:58:59.458463575Z', 'cost': 1}, 'event': {'id': '47981621437', 'broadcaster_user_id': '128722026', 'broadcaster_user_login': 'alexmasongaming', 'broadcaster_user_name': 'AlexMasonGaming', 'type': 'live', 'started_at': '2023-02-16T22:04:23Z'}}
- streamerName = data["event"]["broadcaster_user_login"]
- started_at = data["event"]["started_at"]
- timestamp = du_parser.isoparse(started_at).timestamp()
- streamerId = data["event"]["broadcaster_user_id"]
- channel_id = int(db.reference(f"/streamers/{streamerId}/channel_id").get())
- guild_id = int(db.reference(f"/streamers/{streamerId}/guild_id").get())
- guild = await interactions.get(bot, interactions.Guild, object_id=int(guild_id))
- for channel in guild.channels:
- if channel.id == channel_id:
- if data["subscription"]["type"] == "stream.online":
- await sendChann.send("This is a test message.")
- async def main():
- twitch = await Twitch(CLIENT_ID, CLIENT_SECRET)
- new_token, new_refresh_token = await refresh_access_token(OAUTH_REFRESH_TOKEN, CLIENT_ID, CLIENT_SECRET)
- await twitch.set_user_authentication(new_token, [], new_refresh_token, validate=True)
- event_sub = EventSub(EVENTSUB_URL, CLIENT_ID, 8080, twitch)
- await event_sub.unsubscribe_all()
- event_sub.start()
- for streamer_id in db.reference("/streamers/").get():
- await event_sub.listen_stream_online(str(streamer_id), stream_online)
- loop = asyncio.get_event_loop()
- task1 = loop.create_task(main())
- task2 = loop.create_task(bot._ready())
- gathered = asyncio.gather(task1, task2)
- loop.run_until_complete(gathered)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement