Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
//@version=5
indicator("πΊ πΎπππππππππ πΎππππππππππ πΎππππ πΊ", overlay=false)

// ---------------------------------------------------------------------------
// Inputs: window lengths for the individual tests
// ---------------------------------------------------------------------------
// NOTE(review): ADF_LEN is not referenced anywhere in the visible script; the ADF
// window is taken from the 'Length' input (`lookback`) declared further down.
// The input is kept so that existing saved settings remain valid.
ADF_LEN = input.int(20, title="ADF Length")
// Passed to ta.sma / ta.stdev / ta.correlation, which require a positive length.
PP_LEN = input.int(2, title="PP Length", minval=1)
// log(length) is the divisor of the Hurst estimate, so a length of 1 (log = 0) is excluded.
HURST_LEN = input.int(30, title="Hurst Exponent Length", minval=2)
// The KPSS lag loop runs `for i = lag to length - 1` with lag up to floor(sqrt(length)).
// With length < 2 that loop would count downwards and index the series at [-1].
KPSS_LEN = input.int(75, title="KPSS Length", minval=2)

// Results table, created once and reused on every bar.
var table result_table = table.new(position.middle_right, 4, 8, border_width=1, border_color=color.gray, frame_color=color.gray)

// ---------------------------------------------------------------------------
// Inputs: ADF test
// ---------------------------------------------------------------------------
src = input.source(title='Source', defval=close)
lookback = input.int(title='Length', defval=30, minval = 2, tooltip = 'The test is applied in a moving window. Length defines the number of points in the sample.')
nLag = input.int(title='Maximum lag', defval=0, minval = 0, tooltip = 'Maximum lag which is included in test. Generally, lags allow taking into account serial correlation of price changes.')
conf = input.string(title='Confidence Level', defval="90%", options = ['90%', '95%', '99%'], tooltip = 'Defines at which confidence level the critical value of the ADF test statistic is calculated. If the test statistic is below the critical value, the time series sample is concluded to be mean-reverting.')
// Matrix helpers: matrices are stored as flat arrays in column-major order.
matrix_get(A, i, j, nrows) =>
    // Element at row i, column j of a matrix that has `nrows` rows.
    int flatIndex = i + nrows * j
    array.get(A, flatIndex)
matrix_set(A, value, i, j, nrows) =>
    // Writes `value` at row i, column j (column-major, `nrows` rows) and
    // returns the same array so that calls can be used as expressions.
    int flatIndex = i + nrows * j
    array.set(A, flatIndex, value)
    A
transpose(A, nrows, ncolumns) =>
    // Returns a new flat array holding the transpose of the nrows x ncolumns
    // matrix A; the result has `ncolumns` rows.
    float[] flipped = array.new_float(nrows * ncolumns, 0)
    for row = 0 to nrows - 1
        for col = 0 to ncolumns - 1
            float element = matrix_get(A, row, col, nrows)
            matrix_set(flipped, element, col, row, ncolumns)
    flipped
multiply(A, B, nrowsA, ncolumnsA, ncolumnsB) =>
    // Matrix product of A (nrowsA x ncolumnsA) and B (ncolumnsA x ncolumnsB).
    // Returns a new flat array with nrowsA rows and ncolumnsB columns.
    int inner = ncolumnsA
    float[] product = array.new_float(nrowsA * ncolumnsB, 0)
    for row = 0 to nrowsA - 1
        for col = 0 to ncolumnsB - 1
            // Dot product of row `row` of A with column `col` of B.
            float dot = 0.0
            for k = 0 to inner - 1
                dot += matrix_get(A, row, k, nrowsA) * matrix_get(B, k, col, inner)
            matrix_set(product, dot, row, col, nrowsA)
    product
vnorm(X) =>
    // Euclidean norm of the vector stored in X: sqrt of the sum of squared elements.
    float sumSquares = 0.0
    int lastIndex = array.size(X) - 1
    for idx = 0 to lastIndex
        float component = array.get(X, idx)
        sumSquares += math.pow(component, 2)
    math.sqrt(sumSquares)
qr_diag(A, nrows, ncolumns) =>
    // QR decomposition of the nrows x ncolumns matrix A (flat, column-major) by
    // Gram-Schmidt orthogonalisation: each column is reduced by its projections on
    // the already computed Q columns and then normalised.
    // Returns [Q, R] with Q of size nrows x ncolumns and R of size ncolumns x ncolumns;
    // only the diagonal and the entries above it are written in R.
    float[] Q = array.new_float(nrows * ncolumns, 0)
    float[] R = array.new_float(ncolumns * ncolumns, 0)
    float[] a = array.new_float(nrows, 0)
    float r = 0.0
    float aux = 0.0
    // First column of A and its norm.
    for i = 0 to nrows - 1
        array.set(a, i, matrix_get(A, i, 0, nrows))
    r := vnorm(a)
    // First diagonal element of R and first (normalised) column of Q.
    matrix_set(R, r, 0, 0, ncolumns)
    for i = 0 to nrows - 1
        matrix_set(Q, array.get(a, i) / r, i, 0, nrows)
    if ncolumns != 1
        // Remaining columns.
        for k = 1 to ncolumns - 1
            for i = 0 to nrows - 1
                array.set(a, i, matrix_get(A, i, k, nrows))
            for j = 0 to k - 1
                // R[j, k] is the scalar product of column j of Q with the working vector.
                r := 0
                for i = 0 to nrows - 1
                    r += matrix_get(Q, i, j, nrows) * array.get(a, i)
                matrix_set(R, r, j, k, ncolumns)
                // Remove that projection from the working vector.
                for i = 0 to nrows - 1
                    aux := array.get(a, i) - r * matrix_get(Q, i, j, nrows)
                    array.set(a, i, aux)
            // Diagonal element R[k, k] and normalised column k of Q.
            r := vnorm(a)
            matrix_set(R, r, k, k, ncolumns)
            for i = 0 to nrows - 1
                matrix_set(Q, array.get(a, i) / r, i, k, nrows)
    [Q, R]
pinv(A, nrows, ncolumns) =>
    // Pseudo-inverse of the nrows x ncolumns matrix A, computed as Rinv * transpose(Q)
    // from the QR decomposition returned by qr_diag.
    // Returns a flat array of size ncolumns x nrows.
    [Q, R] = qr_diag(A, nrows, ncolumns)
    float[] QT = transpose(Q, nrows, ncolumns)
    // Allocated on every call (no `var`): a persistent buffer would keep the size
    // of the first call and would be wrong if ncolumns ever differed between calls.
    float[] Rinv = array.new_float(ncolumns * ncolumns, 0)
    float r = 0.0
    // Inverse of the upper-triangular R, built column by column.
    matrix_set(Rinv, 1 / matrix_get(R, 0, 0, ncolumns), 0, 0, ncolumns)
    if ncolumns != 1
        for j = 1 to ncolumns - 1
            for i = 0 to j - 1
                r := 0.0
                for k = i to j - 1
                    r += matrix_get(Rinv, i, k, ncolumns) * matrix_get(R, k, j, ncolumns)
                matrix_set(Rinv, r, i, j, ncolumns)
            // Scale the accumulated column by -1 / R[j, j].
            for k = 0 to j - 1
                matrix_set(Rinv, -matrix_get(Rinv, k, j, ncolumns) / matrix_get(R, j, j, ncolumns), k, j, ncolumns)
            matrix_set(Rinv, 1 / matrix_get(R, j, j, ncolumns), j, j, ncolumns)
    float[] Ainv = multiply(Rinv, QT, ncolumns, ncolumns, nrows)
    Ainv
adftest(a, nLag, conf) =>
    // Augmented Dickey-Fuller style regression on the sample `a`.
    // The first differences of `a` are regressed on the level, a constant and
    // `nLag` lagged differences; the statistic is the level coefficient divided
    // by its standard-error estimate.
    // Returns [adf, crit, nobs]: test statistic, critical value for `conf`
    // ("90%", "95%" or "99%") and the number of observations used.
    int sampleSize = array.size(a)
    if nLag >= sampleSize/2 - 2
        runtime.error("ADF: Maximum lag must be less than (Length/2 - 2)")
    int nobs = sampleSize-nLag-1
    float[] diffs = array.new_float(na)
    float[] level = array.new_float(na)
    float[] ones = array.new_float(na)
    for t = 0 to nobs-1
        array.push(diffs, array.get(a,t)-array.get(a,t+1))
        array.push(level, array.get(a,t+1))
        array.push(ones, 1.0)
    // Design matrix (column-major): level, constant, then one column per lag.
    float[] design = array.copy(level)
    design := array.concat(design, ones)
    int nRegressors = 2
    if nLag > 0
        for lag = 1 to nLag
            float[] laggedDiffs = array.new_float(na)
            for t = 0 to nobs-1
                array.push(laggedDiffs, array.get(a,t+lag)-array.get(a,t+lag+1))
            design := array.concat(design, laggedDiffs)
            nRegressors += 1
    // Least-squares coefficients via the pseudo-inverse.
    float[] designInv = pinv(design, nobs, nRegressors)
    float[] coeff = multiply(designInv, diffs, nRegressors, nobs, 1)
    // Fitted values, scaled residual sum of squares, and spread of the level.
    float[] fitted = multiply(design,coeff,nobs,nRegressors,1)
    float levelMean = array.avg(level)
    float residVar = 0.0
    float levelSS = 0.0
    for t = 0 to nobs-1
        residVar += math.pow(array.get(diffs,t) - array.get(fitted,t), 2)/(nobs-nRegressors)
        levelSS += math.pow(array.get(level,t) - levelMean, 2)
    float stdErr = math.sqrt(residVar/levelSS)
    float adf = array.get(coeff,0) /stdErr
    // Critical value as a polynomial in 1/nobs; na for an unrecognised `conf`.
    float crit = switch conf
        "90%" => -2.56677 - 1.5384/nobs - 2.809/nobs/nobs
        "95%" => -2.86154 - 2.8903/nobs - 4.234/nobs/nobs - 40.040/nobs/nobs/nobs
        "99%" => -3.43035 - 6.5393/nobs - 16.786/nobs/nobs - 79.433/nobs/nobs/nobs
    [adf, crit, nobs]
// Collect the most recent `lookback` values of `src` (newest first) into an array
float[] window = array.new_float(na)
for offset = 0 to lookback-1
    array.push(window, src[offset])
// Run the ADF test on that window
[tauADF, crit, nobs] = adftest(window, nLag, conf)
//plot(tauADF, title="ADF Test Statistic", color=color.red)
//plot(crit, title="ADF Critical Value", color=color.green)
// Phillips-Perron (PP) Test
logReturns(series) =>
    // Natural log of the ratio between the current value and the previous bar's value.
    ratio = series / series[1]
    math.log(ratio)

price = close
logReturnsSeries = logReturns(price)
lookbackPeriod = PP_LEN
// The same series shifted back by one bar, used as the regressor.
laggedLogReturns = logReturnsSeries[1]
meanLogReturns = ta.sma(logReturnsSeries, lookbackPeriod)
stdLogReturns = ta.stdev(logReturnsSeries, lookbackPeriod)
stdLaggedLogReturns = ta.stdev(laggedLogReturns, lookbackPeriod)
// Slope of the log returns on their own lag: correlation times the ratio of standard deviations.
b = ta.correlation(logReturnsSeries, laggedLogReturns, lookbackPeriod) * (stdLogReturns / stdLaggedLogReturns)
residuals = logReturnsSeries - (meanLogReturns + b * laggedLogReturns)
residualStdev = ta.stdev(residuals, lookbackPeriod)
// The division by 10 damps the statistic; remove it if the raw value is required.
pp_z = ((b - 1) / residualStdev) /10
criticalValue = input.float(-70)
//plot(pp_z, title="PP Test Statistic", color=color.blue)
//hline(criticalValue, title="Critical Value", color=color.red)
// Hurst Exponent calculation
N = HURST_LEN
hurst_exponent(source, length) =>
    // Rescaled-range style estimate over the last `length` values of `source`:
    // log(R / S) / log(length), where R is the range of the cumulative deviations
    // from the window mean and S is the window standard deviation.
    // Returns na when the current value of `source` is na.
    //
    // ta.sma / ta.stdev are called unconditionally, outside the `if`, so that
    // they run on every bar; calling them only inside a conditional scope makes
    // their internal history inconsistent.
    float mean = ta.sma(source, length)
    float S = ta.stdev(source, length)
    float result = na
    if not na(source[0])
        cumulative_sum = 0.0
        Y = array.new_float(length, 0.0)
        for i = 0 to (length - 1)
            // A missing historical value contributes a zero deviation.
            float deviation = na(source[i]) ? 0.0 : source[i] - mean
            cumulative_sum += deviation
            array.set(Y, i, cumulative_sum)
        R = array.max(Y) - array.min(Y)
        result := math.log(R / S) / math.log(length)
    result

// The estimate is computed on daily closes of the chart symbol.
close_prices = request.security(syminfo.tickerid, 'D', close)
H = hurst_exponent(close_prices, N)
// KPSS Test
length = KPSS_LEN
f_linreg(src, length) =>
    // Linear-regression value at offset 0 over the last `length` bars of `src`.
    ta.linreg(src, length, 0)

linreg_values = f_linreg(close, length)
KPSSresiduals = close - linreg_values

// One pass over the window accumulates the running partial sum of residuals,
// the sum of its squares, and the sum of squared residuals.
partial_sums = 0.0
partial_sum_squared = 0.0
sum_residuals = 0.0
for i = 0 to length - 1
    residual = nz(KPSSresiduals[i])
    partial_sums += residual
    partial_sum_squared += partial_sums * partial_sums
    sum_residuals += residual * residual

// Weighted autocovariance terms, with weights falling linearly in the lag.
lags = math.floor(math.sqrt(length))
for lag = 1 to lags
    weight = 1.0 - lag / (lags + 1.0)
    lag_sum = 0.0
    for i = lag to length - 1
        lag_sum += nz(KPSSresiduals[i]) * nz(KPSSresiduals[i - lag])
    sum_residuals += 2 * weight * lag_sum

long_run_variance = sum_residuals / length
kpss_stat = partial_sum_squared / (length * length * long_run_variance)
critical_value_kpss = input.float(0.5, step = 0.01)
- //plot(kpss_stat, title="KPSS Test", color=color.orange)
// Restricts `value` to the closed interval [min_value, max_value]
clamp(value, min_value, max_value) =>
    raisedToFloor = math.max(value, min_value)
    math.min(raisedToFloor, max_value)
// Linear map that sends min_value to -1 and max_value to +1
normalize_statistic(value, min_value, max_value) =>
    span = max_value - min_value
    fraction = (value - min_value) / span
    2 * fraction - 1
// Clamp every statistic to [NORM_MIN, NORM_MAX] and map it onto [-1, 1].
// Adjust the range based on your data.
NORM_MIN = -1
NORM_MAX = 1

// ADF
tauADF_clamped = clamp(tauADF, NORM_MIN, NORM_MAX)
normalized_tauADF = normalize_statistic(tauADF_clamped, NORM_MIN, NORM_MAX)

// Phillips-Perron
pp_z_clamped = clamp(pp_z, NORM_MIN, NORM_MAX)
normalized_pp_z = normalize_statistic(pp_z_clamped, NORM_MIN, NORM_MAX)

// Hurst: scored by its distance from 0.5 instead of the linear map.
// NOTE(review): with the clamp range [-1, 1] this expression can reach values above 1
// (up to 5 at H = -1) - confirm whether H should be clamped to [0, 1] instead.
H_clamped = clamp(H, NORM_MIN, NORM_MAX)
normalized_H = 2 * (math.abs(H_clamped - 0.5) / 0.5) - 1

// KPSS
kpss_stat_clamped = clamp(kpss_stat, NORM_MIN, NORM_MAX)
normalized_kpss_stat = normalize_statistic(kpss_stat_clamped, NORM_MIN, NORM_MAX)
- // Plotting
- //plot(tauADF, title="ADF Test Statistic", color=color.red)
- //plot(crit, title="ADF Critical Value", color=color.green)
- //plot(pp_z, title="Phillips-Perron Test Z-Score", color=color.blue)
- //plot(H, title="Hurst Exponent", color=color.purple)
- //plot(kpss_stat, title="KPSS Test", color=color.orange)
// Rounds `value` to `decimals` decimal places
roundTo(value, decimals) =>
    scale = math.pow(10, decimals)
    scaled = value * scale
    math.round(scaled) / scale
// Average of the normalized values. Computed on every bar at global scope
// because it is also plotted, not only shown in the table.
average_normalized = (normalized_tauADF + normalized_pp_z + normalized_H + normalized_kpss_stat) / 4
// A non-negative average is labelled trend following, anything else mean reverting.
trend_interpretation = average_normalized >= 0 ? "Trend Following" : "Mean Reverting"

// Update table with ADF, PP, Hurst, and KPSS results.
// The table is drawn on the last bar only: on the first bar the statistics have
// no history yet, and only the most recent values are ever visible in a table.
if barstate.islast
    // Title row
    table.cell(result_table, 0, 0, "πΎπππππππππ", bgcolor=color.gray, text_color=color.white)
    table.cell(result_table, 1, 0, "πΎππππππππππ", bgcolor=color.gray, text_color=color.white)
    table.cell(result_table, 2, 0, "πΎππππ", bgcolor=color.gray, text_color=color.white)
    // Header row
    table.cell(result_table, 0, 1, "Test", bgcolor=color.gray)
    table.cell(result_table, 1, 1, "Value", bgcolor=color.gray)
    table.cell(result_table, 2, 1, "Normalized Value", bgcolor=color.gray)
    // Data labels
    table.cell(result_table, 0, 2, "ADF Statistic", bgcolor=color.white)
    table.cell(result_table, 0, 3, "PP Statistic", bgcolor=color.white)
    table.cell(result_table, 0, 4, "Hurst Exponent", bgcolor=color.white)
    table.cell(result_table, 0, 5, "KPSS Statistic", bgcolor=color.white)
    // Raw statistics
    table.cell(result_table, 1, 2, str.tostring(roundTo(tauADF, 4)), bgcolor=color.white)
    table.cell(result_table, 1, 3, str.tostring(roundTo(pp_z, 4)), bgcolor=color.white)
    table.cell(result_table, 1, 4, str.tostring(roundTo(H, 4)), bgcolor=color.white)
    table.cell(result_table, 1, 5, str.tostring(roundTo(kpss_stat, 4)), bgcolor=color.white)
    // Normalized statistics
    table.cell(result_table, 2, 2, str.tostring(roundTo(normalized_tauADF, 4)), bgcolor=color.white)
    table.cell(result_table, 2, 3, str.tostring(roundTo(normalized_pp_z, 4)), bgcolor=color.white)
    table.cell(result_table, 2, 4, str.tostring(roundTo(normalized_H, 4)), bgcolor=color.white)
    table.cell(result_table, 2, 5, str.tostring(roundTo(normalized_kpss_stat, 4)), bgcolor=color.white)
    // Average row
    table.cell(result_table, 0, 6, "Average Normalized Value", bgcolor=color.white)
    table.cell(result_table, 1, 6, "πΊ", bgcolor=color.white)
    table.cell(result_table, 2, 6, str.tostring(roundTo(average_normalized, 4)), bgcolor=color.white)
    // Trend interpretation row
    table.cell(result_table, 0, 7, "Trend Interpretation", bgcolor=color.white)
    table.cell(result_table, 1, 7, "πΊ", bgcolor=color.white)
    table.cell(result_table, 2, 7, trend_interpretation, bgcolor=color.white)
// Plots of the four normalized statistics
plot(normalized_tauADF, "Normalized ADF Test Statistic", color.red)
plot(normalized_pp_z, "Normalized PP Test Statistic", color.blue)
plot(normalized_H, "Normalized Hurst Exponent", color.purple)
plot(normalized_kpss_stat, "Normalized KPSS Test", color.orange)
// NK's Plot: the average, lime when it is non-negative and fuchsia otherwise
colorState = if average_normalized >= 0
    color.lime
else
    color.fuchsia
plot(average_normalized, "Average", colorState)
- //There is neither happiness nor unhappiness in this world; there is only the comparison of one state with another. Only a man who has felt ultimate despair is capable of feeling ultimate bliss. It is necessary to have wished for death in order to know how good it is to live.....the sum of all human wisdom will be contained in these two words: Wait and Hope.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement