Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pandas as pd
- import numpy as np
class bcolors:
    """ANSI escape sequences used to colour and style console output."""

    # Foreground colours
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'

    # Reset sequence: append after coloured text to restore default styling
    ENDC = '\033[0m'

    # Text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
def _compute_supertrend(close, upperband, lowerband):
    """Derive the per-candle trend direction and finalise the bands in place.

    Args:
        close: 1-D float array of close prices.
        upperband: 1-D float array of basic upper bands. Modified in place:
            entries are carried forward while a downtrend continues and set
            to NaN on every candle (after the first) that is in an uptrend.
        lowerband: 1-D float array of basic lower bands. Modified in place:
            entries are carried forward while an uptrend continues and set
            to NaN on every candle (after the first) that is in a downtrend.

    Returns:
        A list with one entry per candle: 1 for an uptrend, -1 for a
        downtrend. The first candle is always reported as 1.
    """
    trend = [1] * len(close)
    for curr in range(1, len(close)):
        prev = curr - 1

        if close[curr] > upperband[prev]:
            # Close crossed above the previous upper band: trend turns up.
            trend[curr] = 1
        elif close[curr] < lowerband[prev]:
            # Close crossed below the previous lower band: trend turns down.
            trend[curr] = -1
        else:
            # No cross (this includes comparisons against a NaN band, which
            # are always False): the previous trend continues.
            trend[curr] = trend[prev]

            # While a trend continues, its band may only tighten: the lower
            # band never falls in an uptrend, the upper band never rises in
            # a downtrend.
            if trend[curr] == 1 and lowerband[curr] < lowerband[prev]:
                lowerband[curr] = lowerband[prev]
            if trend[curr] == -1 and upperband[curr] > upperband[prev]:
                upperband[curr] = upperband[prev]

        # Blank the band that is not active for the current trend direction.
        if trend[curr] == 1:
            upperband[curr] = np.nan
        else:
            lowerband[curr] = np.nan
    return trend


def get_supertrend(data, atr_period, atr_multiplier):
    """Compute the Supertrend direction per candle and plot the active band.

    The basic bands are ``hl2 +/- atr_multiplier * ATR`` where ``hl2`` is the
    average of the high and low prices. All element access is positional (on
    NumPy arrays), so the result does not depend on the index type of the
    frames returned by ``to_pandas()``.

    Args:
        data: Symbol data object providing ``atr(period=...)``,
            ``to_pandas()`` (with 'high', 'low' and 'close' columns) and
            ``symbol``.
        atr_period: Period passed to ``data.atr``.
        atr_multiplier: Factor applied to the ATR to offset the bands from
            ``hl2``.

    Returns:
        A tuple ``(supertrend, active_band)``: ``supertrend`` is a list with
        1 (uptrend) or -1 (downtrend) per candle, and ``active_band`` is the
        last lower band value, or the last upper band value when the last
        lower band is NaN.

    Raises:
        IndexError: If ``data`` contains no candles.
    """
    atr_df = data.atr(period=atr_period).to_pandas()
    data_df = data.to_pandas()

    # HL2 is simply the average of high and low prices.
    hl2 = (data_df['high'] + data_df['low']) / 2
    band_offset = atr_multiplier * atr_df['atr']

    # The band arithmetic is done on Series so pandas aligns prices and ATR
    # by index; the results are then copied into independent, writable float
    # arrays so the loop below can use plain positional indexing.
    final_upperband = (hl2 + band_offset).to_numpy(dtype=float, copy=True)
    final_lowerband = (hl2 - band_offset).to_numpy(dtype=float, copy=True)
    close = data_df['close'].to_numpy(dtype=float)

    supertrend = _compute_supertrend(close, final_upperband, final_lowerband)

    # Exactly one band is kept per processed candle; pick whichever is set.
    if not np.isnan(final_lowerband[-1]):
        active_band = final_lowerband[-1]
    else:
        active_band = final_upperband[-1]

    with PlotScope.root(data.symbol):
        plot_line(f"supertrend ({atr_period}/{atr_multiplier})", active_band)
    return supertrend, active_band
def initialize(state):
    """Set up the strategy state.

    Starts the count of offsetting trades already reported at zero; handler
    compares this counter with the portfolio's number of offsetting trades
    to decide when to print the profitability report.
    """
    state.number_offset_trades = 0
# --- Supertrend indicator settings (passed to get_supertrend by handler) ---
stPeriod = 10  # ATR period
stMult = 3  # ATR multiplier for the band offset

# --- Optional moving-average trend filter for buy signals ---
maFilterEnabled = False
maFilterPeriod = 200
maFilterType = 'EMA'  # handler recognises 'EMA', 'SMA'/'MA' and 'HMA'

# When True, handler closes an open position on a sell signal.
sellSignalsEnabled = False

# --- Exit orders placed right after a buy ---
trailingEnabled = True
trailingPerc = 20  # expressed in percent; handler divides it by 100
atrPeriod = 14  # ATR period used for the stop-loss / take-profit distance
stopLossEnabled = False
takeProfitEnabled = False
stopLossAtrMult = 2  # stop-loss distance below price, in ATR multiples
takeProfitAtrMult = 4  # take-profit distance above price, in ATR multiples

# Symbols the scheduled handler is subscribed to.
SYMBOLS = ["ETHBUSD", "SOLBUSD"]
@schedule(interval="1d", symbol=SYMBOLS)
def HANDLER1(state, data):
    """Scheduled entry point (1d interval, all SYMBOLS): delegate to handler."""
    handler(state, data)
def _trend_filter_ma(data):
    """Return the moving-average indicator selected by maFilterType.

    Raises:
        ValueError: If maFilterType is not one of 'EMA', 'SMA', 'MA', 'HMA'.
    """
    if maFilterType == 'EMA':
        return data.ema(period=maFilterPeriod)
    if maFilterType in ('SMA', 'MA'):
        return data.ma(period=maFilterPeriod)
    if maFilterType == 'HMA':
        return data.hma(period=maFilterPeriod)
    # Fail with a clear message instead of an unbound-variable error later.
    raise ValueError(f"Unsupported maFilterType: {maFilterType!r}")


def _place_exit_orders(data, order, current_price, atr):
    """Attach exit orders to a freshly opened position.

    A trailing stop takes precedence when trailingEnabled is set; otherwise
    the enabled stop-loss / take-profit orders are created inside a
    one-cancels-others scope.
    """
    if trailingEnabled:
        firstStopPrice = current_price - (current_price * trailingPerc) / 100
        order_trailing_iftouched_amount(
            symbol=data.symbol,
            amount=-subtract_order_fees(order.quantity),
            trailing_percent=trailingPerc / 100,
            stop_price=firstStopPrice,
        )
        print(f"TS order created - {-order.quantity} at {firstStopPrice} ({trailingPerc}%)")
    elif stopLossEnabled or takeProfitEnabled:
        with OrderScope.one_cancels_others():
            if stopLossEnabled:
                stopLossPrice = current_price - (atr[-1] * stopLossAtrMult)
                order_iftouched_market_amount(
                    data.symbol,
                    amount=-subtract_order_fees(order.quantity),
                    stop_price=stopLossPrice,
                )
                stopLossPerc = ((current_price - stopLossPrice) / current_price) * 100
                print(f"[{data.symbol}] - Stop loss order created at {stopLossPrice} ({stopLossPerc:.2f}%)")
            if takeProfitEnabled:
                takeProfitPrice = current_price + (atr[-1] * takeProfitAtrMult)
                order_iftouched_market_amount(
                    data.symbol,
                    amount=-subtract_order_fees(order.quantity),
                    stop_price=takeProfitPrice,
                )
                takeProfitPerc = ((takeProfitPrice - current_price) / current_price) * 100
                print(f"[{data.symbol}] - Take profit order created at {takeProfitPrice} ({takeProfitPerc:.2f}%)")


