
I'm trying to implement DDPG for 2D boid flocking. Error: the Pygame window becomes unresponsive ("Window Unresponsive").

import pygame
import random
import time
import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Initialize Pygame
pygame.init()

# Set up the display
width, height = 800, 600
screen = pygame.display.set_mode((width, height))
clock = pygame.time.Clock()

# Define UI text style
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
font = pygame.font.SysFont(None, 30)

# Define Boid class
class Boid:
    def __init__(self):
        self.position = pygame.Vector2(random.randint(0, width), random.randint(0, height))
        self.velocity = pygame.Vector2(random.uniform(-1, 1), random.uniform(-1, 1))
        self.max_velocity = 2

    def update(self, boids):
        neighbors = self.get_closest_neighbors(boids, 6)
        self.flock(neighbors)
        self.position += self.velocity
        self.check_boundaries()

    def flock(self, neighbors):
        alignment = self.align(neighbors)
        cohesion = self.cohere(neighbors)
        separation = self.separate(neighbors)

        self.velocity += alignment + cohesion + separation
        self.velocity.scale_to_length(self.max_velocity)

    def align(self, neighbors):
        if len(neighbors) > 0:
            average_velocity = pygame.Vector2(0, 0)
            for neighbor in neighbors:
                average_velocity += neighbor.velocity
            average_velocity /= len(neighbors)
            average_velocity.scale_to_length(self.max_velocity)
            alignment = average_velocity - self.velocity
            return alignment
        else:
            return pygame.Vector2(0, 0)

    def cohere(self, neighbors):
        if len(neighbors) > 0:
            center_of_mass = pygame.Vector2(0, 0)
            for neighbor in neighbors:
                center_of_mass += neighbor.position
            center_of_mass /= len(neighbors)
            cohesion = center_of_mass - self.position
            cohesion.scale_to_length(0.02)
            return cohesion
        else:
            return pygame.Vector2(0, 0)

    def separate(self, neighbors):
        separation_radius = 30
        separation_vector = pygame.Vector2(0, 0)
        for neighbor in neighbors:
            distance = self.position.distance_to(neighbor.position)
            if distance < separation_radius:
                separation_vector += self.position - neighbor.position
        if separation_vector.length() > 0:
            separation_vector.scale_to_length(0.1)
        return separation_vector

    def get_closest_neighbors(self, boids, num_neighbors):
        distances = [(boid, self.position.distance_to(boid.position)) for boid in boids if boid != self]
        distances.sort(key=lambda x: x[1])  # Sort by distance
        neighbors = [distance[0] for distance in distances[:num_neighbors]]
        return neighbors

    def check_boundaries(self):
        if self.position.x < 0 or self.position.x > width:
            self.velocity.x *= -1
        if self.position.y < 0 or self.position.y > height:
            self.velocity.y *= -1

    def draw(self):
        pygame.draw.circle(screen, WHITE, (int(self.position.x), int(self.position.y)), 3)


class BoidsEnv(gym.Env):
    def __init__(self):
        super(BoidsEnv, self).__init__()
        self.boids = []
        self.num_boids = 50
        self.collision_threshold = 5
        self.reward_range = (-float('inf'), float('inf'))
        self.action_space = spaces.Discrete(8)
        self.observation_space = spaces.Box(low=0, high=1, shape=(self.num_boids, 4), dtype=np.float32)
        self.reset()

    def step(self, action):
        self.update_boids(action)
        observation = self.get_observation()
        reward = self.calculate_reward()
        done = False  # Add termination condition if needed
        info = {}
        return observation, reward, done, info

    def reset(self):
        self.boids = [Boid() for _ in range(self.num_boids)]
        return self.get_observation()

    def render(self, mode='human'):
        screen.fill(BLACK)
        for boid in self.boids:
            boid.draw()

        count_label = font.render("Number of Boids: {}".format(len(self.boids)), True, WHITE)
        screen.blit(count_label, (10, 10))

        reward = self.calculate_reward()
        reward_label = font.render("Reward: {}".format(reward), True, WHITE)
        screen.blit(reward_label, (10, 40))

        pygame.display.flip()
        clock.tick(60)

    def close(self):
        pygame.quit()

    def update_boids(self, action):
        for boid in self.boids:
            boid.update(self.boids)
            self.handle_collision(boid)

    def handle_collision(self, boid):
        for other in self.boids:
            if boid != other and boid.position.distance_to(other.position) < self.collision_threshold:
                # Add collision handling logic here
                pass

    def get_observation(self):
        observation = np.zeros((self.num_boids, 4), dtype=np.float32)
        for i, boid in enumerate(self.boids):
            observation[i] = [boid.position.x / width, boid.position.y / height,
                              boid.velocity.x / boid.max_velocity, boid.velocity.y / boid.max_velocity]
        return observation

    def calculate_reward(self):
        num_boids = len(self.boids)
        if num_boids <= 1:
            return 0

        total_reward = 0

        for boid in self.boids:
            cohesion_reward = self.calculate_cohesion_reward(boid)
            separation_reward = self.calculate_separation_reward(boid)

            total_reward += cohesion_reward + separation_reward

        return total_reward

    def calculate_cohesion_reward(self, boid):
        if len(self.boids) > 1:
            closest_neighbors = boid.get_closest_neighbors(self.boids, 6)
            center_of_mass = pygame.Vector2(0, 0)
            for neighbor in closest_neighbors:
                center_of_mass += neighbor.position
            center_of_mass /= len(closest_neighbors)
            distance_to_center = boid.position.distance_to(center_of_mass)
            if distance_to_center <= 50:
                return 1
        return -1

    def calculate_separation_reward(self, boid):
        separation_distance = 20
        closest_neighbors = boid.get_closest_neighbors(self.boids, 6)
        for neighbor in closest_neighbors:
            distance = boid.position.distance_to(neighbor.position)
            if distance < separation_distance:
                return -1
        return 1


class Actor(nn.Module):
    def __init__(self, state_size, action_size, hidden_size):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        return x


# Define the critic network
class Critic(nn.Module):
    def __init__(self, state_size, action_size, hidden_size):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_size + action_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 1)

    def forward(self, x, a):
        x = self.fc1(torch.cat([x, a], dim=1))
        x = F.relu(x)
        x = self.fc2(x)
        return x

# Create the environment
env = BoidsEnv()

# Set hyperparameters
state_size = env.observation_space.shape[1]
action_size = env.action_space.n
hidden_size = 128
lr_actor = 0.001
lr_critic = 0.001
gamma = 0.99
tau = 0.001
buffer_size = 10000
batch_size = 128

# Create the actor and critic networks
actor = Actor(state_size, action_size, hidden_size)
critic = Critic(state_size, action_size, hidden_size)

# Create target networks
target_actor = Actor(state_size, action_size, hidden_size)
target_critic = Critic(state_size, action_size, hidden_size)

# Copy the initial network parameters to the target networks
target_actor.load_state_dict(actor.state_dict())
target_critic.load_state_dict(critic.state_dict())

# Create the replay buffer
replay_buffer = []

# Create the optimizer
actor_optimizer = optim.Adam(actor.parameters(), lr=lr_actor)
critic_optimizer = optim.Adam(critic.parameters(), lr=lr_critic)

# Define the DDPG update function
def ddpg_update():
    if len(replay_buffer) < batch_size:
        return

    # Sample a batch from the replay buffer
    batch = random.sample(replay_buffer, batch_size)
    state_batch = torch.tensor([transition[0] for transition in batch], dtype=torch.float32)
    action_batch = torch.tensor([transition[1] for transition in batch], dtype=torch.float32)
    reward_batch = torch.tensor([transition[2] for transition in batch], dtype=torch.float32)
    next_state_batch = torch.tensor([transition[3] for transition in batch], dtype=torch.float32)
    done_batch = torch.tensor([transition[4] for transition in batch], dtype=torch.float32)

    # Compute the target Q value
    with torch.no_grad():
        next_actions = target_actor(next_state_batch)
        next_state_values = target_critic(next_state_batch, next_actions)
    target_q_values = reward_batch + gamma * (1 - done_batch) * next_state_values

    # Update the critic
    critic_optimizer.zero_grad()
    state_values = critic(state_batch, action_batch)
    critic_loss = F.mse_loss(state_values, target_q_values.unsqueeze(1))
    critic_loss.backward()
    critic_optimizer.step()

    # Update the actor
    actor_optimizer.zero_grad()
    actions = actor(state_batch)
    actor_loss = -critic(state_batch, actions).mean()
    actor_loss.backward()
    actor_optimizer.step()

    # Update the target networks
    for param, target_param in zip(actor.parameters(), target_actor.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
    for param, target_param in zip(critic.parameters(), target_critic.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

# Training loop
total_episodes = 1000
for episode in range(total_episodes):
    state = env.reset()
    episode_reward = 0

    while True:
        action = actor(torch.tensor(state, dtype=torch.float32))
        action = action.detach().numpy()
        next_state, reward, done, _ = env.step(action)

I'm using Pygame for visualization. The window becomes unresponsive and CPU utilization climbs, but no error is raised at all. I had a similar issue with DQN and assumed the model was too heavy, but DDPG shows the same behavior.

No other symptoms. PC: 6 cores / 12 threads, 16 GB RAM, GTX 1650 (laptop).

  • You need to be handling events, otherwise your desktop windows manager will think your application has crashed and set its window title appropriately. If you add [`pygame.event.pump()`](https://www.pygame.org/docs/ref/event.html#pygame.event.pump) to your `while True` loop that might be enough to get functional. See [this question](https://stackoverflow.com/q/20165492/2280890) – import random Jul 06 '23 at 04:41
  • So @importrandom, should I edit my code like this? `while True: pygame.event.pump() action = actor(torch.tensor(state, dtype=torch.float32)) action = action.detach().numpy() next_state, reward, done, _ = env.step(action) # Add the transition to the replay buffer` –  Jul 06 '23 at 09:07
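
For reference, a minimal sketch of what that edit could look like is below. Only the event-handling block is the suggested fix; the trailing comment marks the part of the loop that is truncated in the post (replay-buffer append, ddpg_update() call, state bookkeeping) and is not filled in here.

# Training loop with event handling (sketch; mirrors the loop from the question)
total_episodes = 1000
for episode in range(total_episodes):
    state = env.reset()
    episode_reward = 0

    while True:
        # Drain the event queue every iteration so the window manager keeps
        # treating the window as responsive, and let the user close it.
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                env.close()
                raise SystemExit
        # pygame.event.pump() alone also works if the individual events
        # are not needed.

        action = actor(torch.tensor(state, dtype=torch.float32))
        action = action.detach().numpy()
        next_state, reward, done, _ = env.step(action)

        # ... store the transition in the replay buffer, call ddpg_update(),
        # set state = next_state, and break on done, as in the rest of the
        # original loop (truncated above).

        env.render()  # draw a frame; render() already caps the rate at 60 FPS

Without event processing, the OS flags the window as "not responding" even though the Python process keeps running, which matches the rising CPU usage and the absence of a traceback.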

0 Answers