"""Hourly trading-signal loop.

Fetches 60-minute bars from Alpha Vantage, derives MACD / RSI / Bollinger
Band indicators, asks an OpenAI chat model for buy/sell signals, stores the
bars in SQLite and prints the trades implied by the signals.
"""

import json
import os
import sqlite3
from datetime import datetime
from time import sleep

import matplotlib.pyplot as plt
import numpy as np
import openai
import pandas as pd
from alpha_vantage.timeseries import TimeSeries

# API keys come from the environment. ``setdefault`` keeps a key that is
# already exported instead of overwriting it with an empty placeholder.
os.environ.setdefault("OPENAI_API_KEY", '')
openai.api_key = os.getenv("OPENAI_API_KEY")
alpha_vantage_api_key = os.getenv("ALPHA_VANTAGE_API_KEY", "")

# Delay between polling passes (also used as the retry delay).
UPDATE_INTERVAL_SECONDS = 3600

# Trading window used by execute_trades; parsed once instead of per signal.
MARKET_OPEN = datetime.strptime("09:30", "%H:%M").time()
MARKET_CLOSE = datetime.strptime("16:00", "%H:%M").time()

# (signal, current position) -> (trade label, resulting position).
# Positions: 0 = flat, 1 = long, -1 = short.
_TRADE_TRANSITIONS = {
    (1, 0): ("buy", 1),
    (-1, 1): ("sell", 0),
    (-1, 0): ("short", -1),
    (1, -1): ("cover", 0),
}

_PRICE_COLUMNS = ['open', 'high', 'low', 'close', 'volume']


# Database connection
def create_connection(db_file):
    """Open the SQLite database at ``db_file``.

    Returns the connection, or ``None`` when SQLite reports an error (the
    error is printed).
    """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        print(f"Connected to SQLite database {db_file}")
    except sqlite3.Error as e:
        print(f"Error connecting to database: {e}")
    return conn


def create_table(conn):
    """Create the ``stock_data`` table if it does not exist yet.

    Errors are printed rather than raised; a ``None`` connection is reported
    the same way.
    """
    if conn is None:
        print("Error creating table: no database connection")
        return
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS stock_data (
            timestamp TEXT PRIMARY KEY,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume INTEGER,
            macd REAL,
            macd_signal REAL,
            rsi REAL,
            bollinger_high REAL,
            bollinger_low REAL,
            signal INTEGER
        )''')
        conn.commit()
    except sqlite3.Error as e:
        print(f"Error creating table: {e}")


def _nullable(value, cast):
    """Return ``cast(value)``, mapping missing values (None/NaN) to None."""
    if value is None or pd.isna(value):
        return None
    return cast(value)


def save_to_database(conn, data):
    """Insert every row of ``data`` into ``stock_data``.

    ``data`` is expected to be indexed by timestamps and to carry the price
    and indicator columns; a ``signal`` column is optional. Rows whose
    timestamp is already stored are skipped (``INSERT OR IGNORE``). Missing
    values are written as NULL. Any failure is printed and the partial
    transaction is rolled back.
    """
    if conn is None:
        print("Error saving data to database: no database connection")
        return
    try:
        rows = [
            (
                index.strftime("%Y-%m-%d %H:%M:%S"),  # Format the timestamp
                _nullable(row['open'], float),
                _nullable(row['high'], float),
                _nullable(row['low'], float),
                _nullable(row['close'], float),
                _nullable(row['volume'], int),
                _nullable(row['macd'], float),
                _nullable(row['macd_signal'], float),
                _nullable(row['rsi'], float),
                _nullable(row['bollinger_high'], float),
                _nullable(row['bollinger_low'], float),
                _nullable(row.get('signal', None), int),
            )
            for index, row in data.iterrows()
        ]
        conn.executemany("""
            INSERT OR IGNORE INTO stock_data (timestamp, open, high, low, close, volume, macd, macd_signal, rsi, bollinger_high, bollinger_low, signal)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, rows)
        conn.commit()
        print("Data saved to database.")
    except Exception as e:
        # Best-effort persistence: report the problem and undo partial work
        # so the next pass starts from a clean transaction.
        conn.rollback()
        print(f"Error saving data to database: {e}")


# Fetch 60-minute intraday stock data from Alpha Vantage
def fetch_60min_data_from_alpha_vantage(ticker):
    """Download the full 60-minute intraday history for ``ticker``.

    Returns a DataFrame with ``open``/``high``/``low``/``close``/``volume``
    columns, or ``None`` when the request or the column selection fails (the
    error is printed).
    """
    ts = TimeSeries(key=alpha_vantage_api_key, output_format='pandas')
    try:
        data, _meta_data = ts.get_intraday(symbol=ticker, interval='60min', outputsize='full')
        data = data.rename(columns={
            "1. open": "open",
            "2. high": "high",
            "3. low": "low",
            "4. close": "close",
            "5. volume": "volume"
        })
        return data[_PRICE_COLUMNS]
    except Exception as e:
        print(f"Error fetching 60-minute intraday stock data from Alpha Vantage: {e}")
        return None


# Handle missing data by filling or interpolating
def handle_missing_data(data):
    """Sort ``data`` by index and resample it onto a 60-minute grid.

    Every slot between the first and last timestamp gets a row; slots with no
    bar are forward-filled from the previous bar.
    """
    # '60min' is the spelling accepted across pandas versions (the 'T' alias
    # is deprecated in recent releases).
    return data.sort_index().resample('60min').ffill()


# Calculate technical indicators
def calculate_technical_indicators(data, params):
    """Return a copy of ``data`` with MACD, RSI and Bollinger Band columns.

    ``params`` supplies the window settings under the keys ``macd``
    (``short_ema``, ``long_ema``, ``signal_line``), ``rsi`` (``period``) and
    ``bollinger_bands`` (``period``, ``standard_deviation``). The input frame
    is left untouched.
    """
    data = data.copy()
    close = data['close']

    # MACD
    macd_params = params['macd']
    short_ema = close.ewm(span=macd_params['short_ema'], adjust=False).mean()
    long_ema = close.ewm(span=macd_params['long_ema'], adjust=False).mean()
    data['macd'] = short_ema - long_ema
    data['macd_signal'] = data['macd'].ewm(span=macd_params['signal_line'], adjust=False).mean()

    # RSI (simple rolling means of gains and losses)
    rsi_period = params['rsi']['period']
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=rsi_period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_period).mean()
    rs = gain / loss
    data['rsi'] = 100 - (100 / (1 + rs))

    # Bollinger Bands
    bb_params = params['bollinger_bands']
    rolling = close.rolling(window=bb_params['period'])
    rolling_mean = rolling.mean()
    band_width = rolling.std() * bb_params['standard_deviation']
    data['bollinger_high'] = rolling_mean + band_width
    data['bollinger_low'] = rolling_mean - band_width

    return data


def _signal_time(signal):
    """Return the signal's timestamp as a timezone-naive ``pd.Timestamp``."""
    ts = pd.to_datetime(signal['timestamp'])
    if ts.tzinfo is not None:
        # Keep the wall-clock time so aware and naive stamps stay comparable.
        ts = ts.tz_localize(None)
    return ts


def _is_valid_signal(entry):
    """Tell whether ``entry`` is a dict with a parseable timestamp and a 1/-1 signal."""
    if not isinstance(entry, dict):
        return False
    if not isinstance(entry.get('timestamp'), str) or entry.get('signal') not in (1, -1):
        return False
    try:
        return not pd.isna(_signal_time(entry))
    except (ValueError, TypeError):
        return False


# Generate trading signals based on optimized parameters using OpenAI
def generate_trading_signals(data, params):
    """Ask the OpenAI chat model for trading signals on the latest bars.

    Only the last 50 rows of ``data`` are sent. Returns a chronologically
    sorted list of ``{"timestamp", "signal", "exit"}`` dicts, where ``exit``
    is the timestamp of the following signal (``None`` for the last one).
    Returns an empty list when the model reports no usable signal, and
    ``None`` when the request fails or the reply contains no parseable JSON
    array.
    """
    recent_data = data.tail(50)  # Only use the most recent 50 data points to stay within token limits
    data_json = recent_data.reset_index().to_json(orient='records', date_format='iso')  # Include timestamps
    params_json = json.dumps(params)

    prompt = (
        f"Given the following stock data with technical indicators in JSON format:\n{data_json}\n"
        f"and the following optimized parameters for the technical indicators:\n{params_json}\n"
        "Generate trading signals based on the following conditions:\n"
        "- Buy signal (1) if MACD crosses above the MACD signal line, RSI is below 30, or the closing price crosses above the lower Bollinger Band.\n"
        "- Sell signal (-1) if MACD crosses below the MACD signal line, RSI is above 70, or the closing price crosses below the upper Bollinger Band.\n"
        "Also, provide exit points based on the generated signals. Provide the output in JSON format with the structure:\n"
        "[\n"
        " {\"timestamp\": \"YYYY-MM-DDTHH:MM:SS\", \"signal\": 1/-1, \"exit\": \"YYYY-MM-DDTHH:MM:SS\"},\n"
        " ...\n"
        "]"
        "Ensure the timestamps are in chronological order and valid."
    )

    # NOTE(review): ``openai.ChatCompletion`` is the interface of the pre-1.0
    # openai package - confirm the pinned SDK version before upgrading.
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a financial expert."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=1500
        )
        signals_text = response['choices'][0]['message']['content'].strip()
    except Exception as e:
        print(f"Error fetching trading signals from OpenAI: {e}")
        return None
    print(f"Raw response from OpenAI: {signals_text}")

    # Extract JSON from the response. When no bracket pair is present the
    # slice is not valid JSON and is reported as a parse error below.
    start_index = signals_text.find("[")
    end_index = signals_text.rfind("]") + 1
    try:
        parsed = json.loads(signals_text[start_index:end_index])
    except json.JSONDecodeError as e:
        print(f"Error parsing signals: {e}")
        print(f"Response from OpenAI: {signals_text}")
        return None

    # Drop malformed entries, then sort by parsed time to ensure
    # chronological order.
    signals = sorted((entry for entry in parsed if _is_valid_signal(entry)), key=_signal_time)

    # Add exit points based on the next signal's timestamp
    for current, following in zip(signals, signals[1:]):
        current['exit'] = following['timestamp']
    if signals:
        signals[-1]['exit'] = None  # Last signal has no exit point

    return signals


# Execute trades based on signals
def execute_trades(signals, current_position):
    """Walk ``signals`` in order, print the implied trades, return the new position.

    ``current_position`` is 0 (flat), 1 (long) or -1 (short). Signals whose
    time of day falls outside 09:30-16:00 are ignored, as are signals that do
    not apply to the current position (e.g. a buy while already long).
    """
    for signal in signals:
        timestamp = pd.to_datetime(signal['timestamp'])
        # Check if the signal timestamp is within market hours
        if not MARKET_OPEN <= timestamp.time() <= MARKET_CLOSE:
            continue
        transition = _TRADE_TRANSITIONS.get((signal['signal'], current_position))
        if transition is None:
            continue
        label, current_position = transition
        print(f"Executing {label} trade at {signal['timestamp']}")
    return current_position


def _attach_signals(data, signals):
    """Return a copy of ``data`` with a ``signal`` column filled from ``signals``."""
    data = data.copy()
    data['signal'] = np.nan
    for entry in signals:
        ts = _signal_time(entry)
        if ts in data.index:
            data.loc[ts, 'signal'] = entry['signal']
    return data


# Main function to execute the script
def main(ticker='IBM', db_file='stock_data.db'):
    """Run the fetch / analyse / store / trade loop until interrupted.

    ``ticker`` is the symbol to follow and ``db_file`` the SQLite file that
    receives the bars. The database connection is closed when the loop exits.
    """
    # Create database connection and table
    conn = create_connection(db_file)
    if conn is None:
        print("Cannot continue without a database connection.")
        return

    try:
        create_table(conn)

        # Initial parameters for technical indicators (you can fetch these from OpenAI if desired)
        initial_params = {
            "macd": {"short_ema": 12, "long_ema": 26, "signal_line": 9},
            "rsi": {"period": 14},
            "bollinger_bands": {"period": 20, "standard_deviation": 2}
        }

        current_position = 0  # 0 = no position, 1 = long, -1 = short
        # Timestamp of the newest signal already handed to execute_trades.
        # Each pass resends the latest 50 bars, so without this watermark
        # earlier signals would be executed again every hour.
        last_signal_time = None

        while True:
            print(f"Fetching latest intraday stock data for {ticker}...")
            new_data = fetch_60min_data_from_alpha_vantage(ticker)

            if new_data is None:
                print("Failed to fetch new stock data. Retrying in an hour...")
                sleep(UPDATE_INTERVAL_SECONDS)  # Wait before retrying
                continue

            # Handle missing data
            new_data = handle_missing_data(new_data)

            # Calculate technical indicators for the new data
            new_data_with_indicators = calculate_technical_indicators(new_data, initial_params)

            # Generate trading signals and exit points using OpenAI
            signals = generate_trading_signals(new_data_with_indicators, initial_params)

            if signals is None:
                print("Failed to generate trading signals. Retrying in an hour...")
                sleep(UPDATE_INTERVAL_SECONDS)  # Wait before retrying
                continue

            # Update database with new data and signals
            save_to_database(conn, _attach_signals(new_data_with_indicators, signals))

            # Execute trades based on signals not seen in an earlier pass
            fresh_signals = [
                entry for entry in signals
                if last_signal_time is None or _signal_time(entry) > last_signal_time
            ]
            current_position = execute_trades(fresh_signals, current_position)
            if fresh_signals:
                last_signal_time = _signal_time(fresh_signals[-1])

            print("Waiting for the next update...")
            sleep(UPDATE_INTERVAL_SECONDS)  # Wait before fetching new data
    finally:
        conn.close()


if __name__ == "__main__":
    main()