Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import sys
- import logging
- import json
- import os
- import math
- from collections import defaultdict
- from cryptofeed import FeedHandler
- from cryptofeed.callback import TickerCallback
- from cryptofeed.defines import TICKER
- # Import only the Bybit exchange class
- from cryptofeed.exchanges import Bybit # CHANGED BACK
# --- Configuration for Bybit Only ---
# Input file with the desired tickers, one 'BASE/QUOTE' pair per line (e.g., BTC/USDT).
TICKER_FILE = "10tickers.txt"
# Output file that receives the collected prices as JSON.
JSON_FILENAME = "bybit_prices.json"
# Seconds between two consecutive writes of the JSON file.
WRITE_INTERVAL = 2.0

# Exchange wiring: the feed class, the name compared against ``ticker.exchange``,
# and the name stored under the "cex" key of every output entry.
EXCHANGE_CLASS = Bybit
EXCHANGE_INTERNAL_NAME = "BYBIT"
EXCHANGE_DISPLAY_NAME = "Bybit"

# Validated symbols are split into chunks of at most this many per feed.
# Per the original author's note, 100 is a conservative value for Bybit.
SYMBOL_LIMIT_PER_CONNECTION = 100
# --- End Configuration ---

# --- Logging ---
# Everything is logged at DEBUG so connection problems stay visible.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# The websocket and cryptofeed connection loggers are set to DEBUG explicitly.
logging.getLogger('websockets').setLevel(logging.DEBUG)
logging.getLogger('cryptofeed.connection').setLevel(logging.DEBUG)

# --- Global State and Lock ---
# Maps 'BASE/QUOTE' to a list of {"price", "network", "cex"} entries.
current_prices = defaultdict(list)
# Held by the ticker handler while updating and by the writer while dumping.
price_data_lock = asyncio.Lock()
# --- Async Functions ---
async def ticker_handler(ticker, receipt_timestamp):
    """Store the latest price from a Bybit ticker update in ``current_prices``.

    The bid is preferred; when it is ``None`` the ticker's ``last`` value (if
    the object has one) is used instead. Updates from another exchange,
    symbols that are not of the form ``BASE-QUOTE``, and prices that cannot be
    converted to a finite float are skipped with a log message.

    Args:
        ticker: Ticker object handed over by the callback. This function reads
            its ``exchange``, ``symbol``, ``bid``, ``ask``, ``timestamp`` and,
            when present, ``last`` attributes.
        receipt_timestamp: Unused; accepted because the callback passes it.
    """
    # Entry trace. Logged at DEBUG because it fires on every single tick.
    logger.debug(
        "!!!!!! TICKER HANDLER ENTERED: %s / %s !!!!!!",
        ticker.exchange,
        ticker.symbol,
    )

    # ``last`` may be missing on the ticker object; a direct attribute access
    # would raise AttributeError and abort every update, so read it defensively.
    last_price = getattr(ticker, "last", None)
    logger.debug(
        "[BYBIT_HANDLER_RAW] Tick: Sym=%s, Bid=%s, Ask=%s, Last=%s, TS=%s",
        ticker.symbol,
        ticker.bid,
        ticker.ask,
        last_price,
        ticker.timestamp,
    )

    # Only Bybit updates are expected in this script.
    if ticker.exchange != EXCHANGE_INTERNAL_NAME:
        logger.warning(
            "Received ticker from unexpected exchange: %s. Skipping.",
            ticker.exchange,
        )
        return

    # --- Bybit Specific Price Logic ---
    price = ticker.bid
    if price is None:
        if last_price is None:
            logger.debug(
                "[BYBIT_HANDLER] Skipping %s: Bid and Last are both None.",
                ticker.symbol,
            )
            return
        price = last_price
        logger.debug(
            "[BYBIT_HANDLER] Using LAST price (%s) for %s as Bid was None.",
            price,
            ticker.symbol,
        )

    # --- Process the price data ---
    symbol_parts = ticker.symbol.split('-')
    if len(symbol_parts) != 2:
        logger.warning(
            "[%s] Skipping unexpected symbol format: %s",
            EXCHANGE_INTERNAL_NAME,
            ticker.symbol,
        )
        return
    json_symbol_key = f"{symbol_parts[0]}/{symbol_parts[1]}"
    cex_name = EXCHANGE_DISPLAY_NAME

    try:
        float_price = float(price)
    except (ValueError, TypeError) as e:
        logger.warning(
            "[%s] Could not convert price '%s' to float for %s. Error: %s. Skipping.",
            EXCHANGE_INTERNAL_NAME,
            price,
            ticker.symbol,
            e,
        )
        return
    # NaN/Infinity would be serialised as bare ``NaN``/``Infinity`` tokens,
    # which are not valid JSON, so such prices are dropped here.
    if not math.isfinite(float_price):
        logger.warning(
            "[%s] Non-finite price '%s' for %s. Skipping.",
            EXCHANGE_INTERNAL_NAME,
            price,
            ticker.symbol,
        )
        return

    logger.debug(
        "[%s_HANDLER] Acquiring lock for %s - %s - Price: %s",
        EXCHANGE_INTERNAL_NAME,
        json_symbol_key,
        cex_name,
        float_price,
    )
    async with price_data_lock:
        logger.debug(
            "[%s_HANDLER] Lock acquired. Updating %s - %s",
            EXCHANGE_INTERNAL_NAME,
            json_symbol_key,
            cex_name,
        )
        entries = current_prices[json_symbol_key]
        # Each symbol keeps at most one entry per exchange display name.
        existing = next((e for e in entries if e["cex"] == cex_name), None)
        if existing is not None:
            existing["price"] = float_price
            logger.debug("[%s_HANDLER] Updated existing entry.", EXCHANGE_INTERNAL_NAME)
        else:
            entries.append({"price": float_price, "network": "CEX", "cex": cex_name})
            logger.debug("[%s_HANDLER] Appended new entry.", EXCHANGE_INTERNAL_NAME)
async def periodic_json_writer(filename, interval):
    """Write ``current_prices`` to *filename* every *interval* seconds.

    The data is dumped to ``filename + ".tmp"`` first and then moved over the
    target with ``os.replace``. Write failures are logged once; further
    failures stay silent until a write succeeds again.

    Args:
        filename: Path of the JSON file to (re)write.
        interval: Seconds to sleep between two writes.

    The coroutine returns when it is cancelled while sleeping. A cancellation
    that arrives during the write phase is re-raised.
    """
    temp_filename = filename + ".tmp"
    logged_write_error = False
    while True:
        try:
            await asyncio.sleep(interval)
        except asyncio.CancelledError:
            logger.info("JSON writer task cancelled.")
            break
        except Exception as e:
            logger.error(f"Error in JSON writer sleep: {e}")
            await asyncio.sleep(interval * 2)
            continue

        logger.debug("Acquiring lock to write JSON...")
        try:
            async with price_data_lock:
                logger.debug(f"Lock acquired. Writing {len(current_prices)} symbols to {filename}...")
                # Plain dict copy so the defaultdict itself is not handed to json.
                data_to_write = dict(current_prices)
                with open(temp_filename, 'w') as f:
                    json.dump(data_to_write, f, indent=4)
                os.replace(temp_filename, filename)
        except asyncio.CancelledError:
            # Cancellation is not a write failure; never let the broad handler
            # below swallow it.
            raise
        except OSError as e:
            if not logged_write_error:
                logger.error(f"Error writing JSON {filename}: {e}")
                logged_write_error = True
        except Exception as e:
            if not logged_write_error:
                logger.error(f"Unexpected error during JSON write: {e}", exc_info=True)
                logged_write_error = True
        else:
            # Re-arm error logging only after a successful write; resetting the
            # flag on every iteration would log the same failure each interval.
            logged_write_error = False
            logger.debug(f"Successfully wrote data to {filename}")
# --- Synchronous Helper Functions ---
def read_tickers(filename):
    """Read 'BASE/QUOTE' tickers from *filename* as upper-case 'BASE-QUOTE' symbols.

    Blank lines and lines whose first non-blank character is ``#`` are
    ignored. A line is valid when it has exactly one ``/`` with non-empty text
    on both sides; whitespace around either side is dropped. Invalid lines are
    logged and skipped.

    Args:
        filename: Path of the ticker list file.

    Returns:
        List of symbols in file order, e.g. ``["BTC-USDT", "ETH-USDT"]``.

    Raises:
        SystemExit: With code 1 when the file is missing, contains no ticker
            lines, or contains no validly formatted ticker.
    """
    try:
        with open(filename, 'r') as f:
            stripped_lines = (line.strip() for line in f)
            # Comment detection runs on the stripped text so indented
            # '#' lines are not mistaken for tickers.
            raw_tickers = [t for t in stripped_lines if t and not t.startswith('#')]
    except FileNotFoundError:
        logger.error(f"Ticker file '{filename}' not found.")
        sys.exit(1)
    if not raw_tickers:
        logger.error(f"No tickers found in '{filename}'.")
        sys.exit(1)

    desired_symbols = []
    invalid = 0
    for t in raw_tickers:
        parts = [part.strip() for part in t.split('/')]
        # Exactly two non-empty sides: rejects 'BTC', 'BTC/', '/USDT', 'A/B/C'.
        if len(parts) == 2 and all(parts):
            desired_symbols.append('-'.join(parts).upper())
        else:
            logger.warning(f"Skipping invalid format: '{t}'. Expected 'BASE/QUOTE'.")
            invalid += 1

    if not desired_symbols:
        logger.error("No valid format tickers found ('BASE/QUOTE').")
        sys.exit(1)
    logger.info(f"Read {len(desired_symbols)} desired tickers (Cryptofeed fmt: BASE-QUOTE).")
    if invalid > 0:
        logger.warning(f"Skipped {invalid} invalid lines.")
    return desired_symbols
def get_validated_symbols_sync(exchange_class, exchange_name, desired_symbols):
    """Keep only the desired symbols that the exchange reports as supported.

    Instantiates *exchange_class* without arguments and calls its
    ``symbols()`` method. Any non-empty collection of symbols (list, set,
    tuple, frozenset, dict view, ...) is accepted; a bare string is not.

    Args:
        exchange_class: Class whose instances provide ``symbols()``.
        exchange_name: Label used as a prefix in log messages.
        desired_symbols: Symbols to validate, in the order they should be kept.

    Returns:
        The supported subset of *desired_symbols* in original order (possibly
        empty), or ``None`` when the supported-symbol list could not be
        obtained or was empty/invalid.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import list.
    from collections.abc import Collection

    logger.info(f"[{exchange_name}] Fetching/retrieving supported symbols...")
    try:
        supported_symbols = exchange_class().symbols()  # Assumed sync
        # A str is a Collection too, but set("BTC-USDT") would yield single
        # characters, so strings/bytes are rejected explicitly.
        usable = (
            isinstance(supported_symbols, Collection)
            and not isinstance(supported_symbols, (str, bytes))
            and len(supported_symbols) > 0
        )
        if not usable:
            logger.error(f"[{exchange_name}] Invalid/empty symbol list received. Type: {type(supported_symbols)}.")
            return None
        supported_symbols_set = set(supported_symbols)
        logger.info(f"[{exchange_name}] Obtained {len(supported_symbols_set)} symbols via REST API.")
    except Exception as e:
        logger.error(f"[{exchange_name}] Error obtaining symbols: {e}.", exc_info=True)
        return None

    valid_symbols = [s for s in desired_symbols if s in supported_symbols_set]
    unsupported_count = len(desired_symbols) - len(valid_symbols)
    if unsupported_count > 0:
        logger.info(f"[{exchange_name}] Filtered out {unsupported_count} unsupported symbols (not in REST API list).")
    if not valid_symbols:
        logger.warning(f"[{exchange_name}] No valid symbols found from the desired list.")
    else:
        logger.info(f"[{exchange_name}] Found {len(valid_symbols)} symbols from the list that are valid for WebSocket subscription.")
    return valid_symbols
# --- Main Execution Logic ---
def _add_feeds(handler, symbols):
    """Add one ticker feed per chunk of *symbols* to *handler*.

    Chunks hold at most ``SYMBOL_LIMIT_PER_CONNECTION`` symbols. A chunk whose
    feed cannot be added is logged and skipped; the remaining chunks are
    still attempted.

    Returns:
        Number of symbols whose feed was added successfully.
    """
    num_symbols = len(symbols)
    num_connections = math.ceil(num_symbols / SYMBOL_LIMIT_PER_CONNECTION)
    logger.info(f"[{EXCHANGE_INTERNAL_NAME}] Validated {num_symbols} symbols. Limit/Conn: {SYMBOL_LIMIT_PER_CONNECTION}. Needs {num_connections} connection(s).")

    subscribed = 0
    for i in range(num_connections):
        start_index = i * SYMBOL_LIMIT_PER_CONNECTION
        symbol_chunk = symbols[start_index:start_index + SYMBOL_LIMIT_PER_CONNECTION]
        if not symbol_chunk:
            continue
        tag = f"[{EXCHANGE_INTERNAL_NAME} Conn {i+1}/{num_connections}]"
        logger.info(f"{tag} Adding feed for {len(symbol_chunk)} tickers...")
        try:
            handler.add_feed(
                EXCHANGE_CLASS(
                    symbols=symbol_chunk,
                    channels=[TICKER],
                    callbacks={TICKER: TickerCallback(ticker_handler)},
                )
            )
        except ValueError as ve:
            logger.error(f"{tag} Failed adding feed chunk: {ve}")
        except Exception as e:
            logger.error(f"{tag} Unexpected error adding feed chunk: {e}", exc_info=True)
        else:
            logger.info(f"{tag} Feed added successfully.")
            subscribed += len(symbol_chunk)
    return subscribed


def main():
    """Read the ticker list, subscribe to Bybit tickers and run until stopped.

    Exits with status 1 when no symbol could be subscribed or an unexpected
    error occurs; a keyboard interrupt ends the script normally.
    """
    writer_task = None
    try:
        desired_symbols = read_tickers(TICKER_FILE)
        handler = FeedHandler()
        # NOTE(review): the loop is fetched only after FeedHandler() exists in
        # case its constructor changes the event loop policy (e.g. uvloop) --
        # confirm against the installed cryptofeed version. Fetching it here is
        # safe either way and keeps the writer task on the loop run() will use.
        loop = asyncio.get_event_loop()

        logger.info(f"--- Processing {EXCHANGE_DISPLAY_NAME} ---")
        symbols_to_subscribe = get_validated_symbols_sync(
            EXCHANGE_CLASS,
            EXCHANGE_INTERNAL_NAME,
            desired_symbols,
        )

        total_subscribed_count = 0
        if symbols_to_subscribe is None:
            logger.error(f"[{EXCHANGE_INTERNAL_NAME}] Skipping feed addition due to symbol validation failure.")
        elif not symbols_to_subscribe:
            logger.info(f"[{EXCHANGE_INTERNAL_NAME}] No valid symbols from '{TICKER_FILE}' to subscribe to.")
        else:
            total_subscribed_count = _add_feeds(handler, symbols_to_subscribe)

        if total_subscribed_count == 0:
            logger.error(f"\nNo {EXCHANGE_DISPLAY_NAME} symbols successfully configured for subscription. Exiting.")
            sys.exit(1)

        logger.info(f"\nSuccessfully configured {total_subscribed_count} {EXCHANGE_DISPLAY_NAME} symbols across {len(handler.feeds)} connection(s).")
        logger.info(f"Creating background task for {JSON_FILENAME} (Interval: {WRITE_INTERVAL}s).")
        writer_task = loop.create_task(periodic_json_writer(JSON_FILENAME, WRITE_INTERVAL))
        logger.info("Starting feed handler...")
        logger.info("Press CTRL+C to stop.")
        handler.run()  # Start listening
    except KeyboardInterrupt:
        logger.info("\nShutdown requested via KeyboardInterrupt...")
    except SystemExit as e:
        logger.info(f"Exiting script (code {e.code}).")
        # Re-raise so the process keeps the requested exit status; swallowing
        # it here would make every failure path exit with status 0.
        raise
    except Exception as e:
        logger.exception(f"\nCritical error in main execution: {e}")
        sys.exit(1)
    finally:
        if writer_task and not writer_task.done():
            logger.info("Cancelling JSON writer task...")
            writer_task.cancel()
        logger.info("Script finished.")


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment