Trying to implement DDPG for 2D boid flocking. Problem: the Pygame window becomes unresponsive.
import pygame
import random
import time
import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# Initialize Pygame
pygame.init()
# Set up the display
width, height = 800, 600
screen = pygame.display.set_mode((width, height))
clock = pygame.time.Clock()
# Define UI text style
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
font = pygame.font.SysFont(None, 30)
# Define Boid class
class Boid:
    def __init__(self):
        self.position = pygame.Vector2(random.randint(0, width), random.randint(0, height))
        self.velocity = pygame.Vector2(random.uniform(-1, 1), random.uniform(-1, 1))
        self.max_velocity = 2

    def update(self, boids):
        neighbors = self.get_closest_neighbors(boids, 6)
        self.flock(neighbors)
        self.position += self.velocity
        self.check_boundaries()

    def flock(self, neighbors):
        alignment = self.align(neighbors)
        cohesion = self.cohere(neighbors)
        separation = self.separate(neighbors)
        self.velocity += alignment + cohesion + separation
        if self.velocity.length() > 0:  # scale_to_length raises ValueError on a zero-length vector
            self.velocity.scale_to_length(self.max_velocity)

    def align(self, neighbors):
        if len(neighbors) > 0:
            average_velocity = pygame.Vector2(0, 0)
            for neighbor in neighbors:
                average_velocity += neighbor.velocity
            average_velocity /= len(neighbors)
            if average_velocity.length() > 0:
                average_velocity.scale_to_length(self.max_velocity)
            alignment = average_velocity - self.velocity
            return alignment
        else:
            return pygame.Vector2(0, 0)

    def cohere(self, neighbors):
        if len(neighbors) > 0:
            center_of_mass = pygame.Vector2(0, 0)
            for neighbor in neighbors:
                center_of_mass += neighbor.position
            center_of_mass /= len(neighbors)
            cohesion = center_of_mass - self.position
            if cohesion.length() > 0:
                cohesion.scale_to_length(0.02)
            return cohesion
        else:
            return pygame.Vector2(0, 0)

    def separate(self, neighbors):
        separation_radius = 30
        separation_vector = pygame.Vector2(0, 0)
        for neighbor in neighbors:
            distance = self.position.distance_to(neighbor.position)
            if distance < separation_radius:
                separation_vector += self.position - neighbor.position
        if separation_vector.length() > 0:
            separation_vector.scale_to_length(0.1)
        return separation_vector

    def get_closest_neighbors(self, boids, num_neighbors):
        distances = [(boid, self.position.distance_to(boid.position)) for boid in boids if boid != self]
        distances.sort(key=lambda x: x[1])  # Sort by distance
        neighbors = [distance[0] for distance in distances[:num_neighbors]]
        return neighbors

    def check_boundaries(self):
        # Bounce off the window edges by reversing the offending velocity component
        if self.position.x < 0 or self.position.x > width:
            self.velocity.x *= -1
        if self.position.y < 0 or self.position.y > height:
            self.velocity.y *= -1

    def draw(self):
        pygame.draw.circle(screen, WHITE, (int(self.position.x), int(self.position.y)), 3)
class BoidsEnv(gym.Env):
    def __init__(self):
        super(BoidsEnv, self).__init__()
        self.boids = []
        self.num_boids = 50
        self.collision_threshold = 5
        self.reward_range = (-float('inf'), float('inf'))
        self.action_space = spaces.Discrete(8)
        self.observation_space = spaces.Box(low=0, high=1, shape=(self.num_boids, 4), dtype=np.float32)
        self.reset()

    def step(self, action):
        self.update_boids(action)
        observation = self.get_observation()
        reward = self.calculate_reward()
        done = False  # Add termination condition if needed
        info = {}
        return observation, reward, done, info

    def reset(self):
        self.boids = [Boid() for _ in range(self.num_boids)]
        return self.get_observation()

    def render(self, mode='human'):
        screen.fill(BLACK)
        for boid in self.boids:
            boid.draw()
        count_label = font.render("Number of Boids: {}".format(len(self.boids)), True, WHITE)
        screen.blit(count_label, (10, 10))
        reward = self.calculate_reward()
        reward_label = font.render("Reward: {}".format(reward), True, WHITE)
        screen.blit(reward_label, (10, 40))
        pygame.display.flip()
        clock.tick(60)

    def close(self):
        pygame.quit()

    def update_boids(self, action):
        # NOTE: the action argument is currently ignored; the boids only follow the flocking rules
        for boid in self.boids:
            boid.update(self.boids)
            self.handle_collision(boid)

    def handle_collision(self, boid):
        for other in self.boids:
            if boid != other and boid.position.distance_to(other.position) < self.collision_threshold:
                # Add collision handling logic here
                pass

    def get_observation(self):
        observation = np.zeros((self.num_boids, 4), dtype=np.float32)
        for i, boid in enumerate(self.boids):
            observation[i] = [boid.position.x / width, boid.position.y / height,
                              boid.velocity.x / boid.max_velocity, boid.velocity.y / boid.max_velocity]
        return observation

    def calculate_reward(self):
        num_boids = len(self.boids)
        if num_boids <= 1:
            return 0
        total_reward = 0
        for boid in self.boids:
            cohesion_reward = self.calculate_cohesion_reward(boid)
            separation_reward = self.calculate_separation_reward(boid)
            total_reward += cohesion_reward + separation_reward
        return total_reward

    def calculate_cohesion_reward(self, boid):
        if len(self.boids) > 1:
            closest_neighbors = boid.get_closest_neighbors(self.boids, 6)
            center_of_mass = pygame.Vector2(0, 0)
            for neighbor in closest_neighbors:
                center_of_mass += neighbor.position
            center_of_mass /= len(closest_neighbors)
            distance_to_center = boid.position.distance_to(center_of_mass)
            if distance_to_center <= 50:
                return 1
        return -1

    def calculate_separation_reward(self, boid):
        separation_distance = 20
        closest_neighbors = boid.get_closest_neighbors(self.boids, 6)
        for neighbor in closest_neighbors:
            distance = boid.position.distance_to(neighbor.position)
            if distance < separation_distance:
                return -1
        return 1
class Actor(nn.Module):
    def __init__(self, state_size, action_size, hidden_size):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        return x

# Define the critic network
class Critic(nn.Module):
    def __init__(self, state_size, action_size, hidden_size):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_size + action_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 1)

    def forward(self, x, a):
        x = self.fc1(torch.cat([x, a], dim=1))
        x = F.relu(x)
        x = self.fc2(x)
        return x
# Create the environment
env = BoidsEnv()
# Set hyperparameters
state_size = env.observation_space.shape[1]
action_size = env.action_space.n
hidden_size = 128
lr_actor = 0.001
lr_critic = 0.001
gamma = 0.99
tau = 0.001
buffer_size = 10000
batch_size = 128
# Create the actor and critic networks
actor = Actor(state_size, action_size, hidden_size)
critic = Critic(state_size, action_size, hidden_size)
# Create target networks
target_actor = Actor(state_size, action_size, hidden_size)
target_critic = Critic(state_size, action_size, hidden_size)
# Copy the initial network parameters to the target networks
target_actor.load_state_dict(actor.state_dict())
target_critic.load_state_dict(critic.state_dict())
# Create the replay buffer
replay_buffer = []
# Create the optimizer
actor_optimizer = optim.Adam(actor.parameters(), lr=lr_actor)
critic_optimizer = optim.Adam(critic.parameters(), lr=lr_critic)
# Define the DDPG update function
def ddpg_update():
    if len(replay_buffer) < batch_size:
        return
    # Sample a batch from the replay buffer
    batch = random.sample(replay_buffer, batch_size)
    # Stack the numpy arrays first; building a tensor from a list of arrays is slow
    state_batch = torch.tensor(np.stack([transition[0] for transition in batch]), dtype=torch.float32)
    action_batch = torch.tensor(np.stack([transition[1] for transition in batch]), dtype=torch.float32)
    reward_batch = torch.tensor([transition[2] for transition in batch], dtype=torch.float32).unsqueeze(1)
    next_state_batch = torch.tensor(np.stack([transition[3] for transition in batch]), dtype=torch.float32)
    done_batch = torch.tensor([transition[4] for transition in batch], dtype=torch.float32).unsqueeze(1)
    # Compute the target Q value
    with torch.no_grad():
        next_actions = target_actor(next_state_batch)
        next_state_values = target_critic(next_state_batch, next_actions)
        target_q_values = reward_batch + gamma * (1 - done_batch) * next_state_values
    # Update the critic
    critic_optimizer.zero_grad()
    state_values = critic(state_batch, action_batch)
    critic_loss = F.mse_loss(state_values, target_q_values)
    critic_loss.backward()
    critic_optimizer.step()
    # Update the actor
    actor_optimizer.zero_grad()
    actions = actor(state_batch)
    actor_loss = -critic(state_batch, actions).mean()
    actor_loss.backward()
    actor_optimizer.step()
    # Soft-update the target networks
    for param, target_param in zip(actor.parameters(), target_actor.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
    for param, target_param in zip(critic.parameters(), target_critic.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
# Training loop
total_episodes = 1000
for episode in range(total_episodes):
    state = env.reset()
    episode_reward = 0
    while True:
        action = actor(torch.tensor(state, dtype=torch.float32))
        action = action.detach().numpy()
        next_state, reward, done, _ = env.step(action)
        # Store the transition and train
        replay_buffer.append((state, action, reward, next_state, float(done)))
        if len(replay_buffer) > buffer_size:
            replay_buffer.pop(0)  # keep the buffer bounded
        ddpg_update()
        env.render()
        state = next_state
        episode_reward += reward
        if done:
            break
Pygame is only used for visualization. Shortly after the script starts, the Pygame window freezes (the OS reports it as not responding), CPU utilization climbs, but no error or traceback is ever printed. I had the same problem with a DQN version and assumed the training was simply too heavy, but DDPG shows the exact same behaviour.
No other symptoms. Machine: 6-core / 12-thread CPU, 16 GB RAM, GTX 1650 mobile.
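One thing I noticed while writing this up: nothing in the script ever calls pygame.event.get() or pygame.event.pump(). As far as I know, the OS marks a window as "not responding" when its event queue isn't serviced for a few seconds, which would match the symptoms: the window freezes while the Python process keeps training underneath it (hence the climbing CPU and no traceback). Something like the snippet below at the top of the inner while loop (or at the start of BoidsEnv.render) is what I plan to try; this is just a sketch, assuming pygame.QUIT is the only event that needs handling:

# Run once per step/frame to keep the window responsive
for event in pygame.event.get():
    if event.type == pygame.QUIT:  # user clicked the close button
        env.close()
        raise SystemExit
# If the events themselves aren't needed, pygame.event.pump() alone should also keep the window alive

Putting it inside render() would keep the environment self-contained, while putting it in the training loop makes it explicit that events are pumped even on steps where rendering is skipped. Does that sound like the right diagnosis, or is there something else in the DDPG/step loop that would starve the window like this?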