6

tl-dr version: why do the first 2 action/observations i take not line up with my first two objects in my replay buffer?

Do tf-agent replay buffers automatically shuffle data around?

by adding these prints im able to see what my first 2 steps look like

print("just addding this as traj num = "+str(num))
print(" next time step  = "+str(next_time_step))
replay_buffer.add_batch(traj)

this produces

just addding this as traj num = 0
 next time step  = TimeStep(
{'discount': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>,
 'observation': <tf.Tensor: shape=(1, 1, 5, 5), dtype=float32, numpy=
array([[[[0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0.],
         [0., 0., 0., 1., 0.]]]], dtype=float32)>,
 'reward': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-0.05], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1,), dtype=int32, numpy=array([2])>})
just addding this as traj num = 1
 next time step  = TimeStep(
{'discount': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>,
 'observation': <tf.Tensor: shape=(1, 1, 5, 5), dtype=float32, numpy=
array([[[[0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0.],
         [0., 0., 1., 1., 0.]]]], dtype=float32)>,
 'reward': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-0.05], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1,), dtype=int32, numpy=array([2])>})

a few lines later when i have my data set as an iterator, i once again explicitly print the first data points. (i have set my batch size to 3 so we should get the first 3 results, we seem to get 3 copies of the first result)

Trajectory(
{'action': <tf.Tensor: shape=(3, 1), dtype=int32, numpy=
array([[3],
       [0],
       [0]])>,
 'discount': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[0.],
       [0.],
       [0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(3, 1), dtype=int32, numpy=
array([[2],
       [2],
       [2]])>,
 'observation': <tf.Tensor: shape=(3, 1, 1, 5, 5), dtype=float32, numpy=
array([[[[[0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.]]]],



       [[[[0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.]]]],



       [[[[0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.],
          [0., 0., 0., 0., 0.]]]]], dtype=float32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[-1.  ],
       [-0.05],
       [ 1.  ]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(3, 1), dtype=int32, numpy=
array([[0],
       [0],
       [0]])>})
### experience 1 above
### experience 1 above
### experience 1 above

the experiences are blank. if we continue to iterate through we continue to get the same results.

what is wrong with this? How can i keep the data in the replay buffer in the same order i collected it?

########################

reproducable example below (much of the game has been gutted to make it smaller)

import tensorflow as tf
from tf_agents.networks import q_network
from tf_agents.agents.dqn import dqn_agent
import tf_agents
import tf_agents.environments.py_environment as PyEnvironment 
from tf_agents.trajectories import time_step as ts
import numpy as np
import keras 
import tf_agents.policies.random_tf_policy as random_tf_policy
import tf_agents.environments as tf_py_environment
import numpy as np
import random
import copy

class simple_slots():
    def __init__(self, x, y):
        self.x_rows = x
        self.y_rows = y
        self.slots = []
        for i in range(x):
            ys=[]
            for j in range(y):
                ys.append(0)
            self.slots.append(ys)
    def new_game(self):
        for xs in self.slots:
            for slot in xs:
                slot = 0
        
    def find_lowest_slot(self , x):
        lowest_y = 0
        best_slot = "none"
        for slot, y_ind in zip(reversed(self.slots[x]), reversed(range(len(self.slots[x])))):
            if slot == 0:
                if y_ind > lowest_y:
                    lowest_y = y_ind
                    best_slot = slot
        if best_slot != "none":
            return lowest_y
        return False
    
    def ml_plays_turn(self, action):
        y = self.find_lowest_slot(action)
        self.slots[action][y] = 1
    
    def script_plays_turn(self, action = 5):
        y = self.find_lowest_slot(action)
        self.slots[action][y] = 2
        
    def arbirtrarily_decide_if_game_over(self):
        if random.random() < 0.2:
            reward = 1
        elif  random.random() < 0.5:
            reward = -1
        else:
            reward = 0
        return reward

class Con4Env(PyEnvironment.PyEnvironment):
    
    def __init__(self, game):
        self.game = game
        self._action_spec = tf_agents.specs.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=game.x_rows-1 , name='action')
        self._observation_spec = tf_agents.specs.BoundedArraySpec(
            shape=(1, game.x_rows,game.y_rows), dtype=np.float32, minimum=0, name='observation')
        self._state = np.zeros((game.x_rows,game.y_rows) , dtype=np.float32)
        self._time_step_spec = ts.time_step_spec(self._observation_spec)
        self._episode_ended = False
        
    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        self._state = np.zeros((game.x_rows,game.y_rows) , dtype=np.float32)
        self._episode_ended = False
        return ts.restart(np.array([self._state], dtype=np.float32))
    
    def copy_gameboard_to_state(self):
        for ys, yind in zip(self.game.slots, range(len(self.game.slots))):
            for x , xind in zip(ys, range(len(ys))):
                self._state[xind][yind] = x

    def _step(self, action):
        if self._episode_ended:
            return self.reset()
                               
        reward = self.game.arbirtrarily_decide_if_game_over()
        if reward != 0:
            self._episode_ended = True
            
            
        elif self.game.ml_plays_turn(action):
            self.game.script_plays_turn()
            self.copy_gameboard_to_state()
        else:
            reward = -0.05 #### column full,     call it draw 
            self._episode_ended = True
                               
        if self._episode_ended: #### if game was ended last round the reward then we go in here 1 last time                             
            self.game.new_game()
            self.copy_gameboard_to_state()
            return ts.termination(np.array([self._state], dtype=np.float32), reward)
        else:
            self.copy_gameboard_to_state()
            return ts.transition(np.array([self._state], dtype=np.float32), reward=0.0, discount=0.0)
            
game = simple_slots(5,5)
the_env = Con4Env(game)
eval_env = Con4Env(game)
the_env = tf_py_environment.TFPyEnvironment(the_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_env)

#    create time_step_spec
from tf_agents.utils import common
step_type_spec = tf.TensorSpec(shape=(), dtype=tf.dtypes.int32, name='step_type') # just declare a time step spec 
reward_spec= tf.TensorSpec(shape=(), dtype=tf.dtypes.float32, name='reward_spec')
discount_spec= tf.TensorSpec(shape=(), dtype=tf.dtypes.float32, name='discount_spec')
time_step_spec = tf_agents.trajectories.TimeStep( step_type_spec ,reward_spec, discount_spec, the_env.observation_spec() )

#####################################################################

q_net = tf_agents.networks.q_network.QNetwork(
    input_tensor_spec = the_env.observation_spec(),
    action_spec = the_env.action_spec(),
    preprocessing_layers=None,
    preprocessing_combiner=None,
    conv_layer_params=None,
    fc_layer_params=(75, 40),
    dropout_layer_params=None,
    activation_fn=tf.keras.activations.relu,
    kernel_initializer=None,
    batch_squash=True,
    dtype=tf.float32,
    q_layer_activation_fn=None,
    name='QNetwork'
)

train_step_counter = tf.Variable(0)
gamma = 0.99
min_q_value = -20 
max_q_value = 20  
n_step_update = 2 
agent = dqn_agent.DqnAgent( 
    time_step_spec , 
    the_env.action_spec() , 
    q_net, 
    optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=0.000001),
    n_step_update=n_step_update,
    td_errors_loss_fn=common.element_wise_squared_loss,
    gamma=gamma,
    train_step_counter=train_step_counter
)

random_policy = random_tf_policy.RandomTFPolicy(time_step_spec, the_env.action_spec())


# data collector 

data_spec=agent.collect_data_spec
print(data_spec)
from tf_agents.utils import common
import copy
replay_buffer_capacity = 999
initial_collect_steps = 50
batch_size = 3
n_step_update = 1
num_parallel_calls = 2
replay_buffer = tf_agents.replay_buffers.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=the_env.batch_size,
    max_length=replay_buffer_capacity
)

def collect_step(environment, policy, num):
    if environment.current_time_step().is_last():
        time_step = environment.reset()
    else:
        time_step = environment.current_time_step()
    action_step = policy.action(time_step)
    next_time_step = environment.step(action_step.action)
    traj = tf_agents.trajectories.from_transition(time_step, action_step, next_time_step)
    print("just addding this as traj num = "+str(num))
    print(" next time step  = "+str(next_time_step))
    replay_buffer.add_batch(traj)

nom = 0 
for _ in range(initial_collect_steps):
    collect_step(the_env, random_policy , nom)
    nom+=1

#    after each step check to see if data is in order 

dataset = replay_buffer.as_dataset(
num_parallel_calls=num_parallel_calls, 
sample_batch_size=batch_size,
num_steps=n_step_update).prefetch(9)

iterator = iter(dataset)
experience, unused_info = next(iterator)
print(experience)#### why is this thing out of order
for i in range(3):
    print("### experience 1 above")
experience, unused_info = next(iterator)
print(experience)#### why is this thing out of order
for i in range(3):
    print("### experience 2 above")
tgm_learn
  • 61
  • 7

1 Answers1

0

Late response for those still looking.

    replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec,
    batch_size=batch_size,
    max_length=max_length)

    dataset = replay_buffer.as_dataset(sample_batch_size=32, num_steps=2, num_parallel_calls=1, single_deterministic_pass=True)

    iterator = iter(dataset)
    sample = next(iterator)

Notes about the code:

  1. num_steps in the .as_dataset() call tells how man sequential rows to pull. This is helpful in models that rely on both the current step and next step. (Not applicable in your case since n_step_update=1,), but mentioned for generality. The behavior isn't necessarily intuitive.

Consider a batch size 32.

replay_buffer.add_batch(my_batch_32) replay_buffer.add_batch(my_new_batch_32)

num_steps would pull the first element from the first batch and the second elmement from the second batch.

Here are my notes detailing this process and how to use it.

def get_buffer():
    data_spec =  (
        tf.TensorSpec([3], tf.float32, 'action')
    )

    batch_size = 32
    max_length = 1000

    replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
        data_spec,
        batch_size=batch_size,
        max_length=max_length)


    for i in tf.range(1,2, dtype=np.float32):
        action = tf.cast(i,dtype=np.float32) * np.ones(
            data_spec.shape.as_list(), dtype=np.float32)

        values = (action)
        values_batched = tf.nest.map_structure(lambda t: tf.stack([t] * batch_size),
                                           values)
        print(values_batched)
        replay_buffer.add_batch(values_batched)

    return replay_buffer

replay_buffer = get_buffer()
dataset = replay_buffer.as_dataset(sample_batch_size=32, num_steps=2, num_parallel_calls=1, single_deterministic_pass=True)

sampler = iter(dataset)

# note because dataset num_steps=2, this will generate a sample of size (sample_batch_size, num_steps, size_action_space)
# this means if batch 1 is all tf.ones() and batch 2 is 2* tf.ones(), the first element with be (1,2), NOT (1,1). ie you will get 32 (1,2) records
sample, _ = next(sampler)

    """--------------------------------------------------------------------------------------------------------
Observations of Replay Buffer use:
- Replay buffer batch size tells how many items should be added at a single time to the buffer
    -If you don't add that many during an add_batch() call, it fails
- Dataset single_Deterministic_pass can be used to get a random sample if False, or a deterministic sample if True
- as_dataset()
    - Batch size tells how many to records to pull. It does not have to be the same as replay_buffer batch size
    - num_steps tells how many consecutive rows to combine into a single "row". Some loss functions (SAC) require two
      which pertain to the current and next time steps
- next(iter(dataset)) gets the first value from a batch, then the next from the following batch when considering num_steps.
    -This implies that a batch should be thought of as the SAME time step?
    - when sampling with deterministic=False, it can sample the same element multiple times.
- next() throws an OutOfRangeError()
--------------------------------------------------------------------------------------------------------"""
brian_ds
  • 317
  • 4
  • 12