Guest User

Untitled

a guest
Apr 6th, 2025
50
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 11.54 KB | Cryptocurrency | 0 0
  1.  
  2. import asyncio
  3. import sys
  4. import logging
  5. import json
  6. import os
  7. import math
  8. from collections import defaultdict
  9. from cryptofeed import FeedHandler
  10. from cryptofeed.callback import TickerCallback
  11. from cryptofeed.defines import TICKER
  12.  
  13. # Import only the Binance exchange class
  14. from cryptofeed.exchanges import Binance  # CHANGED
  15.  
# --- Configuration for Binance Only ---
# Input file read by read_tickers(): one 'BASE/QUOTE' ticker per line (e.g., BTC/USDT).
TICKER_FILE = "testtickers.txt"
# Output file that periodic_json_writer() rewrites on every interval.
JSON_FILENAME = "binance_prices.json"
# Seconds between two JSON writes.
WRITE_INTERVAL = 2.0

# Binance specifics
EXCHANGE_CLASS = Binance
# Value compared against ticker.exchange in ticker_handler() and used as a log prefix.
EXCHANGE_INTERNAL_NAME = "BINANCE"
# Value stored under the "cex" key of every entry in the output JSON.
EXCHANGE_DISPLAY_NAME = "Binance"

# Maximum number of symbols handed to a single feed/connection; the main block
# slices the validated symbol list into chunks of this size.
# NOTE(review): 100 is the original author's "safe" choice -- confirm against
# Binance's current per-connection stream limits.
SYMBOL_LIMIT_PER_CONNECTION = 100

# --- End Configuration ---

# Root logging is configured at DEBUG so connection activity is visible.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Explicitly keep the websocket / cryptofeed connection loggers at DEBUG as well.
logging.getLogger('websockets').setLevel(logging.DEBUG)
logging.getLogger('cryptofeed.connection').setLevel(logging.DEBUG)

