from pygame.constants import K_DOWN, K_UP, K_LEFT, K_RIGHT
from pygame_player import PyGamePlayer
from skimage import data
from skimage.color import rgb2gray
from skimage.transform import rescale
from skimage import io, exposure, img_as_uint, img_as_float
from collections import deque
from random import randint
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import random
import sys
import os
import time
import numpy as np
import theano
import theano.tensor as T
import lasagne

ACTIONS_COUNT = 5
STATE_FRAMES = 4
RESIZED_SCREEN_X, RESIZED_SCREEN_Y = 80, 80
MINI_BATCH_SIZE = 32
all_actions = [K_DOWN, K_UP, K_LEFT, K_RIGHT, []]

class SnakePlayer(PyGamePlayer):
    ACTIONS_COUNT = 5  # number of valid actions: down, up, left, right and no action
    FUTURE_REWARD_DISCOUNT = 0.99  # decay rate of past observations
    OBSERVATION_STEPS = 500.  # time steps to observe before training
    EXPLORE_STEPS = 2000000.  # frames over which to anneal epsilon
    INITIAL_RANDOM_ACTION_PROB = 1.0  # starting chance of an action being random
    FINAL_RANDOM_ACTION_PROB = 0.05  # final chance of an action being random
    MEMORY_SIZE = 590000  # number of observations to remember
    MINI_BATCH_SIZE = 32  # size of mini batches
    STATE_FRAMES = 4  # number of frames to store in the state
    RESIZED_SCREEN_X, RESIZED_SCREEN_Y = (80, 80)
    OBS_LAST_STATE_INDEX, OBS_ACTION_INDEX, OBS_REWARD_INDEX, OBS_CURRENT_STATE_INDEX, OBS_TERMINAL_INDEX = range(5)
    SAVE_EVERY_X_STEPS = 10000
    LEARN_RATE = 1e-6
    STORE_SCORES_LEN = 200.

    def __init__(self, force_game_fps=5, run_real_time=False, playback_mode=False):
        """
        Example class for playing Snake with a deep Q-network.
        """
        # Prepare Theano variables for inputs and targets
        input_var = T.tensor4('inputs')
        target_var = T.ivector('targets')
        states = T.tensor4('states')
        super(SnakePlayer, self).__init__(force_game_fps=force_game_fps, run_real_time=run_real_time)
        # self.last_score = 1
        self._output_layer = SnakePlayer.build_cnn(input_var)
        self._previous_observations = deque()
        self._last_score = 1
        self._target = T.dscalar('target')
        # note: this assigns a TensorType (a type object, not a variable); it is unused below
        self._action = T.TensorType('float64', (False,) * ACTIONS_COUNT)
        self._states_shared = theano.shared(
            np.ones((MINI_BATCH_SIZE, STATE_FRAMES, RESIZED_SCREEN_X, RESIZED_SCREEN_Y),
                    dtype=theano.config.floatX))
        readout = lasagne.layers.get_output(self._output_layer, self._states_shared)
        print(readout[0].eval())
        self._playback_mode = playback_mode
        self._observations = deque()
        self._last_scores = deque()
        # set an initial default action (index 1 maps to K_RIGHT in _key_presses_from_action)
        self._last_action = np.zeros(self.ACTIONS_COUNT)
        self._last_action[1] = 1
        self._last_state = None
        self._probability_of_random_action = self.INITIAL_RANDOM_ACTION_PROB
        self._time = 0

    def get_keys_pressed(self, screen_array, feedback, terminal):
        scr = rescale(screen_array, 0.25)
        scr = rgb2gray(scr)
        # random action left over from testing; uncomment the return below to bypass the network
        action = all_actions[randint(0, 3)]
        # return [action]
        ###############################################
        reward = feedback
        if reward != 0.0:
            self._last_scores.append(reward)
            if len(self._last_scores) > self.STORE_SCORES_LEN:
                self._last_scores.popleft()
        # first frame must be handled differently
        if self._last_state is None:
            # the _last_state will contain the image data from the last self.STATE_FRAMES frames
            self._last_state = np.stack(tuple(scr for _ in range(self.STATE_FRAMES)), axis=2)
            return SnakePlayer._key_presses_from_action(self._last_action)
        scr = np.reshape(scr, (RESIZED_SCREEN_X, RESIZED_SCREEN_Y, 1))
        print(scr.shape)
        print(np.rollaxis(scr, 2, 0).shape)
        print(np.rollaxis(self._last_state, 2, 0).shape)
        current_state = np.append(scr, self._last_state[:, :, 1:], axis=2)
        print(current_state.shape)
        if not self._playback_mode:
            # store the transition in previous_observations
            self._observations.append((self._last_state, self._last_action, reward, current_state, terminal))
            if len(self._observations) > self.MEMORY_SIZE:
                self._observations.popleft()
            # only train if done observing
            if len(self._observations) > self.OBSERVATION_STEPS:
                print("start training...")
                self._train()
                self._time += 1
        print(len(self._observations))
        # update the old values
        self._last_state = current_state
        self._last_action = self._choose_next_action()
        if not self._playback_mode:
            # gradually reduce the probability of a random action
            if self._probability_of_random_action > self.FINAL_RANDOM_ACTION_PROB \
                    and len(self._observations) > self.OBSERVATION_STEPS:
                self._probability_of_random_action -= \
                    (self.INITIAL_RANDOM_ACTION_PROB - self.FINAL_RANDOM_ACTION_PROB) / self.EXPLORE_STEPS
            print("Time: %s random_action_prob: %s reward %s scores differential %s" %
                  (self._time, self._probability_of_random_action, reward,
                   sum(self._last_scores) / self.STORE_SCORES_LEN))
        return SnakePlayer._key_presses_from_action(self._last_action)
        ###############################################

    def get_feedback(self):
        # import must be done here because otherwise importing would cause the game to start playing
        from snake import snake
        score = snake.length
        # get the difference in score between this and the last run
        score_change = (score - self._last_score)
        self._last_score = score
        # note: the terminal flag is always False here, so stored transitions are never marked terminal
        return float(score_change), False

    def _choose_next_action(self):
        new_action = np.zeros([self.ACTIONS_COUNT])
        if self._playback_mode or (random.random() <= self._probability_of_random_action):
            # choose an action randomly
            action_index = random.randrange(self.ACTIONS_COUNT)
        else:
            # choose an action given our last state
            # readout_t = self._session.run(self._output_layer, feed_dict={self._input_layer: [self._last_state]})[0]
            # roll the frame axis to the front and add a batch dimension so the input matches
            # the network's (None, STATE_FRAMES, X, Y) shape, then evaluate the symbolic
            # output to get concrete Q-values
            last_state = np.rollaxis(self._last_state, 2, 0)
            readout_t = lasagne.layers.get_output(
                self._output_layer,
                last_state[np.newaxis].astype(theano.config.floatX)).eval()
            action_index = np.argmax(readout_t)
        new_action[action_index] = 1
        print("****************************************** ACTION *******************************************")
        return new_action
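
    # The .eval() call above compiles a fresh Theano function on every step, which is slow.
    # The helper below is a minimal sketch (not in the original paste) of compiling the
    # action-selection function once and reusing it; the name _compile_choose_action_fn
    # and its wiring are assumptions rather than the author's code.
    def _compile_choose_action_fn(self):
        state_var = T.tensor4('state')
        q_values = lasagne.layers.get_output(self._output_layer, state_var)
        # returns a callable mapping a (1, STATE_FRAMES, X, Y) batch to its Q-values
        return theano.function([state_var], q_values)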

    def _train(self):
        # Prepare Theano variables for inputs and targets
        input_var = T.tensor4('inputs')
        target_var = T.ivector('targets')
        states = T.tensor4('states')
        print("sampling mini batch...")
        # sample a mini_batch to train on
        mini_batch = random.sample(self._observations, self.MINI_BATCH_SIZE)
        # get the batch variables
        previous_states = [d[self.OBS_LAST_STATE_INDEX] for d in mini_batch]
        actions = [d[self.OBS_ACTION_INDEX] for d in mini_batch]
        rewards = [d[self.OBS_REWARD_INDEX] for d in mini_batch]
        current_states = np.array([d[self.OBS_CURRENT_STATE_INDEX] for d in mini_batch])
        agents_expected_reward = []
        # print np.rollaxis(current_states, 3, 1).shape
        print("compiling current states...")
        current_states = np.rollaxis(current_states, 3, 1)
        current_states = theano.compile.sharedvalue.shared(current_states.astype(theano.config.floatX))
        print("getting network output from current states...")
        agents_reward_per_action = lasagne.layers.get_output(self._output_layer, current_states)
        print(agents_reward_per_action.eval())
        print("rewards adding...")
        for i in range(len(mini_batch)):
            if mini_batch[i][self.OBS_TERMINAL_INDEX]:
                # this was a terminal frame, so there is no future reward to discount
                agents_expected_reward.append(rewards[i])
            else:
                agents_expected_reward.append(
                    rewards[i] + self.FUTURE_REWARD_DISCOUNT * np.max(agents_reward_per_action[i].eval()))
        print(len(agents_expected_reward))
        print(len(actions))
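
    # _train() above builds the Q-learning targets but never updates the network's
    # parameters. The method below is a minimal sketch of the missing gradient step,
    # using standard Lasagne/Theano calls; the name _build_train_fn and the choice of
    # RMSProp are assumptions, not something taken from the original paste.
    def _build_train_fn(self):
        states = T.tensor4('states')
        actions = T.matrix('actions')  # one-hot actions, shape (batch, ACTIONS_COUNT)
        targets = T.vector('targets')  # reward + discounted max future Q-value
        q_values = lasagne.layers.get_output(self._output_layer, states)
        # Q-value of the action actually taken in each stored transition
        q_taken = (q_values * actions).sum(axis=1)
        loss = T.mean(lasagne.objectives.squared_error(q_taken, targets))
        params = lasagne.layers.get_all_params(self._output_layer, trainable=True)
        updates = lasagne.updates.rmsprop(loss, params, learning_rate=self.LEARN_RATE)
        # each call to the returned function performs one mini-batch update and returns the loss
        return theano.function([states, actions, targets], loss, updates=updates)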

    @staticmethod
    def build_cnn(input_var=None):
        l_in = lasagne.layers.InputLayer(shape=(None, 4, 80, 80),
                                         input_var=input_var)
        l_conv1 = lasagne.layers.Conv2DLayer(
            l_in,
            num_filters=32,
            filter_size=(8, 8),
            stride=(4, 4),
            nonlinearity=lasagne.nonlinearities.rectify,
            W=lasagne.init.HeUniform(),
            b=lasagne.init.Constant(.1)
        )
        l_conv2 = lasagne.layers.Conv2DLayer(
            l_conv1,
            num_filters=64,
            filter_size=(4, 4),
            stride=(2, 2),
            nonlinearity=lasagne.nonlinearities.rectify,
            W=lasagne.init.HeUniform(),
            b=lasagne.init.Constant(.1)
        )
        l_conv3 = lasagne.layers.Conv2DLayer(
            l_conv2,
            num_filters=64,
            filter_size=(3, 3),
            stride=(1, 1),
            nonlinearity=lasagne.nonlinearities.rectify,
            W=lasagne.init.HeUniform(),
            b=lasagne.init.Constant(.1)
        )
        l_hidden1 = lasagne.layers.DenseLayer(
            l_conv3,
            num_units=512,
            nonlinearity=lasagne.nonlinearities.rectify,
            W=lasagne.init.HeUniform(),
            b=lasagne.init.Constant(.1)
        )
        l_out = lasagne.layers.DenseLayer(
            l_hidden1,
            num_units=5,
            nonlinearity=None,
            W=lasagne.init.HeUniform(),
            b=lasagne.init.Constant(.1)
        )
        network_shape = lasagne.layers.get_output_shape(l_out)
        print(network_shape)
        network = l_out
        return network
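
    # For reference (assuming Lasagne's default zero padding), the shapes work out to:
    #   input (None, 4, 80, 80) -> conv1 (None, 32, 19, 19) -> conv2 (None, 64, 8, 8)
    #   -> conv3 (None, 64, 6, 6) -> dense 512 -> output (None, 5), one Q-value per action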

    @staticmethod
    def _key_presses_from_action(action_set):
        if action_set[0] == 1:
            return [K_DOWN]
        elif action_set[1] == 1:
            return [K_RIGHT]
        elif action_set[2] == 1:
            return [K_UP]
        elif action_set[3] == 1:
            # the original returned K_UP here as well; K_LEFT covers the remaining direction
            return [K_LEFT]
        elif action_set[4] == 1:
            return []
        raise Exception("Unexpected action")


if __name__ == '__main__':
    player = SnakePlayer()
    player.playing = True
    # importing snake will start the game playing
    import snake