Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pygame as pg
- from math import sin, cos, pi, ceil, floor
- import torch as T
- from torch import nn
- import torch.nn.functional as F
- import torch.optim as optim
- import numpy as np
- from numpy.random import random as nprand
- import matplotlib.pyplot as plt
- import os
- import csv
- import pygad, pygad.torchga
- import joblib
- # import time
- # device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# --- Simulation constants ---
WIDTH, HEIGHT = 1000, 600  # window size in pixels
m = 1  # drone mass
g = 4  # grav. acceleration
dt = 4 / 60  # simulation timestep (seconds per frame)
l = 1  # length of the base
eng_l = 0.25  # length of the engine (there are two of them on the left and on the right)
d = 0.25  # height of both the base and the engines
drag = 0.1  # drag coefficient
maxthr = 4  # max engine thrust
thr_incr = maxthr * dt / 1  # increment by which the power is changed according to the key presses
I = (m * (l + 2 * eng_l) ** 2 / 12)  # Moment of inertia for a thin rod
fontsize = 18  # HUD font size in pixels
# image = pg.image.load("undrtale.png")
# Initialise pygame and the HUD font eagerly when run as a script
# (class-level code below assumes pg.font is ready when rendering starts).
if __name__ == "__main__":
    pg.init()
    font = pg.font.SysFont("arial", fontsize)
class NewrNet(nn.Module):
    """Simple fully-connected ReLU network with an Adam optimizer attached.

    Layer stack: n_state -> n_neurons x (n_layers - 1) -> n_actions
    (a single Linear n_state -> n_actions when n_layers == 1).
    """

    def __init__(self, n_state, n_actions, n_layers, n_neurons, lr=0.001, use_cuda=True):
        super().__init__()
        self.len = n_layers
        self.n_state = n_state
        self.n_actions = n_actions
        # Build the chain of Linear layers from consecutive size pairs.
        sizes = [n_state] + [n_neurons] * (n_layers - 1) + [n_actions]
        self.layers = nn.ModuleList(nn.Linear(a, b) for a, b in zip(sizes, sizes[1:]))
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.device = T.device('cuda' if T.cuda.is_available() and use_cuda else 'cpu')
        self.to(self.device)
        print(f"using {self.device}")

    def forward(self, x):
        """ReLU after every layer except the last; raw Q-values out."""
        for layer in self.layers[:-1]:
            x = F.relu(layer(x))
        return self.layers[-1](x)
class MDPMemory():
    """Fixed-size ring buffer of (s, a, r, s', done) transitions."""

    def __init__(self, max_mem, n_state):
        self.mem_size = max_mem
        self.n_state = n_state
        shape = (max_mem, n_state)
        self.state_memory = np.zeros(shape, dtype=np.float32)
        self.new_state_memory = np.zeros(shape, dtype=np.float32)
        self.action_memory = np.zeros(max_mem, dtype=np.int32)
        self.reward_memory = np.zeros(max_mem, dtype=np.float32)
        self.terminal_memory = np.zeros(max_mem, dtype=np.bool_)
        self.mem_countr = 0  # total number of stores so far (not capped)

    def store(self, state, action, reward, new_state, done):
        """Write one transition, overwriting the oldest slot when full."""
        slot = self.mem_countr % self.mem_size
        self.state_memory[slot] = state
        self.new_state_memory[slot] = new_state
        self.action_memory[slot] = action
        self.reward_memory[slot] = reward
        self.terminal_memory[slot] = done
        self.mem_countr += 1
class Agent():
    """Single-network DQN agent with epsilon-greedy policy and replay memory.

    policy(state) -> int action; store_transition(...) records a transition;
    learn() does one minibatch SGD step and decays epsilon;
    save(name)/load(name) persist the network under directory `name`.
    """

    def __init__(self, gamma, eps, lr, n_state, n_actions, batch_size, max_mem=100000,
                 eps_end=0.01, eps_dec=5e-4, n_layers=3, n_neurons=128, use_cuda=True):
        self.gamma = gamma  # discount factor
        self.eps = eps  # current exploration rate
        self.eps_min = eps_end
        self.eps_dec = eps_dec  # linear epsilon decay per learn() call
        self.action_space = [i for i in range(n_actions)]
        self.n_state = n_state
        self.n_actions = n_actions
        self.n_layers = n_layers
        self.n_neurons = n_neurons
        self.lr = lr
        self.batch_size = batch_size
        self.mem_size = max_mem
        self.mem_countr = 0
        self.eval = NewrNet(n_state, n_actions, n_layers, n_neurons, lr, use_cuda=use_cuda)
        # Replay ring buffers, overwritten when full.
        self.state_memory = np.zeros((self.mem_size, n_state), dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, n_state), dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.bool_)

    def store_transition(self, state, action, reward, new_state, done):
        """Store one (s, a, r, s', done) transition in the ring buffer."""
        i = self.mem_countr % self.mem_size
        self.state_memory[i] = state
        self.action_memory[i] = action
        self.reward_memory[i] = reward
        self.new_state_memory[i] = new_state
        self.terminal_memory[i] = done
        self.mem_countr += 1

    def policy(self, state):
        """Epsilon-greedy action selection."""
        if np.random.random() < self.eps:
            action = np.random.choice(self.action_space)
        else:
            # Pin dtype: observation tuples may contain numpy float64 values.
            state = T.tensor([state], dtype=T.float32).to(self.eval.device)
            actions = self.eval.forward(state)
            action = T.argmax(actions).item()
        return action

    def learn(self):
        """One SGD step on a uniformly sampled minibatch; decays epsilon."""
        if self.mem_countr < self.batch_size:
            return
        self.eval.optimizer.zero_grad()
        mem = min(self.mem_size, self.mem_countr)
        batch = np.random.choice(mem, self.batch_size, replace=False)
        batch_index = np.arange(self.batch_size, dtype=np.int32)
        state_batch = T.from_numpy(self.state_memory[batch]).to(self.eval.device)
        new_state_batch = T.from_numpy(self.new_state_memory[batch]).to(self.eval.device)
        reward_batch = T.from_numpy(self.reward_memory[batch]).to(self.eval.device)
        terminal_batch = T.from_numpy(self.terminal_memory[batch]).to(self.eval.device)
        action_batch = self.action_memory[batch]  # not necessarily a tensor
        q_eval = self.eval.forward(state_batch)[batch_index, action_batch]
        # FIX: the bootstrap target must not carry gradients — previously the
        # next-state pass was grad-enabled, so the loss back-propagated
        # through the target as well.
        with T.no_grad():
            nq_eval = self.eval.forward(new_state_batch)
            nq_eval[terminal_batch] = 0.0
            q_target = reward_batch + self.gamma * T.max(nq_eval, dim=1)[0]
        loss = self.eval.loss(q_target, q_eval).to(self.eval.device)
        loss.backward()
        self.eval.optimizer.step()
        self.eps = max(self.eps_min, self.eps - self.eps_dec)

    def save(self, name, special=""):
        """Save the network to <name>/net1[_special].pt."""
        if special != "":
            special = "_" + special
        T.save(self.eval.state_dict(), f"{name}/net1{special}.pt")

    def load(self, name, special=""):
        """Load the network from <name>/net1[_special].pt."""
        if special != "":
            special = "_" + special
        self.eval.load_state_dict(T.load(f"{name}/net1{special}.pt"))
class DoubleQAgent():
    """Double-Q-learning agent: two network/memory pairs, one chosen at random
    per learning phase; the *other* network evaluates the bootstrap target.

    Interface mirrors Agent: policy / store_transition / learn / save / load.
    """

    def __init__(self, gamma, eps, lr, n_state, n_actions, batch_size, max_mem=100000,
                 eps_end=0.01, eps_dec=5e-4, n_layers=3, n_neurons=64, use_cuda=True):
        self.gamma = gamma
        self.eps = eps
        self.eps_min = eps_end
        self.eps_dec = eps_dec
        self.action_space = [i for i in range(n_actions)]
        self.n_state = n_state
        self.n_actions = n_actions
        self.n_layers = n_layers
        self.n_neurons = n_neurons
        self.lr = lr
        self.batch_size = batch_size
        self.memchoice = None
        self.set_eval()  # pick the initially active pair (only sets memchoice)
        self.mem_countr = 0
        self.mem_size = max_mem
        self.eval = [NewrNet(n_state, n_actions, n_layers, n_neurons, lr, use_cuda=use_cuda) for _ in range(2)]
        self.mem = [MDPMemory(max_mem, n_state) for _ in range(2)]

    def set_eval(self):
        """Randomly choose which of the two network/memory pairs is active."""
        self.memchoice = 1 if nprand() < 0.5 else 0

    def store_transition(self, state, action, reward, new_state, done):
        """Record the transition into the currently active memory."""
        self.mem[self.memchoice].store(state, action, reward, new_state, done)
        self.mem_countr += 1

    def policy(self, state):
        """Epsilon-greedy action from the currently active network."""
        if nprand() < self.eps:
            action = np.random.choice(self.action_space)
        else:
            # Pin dtype: observation tuples may contain numpy float64 values.
            state = T.tensor([state], dtype=T.float32).to(self.eval[self.memchoice].device)
            actions = self.eval[self.memchoice].forward(state)
            action = T.argmax(actions).item()
        return action

    def learn(self):
        """One SGD step on the active network with targets from the other."""
        active = self.memchoice
        net, mem = self.eval[active], self.mem[active]
        if mem.mem_countr < self.batch_size:
            return
        net.optimizer.zero_grad()
        filled = min(mem.mem_size, mem.mem_countr)
        batch = np.random.choice(filled, self.batch_size, replace=False)
        batch_index = np.arange(self.batch_size, dtype=np.int32)
        state_batch = T.from_numpy(mem.state_memory[batch]).to(net.device)
        new_state_batch = T.from_numpy(mem.new_state_memory[batch]).to(net.device)
        reward_batch = T.from_numpy(mem.reward_memory[batch]).to(net.device)
        terminal_batch = T.from_numpy(mem.terminal_memory[batch]).to(net.device)
        action_batch = mem.action_memory[batch]  # not necessarily a tensor
        q_eval = net.forward(state_batch)[batch_index, action_batch]
        other = self.eval[1 - active]
        # FIX: evaluate the target without gradients — previously backward()
        # accumulated gradients on the passive network that were never zeroed
        # (its optimizer.step is not called here), polluting its next update.
        with T.no_grad():
            nq_eval = other.forward(new_state_batch)
            nq_eval[terminal_batch] = 0.0
            q_target = reward_batch + self.gamma * T.max(nq_eval, dim=1)[0]
        loss = net.loss(q_target, q_eval).to(net.device)
        loss.backward()
        net.optimizer.step()
        self.eps = max(self.eps_min, self.eps - self.eps_dec)
        self.set_eval()

    def save(self, name, special=""):
        """Save both networks to <name>/net1[_s].pt and <name>/net2[_s].pt."""
        if special != "":
            special = "_" + special
        T.save(self.eval[0].state_dict(), f"{name}/net1{special}.pt")
        T.save(self.eval[1].state_dict(), f"{name}/net2{special}.pt")

    def load(self, name, special=""):
        """Load both networks saved by save()."""
        if special != "":
            special = "_" + special
        self.eval[0].load_state_dict(T.load(f"{name}/net1{special}.pt"))
        # FIX: net1 used to be loaded into BOTH networks even though save()
        # writes net2 separately — the second network's weights were lost.
        self.eval[1].load_state_dict(T.load(f"{name}/net2{special}.pt"))
def reward_f(state):
    """Dense shaping reward: approach the cursor, hover precisely, spin little.

    Returns (reward, done); done ends the episode on ground contact or when
    the drone strays more than 20 units from the cursor on either axis.
    """
    global l, eng_l, d
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thrust, right_thrust, _) = state
    # Switch to cursor-relative coordinates; h keeps the absolute altitude.
    h = y
    x, y = x - xc, y - yc
    vx, vy = vx - vxc, vy - vyc
    collision_punish = 100
    R = 5  # distance at which the proximity bonus bottoms out
    r = (x ** 2 + y ** 2) ** 0.5
    vr = (vx ** 2 + vy ** 2) ** 0.5
    # Big bonus for being close AND slow.
    precision = 10 / (vr + 1 / 10) if r < 0.5 else 0
    vangle_punish = abs(vangle) / 5
    # Reward velocity pointing toward the cursor (degenerate when r == 0).
    toward_reward = precision if r == 0 else -(vx * x + vy * y) / r
    r = min(r, R)
    done = h < d + l / 2 + eng_l or abs(x) > 20 or abs(y) > 20
    shaped = (1 - r / R) * 10 + 1 + precision + toward_reward - vangle_punish
    reward = shaped * 0.01 - collision_punish * int(done)
    return reward, done
def fitness_per_frame(state):
    """Per-step GA fitness: stay near the cursor, slowly, thriftily, upright.

    Returns (reward, done); large penalty and episode end on crash or when
    the drone leaves a 20-unit box around the cursor.
    """
    global l, eng_l, d, maxthr
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thrust, right_thrust, _) = state
    h = y
    x, y = x - xc, y - yc
    vx, vy = vx - vxc, vy - vyc
    collision_punish = 100
    R = .125  # proximity falloff radius
    r = (x ** 2 + y ** 2) ** 0.5
    vr = (vx ** 2 + vy ** 2) ** 0.5
    consumption = min(left_thrust + right_thrust, 4)  # cap the fuel penalty
    done = h < d + l / 2 + eng_l or abs(x) > 20 or abs(y) > 20
    proximity = (R * 20) / (r + R) / (vr + 0.5) / (consumption + 1)
    reward = (1 + proximity - abs(vangle) / 10) * dt - collision_punish * int(done)
    return reward, done
def fitness(ga_instanse=None, solution=None, sol_idx=None):
    """PyGAD fitness function: roll out one episode (up to 1000 steps) of the
    candidate weight vector `solution` and accumulate per-frame fitness."""
    global model, device
    total, state = simstep([True])  # [True] resets to a fresh random state
    for _ in range(1000):
        observation = T.tensor([get_observation3(state)]).to(device)
        action = T.flatten(pygad.torchga.predict(model=model, solution=solution, data=observation)).tolist()
        step_fit, state = simstep(state, action=action, reward=fitness_per_frame)
        total += step_fit
        if state[-1]:  # episode terminated
            break
    return total
# Generation counter and render flag shared with on_generation().
gen = 0
renderme = False
def on_generation(ga_instance):
    """PyGAD per-generation callback: every 10 generations, preview the best
    solution and checkpoint its weights to <folder>/model_<gen>.pt."""
    global gen, renderme
    gen += 1
    print(f"Generation = {ga_instance.generations_completed}")
    if gen % 10 == 0:
        best_solution, best_fitness, _ = ga_instance.best_solution()
        print(f"Fitness = {best_fitness}")
        quickrender(model, min(10, ga_instance.generations_completed // 5), pyga=True, solution=best_solution)
        # Rebuild an identically shaped net to receive the GA weight vector.
        temp = nn.Sequential(nn.Linear(n_in, n_neur),
                             nn.ReLU(),
                             nn.Linear(n_neur, n_neur),
                             nn.ReLU(),
                             nn.Linear(n_neur, n_out))
        temp.load_state_dict(pygad.torchga.model_weights_as_dict(model=model, weights_vector=best_solution))
        T.save(temp.state_dict(), f"{folder}/model_{gen}.pt")
def weights_init_uniform_rule(m):
    """Init hook for model.apply(): uniform weights in +-1/sqrt(fan_in) and
    zero biases, applied to Linear layers only (other modules untouched)."""
    if 'Linear' in m.__class__.__name__:
        bound = 1.0 / np.sqrt(m.in_features)
        m.weight.data.uniform_(-bound, bound)
        m.bias.data.fill_(0)
# --- Genetic-algorithm training globals ---
folder = "mltthr alg 1"  # checkpoint directory for this run
n_in = 8  # observation size (matches get_observation3)
n_neur = 48  # hidden-layer width
n_out = 2  # outputs: left and right thrust levels
model = nn.Sequential(nn.Linear(n_in, n_neur),
                      nn.ReLU(),
                      nn.Linear(n_neur, n_neur),
                      nn.ReLU(),
                      nn.Linear(n_neur, n_out))
# model.apply(weights_init_uniform_rule)
# device = T.device('cuda' if T.cuda.is_available() else 'cpu')
device = T.device('cpu')  # rollouts are tiny; CPU avoids transfer overhead
model.to(device)
def quickrender(model, n_ep, pyga=False, solution=None):
    """Run `n_ep` simulated episodes of `model` with live rendering.

    pyga=True routes inference through pygad.torchga.predict with the GA
    weight vector `solution`; otherwise the torch model is called directly.
    Keys: R resumes rendering, SPACE pauses it; closing the window returns.
    """
    font = pg.font.SysFont("arial", fontsize)
    global device
    fitness, state = simstep([True])  # [True] -> fresh randomized episode
    scale = 100  # pixels per world unit
    clock = pg.time.Clock()
    do_render = True
    screen = pg.display.set_mode((WIDTH, HEIGHT))
    pg.display.set_caption('Drone thingy')
    for i in range(n_ep):
        fitness = 0
        for step in range(2000):  # hard cap on episode length
            for event in pg.event.get():
                if event.type == pg.QUIT:
                    return
                if event.type == pg.KEYDOWN:
                    if event.key == pg.K_r:
                        do_render = True
                    elif event.key == pg.K_SPACE:
                        do_render = False
            if do_render:
                # Camera follows the drone but never sinks below y = 2.
                cam = (WIDTH, HEIGHT, scale, state[0], max(2, state[1]))
                screen = render(state, fitness, screen, cam, scale, WIDTH, HEIGHT, i, font)
                pg.display.flip()
                clock.tick(60)
            observation = T.tensor([get_observation3(state)]).to(device)
            if pyga:
                action = T.flatten(pygad.torchga.predict(model=model, solution=solution, data=observation)).tolist()
            else:
                action = T.flatten(model(observation.clone().detach())).tolist()
            fpf, state = simstep(state, action=action, reward=fitness_per_frame)
            fitness += fpf
            if state[-1]:  # crash / out of bounds -> next episode
                break
            #state[-1] = True
def gmain():
    '''
    Train the thrust-control policy with PyGAD's genetic algorithm.

    observation3: [xc', yc', h, sin, cos, vx, vy, vxc', vxy', vangle] - 8
    actions3 = (left_thr, right_thr) - 2
    '''
    preload = False
    # Refuse to clobber an existing run unless resuming from its checkpoint.
    if os.path.exists(folder) and not preload:
        print("FUCK YOU CHOOSE ANOTHER FOLDER")
        return
    elif preload and os.path.exists(f"{folder}/model_50.pt"):
        model.load_state_dict(T.load(f"{folder}/model_50.pt"))
        pass
    else:
        os.mkdir(folder)
    torch_ga = pygad.torchga.TorchGA(model=model,
                                     num_solutions=100)
    num_generations = 2500  # Number of generations.
    num_parents_mating = 30  # Number of solutions to be selected as parents in the mating pool.
    initial_population = torch_ga.population_weights  # Initial population of network weights
    ga_instance = pygad.GA(num_generations=num_generations,
                           num_parents_mating=num_parents_mating,
                           initial_population=initial_population,
                           fitness_func=fitness,
                           on_generation=on_generation,
                           mutation_type="adaptive",
                           mutation_probability=(0.01, 0.0),
                           random_mutation_min_val=-0.5,
                           random_mutation_max_val=0.5)
    ga_instance.run()
    # After the generations complete, some plots are showed that summarize how the outputs/fitness values evolve over generations.
    ga_instance.plot_fitness(title="PyGAD & PyTorch - Iteration vs. Fitness", linewidth=4)
    solution, solution_fitness, solution_idx = ga_instance.best_solution()
    print(f"Fitness value of the best solution = {solution_fitness:.2f}")
    print(f"Index of the best solution : {solution_idx}")
    # Push the winning weight vector back into the torch model and persist it.
    bestmodel = pygad.torchga.model_weights_as_dict(model=model, weights_vector=solution)
    model.load_state_dict(bestmodel)
    model.to(device)
    T.save(model.state_dict(), f"{folder}/model.pt")
    quickrender(model, 1000)
def eval_genomes_thread(chromosome):
    """Joblib-friendly wrapper: evaluate a single GA chromosome's fitness."""
    result = fitness(solution=chromosome)
    return result
def next_generation(chromosomes, fitnesses, ratio_selected=0.5, ratio_mutated=0.5, mutation_prob=0.05,
                    mutation_range=0.5, cross_prob=0.5):
    """Produce the next GA population via elitism, crossover and mutation.

    The top `ratio_selected` fraction (by fitness) is copied unchanged; the
    rest are built by single-point crossover between two random elite parents
    (probability `cross_prob`; otherwise a clone of one elite parent), then
    each offspring is mutated with probability `ratio_mutated` (per-gene
    probability `mutation_prob`, uniform step in +-`mutation_range`).

    Returns (new_chromosomes, fitnesses sorted descending, total fitness).
    """
    pop_size, chrom_size = np.shape(chromosomes)
    fitnessTotal = np.sum(fitnesses)
    sorting = np.argsort(-fitnesses)
    fitnesses = fitnesses[sorting]  # fitnesses now in descending order
    sorted_chromosomes = chromosomes[sorting]
    n_selected = int(pop_size * ratio_selected)
    new_chromosomes = np.zeros(shape=(pop_size, chrom_size), dtype=np.float32)
    new_chromosomes[:n_selected] = sorted_chromosomes[:n_selected]
    cross = np.random.randint(0, chrom_size, size=(pop_size - n_selected))
    crossed = np.random.randint(0, n_selected, size=(pop_size - n_selected, 2))
    for i in range(n_selected, pop_size):
        k = i - n_selected
        p0, p1 = crossed[k]
        # crossbreed — FIX: parents must come from the SORTED (elite) pool;
        # previously they were drawn from the unsorted population, so the
        # "top n_selected" indices pointed at arbitrary individuals.
        if nprand() < cross_prob:
            new_chromosomes[i][:cross[k]] = sorted_chromosomes[p0][:cross[k]]
            new_chromosomes[i][cross[k]:] = sorted_chromosomes[p1][cross[k]:]
        else:
            new_chromosomes[i] = sorted_chromosomes[p0]
        # mutate
        if nprand() < ratio_mutated:
            for j in range(chrom_size):
                if nprand() < mutation_prob:
                    new_chromosomes[i][j] += 2 * (nprand() - 0.5) * mutation_range
    return new_chromosomes, fitnesses, fitnessTotal
def threadmain():
    """Hand-rolled GA training loop parallelized over CPU cores with joblib.

    Resumes from <folder>/model_2525.pt when preload is set; every 101
    generations previews the current best solution and checkpoints it.
    """
    pg.init()
    preload = True
    if os.path.exists(folder) and not preload:
        print("FUCK YOU CHOOSE ANOTHER FOLDER")
        return
    elif preload and os.path.exists(f"{folder}/model_2525.pt"):
        model.load_state_dict(T.load(f"{folder}/model_2525.pt"))
        pass
    else:
        os.mkdir(folder)
    # Initial population: each chromosome is a flat weight vector of `model`.
    chromosomes = np.array(pygad.torchga.TorchGA(model=model,
                                                 num_solutions=100).population_weights, dtype=np.float32)
    # print(chromosomes)
    num_generations = 10000
    # fitnesses = np.zeros(pop_size, dtype=np.float32)
    for i in range(2526, num_generations):  # start offset matches the preloaded checkpoint
        # Evaluate all chromosomes in parallel, one job per core.
        # NOTE(review): the generator reuses the name i, shadowing the
        # generation index inside the comprehension only — confirm intended.
        fitnesses = np.array(joblib.Parallel(n_jobs=-1)(joblib.delayed(eval_genomes_thread)(i) for i in chromosomes))
        # Performs selection, mutation, and crossover operations to create new generation
        chromosomes, fitnesses, total = next_generation(chromosomes, fitnesses,
                                                        ratio_selected=0.3, ratio_mutated=0.3,
                                                        cross_prob=0.1, mutation_prob=0.05, mutation_range=0.1)
        print(
            f"Gen {i}/{num_generations}:\tbest:{fitnesses[0]:.1f};\t"
            f"second:{fitnesses[1]:.1f}\tthird:{fitnesses[2]:.1f};\ttotal:{total:.0f}".expandtabs(16))
        # fitnesses = np.zeros(pop_size, dtype=np.float32) # Wipe fitnesses
        if not i % 101:
            # Preview and checkpoint the best chromosome of this generation.
            quickrender(model, min(10, i // 5), pyga=True, solution=chromosomes[0])
            temp = nn.Sequential(nn.Linear(n_in, n_neur),
                                 nn.ReLU(),
                                 nn.Linear(n_neur, n_neur),
                                 nn.ReLU(),
                                 nn.Linear(n_neur, n_out))
            bestmodel = pygad.torchga.model_weights_as_dict(model=temp, weights_vector=chromosomes[0])
            model.load_state_dict(bestmodel)
            model.to(device)
            T.save(model.state_dict(), f"{folder}/model_{i}.pt")
def reward_stay(state):
    """Early-curriculum reward: just stay airborne and move slowly.

    Episode ends on ground contact or straying >20 units from the cursor.
    Note: uses the drone's absolute velocity (not cursor-relative).
    """
    global l, eng_l, d, dt
    collision_punish = 100
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thrust, right_thrust, _) = state
    h = y
    x, y = x - xc, y - yc
    speed = (vx ** 2 + vy ** 2) ** 0.5
    done = h < d + l / 2 + eng_l or abs(x) > 20 or abs(y) > 20
    spin_punish = abs(vangle) / 4
    reward = (1 / (speed + 1 / 20) - spin_punish) * 0.01 - collision_punish * int(done)
    return reward, done
def simstep(state, playable=False, action=None, reward=reward_f):
    """Advance the physics simulation by one timestep dt.

    state layout: [x, y, xc, yc, angle, vx, vy, vxc, vyc, vangle,
                   left_thrust, right_thrust, done].
    Passing a state whose last element is truthy (e.g. [True]) resets the
    episode to a fresh randomized state and returns (0, fresh_state).
    `playable` reads the arrow keys instead of `action`; otherwise `action`
    is a pair of thrust levels mapped from [0, 10] onto [0, maxthr].
    Returns (reward, new_state) with reward computed by `reward(newstate)`.
    """
    # start = time.time_ns()
    global dt, m, g, l, eng_l, d, drag, maxthr, thr_incr, I
    if state[-1]:
        # [x, y, xc, yc, angle, vx, vy, vxc, vyc, vangle, left_thrust, right_thrust, done]
        state = [(2 * nprand() - 1) * 10, 3 + nprand() * 17,  # x y
                 (2 * nprand() - 1) * 10, 2 + nprand() * 18,  # xc yc
                 pi * (2 * nprand() - 1) * 0.1,  # angle
                 (2 * nprand() - 1) * 1, (1.5 * nprand()) * 1,  # vx, vy
                 0, 0,  # vxc', vyc' (have to be initialised even with no actual info)
                 pi * (2 * nprand() - 1) * 0.1,  # vangle
                 maxthr * nprand() * 0, maxthr * nprand() * 0, False]  # thrust, done
        return 0, state
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thrust, right_thrust, done) = state
    '''# cursor
    prevx = xc
    prevy = yc
    # some code for moving
    vxc = (xc - prevx) / dt
    vyc = (yc - prevy) / dt'''
    # forces: thrust acts along the drone's local "up"; linear drag opposes motion
    fx = -drag * vx - (left_thrust + right_thrust) * sin(angle)
    fy = - m * g - drag * vy + (left_thrust + right_thrust) * cos(angle)
    torque = (right_thrust - left_thrust) * (l + eng_l) / 2 - drag * vangle * 4
    # velocities (semi-implicit Euler: velocities first, then positions)
    vx += (fx / m) * dt
    vy += (fy / m) * dt
    vangle += (torque / I) * dt
    # position and angle
    x += vx * dt
    y += vy * dt
    angle += vangle * dt
    # keep the angle wrapped to (-pi, pi]
    if angle < -pi:
        angle += 2 * pi
    elif angle > pi:
        angle -= 2 * pi
    # Engine control
    if playable:
        # Adjust engine thrusts based on key presses
        if pg.key.get_pressed()[pg.K_LEFT]:
            left_thrust += thr_incr
        else:
            left_thrust -= 2 * thr_incr
        if pg.key.get_pressed()[pg.K_RIGHT]:
            right_thrust += thr_incr
        else:
            right_thrust -= 2 * thr_incr
    else:
        '''
        if action in (1, 5):
            left_thrust -= thr_incr
        if action in (2, 5):
            right_thrust -= thr_incr
        if action in (3, 6):
            left_thrust += thr_incr
        if action in (4, 6):
            right_thrust += thr_incr
        '''
        '''
        if action == 1:
            left_thrust -= thr_incr
            right_thrust += thr_incr
        elif action == 2:
            left_thrust += thr_incr
            right_thrust -= thr_incr
        elif action == 3:
            left_thrust += thr_incr
            right_thrust += thr_incr
        elif action == 4:
            left_thrust -= thr_incr
            right_thrust -= thr_incr
        '''
        # NOTE(review): actions are direct thrust levels here; presumably the
        # network outputs roughly in [0, 10] — clamped below regardless.
        left_thrust = action[0] * maxthr / 10
        right_thrust = action[1] * maxthr / 10
    # clamp thrusts to the physical range [0, maxthr]
    left_thrust = max(0, min(left_thrust, maxthr))
    right_thrust = max(0, min(right_thrust, maxthr))
    newstate = [x, y, xc, yc, angle,
                vx, vy, vxc, vyc, vangle,
                left_thrust, right_thrust, done]
    rew, done = reward(newstate)
    newstate[-1] = done
    # end = time.time_ns()
    # print(f"sim time: {end - start} ns")
    return rew, newstate
def get_observation(state):
    """Full 12-component observation: cursor offset, altitude, attitude as
    sin/cos, drone velocity, cursor-relative velocity, spin and thrusts."""
    global dt
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thr, right_thr, done) = state
    dx, dy = xc - x, yc - y
    return (dx, dy, y, sin(angle), cos(angle),
            vx, vy, vxc - vx, vyc - vy, vangle,
            left_thr, right_thr)
def get_observation2(state):
    """10-component observation: like get_observation but without the
    cursor's relative velocity."""
    global dt
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thr, right_thr, done) = state
    attitude = (sin(angle), cos(angle))
    return (xc - x, yc - y, y) + attitude + (vx, vy, vangle, left_thr, right_thr)
def get_observation3(state):
    """8-component observation used by the GA policy: drone-minus-cursor
    offset, altitude, sin/cos attitude, velocity and spin (no thrusts)."""
    global dt
    (x, y, xc, yc, angle,
     vx, vy, vxc, vyc, vangle,
     left_thr, right_thr, done) = state
    offset = (x - xc, y - yc)
    return offset + (y, sin(angle), cos(angle), vx, vy, vangle)
def render_multi_line(screen, font, text, x, y, color, fsize):
    """Blit `text` onto `screen` line by line, stepping `fsize` px down."""
    for offset, line in enumerate(text.splitlines()):
        screen.blit(font.render(line, 1, color), (x, y + fsize * offset))
def drawgrid(cam, step, substeps, wl=1, dark=100, thin=0):
    """Render a world-aligned grid onto a new transparent surface.

    cam = (width_px, height_px, scale, center_x, center_y) — center in world
    units. Major lines every `step` world units are full white; the minor
    lines between them are darkened by `dark` and thinned by `thin`.
    """
    w, h, scale, x, y = cam
    surf = pg.Surface((w, h), pg.SRCALPHA, 32)
    # shift to the top-left corner of the visible world window
    x -= w / scale / 2
    y -= h / scale / 2
    xstart = floor(x / step) * step - x
    ystart = y - ceil(y / step) * step
    # NOTE(review): the break conditions divide by `substeps` while drawing
    # divides by `substeps + 1` — looks like an off-by-one; confirm intended.
    for i in range(ceil(h / step) * (substeps + 1)):
        if ystart + i * step / substeps > h:
            break
        weaken = bool(i % (substeps + 1))  # 0 for major lines, 1 for minor
        pg.draw.line(surf, (255 - weaken * dark, 255 - weaken * dark, 255 - weaken * dark),
                     (0, (ystart + i * step / (substeps + 1)) * scale),
                     (w, (ystart + i * step / (substeps + 1)) * scale), wl - weaken * thin)
    for j in range(ceil(w / step) * (substeps + 1)):
        if xstart + j * step / substeps > w:
            break
        weaken = bool(j % (substeps + 1))
        pg.draw.line(surf, (255 - weaken * dark, 255 - weaken * dark, 255 - weaken * dark),
                     ((xstart + j * step / (substeps + 1)) * scale, 0),
                     ((xstart + j * step / (substeps + 1)) * scale, h), wl - weaken * thin)
    return surf
def cam_coords(cam, x, y):
    """World -> screen coordinates for `cam` = (w, h, scale, cx, cy).
    Screen y grows downward, hence the flipped vertical axis."""
    w, h, scale, cx, cy = cam
    sx = w / 2 + (x - cx) * scale
    sy = h / 2 + (cy - y) * scale
    return sx, sy
def render(state, score, screen, cam, scale, w, h, episode_i, font):
    '''Render the ground, grid, cursor, drone and HUD for one frame.

    The camera is given by `cam`; 1 world unit corresponds to `scale` px.
    The background is black, the drone is black with a thin white outline;
    the engines are also outlined. When they are turned on, little triangles
    appear, which represent air/propellant/whatever. The ground is grey.
    A translucent HUD (top-left) shows position, velocity, thrusts, episode
    number and score. Returns the screen surface.
    '''
    # Clear the screen
    screen.fill((0, 0, 0))
    global l, eng_l, d, maxthr
    # Unpack the state
    x, y, xc, yc, angle, vx, vy, vxc, vyc, vangle, left_thrust, right_thrust, done = state
    # Draw the ground
    pg.draw.rect(screen, (100, 100, 100), (0, cam_coords(cam, 0, 0)[1], w, h + 1))
    # Draw the grid
    grid = drawgrid(cam, 4, 3, 2, thin=1)
    screen.blit(grid, (0, 0))
    # Calculate the coordinates relative to the camera
    xc, yc = cam_coords(cam, xc, yc)
    # Draw the cursor (clamped to the window edges so it stays visible)
    pg.draw.circle(screen, (150, 255, 150), (max(min(xc, w), 0), max(min(yc, h), 0)), 0.25 * scale)
    # Draw the drone
    thr_scale = 0.5 * scale  # max flame length in px
    l_ = l * scale
    eng_l_ = eng_l * scale
    d_ = d * scale
    drone_surf = pg.Surface((l_ + 2 * eng_l_, d_ + 2 * thr_scale), pg.SRCALPHA, 32)
    pg.draw.rect(drone_surf, (255, 255, 255), (0, thr_scale, eng_l_, d_), 2)  # left engine
    pg.draw.rect(drone_surf, (255, 255, 255), (eng_l_, thr_scale, l_, d_), 2)  # base
    pg.draw.rect(drone_surf, (255, 255, 255), (l_ + eng_l_, thr_scale, eng_l_, d_), 2)  # right engine
    # Flame triangle length scales with the current thrust fraction.
    pg.draw.polygon(drone_surf, (255, 255, 200),
                    [(0, d_ + thr_scale),
                     (eng_l_ // 2, d_ + (1 + left_thrust / maxthr) * thr_scale),
                     (eng_l_, d_ + thr_scale)])  # left flame
    pg.draw.polygon(drone_surf, (255, 255, 200),
                    [(l_ + eng_l_, d_ + thr_scale),
                     (l_ + eng_l_ + eng_l_ // 2, d_ + (1 + right_thrust / maxthr) * thr_scale),
                     (l_ + 2 * eng_l_, d_ + thr_scale)])  # right flame
    drone_surf = pg.transform.rotate(drone_surf, angle / pi * 180)
    drone_rect = drone_surf.get_rect()
    drone_rect.center = cam_coords(cam, x, y)
    screen.blit(drone_surf, drone_rect)
    # Print information & "HUD"
    # global image
    # screen.blit(image, (0, 500))
    winfo = 3  # HUD border width in px
    trnsprt = 180  # HUD alpha
    hud = pg.Surface((fontsize * 18 + 2 * winfo, fontsize * 8 + 2 * winfo), pg.SRCALPHA, 32)
    pg.draw.rect(hud, (180, 180, 180, trnsprt), (0, 0, fontsize * 18 + 2 * winfo, fontsize * 8 + 2 * winfo))
    pg.draw.rect(hud, (0, 0, 0, trnsprt), (winfo, winfo, fontsize * 18, fontsize * 8))
    render_multi_line(hud, font,
                      f'Coords: ({round(x, 2):.2f}, {round(y, 2):.2f}); angle: {round(angle, 2):.2f}\n'
                      f'Velocity: ({round(vx, 2):.2f}, {round(vy, 2):.2f}); angular: {round(vangle, 2):.2f}\n'
                      f'Thrusters: left: {round(left_thrust, 2):.2f}; right: {round(right_thrust, 2):.2f}\n'
                      f'Episode: {episode_i}; Score: {score:.2f}',
                      20, 10, (255, 255, 255), fontsize * 2)
    screen.blit(hud, (0, 0))
    return screen
def plot_progress(fcsv, n, names, file):
    """Scatter-plot CSV columns 1..n-1 of `fcsv` against column 0 into the
    image `file`, with legend labels `names`. Clears the figure afterwards."""
    columns = [[] for _ in range(n)]
    with open(fcsv, "r") as f:
        for row in csv.reader(f):
            if not row:  # skip blank lines
                continue
            for col in range(n):
                columns[col].append(float(row[col]))
    plots = []
    for i in range(1, n):
        color = ((0.3 + i / (n + 2), 0.6 - i / (n + 3), 0.8),)
        plots.append(plt.scatter(columns[0], columns[i], s=1 / 4, c=color, linewidth=0))
    plt.legend(plots, names)
    plt.savefig(file, dpi=300)
    plt.clf()
def writedata(file, args):
    """Append one CSV row `args` to `file` (created if missing)."""
    with open(file, "a", newline='', encoding='utf-8') as out:
        csv.writer(out).writerow(args)
def save(dronename, drone, episode_i, maxcount, exceeded):
    """Persist the agent's networks plus the run configuration.

    Writes one value per line to <dronename>/config.txt in the exact order
    that load() expects."""
    drone.save(dronename)
    settings = [episode_i, maxcount, exceeded, drone.gamma,
                drone.eps, drone.lr, drone.n_state, drone.n_actions,
                drone.batch_size, drone.n_layers, drone.n_neurons,
                drone.eps_dec, drone.eps_min, drone.mem_size]
    with open(f"{dronename}/config.txt", "w") as f:
        f.write("\n".join(str(v) for v in settings))
def load(dronename, cuda):
    """Recreate a DoubleQAgent from <dronename>/config.txt and its saved nets.

    Returns (drone, next_episode_index, maxcount, exceeded) — the episode
    index is the saved one plus one, so training resumes at the next episode.
    """
    with open(f"{dronename}/config.txt", "r") as f:
        values = [float(v) for v in f.readlines()]
    (lastprev, maxcount, exceeded, gamma,
     eps, lr, n_state, n_actions,
     batch_size, n_layers, n_neurons,
     eps_dec, eps_end, mem_size) = values
    # integer-valued settings were serialized as floats; restore them
    lastprev = int(lastprev) + 1
    maxcount = int(maxcount)
    exceeded = int(exceeded)
    drone = DoubleQAgent(gamma, eps, lr, n_state=int(n_state), n_actions=int(n_actions),
                         batch_size=int(batch_size), n_layers=int(n_layers),
                         n_neurons=int(n_neurons), eps_dec=eps_dec, eps_end=eps_end,
                         max_mem=int(mem_size), use_cuda=cuda)
    drone.load(dronename)
    return drone, lastprev, maxcount, exceeded
def shift(arr, num, fill_value=np.nan):
    """Return a copy of `arr` shifted by `num` positions (positive -> right),
    padding the vacated slots with `fill_value`. The input is not modified."""
    result = np.empty_like(arr)
    if num == 0:
        result[:] = arr
    elif num > 0:
        result[:num] = fill_value
        result[num:] = arr[:-num]
    else:
        result[num:] = fill_value
        result[:num] = arr[-num:]
    return result
def main():
    '''
    Train the DoubleQAgent interactively with toggleable live rendering.

    observation (not-exactly-state): [xc', yc', h, sin, cos, vx, vy, vxc', vxy', vangle, left_thr, right_thr] - 12
    observation2: [xc', yc', h, sin, cos, vx, vy, vangle, left_thr, right_thr] - 10
    actions = (0:nothing, 1:left-, 2:right-, 3:left+, 4:right+, 5:both-, 6:both+) - 7
    actions2 = (0:nothing, 1:left_roll, 2:right_roll, 3:both+, 4:both-) - 5
    '''
    pg.init()
    font = pg.font.SysFont("arial", fontsize)
    print(T.cuda.is_available())
    scale = 100  # pixels per world unit
    dronename = "double q standart"
    screen = pg.display.set_mode((WIDTH, HEIGHT))
    pg.display.set_caption('Drone thingy')
    clock = pg.time.Clock()
    do_render = False  # toggled at runtime with R / SPACE
    do_preload = True
    playable = False
    n_frames = 100_000_000
    use_cuda = True
    if not os.path.exists(dronename):
        os.mkdir(dronename)
    elif not do_preload:
        print("Something already exists here! Aborting")
        return
    # Resume from a previous run when its checkpoint files are present.
    if do_preload and os.path.exists(f"{dronename}/data.csv") and os.path.exists(f"{dronename}/net1.pt"):
        drone, lastprev, maxcount, exceeded = load(dronename, use_cuda)
    else:
        lastprev = 0
        maxcount = 2000  # step limit per episode before it is cut off
        exceeded = 0  # running total of episodes that hit the step limit
        drone = DoubleQAgent(0.99, 1, 0.001, n_state=10, n_actions=5, batch_size=256, n_layers=3,
                             n_neurons=64, eps_dec=1e-5, eps_end=0.03, use_cuda=use_cuda, max_mem=100_000)
    # NaN-padded rolling windows for score / episode-length averages.
    meanoverwhat = 100
    scores = np.empty(meanoverwhat, dtype=np.float32)
    lengths = np.empty(meanoverwhat, dtype=np.float32)
    scores[:] = np.nan
    lengths[:] = np.nan
    currexceeded = 0
    _, state = simstep([True])
    ep = lastprev
    score = 0
    counter = 0
    # cam = (WIDTH, HEIGHT, scale, state[0], state[1])
    for i in range(n_frames):
        for event in pg.event.get():
            if event.type == pg.QUIT:
                # Save everything before quitting.
                save(dronename, drone, ep, maxcount, exceeded)
                plot_progress(f"{dronename}/data.csv", 3, ["score", "average score"],
                              f"{dronename}/plot_so_far.png")
                return
            if event.type == pg.KEYDOWN:
                if event.key == pg.K_r:
                    do_render = True
                elif event.key == pg.K_SPACE:
                    do_render = False
        observation = get_observation2(state)
        action = drone.policy(observation)
        # Curriculum: the first 10000 episodes only reward staying airborne.
        if ep < 10000:
            reward, state = simstep(state, playable, action, reward=reward_stay)
        else:
            reward, state = simstep(state, playable, action, reward=reward_f)
        if ep == 10000:
            drone.eps = 0.5  # re-explore when the reward function switches
        score += reward
        next_observation = get_observation2(state)
        drone.store_transition(observation, action, reward, next_observation, state[-1])
        drone.learn()
        if do_render:
            cam = (WIDTH, HEIGHT, scale, state[0], max(2, state[1]))
            screen = render(state, score, screen, cam, scale, WIDTH, HEIGHT, ep, font)
            pg.display.flip()
            clock.tick(60)
        if counter > maxcount:
            state[-1] = True  # force the episode to end on timeout
            currexceeded += 1
        counter += 1
        if state[-1]:
            # Episode finished: log stats, periodically print/plot/checkpoint.
            scores = shift(scores, -1, score)
            lengths = shift(lengths, -1, counter)
            avg_score = np.nanmean(scores)
            avg_len = np.nanmean(lengths)
            writedata(f"{dronename}/data.csv", [ep, score, avg_score])
            if not ep % 50:
                exceeded += currexceeded
                print(f"ep {ep}:\tscore:{score:.2f}\tav.sc:{avg_score:.2f}\t"
                      f"ep.len:{counter}\tav.ep.len:{avg_len:0.0f}\teps:{drone.eps:.3f}\t"
                      f"exceeded:{currexceeded}\t(total:{exceeded})\t"
                      f"steps:{drone.mem_countr}\t({drone.mem_countr / 2 / drone.mem_size:.2f})".expandtabs(16))
                currexceeded = 0
            if not ep % 500:
                print("saving...")
                plot_progress(f"{dronename}/data.csv", 3, ["score", "average score"],
                              f"{dronename}/plot_{ep / 1000:.1f}k.png")
                drone.save(dronename, f"{ep / 1000:.1f}k")
                save(dronename, drone, ep, maxcount, exceeded)
            score = 0
            counter = 0
            ep += 1
    save(dronename, drone, n_frames - 1, maxcount, exceeded)
    return
def showmain():
    """Replay a list of saved GA checkpoints; the last one runs 1000 episodes,
    the others 5 each."""
    drones = ["mltthr alg 1/model_1111.pt", "mltthr alg 1/model_2222.pt", "mltthr alg 1/model_2525.pt"]
    neurons = [48, 48, 48]
    for i, path in enumerate(drones):
        if i == -1:  # unreachable branch kept for switching to older 1-layer nets
            net = nn.Sequential(
                nn.Linear(11, neurons[i]),
                nn.ReLU(),
                nn.Linear(neurons[i], 7))
        else:
            net = nn.Sequential(
                nn.Linear(8, neurons[i]),
                nn.ReLU(),
                nn.Linear(neurons[i], neurons[i]),
                nn.ReLU(),
                nn.Linear(neurons[i], 2))
        net.load_state_dict(T.load(path))
        episodes = 1000 if i == len(drones) - 1 else 5
        quickrender(net, episodes)
# Script entry point: replay saved models (swap in main/gmain/threadmain to train).
if __name__ == '__main__':
    showmain()
Add Comment
Please, Sign In to add comment