1

I have a simple 2D ray-casting routine that gets terribly slow as soon as the number of obstacles increases.

This routine is made up of:

  • 2 for loops (outer loop iterates over each ray/direction, then inner loop iterates over each line obstacle)

  • multiple if statements (check if a value is > or < than another value or if an array is empty)

Question: How can I condense all these operations into 1 single block of vectorized instructions using Numpy ?

More specifically, I am facing 2 issues:

  • I have managed to vectorize the inner loop (intersection between a ray and each obstacle) but I am unable to run this operation for all rays at once.

  • The only workaround I found to deal with the if statements is to use masked arrays. Something tells me it is not the proper way to handle these statements in this case (it seems clumsy, cumbersome and unpythonic)

Original code:

from math import radians, cos, sin
import matplotlib.pyplot as plt
import numpy as np

N = 10  # dimensions of canvas (NxN)
sides = np.array([[0, N, 0, 0], [0, N, N, N], [0, 0, 0, N], [N, N, 0, N]])
edges = np.random.rand(5, 4) * N # coordinates of 5 random segments (x1, x2, y1, y2)
edges = np.concatenate((edges, sides))
center = np.array([N/2, N/2]) # coordinates of center point
directions = np.array([(cos(radians(a)), sin(radians(a))) for a in range(0, 360, 10)]) # vectors pointing in all directions
intersections = []

# for each direction
for d in directions:
    
    min_dist = float('inf')
    
    # for each edge
    for e in edges:
        p1x, p1y = e[0], e[2]
        p2x, p2y = e[1], e[3]
        p3x, p3y = center
        p4x, p4y = center + d
        
        # find intersection point
        den = (p1x - p2x) * (p3y - p4y) - (p1y - p2y) * (p3x - p4x)
        
        if den:
            t = ((p1x - p3x) * (p3y - p4y) - (p1y - p3y) * (p3x - p4x)) / den
            u = -((p1x - p2x) * (p1y - p3y) - (p1y - p2y) * (p1x - p3x)) / den
            
            # if any:
            if t > 0 and t < 1 and u > 0:
                sx = p1x + t * (p2x - p1x)
                sy = p1y + t * (p2y - p1y)
                
                isec = np.array([sx, sy])           
                dist = np.linalg.norm(isec-center) 
                
                # make sure to select the nearest one (from center)
                if dist < min_dist:
                    min_dist = dist
                    nearest = isec
    
    # store nearest interesection point for each ray
    intersections.append(nearest)

    
# Render
plt.axis('off')

for x, y in zip(edges[:,:2], edges[:,2:]):
    plt.plot(x, y)
    
for isec in np.array(intersections):
    plt.plot((center[0], isec[0]), (center[1], isec[1]), '--', color="#aaaaaa", linewidth=.8)

enter image description here

Vectorized version (attempt):

from math import radians, cos, sin
import matplotlib.pyplot as plt
from scipy import spatial
import numpy as np

N = 10  # dimensions of canvas (NxN)
sides = np.array([[0, N, 0, 0], [0, N, N, N], [0, 0, 0, N], [N, N, 0, N]])
edges = np.random.rand(5, 4) * N # coordinates of 5 random segments (x1, x2, y1, y2)
edges = np.concatenate((edges, sides))
center = np.array([N/2, N/2]) # coordinates of center point
directions = np.array([(cos(radians(a)), sin(radians(a))) for a in range(0, 360, 10)]) # vectors pointing in all directions
intersections = []

# Render edges
plt.axis('off')
for x, y in zip(edges[:,:2], edges[:,2:]):
    plt.plot(x, y)
    
    
# for each direction
for d in directions:

    p1x, p1y = edges[:,0], edges[:,2]
    p2x, p2y = edges[:,1], edges[:,3]
    p3x, p3y = center
    p4x, p4y = center + d

    # denominator
    den = (p1x - p2x) * (p3y - p4y) - (p1y - p2y) * (p3x - p4x)

    # first 'if' statement -> if den > 0
    mask = den > 0
    den = den[mask]
    p1x = p1x[mask]
    p1y = p1y[mask]
    p2x = p2x[mask]
    p2y = p2y[mask]

    t = ((p1x - p3x) * (p3y - p4y) - (p1y - p3y) * (p3x - p4x)) / den
    u = -((p1x - p2x) * (p1y - p3y) - (p1y - p2y) * (p1x - p3x)) / den

    # second 'if' statement -> if (t>0) & (t<1) & (u>0)
    mask2 = (t > 0) & (t < 1) & (u > 0)
    t = t[mask2]
    p1x = p1x[mask2]
    p1y = p1y[mask2]
    p2x = p2x[mask2]
    p2y = p2y[mask2]
    
    # x, y coordinates of all intersection points in the current direction
    sx = p1x + t * (p2x - p1x)
    sy = p1y + t * (p2y - p1y)
    pts = np.c_[sx, sy]
    
    # if any:
    if pts.size > 0:
        
        # find nearest intersection point
        tree = spatial.KDTree(pts)
        nearest = pts[tree.query(center)[1]]

        # Render
        plt.plot((center[0], nearest[0]), (center[1], nearest[1]), '--', color="#aaaaaa", linewidth=.8)
