Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- import sys
- import os
- sys.path.append('..'+os.sep)
- import ABXToolkit
- import ABXstatus
- import time
- import talib
- import talib.abstract as ta
- import numpy as np
- import matplotlib.pyplot as plt
- import pandas as pd
- import datetime
- import configparser
# Directory part of this script's path; passed to ABXToolkit.load_ini together
# with the name 'h94_ab'.
USER_PATH = os.path.dirname(__file__)
ABXToolkit.load_ini(USER_PATH, 'h94_ab')

# LINE bot
from linebot import LineBotApi
from linebot.models import TextSendMessage
from linebot.exceptions import LineBotApiError

# NOTE(review): the channel access token is committed in source. It can now be
# supplied through the LINE_CHANNEL_ACCESS_TOKEN environment variable; the
# literal is kept only as a fallback so existing deployments keep working.
# Rotate the token and drop the fallback once the variable is set everywhere.
line_bot_api = LineBotApi(
    os.environ.get(
        "LINE_CHANNEL_ACCESS_TOKEN",
        "xzbRVRWKFwPyzhHx/xS478lfjMNM1OHBNGgZdobGYp/QR85KVAE0jBpv4e+YC5DptU4VEusq+8pi/qZj610IGBrxsucE/kNEKE8HG9S+fA5lJpD6reioi5cZfdEr5RCTriiShLYD9YztNnO19jpOAQdB04t89/1O/w1cDnyilFU=",
    )
)

# Module-level state shared between the script body and the callbacks.
fileisread = False   # set to True once a history file has been read
historydf = None     # most recently loaded history daily bars
tmpday = None        # today's bar as returned by ABXToolkit.getdayx
tmpdaydf = None      # history daily bars with today's bar appended
historydate = None   # start date of the history window
anchorprice = 0.0    # last trade price handled by the real-time callback

# For backtesting: load the custom settings from ptrade.ini. Per the original
# note, a ConfigParser (RawConfigParser subclass) object is returned on success.
ptrade = ABXToolkit.ptrade_ini('ptrade.ini')

# Initialise the notification-interval object.
ABXToolkit.intervalinit()
# Custom LINE notification helper.
def linesendmessage(ptext):
    """Push ``ptext`` as a text message through the module-level LINE client.

    The message goes to one fixed recipient id. A ``LineBotApiError`` raised
    while sending is printed and swallowed, so it never reaches the caller.

    Args:
        ptext: Text of the message to send.
    """
    recipient = "C02af7a312631c0e59d32660b332a3ea0"
    try:
        message = TextSendMessage(text=ptext)
        line_bot_api.push_message(recipient, message)
    except LineBotApiError as err:
        print(err)
# Real-time quote callback.
def my_func(watch_num, stkid, dframe):
    """Handle a data event delivered by ABXToolkit.

    Two event tags are acted on; any other tag is ignored:

    * ``'dfhistorydata'`` - history download finished: log it and raise the
      module-level ``fileisread`` flag.
    * ``'dftrade'`` - new trade data: when the last trade price differs from
      the previously seen one, append today's bar to the history daily bars
      and hand the result to ``ABXToolkit.ptrade_buy``.

    NOTE(review): the pasted source had lost its indentation; everything from
    the price comparison down is assumed to belong to the 'dftrade' branch.

    Args:
        watch_num: Event tag (see above).
        stkid: Instrument id the event refers to.
        dframe: Event payload; for 'dftrade' the last row's 'TradePrice'
            column is read.
    """
    # Without this declaration the assignment below only bound a local name
    # and the module-level flag was never updated.
    global fileisread
    global historydf
    global anchorprice
    global tmpdaydf

    if watch_num == 'dfhistorydata':
        print("已下載歷史報價", stkid)
        fileisread = True

    if watch_num != 'dftrade':
        return

    # Nothing to do when the trade price has not changed.
    last_price = dframe.iloc[-1]['TradePrice']
    if anchorprice == last_price:
        return
    anchorprice = last_price

    # Convert today's trade ticks into today's daily bar.
    locatnum = ABXToolkit.getIndexbystkid(stkid)
    tmpday = ABXToolkit.getdayx(sub=locatnum)

    # Fetch the history quotes; key 'd' holds the historical daily bars.
    tsub = ABXToolkit.getsubbystkid(stkid)
    historydf = tsub['dfhistorydata']['d']
    if historydf is None:
        return

    # Merge today's bar into the history daily bars. DataFrame.append was
    # removed in pandas 2.0, so build the row frame explicitly and concat.
    # Assumes getdayx returns either a DataFrame or a single row (dict/Series)
    # - TODO confirm against ABXToolkit.
    today = tmpday if isinstance(tmpday, pd.DataFrame) else pd.DataFrame([tmpday])
    tmpdaydf = pd.concat([historydf, today], ignore_index=True)

    # ptrade.ini strategy: evaluate the technical-analysis rules over the
    # history plus today's daily bars.
    ABXToolkit.ptrade_buy(ptrade, tmpdaydf, locatnum)
# Backtest callback.
def backtestfunc(pframe):
    """Run the ptrade.ini strategy on ``pframe`` for slot 1.

    Args:
        pframe: Frame passed straight through to ``ABXToolkit.ptrade_buy``.
    """
    # In my_func the third ptrade_buy argument is the instrument's index;
    # here it is pinned to 1 (labelled 0050 elsewhere in this script).
    slot = 1
    ABXToolkit.ptrade_buy(ptrade, pframe, slot)
# Hand the handlers to ABXToolkit, in this order:
#   set_func         <- my_func          real-time quote callback
#   set_backtestfunc <- backtestfunc     backtest callback
#   set_ptradefunc   <- linesendmessage  called when a ptrade condition is met;
#                                        currently this sends the LINE notification
for register, handler in (
    (ABXToolkit.set_func, my_func),
    (ABXToolkit.set_backtestfunc, backtestfunc),
    (ABXToolkit.set_ptradefunc, linesendmessage),
):
    register(handler)
pdict = {}
# Date 150 days ago; used as the start of the history window.
historydate = ABXToolkit.get_day_of_day(n=-150)

# Make sure a history file exists for each slot. The original comments label
# slot 1 as 0050 and slot 2 as 2317.
for slot in (1, 2):
    # The quote-file helpers receive the id without its first two characters.
    have_file = ABXToolkit.checkquotefile(
        stkid=ABXToolkit.getstkidbyindex(slot)[2:],
        fdate=historydate,
    )
    # Kept as an equality test on purpose: only a result equal to False
    # triggers the download, exactly as before.
    if have_file == False:
        # Request the daily history; pdict is shared and reused further down.
        pdict.update({
            "User": "2|0987313526",
            "Password": "admin2257",
            "StockFullID": ABXToolkit.getstkidbyindex(slot),
            "DataType": "d",
            "StartDate": historydate,
            "DataDays": "0",
        })
        ABXToolkit.getquotefile(pdict)
    else:
        print('target already exist.')
        print('No need to download.')
        # Load the existing file and store it as the slot's daily history.
        historydf = ABXToolkit.read_file(
            stkid=ABXToolkit.getstkidbyindex(slot)[2:],
            fdate=historydate,
        )
        ABXToolkit.getsubbyindex(n=slot)['dfhistorydata']['d'] = historydf
        fileisread = True
# Subscribe to real-time quotes.
pdict.update({"User": "2|0987313526", "Password": "admin2257"})
# Real-time subscription; called together with the backfill of today's trade
# details below. Every configured symbol is subscribed by this one call.
ABXToolkit.subscrip(pdict)

# When the program starts after the market has opened, the trades that already
# happened must be backfilled. 'sub' is the subscription slot from server.ini
# (1 is labelled 0050, 2 is labelled 2317); unlike the subscription above,
# the backfill has to be told which slot to rebuild.
for slot in (1, 2):
    pdict['sub'] = slot
    ABXToolkit.rebuildTrade(pdict)

# Backtest: for use when not running against live data. It replays the history
# file row by row. Commented out for now; it is not meant to be used together
# with subscrip / rebuildTrade above.
# ABXToolkit.backtest(historydf, 1)

# Block here so the program keeps running; pressing Enter ends it.
input()
# Files written before the program exits.
# The ptrade_buy log:
ABXToolkit.plogsave()

# Debug output below.
# Dump the history-quote containers to CSV.
tsub1 = ABXToolkit.getsubbyindex(n=1)
tsub2 = ABXToolkit.getsubbyindex(n=2)
ABXToolkit.dftocsv(filename='tsub1.csv', df=tsub1['dfhistorydata']['d'])
ABXToolkit.dftocsv(filename='tsub2.csv', df=tsub2['dfhistorydata']['d'])

# History daily bars plus today's trade details converted to a daily bar.
for slot, sub, csv_name in (
    (1, tsub1, 'tmpdaydf1.csv'),
    (2, tsub2, 'tmpdaydf2.csv'),
):
    historydf = sub['dfhistorydata']['d']
    tmpday = ABXToolkit.getdayx(sub=slot)
    if historydf is None:
        # my_func treats a missing history frame as a normal state; skip this
        # dump instead of crashing so the remaining files still get written.
        continue
    # DataFrame.append was removed in pandas 2.0; use concat instead.
    # Assumes getdayx returns either a DataFrame or a single row (dict/Series)
    # - TODO confirm against ABXToolkit.
    today = tmpday if isinstance(tmpday, pd.DataFrame) else pd.DataFrame([tmpday])
    tmpdaydf = pd.concat([historydf, today], ignore_index=True)
    ABXToolkit.dftocsv(filename=csv_name, df=tmpdaydf)

# Trade details.
ABXToolkit.dftocsv(filename='dftrade1.csv', df=tsub1['dftrade'].data)
ABXToolkit.dftocsv(filename='dftrade2.csv', df=tsub2['dftrade'].data)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement