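# Source note: this script was copied from a Pastebin page; the page banner
# ("Not a member of Pastebin yet? Sign Up, it unlocks many cool features!")
# is not part of the program.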
import cmath as mt

import numpy as np
import pandas as pd
import pylab as pl
from matplotlib import style

style.use('ggplot')
# Load the training and testing periods. np.genfromtxt turns every field it
# cannot parse as a number into NaN, which is why the original author drops
# the first column and the first row below (described by the author as
# containing only NaNs -- presumably the date column and the header row).
df1 = np.genfromtxt('NASDAQ_training.csv', delimiter=',')
df2 = np.genfromtxt('NASDAQ_testing.csv', delimiter=',')

# Wrap the raw arrays in DataFrames so the columns can be addressed by name.
array1 = pd.DataFrame(df1)
array2 = pd.DataFrame(df2)
array1.columns = ['Index', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']
array2.columns = ['Index', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

# Remove the redundant all-NaN column ...
del array1['Index']
del array2['Index']

# ... and the redundant all-NaN first row.
array1 = array1[1:]
array2 = array2[1:]

# Keep only the adjusted close as a plain numpy array. `.loc` replaces the
# `.ix` indexer, which no longer exists in current pandas releases.
array_adj_close1 = array1.loc[:, 'Adj Close'].values
array_adj_close2 = array2.loc[:, 'Adj Close'].values
class project:
    """Back-test of an adaptive-moving-average (AMA) trading rule.

    The instance holds a price series; `PnL` runs the rule over it for one
    set of parameters and stores the resulting cumulative PnL path.
    """

    def __init__(self, array, cumulative_pnl):
        # Price series the strategy is evaluated on (1-D sequence of numbers).
        self.array = array
        # Cumulative PnL path; replaced by a numpy array on every PnL() call.
        self.cumulative_pnl = cumulative_pnl

    def PnL(self, ks, kf, interval, sd_interval, y):
        """Run the AMA rule and return its final cumulative PnL.

        Parameters
        ----------
        ks : number
            Slow period; gives the smoothing constant ``2 / (ks + 1)``.
        kf : number
            Fast period; gives the smoothing constant ``2 / (kf + 1)``.
        interval : int
            Look-back length (in bars) for Vt, Dt and the seed average.
        sd_interval : int
            Window of the rolling standard deviation of the AMA, and the
            look-back used for the prior low/high in the signal rule.
        y : float
            Multiplier applied to the rolling standard deviation to obtain
            the filter.

        Returns
        -------
        float
            Cumulative PnL at the last bar (0.0 when the series has no bar
            after the ``interval + sd_interval`` warm-up).

        Raises
        ------
        ValueError
            If ``interval < 1`` or ``sd_interval < 2`` (the prior low/high
            window would be empty).

        Side effects
        ------------
        ``self.cumulative_pnl`` is set to the cumulative PnL for every bar
        from index ``interval + sd_interval`` onwards.
        """
        if interval < 1 or sd_interval < 2:
            raise ValueError("interval must be >= 1 and sd_interval >= 2")

        prices = np.asarray(self.array, dtype=float)
        n = len(prices)
        start = interval + sd_interval  # first bar that can carry a signal
        if n <= start:
            self.cumulative_pnl = np.zeros(0)
            return 0.0

        # Vt[t] = sum over the previous `interval` bars of |P[t] - P[t-lag]|.
        # NOTE(review): the textbook Kaufman volatility sums consecutive
        # one-bar changes instead; the original definition is kept here.
        vt = np.zeros(n)
        for lag in range(1, interval + 1):
            vt[interval:] += np.abs(prices[interval:] - prices[interval - lag:n - lag])

        # Dt[t] = P[t] - P[t - interval] (signed net change).
        dt = np.zeros(n)
        dt[interval:] = prices[interval:] - prices[:-interval]

        # ER = Dt / Vt; a flat window (Vt == 0) is treated as ER = 0 rather
        # than letting 0/0 poison every later value with NaN.
        # NOTE(review): ER is signed here; Kaufman's ER uses |Dt|.
        er = np.divide(dt, vt, out=np.zeros(n), where=vt != 0)

        # Ct: per-bar smoothing constant.
        # NOTE(review): kept as in the original; the textbook form is
        # (ER * (fastest - slowest) + slowest) ** 2 -- confirm intent.
        fastest = 2.0 / (kf + 1)
        slowest = 2.0 / (ks + 1)
        ct = er * (fastest - slowest) + fastest

        # Seed the AMA with the simple average of the first `interval`
        # prices (the original summed only interval - 1 of them while still
        # dividing by interval).
        level = prices[:interval].sum() / interval
        ama = np.zeros(n)
        for t in range(interval, n):
            level = level + ct[t] * (prices[t] - level)
            ama[t] = level

        # Filter = y * rolling standard deviation of the AMA; the warm-up
        # NaNs of the rolling window become 0.
        rolling_sd = pd.Series(ama[interval:]).rolling(window=int(sd_interval)).std().values
        band = np.zeros(n)
        band[interval:] = y * np.nan_to_num(rolling_sd)

        # Signal:
        #   buy  (+1) when the AMA rises above the prior low by more than
        #             the filter,
        #   sell (-1) when the AMA falls below the prior high by more than
        #             the filter,
        #   otherwise carry the previous signal forward.
        signal = np.zeros(n)
        for t in range(start, n):
            window = ama[t - sd_interval:t - 1]
            threshold = band[t - 1]
            if ama[t] - window.min() > threshold:
                signal[t] = 1
            elif ama[t] - window.max() < -threshold:
                signal[t] = -1
            else:
                signal[t] = signal[t - 1]

        # PnL[t] = signal[t] * (P[t] - P[t-1]), accumulated over time.
        pnl = signal[start:] * (prices[start:] - prices[start - 1:-1])
        cumulative = np.cumsum(pnl)
        self.cumulative_pnl = cumulative
        return cumulative[-1]
class buy_n_hold:
    """Cumulative PnL of simply holding the asset over a price series."""

    def __init__(self, array, cumulative_pnl):
        # Price series (1-D sequence of numbers).
        self.array = array
        # Caller-supplied list that receives the cumulative PnL path.
        self.cumulative_pnl = cumulative_pnl

    def cum_pnl(self, interval, sd_interval):
        """Return the running sum of one-bar price changes after the warm-up.

        The first ``interval + sd_interval`` bars are skipped; entry ``k`` of
        the result is the sum of ``P[t] - P[t-1]`` for the first ``k + 1``
        bars from index ``interval + sd_interval`` onwards.

        The list given to the constructor is refilled in place (so the
        caller's reference sees the result) and is also returned. Refilling
        instead of appending keeps repeated calls from piling duplicate
        entries onto the same list.

        Raises
        ------
        ValueError
            If ``interval + sd_interval < 1`` (there is no previous bar to
            difference against).
        """
        start = interval + sd_interval
        if start < 1:
            raise ValueError("interval + sd_interval must be at least 1")

        prices = np.asarray(self.array, dtype=float)
        steps = prices[start:] - prices[start - 1:-1]

        del self.cumulative_pnl[:]
        self.cumulative_pnl.extend(np.cumsum(steps).tolist())
        return self.cumulative_pnl
# Parameter grid to be optimized on the training period:
ks = np.arange(1, 10, 1)
kf = np.arange(10, 100, 10)
interval = np.arange(5, 30, 5)
sd_interval = np.arange(30, 90, 30)
y = [0.15, 0.20, 0.25, 0.30]

# Strategy object evaluated on the training prices:
cumulative_pnl_1 = []
project_pnl_cal = project(array_adj_close1, cumulative_pnl_1)

# Evaluate every parameter combination and record its final PnL.
# NOTE(review): the original call was PnL(j, i, m, o, p), i.e. the value
# taken from the `kf` grid is passed as the method's `ks` parameter and vice
# versa. That behaviour is preserved below and made explicit with keyword
# arguments; the 'ks'/'kf' columns keep the grid names, as before.
rows = []
for i in ks:
    for j in kf:
        for m in interval:
            for o in sd_interval:
                for p in y:
                    rows.append({
                        'ks': i,
                        'kf': j,
                        'interval': m,
                        'sd_interval': o,
                        'y': p,
                        'PnL': project_pnl_cal.PnL(ks=j, kf=i, interval=m, sd_interval=o, y=p),
                    })

# One row per evaluated combination (no preallocated empty rows, no
# per-cell `.ix` writes -- `.ix` no longer exists in current pandas).
df = pd.DataFrame(rows, columns=['ks', 'kf', 'interval', 'sd_interval', 'y', 'PnL'])

# Display the 5 parameter sets with the best training-period PnL
# (descending order):
result = df.sort_values(['PnL'], ascending=False)
print(result.head())

# Results recorded by the original author for the training period:
#   ks  kf  interval  sd_interval  y    PnL
#   1   10  25        30           0.3  13123.4
#   1   50  25        30           0.3  13100.4
#   1   80  25        30           0.3  13100.4
#   1   60  25        30           0.3  13100.4
# where interval is the AMA n period and sd_interval is the AMA standard
# deviation period. The best set of parameters in that run was:
#   ks = 1
#   kf = 10
#   AMA n period (interval) = 25
#   AMA standard deviation period (sd_interval) = 30
#   y = 0.3
# PnL of buy and hold strategy over the training period:
# print(array_adj_close1[-1] - array_adj_close1[0])

# Parameters selected on the training period. Both strategies only start
# producing PnL after interval + sd_interval bars (25 + 30 = 55, the offset
# that was hard-coded in the original).
best_interval = 25
best_sd_interval = 30
warmup = best_interval + best_sd_interval

# Strategy objects evaluated on the testing prices:
cumulative_pnl_2 = []
buy_n_hold_pnl_cal = buy_n_hold(array_adj_close2, cumulative_pnl_2)
project_pnl_cal = project(array_adj_close2, [])

# Cumulative PnL of both strategies at every point of the testing period:
project_cumulative_pnl = project_pnl_cal.PnL(10, 1, best_interval, best_sd_interval, 0.3)
buy_n_hold_cumulative_pnl = buy_n_hold_pnl_cal.cum_pnl(best_interval, best_sd_interval)
project_cumulative_pnl_vec = project_pnl_cal.cumulative_pnl

# Create index for the plot function:
index1 = np.arange(0, len(project_cumulative_pnl_vec), 1)
index2 = np.arange(0, len(buy_n_hold_cumulative_pnl), 1)

# Plot the cumulative PnL of both strategies:
pl.plot(index1, project_cumulative_pnl_vec, 'r', label='AMA cumulative PnL', linestyle='--')
pl.plot(index2, buy_n_hold_cumulative_pnl, 'b', label='Buy n Hold cumulative PnL')
# 'upper right' is the valid matplotlib location name ('top right' is not).
pl.legend(loc='upper right')
pl.show()

# Value of each portfolio at every point of the testing period
# (cumulative PnL plus the price of the asset):
#   portfolio_value  -> AMA strategy
#   buy_n_hold_value -> buy and hold strategy
portfolio_value = np.add(project_cumulative_pnl_vec[:], array_adj_close2[warmup:])
buy_n_hold_value = np.add(buy_n_hold_cumulative_pnl[:], array_adj_close2[warmup:])

# Total and annualized return of each portfolio, in percent.
# The total return is converted back to a fraction before compounding; the
# original compounded the percentage figure directly and omitted the "- 1".
# NOTE(review): 471 is the holding period used by the original author,
# presumably in calendar days -- confirm against the test data.
holding_days = 471.0
AMA_ROR = (portfolio_value[-1] - portfolio_value[0]) / portfolio_value[0] * 100
Annualized_AMA_ROR = ((1 + AMA_ROR / 100) ** (365.0 / holding_days) - 1) * 100
print(Annualized_AMA_ROR)
Buy_n_Hold_ROR = (buy_n_hold_value[-1] - buy_n_hold_value[0]) / buy_n_hold_value[0] * 100
Annualized_Buy_n_Hold_ROR = ((1 + Buy_n_Hold_ROR / 100) ** (365.0 / holding_days) - 1) * 100
print(Annualized_Buy_n_Hold_ROR)

# Annual risk-free rates.
# Source: http://www.multpl.com/10-year-treasury-rate/table/by-year
# NOTE(review): the original comment says 2009-2015 but the list holds eight
# values -- confirm which years are covered.
annual_Rf = [0.0252, 0.0373, 0.0339, 0.0197, 0.0191, 0.0286, 0.0188, 0.0209]
# Convert the average annual risk-free rate to a daily rate (252 trading days):
daily_annual_Rf = np.mean(annual_Rf) / 252
# Annualized Information Ratio:
def IR(Rp, Ri):
    """Return the annualized information ratio of a portfolio vs. a benchmark.

    Parameters
    ----------
    Rp : sequence of numbers
        Portfolio values over time.
    Ri : sequence of numbers
        Benchmark values over time; must have the same length as ``Rp``.

    Returns
    -------
    float
        ``sqrt(252) * (mean(rp) - mean(ri)) / std(rp - ri)`` where ``rp`` and
        ``ri`` are the one-step log returns of the two series. ``nan`` when
        there are fewer than two observations or the excess returns have
        zero standard deviation.

    Raises
    ------
    ValueError
        If the two series differ in length.
    """
    portfolio = np.asarray(Rp, dtype=float)
    benchmark = np.asarray(Ri, dtype=float)
    if len(portfolio) != len(benchmark):
        raise ValueError("Rp and Ri must have the same length")
    if len(portfolio) < 2:
        return float('nan')

    # Real-valued logs: the original used cmath, which made the whole
    # result a complex number.
    portfolio_returns = np.log(portfolio[1:] / portfolio[:-1])
    benchmark_returns = np.log(benchmark[1:] / benchmark[:-1])

    excess_return_std = np.std(portfolio_returns - benchmark_returns)
    if excess_return_std == 0:
        return float('nan')

    mean_excess = np.mean(portfolio_returns) - np.mean(benchmark_returns)
    return float(np.sqrt(252) * mean_excess / excess_return_std)
# Information ratio of the AMA portfolio measured against buy and hold.
print(IR(portfolio_value, buy_n_hold_value))
# Result recorded by the original author: Annualized Information Ratio = 1.2
# Annualized Sharpe Ratio:
def SR(Rp, Rf):
    """Return the annualized Sharpe ratio of a portfolio value series.

    Parameters
    ----------
    Rp : sequence of numbers
        Portfolio values over time.
    Rf : float
        Risk-free rate per step, subtracted from every log return.

    Returns
    -------
    float
        ``sqrt(252) * (mean(r) - Rf) / std(r - Rf)`` where ``r`` are the
        one-step log returns of ``Rp``. ``nan`` when there are fewer than
        two observations or the returns have zero standard deviation.
    """
    portfolio = np.asarray(Rp, dtype=float)
    if len(portfolio) < 2:
        return float('nan')

    # Real-valued logs: the original used cmath, which made the whole
    # result a complex number.
    portfolio_returns = np.log(portfolio[1:] / portfolio[:-1])

    excess_return_std = np.std(portfolio_returns - Rf)
    if excess_return_std == 0:
        return float('nan')

    return float(np.sqrt(252) * (np.mean(portfolio_returns) - Rf) / excess_return_std)
# Sharpe ratio of the AMA portfolio against the daily risk-free rate.
print(SR(portfolio_value, daily_annual_Rf))
# Result recorded by the original author: Annualized Sharpe ratio = 2.89
# Source note: end of the Pastebin paste; the page footer
# ("Add Comment" / "Please, Sign In to add comment") is not part of the program.