# --- Global State and Lock ---
# Maps "BASE/QUOTE" -> list of {"price", "network", "cex"} entries; filled by
# ticker_handler() and snapshotted by periodic_json_writer().
current_prices = defaultdict(list)
# Held by ticker_handler() while updating current_prices and by
# periodic_json_writer() while copying it.
price_data_lock = asyncio.Lock()
  42.  
  43. # --- Async Functions ---
  44.  
  45. async def ticker_handler(ticker, receipt_timestamp):
  46.     """Callback function to handle incoming Binance ticker data and update global state."""
  47.     # Use CRITICAL level temporarily to ensure this message stands out if the handler is called
  48.     # REMOVED ticker.last FROM THIS LOGGING LINE
  49.     logger.critical(f"!!!!!! TICKER HANDLER ENTERED: {ticker.exchange} / {ticker.symbol} - BID: {ticker.bid} / ASK: {ticker.ask} !!!!!!")
  50.  
  51.     # Check if it's Binance (should always be true in this script)
  52.     if ticker.exchange != EXCHANGE_INTERNAL_NAME:
  53.         logger.warning(f"Received ticker from unexpected exchange: {ticker.exchange}. Skipping.")
  54.         return
  55.  
  56.     # Binance tickers generally have bid/ask. Use bid price.
  57.     price = ticker.bid
  58.  
  59.     if price is None:
  60.         # Log if bid is missing, but don't rely on 'last' as a primary source for Binance tickers typically
  61.         logger.debug(f"[{ticker.exchange}] Skipping {ticker.symbol}: Bid is None.")
  62.         return
  63.  
  64.     # --- Process the price data ---
  65.     symbol_parts = ticker.symbol.split('-')
  66.     if len(symbol_parts) != 2:
  67.         logger.warning(f"[{EXCHANGE_INTERNAL_NAME}] Skipping unexpected symbol format: {ticker.symbol}")
  68.         return
  69.     json_symbol_key = f"{symbol_parts[0]}/{symbol_parts[1]}"
  70.  
  71.     cex_name = EXCHANGE_DISPLAY_NAME # Use configured display name
  72.  
  73.     new_entry = { "price": float(price), "network": "CEX", "cex": cex_name }
  74.  
  75.     logger.debug(f"[{EXCHANGE_INTERNAL_NAME}_HANDLER] Acquiring lock for {json_symbol_key} - {cex_name} - Price: {price}")
  76.     async with price_data_lock:
  77.         logger.debug(f"[{EXCHANGE_INTERNAL_NAME}_HANDLER] Lock acquired. Updating {json_symbol_key} - {cex_name}")
  78.         exchange_found = False
  79.         for entry in current_prices[json_symbol_key]:
  80.             if entry["cex"] == cex_name:
  81.                 entry["price"] = new_entry["price"]
  82.                 exchange_found = True
  83.                 logger.debug(f"[{EXCHANGE_INTERNAL_NAME}_HANDLER] Updated existing entry.")
  84.                 break
  85.         if not exchange_found:
  86.             current_prices[json_symbol_key].append(new_entry)
  87.             logger.debug(f"[{EXCHANGE_INTERNAL_NAME}_HANDLER] Appended new entry.")
  88.  
  89.  
  90. async def periodic_json_writer(filename, interval):
  91.     """Periodically writes the current_prices dictionary to a JSON file."""
  92.     # This function remains the same
  93.     temp_filename = filename + ".tmp"; logged_write_error = False
  94.     while True:
  95.         try: await asyncio.sleep(interval); logged_write_error = False
  96.         except asyncio.CancelledError: logger.info("JSON writer task cancelled."); break
  97.         except Exception as e: logger.error(f"Error in JSON writer sleep: {e}"); await asyncio.sleep(interval * 2); continue
  98.  
  99.         logger.debug(f"Acquiring lock to write JSON...")
  100.         try:
  101.             async with price_data_lock:
  102.                 logger.debug(f"Lock acquired. Writing {len(current_prices)} symbols to {filename}...")
  103.                 data_to_write = dict(current_prices) # Create a copy for writing
  104.             with open(temp_filename, 'w') as f: json.dump(data_to_write, f, indent=4)
  105.             os.replace(temp_filename, filename)
  106.             logger.debug(f"Successfully wrote data to {filename}")
  107.         except IOError as e:
  108.             if not logged_write_error: logger.error(f"Error writing JSON {filename}: {e}"); logged_write_error = True
  109.         except Exception as e:
  110.             if not logged_write_error: logger.error(f"Unexpected error during JSON write: {e}", exc_info=True); logged_write_error = True
  111.  
  112.  
  113. # --- Synchronous Helper Functions ---
  114.  
  115. def read_tickers(filename):
  116.     """Reads tickers from a file, expecting 'BASE/QUOTE' format."""
  117.     # This function remains the same
  118.     try:
  119.         with open(filename, 'r') as f: raw_tickers = [line.strip() for line in f if line.strip() and not line.startswith('#')]
  120.     except FileNotFoundError: logger.error(f"Ticker file '{filename}' not found."); sys.exit(1)
  121.     if not raw_tickers: logger.error(f"No tickers found in '{filename}'."); sys.exit(1)
  122.     desired_symbols, invalid = [], 0
  123.     for t in raw_tickers:
  124.         if '/' in t and len(t.split('/')) == 2:
  125.             desired_symbols.append(t.replace('/', '-').upper())
  126.         else:
  127.             logger.warning(f"Skipping invalid format: '{t}'. Expected 'BASE/QUOTE'.")
  128.             invalid += 1
  129.     if not desired_symbols: logger.error(f"No valid format tickers found ('BASE/QUOTE')."); sys.exit(1)
  130.     logger.info(f"Read {len(desired_symbols)} desired tickers (Cryptofeed fmt: BASE-QUOTE).")
  131.     if invalid > 0: logger.warning(f"Skipped {invalid} invalid lines.")
  132.     return desired_symbols
  133.  
  134. def get_validated_symbols_sync(exchange_class, exchange_name, desired_symbols):
  135.     """Fetches supported symbols from the exchange and filters the desired list."""
  136.     # This function remains the same, should work for Binance
  137.     logger.info(f"[{exchange_name}] Fetching/retrieving supported symbols...")
  138.     try:
  139.         exchange_instance = exchange_class()
  140.         supported_symbols = exchange_instance.symbols() # Assumed sync
  141.         if not supported_symbols or not isinstance(supported_symbols, (list, set)):
  142.              logger.error(f"[{exchange_name}] Invalid/empty symbol list received. Type: {type(supported_symbols)}.")
  143.              return None
  144.         supported_symbols_set = set(supported_symbols)
  145.         logger.info(f"[{exchange_name}] Obtained {len(supported_symbols_set)} symbols via REST API.")
  146.     except Exception as e:
  147.         logger.error(f"[{exchange_name}] Error obtaining symbols: {e}.", exc_info=True)
  148.         return None
  149.     valid_symbols = []
  150.     unsupported_count = 0
  151.     for symbol in desired_symbols:
  152.         if symbol in supported_symbols_set:
  153.             valid_symbols.append(symbol)
  154.         else:
  155.             unsupported_count += 1
  156.     if unsupported_count > 0:
  157.         logger.info(f"[{exchange_name}] Filtered out {unsupported_count} unsupported symbols (not in REST API list).")
  158.     if not valid_symbols:
  159.         logger.warning(f"[{exchange_name}] No valid symbols found from the desired list.")
  160.     else:
  161.         logger.info(f"[{exchange_name}] Found {len(valid_symbols)} symbols from the list that are valid for WebSocket subscription.")
  162.     return valid_symbols
  163.  
  164.  
  165. # --- Main Execution Logic ---
  166. if __name__ == "__main__":
  167.     f = None
  168.     writer_task = None
  169.     total_subscribed_count = 0
  170.     loop = asyncio.get_event_loop()
  171.  
  172.     try:
  173.         desired_symbols = read_tickers(TICKER_FILE)
  174.         f = FeedHandler()
  175.         logger.info(f"--- Processing {EXCHANGE_DISPLAY_NAME} ---") # CHANGED
  176.  
  177.         symbols_to_subscribe = get_validated_symbols_sync(
  178.             EXCHANGE_CLASS,              # CHANGED
  179.             EXCHANGE_INTERNAL_NAME,      # CHANGED
  180.             desired_symbols
  181.         )
  182.  
  183.         if symbols_to_subscribe is not None:
  184.             if len(symbols_to_subscribe) > 0:
  185.                 num_symbols = len(symbols_to_subscribe)
  186.                 num_connections = math.ceil(num_symbols / SYMBOL_LIMIT_PER_CONNECTION)
  187.                 logger.info(f"[{EXCHANGE_INTERNAL_NAME}] Validated {num_symbols} symbols. Limit/Conn: {SYMBOL_LIMIT_PER_CONNECTION}. Needs {num_connections} connection(s).") # CHANGED
  188.  
  189.                 for i in range(num_connections):
  190.                     start_index = i * SYMBOL_LIMIT_PER_CONNECTION
  191.                     end_index = start_index + SYMBOL_LIMIT_PER_CONNECTION
  192.                     symbol_chunk = symbols_to_subscribe[start_index:end_index]
  193.                     if not symbol_chunk: continue
  194.  
  195.                     logger.info(f"[{EXCHANGE_INTERNAL_NAME} Conn {i+1}/{num_connections}] Adding feed for {len(symbol_chunk)} tickers...") # CHANGED
  196.                     try:
  197.                         f.add_feed(
  198.                             EXCHANGE_CLASS(                # CHANGED
  199.                                 symbols=symbol_chunk,
  200.                                 channels=[TICKER],
  201.                                 callbacks={TICKER: TickerCallback(ticker_handler)}
  202.                             )
  203.                         )
  204.                         logger.info(f"[{EXCHANGE_INTERNAL_NAME} Conn {i+1}/{num_connections}] Feed added successfully.") # CHANGED
  205.                         total_subscribed_count += len(symbol_chunk)
  206.                     except ValueError as ve:
  207.                         logger.error(f"[{EXCHANGE_INTERNAL_NAME} Conn {i+1}/{num_connections}] Failed adding feed chunk: {ve}") # CHANGED
  208.                     except Exception as e:
  209.                         logger.error(f"[{EXCHANGE_INTERNAL_NAME} Conn {i+1}/{num_connections}] Unexpected error adding feed chunk: {e}", exc_info=True) # CHANGED
  210.  
  211.             else:
  212.                 logger.info(f"[{EXCHANGE_INTERNAL_NAME}] No valid symbols from '{TICKER_FILE}' to subscribe to.") # CHANGED
  213.         else:
  214.             logger.error(f"[{EXCHANGE_INTERNAL_NAME}] Skipping feed addition due to symbol validation failure.") # CHANGED
  215.  
  216.         if total_subscribed_count > 0:
  217.             logger.info(f"\nSuccessfully configured {total_subscribed_count} {EXCHANGE_DISPLAY_NAME} symbols across {len(f.feeds)} connection(s).") # CHANGED
  218.             logger.info(f"Creating background task for {JSON_FILENAME} (Interval: {WRITE_INTERVAL}s).")
  219.             writer_task = loop.create_task(periodic_json_writer(JSON_FILENAME, WRITE_INTERVAL))
  220.  
  221.             logger.info(f"Starting feed handler...")
  222.             logger.info("Press CTRL+C to stop.")
  223.             f.run() # <--- THE CRITICAL CALL
  224.  
  225.         else:
  226.             logger.error(f"\nNo {EXCHANGE_DISPLAY_NAME} symbols successfully configured for subscription. Exiting.") # CHANGED
  227.             sys.exit(1)
  228.  
  229.     except KeyboardInterrupt:
  230.         logger.info("\nShutdown requested via KeyboardInterrupt...")
  231.     except SystemExit as e:
  232.         logger.info(f"Exiting script (code {e.code}).")
  233.     except Exception as e:
  234.         logger.exception(f"\nCritical error in main execution: {e}")
  235.     finally:
  236.         if writer_task and not writer_task.done():
  237.             logger.info("Cancelling JSON writer task...")
  238.             writer_task.cancel()
  239.         logger.info("Script finished.")
Advertisement
Add Comment
Please, Sign In to add comment