I know there are many similar topics discussed on StackOverflow, but I have done quite a lot research both in StackOverflow and on the Internet and I couldn't find a solution. I am trying to implement the classic Deep Q Learning Algorithm to solve the openAI gym's cartpole game: OpenAI Gym Cartpole
Firstly, I created an agent that generates random weights. The results are shown in the graph below:
Amazingly, the agent managed to reach 200 steps (which is the max) in many episodes by simply generating 4 random uniform weights [w1, w2, w3, w4] from (-1.0 to 1.0) in each episode.
So, i decided to implement a simple DQN with only 4 weights and 2 biases and to make the agent learn this game over the time. The weights will be initialized randomly in the beginning and Back-Propagation will be used to update them as the agent makes steps.
I used the Epsilon Greedy strategy to make the agent explore at the beginning and exploit the Q values later on. However, The results are disappointing compared to the random agent:
I have tried to tune a lot of parameters and different architectures and the result doesn't change as much. So, my question is the following:
Question: Did i make a wrong implementation of DQN or a simple DQN cannot beat the cartpole? What's your experience? It does reduces the loss (Error), but it doesn't guarantee a good solution. Thanks in advance.
import tensorflow as tf
import gym
import numpy as np
import random as rand
import matplotlib.pyplot as plt
# Cartpole's Observation:
# 4 Inputs
# 2 Actions (LEFT | RIGHT)
input_size = 4
output_size = 2
# Deep Q Network Class
class DQN:
def __init__(self, var_names):
self.var_names = var_names
self._define_placeholders()
self._add_layers()
self._define_loss()
self._choose_optimizer()
self._initialize()
# Placeholders:
# Inputs: The place where we feed the Observations (States).
# Targets: Q_target = R + gamma*Q(s', a*).
def _define_placeholders(self):
self.inputs = tf.placeholder(tf.float32, shape=(None, input_size), name='inputs')
self.targets = tf.placeholder( tf.float32, shape=(None, output_size), name='targets')
# Layers:
# 4 Input Weights.
# 2 Biases.
# output = softmax(inputs*weights + biases).
# Weights and biases are initialized randomly.
def _add_layers(self):
w = tf.get_variable(name=self.var_names[0], shape=(input_size, output_size),
initializer=tf.initializers.random_uniform(minval=-1.0, maxval=1.0) )
b = tf.get_variable(name=self.var_names[1], shape=(output_size),
initializer=tf.initializers.random_uniform(minval=-1.0, maxval=1.0) )
self.outputs = tf.nn.softmax(tf.matmul(self.inputs, w) + b)
self.prediction = tf.argmax(self.outputs, 1)
# Loss = MSE.
def _define_loss(self):
self.mean_loss = tf.losses.mean_squared_error(labels=self.targets, predictions=self.outputs) / 2
# AdamOptimizer with starting learning rate: a = 0.005.
def _choose_optimizer(self):
self.optimizer = tf.train.AdamOptimizer(learning_rate=0.005).minimize(loss=self.mean_loss)
# Initializes the dqn's weights.
def _initialize(self):
initializer = tf.global_variables_initializer()
self.sess = tf.InteractiveSession()
self.sess.run(initializer)
# Get's current's DQN weights.
def get_weights(self):
return [ self.sess.run( tf.trainable_variables(var) )[0] for var in self.var_names ]
# Updates the weights of DQN.
def update_weights(self, new_weights):
variables = [tf.trainable_variables(name)[0] for name in self.var_names]
update = [ tf.assign(var, weight) for (var, weight) in zip(variables, new_weights) ]
self.sess.run(update)
# Predicts the best possible action from a state s.
# a* = argmax( Q(s) )
# Returns from Q(s), a*
def predict(self, states):
Q, actions = self.sess.run( [self.outputs, self.prediction],
feed_dict={self.inputs: states} )
return Q, actions
# It partially fits the given observations and the targets into the network.
def partial_fit(self, states, targets):
_, loss = self.sess.run( [self.optimizer, self.mean_loss],
feed_dict={self.inputs: states, self.targets: targets} )
return loss
# Replay Memory Buffer
# It stores experiences as (s,a,r,s') --> (State, Action, Reward, Next_Action).
# It generates random mini-batches of experiences from the memory.
# If the memory is full, then it deletes the oldest experiences. Experience is an step.
class ReplayMemory:
def __init__(self, mem_size):
self.mem_size = mem_size
self.experiences = []
def add_experience(self, xp):
self.experiences.append(xp)
if len(self.experiences) > self.mem_size:
self.experiences.pop(0)
def random_batch(self, batch_size):
if len(self.experiences) < batch_size:
return self.experiences
else:
return rand.sample(self.experiences, batch_size)
# The agent's class.
# It contains 2 DQNs: Online DQN for Predictions and Target DQN for the targets.
class Agent:
def __init__(self, epsilon, epsilon_decay, min_epsilon, gamma, mem_size):
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.min_epsilon = min_epsilon
self.gamma = gamma
self.replay_mem = ReplayMemory(mem_size)
self.online_dqn = DQN( var_names=['online_w', 'online_b'] )
self.target_dqn = DQN( var_names=['target_w', 'target_b'] )
self.state = None
def set_epsilon(self, epsilon):
self.epsilon = epsilon
def reduce_epsilon(self):
if self.epsilon > self.min_epsilon:
self.epsilon -= self.epsilon_decay
def update_state(self, state):
self.state = state
def update_memory(self, state, action, reward, next_state):
experience = (state, action, reward, next_state)
self.replay_mem.add_experience(experience)
# It updates the target network after N steps.
def update_network(self):
self.target_dqn.update_weights( self.online_dqn.get_weights() )
# Randomly chooses an action from the enviroment.
def explore(self, env):
action = env.action_space.sample()
return action
# Predicts and chooses the best possible moves from the current state.
def exploit(self):
_, action = self.online_dqn.predict(self.state)
return action[0]
# Uses Epsilon-Greedy to decide whether to explore or exploit.
# Epsilon starts with 1 and is reduced over the time.
# After the agent makes a move, he returns: state, action, reward, next_state.
def take_action(self, env):
action = None
p = rand.uniform(0.0, 1.0)
if p < self.epsilon:
action = self.explore(env)
else:
action = self.exploit()
next_state, reward, done, _ = env.step(action)
if done:
next_state = None
else:
next_state = np.reshape( next_state, (1, input_size) )
return self.state, action, reward, next_state, done
# Trains the agent.
# A random mini-batch is generated from the memory.
# We feed each experience into the DQN.
# For each
# Q(s) = Qtarget(s)
# Q(s'), a* = Qtarget(s'), argmax Q(s')
# We set targets = Q(s')
# For each action (a), reward (r), next_state (s') in the batch:
# If s' is None the GameOver. So, we set target[i] = Reward
# If s' != None, then target[i][a] = r + gamma*Q(s', 'a')
# Then, the online DQN calculates the mean squared difference of r + gamma*Q(s', 'a') - Q(s, a)
# and uses Back-Propagation to update the weights.
def train(self):
mini_batch = self.replay_mem.random_batch(batch_size=256)
batch_size = len(mini_batch)
states = np.zeros( shape=(batch_size, input_size) )
next_states = np.zeros( shape=(batch_size, input_size) )
for i in range(batch_size):
states[i] = mini_batch[i][0]
next_states[i] = mini_batch[i][3]
Q, _ = self.target_dqn.predict(states)
next_Q, next_actions = self.target_dqn.predict(next_states)
targets = Q
for i in range(batch_size):
action = mini_batch[i][1]
reward = mini_batch[i][2]
next_state = mini_batch[i][3]
if next_state is None:
targets[i][action] = reward
else:
targets[i][action] = reward + self.gamma * next_Q[i][ next_actions[i] ]
loss = self.online_dqn.partial_fit(states, targets)
return loss
def play(agent, env, episodes, N, render=False, train=True):
ep = 0
episode_steps = []
steps = 0
total_steps = 0
loss = 0
# Sets the current state as the initial.
# Cartpole spawns the agent in a random state.
agent.update_state( np.reshape( env.reset(), (1, input_size) ) )
agent.update_network()
while ep < episodes:
if render:
env.render()
# The target DQN's weights are frozen.
# The agent Updates the Target DQN's Weights after 100 steps.
if train and total_steps % N == 0:
agent.update_network()
print('---Target network updated---')
# Takes action.
state, action, reward, next_state, done = agent.take_action(env)
# Updates the memory and the current state.
agent.update_memory(state, action, reward, next_state)
agent.update_state(next_state)
steps += 1
total_steps += 1
if train:
loss = agent.train()
if done:
agent.update_state( np.reshape( env.reset(), (1, input_size) ) )
episode_steps.append(steps)
ep += 1
if train:
agent.reduce_epsilon()
print('End of episode', ep, 'Training loss =', loss, 'Steps =', steps)
steps = 0
if render:
env.close()
return episode_steps
env = gym.make('CartPole-v0')
# Training the agent.
agent = Agent(epsilon=1, epsilon_decay = 0.01, min_epsilon = 0.05, gamma=0.9, mem_size=50000)
episodes = 1000
N = 100
episode_steps = play(agent, env, episodes, N)
# Plotting the results.
# After the training is done, the steps should be maximized (up to 200)
plt.plot(episode_steps)
plt.show()
# Testing the agent.
agent.set_epsilon(0)
episodes = 1
steps = play(agent, env, episodes, N, render=True, train=False)[0]
print('\nSteps =', steps)
The algorithm works quite well. When I decided to plot the data, I used as a metric:
Rewards / Episode
Most of Deep Reinforcement Learning Frameworks (e.g. tf-agents) use mean reward (e.g. mean reward per 10 episodes) and this is why the plots look so smooth. If You look at the above plot, The agent manages to get a high score most of the time.
Also, I have decided to improve the speed of the algorithm using numpy operations rather than "for" loops. You can check out my implementation here:
https://github.com/kochlisGit/Deep-Reinforcement-Learning/tree/master/Custom%20DQN