Guest User

Untitled

a guest
Apr 6th, 2025
60
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 12.52 KB | Cryptocurrency | 0 0
  1. import asyncio
  2. import sys
  3. import logging
  4. import json
  5. import os
  6. import math
  7. from collections import defaultdict
  8. from cryptofeed import FeedHandler
  9. from cryptofeed.callback import TickerCallback
  10. from cryptofeed.defines import TICKER
  11.  
  12. # Import only the Bybit exchange class
  13. from cryptofeed.exchanges import Bybit # CHANGED BACK
  14.  
  15. # --- Configuration for Bybit Only ---
  16. TICKER_FILE = "10tickers.txt"       # File containing desired tickers (e.g., BTC/USDT)
  17. JSON_FILENAME = "bybit_prices.json"   # Output JSON file name (CHANGED BACK)
  18. WRITE_INTERVAL = 2.0                  # How often to write the JSON file (in seconds)
  19.  
  20. # Bybit Specifics (CHANGED BACK)
  21. EXCHANGE_CLASS = Bybit
  22. EXCHANGE_INTERNAL_NAME = "BYBIT"         # Name used internally by cryptofeed
  23. EXCHANGE_DISPLAY_NAME = "Bybit"          # Name to use in the output JSON
  24.  
  25. # Bybit's symbol limit per connection is typically high. 100 is safe.
  26. SYMBOL_LIMIT_PER_CONNECTION = 100
  27.  
  28. # --- End Configuration ---
  29.  
  30. # Configure logging (KEEP DEBUG ENABLED FOR CONNECTIONS)
  31. logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  32. logger = logging.getLogger(__name__)
  33.  
  34. # Enable DEBUG logging for websockets and cryptofeed connection libraries (KEEP THESE)
  35. logging.getLogger('websockets').setLevel(logging.DEBUG)
  36. logging.getLogger('cryptofeed.connection').setLevel(logging.DEBUG)
  37.  
  38. # --- Global State and Lock ---
  39. current_prices = defaultdict(list)
  40. price_data_lock = asyncio.Lock()
  41.  
  42. # --- Async Functions ---
  43.  
  44. async def ticker_handler(ticker, receipt_timestamp):
  45.     """Callback function to handle incoming Bybit ticker data and update global state."""
  46.     # SAFE initial log to confirm entry without accessing potentially missing attributes
  47.     logger.critical(f"!!!!!! TICKER HANDLER ENTERED: {ticker.exchange} / {ticker.symbol} !!!!!!")
  48.     # More detailed debug log (can be removed later)
  49.     logger.debug(f"[BYBIT_HANDLER_RAW] Tick: Sym={ticker.symbol}, Bid={ticker.bid}, Ask={ticker.ask}, Last={ticker.last}, TS={ticker.timestamp}")
  50.  
  51.  
  52.     # Check if it's Bybit (should always be true in this script)
  53.     if ticker.exchange != EXCHANGE_INTERNAL_NAME:
  54.         logger.warning(f"Received ticker from unexpected exchange: {ticker.exchange}. Skipping.")
  55.         return
  56.  
  57.     # --- Bybit Specific Price Logic ---
  58.     price = ticker.bid
  59.  
  60.     # Fallback for Bybit if bid is None, use last trade price
  61.     if price is None:
  62.         price = ticker.last # Use last price as fallback
  63.         if price is not None:
  64.             logger.debug(f"[BYBIT_HANDLER] Using LAST price ({price}) for {ticker.symbol} as Bid was None.")
  65.         else:
  66.             # If both bid and last are None, we cannot get a price
  67.             logger.debug(f"[BYBIT_HANDLER] Skipping {ticker.symbol}: Bid and Last are both None.")
  68.             return # Skip update if no usable price
  69.  
  70.     # If we reach here, 'price' holds either the bid or the last price
  71.     if price is None: # Should not happen due to check above, but safety first
  72.          logger.warning(f"[BYBIT_HANDLER] Logic error: price is None after check for {ticker.symbol}. Skipping.")
  73.          return
  74.  
  75.     # --- Process the price data ---
  76.     symbol_parts = ticker.symbol.split('-')
  77.     if len(symbol_parts) != 2:
  78.         logger.warning(f"[{EXCHANGE_INTERNAL_NAME}] Skipping unexpected symbol format: {ticker.symbol}")
  79.         return
  80.     json_symbol_key = f"{symbol_parts[0]}/{symbol_parts[1]}"
  81.  
  82.     cex_name = EXCHANGE_DISPLAY_NAME # Use configured display name
  83.  
  84.     # Ensure price is float before storing
  85.     try:
  86.         float_price = float(price)
  87.     except (ValueError, TypeError) as e:
  88.          logger.warning(f"[{EXCHANGE_INTERNAL_NAME}] Could not convert price '{price}' to float for {ticker.symbol}. Error: {e}. Skipping.")
  89.          return
  90.  
  91.     new_entry = { "price": float_price, "network": "CEX", "cex": cex_name }
  92.  
  93.     logger.debug(f"[{EXCHANGE_INTERNAL_NAME}_HANDLER] Acquiring lock for {json_symbol_key} - {cex_name} - Price: {float_price}")
  94.     async with price_data_lock:
  95.         logger.debug(f"[{EXCHANGE_INTERNAL_NAME}_HANDLER] Lock acquired. Updating {json_symbol_key} - {cex_name}")
  96.         exchange_found = False
  97.         for entry in current_prices[json_symbol_key]:
  98.             if entry["cex"] == cex_name:
  99.                 entry["price"] = new_entry["price"]
  100.                 exchange_found = True
  101.                 logger.debug(f"[{EXCHANGE_INTERNAL_NAME}_HANDLER] Updated existing entry.")
  102.                 break
  103.         if not exchange_found:
  104.             current_prices[json_symbol_key].append(new_entry)
  105.             logger.debug(f"[{EXCHANGE_INTERNAL_NAME}_HANDLER] Appended new entry.")
  106.  
  107.  
  108. async def periodic_json_writer(filename, interval):
  109.     """Periodically writes the current_prices dictionary to a JSON file."""
  110.     # This function remains the same
  111.     temp_filename = filename + ".tmp"; logged_write_error = False
  112.     while True:
  113.         try: await asyncio.sleep(interval); logged_write_error = False
  114.         except asyncio.CancelledError: logger.info("JSON writer task cancelled."); break
  115.         except Exception as e: logger.error(f"Error in JSON writer sleep: {e}"); await asyncio.sleep(interval * 2); continue
  116.  
  117.         logger.debug(f"Acquiring lock to write JSON...")
  118.         try:
  119.             async with price_data_lock:
  120.                 logger.debug(f"Lock acquired. Writing {len(current_prices)} symbols to {filename}...")
  121.                 data_to_write = dict(current_prices) # Create a copy for writing
  122.             with open(temp_filename, 'w') as f: json.dump(data_to_write, f, indent=4)
  123.             os.replace(temp_filename, filename)
  124.             logger.debug(f"Successfully wrote data to {filename}")
  125.         except IOError as e:
  126.             if not logged_write_error: logger.error(f"Error writing JSON {filename}: {e}"); logged_write_error = True
  127.         except Exception as e:
  128.             if not logged_write_error: logger.error(f"Unexpected error during JSON write: {e}", exc_info=True); logged_write_error = True
  129.  
  130.  
  131. # --- Synchronous Helper Functions ---
  132. # (read_tickers and get_validated_symbols_sync remain unchanged and work for Bybit)
  133. def read_tickers(filename):
  134.     """Reads tickers from a file, expecting 'BASE/QUOTE' format."""
  135.     # This function remains the same
  136.     try:
  137.         with open(filename, 'r') as f: raw_tickers = [line.strip() for line in f if line.strip() and not line.startswith('#')]
  138.     except FileNotFoundError: logger.error(f"Ticker file '{filename}' not found."); sys.exit(1)
  139.     if not raw_tickers: logger.error(f"No tickers found in '{filename}'."); sys.exit(1)
  140.     desired_symbols, invalid = [], 0
  141.     for t in raw_tickers:
  142.         if '/' in t and len(t.split('/')) == 2:
  143.             desired_symbols.append(t.replace('/', '-').upper())
  144.         else:
  145.             logger.warning(f"Skipping invalid format: '{t}'. Expected 'BASE/QUOTE'.")
  146.             invalid += 1
  147.     if not desired_symbols: logger.error(f"No valid format tickers found ('BASE/QUOTE')."); sys.exit(1)
  148.     logger.info(f"Read {len(desired_symbols)} desired tickers (Cryptofeed fmt: BASE-QUOTE).")
  149.     if invalid > 0: logger.warning(f"Skipped {invalid} invalid lines.")
  150.     return desired_symbols
  151.  
  152. def get_validated_symbols_sync(exchange_class, exchange_name, desired_symbols):
  153.     """Fetches supported symbols from the exchange and filters the desired list."""
  154.     # This function remains the same
  155.     logger.info(f"[{exchange_name}] Fetching/retrieving supported symbols...")
  156.     try:
  157.         exchange_instance = exchange_class()
  158.         supported_symbols = exchange_instance.symbols() # Assumed sync
  159.         if not supported_symbols or not isinstance(supported_symbols, (list, set)):
  160.              logger.error(f"[{exchange_name}] Invalid/empty symbol list received. Type: {type(supported_symbols)}.")
  161.              return None
  162.         supported_symbols_set = set(supported_symbols)
  163.         logger.info(f"[{exchange_name}] Obtained {len(supported_symbols_set)} symbols via REST API.")
  164.     except Exception as e:
  165.         logger.error(f"[{exchange_name}] Error obtaining symbols: {e}.", exc_info=True)
  166.         return None
  167.     valid_symbols = []
  168.     unsupported_count = 0
  169.     for symbol in desired_symbols:
  170.         if symbol in supported_symbols_set:
  171.             valid_symbols.append(symbol)
  172.         else:
  173.             unsupported_count += 1
  174.     if unsupported_count > 0:
  175.         logger.info(f"[{exchange_name}] Filtered out {unsupported_count} unsupported symbols (not in REST API list).")
  176.     if not valid_symbols:
  177.         logger.warning(f"[{exchange_name}] No valid symbols found from the desired list.")
  178.     else:
  179.         logger.info(f"[{exchange_name}] Found {len(valid_symbols)} symbols from the list that are valid for WebSocket subscription.")
  180.     return valid_symbols
  181.  
  182.  
  183. # --- Main Execution Logic ---
  184. if __name__ == "__main__":
  185.     f = None
  186.     writer_task = None
  187.     total_subscribed_count = 0
  188.     loop = asyncio.get_event_loop()
  189.  
  190.     try:
  191.         desired_symbols = read_tickers(TICKER_FILE)
  192.         f = FeedHandler()
  193.         logger.info(f"--- Processing {EXCHANGE_DISPLAY_NAME} ---") # USES BYBIT NAME
  194.  
  195.         symbols_to_subscribe = get_validated_symbols_sync(
  196.             EXCHANGE_CLASS,              # USES BYBIT CLASS
  197.             EXCHANGE_INTERNAL_NAME,      # USES BYBIT INTERNAL NAME
  198.             desired_symbols
  199.         )
  200.  
  201.         if symbols_to_subscribe is not None:
  202.             if len(symbols_to_subscribe) > 0:
  203.                 num_symbols = len(symbols_to_subscribe)
  204.                 num_connections = math.ceil(num_symbols / SYMBOL_LIMIT_PER_CONNECTION)
  205.                 logger.info(f"[{EXCHANGE_INTERNAL_NAME}] Validated {num_symbols} symbols. Limit/Conn: {SYMBOL_LIMIT_PER_CONNECTION}. Needs {num_connections} connection(s).")
  206.  
  207.                 for i in range(num_connections):
  208.                     start_index = i * SYMBOL_LIMIT_PER_CONNECTION
  209.                     end_index = start_index + SYMBOL_LIMIT_PER_CONNECTION
  210.                     symbol_chunk = symbols_to_subscribe[start_index:end_index]
  211.                     if not symbol_chunk: continue
  212.  
  213.                     logger.info(f"[{EXCHANGE_INTERNAL_NAME} Conn {i+1}/{num_connections}] Adding feed for {len(symbol_chunk)} tickers...")
  214.                     try:
  215.                         f.add_feed(
  216.                             EXCHANGE_CLASS(                # USES BYBIT CLASS
  217.                                 symbols=symbol_chunk,
  218.                                 channels=[TICKER],
  219.                                 callbacks={TICKER: TickerCallback(ticker_handler)} # USES OUR CORRECTED HANDLER
  220.                             )
  221.                         )
  222.                         logger.info(f"[{EXCHANGE_INTERNAL_NAME} Conn {i+1}/{num_connections}] Feed added successfully.")
  223.                         total_subscribed_count += len(symbol_chunk)
  224.                     except ValueError as ve:
  225.                         logger.error(f"[{EXCHANGE_INTERNAL_NAME} Conn {i+1}/{num_connections}] Failed adding feed chunk: {ve}")
  226.                     except Exception as e:
  227.                         logger.error(f"[{EXCHANGE_INTERNAL_NAME} Conn {i+1}/{num_connections}] Unexpected error adding feed chunk: {e}", exc_info=True)
  228.  
  229.             else:
  230.                 logger.info(f"[{EXCHANGE_INTERNAL_NAME}] No valid symbols from '{TICKER_FILE}' to subscribe to.")
  231.         else:
  232.             logger.error(f"[{EXCHANGE_INTERNAL_NAME}] Skipping feed addition due to symbol validation failure.")
  233.  
  234.         if total_subscribed_count > 0:
  235.             logger.info(f"\nSuccessfully configured {total_subscribed_count} {EXCHANGE_DISPLAY_NAME} symbols across {len(f.feeds)} connection(s).")
  236.             logger.info(f"Creating background task for {JSON_FILENAME} (Interval: {WRITE_INTERVAL}s).") # USES bybit_prices.json
  237.             writer_task = loop.create_task(periodic_json_writer(JSON_FILENAME, WRITE_INTERVAL))
  238.  
  239.             logger.info(f"Starting feed handler...")
  240.             logger.info("Press CTRL+C to stop.")
  241.             f.run() # Start listening
  242.  
  243.         else:
  244.             logger.error(f"\nNo {EXCHANGE_DISPLAY_NAME} symbols successfully configured for subscription. Exiting.")
  245.             sys.exit(1)
  246.  
  247.     except KeyboardInterrupt:
  248.         logger.info("\nShutdown requested via KeyboardInterrupt...")
  249.     except SystemExit as e:
  250.         logger.info(f"Exiting script (code {e.code}).")
  251.     except Exception as e:
  252.         logger.exception(f"\nCritical error in main execution: {e}")
  253.     finally:
  254.         if writer_task and not writer_task.done():
  255.             logger.info("Cancelling JSON writer task...")
  256.             writer_task.cancel()
  257.         logger.info("Script finished.")
Advertisement
Add Comment
Please, Sign In to add comment