solub
  • 1,291
  • 17
  • 40
  • "Something tells me it is not the proper way ... cumbersome and unpythonic)". What's this something? `if` expressions on single elements are often replaced by 'mask' indexing on portions of the array. – hpaulj Aug 04 '21 at 23:03
  • The fact that I have to apply a mask to each related array (1 per line) every time a conditional statement occurs. Would there be a Numpy way to apply a single mask to multiple arrays at once ? – solub Aug 05 '21 at 08:58

1 Answers1

1

Reformulation of the problem – Finding the intersection between a line segment and a line ray

Let q and q2 be the endpoints of a segment (obstacle). For convenience let's define a class to represent points and vectors in the plane. In addition to the usual operations, a vector multiplication is defined by u × v = u.x * v.y - u.y * v.x.

Caution: here Coord(2, 1) * 3 returns Coord(6, 3) while Coord(2, 1) * Coord(-1, 4) outputs 9. To avoid this confusion it might have been possible to restrict * to the scalar multiplication and use ^ via __xor__ for the vector multiplication.

class Coord:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    @property
    def radius(self):
        return np.sqrt(self.x ** 2 + self.y ** 2)

    def _cross_product(self, other):
        assert isinstance(other, Coord)
        return self.x * other.y - self.y * other.x

    def __mul__(self, other):
        if isinstance(other, Coord):
            # 2D "cross"-product
            return self._cross_product(other)
        elif isinstance(other, int) or isinstance(other, float):
            # scalar multiplication
            return Coord(self.x * other, self.y * other)
    
    def __rmul__(self, other):
        return self * other

    def __sub__(self, other):
        return Coord(self.x - other.x, self.y - other.y)

    def __add__(self, other):
        return Coord(self.x + other.x, self.y + other.y)

    def __repr__(self):
        return f"Coord({self.x}, {self.y})"

Now, I find it easier to handle a ray in polar coordinates: For a given angle theta (direction) the goal is to determine if it intersects the segment, and if so determine the corresponding radius. Here is a function to find that. See here for an explanation of why and how. I tried to use the same variable names as in the previous link.

def find_intersect_btw_ray_and_sgmt(q, q2, theta):
    """
    Args:
        q (Coord): first endpoint of the segment
        q2 (Coord): second endpoint of the segment
        theta (float): angle of the ray

    Returns:
        (float): np.inf if the ray does not intersect the segment,
                 the distance from the origin of the intersection otherwise
   """
    assert isinstance(q, Coord) and isinstance(q2, Coord)
    s = q2 - q
    r = Coord(np.cos(theta), np.sin(theta))
    cross = r * s  # 2d cross-product
    t_num = q * s
    u_num = q * r
    ## the intersection point is roughly at a distance t_num / cross
    ## from the origin. But some cases must be checked beforehand.

    ## (1) the segment [PQ2] is aligned with the ray
    if np.isclose(cross, 0) and np.isclose(u_num, 0):
        return min(q.radius, q2.radius)
    ## (2) the segment [PQ2] is parallel with the ray
    elif np.isclose(cross, 0):
        return np.inf
   
    t, u = t_num / cross, u_num / cross
    ## There is actually an intersection point
    if t >= 0 and 0 <= u <= 1:
        return t
   
    ## (3) No intersection point
    return np.inf

For instance find_intersect_btw_ray_and_sgmt(Coord(1, 2), Coord(-1, 2), np.pi / 2) should returns 2.

Note that here for simplicity, I only considered the case where the origin of the rays is at Coord(0, 0). This can be easily extended to the general case by setting t_num = (q - origin) * s and u_num = (q - origin) * r.

Let's vectorize it!

What is very interesting here is that the operations defined in the Coord class also apply to cases where x and y are numpy arrays! Hence applying any defined operation on Coord(np.array([1, 2, 0]), np.array([2, -1, 3])) amounts applying it elementwise to the points (1, 2), (2, -1) and (0, 3). The operations of Coord are therefore already vectorized. The constructor can be modified into:

    def __init__(self, x, y):
        x, y = np.array(x), np.array(y)
        assert x.shape == y.shape
        self.x, self.y = x, y
        self.shape = x.shape

Now, we would like the function find_intersect_btw_ray_and_sgmt to be able to handle the case where the parameters q and q2contains sequences of endpoints. Before the sanity checks, all the operations are working properly since, as we have mentioned, they are already vectorized. As you mentionned the conditional statements can be "vectorized" using masks. Here is what I propose:

def find_intersect_btw_ray_and_sgmts(q, q2, theta):
    assert isinstance(q, Coord) and isinstance(q2, Coord)
    assert q.shape == q2.shape
    EPS = 1e-14
    s = q2 - q
    r = Coord(np.cos(theta), np.sin(theta))
    cross = r * s
   
    cross_sign = np.sign(cross)
    cross = cross * cross_sign
    t_num = (q * s) * cross_sign
    u_num = (q * r) * cross_sign

    radii = np.zeros_like(t_num)
    mask = ~np.isclose(cross, 0) & (t_num >= -EPS) & (-EPS <= u_num) & (u_num <= cross + EPS)

    radii[~mask] = np.inf  # no intersection
    radii[mask] = t_num[mask] / cross[mask]  # intersection
    return radii

Note that cross, t_num and u_num are multiplied by the sign of cross to ensure that the division by cross keeps the sign of the dividends. Hence conditions of the form ((t_num >= 0) & (cross >= 0)) | ((t_num <= 0) & (cross <= 0)) can be replaced by (t_num >= 0).

For simplicity, we omitted the case (1) where the radius and the segment were aligned ((cross == 0) & (u_num == 0)). This could be incorporated by carefully adding a second mask.

For a given value of theta, we are able to determine if the corresponing ray intersects with several segments at once.

## Some useful functions
def polar_to_cartesian(r, theta):
    return Coord(r * np.cos(theta), r * np.sin(theta))

def plot_segments(p, q, *args, **kwargs):
    plt.plot([p.x, q.x], [p.y, q.y], *args, **kwargs)

def plot_rays(radii, thetas, *args, **kwargs):
    endpoints = polar_to_cartesian(radii, thetas)
    n = endpoints.shape
    origin = Coord(np.zeros(n), np.zeros(n))
    plot_segments(origin, endpoints, *args, **kwargs)

## Data generation
M = 5  # size of the canvas
N = 10  # number of segments
K = 16  # number of rays

q = Coord(*np.random.uniform(-M/2, M/2, size=(2, N)))
p = q + Coord(*np.random.uniform(-M/2, M/2, size=(2, N)))
thetas = np.linspace(0, 2 * np.pi, K, endpoint=False)

## For each ray, find the minimal distance of intersection
## with all segments
plt.figure(figsize=(5, 5))
plot_segments(p, q, "royalblue", marker=".")

for theta in thetas:
    radii = find_intersect_btw_ray_and_sgmts(p, q, theta)
    radius = np.min(radii)
    if not np.isinf(radius):
        plot_rays(radius, theta, color="orange")
    else:
        plot_rays(2*M, theta, ':', c='orange')

plt.plot(0, 0, 'kx')
plt.xlim(-M, M)
plt.ylim(-M, M)

And that's not all! Thanks to the broadcasting of python, it is possible to avoid iteration on theta values. For example, recall that np.array([1, 2, 3]) * np.array([[1], [2], [3], [4]]) produces a matrix of size 4 × 3 of the pairwise products. In the same way Coord([[5],[7]], [[5],[1]]) * Coord([2, 4, 6], [-2, 4, 0]) outputs a 2 × 3 matrix containing all the pairwise cross product between vectors (5, 5), (7, 1) and (2, -2), (4, 4), (6, 0).

Finally, the intersections can be determined in the following way:

radii_all = find_intersect_btw_ray_and_sgmts(p, q, np.vstack(thetas))
# p and q have a shape of (N,) and np.vstack(thetas) of (K, 1)
# this radii_all have a shape of (K, N)
# radii_all[k, n] contains the distance from the origin of the intersection
# between k-th ray and n-th segment (or np.inf if there is no intersection point)
radii = np.min(radii_all, axis=1)
# radii[k] contains the distance from the origin of the closest intersection
# between k-th ray and all segments


do_intersect = ~np.isinf(radii)
plot_rays(radii[do_intersect], thetas[do_intersect], color="orange")
plot_rays(2*M, thetas[~do_intersect], ":", color="orange")

results

nonin
  • 704
  • 4
  • 10
  • 1
    Comprehensive answer, clear explanations and elegant solution, thanks a lot for your time! – solub Aug 10 '21 at 22:04