import math
import os
import pickle
import random
import time
from collections import deque

import numpy as np
import pygame
import tensorflow as tf
from PIL import Image
from keras.models import Sequential
from keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Activation, Flatten
from keras.optimizers import Adam
from tqdm import tqdm
# Window size and replay-memory settings
SIZE = 500
REPLAY_MEMORY_SIZE = 50_000     # how many recent transitions to keep for training
MIN_REPLAY_MEMORY_SIZE = 1_000  # minimum transitions before training starts
MODEL_NAME = "256x2"
MINIBATCH_SIZE = 64             # transitions sampled per training step
DISCOUNT = 0.95                 # gamma in the Q-learning update
UPDATE_TARGET_EVERY = 5         # terminal episodes between target-network syncs
MIN_REWARD = -200               # for model save
MEMORY_FRACTION = 0.20          # unused; GPU memory cap left over from the original setup

c = 0
x_train = []
y_train = []

# Environment settings
EPISODES = 10_000

# Exploration settings
epsilon = 0.4198  # not a constant, going to be decayed
EPSILON_DECAY = 0.99975
MIN_EPSILON = 0.001

# Stats settings
AGGREGATE_STATS_EVERY = 50  # episodes
SHOW_PREVIEW = True

k = 0  # global step counter passed to the environment (unused there)

# Debugging aid: report tensor allocations if the GPU runs out of memory
run_options = tf.compat.v1.RunOptions(report_tensor_allocations_upon_oom=True)
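
# A rough sanity check on the decay schedule (an illustrative calculation, not part
# of the original script): epsilon is multiplied by EPSILON_DECAY once per episode,
# so it halves roughly every ln(2) / -ln(0.99975) ~= 2772 episodes. Starting from
# 0.4198 it would only hit MIN_EPSILON = 0.001 after about
# ln(0.001 / 0.4198) / ln(0.99975) ~= 24_000 episodes, i.e. well past the 10_000
# episodes trained here, so this run never stops exploring entirely.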

def distance_between(x_apple, y_apple, x, y):
    # Euclidean distance between a point of interest (food pellet or wall sample)
    # and the player. Callers compare the result against a threshold to detect
    # collisions.
    return math.sqrt((x_apple - x) ** 2 + (y_apple - y) ** 2)
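
# Quick worked example of the helper above (hypothetical values, for illustration):
# distance_between(0, 0, 3, 4) == 5.0, so a player at (3, 4) collides with a wall
# sample at the origin only under a threshold larger than 5.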

class Blob:
    def __init__(self):
        self.x = 80
        self.y = 300
        self.width = 20
        self.height = 20

    def collision(self, other):
        return self.x == other.x and self.y == other.y

    def move(self, select):
        # Action mapping: 0 = left, 1 = right, 2 = down, 3 = up
        # (pygame's y axis grows downward, so "down" increases y)
        if select == 0:
            self.x -= 1
        if select == 1:
            self.x += 1
        if select == 2:
            self.y += 1
        if select == 3:
            self.y -= 1

class BlobFood:
    # Fixed spawn points for the eight food pellets
    SPAWN_POINTS = [(80, 350), (80, 120), (80, 220), (250, 115),
                    (400, 350), (400, 120), (400, 220), (250, 360)]

    def __init__(self):
        self.x = [p[0] for p in self.SPAWN_POINTS]
        self.y = [p[1] for p in self.SPAWN_POINTS]
        self.width = 20
        self.height = 20

    def collision(self, other):
        # True if any pellet sits exactly on the other blob's position
        return any(x == other.x and y == other.y for x, y in zip(self.x, self.y))

    def move(self, select):
        # Shift every pellet; same action mapping as Blob.move.
        # Unused by the training loop, kept for parity with Blob.
        dx = {0: -0.1, 1: 0.1}.get(select, 0.0)
        dy = {2: 0.1, 3: -0.1}.get(select, 0.0)
        self.x = [x + dx for x in self.x]
        self.y = [y + dy for y in self.y]

class Walls:
    # Each segment is (x, y, width, height). width == 1 gives a vertical wall,
    # height == 1 a horizontal one. Order matters: get_image() offsets segments
    # by index range when drawing.
    SEGMENTS = [
        # Outer boundary (indices 0-13)
        (50, 100, 1, 100),
        (50, 100, 100, 1),
        (150, 100, 100, 1),
        (250, 100, 100, 1),
        (350, 100, 100, 1),
        (435, 100, 1, 100),
        (435, 200, 1, 100),
        (435, 300, 1, 100),
        (350, 385, 100, 1),
        (250, 385, 100, 1),
        (150, 385, 100, 1),
        (50, 385, 100, 1),
        (50, 300, 1, 100),
        (50, 200, 1, 100),
        # Inner obstacle (indices 14-23)
        (130, 170, 1, 80),   # 14
        (130, 230, 1, 80),   # 15
        (145, 155, 80, 1),   # 16
        (210, 155, 80, 1),   # 17
        (270, 155, 80, 1),   # 18
        (350, 170, 1, 80),
        (350, 230, 1, 80),
        (145, 310, 80, 1),
        # (130, 350, 80, 1),  # disabled in the original
        (225, 310, 80, 1),
        (270, 310, 80, 1),
    ]

    def __init__(self):
        self.x = [s[0] for s in self.SEGMENTS]
        self.y = [s[1] for s in self.SEGMENTS]
        self.width = [s[2] for s in self.SEGMENTS]
        self.height = [s[3] for s in self.SEGMENTS]
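
# Note on the indices above (an observation about this script, not a fix): get_image()
# draws segments 5-7 and 14-15 shifted 15 px right and segments 8-11 and 16-18 shifted
# 15 px down, while the collision checks in get_all() use the raw coordinates. Those
# walls therefore render slightly offset from where collisions are actually detected.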

class BlobEnv:
    SIZE = 500
    RETURN_IMAGES = True
    MOVE_PENALTY = 10
    ENEMY_PENALTY = 300
    FOOD_REWARD = 100
    OBSERVATION_SPACE_VALUES = (10, 10, 3)  # observations are downsampled to 10x10 RGB
    ACTION_SPACE_SIZE = 4  # matches the 4-way movement and the network's 4 outputs
    PLAYER_N = 1  # player key in dict
    FOOD_N = 2  # food key in dict
    ENEMY_N = 3  # enemy key in dict
    # pygame.init()
    # win = pygame.display.set_mode((SIZE, SIZE))
    # the dict! (colors)
    d = {1: (255, 175, 0),
         2: (0, 255, 0),
         3: (0, 0, 255)}

    def __init__(self):
        self.win = pygame.display.set_mode((SIZE, SIZE))
        self.wall = Walls()
        self.player = Blob()
        self.food = BlobFood()
        # These values are overridden in reset(), which runs before every
        # episode; the reset() values are the ones training actually sees.
        self.MOVE_PENALTY = 10
        self.ENEMY_PENALTY = 300
        self.FOOD_REWARD = 100
        self.size = 1000
        self.i = 0  # count of pellets eaten this episode

    def reset(self):
        pygame.init()
        self.win = pygame.display.set_mode((SIZE, SIZE))
        self.wall = Walls()
        self.player = Blob()
        self.food = BlobFood()
        self.MOVE_PENALTY = 5
        self.ENEMY_PENALTY = 100
        self.FOOD_REWARD = 40
        self.i = 0
        # Grab the initial observation by screenshotting the (still blank) window,
        # then downsample it to the 10x10 observation with PIL (np.resize does not
        # resample an image, it just truncates the flat pixel buffer)
        path = "images/screenshot.jpeg"
        pygame.image.save(self.win, path)
        im = Image.open(path).resize((10, 10))
        observation = np.array(im)
        return observation

    def get_all(self, done, choice, k):
        # k is passed in by the training loop but unused here
        self.player.move(choice)

        # Default step cost; overwritten below on a wall hit or a food pickup
        reward = -self.MOVE_PENALTY

        # Wall collisions: sample points along each segment and test the distance
        for c in range(len(self.wall.x)):
            if self.wall.height[c] > self.wall.width[c]:  # vertical segment
                for i in range(self.wall.height[c]):
                    if distance_between(self.wall.x[c], self.wall.y[c] + (i + 5),
                                        self.player.x, self.player.y) < 3:
                        reward = -self.ENEMY_PENALTY
                        done = True
            elif self.wall.height[c] < self.wall.width[c]:  # horizontal segment
                for i in range(self.wall.width[c]):
                    if distance_between(self.wall.x[c] + (i + 5), self.wall.y[c],
                                        self.player.x, self.player.y) < 3:
                        reward = -self.ENEMY_PENALTY
                        done = True

        # Food: eating a pellet earns FOOD_REWARD; eating all eight ends the episode
        for d in range(len(self.food.x)):
            if distance_between(self.food.x[d], self.food.y[d],
                                self.player.x, self.player.y) < 20:
                reward = self.FOOD_REWARD
                self.i += 1
                if self.i == 8:  # all pellets eaten
                    done = True
                    self.i = 0
                # Park the eaten pellet far off-screen instead of deleting it
                self.food.x[d] = 20000
                self.food.y[d] = 20000

        # Capture the observation after the move so it reflects the new state
        new_observation = np.array(self.get_image().resize((10, 10)))
        return new_observation, reward, done

    def render(self):
        self.get_image()
        pygame.display.update()

    def get_image(self):
        # Redraw the scene, flush it to the window, save a screenshot and
        # return it as a PIL image
        self.win.fill((0, 0, 0))
        pygame.draw.rect(self.win, (255, 0, 0),
                         (self.player.x, self.player.y, self.player.width, self.player.height))
        for b in range(len(self.food.x)):
            pygame.draw.rect(self.win, (255, 0, 0),
                             (self.food.x[b], self.food.y[b], self.food.width, self.food.height))
        for i in range(len(self.wall.x)):
            if 5 <= i <= 7:
                pygame.draw.rect(self.win, (255, 0, 0),
                                 (self.wall.x[i] + 15, self.wall.y[i], self.wall.width[i], self.wall.height[i]))
            elif 8 <= i <= 11:
                pygame.draw.rect(self.win, (255, 0, 0),
                                 (self.wall.x[i], self.wall.y[i] + 15, self.wall.width[i], self.wall.height[i]))
            elif 14 <= i <= 15:
                pygame.draw.rect(self.win, (255, 0, 0),
                                 (self.wall.x[i] + 15, self.wall.y[i], self.wall.width[i], self.wall.height[i]))
            elif 16 <= i <= 18:
                pygame.draw.rect(self.win, (255, 0, 0),
                                 (self.wall.x[i], self.wall.y[i] + 15, self.wall.width[i], self.wall.height[i]))
            else:
                pygame.draw.rect(self.win, (255, 0, 0),
                                 (self.wall.x[i], self.wall.y[i], self.wall.width[i], self.wall.height[i]))
        pygame.display.update()
        path = "images/screenshot1.jpeg"
        pygame.image.save(self.win, path)
        return Image.open(path)

    def key_movement(self, size):
        # Manual WASD control, mirroring the action mapping in Blob.move
        key = pygame.key.get_pressed()
        if key[pygame.K_w]:
            self.player.move(3)  # up
        if key[pygame.K_a]:
            self.player.move(0)  # left
        if key[pygame.K_s]:
            self.player.move(2)  # down
        if key[pygame.K_d]:
            self.player.move(1)  # right
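
# A minimal manual-control loop for debugging the environment by hand (a hypothetical
# usage sketch, not part of the training flow). Uncomment to steer the blob with WASD:
# debug_env = BlobEnv()
# debug_env.reset()
# while True:
#     for event in pygame.event.get():
#         if event.type == pygame.QUIT:
#             raise SystemExit
#     debug_env.key_movement(SIZE)
#     debug_env.render()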

env = BlobEnv()

# For stats
ep_rewards = [-200]

# For more repeatable results
random.seed(1)
np.random.seed(1)
tf.compat.v1.set_random_seed(1)

# Create the folders used for model checkpoints and screenshots
if not os.path.isdir('models'):
    os.makedirs('models')
if not os.path.isdir('images'):
    os.makedirs('images')

# Own Tensorboard class
class ModifiedTensorBoard(tf.compat.v1.keras.callbacks.TensorBoard):

    # Overriding init to set initial step and writer (we want one log file for all .fit() calls)
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.create_file_writer(self.log_dir)

    def _write_logs(self, logs, index):
        with self.writer.as_default():
            for name, value in logs.items():
                tf.summary.scalar(name, value, step=index)
                self.step += 1
                self.writer.flush()

    # Overriding this method to stop creating default log writer
    def set_model(self, model):
        pass

    # Overridden, saves logs with our step number
    # (otherwise every .fit() will start writing from 0th step)
    def on_epoch_end(self, epoch, logs=None):
        self.update_stats(**logs)

    # Overridden; we train for one batch only, no need to save anything at epoch end
    def on_batch_end(self, batch, logs=None):
        pass

    # Overridden, so won't close writer
    def on_train_end(self, _):
        pass

    # Custom method for saving own metrics
    # Creates writer, writes custom metrics and closes writer
    def update_stats(self, **stats):
        self._write_logs(stats, self.step)

class DQNAgent:
    def __init__(self):
        # Main model: gets trained every step
        self.model = self.create_model()

        # Target model: used for the (more stable) Q-value predictions
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}", profile_batch=0)
        self.target_update_counter = 0
        self.i = 0  # counters used by the disabled dataset-dump code in train()
        self.b = 0

    def create_model(self):
        model = Sequential()

        model.add(Conv2D(64, (3, 3), input_shape=(10, 10, 3)))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(0.2))

        model.add(Conv2D(160, (3, 3)))
        model.add(Activation("relu"))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(0.2))

        model.add(Flatten())  # this converts our 3D feature maps to 1D feature vectors
        model.add(Dense(41))
        model.add(Dense(51))
        model.add(Dense(4, activation="linear"))  # one Q-value per action

        model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=['accuracy'])
        return model
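
    # Shape check for the network above (a hand calculation, for reference only):
    # (10, 10, 3) -> Conv2D 3x3, valid padding -> (8, 8, 64) -> MaxPool 2x2 -> (4, 4, 64)
    # -> Conv2D 3x3 -> (2, 2, 160) -> MaxPool 2x2 -> (1, 1, 160) -> Flatten -> 160
    # -> Dense(41) -> Dense(51) -> Dense(4) linear Q-values.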

    def update_replay_memory(self, transition):
        # transition = (current_state, action, reward, new_state, done)
        self.replay_memory.append(transition)

    def get_qs(self, state):
        # Query the main network for the Q-values of a single observation
        return self.model.predict(np.array(state).reshape(-1, *state.shape) / 255)[0]

    def save_model(self):
        self.model.save(f"models/256x10x64x4-{int(time.time())}")

    def train(self, terminal_state, step, x_train_list, y_train_list):
        # Only start training once the replay memory holds enough transitions
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

        # Q-values for the sampled current states, from the main network
        current_states = np.array([transition[0] for transition in minibatch]) / 255
        current_qs_list = self.model.predict(current_states)

        # Q-values for the successor states, from the (slower-moving) target network
        new_current_states = np.array([transition[3] for transition in minibatch]) / 255
        future_qs_list = self.target_model.predict(new_current_states)

        x = []
        y = []

        for index, (current_state, action, reward, new_current_state, done) in enumerate(minibatch):
            self.i += 1
            # Bellman update: terminal transitions use the raw reward; otherwise
            # bootstrap from the target network's best successor Q-value
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            # Overwrite only the Q-value of the action actually taken
            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            x.append(current_state)
            y.append(current_qs)

        # Disabled in the original: accumulate (x, y) pairs and dump them to
        # pickle files for offline experiments
        '''if self.i <= 60_000:
            x_train = np.array(x)
            y_train = np.array(y)
            x_train = np.resize(x_train, (10, 10, 3))
            y_train = np.resize(y_train, (4))
            x_train_list.append(x_train)
            y_train_list.append(y_train)
        if self.i == 60_000:
            x_train = np.array(x_train_list)
            y_train = np.array(y_train_list)
            print(x_train.shape)
            print(y_train.shape)
            if self.b == 1:
                pickle_out = open("x_test.pickle", "wb")
                pickle.dump(x_train, pickle_out)
                pickle_out.close()
                pickle_out = open("y_test.pickle", "wb")
                pickle.dump(y_train, pickle_out)
                pickle_out.close()
            else:
                pickle_out = open("x.pickle", "wb")
                pickle.dump(x_train, pickle_out)
                pickle_out.close()
                pickle_out = open("y.pickle", "wb")
                pickle.dump(y_train, pickle_out)
                pickle_out.close()
            self.b += 1
            self.i = 0
            x_train_list = []
            y_train_list = []'''

        x = np.array(x) / 255
        y = np.array(y)
        self.model.fit(x, y, epochs=2, batch_size=MINIBATCH_SIZE, verbose=0, shuffle=False,
                       callbacks=[self.tensorboard] if terminal_state else None)

        # Count terminal episodes and periodically sync the target network
        if terminal_state:
            self.target_update_counter += 1

        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0
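
# Note on the two-network setup above (standard DQN practice, stated here for context):
# the target network's weights stay frozen between syncs, so the bootstrapped targets
# reward + DISCOUNT * max Q_target(s', a') move slowly, which keeps the regression in
# train() from chasing its own constantly-shifting predictions.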

agent = DQNAgent()

# Iterate over episodes
for episode in tqdm(range(1, EPISODES + 1), ascii=True, unit="episode"):
    # Update tensorboard step every episode
    agent.tensorboard.step = episode

    # Restarting episode: reset episode reward and step number
    episode_reward = 0
    step = 1

    # Reset environment and get initial state
    current_state = env.reset()
    done = False

    # Periodic checkpoint every 10 episodes
    if SHOW_PREVIEW and not episode % 10:
        agent.save_model()

    while not done:
        for event in pygame.event.get():
            # Closing the window ends the episode
            if event.type == pygame.QUIT:
                done = True

        # Epsilon-greedy action selection: exploit the network's Q-values with
        # probability 1 - epsilon, otherwise take a random action
        if np.random.random() > epsilon:
            action = np.argmax(agent.get_qs(current_state))
        else:
            action = np.random.randint(0, env.ACTION_SPACE_SIZE)

        new_state, reward, done = env.get_all(done, action, k)
        os.remove("images/screenshot1.jpeg")
        k += 1
        episode_reward += reward

        # Checkpoint whenever the agent actually eats a pellet
        if reward == env.FOOD_REWARD:
            agent.save_model()
        # Disabled in the original: break out of episodes that stop making progress
        '''lcurrent_state = list(current_state)
        lnew_state = list(new_state)
        if d > 50:
            d = 0
            if np.array_equal(current_state, new_state):
                c += 1
            if c > 30:
                c = 0
                break'''

        # Every step we update replay memory and train the main network
        agent.update_replay_memory((current_state, action, reward, new_state, done))
        agent.train(done, step, x_train, y_train)

        current_state = new_state
        step += 1

    # Tear down the display; env.reset() re-initializes pygame next episode
    pygame.quit()

    # Append episode reward to the list and log aggregate stats every
    # AGGREGATE_STATS_EVERY episodes
    ep_rewards.append(episode_reward)
    if not episode % AGGREGATE_STATS_EVERY or episode == 1:
        average_reward = sum(ep_rewards[-AGGREGATE_STATS_EVERY:]) / len(ep_rewards[-AGGREGATE_STATS_EVERY:])
        min_reward = min(ep_rewards[-AGGREGATE_STATS_EVERY:])
        max_reward = max(ep_rewards[-AGGREGATE_STATS_EVERY:])
        agent.tensorboard.update_stats(reward_avg=average_reward, reward_min=min_reward,
                                       reward_max=max_reward, epsilon=epsilon)

        # Save model, but only when min reward is greater or equal a set value
        if min_reward >= MIN_REWARD:
            agent.model.save(f'models/{MODEL_NAME}__{max_reward:_>7.2f}max_{average_reward:_>7.2f}avg_{min_reward:_>7.2f}min__{int(time.time())}')

    # Decay epsilon
    if epsilon > MIN_EPSILON:
        epsilon *= EPSILON_DECAY
        epsilon = max(MIN_EPSILON, epsilon)