# imports:
import gym
import random
import numpy as np
import tflearn
from tflearn.layers.core import input_data, dropout, fully_connected
from tflearn.layers.estimator import regression
from statistics import median, mean
from collections import Counter
# learning rate; this can be tuned empirically:
LR = 1e-3
# make the env from gym:
env = gym.make("CartPole-v0")
# start from a clean slate:
env.reset()
# steps are actually frames:
goal_steps = 200
# learn only from episodes that score above this threshold:
score_requirement = 50
# number of games:
initial_games = 1000
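# A quick way to inspect the env before doing anything else (a minimal,
# optional sketch; the values noted below are what gym reports for CartPole-v0):
# print(env.action_space)       # Discrete(2): actions are 0 (left) or 1 (right)
# print(env.observation_space)  # Box(4,): a 4-value observation per frame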
def some_random_games_first():
    # each of these is its own game:
    for episode in range(5):
        env.reset()
        # this is each frame, up to 200... but we won't make it that far:
        for t in range(goal_steps):
            # this displays the environment; comment it out if you don't
            # need to watch, since rendering makes everything much slower:
            env.render()
            # sample a random action. In this environment the action is
            # 0 or 1, i.e. push the cart left or right:
            action = env.action_space.sample()
            # this executes the action in the environment and returns the
            # new observation, the reward, whether the episode is over,
            # and some diagnostic info:
            observation, reward, done, info = env.step(action)
            if done:
                break
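# For reference (an aside, not part of the original paste): in CartPole-v0 the
# 4-value observation is [cart position, cart velocity, pole angle, pole
# angular velocity], and the reward is +1 for every frame the pole stays up.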
# some_random_games_first()
# generate training data by playing random games:
def initial_population():
    # [observation, one-hot action] pairs from games scoring above score_requirement:
    training_data = []
    # all scores:
    scores = []
    # just the scores that met our threshold:
    accepted_scores = []
    # iterate through however many games we want:
    for _ in range(initial_games):
        score = 0
        # moves made during this game:
        game_memory = []
        # the previous observation that we saw:
        prev_observation = []
        # for each frame, up to 200:
        for _ in range(goal_steps):
            # choose a random action (0 or 1); this is like the
            # action = env.action_space.sample() we had before, but less general:
            action = random.randrange(0, 2)
            # as before:
            observation, reward, done, info = env.step(action)
            # notice that the observation is returned FROM the action,
            # so we store the previous observation here, pairing
            # the prev observation with the action we take:
            if len(prev_observation) > 0:
                # actually save into the game memory:
                game_memory.append([prev_observation, action])
            # since the previous observation was saved with its action,
            # update to the current observation:
            prev_observation = observation
            score += reward
            if done:
                break
        # keep only games that met the target score, i.e. score_requirement:
        if score >= score_requirement:
            accepted_scores.append(score)
            for data in game_memory:
                # while this game is binary (move left or right) in its actions,
                # it is better to write a more generalized version to enable richer
                # games (see the one-hot sketch after this function).
                # convert the action to one-hot; this is the target for our
                # neural network's output layer:
                if data[1] == 1:
                    output = [0, 1]
                elif data[1] == 0:
                    output = [1, 0]
                # save the training pair: observation and the one-hot target:
                training_data.append([data[0], output])
        # reset env to play again:
        env.reset()
        # save the overall score:
        scores.append(score)
    # save the overall training data for logging purposes.
    # this is absolutely optional:
    training_data_save = np.array(training_data)
    np.save('training_data_save.npy', training_data_save)
    # the average of the scores that cleared the bar we set (sort of a benchmark):
    print('Average accepted score:', mean(accepted_scores))
    # same, but the median:
    print('Median score for accepted scores:', median(accepted_scores))
    # count how many games produced each accepted score:
    print(Counter(accepted_scores))
    return training_data
# initial_population()
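# The more generalized one-hot encoding hinted at above (a minimal sketch,
# not in the original paste, assuming actions are integers 0..n_actions-1):
# n_actions = env.action_space.n               # 2 for CartPole-v0
# output = np.eye(n_actions)[action].tolist()  # e.g. action 1 -> [0.0, 1.0]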
def neural_network_model(input_size):
    # input layer:
    network = input_data(shape=[None, input_size, 1], name='input')
    # fully connected layer with 128 nodes, using relu:
    network = fully_connected(network, 128, activation='relu')
    # dropout to reduce overfitting (optional).
    # note: in tflearn the second argument is the KEEP probability, not the
    # drop rate, so 0.5 keeps half of the activations; the right value
    # should be tested empirically:
    network = dropout(network, 0.5)
    # layers, layers...
    network = fully_connected(network, 256, activation='relu')
    network = dropout(network, 0.8)
    network = fully_connected(network, 512, activation='relu')
    network = dropout(network, 0.5)
    network = fully_connected(network, 256, activation='relu')
    network = dropout(network, 0.5)
    network = fully_connected(network, 128, activation='relu')
    network = dropout(network, 0.5)
    # fully connected output layer, one softmax unit per action:
    network = fully_connected(network, 2, activation='softmax')
    # the regression layer defines the optimizer and loss used for training:
    network = regression(network, optimizer='adam', learning_rate=LR, loss='categorical_crossentropy', name='targets')
    model = tflearn.DNN(network, tensorboard_dir='log')
    return model
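# Usage sketch (an aside; CartPole-v0 observations have 4 values, though
# train_model below derives input_size from the data instead of hardcoding it):
# model = neural_network_model(input_size=4)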
# a model can also be passed in instead of False (otherwise one is created):
def train_model(training_data, model=False):
    # pull the observations out of the training data and reshape them to
    # [samples, input_size, 1] so they fit the network's input layer:
    X = np.array([i[0] for i in training_data]).reshape(-1, len(training_data[0][0]), 1)
    # the one-hot actions are the targets:
    y = [i[1] for i in training_data]
    if not model:
        # if a model does not already exist, create it:
        model = neural_network_model(input_size=len(X[0]))
    # fit the model:
    model.fit({'input': X}, {'targets': y}, n_epoch=7, snapshot_step=500, show_metric=True, run_id='openai_learning')
    return model
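# Shape check (an aside, not in the original paste): for CartPole-v0,
# len(training_data[0][0]) is 4, so X has shape (num_samples, 4, 1) and
# each y entry is a 2-element one-hot list like [0, 1].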
# this renaming is somewhat unnecessary:
training_data = initial_population()
# model is actually a trained model here:
model = train_model(training_data)
# I might instead have used the shorter version:
# model = train_model(initial_population())
# there is no right or wrong; it's a matter of style, I think.
# save the model:
model.save('notGreat.model')
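# To reload later (a minimal sketch, assuming the same architecture; tflearn
# needs the graph rebuilt before the saved weights can be loaded):
# model = neural_network_model(input_size=4)
# model.load('notGreat.model')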
#################################### actually play the game: ####################################
scores = []
choices = []
# play 10 games:
for each_game in range(10):
    # same setup as before:
    score = 0
    game_memory = []
    prev_obs = []
    env.reset()
    for _ in range(goal_steps):
        # comment this out if speed is important:
        env.render()
        # choose randomly the first time, since there is no previous observation yet:
        if len(prev_obs) == 0:
            action = random.randrange(0, 2)
        else:
            # take the argmax of the one-hot prediction based on the previous observation:
            action = np.argmax(model.predict(prev_obs.reshape(-1, len(prev_obs), 1))[0])
            # I would refactor here once again to something like:
            # prediction = model.predict(prev_obs.reshape(-1, len(prev_obs), 1))[0]
            # action = np.argmax(prediction)
        choices.append(action)
        new_observation, reward, done, info = env.step(action)
        prev_obs = new_observation
        game_memory.append([new_observation, action])
        score += reward
        if done:
            break
    scores.append(score)
# different metrics:
print('Average Score:', sum(scores) / len(scores))
print('choice 1:{} choice 0:{}'.format(choices.count(1) / len(choices), choices.count(0) / len(choices)))
# print the threshold too, so the average above can be compared against it:
print(score_requirement)
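# One more optional check (an aside, not in the original paste): count how many
# of the trained agent's games cleared the random-play threshold:
# print('Games above score_requirement:', len([s for s in scores if s >= score_requirement]), 'of', len(scores))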