Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import asyncio
- import sys
- from pathlib import Path
- from quotexpy import Quotex
- from quotexpy.utils import asset_parse
- from quotexpy.utils.account_type import AccountType
- from quotexpy.utils.operation_type import OperationType
- from quotexpy.utils.duration_time import DurationTime
- from termcolor import colored
- from login_manager import login_accounts, generate_signals_for_account
- asset_current = "AUDCAD"
- class SingletonDecorator:
- """
- A decorator that turns a class into a singleton.
- """
- def __init__(self, cls):
- self.cls = cls
- self.instance = None
- def __call__(self, *args, **kwargs):
- if self.instance is None:
- self.instance = self.cls(*args, **kwargs)
- return self.instance
- @SingletonDecorator
- class MyConnection:
- """
- This class represents a connection object and provides methods for connecting to a client.
- """
- def __init__(self, client_instance):
- self.client = client_instance
- async def connect(self, attempts=5):
- check, reason = await self.client.connect()
- if not check:
- attempt = 0
- while attempt <= attempts:
- if not self.client.check_connect():
- check, reason = await self.client.connect()
- if check:
- print("Reconnected successfully!!!")
- break
- print("Error reconnecting.")
- attempt += 1
- if Path(os.path.join(".", "session.json")).is_file():
- Path(os.path.join(".", "session.json")).unlink()
- print(f"Trying to reconnect, attempt {attempt} of {attempts}")
- elif not check:
- attempt += 1
- else:
- break
- await asyncio.sleep(5)
- return check, reason
- return check, reason
- def close(self):
- """
- Closes the client connection.
- """
- self.client.close()
- async def check_asset(asset, client):
- print("Checking asset...")
- print("Client:", client)
- if client:
- print("Client is valid.")
- asset_query = asset_parse(asset)
- asset_open = client.check_asset_open(asset_query)
- if not asset_open[2]:
- print(colored("[WARN]: ", "yellow"), "Asset is closed.")
- asset = f"{asset}_otc"
- print(colored("[WARN]: ", "yellow"), "Try OTC Asset -> " + asset)
- asset_query = asset_parse(asset)
- asset_open = client.check_asset_open(asset_query)
- return asset, asset_open
- else:
- print("Client is None!")
- return None, None
- async def trade(option, client):
- prepare_connection = MyConnection(client)
- check_connect, message = await prepare_connection.connect()
- if check_connect:
- client.change_account(AccountType.PRACTICE)
- amount = 500
- action = OperationType.CALL_GREEN if option == "buy" else OperationType.PUT_RED
- global asset_current
- asset, asset_open = await check_asset(asset_current, client)
- if asset_open[2]:
- print(colored("[INFO]: ", "blue"), "Asset is open.")
- try:
- await client.trade(action, amount, asset, DurationTime.ONE_MINUTE)
- except Exception as e:
- print("Trade failed:", e)
- else:
- print(colored("[WARN]: ", "yellow"), "Asset is closed.")
- print(colored("[INFO]: ", "blue"), "Balance: ", await client.get_balance())
- print(colored("[INFO]: ", "blue"), "Exiting...")
- prepare_connection.close()
- async def get_realtime_candle(client):
- prepare_connection = MyConnection(client)
- check_connect, message = await prepare_connection.connect()
- if check_connect:
- list_size = 10
- global asset_current
- asset, asset_open = await check_asset(asset_current, client)
- client.start_candles_stream(asset, list_size)
- while True:
- if len(client.get_realtime_candles(asset)) == list_size:
- break
- print(client.get_realtime_candles(asset))
- prepare_connection.close()
- async def calculate_moving_averages(client):
- global asset_current
- asset, asset_open = await check_asset(asset_current, client)
- if asset_open[2]:
- candles = client.get_realtime_candles(asset)
- if len(candles) >= 200:
- # Calculate 50-period moving average
- ma_50 = sum(candle["close"] for candle in candles[-50:]) / 50
- # Calculate 200-period moving average
- ma_200 = sum(candle["close"] for candle in candles[-200:]) / 200
- return ma_50, ma_200
- return None, None
- async def main():
- accounts = [
- {"email": "[email protected]", "password": "password1"},
- {"email": "[email protected]", "password": "password2"},
- # Add more accounts as needed
- ]
- login_results = await login_accounts(accounts)
- if all(login_results):
- print("All accounts logged in successfully. Starting signal generation.")
- await asyncio.gather(*[generate_signals_for_account(account["email"], account["password"]) for account in accounts])
- else:
- print("Failed to log in to one or more accounts. Signal generation aborted.")
- def run_main():
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- print("Aborted!")
- sys.exit(0)
- if __name__ == "__main__":
- run_main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement