60

Note: I'd appreciate more of a guide to how to approach and come up with these kinds of solutions rather than the solution itself.

I have a very performance-critical function in my system showing up as the number one profiling hotspot in specific contexts. It's in the middle of a k-means iteration (already multi-threaded using a parallel for that processes sub-ranges of points in each worker thread).

ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    if (dist < pt.min_dist) // <-- #1 hotspot
    {
        pt.min_dist = dist;
        pt.min_index = i;
    }
}

Any savings in the time required to process this section of code count substantially, so I've been fiddling with it a lot. It might be worth putting the centroid loop outside, for example, and iterating through the points in parallel for a given centroid. The number of cluster points here is in the millions, while the number of centroids is in the thousands. The algorithm is applied for a handful of iterations (often under 10). It doesn't seek perfect convergence/stability, just some 'reasonable' approximation.
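
For illustration, the loop inversion mentioned above would look roughly like this (an untested sketch; the distance computation stays elided as in the snippet above):

// Untested sketch: centroids in the outer loop, points streamed in the
// inner one. The points array is then read/written linearly (and can be
// split into parallel sub-ranges) while one centroid stays hot in registers.
for (int i = 0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    for (int j = 0; j < num_points; ++j) // parallel for over sub-ranges
    {
        ClusterPoint& pt = points[j];
        const float dist = ...; // same squared distance as before
        if (dist < pt.min_dist)
        {
            pt.min_dist = dist;
            pt.min_index = i;
        }
    }
}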

Any ideas are appreciated, but what I'm really eager to discover is whether this code can be made branchless, as that would allow for a SIMD version. I haven't really developed the kind of mental ability to easily grasp how to come up with branchless solutions: my brain fails there much like it did when I was first exposed to recursion in the early days. So a guide on how to write branchless code, and how to develop the appropriate mindset for it, would also be helpful.

In short, I'm looking for any guides, hints, and suggestions (not necessarily solutions) on how to micro-optimize this code. It most likely has room for algorithmic improvements, but my blind spot has always been micro-optimization (and I'm curious to learn how to apply it more effectively without going overboard). It's already tightly multithreaded with chunky parallel for logic, so I'm pretty much pushed into the micro-optimization corner as one of the quicker things to try short of a smarter algorithm outright. We're completely free to change the memory layout.

In Response to Algorithmic Suggestions

Regarding the point that I'm looking at this all wrong by seeking to micro-optimize an O(knm) algorithm that could clearly be improved at the algorithmic level, I wholeheartedly agree. That pushes this specific question into a somewhat academic and impractical realm. However, if I could be allowed an anecdote: I come from an original background of high-level programming -- big emphasis on a broad, large-scale viewpoint and safety, and very little on the low-level implementation details. I've recently switched to a very different, modern-flavored project, and I'm learning all kinds of new tricks from my peers: cache efficiency, GPGPU, branchless techniques, SIMD, special-purpose memory allocators that actually outperform malloc (but for specific scenarios), etc.

That's where I'm trying to catch up with the latest performance trends, and surprisingly I've found that the old data structures I often favored during the 90s, which were often linked/tree-type structures, are actually being vastly outperformed by much more naive, brutish, micro-optimized, parallelized code applying tuned instructions over contiguous memory blocks. It's somewhat disappointing at the same time, since I feel like we're fitting the algorithms more to the machine now and narrowing the possibilities that way (especially with GPGPU).

The funniest thing is that I find this type of micro-optimized, fast array-processing code much easier to maintain than the sophisticated algorithms and data structures I was using before. For a start, they're easier to generalize. Furthermore, my peers can often take a customer complaint about a specific slowdown in an area, slap on a parallel for and possibly some SIMD, and call it done with a decent speed-up. Algorithmic improvements can often offer substantially more, but the speed and non-intrusiveness with which these micro-optimizations can be applied has me wanting to learn more in that area, as reading papers on better algorithms can take some time (as well as require more extensive changes). So I've been jumping on that micro-optimization bandwagon a bit more lately, and perhaps a little too much in this specific case, but my curiosity is more about expanding my range of possible solutions for any scenario.

Disassembly

Note: I am really, really bad at assembly, so I have often tuned things in a trial-and-error kind of way, coming up with somewhat educated guesses about why a hotspot shown in VTune might be the bottleneck and then trying things out to see if the times improve -- assuming the guesses have some hint of truth if the times do improve, or that I completely missed the mark if they don't.

000007FEEE3FB8A1  jl          thread_partition+70h (7FEEE3FB780h) 
    {
        ClusterPoint& pt = points[j];
        pt.min_index = -1;
        pt.min_dist = numeric_limits<float>::max();
        for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7  cmp         ecx,r10d 
000007FEEE3FB8AA  jge         thread_partition+1F4h (7FEEE3FB904h) 
000007FEEE3FB8AC  lea         rax,[rbx+rbx*2] 
000007FEEE3FB8B0  add         rax,rax 
000007FEEE3FB8B3  lea         r8,[rbp+rax*8+8] 
        {
            const ClusterCentroid& cent = centroids[i];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8  movss       xmm0,dword ptr [rdx] 
            const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC  movss       xmm2,dword ptr [rdx+4] 
000007FEEE3FB8C1  movss       xmm1,dword ptr [rdx-4] 
000007FEEE3FB8C6  subss       xmm2,dword ptr [r8] 
000007FEEE3FB8CB  subss       xmm0,dword ptr [r8-4] 
000007FEEE3FB8D1  subss       xmm1,dword ptr [r8-8] 
            const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7  mulss       xmm2,xmm2 
000007FEEE3FB8DB  mulss       xmm0,xmm0 
000007FEEE3FB8DF  mulss       xmm1,xmm1 
000007FEEE3FB8E3  addss       xmm2,xmm0 
000007FEEE3FB8E7  addss       xmm2,xmm1 

            if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB  comiss      xmm2,dword ptr [rdx-8] 
000007FEEE3FB8EF  jae         thread_partition+1E9h (7FEEE3FB8F9h) 
            {
                pt.min_dist = dist;
000007FEEE3FB8F1  movss       dword ptr [rdx-8],xmm2 
                pt.min_index = i;
000007FEEE3FB8F6  mov         dword ptr [rdx-10h],ecx 
000007FEEE3FB8F9  inc         ecx  
000007FEEE3FB8FB  add         r8,30h 
000007FEEE3FB8FF  cmp         ecx,r10d 
000007FEEE3FB902  jl          thread_partition+1A8h (7FEEE3FB8B8h) 
    for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904  inc         edi  
000007FEEE3FB906  add         rdx,20h 
000007FEEE3FB90A  cmp         edi,dword ptr [rsi+4] 
000007FEEE3FB90D  jl          thread_partition+31h (7FEEE3FB741h) 
000007FEEE3FB913  mov         rbx,qword ptr [irange] 
            }
        }
    }
}

We're forced into targeting SSE 2 -- a bit behind on our times, but the user base actually tripped up once when we assumed that even SSE 4 was okay as a min requirement (the user had some prototype Intel machine).

Update with Standalone Test: ~5.6 secs

I'm very appreciative of all the help being offered! Because the codebase is quite extensive and the conditions for triggering that code are complex (system events triggered across multiple threads), it's a bit unwieldy to make experimental changes and profile them each time. So I've set up a superficial test on the side as a standalone application that others can also run and try out so that I can experiment with all these graciously offered solutions.

#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#include <cstdlib>     // rand
#include <stdint.h>    // uint32_t (used by the SIMD versions further below)
#include <emmintrin.h> // SSE2 intrinsics (likewise)
#if defined(_MSC_VER)
    #include <malloc.h> // _aligned_malloc / _aligned_free
    #define ALIGN16 __declspec(align(16))
#else
    #include <malloc.h>
    #define ALIGN16 __attribute__((aligned(16)))
#endif

using namespace std;

// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
    #ifdef _MSC_VER
        return _aligned_malloc(amount, 16);
    #else
        void* mem = 0;
        posix_memalign(&mem, 16, amount);
        return mem;
    #endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
    return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}

// Aligned free.
static void free16(void* mem)
{
    #ifdef _MSC_VER
        return _aligned_free(mem);
    #else
        free(mem);
    #endif
}

// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;

class Points
{
public:
    Points(): data(malloc16_t<Point>(num_points))
    {
        for (int p=0; p < num_points; ++p)
        {
            const float xyz[3] =
            {
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX
            };
            init(p, xyz);
        }
    }
    ~Points()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].centroid = -1;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void associate(int n, int new_centroid)
    {
        data[n].centroid = new_centroid;
    }
    int centroid(int n) const
    {
        return data[n].centroid;
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }

private:
    Points(const Points&);
    Points& operator=(const Points&);
    struct Point
    {
        int centroid;
        float xyz[3];
    };
    Point* data;
};

class Centroids
{
public:
    Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
    {
        // Naive initial selection algorithm, but outside the 
        // current area of interest.
        for (int c=0; c < num_centroids; ++c)
            init(c, points[c]);
    }
    ~Centroids()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].count = 0;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void reset(int n)
    {
        data[n].count = 0;
        data[n].xyz[0] = 0.0f;
        data[n].xyz[1] = 0.0f;
        data[n].xyz[2] = 0.0f;
    }
    void sum(int n, const float* pt_xyz)
    {
        data[n].xyz[0] += pt_xyz[0];
        data[n].xyz[1] += pt_xyz[1];
        data[n].xyz[2] += pt_xyz[2];
        ++data[n].count;
    }
    void average(int n)
    {
        if (data[n].count > 0)
        {
            const float inv_count = 1.0f / data[n].count;
            data[n].xyz[0] *= inv_count;
            data[n].xyz[1] *= inv_count;
            data[n].xyz[2] *= inv_count;
        }
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }
    int find_nearest(const float* pt_xyz) const
    {
        float min_dist_squared = numeric_limits<float>::max();
        int min_centroid = -1;
        for (int c=0; c < num_centroids; ++c)
        {
            const float* cen_xyz = data[c].xyz;
            const float x = pt_xyz[0] - cen_xyz[0];
            const float y = pt_xyz[1] - cen_xyz[1];
            const float z = pt_xyz[2] - cen_xyz[2];
            const float dist_squared = x*x + y*y + z*z;

            if (min_dist_squared > dist_squared)
            {
                min_dist_squared = dist_squared;
                min_centroid = c;
            }
        }
        return min_centroid;
    }

private:
    Centroids(const Centroids&);
    Centroids& operator=(const Centroids&);
    struct Centroid
    {
        int count;
        float xyz[3];
    };
    Centroid* data;
};

// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
    return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}

static void k_means(Points& points, Centroids& centroids)
{
    // Find the closest centroid for each point.
    for (int p=0; p < num_points; ++p)
    {
        const float* pt_xyz = points[p];
        points.associate(p, centroids.find_nearest(pt_xyz));
    }

    // Reset the data of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.reset(c);

    // Compute new position sum of each centroid.
    for (int p=0; p < num_points; ++p)
        centroids.sum(points.centroid(p), points[p]);

    // Compute average position of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.average(c);
}

int main()
{
    Points points;
    Centroids centroids(points);

    cout << "Starting simulation..." << endl;
    double start_time = sys_time();
    for (int i=0; i < num_iterations; ++i)
        k_means(points, centroids);
    cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
    cout << "# Points: " << num_points << endl;
    cout << "# Centroids: " << num_centroids << endl;

    // Write the centroids to a file to give us some crude verification
    // of consistency as we make changes.
    ofstream out("centroids.txt");
    for (int c=0; c < num_centroids; ++c)
        out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}

I'm aware of the dangers of superficial testing, but since it's already deemed to be a hotspot from previous real-world sessions, I hope it's excusable. I'm also just interested in the general techniques associated with micro-optimizing such code.

I did get slightly different results in profiling this one. The times are a bit more evenly dispersed within the loop here, and I'm not sure why. Perhaps it's because the data is smaller (I omitted members, hoisted the min_dist member out, and made it a local variable). The exact ratio of centroids to points is also a bit different, but hopefully close enough to translate improvements here to the original code. It's also single-threaded in this superficial test, and the disassembly looks quite different, so I may be risking optimizing this superficial test without the original (a risk I'm willing to take for now, as I'm more interested in expanding my knowledge of techniques that could optimize these cases than in a solution for this exact case).

[VTune profiler screenshot]

Update with Yochai Timmer's Suggestion -- ~12.5 secs

Oh, I face the woes of micro-optimization without understanding assembly very well. I replaced this:

        -if (min_dist_squared > dist_squared)
        -{
        -    min_dist_squared = dist_squared;
        -    pt.centroid = c;
        -}

With this:

        +const bool found_closer = min_dist_squared > dist_squared;
        +pt.centroid = bitselect(found_closer, c, pt.centroid);
        +min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);

.. only to find the times escalated from ~5.6 secs to ~12.5 secs. Nevertheless, that is not his fault, nor does it take away from the value of his solution -- that's on me for failing to understand what's really going on at the machine level and taking stabs in the dark. This attempt missed, and it seems I was not the victim of branch misprediction as I initially thought. Nevertheless, his proposed solution is a wonderful and generalized function to try in such cases, and I'm grateful to add it to my toolbox of tips and tricks. Now for round 2.

Harold's SIMD Solution - 2.496 secs (see caveat)

This solution might be amazing. After converting the cluster rep to SoA, I'm getting times of ~2.5 seconds with this one! Unfortunately, there appears to be a glitch of some sort. I'm getting very different results for the final output that suggests more than slight precision differences, including some centroids towards the end with values of 0 (implying that they were not found in the search). I've been trying to go through the SIMD logic with the debugger to see what might be up -- it could merely be a transcription error on my part, but here's the code in case someone could spot the error.

If the error could be corrected without slowing down the results, this speed improvement is more than I ever imagined from a pure micro-optimization!

    // New version of Centroids::find_nearest (from harold's solution):
    int find_nearest(const float* pt_xyz) const
    {
        __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
        __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
        __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
        __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
        __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
        __m128i index = min_index;
        for (int i=4; i < num_centroids; i += 4) 
        {
            xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
            ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
            zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
            __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
            __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
            min_dist = _mm_min_ps(min_dist, dist);
            min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                     _mm_andnot_si128(mask, min_index));
            index = _mm_add_epi32(index, _mm_set1_epi32(4)); // bug: index is advanced only after the blend above, so new minima get recorded with indices 4 too low (fixed in the corrected version below)
        }

        ALIGN16 float mdist[4];
        ALIGN16 uint32_t mindex[4];
        _mm_store_ps(mdist, min_dist);
        _mm_store_si128((__m128i*)mindex, min_index);

        float closest = mdist[0];
        int closest_i = mindex[0];
        for (int i=1; i < 4; i++)
        {
            if (mdist[i] < closest) 
            {
                closest = mdist[i];
                closest_i = mindex[i];
            }
        }
        return closest_i;
    }

Harold's SIMD Solution (Corrected) - ~2.5 secs

After applying the corrections and testing them out, the results are intact and function correctly with similar improvements to the original codebase!

Since this hits the holy grail of knowledge I was seeking to understand better (branchless SIMD), I'm going to award the solution with some extra props for more than doubling the speed of the operation. I have my homework cut out in trying to understand it, since my goal was not merely to mitigate this hotspot, but to expand on my personal understanding of possible solutions to deal with them.

Nevertheless, I'm grateful for all the contributions here, from the algorithmic suggestions to the really cool bitselect trick! I wish I could accept all the answers. I may end up trying all of them at some point, but for now my homework lies in understanding some of these non-arithmetical SIMD ops.

int find_nearest_simd(const float* pt_xyz) const
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
    __m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
    __m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);

    __m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
    __m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
    __m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i=4; i < num_centroids; i += 4) 
    {
        xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
        ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
        zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                 _mm_andnot_si128(mask, min_index));
    }

    ALIGN16 float mdist[4];
    ALIGN16 uint32_t mindex[4];
    _mm_store_ps(mdist, min_dist);
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i=1; i < 4; i++)
    {
        if (mdist[i] < closest) 
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}
  • 17
    It's so refreshing to see someone asking for performance help who says they've already profiled and found the hotspot. It would be a minuscule improvement, but you could lift the first iteration of the loop out, and just initialize your min_index and min_dist to the first centroid. No sense checking it; you know what the answer will be. – Jay Kominek May 04 '15 at 06:17
  • 1
    See http://stackoverflow.com/a/6566033/108719 - if raw speed is more important than portability, there are SSE branchless min instructions that will do what you want. – pjc50 May 04 '15 at 08:38
  • 1
    After you have made this branchless there will still be a dependency chain sequencing all iterations of the loop. Break it by computing multiple minimums independently and merging them after the loop. That will surface more ILP. – usr May 04 '15 at 10:26
  • 1
    It's too bad your memory layout is an AoS (array of structs) instead of a SoA (Struct of arrays). This disallows the use of SSE vector loads, which also means no vector SSE min/max operations. The one other option I see is for you to implement a _bitsliced_ comparator algorithm. – Iwillnotexist Idonotexist May 04 '15 at 10:31
  • 1
    @IwillnotexistIdonotexist: That is not completely true: there exist gather instructions (at least for AVX), but they are way slower than a normal SIMD load (see the link in my answer) – MikeMB May 04 '15 at 10:35
  • 1
    @MikeMB I'm aware of the AVX2 `vgatherps` but I rejected it offhand on account of it being, as you pointed out, slow, and rarely available; I happen to have one of the rare processors based on the microarchitecture in which `vgather` is available, namely Haswell. – Iwillnotexist Idonotexist May 04 '15 at 10:41
  • 2
    @SimonAndréForsberg: Of course you would have to add at least the whole function body including the distance calculation and the definition of points and centroids, but in order to make meaningful statements about performance that would be quite helpful anyway. – MikeMB May 04 '15 at 10:44
  • 2
    How sure are you that that's the culprit? Many profilers will point to a "consumer of a value that takes a long time to produce" as the culprit because it will be stalled for a long time. Anyway if you post the distance calculation I'll write an AVX version for you (including the "branch", because it's not a branch) – harold May 04 '15 at 10:44
  • 1
    @IwillnotexistIdonotexist: I guess that's a valid argument. I've to admit that I didn't know that these instructions didn't exist in pre AVX2 instruction sets. – MikeMB May 04 '15 at 10:47
  • 1
    You don't say (unless I missed it) what percent of time (more or less) is consumed by this hot spot (when you run it on real data in the whole app). 90%? 50%? 10%? – Mike Dunlavey May 04 '15 at 12:46
  • 1
    @MikeDunlavey In a real-world case, that particular area consumes 72% of the total time spent in that function (thread_partition) while the function itself dominates at about 92%. For details, I use it to partition a 3D mesh into crude segments that can then be used to accelerate things like picking -- it's like a poor man's BVH, and a BVH would be wonderful but I've had much better success building this structure on the fly for high density meshes quicker and with less mem use than a BVH. The bottlenecks occur because this has to kick in every time the user creates/loads a mesh. –  May 04 '15 at 12:50
  • @MikeDunlavey We then use the mesh segmentation for a variety of other tasks such as building smaller VBOs with higher mem locality (segment VBOs rather than whole mesh VBOs). –  May 04 '15 at 12:53
  • @MikeDunlavey The particular test case I was using was loading a mesh in our native format, but our native format is pretty quick -- like a one to one memory transfer (it's an uncompressed zip file containing files inside that just stores big binary files containing like a giant array of floats or a giant array of indices with a uniform contiguous representation in each file), so mesh loading is pretty fast and pushes this segmentation algorithm as the number one bottleneck, taking sometimes 15 seconds or so to complete on a multi-million poly mesh. –  May 04 '15 at 12:55
  • 1
    Sounds like a fun problem. You're doing this for ray-tracing, perhaps? It's not my area of expertise, but when I tried performance tuning one of those, it immediately became obvious that there was a lot of similarity between adjacent rays, that could be exploited for speedup. – Mike Dunlavey May 04 '15 at 13:01
  • @MikeDunlavey Though hmm... A BVH might actually be better at this point, especially something like Intel's Embree BVH which builds pretty darn fast. This segmentation algorithm excels for the more common scenarios where the meshes have tens to hundreds of thousands of control polys rather than millions, but it doesn't scale quite as well when we start trying to load like a 10 million poly mesh. –  May 04 '15 at 13:01
  • @MikeDunlavey Yes -- raytracing and a bit of modeling and texturing thrown in. –  May 04 '15 at 13:01
  • @MikeDunlavey For raytracing I just use Intels' wonderful Embree -- this kind of K-means-partitioned segment structure would be awful there given how many rays are being cast. Though I have been tempted at times to hack up Embree and try to make it suitable as more than a raytracing kernel -- the speed and quality at which they can build a BVH astounds me, but it seems like a shame that its API is so raytracing-oriented. –  May 04 '15 at 13:04
  • 1
    Also, if num_centroids is huge, you're basically doing linear search through them, which makes me wonder if some sort of K-D tree of the centroids could save time, either that or look at one dimension at a time. i.e. if delta_x > min, there's no point calculating distance. – Mike Dunlavey May 04 '15 at 13:11
  • @MikeDunlavey I was tempted to do that before -- to partition the centroids themselves to make a faster search. I've often found it a little awkward with 3D data -- I'm used to forming crude 2D grids in old 2D games (the quad-tree usually ends up consuming too much time to build/update it) -- and have often found it difficult to balance the cost of building vs. the acceleration in searching for 3D, especially with the centroids moving every iteration. I'll definitely consider it though -- here the number of centroids tends to get quite high in these pathological millions-of-poly meshes. –  May 04 '15 at 13:15
  • I hope I'm not extending the discussion too much, but I suppose I should mention that one of the reasons I'm curious specifically about these branchless versions is simply because I've had a barrier there with some moderate success at SIMD, but often my brain just malfunctions if it's trying to vectorize code that involves any branching. So my curiosity into branchless programming is mainly not just to solve this very specific problem, but to expand my arsenal of possible solutions I can try for any kind of critical hotspot. I'm interested in learning more than a solution. –  May 04 '15 at 13:35
  • 1
    That screenshot shows x87 style floating point code, but your disassembly from earlier does not. What's going on there? Also, it's using an ancient comparison (`fcom \ fnstsw` instead of `fcomi`). Did you forget to specify the target to the compiler? – harold May 04 '15 at 15:07
  • @harold It might possibly be due to the use of an ancient compiler (unfortunately). We're stuck in MSVC 2003 land for Windows and still in the C++03 era. The project is built with /Machine:X64 and max speed (/O2). Enhanced instruction set is disabled, and the FP model is set to precise. –  May 04 '15 at 15:12
  • @harold Though if it's with respect to the disassembly shown in the profiler, I'm afraid I'm not sure. Perhaps I need to configure vtune differently and that would be quite a discovery if I was using it incorrectly (I used the basic wizard setup). But I've rarely ever drilled down to the disassembly level -- that stuff has always been a bit scary to me. –  May 04 '15 at 15:14
  • 7
    **You're looking at this all wrong** - instead of optimizing the check you need to optimize the algorithm. Microbenchmarks < Algorithms. You can get a significant boost by not implementing the algorithm naively - here are two papers _to get you started_ - http://papers.nips.cc/paper/4362-fast-and-accurate-k-means-for-large-datasets.pdf http://research.microsoft.com/pubs/164185/1158.pdf they also reference a lot of other good stuff. Also- this is a simple but effective implementation you can read and learn from http://github.com/scikit-learn/scikit-learn/blob/master/sklearn/cluster/_k_means.pyx – Benjamin Gruenbaum May 04 '15 at 15:21
  • @harold Argh I'm so sorry! I feel like an idiot, so embarrassing, but I set up vtune to run the 32-bit build rather than the x64 release. I pointed it to the wrong directory. So sorry about that. –  May 04 '15 at 15:25
  • @BenjaminGruenbaum I'm very much agreed -- I think I'm far from honestly exhausting the algorithmic options available here. One of the reasons I was curious about the micro level is just that I have always had a mental barrier when it comes to branching in vectorized code, and I was curious specifically about the more micro-level solutions available. I've had a long exhaustive history with algorithmic and data structure-level work, but I have never been particularly good at the low-level understanding, and have run into cases where even sequential linear complexity operations like simply... –  May 04 '15 at 15:29
  • 1
    @Ike that's a good point (and the question itself is on-topic here) but it would be beneficial to reduce it to the smallest possible example exhibiting the issue to make it more broadly applicable. – Benjamin Gruenbaum May 04 '15 at 15:30
  • @BenjaminGruenbaum Applying a transform to a series of particles could use a speed up, as well as cases where the legacy system was so thoroughly entrenched in a particular algorithm or data structure that we were kind of cornered into micro-optimizations being the only viable option. I'm more interested in solutions to a family of problems rather than this specific one. –  May 04 '15 at 15:30
  • 2
    @Ike: Sorry, that doesn't answer your question, but a) what machines are you running this on, and b) why are you stuck with such an ancient compiler? I guarantee you that just switching to a current compiler will have a bigger impact on your performance than most of the optimizations suggested by us, because your compiler just doesn't know what machine instructions there are. Also, please mention the type of your compiler, OS, and hardware in the question. So far I assumed we are dealing with somewhat current technology. – MikeMB May 04 '15 at 15:30
  • @MikeMB At the moment a measly dev notebook, Intel Core 2 Duo, and the ancient compilers are primarily due to standards and processes outside of my control. I would love to use C++11, but we're kind of held back by a lack of compilers/licenses on certain exotic platforms and have to target lowest common denominators. –  May 04 '15 at 15:32
  • 1
    @Ike MSVC older than 2010 even compiles code with intrinsics badly, doing a particularly bad job with the register allocation for some reason. I could give you the code in assembly if you want. – harold May 04 '15 at 15:34
  • @harold That's something we discovered to our grief that on some of the older compilers we're using, std::min/max were actually doing branching in the resulting assembly and turning out to be minor hotspots. We ended up replacing them with our own and do a number of things that would probably be evil and dirty if we had better optimizers available. Though I somewhat enjoy these adventures into the low-level land of coding. –  May 04 '15 at 15:35
  • @BenjaminGruenbaum That second paper from MS is really interesting with the concept of cluster closures. Many thanks! Maybe I should have found a better problem to focus on micro-optimizations. In this particular case, I was a bit obsessed with micro-optimizations since that's my biggest blindspot in optimization knowledge. I'm used to reading algorithmic Siggraph papers all the time, but in this day and age of branchless coding, SIMD, cache efficiency, GPGPU, etc. -- I often feel a bit behind, so I was curious specifically about those kinds of techniques. –  May 04 '15 at 16:14
  • How large are N and K? How many dimensions? What distance metric? Not having this information closes the door to possible alternatives. –  May 05 '15 at 08:42

6 Answers

23

Too bad we can't use SSE4.1, but very well then, SSE2 it is. I haven't tested this, just compiled it to see if there were syntax errors and to see whether the assembly made sense (it's mostly alright, though GCC spills min_index even with some xmm registers not used, not sure why that happens).

#include <emmintrin.h> // SSE2 intrinsics
#include <stdint.h>    // uint32_t

int find_closest(float *x, float *y, float *z,
                 float pt_x, float pt_y, float pt_z, int n) {
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
    __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
    __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i = 4; i < n; i += 4) {
        xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
        ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
        zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                 _mm_andnot_si128(mask, min_index));
    }
    // Unaligned stores: the local arrays aren't guaranteed 16-byte alignment.
    float mdist[4];
    _mm_storeu_ps(mdist, min_dist);
    uint32_t mindex[4];
    _mm_storeu_si128((__m128i*)mindex, min_index);
    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i = 1; i < 4; i++) {
        if (mdist[i] < closest) {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

As usual, it expects the pointers to be 16-aligned. Also, the padding should be with points at infinity (so they're never closest to the target).
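
For instance, the padding could be set up like this (a sketch, assuming the SoA arrays are named cen_x/cen_y/cen_z as in the question's transcription and were allocated with room for the rounded-up count):

// Round the centroid count up to a multiple of 4 and park the pad slots
// at +infinity so they can never win the minimum-distance comparison
// (std::numeric_limits is from <limits>).
const int padded_count = (num_centroids + 3) & ~3;
const float inf = std::numeric_limits<float>::infinity();
for (int i = num_centroids; i < padded_count; ++i)
{
    cen_x[i] = inf;
    cen_y[i] = inf;
    cen_z[i] = inf;
}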

SSE 4.1 would let you replace this

min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                         _mm_andnot_si128(mask, min_index));

By this

min_index = _mm_blendv_epi8(min_index, index, mask);

Here's an asm version, made for vsyasm, tested a bit (seems to work)

bits 64

section .data

align 16
centroid_four:
    dd 4, 4, 4, 4
centroid_index:
    dd 0, 1, 2, 3

section .text

global find_closest

proc_frame find_closest
    ;
    ;   arguments:
    ;       ecx: number of points (multiple of 4 and at least 4)
    ;       rdx -> array of 3 pointers to floats (x, y, z) (the points)
    ;       r8 -> array of 3 floats (the reference point)
    ;
    alloc_stack 0x58
    save_xmm128 xmm6, 0
    save_xmm128 xmm7, 16
    save_xmm128 xmm8, 32
    save_xmm128 xmm9, 48
[endprolog]
    movss xmm0, [r8]
    shufps xmm0, xmm0, 0
    movss xmm1, [r8 + 4]
    shufps xmm1, xmm1, 0
    movss xmm2, [r8 + 8]
    shufps xmm2, xmm2, 0
    ; pointers to x, y, z in r8, r9, r10
    mov r8, [rdx]
    mov r9, [rdx + 8]
    mov r10, [rdx + 16]
    ; reference point is in xmm0, xmm1, xmm2 (x, y, z)
    movdqa xmm3, [rel centroid_index]   ; min_index
    movdqa xmm4, xmm3                   ; current index
    movdqa xmm9, [rel centroid_four]     ; index increment
    paddd xmm4, xmm9
    ; calculate initial min_dist, xmm5
    movaps xmm5, [r8]
    subps xmm5, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm5, xmm5
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm5, xmm7
    addps xmm5, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    sub ecx, 4
    jna _tail
_loop:
    movaps xmm6, [r8]
    subps xmm6, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm6, xmm6
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm6, xmm7
    addps xmm6, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    movaps xmm7, xmm6
    cmpps xmm6, xmm5, 1
    minps xmm5, xmm7
    movdqa xmm7, xmm6
    pand xmm6, xmm4
    pandn xmm7, xmm3
    por xmm6, xmm7
    movdqa xmm3, xmm6
    paddd xmm4, xmm9
    sub ecx, 4
    ja _loop
_tail:
    ; calculate horizontal minimum
    pshufd xmm0, xmm5, 0xB1
    minps xmm0, xmm5
    pshufd xmm1, xmm0, 0x4E
    minps xmm0, xmm1
    ; find index of the minimum
    cmpps xmm0, xmm5, 0
    movmskps eax, xmm0
    bsf eax, eax
    ; index into xmm3, sort of
    movaps [rsp + 64], xmm3
    mov eax, [rsp + 64 + rax * 4]
    movaps xmm9, [rsp + 48]
    movaps xmm8, [rsp + 32]
    movaps xmm7, [rsp + 16]
    movaps xmm6, [rsp]
    add rsp, 0x58
    ret
endproc_frame

In C++:

extern "C" int find_closest(int n, float** points, float* reference_point);
harold
  • This is wonderful, and wow, you came up with it so quickly -- very impressed! I'll have to take some time to convert my structures to an SoA representation, but that should be quite doable. I very much appreciate the share and all the help here! I'll also try to post some updates about improvements. I wish I could accept multiple answers. –  May 04 '15 at 13:09
  • How do you come up with this stuff so fast? SSE intrinsics and assembly just flows out of your fingertips like a natural thought? –  May 04 '15 at 17:16
  • 1
    @Ike not entirely, I do have to look things up occasionally – harold May 04 '15 at 17:27
  • Your solution offers promises of delights, working at under half the time of my original!!!!!! Unfortunately the results appear glitchy with certain centroids towards the end being unassigned. It may be a transcription error on my part, and I updated the post with your solution incorporated into it with a full example that can build. I'm reviewing the logic with a debug build to try to see if I can narrow down what went wrong. Nevertheless, if the glitch can be fixed and the times remain, it's amazing!!! –  May 04 '15 at 20:09
  • @Ike do you have a test case for that? – harold May 04 '15 at 20:21
  • I should write some kind of proper test case. I was outputting results to a file as a crude way of making sure the results stay intact (with mercurial pointing out the diffs to me), and I was getting differences that appeared to be much more than FP differences. I'll see about adding proper testing for correctness at the end by making sure each point is actually associated to the nearest centroid. –  May 04 '15 at 20:24
  • @Ike oh I just noticed, I fixed a bug in the assembly version that's still in the intrinsics version.. that's fixed now. If the problem was that it was off by 4, then that was it. Otherwise there are more bugs – harold May 04 '15 at 20:36
  • Sweet! I was noticing in the debugger when comparing SIMD and scalar versions side-by-side that the first mismatch was on the 4th cluster. –  May 04 '15 at 20:40
  • One of the things I'm curious about, and I wish I could pick your brain a bit, is how you get your brain to manage thinking all vectorized like this. 4-way arithmetic I can manage, but when it comes to comparisons and bitmasks, my brain wants to shut down. It really wants to think in terms of scalars there and code branches. –  May 04 '15 at 20:43
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/76931/discussion-between-harold-and-ike). – harold May 04 '15 at 20:44
22

You could use a branchless ternary operator, sometimes called bitselect (condition ? true : false).
Just use it for the 2 members, defaulting to doing nothing.
Don't worry about the extra operations, they are nothing compared to the if statement branching.

bitselect implementation:

inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
{
    // truereturnvalue when condition is TRUE, falsereturnvalue when FALSE
    return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition));
}

inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
{
    // Reinterpret the floats as ints. This works because it's just a bit
    // select; the actual float values never enter the arithmetic.
    int& at = reinterpret_cast<int&>(truereturnvalue);
    int& af = reinterpret_cast<int&>(falsereturnvalue);
    int res = (at & -condition) | (af & ~(-condition));
    return reinterpret_cast<float&>(res);
}

And your loop should look like this:

for (int i=0; i < num_centroids; ++i)
{
  const ClusterCentroid& cent = centroids[i];
  const float dist = ...;
  bool isSmaller = dist < pt.min_dist;

  // use the same value if not smaller
  pt.min_index = bitselect(isSmaller, i, pt.min_index);
  pt.min_dist = bitselect(isSmaller, dist, pt.min_dist);
}
Yochai Timmer
  • 3
    If you get to measure the improvement, please add a comment about it. I think we would all like to know how it went. – Yochai Timmer May 04 '15 at 06:47
  • I don't understand `bitselect`. Aren't `dist` and `pt.min_dist` of type `float`. ? – bolov May 04 '15 at 07:02
  • @bolov My implementation is for int, but you could do exactly the same for a float (it's the same byte size). It's just a bit manipulation of the bytes to nullify one of the values, and it uses the condition value as the mask. – Yochai Timmer May 04 '15 at 07:05
  • I gave the function a second look and got the point of it. It's a neat trick. – bolov May 04 '15 at 07:15
  • 3
    Are bit manipulations on (reinterpret casted) floating point numbers defined behavior? – MikeMB May 04 '15 at 07:27
  • @MikeMB reinterpret cast is well defined. So after reinterpret-casting the float to an int, that part is well defined. The bit manipulations are well defined too, so we get transitivity of well-defined behaviour. BUT, I would make a bitselect for floats just to be sure, and I've never tested it on floats. – Yochai Timmer May 04 '15 at 07:36
  • 1
    Actually, reinterpret_cast from float to int is not allowed, and I think it is UB to cast from `float*` to `int*` and access the value through that pointer. I'd be satisfied, however, if someone could tell me whether g++ 4.9 for x64 would compile such code "as expected" in the presence of other optimizations (maybe with strict aliasing turned off?). Btw: bitwise operators are not defined at all for floats -- that's why I'm asking about casting to int. – MikeMB May 04 '15 at 08:15
  • @MikeMB Good question. You could probably use references and reinterpret that: int& intDist = reinterpret_cast<int&>(pt.min_dist); – Yochai Timmer May 04 '15 at 09:13
  • @MikeMB I checked it and it works fine with reinterpret_cast for references. Fixed the answer to work with that. Thanks for pointing that out. – Yochai Timmer May 04 '15 at 09:21
  • Well, I'm still pretty sure that this is UB (maybe you should add a note), but I don't see a reasonable compiler optimization that would break this code (unfortunately I'm in no way an expert on that topic). I'll try it in a few places in my own codebase. – MikeMB May 04 '15 at 10:29
  • This kind of branchless bitselect is a wonderful building block! If it's valid and reasonably safe/portable, it would be a wonderful thing to have around as I'm one of those types that has a hard time remembering these low-level magic tricks. I do best when I tuck that into a function of that kind of sort and simply use it as a possible thing to try when I'm up against a hotspot. –  May 04 '15 at 12:26
  • 1
    @Ike I haven't tried the float version on anything else but Visual Studio, but the int version of it works well on Ubuntu, Android, and Windows (and on these processors: ARM, x86, x64) – Yochai Timmer May 04 '15 at 12:31
  • Wonderful! I love it. Many thanks for sharing that one, and I'll try it out and post an update! –  May 04 '15 at 12:35
10

C++ is a high-level language. Your assumption that control flow in the C++ source code translates into branching instructions is flawed. I don't have the definition of some types from your example, so I made a simple test program with similar conditional assignments:

int g(int, int);

int f(const int *arr)
{
    int min = 10000, minIndex = -1;
    for ( int i = 0; i < 1000; ++i )
    {
        if ( arr[i] < min )
        {
            min = arr[i];
            minIndex = i;
        }
    }
    return g(min, minIndex);
}

Note that the use of the undefined "g" is merely to prevent the optimizer from deleting everything. I translated this with G++ 4.9.2 with -O3 and -S into x86_64 assembly (without even having to change the default for -march), and the (not overly surprising) result is that the loop body contains no branches:

movl    (%rdi,%rax,4), %ecx
movl    %edx, %r8d
cmpl    %edx, %ecx
cmovle  %ecx, %r8d
cmovl   %eax, %esi
addq    $1, %rax

Apart from that, the assumption that branchless is necessarily faster may also be flawed, because the probability that a new distance "beats" the old decreases the more elements you have looked at. It's not a coin toss. The "bitselect" trick was invented when compilers were much less aggressive at generating "as-if" assembly than they are today. I would much rather suggest taking a look at the kind of assembly your compiler is actually generating before either trying to rework the code so the compiler is better able to optimize it, or taking the result as a basis for hand-written assembly. If you want to look into SIMD, I would suggest trying a "minimum of minimums" approach with reduced data dependencies (in my example, the dependencies on "min" are probably a bottleneck).
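
A scalar sketch of that "minimum of minimums" idea, applied to the example above (it assumes an even element count; the merge after the loop restores a single result):

// Two independent minimum chains over even/odd elements. Consecutive
// iterations no longer all depend on one "min" variable, exposing more
// instruction-level parallelism.
int min0 = arr[0], idx0 = 0;
int min1 = arr[1], idx1 = 1;
for (int i = 2; i < 1000; i += 2)
{
    if (arr[i]     < min0) { min0 = arr[i];     idx0 = i;     }
    if (arr[i + 1] < min1) { min1 = arr[i + 1]; idx1 = i + 1; }
}
const int min      = min0 <= min1 ? min0 : min1;
const int minIndex = min0 <= min1 ? idx0 : idx1;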

Arne Vogel
  • 2
    This is true. But compilers don't always get it right. There's only a certain level of complexity that a compiler can handle. And it's much less obvious to the compiler if the values aren't constant (like you have them). If a performance analysis reveals a problem, then bitselect or a similar trick is the way to go. – Yochai Timmer May 04 '15 at 10:26
  • Apologies, you are right that my assumptions simply based on what I posted could be incorrect. But the vtune hotspot points at the 'if' and I thought a branchless version might be worth trying. One of the problems is that I'm not very good at assembly, so I tend to profile and just try things out to see if the times improve. I'm somewhat blind in that sense, but I'll try to post a disassembly soon and maybe we can see if there's like a JLE in there. Just based on the behavior of it, I thought it might be due to branch misprediction, but it could also be cache-related. –  May 04 '15 at 12:13
  • I've posted an update showing the disassembly for the machine-level experts out there! –  May 04 '15 at 12:35
  • That is an interesting point about the fact that this is not necessarily a coin toss. I failed to think about the nature of the branching there at such an in-depth level, and the lack of benefits I got from trying `bitselect` suggests that I was completely wrong to think it had to do with branching (the profiler showed the timings around the `if` but since it's sampling, it might be the instructions around it). –  May 04 '15 at 16:38
6

This might go both ways, but I'd give the following structure a try:

std::vector<float> centDists(num_centroids); //<-- one for each thread. 
for (size_t p=0; p<num_points; ++p) {
    Point& pt = points[p];
    for (size_t c=0; c<num_centroids; ++c) {
        const float dist = ...;
        centDists[c] = dist;
    }
    pt.min_idx = std::min_element(centDists.begin(), centDists.end()) - centDists.begin();
}

Obviously, you now have to iterate over memory twice, which will probably hurt the cache hit-to-miss ratio (you could also split it into sub-ranges), but on the other hand, each of the inner loops should be easy to vectorize and unroll -- so you just have to measure whether it is worth it.

And even if you stick with your version, I'd try using local variables to keep track of the minimum index and distance and apply the results to the point at the end.
The rationale is that each read or write to pt.min_dist is effectively done through a pointer, which -- depending on the compiler optimizations -- may or may not decrease your performance.
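
Applied to the loop from the question, that would look roughly like this (a sketch; the distance computation stays elided):

// Track the running minimum in locals; write back to the point once.
int min_index = -1;
float min_dist = std::numeric_limits<float>::max();
for (int i = 0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    if (dist < min_dist)
    {
        min_dist = dist;
        min_index = i;
    }
}
pt.min_dist = min_dist;
pt.min_index = min_index;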

Another thing that is important for vectorization is to turn an array of structs (in this case the centroids) into a struct of arrays (so, e.g., one array for each coordinate), because that way you don't need extra gather instructions to load the data for use with SIMD instructions. See Eric Brumer's talk for more information on that topic.
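
For the centroids in this question, that transformation would look roughly like this (a sketch):

// Array of structs (AoS): four consecutive x values are interleaved with
// the other fields, so a SIMD load would need a gather.
struct Centroid { int count; float xyz[3]; };

// Struct of arrays (SoA): four consecutive x values sit contiguously and
// load with a single aligned vector load (_mm_load_ps).
struct CentroidsSoA
{
    float* x; // num_centroids floats each, 16-byte aligned
    float* y;
    float* z;
    int*   count;
};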

EDIT: Some numbers for my system (Haswell, clang 3.5):
I did a short test with your benchmark, and on my system the above code slowed the algorithm down by about 10% -- essentially, nothing could be vectorized.

However, when applying the AoS-to-SoA transformation for your centroids, the distance calculation was vectorized, which led to a reduction of the overall runtime of about 40% compared to your original structure.

MikeMB
  • Many thanks! I have my share of things to try out, and I'll try to keep everyone updated about the results! –  May 04 '15 at 12:15
  • Very good point about the locals -- I don't even know why I had them there as they were used nowhere else -- perhaps some late night debugging session, but I realized the same and hoisted them out when I created that standalone test. –  May 04 '15 at 23:35
  • @Ike actually I meant that you should use local variables. – MikeMB May 05 '15 at 07:22
  • Oh I did -- sorry, my English might be poor. By 'hoisting out' of the structure, I meant using them as locals within the function. I think having `min_dist` as a member was some debugging artifact from long ago. –  May 05 '15 at 13:29
6

Firstly, I'd suggest that before you try any code changes, you look at the disassembly in an optimized build. Ideally you want to look at the profiler data at the assembly level. This can reveal various things, for example:

  1. The compiler may not have generated an actual branch instruction.
  2. The line of code that has the bottleneck may have many more instructions associated with it than you might think - the dist calculation for example.

In addition to that, there's the standard trick with distances: computing them often requires a square root, but you should take that square root only once, at the end, on the minimum squared value.
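
In code, that amounts to something like this (a sketch, using the squared-distance variables from the question):

// Compare squared distances inside the loop...
if (dist_squared < min_dist_squared)
{
    min_dist_squared = dist_squared;
    min_index = i;
}
// ...and take the square root once after the loop, and only if an actual
// distance (rather than a ranking) is needed; sqrtf is in <cmath>.
const float min_dist = sqrtf(min_dist_squared);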

SSE can process four values at once, without any branches, using _mm_min_ps. If you really need speed then you want to be using SSE (or AVX) intrinsics. Here's a basic example:

  #include <xmmintrin.h> // SSE intrinsics
  #include <cfloat>      // FLT_MAX

  float MinimumDistance(const float *values, int count)
  {
    __m128 min = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX);
    int i=0;
    for (; i < count - 3; i+=4)
    {
        __m128 distances = _mm_loadu_ps(&values[i]);
        min = _mm_min_ps(min, distances);
    }
    // Combine the four separate minimums to a single value
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(2, 3, 0, 1)));
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(1, 0, 3, 2)));

    // Deal with the last 0-3 elements the slow way
    float result = FLT_MAX;
    if (count > 3) _mm_store_ss(&result, min);
    for (; i < count; i++)
    {
        // Note: can't call min() here since it would collide with the
        // local __m128 named "min"; compare explicitly instead.
        if (values[i] < result)
            result = values[i];
    }

    return result;
  }

For best SSE performance you should make sure the loads happen at aligned addresses. You can handle the first few misaligned elements in the same way as the last few in the code above if necessary.

The other thing to watch out for is memory bandwidth. If there are several members of the ClusterCentroid structure that you don't use during that loop, then you'll be reading much more data from memory than you really need, as memory is read in cache-line-sized chunks, which are 64 bytes each.
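
One way to mitigate that is a hot/cold split, so the search loop streams only the fields it uses (a sketch; the cold members are placeholders):

// Hot data: 12 bytes per centroid, read on every iteration of the search.
struct CentroidHot { float x, y, z; };

// Cold data: members the distance loop never touches, kept in a separate
// array so they don't waste cache-line space during the search.
struct CentroidCold
{
    int count;
    // ... other rarely used members
};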

Adam
  • 1
    You cheater, you computed only the minimum distance and not which cluster it belongs to :) not that it would be hard to add.. – harold May 04 '15 at 12:17
  • One of the things I'm always wondering about profiling is that I use a version of VTune that only has the sampling test. I used to have an older version that did a full-blown call-graph test, and that took ages to run, but it seemed to give me so much more complete and accurate results. With the sampling tests, I always feel like maybe it's off by an instruction or two, and maybe I misunderstood them this time because it didn't appear to be branching that was hurting it. –  May 04 '15 at 16:42
  • 1
    One thing that can confuse people looking at sampling profiles is that cache misses don't get counted against the load instruction. They count against the next instruction that actually uses the value. That can make the hotspot show up in unexpected places if you don't realize what's going on. – Adam May 04 '15 at 22:49
  • @Adam I see, that makes a lot of sense. Often I've found that most of my hotspots which I misattributed (which I do quite often, and it generally takes a few stabs to get improvements) as being due to some other cause almost always benefited most from improvements to memory locality. That explains a lot of what I see. –  May 04 '15 at 23:29
3

One possible micro-optimization: store min_dist and min_index in local variables. The compiler may have to write to memory more often the way you have it written; on some architectures this can have a big performance impact. See my answer here for another example.

Adam's suggestion of doing 4 compares at once is also a good one.

However, your best speedup is going to come from reducing the number of centroids you have to check. Ideally, build a kd-tree (or similar) around the centroids, then query that to find the closest point.

If you don't have any tree building code lying around, here's my favorite "poor-man's" closest point search:

  1. Sort the centroids by one coordinate, e.g. cent.pos[0].
  2. Pick a starting index for the query point (pt).
  3. Iterate forwards through the candidate centroids until you reach the end, OR until abs(pt.pos[0] - cent.pos[0]) > min_dist.
  4. Repeat the previous step going in the opposite direction.

The extra stopping condition for the search means that you should skip a fair amount of points; you're also guaranteed not to skip any points closer than the best you've already found.

So for your code, this looks something like

// sort centroids by x coordinate.
min_index = -1;
min_dist = numeric_limits<float>::max();

// pick the start index. This works well if the points are evenly distributed.
float min_x = centroids[0].pos[0];
float max_x = centroids[num_centroids-1].pos[0];
float cur_x = pt.pos[0];
float t = (cur_x - min_x) / (max_x - min_x);
// TODO clamp t between 0 and 1
int start_index = int(t * float(num_centroids));

// Forward search
for (int i=start_index; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    if (fabs(cent.pos[0] - pt.pos[0]) > min_dist)
        // Everything to the right of this must be further than min_dist, so break.
        // This is where the savings comes from!
        break;
    const float dist = ...;
    if (dist < min_dist)
    {
        min_dist = dist;
        min_index = i;
    }
}

// Backwards search
for (int i=start_index; i >= 0; --i)
{
    // same as above, but break when the x gap exceeds min_dist on the left
}
pt.min_dist = min_dist;
pt.min_index = min_index;

(Note that this assumes you're computing the distance between points, but your assembly indicates it's the distance squared. Adjust the break condition accordingly).

There's slight overhead to building the tree or sorting the centroids, but this should be offset by making the calculations faster in the bigger loop (over the number of points).

celion
  • This is a really interesting idea! It'll take me a little time to try it, but I'm very curious about it. –  May 04 '15 at 16:45
  • I think I've seen this basic idea involved before with a name like 'sweep and prune' or something like that. The exact name of this technique of dealing with one coordinate escapes me. One of the difficulties I had with a KD-tree or BVH or Octree is just rebuilding it for every iteration of k as the centroids move around, though we might be able to exploit the fact that they generally don't move much with expanding AABBs. I like this poor man's method a lot though -- it's handy to me even when such structures are available to have a coarser algorithm to apply first with a lower setup overhead. –  May 04 '15 at 16:48
  • 1
    The last place that I used to work referred to this approach (sorting on one axis) as "1 axis sweep", as opposed to "3 axis sweep" that they'd use for full collision detection. There are a few cases that it behaves badly on that a tree wouldn't have problems with; in particular, if your points are on a grid, that means you have a lot of points with the same x value. You'll have to experiment with the tradeoff between the quality of the tree build and the time savings, but I've generally found that a little extra time building a good tree gives a lot of savings in other areas. – celion May 05 '15 at 04:06
  • Sometimes I wonder if like a coarse 10x10x10 grid would work. Often the types of algorithms I'm applying, like this k-means one, have the points moving around (and possibly a lot) every frame/iteration -- lots of dynamic content. So like with the quad-tree in a game with sprites moving everywhere, it gets rather awkward for very, very dynamic content, and I've found I actually do better using just a fixed grid that's really tuned to transfer elements from one cell to another cheaply (ex: linked list with fast alloc where moving elements only updates list pointers). –  May 05 '15 at 04:18
  • 1
    That might also work but it wasn't what I meant :) The one-axis sweep approach works well when everything is distributed evenly along that axis, but can perform badly when that's not true. Suppose you (stupidly) initialized your centroids so that they were on a line; since all their x-coordinates are the same, we can't exit the loop early and you wind up with the same O(N) behavior you were trying to avoid. – celion May 05 '15 at 04:32
  • 1
    I had a similar experience trying to use this approach on a grid of points without realizing that was how they were set up. Each check took O(sqrt(n)) instead of the roughly O(log(n)) I was expecting. But that's still better than the O(N) approach you're using now! – celion May 05 '15 at 04:33
  • Ah yes, I understand the weakness with this one-axis sweep. But also I always have trouble with the variable-depth tree structures when pitted against very dynamic content. For raytracing I use a BVH already (supplied by Intel's Embree), but there the number of ray queries prior to a frame update are so substantial as to trivialize the build cost in comparison. I've often found it difficult to get a good balance there between a lower-quality, really-fast-building accelerator and one that can respond to dynamic changes at interactive rates. –  May 05 '15 at 04:41