Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from random import randrange as rand
- import numpy as np
- import pygame, sys
- from keras.models import Sequential
- from keras.layers import Dense, Activation, Flatten, InputLayer
- from keras.optimizers import Adam
- from rl.agents.dqn import DQNAgent
- from rl.policy import EpsGreedyQPolicy
- from rl.memory import SequentialMemory
# Game geometry and timing settings (sizes in pixels / cells, delay in ms).
config = dict(
    cell_size=20,
    cols=10,
    rows=20,
    delay=150,
    maxfps=30,
)

# Palette indexed by cell value: 0 is the empty cell, 1-7 are the values
# used inside the matching entry of ``tetris_shapes``.
colors = [
    (0, 0, 0),        # 0: empty
    (255, 0, 0),      # 1: T
    (0, 150, 0),      # 2: S
    (0, 0, 255),      # 3: Z
    (255, 120, 0),    # 4: J
    (255, 255, 0),    # 5: L
    (180, 0, 255),    # 6: I
    (0, 220, 220),    # 7: O
]

# The seven tetrominoes as small row-major grids; a non-zero entry is a
# filled cell and doubles as the index into ``colors``.
tetris_shapes = [
    # T
    [[1, 1, 1],
     [0, 1, 0]],
    # S
    [[0, 2, 2],
     [2, 2, 0]],
    # Z
    [[3, 3, 0],
     [0, 3, 3]],
    # J
    [[4, 0, 0],
     [4, 4, 4]],
    # L
    [[0, 0, 5],
     [5, 5, 5]],
    # I
    [[6, 6, 6, 6]],
    # O
    [[7, 7],
     [7, 7]],
]

# Score/reward for clearing 1, 2, 3 or 4 rows with a single piece.
reward_scores = [100, 300, 500, 800]
def rotate_clockwise(shape):
    """Return ``shape`` turned a quarter turn clockwise, as a NumPy array."""
    # One clockwise quarter turn is a single negative (clockwise) step for
    # np.rot90, which is the same as three counter-clockwise steps.
    quarter_turns = -1
    return np.rot90(shape, k=quarter_turns)
def check_collision(board, shape, offset):
    """Report whether ``shape`` placed at ``offset`` collides with ``board``.

    Args:
        board: Row-major grid (sequence of rows); a truthy cell is occupied.
        shape: Row-major grid of the piece; a truthy cell is filled.
        offset: ``(x, y)`` position of the shape's top-left cell on the board.

    Returns:
        True if any filled cell of the shape lands on an occupied board cell
        or outside the board (left, right, above or below); False otherwise.
        Empty cells of the shape are ignored, even when they fall off-board.
    """
    off_x, off_y = offset
    for cy, row in enumerate(shape):
        for cx, cell in enumerate(row):
            if not cell:
                continue
            x = cx + off_x
            y = cy + off_y
            # A negative index does not raise IndexError in Python; it wraps
            # to the opposite edge. Treat it as out of bounds explicitly.
            if x < 0 or y < 0:
                return True
            try:
                if board[y][x]:
                    return True
            except IndexError:
                # Past the right or bottom edge of the board.
                return True
    return False
def remove_row(board, row):
    """Drop row ``row`` from ``board`` and return a board with a blank row on top.

    The passed-in list is modified (the row is removed from it); the returned
    value is a new list made of one empty row followed by the remaining rows.
    """
    board.pop(row)
    blank = [0] * config['cols']
    return [blank] + board
def join_matrixes(mat1, mat2, mat2_off):
    """Add the cells of ``mat2`` into ``mat1`` in place and return ``mat1``.

    ``mat2_off`` is an ``(x, y)`` offset; the rows of ``mat2`` are written one
    row above ``y`` (i.e. starting at row ``y - 1`` of ``mat1``).
    """
    left, top = mat2_off
    top -= 1  # merge one row above the given y offset
    for dy, piece_row in enumerate(mat2):
        for dx, value in enumerate(piece_row):
            mat1[top + dy][left + dx] += value
    return mat1
def new_board():
    """Build an empty playfield with a solid sentinel row underneath.

    Returns ``config['rows']`` rows of zeros followed by one extra row of
    ones, each ``config['cols']`` wide.
    """
    width = config['cols']
    grid = [[0] * width for _ in range(config['rows'])]
    grid.append([1] * width)
    return grid
class TetrisApp(object):
    """Tetris game state driven through ``reset()`` / ``step()``.

    The display is not created (``screen`` stays ``None``), so the drawing
    helpers are no-ops; the game is advanced purely by calls to ``step``.
    """

    # Order in which ``step`` interprets the integer action index.
    ACTIONS = ('LEFT', 'RIGHT', 'DOWN', 'UP')

    def __init__(self):
        pygame.init()
        pygame.key.set_repeat(250, 25)
        self.width = config['cell_size'] * config['cols'] + 200
        self.height = config['cell_size'] * config['rows']
        # The window is intentionally not opened. Keep the attribute defined
        # so the drawing helpers and quit() can detect the headless case
        # instead of failing with AttributeError.
        self.screen = None
        self.stonebag = []
        self.current_stone = self._take_from_bag()
        self.next_stone = self._take_from_bag()
        self.score = 0
        self.current_reward = 0
        self.gameover = False
        self.paused = False
        self.init_game()

    def _take_from_bag(self):
        """Remove and return a random shape from the bag, refilling it when empty."""
        if not self.stonebag:
            # Two copies of every shape index per bag.
            self.stonebag = list(range(len(tetris_shapes))) * 2
        return tetris_shapes[self.stonebag.pop(rand(len(self.stonebag)))]

    def new_stone(self):
        """Promote the queued stone to current, queue a new one, and spawn it.

        Sets ``gameover`` when the freshly spawned stone already collides.
        """
        self.current_stone = self.next_stone
        self.next_stone = self._take_from_bag()
        # Spawn horizontally centred on the top row.
        self.stone_x = int(config['cols'] / 2 - len(self.current_stone[0]) / 2)
        self.stone_y = 0
        if check_collision(self.board,
                           self.current_stone,
                           (self.stone_x, self.stone_y)):
            self.gameover = True

    def init_game(self):
        """Start from an empty board with a fresh stone and a zeroed step counter."""
        self.board = new_board()
        self.new_stone()
        self.step_count = 0

    def center_msg(self, msg):
        """Render ``msg`` line by line around the centre of the screen, if any."""
        if self.screen is None:
            return
        font = pygame.font.Font(pygame.font.get_default_font(), 12)
        for i, line in enumerate(msg.splitlines()):
            msg_image = font.render(line, False, (255, 255, 255), (0, 0, 0))
            msgim_center_x, msgim_center_y = msg_image.get_size()
            msgim_center_x //= 2
            msgim_center_y //= 2
            self.screen.blit(msg_image, (
                self.width // 2 - msgim_center_x,
                self.height // 2 - msgim_center_y + i * 22))

    def draw_matrix(self, matrix, offset):
        """Draw every non-zero cell of ``matrix`` at cell ``offset``, if a screen exists."""
        if self.screen is None:
            return
        off_x, off_y = offset
        cell = config['cell_size']
        for y, row in enumerate(matrix):
            for x, val in enumerate(row):
                if val:
                    pygame.draw.rect(
                        self.screen,
                        colors[val],
                        pygame.Rect((off_x + x) * cell,
                                    (off_y + y) * cell,
                                    cell,
                                    cell),
                        0)

    def move(self, delta_x):
        """Shift the current stone horizontally by ``delta_x`` if the target is free."""
        if self.gameover or self.paused:
            return
        rightmost = config['cols'] - len(self.current_stone[0])
        # Clamp to the left wall first, then to the right wall.
        new_x = min(max(self.stone_x + delta_x, 0), rightmost)
        if not check_collision(self.board,
                               self.current_stone,
                               (new_x, self.stone_y)):
            self.stone_x = new_x

    def quit(self):
        """Show an exit message when a screen exists, then terminate the process."""
        if self.screen is not None:
            self.center_msg("Exiting...")
            pygame.display.update()
        sys.exit()

    def _clear_full_rows(self):
        """Remove every completely filled row (sentinel excluded); return the count."""
        cleared = 0
        while True:
            full = next((i for i, row in enumerate(self.board[:-1])
                         if 0 not in row), None)
            if full is None:
                return cleared
            self.board = remove_row(self.board, full)
            cleared += 1

    def drop(self):
        """Move the stone down one row; lock it and clear rows when it lands."""
        if self.gameover or self.paused:
            return
        self.stone_y += 1
        if not check_collision(self.board,
                               self.current_stone,
                               (self.stone_x, self.stone_y)):
            return
        # The stone collided after moving down: merge it into the board
        # (join_matrixes writes it one row above the colliding position).
        self.board = join_matrixes(
            self.board,
            self.current_stone,
            (self.stone_x, self.stone_y))
        self.new_stone()
        combo = self._clear_full_rows()
        # give reward
        if combo != 0:
            # Clamp so an unexpectedly large combo cannot index past the table.
            reward = reward_scores[min(combo, len(reward_scores)) - 1]
            self.score += reward
            self.current_reward = reward

    def rotate_stone(self):
        """Rotate the current stone clockwise, shifting left up to 3 cells if needed."""
        if self.gameover or self.paused:
            return
        new_stone = rotate_clockwise(self.current_stone)
        if not check_collision(self.board,
                               new_stone,
                               (self.stone_x, self.stone_y)):
            self.current_stone = new_stone
            return
        # The rotated stone does not fit in place: try nudging it left.
        for i in range(1, 4):
            new_x = self.stone_x - i
            if new_x >= 0 and not check_collision(
                    self.board, new_stone, (new_x, self.stone_y)):
                self.stone_x = new_x
                self.current_stone = new_stone
                return

    def toggle_pause(self):
        """Flip the paused flag."""
        self.paused = not self.paused

    def reset(self):
        """Restart the game if it is over and return the current state vector."""
        if self.gameover:
            self.init_game()
            self.score = 0
            self.gameover = False
        return self.get_state()

    def get_state(self):
        """Flatten the board into a list of numbers.

        Every board cell (including the sentinel bottom row) becomes 0 when
        empty and 1 when occupied; cells covered by the current stone are
        then overwritten with 0.5.
        """
        state = [0 if value == 0 else 1
                 for row in self.board
                 for value in row]
        for i, row in enumerate(self.current_stone):
            for j, column in enumerate(row):
                if column != 0:
                    state[(self.stone_y + i) * config['cols'] + self.stone_x + j] = 0.5
        return state

    def step(self, a):
        """Apply action index ``a`` and advance the game by one tick.

        Args:
            a: Index into ``ACTIONS`` (LEFT, RIGHT, DOWN, UP).

        Returns:
            ``(state, reward, done, info)`` where ``state`` comes from
            ``get_state()``, ``reward`` is the reward accumulated this step
            (or -100 when the game is over), ``done`` mirrors ``gameover``
            and ``info`` is an empty dict.
        """
        self.step_count += 1
        key_actions = {
            'LEFT': lambda: self.move(-1),
            'RIGHT': lambda: self.move(+1),
            'DOWN': self.drop,
            'UP': self.rotate_stone,
        }
        key_actions[self.ACTIONS[a]]()
        # Automatic drop on every step whose count is not a multiple of 3.
        if self.step_count % 3:
            self.drop()
        # NOTE(review): the source's indentation was lost; this bonus is
        # assumed to apply on every step rather than only on automatic
        # drops - confirm against the intended reward design.
        self.current_reward += 5
        new_s = self.get_state()
        r = self.current_reward
        done = False
        if self.gameover:
            print(new_s)
            done = True
            r = -100
        self.current_reward = 0
        return new_s, r, done, {}
# Number of discrete actions the agent can choose from; TetrisApp.step maps
# the indices 0-3 to LEFT, RIGHT, DOWN and UP.
nb_actions = 4
np.random.seed(123)

# One network input per board cell, including the extra sentinel row that
# new_board() appends below the playfield (21 * 10 = 210 with this config).
state_size = (config['rows'] + 1) * config['cols']

# Small fully connected Q-network: flattened state -> 128 -> 64 -> Q-values.
model = Sequential()
model.add(Flatten(input_shape=(1, state_size)))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dense(64))
model.add(Activation('relu'))
model.add(Dense(nb_actions))
model.add(Activation('linear'))
# summary() prints the table itself and returns None, so it is not wrapped
# in print().
model.summary()

policy = EpsGreedyQPolicy()
memory = SequentialMemory(limit=50000, window_length=1)
dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=40,
               target_model_update=1e-2, policy=policy)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])

env = TetrisApp()
print(env)

# Train without visualization (visualize=False), then evaluate for 5 episodes.
dqn.fit(env, nb_steps=500000, visualize=False, verbose=2)
dqn.test(env, nb_episodes=5, visualize=False)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement