Help for PPO implementation without pytorch/tf
Hey !
I’m trying to implement a very simple PPO algorithm with numpy but I’m struggling with 2 things :
– It seems that the actor net is not learning and I don’t know why.
– some values go to nan after some epochs.
I tried to comment as well as I could to keep it simple.
Thank you very much for taking the time to help me:
the environnement : a little grid 2d :
""" GAME : grid : [[int]] -> map grid size : int -> dim of grid win_coor : (int, int) -> coordinates where the player win coor : [int] -> actual coordinates of player ----- reset() : -> place player at (0, 0) move(direction) : -> move in the direction get_reward_of_pos : int -> return reward of current position get_coor() : [int] -> return actual coordinates isEnd() : bool -> True if dead else False """ class Game(): def __init__(self): self.grid = [ [0, 0, 0], [0, 1, 0], [0, 0, 0] ] self.size = 1 self.win_coor = (1, 1) self.coor = [0, 0] def reset(self): self.coor = [0, 0] def move(self, direction): if (direction == 0): self.coor[1] += 1 elif (direction == 1): self.coor[0] += 1 elif (direction == 2): self.coor[1] -= 1 elif (direction == 3): self.coor[0] -= 1 def get_reward_of_pos(self): #if good if self.coor[0] == self.win_coor[0] and self.coor[1] == self.win_coor[1]: print("Reussi") return 1 # if quit map elif self.coor[0] > self.size or self.coor[0] < 0 or self.coor[1] > self.size or self.coor[1] < 0: return -100 # if on 1 elif self.grid[self.coor[0]][self.coor[1]] == 1: return -100 # if on 0 elif self.grid[self.coor[0]][self.coor[1]] == 0: return -1 def getCoor(self): return [self.coor[0], self.coor[1]] def isEnd(self): if self.coor[0] > self.size or self.coor[0] < 0 or self.coor[1] > self.size or self.coor[1] < 0: return True # if on 1 elif self.grid[self.coor[0]][self.coor[1]] == 1 or self.grid[self.coor[0]][self.coor[1]] == 4: return True else: return False
nn.py :
import numpy as np class Network(): def __init__(self, sizes): self.num_layers = len(sizes) self.sizes = sizes self.biases = [np.random.randn(y, 1) * 0.01 for y in sizes[1:]] self.weights = [np.random.randn(y, x) * 0.01 for x, y in zip(sizes[:-1], sizes[1:])] print("[NN] init ended") def get_cpy(self): new_net = Network(self.sizes) new_net.biases = self.biases new_net.weights = self.weights return new_net
and the main file : ppo.py
""" ## struct of trajectories_data in train() trajectories_data = { "states" : [ [int] ], "actions" : [int], "log_probs" : [float], "rewards" : [float], "values" : [float], "r_t_g" : [float], "advantages" : [float] "critic_datas" : [{ "zs" : [z], "activations" : [a] }], "actor_datas" : [{ "zs" : [z], "activations" : [a] }] } """ # --------- Imports from nn import Network from game import Game import random import math import numpy as np # --------- Hyperparameters batch_size = 64 max_batch_dist = 8 gamma = 0.9 epsilon = 0.2 epoch = 1000 eta = 0.001 # learning rate # --------- methods def ReLU(z): return np.maximum(z, 0) def ReLU_prime(z): return (z > 0).astype(float) def softmax(z): exp_z = np.exp(z - np.max(z, axis=0, keepdims=True)) return exp_z / exp_z.sum(axis=0, keepdims=True) def random_picker(list_of_probas): rd = random.random() total = 0 for id_p, p in enumerate(list_of_probas): total += p if (total > rd): return id_p # --------- PPO class PPO: # --------- Init def __init__(self): # inputs -> poisition(x, y) -- output direction self.actor_nn = Network([2, 128, 128, 4]) # need the last nn to compare to the last ouput self.actor_last_nn = self.actor_nn.get_cpy() # inputs -> poisition(x, y) -- output cost -> no softmax self.critic_nn = Network([2, 128, 128, 1]) self.game = Game() print("[PPO] init ended") # --------- train - big function that does all def train(self): for epoch_num in range(epoch): # for each epoch print("[PPO] epoch " + str(epoch_num)) # we initialize a gradient vector to 0 nabla_critic_b = [np.zeros(b.shape) for b in self.critic_nn.biases] nabla_critic_w = [np.zeros(w.shape) for w in self.critic_nn.weights] nabla_actor_b = [np.zeros(b.shape) for b in self.actor_nn.biases] nabla_actor_w = [np.zeros(w.shape) for w in self.actor_nn.weights] for _ in range(batch_size): # for each batch # we compute a "trajectory" and get the data out of it trajectories_data = self.get_all_data_of_a_trajectory() # we initialiaze another gradient vector at 0 delta_nabla_critic_b = [np.zeros(b.shape) for b in self.critic_nn.biases] delta_nabla_critic_w = [np.zeros(w.shape) for w in self.critic_nn.weights] delta_nabla_actor_b = [np.zeros(b.shape) for b in self.actor_nn.biases] delta_nabla_actor_w = [np.zeros(w.shape) for w in self.actor_nn.weights] for i in range(len(trajectories_data["states"])): # for each state / decision we have taken # we get the gradient of each state d_c_b, d_c_w, d_a_b, d_a_w = self.backprop(trajectories_data, i) # we add it to the current gradient delta_nabla_critic_b = [nb + dnb for nb, dnb in zip(d_c_b, delta_nabla_critic_b)] delta_nabla_critic_w = [nw + dnw for nw, dnw in zip(d_c_w, delta_nabla_critic_w)] delta_nabla_actor_b = [nb + dnb for nb, dnb in zip(d_a_b, delta_nabla_actor_b)] delta_nabla_actor_w = [nw + dnw for nw, dnw in zip(d_a_w, delta_nabla_actor_w)] # we add current gradient to real gradient nabla_critic_b = [nb + dnb for nb, dnb in zip(nabla_critic_b, delta_nabla_critic_b)] nabla_critic_w = [nw + dnw for nw, dnw in zip(nabla_critic_w, delta_nabla_critic_w)] nabla_actor_b = [nb + dnb for nb, dnb in zip(nabla_actor_b, delta_nabla_actor_b)] nabla_actor_w = [nw + dnw for nw, dnw in zip(nabla_actor_w, delta_nabla_actor_w)] # we get a copy of our neural net before updating it self.actor_last_nn = self.actor_nn.get_cpy() # we update the w and b of the NN self.critic_nn.weights = [w-(eta/batch_size)*nw for w, nw in zip(self.critic_nn.weights, nabla_critic_w)] self.critic_nn.biases = [b-(eta/batch_size)*nb for b, nb in zip(self.critic_nn.biases, nabla_critic_b)] self.actor_nn.weights = [w-(eta/batch_size)*nw for w, nw in zip(self.actor_nn.weights, nabla_actor_w)] self.actor_nn.biases = [b-(eta/batch_size)*nb for b, nb in zip(self.actor_nn.biases, nabla_actor_b)] print("[PPO] training ended") # --------- compute a trajectory and collect all the data we need def get_all_data_of_a_trajectory(self): # we initialize data as in comments to get the data of a trajectory data = { "states" : list(), "actions" : list(), "log_probs" : list(), "rewards" : list(), "values" : list(), "r_t_g" : list(), "advantages" : list(), "critic_datas" : list(), "actor_datas" : list() } self.game.reset() i = 0 # we store i just not to loop over and over while (not self.game.isEnd() and i < max_batch_dist): # while the game is not ended and we dont loop over "max_batch_dist" times # we forward and store the layers outputs critic_data, actor_data = self.get_nn_data_of_a_trajectory() data["critic_datas"].append(critic_data) data["actor_datas"].append(actor_data) probs = actor_data["activations"][-1].flatten() action = random_picker(probs) # TODO : upgrade this random picker data["actions"].append(action) data["log_probs"].append(math.log( probs[action] + 1e-5 )) # add 1e-8 that cannot be 0 data["states"].append(self.game.getCoor()) data["values"].append(critic_data["activations"][-1][0][0].item()) # store the output of critic net # move self.game.move(action) data["rewards"].append(self.game.get_reward_of_pos()) i += 1 data["r_t_g"], data["advantages"] = self.get_r_t_g_and_advantages(data["rewards"], data["values"]) return data # --------- compute a nn forward and collect nn data simultanously def get_nn_data_of_a_trajectory(self): # critic -------== activation_critic = np.array([[self.game.getCoor()[0]], [self.game.getCoor()[1]]]) activations_critic = [activation_critic.copy()] zs_critic = [] for b, w in zip(self.critic_nn.biases, self.critic_nn.weights): z = np.dot(w, activation_critic) + b zs_critic.append(z) if len(zs_critic) < len(self.critic_nn.weights): # in all layers -> ReLU activation_critic = ReLU(z) else: # in last layer -> Linear activation_critic = z activations_critic.append(activation_critic.copy()) critic_data = { "zs" : zs_critic, "activations" : activations_critic } # actor -------== activation_actor = np.array([[self.game.getCoor()[0]], [self.game.getCoor()[1]]]) activations_actor = [activation_actor.copy()] zs_actor = [] for b, w in zip(self.actor_nn.biases, self.actor_nn.weights): z = np.dot(w, activation_actor) + b zs_actor.append(z) if len(zs_actor) < len(self.actor_nn.weights): # in all layers -> ReLU activation_actor = ReLU(z) else: # In last layer -> softmax activation_actor = softmax(z) activations_actor.append(activation_actor.copy()) actor_data = { "zs" : zs_actor, "activations" : activations_actor } return (critic_data, actor_data) # --------- return the reward to go of a list of rewards and get at same time the advantages def get_r_t_g_and_advantages(self, reward_list, values_list): # length of trajectory length = len(reward_list) # inits r_t_g = [0] * length advantages = [0] * length for i in range(length): current_r_t_g = 0 for j in range(length - i): current_r_t_g += reward_list[i + j] * math.pow(gamma, j) # r_t_g = R0 + R1*g + R2*g^2... r_t_g[i] = current_r_t_g advantages[i] = current_r_t_g - values_list[i] return (r_t_g, advantages) # --------- return the gradient of both of the nn for the "state" i def backprop(self, data, i): # critic -----------== nabla_critic_b = [np.zeros(b.shape) for b in self.critic_nn.biases] nabla_critic_w = [np.zeros(w.shape) for w in self.critic_nn.weights] # LOSS delta = np.array([[2 * data["advantages"][i]]]) # loss = 1/1 A^2 -> loss' = 2A # Backpropagate MSE nabla_critic_b[-1] = delta nabla_critic_w[-1] = np.dot(delta, data["critic_datas"][i]["activations"][-2].T) for l in range(2, self.critic_nn.num_layers): # ReLU_prime for all z = data["critic_datas"][i]["zs"][-l] sp = ReLU_prime(z) delta = np.dot(self.critic_nn.weights[-l+1].T, delta) * sp nabla_critic_b[-l] = delta nabla_critic_w[-l] = np.dot(delta, data["critic_datas"][i]["activations"][-l-1].T) # actor ------------== nabla_actor_b = [np.zeros(b.shape) for b in self.actor_nn.biases] nabla_actor_w = [np.zeros(w.shape) for w in self.actor_nn.weights] old_policy_output = self.feed_forward_the_last_actor(np.array( [[data["states"][i][0]], [data["states"][i][1]]] )) old_log_prob = math.log(np.clip(old_policy_output[data["actions"][i]].flatten()[0], 1e-8, 1)) ratio = math.exp( data["log_probs"][i] - old_log_prob ) loss = min( ratio * data["advantages"][i], np.clip(ratio, 1-epsilon, 1+epsilon) * data["advantages"][i] ) delta = np.zeros((4, 1)) delta[ data["actions"][i] ] = -loss # last layer firstly - softmax nabla_actor_b[-1] = delta nabla_actor_w[-1] = np.dot(delta, data["actor_datas"][i]["activations"][-2].T) for l in range(2, self.actor_nn.num_layers): # ReLU_prime for other layers z = data["actor_datas"][i]["zs"][-l] sp = ReLU_prime(z) delta = np.dot(self.actor_nn.weights[-l+1].T, delta) * sp nabla_actor_b[-l] = delta nabla_actor_w[-l] = np.dot(delta, data["actor_datas"][i]["activations"][-l-1].T) return (nabla_critic_b, nabla_critic_w, nabla_actor_b, nabla_actor_w) # --------- symply forward the last actor nn def feed_forward_the_last_actor(self, a): for b, w in zip(self.actor_last_nn.biases[:-1], self.actor_last_nn.weights[:-1]): a = ReLU(np.dot(w, a)+b) a = softmax(np.dot(self.actor_last_nn.weights[-1], a)+self.actor_last_nn.biases[-1]) return a ppo = PPO() ppo.train() ppo.game.reset() while (not ppo.game.isEnd() ): inp = [[ppo.game.getCoor()[0]], [ppo.game.getCoor()[1]]] nn_res = ppo.actor_nn.feedforward( inp ) res = max(enumerate(nn_res), key=lambda x: x[1]) action = res[0] ppo.game.move(action) print(f"{action} : {ppo.game.get_reward_of_pos()}")
Thanks
submitted by /u/Independent-Key-1329
[link] [comments]
Like
0
Liked
Liked