Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import websocket
- import json
- import os
- import time
- from datetime import datetime, timezone
- # --- ANSI Color Codes ---
- GREEN = '\033[92m'
- RED = '\033[91m'
- RESET = '\033[0m' # Resets the color
- YELLOW = '\033[93m' # For no change or initial display
- BLUE = '\033[94m' # For TRADE label
- CYAN = '\033[96m' # For QUOTE label
- # --- TROUBLESHOOTING NOTE ---
- # "insufficient subscription (Code: 409)" from Alpaca means your API key
- # doesn't have the necessary subscription for the chosen data feed.
- # Ensure SOCKET_URL below is set appropriately (likely IEX_SOCKET_URL for paper trading).
- # --- END TROUBLESHOOTING NOTE ---
- # --- Configuration ---
- API_KEY = os.environ.get('APCA_API_KEY_ID') or "YOUR_API_KEY"
- API_SECRET = os.environ.get('APCA_API_SECRET_KEY') or "YOUR_API_SECRET"
- TICKER_SYMBOL = "GME"
- SIP_SOCKET_URL = "wss://stream.data.alpaca.markets/v2/sip"
- IEX_SOCKET_URL = "wss://stream.data.alpaca.markets/v2/iex"
- # CHOOSE YOUR SOCKET URL HERE:
- # For paper trading, use IEX_SOCKET_URL.
- # SOCKET_URL = SIP_SOCKET_URL
- SOCKET_URL = IEX_SOCKET_URL # Set for IEX based on paper account
- # --- Global Variables ---
- authenticated = False
- # Store the last prices to determine color changes
- last_trade_price = 0.0
- last_bid_price = 0.0
- last_ask_price = 0.0
- def get_price_color(current_price, last_price):
- """Determines the color based on price change."""
- if last_price == 0.0: # First price update
- return YELLOW
- if current_price is None: # current_price might be None if parsing failed
- return RESET # Default color if price is None
- if current_price > last_price:
- return GREEN
- elif current_price < last_price:
- return RED
- else:
- return YELLOW
- def format_timestamp_iso(iso_timestamp_str):
- """Formats an ISO 8601 timestamp string (used for both trades and quotes from IEX)."""
- if iso_timestamp_str:
- try:
- if iso_timestamp_str.endswith('Z'):
- dt_object = datetime.fromisoformat(iso_timestamp_str.replace('Z', '+00:00'))
- else:
- dt_object = datetime.fromisoformat(iso_timestamp_str)
- if dt_object.tzinfo is None:
- dt_object = dt_object.replace(tzinfo=timezone.utc)
- return dt_object.strftime('%H:%M:%S.%f')[:-3] + " UTC"
- except ValueError as e:
- print(f"{RED}Error converting ISO timestamp '{iso_timestamp_str}': {e}{RESET}")
- return "Invalid Timestamp"
- return "N/A"
- def on_open(ws):
- global authenticated, last_trade_price, last_bid_price, last_ask_price
- authenticated = False
- last_trade_price = 0.0
- last_bid_price = 0.0
- last_ask_price = 0.0
- print("WebSocket connection opened.")
- if not API_KEY or API_KEY == "YOUR_API_KEY" or not API_SECRET or API_SECRET == "YOUR_API_SECRET":
- print(f"{RED}Error: API_KEY or API_SECRET not configured. Please set them.{RESET}")
- ws.close()
- return
- auth_data = {"action": "auth", "key": API_KEY, "secret": API_SECRET}
- ws.send(json.dumps(auth_data))
- print(f"Sent authentication request for API Key ID starting with: {API_KEY[:4]}...")
- def on_message(ws, message):
- global authenticated, last_trade_price, last_bid_price, last_ask_price
- try:
- data_list = json.loads(message)
- for item in data_list:
- message_type = item.get("T")
- if message_type == "success" and item.get("msg") == "authenticated":
- authenticated = True
- print(f"{GREEN}Successfully authenticated!{RESET}")
- subscribe_message = {
- "action": "subscribe",
- "trades": [TICKER_SYMBOL],
- "quotes": [TICKER_SYMBOL],
- }
- ws.send(json.dumps(subscribe_message))
- print(f"Subscribing to trades and quotes for {TICKER_SYMBOL}...")
- elif message_type == "error":
- error_msg = item.get('msg')
- error_code = item.get('code')
- print(f"{RED}API Error: {error_msg} (Code: {error_code}){RESET}")
- if error_code == 409:
- print(f"{RED}This 'insufficient subscription' error means your Alpaca account/API key "
- f"doesn't have access to this data feed. Check SOCKET_URL and your Alpaca plan.{RESET}")
- ws.close()
- elif message_type == "subscription":
- print(f"Subscription update: Trades: {item.get('trades', [])}, "
- f"Quotes: {item.get('quotes', [])}, Bars: {item.get('bars', [])}")
- elif authenticated:
- symbol = item.get("S")
- if symbol != TICKER_SYMBOL:
- continue
- if message_type == "t": # Trade message
- price = item.get("p")
- size = item.get("s")
- raw_timestamp = item.get("t")
- readable_time = format_timestamp_iso(raw_timestamp)
- price_color = get_price_color(price, last_trade_price)
- price_num = price if price is not None else 0.0
- # Infer trade side
- trade_side_indicator = f"{YELLOW}[CROSS]{RESET}" # Default
- if price is not None:
- if last_ask_price > 0 and price >= last_ask_price:
- trade_side_indicator = f"{GREEN}[BUY]{RESET}"
- elif last_bid_price > 0 and price <= last_bid_price:
- trade_side_indicator = f"{RED}[SELL]{RESET}"
- print(f"{BLUE}TRADE{RESET} [{symbol}]: {trade_side_indicator} Price: {price_color}${price_num:<7.2f}{RESET} "
- f"Size: {size:<5} Time: {readable_time}")
- if price is not None:
- last_trade_price = price
- elif message_type == "q": # Quote message
- bid_price = item.get("bp")
- bid_size = item.get("bs")
- ask_price = item.get("ap")
- ask_size = item.get("as")
- raw_timestamp = item.get("t")
- readable_time = format_timestamp_iso(raw_timestamp)
- bid_color = get_price_color(bid_price, last_bid_price)
- ask_color = get_price_color(ask_price, last_ask_price)
- bid_price_num = bid_price if bid_price is not None else 0.0
- ask_price_num = ask_price if ask_price is not None else 0.0
- print(f"{CYAN}QUOTE{RESET} [{symbol}]: Bid: {bid_color}${bid_price_num:<7.2f}{RESET} (Size: {bid_size:<4}) "
- f"Ask: {ask_color}${ask_price_num:<7.2f}{RESET} (Size: {ask_size:<4}) Time: {readable_time}")
- if bid_price is not None:
- last_bid_price = bid_price
- if ask_price is not None:
- last_ask_price = ask_price
- elif not (message_type == "success" and item.get("msg") == "connected"):
- print(f"Other data: {item}")
- elif not (message_type == "success" and item.get("msg") == "connected"):
- print(f"System message: {item}")
- except json.JSONDecodeError:
- print(f"{RED}Failed to decode JSON: {message}{RESET}")
- except Exception as e:
- print(f"{RED}An error occurred in on_message: {e!r}{RESET}")
- def on_error(ws, error):
- print(f"{RED}WebSocket error: {error}{RESET}")
- def on_close(ws, close_status_code, close_msg):
- global authenticated
- authenticated = False
- print(f"WebSocket connection closed. Code: {close_status_code}, Message: {close_msg if close_msg else 'N/A'}")
- def main():
- print("Starting Alpaca WebSocket client...")
- print(f"Connecting to: {SOCKET_URL} for ticker: {TICKER_SYMBOL}")
- if API_KEY == "YOUR_API_KEY" or API_SECRET == "YOUR_API_SECRET":
- print(f"{YELLOW}WARNING: API_KEY or API_SECRET is set to placeholder values. Please configure them.{RESET}")
- ws_app = websocket.WebSocketApp(SOCKET_URL,
- on_open=on_open,
- on_message=on_message,
- on_error=on_error,
- on_close=on_close)
- try:
- ws_app.run_forever()
- except KeyboardInterrupt:
- print(f"\n{YELLOW}WebSocket client stopped by user.{RESET}")
- except Exception as e:
- print(f"{RED}An unexpected error occurred while running WebSocketApp: {e!r}{RESET}")
- finally:
- if 'ws_app' in locals() and hasattr(ws_app, 'sock') and ws_app.sock and ws_app.sock.connected:
- ws_app.close()
- print("WebSocket client shutdown complete.")
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement