Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import firebase_admin, random, string, asyncio, os, socket, sys, http, interactions, datetime
- from firebase_admin import credentials, db
- from dateutil import parser as du_parser
- from datetime import datetime
- from dotenv import load_dotenv
- from twitchAPI.twitch import Twitch
- from twitchAPI.oauth import UserAuthenticator
- from twitchAPI.types import AuthScope
- from twitchAPI.oauth import refresh_access_token
- from twitchAPI.twitch import Twitch
- from twitchAPI.helper import first
- from twitchAPI.eventsub import EventSub
- load_dotenv()
- CLIENT_ID = os.getenv('CLIENT_ID')
- CLIENT_SECRET = os.getenv('CLIENT_SECRET')
- EVENTSUB_URL = "https://twitch.lawmixerscpf.tk"
- OAUTH_TOKEN = os.getenv('OAUTH_TOKEN')
- OAUTH_REFRESH_TOKEN = os.getenv('OAUTH_REFRESH_TOKEN')
- bot = interactions.Client(
- token=os.environ["DISCORD_TOKEN"],
- intents= interactions.Intents.ALL | interactions.Intents.GUILD_MESSAGE_CONTENT)
- bot.load("events.event")
- cred = credentials.Certificate("#")
- default_app = firebase_admin.initialize_app(cred, {
- 'databaseURL': "#"
- })
- async def stream_online(data: dict):
- streamerName = data["event"]["broadcaster_user_login"]
- started_at = data["event"]["started_at"]
- timestamp = du_parser.isoparse(started_at).timestamp()
- streamerId = data["event"]["broadcaster_user_id"]
- channel_id = db.reference(f"/streamers/{streamerId}/channel_id").get()
- guild_id = db.reference(f"/streamers/{streamerId}/guild_id").get()
- if channel_id != None and guild_id != None:
- channel = await interactions.get(bot, interactions.Channel, object_id=channel_id)
- if data["subscription"]["status"] == "enabled" and data["subscription"]["type"] == "stream.online":
- await channel.send(f"{streamerName} is live! @everyone")
- async def main():
- twitch = await Twitch(CLIENT_ID, CLIENT_SECRET)
- new_token, new_refresh_token = await refresh_access_token(OAUTH_REFRESH_TOKEN, CLIENT_ID, CLIENT_SECRET)
- await twitch.set_user_authentication(new_token, [], new_refresh_token, validate=True)
- event_sub = EventSub(EVENTSUB_URL, CLIENT_ID, 8080, twitch)
- await event_sub.unsubscribe_all()
- event_sub.start()
- for streamer_id in db.reference("/streamers/").get():
- await event_sub.listen_stream_online(str(streamer_id), stream_online)
- loop = asyncio.get_event_loop()
- task1 = loop.create_task(main())
- task2 = loop.create_task(bot._ready())
- gathered = asyncio.gather(task1, task2)
- loop.run_until_complete(gathered)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement