Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import matplotlib.pyplot as plt
- from datetime import datetime
- import pandas as pd
- import numpy as np
- import requests
- import silence_tensorflow.auto
- from keras.models import Sequential
- from keras.layers import Dense
- from keras.layers import LSTM
- from sklearn.preprocessing import MinMaxScaler
- from sklearn.metrics import mean_squared_error
# Dataset: daily BTC/USDT OHLC candles from Kraken's public REST API.
# Flip the `if True` toggle to False to develop offline against random data.
if True:
    # NOTE(review): `json` shadows the stdlib module name; harmless here,
    # but would break a later `import json` in this file.
    json = requests.get('https://api.kraken.com/0/public/OHLC?pair=BTCUSDT&interval=1440&since=1483228800').json()
    data = json['result']
    # 'result' maps the pair name (plus a 'last' cursor) to the candle rows;
    # presumably the first key is the pair — verify against the Kraken docs.
    data = data[list(data.keys())[0]]
    df = pd.DataFrame(data, columns = ['date', 'open', 'high', 'low', 'close', 'vwap', 'volume', 'number_of_trades'])
    # Rows arrive as strings/mixed, so every object-dtype column is cast to float32.
    # NOTE(review): this also casts the unix 'date' column to float32, which has
    # roughly 2-minute resolution at current timestamps — confirm that precision
    # loss is acceptable before the to_datetime conversion below.
    for x in df.columns:
        if df[x].dtypes == np.dtype(object):
            df[x] = df[x].astype(np.float32)
    df['date'] = pd.to_datetime(df['date'], unit='s')
    # Model only the closing price of (at most) the last 1000 candles.
    dataset = df['close'][-1000:]
else:
    # Offline fallback: 100 uniform random values in [0, 1).
    from random import random
    dataset = [random() for x in range(100)]
look_back = int(len(dataset)/10)  # LSTM window length: 10% of the series
print(dataset)
# Prepare data: make the series a single feature column and squash it
# into [0, 1] — LSTMs train far better on normalized inputs.  The fitted
# scaler is kept so predictions can be mapped back to price units later.
scaler = MinMaxScaler(feature_range=(0, 1))
dataset = scaler.fit_transform(np.array(dataset).reshape(-1, 1))
def split(dataset, p):
    """Chronologically split *dataset* into a (train, test) pair.

    p: fraction in [0, 1] of leading rows assigned to the train part
       (int() floors, so any remainder row lands in the test part).
    Returns (train, test); concatenated they cover the whole input.
    """
    train_size = int(len(dataset) * p)
    # Plain slicing (instead of the original's explicit 2-D `[a:b, :]`)
    # keeps all trailing dimensions and also accepts 1-D input; the unused
    # `test_size` local has been dropped.
    return dataset[:train_size], dataset[train_size:]
def create_dataset(_dataset, _look_back=1):
    """Window a scaled (n, 1) series into supervised-learning samples.

    Each sample x holds the _look_back values preceding its target y.
    Returns (data_x, data_y) with data_x shaped (samples, 1, _look_back)
    — the (batch, timesteps, features) layout the LSTM expects — and
    data_y shaped (samples,).
    """
    data_x, data_y = [], []
    # NOTE: the trailing -1 skips the final usable window (classic
    # tutorial off-by-one); it is deliberately kept, because the plot
    # offsets computed downstream depend on this exact sample count.
    for i in range(len(_dataset) - _look_back - 1):
        data_x.append(_dataset[i:(i + _look_back), 0])
        data_y.append(_dataset[i + _look_back, 0])
    if not data_x:
        # Too few rows for even one window: return correctly shaped empty
        # arrays instead of crashing on `data_x.shape[1]` below (np.array
        # of an empty list has shape (0,), so shape[1] raised IndexError).
        return np.empty((0, 1, _look_back)), np.array([])
    data_x = np.array(data_x)
    data_x = np.reshape(data_x, (data_x.shape[0], 1, data_x.shape[1]))
    return data_x, np.array(data_y)
# Split 50/50 chronologically, then window each half into (X, y) samples.
train, test = split(dataset, 0.5)
train_x, train_y = create_dataset(train, look_back)
test_x, test_y = create_dataset(test, look_back)
# Model: one small LSTM layer feeding a single linear output unit that
# regresses the next scaled close price.
model = Sequential()
model.add(LSTM(4, input_shape=(1, look_back)))  # input is (timesteps=1, features=look_back)
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
# One epoch with batch_size=1 keeps the demo fast; raise epochs for real training.
model.fit(train_x, train_y, epochs=1, batch_size=1)
train_predict = model.predict(train_x)
test_predict = model.predict(test_x)
# Transform result: undo the MinMax scaling so everything is back in
# price units.  The y targets are 1-D, so each is wrapped in a list to
# give the (1, n) 2-D shape that inverse_transform requires.
train_predict = scaler.inverse_transform(train_predict)
train_y = scaler.inverse_transform([train_y])
test_predict = scaler.inverse_transform(test_predict)
test_y = scaler.inverse_transform([test_y])
- # Plot
def to_plot(dataset, predict, start, end):
    """Build an overlay array shaped like *dataset*: all NaN except rows
    [start, end), which hold *predict*.  The NaN padding makes matplotlib
    draw the prediction segment aligned against the full series."""
    overlay = np.full_like(dataset, np.nan)
    overlay[start:end, :] = predict
    return overlay
# Plot: overlay the train/test predictions on the rescaled source series.
train_predict_plot = to_plot(dataset, train_predict, look_back, len(train_predict) + look_back)
test_predict_plot = to_plot(dataset, test_predict, look_back + len(train_predict) + look_back + 1, len(dataset) - 1)
# Bug fix: the plotted series is only the TAIL of df (df['close'][-1000:]),
# but the axis previously started at df['date'][0] — the first date of the
# whole frame — mislabeling every point.  Start at the matching tail row.
# (Daily frequency, date_range's default, matches the interval=1440 candles.)
dates = pd.date_range(start=df['date'].iloc[-len(dataset)], periods=len(dataset))
plt.plot(dates, scaler.inverse_transform(dataset), label='dataset')
plt.plot(dates, train_predict_plot, label='train')
plt.plot(dates, test_predict_plot, label='test')
plt.gcf().autofmt_xdate()  # tilt the date labels so they do not overlap
plt.legend()
plt.show()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement