Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- from sklearn.preprocessing import MinMaxScaler
- import pandas as pd
- from pandas import DataFrame
- import pandas_datareader.data as pdr
- import pymysql, dbCredentials
- from openpyxl import load_workbook
class Stock:
    """Container tying a ticker symbol to its prepared model data.

    Attributes:
        symbol:  the ticker symbol passed in.
        dfData:  DataFrame of daily price change and valuation ratios
                 (from prepareFundamentalStatistics).
        X_train, y_train, X_test, y_test:  normalized, split model arrays.
        scalar:  the fitted MinMaxScaler returned by trainAndTestData
                 (NOTE(review): attribute is spelled 'scalar', the helper
                 calls it 'scaler' -- kept as-is for caller compatibility).
    """
    def __init__(self, SYMBOL, testShare):
        # NOTE(review): testShare is forwarded into trainAndTestData's
        # *trainShare* parameter -- the two names disagree; confirm which
        # share of the data this value is meant to describe.
        self.symbol = SYMBOL
        self.dfData = prepareFundamentalStatistics(self.symbol, printFile=True)
        self.X_train, self.y_train, self.X_test, self.y_test, self.scalar = trainAndTestData(self.dfData, testShare)
def fetchFromDB(SYMBOL, dbTable='fundamantaltbl'):
    """Fetch fundamental rows (DATE, TYPE, VALUE) for *SYMBOL* from MySQL.

    Parameters:
        SYMBOL:  ticker symbol, bound safely via a query placeholder.
        dbTable: table to query. NOTE(review): the default looks misspelled
                 ('fundamantaltbl') -- the only visible caller passes
                 'fundamentaltbl' explicitly; confirm the real table name.

    Returns:
        DataFrame with columns DATE, TYPE, VALUE, indexed and sorted by DATE.
    """
    conn = pymysql.connect(
        user=dbCredentials.user,
        password=dbCredentials.password,
        host=dbCredentials.host,
        port=3306,
        database='innodb')
    try:
        cur = conn.cursor()
        try:
            # Table names cannot be bound as parameters; dbTable only comes
            # from internal callers. SYMBOL uses a proper placeholder.
            query = "SELECT DATE, TYPE, VALUE FROM " + dbTable + " WHERE SYMBOL=%(SYMBOL)s"
            cur.execute(query, {'SYMBOL': SYMBOL})
            field_names = [col[0] for col in cur.description]
            rows = cur.fetchall()
        finally:
            cur.close()   # release the cursor even if the query raised
    finally:
        conn.close()      # never leak the connection
    # The first element of every row is DATE -> used as the index.
    df = DataFrame(list(rows), index=[row[0] for row in rows])
    df.columns = field_names
    df.sort_index(inplace=True)
    return df
def writeToFile(df, workbook, worksheet):
    """Write *df* to sheet *worksheet* inside the Excel file *workbook*.

    If the workbook already exists the sheet is appended to it; otherwise a
    brand-new workbook containing just this sheet is created.
    """
    try:
        # Raises FileNotFoundError when the workbook does not exist yet.
        book = load_workbook(workbook)
    except FileNotFoundError:
        # No existing file: let pandas create a fresh workbook.
        df.to_excel(workbook, worksheet)
        return
    with pd.ExcelWriter(workbook, engine='openpyxl') as writer:
        # Attach the loaded workbook and register its sheets so pandas
        # appends instead of clobbering the file.
        # NOTE(review): assigning writer.book/writer.sheets is deprecated in
        # pandas >= 1.5 (use ExcelWriter(mode='a')); kept for compatibility
        # with the pandas version this file was written against.
        writer.book = book
        writer.sheets = dict((ws.title, ws) for ws in book.worksheets)
        df.to_excel(writer, worksheet)
        # Explicit writer.save() dropped: the context manager saves on exit.
def fcff(cfo, intexp, taxrate, capex):
    """Free cash flow to the firm: CFO plus after-tax interest plus capex.

    NOTE(review): capex is *added* rather than subtracted -- presumably the
    stored CAPEX values are already negative; confirm against the data source.
    """
    after_tax_interest = intexp * (1.0 - taxrate)
    return cfo + after_tax_interest + capex
def wacc(de_ratio, k_debt=0.05, k_equity=0.12, tax_rate=0.3):
    """Weighted average cost of capital derived from a debt/equity ratio.

    Equity weight is 1 / (D/E + 1); debt takes the remaining weight and its
    cost is reduced by the tax shield.
    """
    w_equity = 1.0 / (de_ratio + 1.0)
    w_debt = 1 - w_equity
    debt_cost_after_tax = k_debt * (1 - tax_rate)
    return w_debt * debt_cost_after_tax + w_equity * k_equity
def dcf(cf, growth, dr):
    """Discounted cash flow value of *cf* at discount rate *dr*.

    *growth* may be a scalar (single-stage Gordon-growth capitalization,
    discounted one extra period) or a sequence of per-period growth rates
    (multi-stage, with a terminal value added in the final period).
    """
    # Single-stage: grow one period, capitalize, discount back one period.
    if (np.size(growth) == 1):
        return (cf * (1 + growth) / (dr - growth)) / (1 + dr)
    # Multi-stage: compound cf through every period; in the last period add
    # a Gordon terminal value, then discount each period's flow to present.
    present_value, terminal = 0, 0
    n_periods = len(growth)
    for period, g in enumerate(growth, start=1):
        cf = cf * (1 + g)
        if period == n_periods:
            terminal = cf * (1 + g) / (dr - g)
        present_value = present_value + (cf + terminal) / (1.0 + dr) ** period
    return present_value
def getDfPrice(stocklist, start_date, end_date, type='Close'):
    """Fetch daily price data for *stocklist* from Yahoo Finance.

    Returns a single-column DataFrame (column named after *stocklist*)
    holding the requested price field, indexed by calendar date.
    NOTE(review): the *type* parameter shadows the builtin but is part of
    the keyword interface used by callers, so the name is kept.
    """
    raw = pdr.get_data_yahoo(stocklist, start_date, end_date)
    prices = raw[type].values
    return DataFrame(prices, columns=[stocklist], index=raw.index.date)
def prepareFundamentalStatistics(symbol, printFile=True):
    """Build the per-date fundamental-ratio dataset for *symbol*.

    Fetches quarterly fundamentals from the database and daily prices from
    Yahoo Finance, annualizes the fundamentals over rolling 4-quarter sums,
    and for every price date computes the daily price change plus the P/S,
    P/E and P/FCFF ratios based on the latest annualized figures available
    on or before that date.

    Parameters:
        symbol:    ticker symbol.
        printFile: when True, also write the result to '<symbol>.xlsx'
                   (sheet 'python_output').

    Returns:
        DataFrame with columns ['Price_delta', 'P/S', 'P/E', 'P/FCFF'],
        indexed by the price dates that have fundamentals available.
    """
    dfFund = fetchFromDB(symbol, dbTable='fundamentaltbl')
    # .loc replaces the .ix accessor, which was removed in pandas >= 1.0.
    datesFund = dfFund.loc[:, 'DATE'].unique()
    dfPrice = getDfPrice(symbol, datesFund[0], datesFund[-1], type='Close')

    def _values(kind):
        # All quarterly VALUEs of one fundamental TYPE, in date order.
        return dfFund.loc[dfFund['TYPE'] == kind, 'VALUE'].values

    rev, ni, ebt = _values('REV'), _values('NI'), _values('EBT')
    cfo, intexp = _values('CFO'), _values('INTEX')
    capex, wash = _values('CAPEX'), _values('WASH')

    # Effective tax rate = (EBT - NI) / EBT, fed into the FCFF formula.
    taxrate = np.divide(np.array(ebt) - np.array(ni), np.array(ebt))
    freeCF = fcff(np.array(cfo), np.array(intexp), taxrate, np.array(capex))
    dfTemp = DataFrame(np.column_stack((rev, ni, freeCF)),
                       index=datesFund, columns=['REV', 'NI', 'FCFF'])
    # Annualize (rolling 4-quarter sums) and drop the 3 incomplete quarters.
    dfTemp = DataFrame.rolling(dfTemp, window=4, min_periods=4, center=False).sum()[3:]
    dfTemp['WASH'] = wash[3:]  # weighted average number of shares

    count = 0
    output2 = np.array([])
    dfPriceGrowth = dfPrice.pct_change(periods=1)
    price_start = 0
    for i in range(1, len(dfPrice.index)):
        # Latest annualized fundamentals published on or before this date.
        output = np.array(dfTemp.loc[dfTemp.index <= dfPrice.index[i]].iloc[-1:].values)
        if (output.size > 0):
            if (count == 0):
                price_start = i  # first price date with fundamentals
            count += 1
            price = dfPrice.iloc[i].values
            priceGrowth = round(dfPriceGrowth.iloc[i], 4)
            marketCap = price * output[-1][-1]  # price * share count (WASH)
            output2 = np.append(output2, priceGrowth)
            # marketCap / [REV, NI, FCFF] -> P/S, P/E, P/FCFF.
            output2 = np.append(output2, np.round(np.divide(marketCap, output[-1][:-1]), 2))

    dfData = DataFrame(np.reshape(output2, (count, 4)),
                       index=dfPriceGrowth[dfPriceGrowth.index >= dfPriceGrowth.index[price_start]].index,
                       columns=['Price_delta', 'P/S', 'P/E', 'P/FCFF'])
    if printFile:
        writeToFile(dfData, symbol + '.xlsx', 'python_output')
    return dfData
def trainAndTestData(dfData, trainShare=0.8):
    """Split *dfData* into normalized train/test arrays for sequence models.

    The full feature matrix is min-max scaled to [0, 1]; the target is the
    next row's first column ('Price_delta'), i.e. features at time t predict
    the price change at t+1. The first ``trainShare`` of rows becomes the
    training set.

    NOTE(review): the scaler is fit on the *whole* dataset before the split,
    so test-set statistics leak into the normalization. Left unchanged to
    preserve existing model behavior -- confirm whether this is intended.

    Returns:
        X_train, y_train, X_test, y_test, scaler -- the X arrays reshaped to
        the 3-D [samples, timesteps=1, features] layout expected by
        recurrent layers; *scaler* is the fitted MinMaxScaler, returned so
        callers can invert the transform later.
    """
    # Number of training rows, taken from the total row count.
    n_train = round(dfData.shape[0] * trainShare)
    X = dfData.iloc[:, :].values
    # Normalize the whole dataset to the [0, 1] range.
    scaler = MinMaxScaler(feature_range=(0, 1))
    X = scaler.fit_transform(X)
    X = X[0:-1, :]  # drop last row: it has no next-period target
    y = dfData.iloc[:, 0].values
    y = y[1:]       # drop first row: targets are forward looking
    X_train, y_train = X[:n_train, :], y[:n_train]
    X_test, y_test = X[n_train:, :], y[n_train:]
    # Reshape inputs to 3-D [samples, timesteps, features].
    X_train = X_train.reshape((X_train.shape[0], 1, X_train.shape[1]))
    X_test = X_test.reshape((X_test.shape[0], 1, X_test.shape[1]))
    return X_train, y_train, X_test, y_test, scaler
Add Comment
Please, Sign In to add comment