Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from random import randint
- import requests
- import json
- import matplotlib.pyplot as plt
- import numpy as np
- import tensorflow as tf
- from math import floor, ceil
- class InvestmentPredictor(object):
- def __init__(self, **kwargs):
- self.DataUrl = 'https://api.blockchain.info/charts/market-price?format=json'
- with open('market-price.json', 'r') as f:
- self.data = json.load(f)
- self.rng = 8
- #self.data = json.loads(requests.get(self.DataUrl).text)
- #with open('market-price.json', 'w') as f:
- #json.dump(self.data, f, indent=4)
- def _train(self):
- pass
- def parse_data(self):
- values = self.data['values']
- x = [round(p['x'], 2) for p in values]
- y = [round(p['y'], 2) for p in values]
- return x, y
- def _get_point(self, **kwargs):
- x, y = self.parse_data()
- if kwargs['r'] is True:
- return randint(0, len(x))
- else:
- axis = kwargs['axis']
- p = kwargs['p']
- rng = kwargs['rng']
- if axis == 'y':
- return y[p-rng:p+1]
- elif axis == 'b':
- return x[p-rng:p+1], y[p-rng:p+1]
- else:
- return x[p-rng:p+1]
- def _get_trends(self, array, per=False):
- trends = []
- for v in range(len(array)):
- if v != 0:
- avg = ((array[v]+array[v-1])/2)
- if per is True:
- trends.append(round(((array[v]-array[v-1])/avg)*100, 5))
- else:
- trends.append(round((array[v]-array[v-1]), 5))
- return trends
- def _certify(self, p=None, h=1):
- if p is None: p = self._get_point(r=True)
- sub_y = self._get_point(r=False, p=p, axis='y', rng=self.rng)
- div = len(sub_y) - 2
- test_y = sub_y[div]
- pre_y = sub_y[0: div+1]
- post_y = sub_y[div::]
- pre_trends_p = self._get_trends(pre_y, per=True)
- post_trends_d = self._get_trends(post_y, per=False)
- post_trends_p = self._get_trends(post_y, per=True)
- pre_overall_trend_p = sum(pre_trends_p)/len(pre_trends_p)
- post_overall_trend_d = sum(post_trends_d)/len(post_trends_d)
- post_overall_trend_p = sum(post_trends_p)/len(post_trends_p)
- pre_trends_d = self._get_trends(pre_y, per=False)
- pre_overall_trend_d = sum(pre_trends_d)/len(pre_trends_d)
- worth = 0
- out_set = [pre_overall_trend_d + pre_overall_trend_p + test_y] + pre_trends_d + pre_trends_p
- if post_overall_trend_d > 0 and post_overall_trend_p > h:
- worth = 1
- return out_set, worth
- def _get_training_set(self):
- x_set = []
- y_set = []
- for i in range(len(self.data['values'][0:300])):
- if i > self.rng-1:
- x, y = self._certify(p=i)
- x_set.append(x)
- y_set.append(y)
- return x_set, y_set
- def _get_testing_set(self):
- x_set = []
- y_set = []
- for i in range(len(self.data['values'][300::])):
- if i > self.rng-1:
- x, y = self._certify(p=i)
- x_set.append(x)
- y_set.append(y)
- return x_set, y_set
- class NeuralNetwork(object):
- def __init__(self):
- print(tf.__version__)
- inp = InvestmentPredictor(nn=False)
- x, y = inp._get_training_set()
- x_t, y_t = inp._get_testing_set()
- self.x_train = tf.keras.utils.normalize(x, axis=1)
- self.x_test = tf.keras.utils.normalize(x_t, axis=1)
- self.y_train = np.array(y)
- self.y_test = np.array(y_t)
- self.model = tf.keras.models.Sequential()
- def train(self, e=4):
- self.model.add(tf.keras.layers.Dense(128, activation=tf.nn.relu))
- self.model.add(tf.keras.layers.Dense(128, activation=tf.nn.relu))
- self.model.add(tf.keras.layers.Dense(10, activation=tf.nn.softmax))
- self.model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
- self.model.fit(self.x_train, self.y_train, epochs=e)
- def test(self):
- val_loss, val_acc = self.model.evaluate(self.x_test, self.y_test)
- print('Values Lost: {}, Accuracy: {}'.format(val_loss, val_acc))
- nn = NeuralNetwork()
- nn.train()
- nn.test()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement