Atari Space Invaders and Dueling Q RL in TensorFlow 2

In previous posts (here and here) I introduced Double Q learning and the Dueling Q architecture. These followed on from posts about deep Q learning, and showed how Double Q and Dueling Q learning are superior to vanilla deep Q learning. However, those posts only included examples on simple environments such as the OpenAI Cartpole environment. Such environments are good to learn on, but more complicated environments are both more interesting and more fun. They also better demonstrate the complexities of implementing deep reinforcement learning in realistic cases. In this post, I’ll use code similar to that shown in my Dueling Q TensorFlow 2 post, but apply it to the OpenAI Atari Space Invaders environment. All code for this post can be found on this site’s Github repository. As mentioned in the title, the example code for this post is written using TensorFlow 2; installation instructions can be found here.

Double and Dueling Q learning recap

Double Q recap

Double Q learning was created to address two problems with vanilla deep Q learning. These are:

1. Using the same network to both choose the best action and evaluate the quality of that action is a source of feedback / learning instability.
2. The max function used in calculating the target Q value (see formula below), which the neural network is to learn, tends to bias the network towards high, noisy rewards. This again hampers learning and makes it more erratic.

The problematic Bellman equation is shown below:

$$Q_{target} = r_{t+1} + \gamma \max_{a}Q(s_{t+1}, a;\theta_t)$$

The Double Q solution to the two problems above involves creating another target network, which is initially created with weights equal to the primary network. However, during training the primary network and the target network are allowed to “drift” apart. The primary network is trained as per usual, but the target network is not. Instead, the target network weights are either periodically (but not frequently) set equal to the primary network weights, or they are only gradually “blended” with the primary network in a weighted average fashion.

The benefit then comes from the fact that in Double Q learning, the Q value of the best action in the next state ($s_{t + 1}$) is extracted from the target network, not the primary network. The primary network is still used to decide what the best action will be, a*, by taking an argmax of the outputs from the primary network, but the Q value for this action is evaluated from the target network. This can be observed in the formulation below:

$$a^* = \arg\max_{a} Q(s_{t+1}, a; \theta_t)$$

$$Q_{target} = r_{t+1} + \gamma Q(s_{t+1}, a^*; \theta^-_t)$$

Notice the different weights involved in the formulas above: the best action, a*, is calculated from the network with weights $\theta_t$, which are the primary network weights. However, the $Q_{target}$ calculation uses the target network, with weights $\theta^-_t$, to estimate the Q value for this chosen action. This Double Q methodology decouples the choosing of an action from the evaluation of the Q value of that action. This provides more stability to the learning. For more details and a demonstration of the superiority of the Double Q methodology over vanilla Deep Q learning, see this post.
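To make the second problem (the bias introduced by the max function) concrete, here is a small NumPy sketch. It assumes an artificial setting that is not from the original posts: every action has a true Q value of zero, and two estimators see independent Gaussian noise. Real primary and target networks are correlated, so the effect in practice is weaker than in this idealised case.

```python
import numpy as np

rng = np.random.default_rng(0)
n_trials, n_actions = 10_000, 6  # illustrative sizes

# True Q values are all zero; each estimator only sees noise around them.
noise_primary = rng.normal(size=(n_trials, n_actions))
noise_target = rng.normal(size=(n_trials, n_actions))

# Vanilla: the same noisy estimate both picks and scores the best action.
single_estimate = noise_primary.max(axis=1).mean()

# Double Q: one estimate picks the action, the other scores it.
a_star = noise_primary.argmax(axis=1)
double_estimate = noise_target[np.arange(n_trials), a_star].mean()

print(f"max over one noisy estimate: {single_estimate:.3f}")
print(f"argmax from one, value from the other: {double_estimate:.3f}")
```

The first number comes out well above zero even though no action is actually better than any other, while the second stays close to zero. This is the over-estimation that decoupling action selection from action evaluation is meant to reduce.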

Dueling Q recap

The Dueling Q architecture, discussed in detail in this post, is an improvement to the Double Q network. It uses the same methodology of a target and a primary network, with periodic updates or blending of the target network weights to the primary network weights. However, it builds two important concepts into the architecture of the network. These are the advantage and value functions:

• Advantage function A(s, a): The advantage function is the relative benefit of choosing a certain action in state s over the other possible actions in state s
• Value function V(s): The value function is the value of being in state s, independent of the relative benefits of the actions within that state

The Q function is the simple addition of these two functions: $$Q(s, a) = V(s) + A(s, a)$$ The motivation of splitting these two functions explicitly in the architecture is that there can be inherently good or bad states for the agent to be in, regardless of the relative benefit of any actions within that state. For instance, in a certain state, all actions may lead to the agent “dying” in a game – this is an inherently bad state to be in, and there is no need to waste computational resources trying to determine the best action in this state. The converse can also be true. Ideally, this “splitting” into the advantage function and value function should be learnt implicitly during training. However, the Dueling Q architecture makes this split explicit, which acts to improve training. The Dueling Q architecture can be observed in the figure below:

Dueling Q architecture

It can be observed that in the Dueling Q architecture, there are common Convolutional Neural Network layers which perform image processing. The output from these layers is then flattened and the network bifurcates into a Value function stream V(s) and an Advantage function stream A(s, a). The outputs of these separate streams are then aggregated in a special layer, before finally outputting Q values from the network. The aggregation layer does not perform a simple addition of the Value and Advantage streams, as this would result in problems of identifiability (for more details on this, see the original Dueling Q post). Instead, the following aggregation function is performed: $$Q(s,a) = V(s) + A(s,a) - \frac{1}{|a|}\sum_{a'}A(s,a')$$ In this post, I’ll demonstrate how to use the Dueling Q architecture to train an agent in TensorFlow 2 to play Atari Space Invaders. However, I will concentrate on the extra considerations required to train the agent via an image stream from an Atari game. For more details, again, refer to the original Dueling Q post.
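The aggregation formula can be checked numerically with a few lines of NumPy. This is a minimal sketch with made-up values; the function name dueling_aggregate is hypothetical and not part of the post's code.

```python
import numpy as np

def dueling_aggregate(v, adv):
    # v has shape (batch, 1); adv has shape (batch, num_actions).
    # Subtract the per-sample mean advantage, then add the state value.
    return v + adv - adv.mean(axis=1, keepdims=True)

v = np.array([[2.0]])
adv = np.array([[1.0, 3.0, 5.0]])

q = dueling_aggregate(v, adv)
# Adding a constant to every advantage leaves the Q values unchanged.
q_shifted = dueling_aggregate(v, adv + 10.0)

print(q)
print(q_shifted)
```

With a plain sum, adding a constant to A and subtracting it from V would give the same Q values, so the two streams would not be uniquely determined. Subtracting the mean removes that freedom: the mean of the Q values over actions equals V(s).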

Considerations for training in an Atari environment

Training reinforcement learning agents on Atari environments is hard – it can be a very time consuming process as the environment complexity is high, especially when the agent needs to visually interpret objects direct from images. As such, each environment needs to be considered to determine legitimate ways of reducing the training burden and improving the performance. Three methods will be used in this post:

1. Converting images to greyscale
2. Reducing the image size
3. Stacking frames

Converting Atari images to greyscale and reducing the image size

The first, relatively easy, step in reducing the computational training burden is to convert all the incoming Atari images from depth-3 RGB colour images to depth-1 greyscale images. This reduces the number of input channels, and hence the number of weights in the first convolutional layer, by a factor of 3. Another step which can be performed to reduce the amount of input data is to resize the images to make them smaller. There is obviously a limit to how far the images can be shrunk before learning performance is affected. In this case, however, halving the image size by rescaling is possible without affecting performance too much. The original image sizes from the Atari Space Invaders game are (210, 160, 3). After converting to greyscale and resizing by half, the new image size is (105, 80, 1). Both of these operations are easy enough to implement in TensorFlow 2:

import tensorflow as tf

def image_preprocess(image, new_size=(105, 80)):
    # convert to greyscale, resize and normalize the image
    image = tf.image.rgb_to_grayscale(image)
    image = tf.image.resize(image, new_size)
    image = image / 255
    return image
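For readers who want to check the shapes without TensorFlow, the same two steps can be sketched in NumPy. This is only an approximation under stated assumptions: it uses common luma weights for the greyscale conversion and a 2x2 block average for the halving, so pixel values may differ slightly from the TensorFlow version. The name preprocess_numpy is hypothetical.

```python
import numpy as np

def preprocess_numpy(frame):
    # frame: (210, 160, 3) uint8 RGB image
    luma = np.array([0.2989, 0.5870, 0.1140], dtype=np.float32)
    grey = frame.astype(np.float32) @ luma            # (210, 160)
    h, w = grey.shape
    # average each 2x2 block to halve both dimensions
    small = grey.reshape(h // 2, 2, w // 2, 2).mean(axis=(1, 3))
    # scale to [0, 1] and restore the trailing channel axis
    return (small / 255.0)[..., np.newaxis]

frame = np.random.randint(0, 256, size=(210, 160, 3), dtype=np.uint8)
out = preprocess_numpy(frame)
print(out.shape)
```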

Stacking image frames

The next step that is commonly performed when training agents on Atari games is the practice of stacking image frames, and feeding all these frames into the input CNN layers. The purpose of this is to allow the neural network to get some sense of the direction of the objects moving within the image. Consider a single, static image: examining such an image on its own gives no information about which direction any of the objects within it are travelling (or their respective speeds). Therefore, for each sample fed into the neural network, a stack of frames is presented to the input, which gives the neural network both temporal and spatial information to work with. The input dimensions to the network are not, then, of size (105, 80, 1) but rather (105, 80, NUM_FRAMES). In this case, we’ll use 3 frames to feed into the network, i.e. NUM_FRAMES = 3. The specifics of how these stacked frames are stored, extracted and updated will be revealed as we step through the code in the next section. Additional steps can be taken to improve performance in complex Atari environments and similar cases. These include the skipping of frames and prioritised experience replay (PER). However, these have not been implemented in this example. A future post will discuss the benefits of PER and how to implement it.

Atari Space Invaders TensorFlow 2 implementation

The section below details the TensorFlow 2 implementation of training an agent on the Atari Space Invaders environment. In this post, comprehensive details of the Dueling Q architecture and training implementation will not be given – for a step by step discussion on these details, see my Dueling Q introductory post. However, detailed information will be given about the specific new steps required to train in the Atari environment. As stated at the beginning of the post, all code can be found on this site’s Github repository.

Model definition

First we define the Double/Dueling Q model class with its structure:

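import gym
from tensorflow import keras

env = gym.make("SpaceInvaders-v0")
num_actions = env.action_space.n

class DQModel(keras.Model):
    def __init__(self, hidden_size: int, num_actions: int, dueling: bool):
        super(DQModel, self).__init__()
        self.dueling = dueling
        # shared convolutional layers for image processing
        self.conv1 = keras.layers.Conv2D(16, (8, 8), (4, 4), activation='relu')
        self.conv2 = keras.layers.Conv2D(32, (4, 4), (2, 2), activation='relu')
        self.flatten = keras.layers.Flatten()
        # advantage stream (the names adv_dense / adv_out are assumed here,
        # as the start of these two definitions is missing in the listing)
        self.adv_dense = keras.layers.Dense(hidden_size, activation='relu',
                                            kernel_initializer=keras.initializers.he_normal())
        self.adv_out = keras.layers.Dense(num_actions,
                                          kernel_initializer=keras.initializers.he_normal())
        if dueling:
            # value stream
            self.v_dense = keras.layers.Dense(hidden_size, activation='relu',
                                              kernel_initializer=keras.initializers.he_normal())
            self.v_out = keras.layers.Dense(1, kernel_initializer=keras.initializers.he_normal())
            # subtract the mean advantage over the actions of each sample (axis 1),
            # as in the aggregation formula, rather than the mean over the whole batch
            self.lambda_layer = keras.layers.Lambda(
                lambda x: x - tf.reduce_mean(x, axis=1, keepdims=True))
            self.combine = keras.layers.Add()

    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.conv2(x)
        x = self.flatten(x)
        adv = self.adv_dense(x)
        adv = self.adv_out(adv)
        if self.dueling:
            v = self.v_dense(x)
            v = self.v_out(v)
            norm_adv = self.lambda_layer(adv)
            # Q(s, a) = V(s) + (A(s, a) - mean of A(s, a') over a')
            combined = self.combine([v, norm_adv])
            return combined
        return adv

primary_network = DQModel(256, num_actions, True)
target_network = DQModel(256, num_actions, True)
# subclassed Keras models only create their weights on the first call, so pass a
# dummy batch through both networks first (shape assumes 105 x 80 images, 3 frames)
dummy_batch = tf.zeros((1, 105, 80, 3))
primary_network(dummy_batch)
target_network(dummy_batch)
# make target_network = primary_network
for t, e in zip(target_network.trainable_variables, primary_network.trainable_variables):
    t.assign(e)
# compile the primary network with an Adam optimizer and a Huber loss
primary_network.compile(optimizer=keras.optimizers.Adam(), loss=keras.losses.Huber())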


In the code above, first the Space Invaders environment is created. After this, the DQModel class is defined as a subclass of keras.Model. In this model, a number of convolutional layers are created first, followed by a flatten layer and dedicated fully connected layers which implement the value and advantage streams. This structure is then wired together in the model’s call function. After the model class has been defined, two instances of it are created, corresponding to the primary_network and the target_network. As discussed above, both of these will be utilised in the Double Q component of the learning. The target_network weights are then set to be initially equal to the primary_network weights. Finally, the primary_network is compiled for training using an Adam optimizer and a Huber loss function. As stated previously, for more details see this post.
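As a quick sanity check on the model size, the spatial dimensions after each convolutional layer can be worked out by hand. The sketch below assumes the Keras default of no padding ('valid') for Conv2D and the (105, 80) input size used in this post; conv_out is a hypothetical helper, not part of the post's code.

```python
def conv_out(size, kernel, stride):
    # output length along one axis for a convolution without padding
    return (size - kernel) // stride + 1

h, w = 105, 80
h, w = conv_out(h, 8, 4), conv_out(w, 8, 4)   # after conv1: 8x8 kernel, stride 4
h, w = conv_out(h, 4, 2), conv_out(w, 4, 2)   # after conv2: 4x4 kernel, stride 2
flat = h * w * 32                             # conv2 has 32 filters

print(h, w, flat)
```

Under these assumptions the flatten layer hands a vector of a few thousand values to each of the dense streams, which is small enough that most of the compute sits in the convolutional layers and the replay memory handling.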

The Memory class

Next we will look at the Memory class, which is to hold all the previous experiences of the agent. This class is a little more complicated in the Atari environment case, due to the necessity of dealing with stacked frames:

import numpy as np

class Memory:
    def __init__(self, max_memory):
        self._max_memory = max_memory
        # pre-allocate every buffer up front
        self._actions = np.zeros(max_memory, dtype=np.int32)
        self._rewards = np.zeros(max_memory, dtype=np.float32)
        self._frames = np.zeros((POST_PROCESS_IMAGE_SIZE[0], POST_PROCESS_IMAGE_SIZE[1], max_memory), dtype=np.float32)
        # the np.bool alias has been removed from recent NumPy releases; use the builtin bool
        self._terminal = np.zeros(max_memory, dtype=bool)
        # current write position in the buffers
        self._i = 0

In the class __init__ function, it can be observed that all the various memory buffers (for actions, rewards etc.) are allocated up front according to max_memory. This is in contrast to a memory approach which involves appending to lists. Allocating everything at the start means that any memory problem shows up immediately (as opposed to the code falling over after you’ve already been running it for 3 days!). It also makes memory allocation more efficient, as growing memory dynamically by appending is comparatively slow. You’ll also observe the creation of a counter variable, self._i. This records the current write position in the memory buffer, and ensures that the buffer does not overflow. The next function within the class shows how samples are stored within the class:

    def add_sample(self, frame, action, reward, terminal):
        self._actions[self._i] = action
        self._rewards[self._i] = reward
        self._frames[:, :, self._i] = frame[:, :, 0]
        self._terminal[self._i] = terminal
        # once the last slot has been written, wrap the write index around
        if self._i % (self._max_memory - 1) == 0 and self._i != 0:
            self._i = BATCH_SIZE + NUM_FRAMES + 1
        else:
            self._i += 1
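The index handling at the end of add_sample can be exercised on its own. The sketch below copies that condition into a hypothetical helper, next_index, and uses a deliberately tiny buffer. The batch size of 32 is inferred from the batch shape quoted later in this post, and NUM_FRAMES = 3 is the value stated above.

```python
BATCH_SIZE, NUM_FRAMES = 32, 3

def next_index(i, max_memory):
    # same rule as in add_sample: wrap after the last slot has been written
    if i % (max_memory - 1) == 0 and i != 0:
        return BATCH_SIZE + NUM_FRAMES + 1
    return i + 1

max_memory = 100  # tiny buffer, for illustration only
i = 0
visited = []
for _ in range(150):
    visited.append(i)
    i = next_index(i, max_memory)

print(visited[98:102])
```

The write position runs up to the final slot and then jumps back to BATCH_SIZE + NUM_FRAMES + 1 rather than to zero, so the first few slots of the buffer are written once and never overwritten.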

As will be shown shortly, for every step in the Atari environment, the current image frame, the action taken, the reward received and whether the state is terminal (i.e. the agent ran out of lives and the game ends) are stored in memory. Notice that nothing special as yet is being done with the stored frames: they are simply stored in order as the game progresses. The frame stacking process occurs during the sample extraction method to be covered next. One thing to notice is that once self._i reaches the last slot of the buffer (index max_memory - 1), the index is reset back towards the beginning of the memory buffer (but offset by the batch size and the number of frames). This reset means that, once the memory buffer reaches its maximum size, it will begin to overwrite the older samples. The next method in the class governs how random sampling from the memory buffer occurs:

    def sample(self):
        if self._i < BATCH_SIZE + NUM_FRAMES + 1:
            raise ValueError("Not enough memory to extract a batch")
        else:
            rand_idxs = np.random.randint(NUM_FRAMES + 1, self._i, size=BATCH_SIZE)
            states = np.zeros((BATCH_SIZE, POST_PROCESS_IMAGE_SIZE[0], POST_PROCESS_IMAGE_SIZE[1], NUM_FRAMES),
                              dtype=np.float32)
            next_states = np.zeros((BATCH_SIZE, POST_PROCESS_IMAGE_SIZE[0], POST_PROCESS_IMAGE_SIZE[1], NUM_FRAMES),
                                   dtype=np.float32)
            for i, idx in enumerate(rand_idxs):
                # the frame stored at idx is the one produced by the action stored at idx,
                # so the state stack ends at frame idx - 1 and the next-state stack at frame idx
                states[i] = self._frames[:, :, idx - NUM_FRAMES:idx]
                next_states[i] = self._frames[:, :, idx - NUM_FRAMES + 1:idx + 1]
            return states, self._actions[rand_idxs], self._rewards[rand_idxs], next_states, self._terminal[rand_idxs]

First, a simple check is performed to ensure there are enough samples in the memory to actually extract a batch. If so, a set of random indices rand_idxs is selected. These random integers are selected from a range with a lower bound of NUM_FRAMES + 1 and an (exclusive) upper bound of self._i. In other words, it is possible to select any index from near the start of the memory buffer up to the current filled location of the buffer. However, because NUM_FRAMES images prior to the selected index are extracted, indices less than NUM_FRAMES are not allowed. The number of random indices selected is equal to the batch size.

Next, some numpy arrays are initialised which will hold the current states and the next states. In this example, these are of size (32, 105, 80, 3), where 3 is the number of frames to be stacked (NUM_FRAMES). A loop is then entered for each of the randomly selected memory indices. As can be observed, the states batch row is populated by the stored frames ranging from idx - NUM_FRAMES to idx - 1. In other words, it is the 3 frames including and prior to index idx - 1. The batch row for next_states, in turn, is the 3 frames including and prior to the randomly selected index idx (think of a window of 3 frames shifted along by 1 position). Because each frame is stored alongside the action that produced it, this window placement lines the stacks up with the action, reward and terminal flag stored at idx. The variables states and next_states are then returned from this function, along with the corresponding actions, rewards and terminal flags. The terminal flags communicate whether the game finished on the randomly selected steps. Finally, the memory class is instantiated with the memory size as the argument:

memory = Memory(200000)

The memory size should ideally be as large as possible, but considerations must be given to the amount of memory available on whatever computing platform is being used to run the training.
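A quick back-of-the-envelope calculation shows why this matters. The sketch below uses only figures from this post (200,000 slots, 105 x 80 frames, float32 storage) and counts the frame buffer alone, ignoring the much smaller action, reward and terminal buffers.

```python
max_memory = 200_000
height, width = 105, 80
bytes_per_float32 = 4

frame_bytes = max_memory * height * width * bytes_per_float32
print(f"frame buffer: {frame_bytes / 1e9:.2f} GB")

# For comparison only: the same number of frames stored as one byte per pixel.
# This is an option not used in this post, and it would mean normalizing at sampling time.
uint8_bytes = max_memory * height * width
print(f"as uint8: {uint8_bytes / 1e9:.2f} GB")
```

So the frame buffer on its own needs several gigabytes of RAM, which is worth checking against the machine you plan to train on before starting a long run.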

Miscellaneous functions

The following two functions are standard functions to choose the actions and update the target network:

import random

def choose_action(state, primary_network, eps, step):
    # act purely at random until training starts
    if step < DELAY_TRAINING:
        return random.randint(0, num_actions - 1)
    else:
        if random.random() < eps:
            # explore: random action
            return random.randint(0, num_actions - 1)
        else:
            # exploit: action with the highest predicted Q value
            q_values = primary_network(tf.reshape(state, (1, POST_PROCESS_IMAGE_SIZE[0],
                                                          POST_PROCESS_IMAGE_SIZE[1], NUM_FRAMES)))
            return np.argmax(q_values.numpy())

def update_network(primary_network, target_network):
    # update target network parameters slowly from primary network
    for t, e in zip(target_network.trainable_variables, primary_network.trainable_variables):
        t.assign(t * (1 - TAU) + e * TAU)
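As a framework-free illustration of these two helpers, the sketch below applies the same epsilon-greedy rule to a plain list of Q values and the same blending rule to a NumPy array. The TAU value and the function names epsilon_greedy and soft_update are assumptions made for this example only.

```python
import random
import numpy as np

TAU = 0.08  # illustrative blending factor, not a value stated in this post

def epsilon_greedy(q_values, eps):
    # with probability eps pick a random action, otherwise the greedy one
    if random.random() < eps:
        return random.randint(0, len(q_values) - 1)
    return int(np.argmax(q_values))

def soft_update(target_w, primary_w, tau=TAU):
    # move the target weights a fraction tau of the way towards the primary weights
    return target_w * (1 - tau) + primary_w * tau

greedy_action = epsilon_greedy([0.1, 0.9, 0.3], eps=0.0)

target, primary = np.zeros(3), np.ones(3)
for _ in range(10):
    target = soft_update(target, primary)

print(greedy_action, target)
```

Each soft update shrinks the gap between the two sets of weights by a factor of (1 - TAU), so the target network trails the primary network smoothly instead of jumping to it.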

The choose_action function performs the epsilon-greedy action selection policy. Before DELAY_TRAINING steps have elapsed, every action is chosen at random. After that, a random action is selected if a random value falls below eps; otherwise the action with the highest Q value from the network is chosen. The update_network function slowly shifts the target network weights towards the primary network weights in accordance with the Double Q learning methodology. The next function deals with the “state stack”, which is an array holding the last NUM_FRAMES frames of the episode:

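def process_state_stack(state_stack, state):
    # shift every stored frame back one position along the last axis
    for i in range(1, state_stack.shape[-1]):
        state_stack[:, :, i - 1].assign(state_stack[:, :, i])
    # place the newest frame in the last position
    state_stack[:, :, -1].assign(state[:, :, 0])
    return state_stack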


This function takes the existing state stack array, and the newest state to be added. It then shuffles all the existing frames within the state stack “back” one position along the last axis. In other words, the most recent state, in this case sitting in position 2 of the state stack, is shuffled back to position 1. The frame / state in position 1 is shuffled to position 0. Finally, the newest state or frame is stored in the newly vacated position 2 of the state stack. The state stack is required so that it can be fed into the neural network in order to choose actions, and its updating can be observed in the main training loop, as will be reviewed shortly.
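The same shuffling can be seen on a toy example in plain NumPy. This is a sketch rather than the post's TensorFlow implementation: push_frame is a hypothetical name, and np.roll is used in place of the explicit loop.

```python
import numpy as np

def push_frame(stack, frame):
    # stack: (H, W, NUM_FRAMES), frame: (H, W, 1)
    stack = np.roll(stack, shift=-1, axis=-1)  # every frame moves back one position
    stack[:, :, -1] = frame[:, :, 0]           # newest frame goes in the last position
    return stack

# Tiny 2x2 "frames" filled with the values 1, 2, 3, 4 so the order is easy to read.
stack = np.zeros((2, 2, 3), dtype=np.float32)
for k in (1, 2, 3, 4):
    stack = push_frame(stack, np.full((2, 2, 1), k, dtype=np.float32))

print(stack[0, 0])
```

After four pushes into a stack of three, the oldest frame has dropped out and the remaining frames sit in chronological order along the last axis.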

The Dueling Q / Double Q training function

Next up is the training function:

def train(primary_network, memory, target_network=None):
    states, actions, rewards, next_states, terminal = memory.sample()
    # predict Q(s,a) given the batch of states
    prim_qt = primary_network(states)
    # predict Q(s',a') from the evaluation network
    prim_qtp1 = primary_network(next_states)
    # copy the prim_qt tensor into the target_q tensor - we then will update one index corresponding to the max action
    target_q = prim_qt.numpy()
    # start the updates from the immediate rewards
    updates = rewards.copy()
    # terminal states have no future rewards to add
    valid_idxs = np.logical_not(terminal)
    batch_idxs = np.arange(BATCH_SIZE)
    if target_network is None:
        # vanilla Q learning: max over the primary network's own estimates
        updates[valid_idxs] += GAMMA * np.amax(prim_qtp1.numpy()[valid_idxs, :], axis=1)
    else:
        # double Q learning: primary network picks a*, target network evaluates it
        prim_action_tp1 = np.argmax(prim_qtp1.numpy(), axis=1)
        q_from_target = target_network(next_states)
        updates[valid_idxs] += GAMMA * q_from_target.numpy()[batch_idxs[valid_idxs], prim_action_tp1[valid_idxs]]
    # only the Q value of the action actually taken is moved towards its target
    target_q[batch_idxs, actions] = updates
    loss = primary_network.train_on_batch(states, target_q)
    return loss

This train function is very similar to the train function reviewed in my first Dueling Q tutorial. Essentially, it first extracts batches of data from the memory buffer. Next the Q values from the current state (states) and the following states (next_states) are extracted from the primary network – these values are returned in prim_qt and prim_qtp1 respectively (where qtp1 refers to the Q values for the time t + 1). Next, the target Q values are initialized from the prim_qt values. After this, the updates variable is created – this holds the target Q values for the actions. These target values will be the Q values which the network will “step towards” during the optimization step – hence the name “target” Q values.

The variable valid_idxs specifies those indices which don’t include terminal states. For terminal states (states where the game ended), there are no future rewards to discount, so the target value for these states is simply the reward. For other states, which do have future rewards, these need to be discounted and added to the current reward for the target Q values. If no target_network is provided, it is assumed vanilla Q learning should be used to provide the discounted target Q values. Otherwise, double Q learning is implemented.

According to that methodology, first the a* actions are selected which are those actions with the highest Q values in the next state (t + 1). These actions are taken from the primary network, using the numpy argmax function. Next, the Q values from the target network are extracted from the next state (t + 1). Finally, the updates value is incremented for valid indices by adding the discounted future Q values from the target network, for the actions a* selected from the primary network. Finally, the network is trained using the Keras train_on_batch function.
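The double Q branch can be traced by hand on a tiny batch. The sketch below uses made-up Q values for three transitions (the last one terminal) and an illustrative GAMMA; the array names mirror those in the train function, but nothing here touches a real network.

```python
import numpy as np

GAMMA = 0.99  # illustrative discount factor

rewards = np.array([1.0, 0.0, 5.0], dtype=np.float32)
actions = np.array([0, 2, 1])
terminal = np.array([False, False, True])

prim_qt = np.array([[0.5, 0.2, 0.1], [0.3, 0.4, 0.6], [1.0, 2.0, 0.0]], dtype=np.float32)
prim_qtp1 = np.array([[0.1, 0.9, 0.2], [0.7, 0.3, 0.5], [0.4, 0.4, 0.4]], dtype=np.float32)
q_from_target = np.array([[0.2, 0.6, 0.1], [0.5, 0.2, 0.9], [0.3, 0.3, 0.3]], dtype=np.float32)

batch_idxs = np.arange(len(rewards))
valid_idxs = np.logical_not(terminal)

# a* comes from the primary network's next-state Q values
prim_action_tp1 = np.argmax(prim_qtp1, axis=1)

# the value of a* comes from the target network; terminal rows keep the bare reward
updates = rewards.copy()
updates[valid_idxs] += GAMMA * q_from_target[batch_idxs[valid_idxs], prim_action_tp1[valid_idxs]]

# only the entry for the action actually taken is replaced in each row
target_q = prim_qt.copy()
target_q[batch_idxs, actions] = updates

print(target_q)
```

Every entry of target_q other than the one for the action taken equals the network's own prediction, so those entries contribute no error, and the loss only pulls on the Q value of the chosen action.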

The main Atari training loop

Now it is time to review the main training loop:

import datetime as dt

# constants such as STORE_PATH, MAX_EPSILON, MIN_EPSILON, EPSILON_MIN_ITER, DELAY_TRAINING
# and GIF_RECORDING_FREQ, and the record_gif helper, are assumed to be defined as in the
# accompanying code for this post
num_episodes = 1000000
eps = MAX_EPSILON
render = False
train_writer = tf.summary.create_file_writer(STORE_PATH + f"/DuelingQSI_{dt.datetime.now().strftime('%d%m%Y%H%M')}")
double_q = True
steps = 0
for i in range(num_episodes):
    state = env.reset()
    state = image_preprocess(state)
    # initial state stack: the first frame repeated NUM_FRAMES times
    state_stack = tf.Variable(np.repeat(state.numpy(), NUM_FRAMES).reshape((POST_PROCESS_IMAGE_SIZE[0],
                                                                            POST_PROCESS_IMAGE_SIZE[1],
                                                                            NUM_FRAMES)))
    cnt = 1
    avg_loss = 0
    tot_reward = 0
    if i % GIF_RECORDING_FREQ == 0:
        frame_list = []
    while True:
        if render:
            env.render()
        action = choose_action(state_stack, primary_network, eps, steps)
        next_state, reward, done, info = env.step(action)
        tot_reward += reward
        if i % GIF_RECORDING_FREQ == 0:
            frame_list.append(tf.cast(tf.image.resize(next_state, (480, 320)), tf.uint8).numpy())
        next_state = image_preprocess(next_state)
        state_stack = process_state_stack(state_stack, next_state)
        # store in memory
        memory.add_sample(next_state, action, reward, done)

        if steps > DELAY_TRAINING:
            loss = train(primary_network, memory, target_network if double_q else None)
            update_network(primary_network, target_network)
        else:
            loss = -1
        avg_loss += loss

        # linearly decay the eps value
        if steps > DELAY_TRAINING:
            eps = MAX_EPSILON - ((steps - DELAY_TRAINING) / EPSILON_MIN_ITER) * \
                  (MAX_EPSILON - MIN_EPSILON) if steps < EPSILON_MIN_ITER else \
                MIN_EPSILON
        steps += 1

        if done:
            if steps > DELAY_TRAINING:
                avg_loss /= cnt
                print(f"Episode: {i}, Reward: {tot_reward}, avg loss: {avg_loss:.5f}, eps: {eps:.3f}")
                with train_writer.as_default():
                    tf.summary.scalar('reward', tot_reward, step=i)
                    tf.summary.scalar('avg loss', avg_loss, step=i)
            else:
                print(f"Pre-training...Episode: {i}")
            if i % GIF_RECORDING_FREQ == 0:
                record_gif(frame_list, i)
            break

        cnt += 1

This training loop is very similar to the training loop in my Dueling Q tutorial, so for a detailed review, please see that post. The main differences relate to how the frame stacking is handled. First, you’ll notice at the start of the loop that the environment is reset, and the first state / image is extracted. This state or image is pre-processed and then repeated NUM_FRAMES times and reshaped to create the first state or frame stack, of size (105, 80, 3) in this example. Another point to note is that a gif recording function has been created which is called every GIF_RECORDING_FREQ episodes. This function simply outputs every frame to a gif so that the training progress can be monitored by observing actual gameplay. As such, there is a frame list which is filled whenever a GIF_RECORDING_FREQ episode comes around, and this frame list is passed to the gif recording function. Check out the code for this tutorial for more details. Finally, it can be observed that after every step, the state stack is processed by shuffling along each recorded frame / state in that stack.
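The epsilon decay inside the loop is easier to inspect as a stand-alone function. The sketch below reproduces the same formula; the numeric values of the constants are illustrative assumptions, not the settings used for the results in this post, and linear_epsilon is a hypothetical name.

```python
MAX_EPSILON, MIN_EPSILON = 1.0, 0.1   # illustrative values
DELAY_TRAINING = 50_000               # illustrative value
EPSILON_MIN_ITER = 500_000            # illustrative value

def linear_epsilon(step):
    # no decay until training has started
    if step <= DELAY_TRAINING:
        return MAX_EPSILON
    # clamp to the minimum once EPSILON_MIN_ITER steps have elapsed
    if step >= EPSILON_MIN_ITER:
        return MIN_EPSILON
    frac = (step - DELAY_TRAINING) / EPSILON_MIN_ITER
    return MAX_EPSILON - frac * (MAX_EPSILON - MIN_EPSILON)

for step in (0, 50_000, 275_000, 499_999, 500_000):
    print(step, round(linear_epsilon(step), 3))
```

One detail this makes visible: because the fraction is divided by EPSILON_MIN_ITER rather than by EPSILON_MIN_ITER - DELAY_TRAINING, epsilon has not quite reached MIN_EPSILON when the clamp kicks in, so there is a small downward jump at that step. Dividing by the difference instead would remove the jump, if a perfectly smooth schedule is preferred.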

The image below shows how the training progresses through each episode with respect to the total reward received for each episode:

Atari Space Invaders – Dueling Q training reward

As can be observed from the plot above, the reward steadily increases over 1500 episodes of game play. Note – if you wish to replicate this training on your own, you will need GPU processing support in order to reduce the training timeframes to a reasonable level. In this case, I utilised the Google Cloud Compute Engine and a single GPU. The gifs below show the progress of the agent in gameplay between episode 50 and episode 1450:

Atari Space Invaders – gameplay episode 50

Atari Space Invaders – gameplay episode 1450

As can be observed, after 50 episodes the agent still moves around randomly and is quickly killed, achieving a score of only 60 points. However, after 1450 episodes, the agent can be seen to be playing the game much more effectively, even having learnt to destroy the occasional purple “master ship” flying overhead to gain extra points.

This post has demonstrated how to effectively train agents to operate in Atari environments such as Space Invaders. In particular it has demonstrated how to use the Dueling Q reinforcement learning algorithm to train the agent. A future post will demonstrate how to make the training even more efficient using the Prioritised Experience Replay (PER) approach.