def _report_profitability(state, portfolio):
    """Print strategy statistics when new offsetting trades have occurred.

    Nothing is printed unless the portfolio reports more offsetting trades
    than the count stored in state; after printing, that count is updated.
    """
    if state.number_offset_trades >= portfolio.number_of_offsetting_trades:
        return

    pnl = query_portfolio_pnl()
    profitability = query_portfolio_profitablity()
    print(f"Accumulated Pnl of Strategy: {pnl}")

    # offset_trades is at least 1 here because the stored count starts at 0
    # and the guard above requires the portfolio count to be strictly larger.
    offset_trades = portfolio.number_of_offsetting_trades
    number_winners = portfolio.number_of_winning_trades
    winrate = (number_winners / offset_trades) * 100
    print(f"Win Rate: {winrate:.2f}% ({number_winners}/{offset_trades})")
    print(f"Best trade Return : {portfolio.best_trade_return:.2%} - {profitability.bestTradePnl:.2f}")
    print(f"Worst trade Return : {portfolio.worst_trade_return:.2%} - {profitability.worstTradePnl:.2f}")
    print(f"Average Profit per Winning Trade : {portfolio.average_profit_per_winning_trade:.2f}")
    print(f"Average Loss per Losing Trade : {portfolio.average_loss_per_losing_trade:.2f}")

    # Remember how many offsetting trades have been reported so far.
    state.number_offset_trades = portfolio.number_of_offsetting_trades
    print("------- \n")


def _handle_symbol(state, data, buy_fraction):
    """Evaluate signals and place orders for a single symbol.

    Args:
        state: Strategy state carrying ``number_offset_trades``.
        data: Data object of one symbol.
        buy_fraction: Share of the portfolio value to spend on one buy.
    """
    # 1) Compute indicators from data. get_supertrend also plots the band.
    st, _ = get_supertrend(data, stPeriod, stMult)
    atr = data.atr(period=atrPeriod)
    trendMa = _trend_filter_ma(data) if maFilterEnabled else None

    # On erroneous data the indicator is missing: skip this symbol only, so
    # the remaining symbols are still processed in this tick.
    if maFilterEnabled and not trendMa:
        return

    current_price = data.close_last

    # 2) Fetch portfolio: liquidity (in quoted currency) and buy value.
    portfolio = query_portfolio()
    balance_quoted = portfolio.excess_liquidity_quoted
    quote_asset = portfolio.quoted
    portfolio_value = portfolio.portfolio_value
    position_open = portfolio.open_positions

    # Spend this symbol's share of the portfolio value, capped by the
    # liquidity that is actually available.
    buy_value = min(float(portfolio_value) * buy_fraction, balance_quoted)

    # 3) Fetch the open position for the symbol, if any.
    position = query_open_position_by_symbol(data.symbol, include_dust=False)
    has_position = position is not None

    # 4) Resolve buy or sell signals and create orders.
    bullishTrendFilter = not maFilterEnabled or current_price > trendMa[-1]
    buySignal = st[-1] > 0
    sellSignal = not buySignal

    if has_position:
        if sellSignal and sellSignalsEnabled:
            print("\n -------")
            print(bcolors.FAIL + "SELL" + bcolors.ENDC + f" - [{data.symbol}]: {current_price:.4f}")
            print(f"portfolio: {portfolio_value:.3f}{quote_asset} - OPENPOS: {len(position_open)}")
            close_position(data.symbol)
    elif buySignal and bullishTrendFilter:
        print("\n -------")
        print(bcolors.OKGREEN + "BUY" + bcolors.ENDC + f" - [{data.symbol}]: {buy_value:.2f} at {current_price:.4f}")
        print(f"portfolio: {portfolio_value:.3f}{quote_asset} - OPENPOS: {len(position_open)}")
        order = order_market_value(symbol=data.symbol, value=buy_value)
        _place_exit_orders(data, order, current_price, atr)

    # 5) Check strategy profitability (uses the snapshot fetched in step 2).
    _report_profitability(state, portfolio)


def handler(state, data):
    """Run one strategy tick for every symbol that delivered data.

    For each symbol the Supertrend direction decides between a buy and a
    sell signal. Without an open position, a buy signal (optionally gated by
    the moving-average filter) opens a market position and attaches exit
    orders; with an open position, a sell signal closes it when
    sellSignalsEnabled is set. A profitability report is printed whenever
    new offsetting trades have occurred.

    Args:
        state: Strategy state carrying ``number_offset_trades``.
        data: Mapping from symbol to that symbol's data object; falsy
            entries are ignored.
    """
    # Equal weighting: each symbol gets the same share of the portfolio.
    buy_fraction = 1 / len(SYMBOLS)

    symbol_feeds = [data[key] for key in data.keys() if data[key]]
    for symbol_data in symbol_feeds:
        _handle_symbol(state, symbol_data, buy_fraction)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement