This can be done with SIMD. The solution is similar to the solution for the prefix sum with SIMD.
Within a SIMD register the number of iterations goes as O(Log2(simd_width))
. Each iteration requires: one shift, one subtraction, and one max. For example with SSE it requires Log2(4) = 2
iterations. You can apply your function on four elements like this:
__m128i foo_SSE(__m128i x, int c) {
__m128i t, c1, c2;
c1 = _mm_set1_epi32(c);
c2 = _mm_set1_epi32(2*c);
t = _mm_slli_si128(x, 4);
t = _mm_sub_epi32(t, c1);
x = _mm_max_epi32(x, t);
t = _mm_slli_si128(x, 8);
t = _mm_sub_epi32(t, c2);
x = _mm_max_epi32(x, t);
return x;
}
Once you have the result of a SIMD register you need to apply the "carry" to the next register. For example let's say you have an array a
of eight elements. You load SSE register x1
and x2
like this
__m128i x1 = _mm_loadu_si128((__m128i*)&a[0]);
__m128i x2 = _mm_loadu_si128((__m128i*)&a[4]);
Then to apply your function to all eight elements you would do
__m128i t, s;
s = _mm_setr_epi32(c, 2*c, 3*c, 4*c);
x1 = foo_SSE(x1,c);
x2 = foo_SSE(x2,c);
t = _mm_shuffle_epi32(x1, 0xff);
t = _mm_sub_epi32(t,s);
x2 = _mm_max_epi32(x2,t);
Note that c1
, c2
, and s
are all constants within a loop so they only need to be calculated once.
In general you could apply your function to an unsigned int array a
like this with SSE (with n
a multiple of 4):
void fill_SSE(int *a, int n, int c) {
__m128i offset = _mm_setzero_si128();
__m128i s = _mm_setr_epi32(c, 2*c, 3*c, 4*c);
for(int i=0; i<n/4; i++) {
__m128i x = _mm_loadu_si128((__m128i*)&a[4*i]);
__m128i out = foo_SSE(x, c);
out = _mm_max_epi32(out,offset);
_mm_storeu_si128((__m128i*)&a[4*i], out);
offset = _mm_shuffle_epi32(out, 0xff);
offset = _mm_sub_epi32(offset,s);
}
}
I went ahead and profiled this SSE code. It's about 2.5 times faster than the serial version.
Another major advantage to this method besides going as log2(simd_width)
is that it break the dependency chain so that multiple SIMD operations can go at the same time (using multiple ports) instead of waiting for the previous result. The serial code is latency bound.
The current code works for unsigned integers but you could generalize it to signed integers as well as floats.
Here is the general code I used to test this. I created a bunch of abstract SIMD functions to emulate SIMD hardware before I implemented the SSE version.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <x86intrin.h>
#include <omp.h>
__m128i foo_SSE(__m128i x, int c) {
__m128i t, c1, c2;
c1 = _mm_set1_epi32(c);
c2 = _mm_set1_epi32(2*c);
t = _mm_slli_si128(x, 4);
t = _mm_sub_epi32(t, c1);
x = _mm_max_epi32(x, t);
t = _mm_slli_si128(x, 8);
t = _mm_sub_epi32(t, c2);
x = _mm_max_epi32(x, t);
return x;
}
void foo(int *a, int n, int c) {
for(int i=0; i<n-1; i++) {
if(a[i]-c > a[i+1]) a[i+1] = a[i]-c;
}
}
void broad(int *a, int n, int k) {
for(int i=0; i<n; i++) a[i] = k;
}
void shiftr(int *a, int *b, int n, int m) {
int i;
for(i=0; i<m; i++) b[i] = a[i];
for(; i<n; i++) b[i] = a[i-m];
}
/*
void shiftr(int *a, int *b, int n, int m) {
int i;
for(i=0; i<m; i++) b[i] = 0;
for(; i<n; i++) b[i] = a[i-m];
}
*/
void sub(int *a, int n, int c) {
for(int i=0; i<n; i++) a[i] -= c;
}
void max(int *a, int *b, int n) {
for(int i=0; i<n; i++) if(b[i]>a[i]) a[i] = b[i];
}
void step(int *a, int n, int c) {
for(int i=0; i<n; i++) {
a[i] -= (i+1)*c;
}
}
void foo2(int *a, int n, int c) {
int b[n];
for(int m=1; m<n; m*=2) {
shiftr(a,b,n,m);
sub(b, n, m*c);
max(a,b,n);
//printf("n %d, m %d; ", n,m ); for(int i=0; i<n; i++) printf("%2d ", b[i]); puts("");
}
}
void fill(int *a, int n, int w, int c) {
int b[w], offset[w];
broad(offset, w, -1000);
for(int i=0; i<n/w; i++) {
for(int m=1; m<w; m*=2) {
shiftr(&a[w*i],b,w,m);
sub(b, w, m*c);
max(&a[w*i],b,w);
}
max(&a[w*i],offset,w);
broad(offset,w,a[w*i+w-1]);
step(offset, w, c);
}
}
void fill_SSE(int *a, int n, int c) {
__m128i offset = _mm_setzero_si128();
__m128i s = _mm_setr_epi32(c, 2*c, 3*c, 4*c);
for(int i=0; i<n/4; i++) {
__m128i x = _mm_loadu_si128((__m128i*)&a[4*i]);
__m128i out = foo_SSE(x, c);
out = _mm_max_epi32(out,offset);
_mm_storeu_si128((__m128i*)&a[4*i], out);
offset = _mm_shuffle_epi32(out, 0xff);
offset = _mm_sub_epi32(offset,s);
}
}
void fill_SSEv2(int *a, int n, int c) {
__m128i offset = _mm_setzero_si128();
__m128i s = _mm_setr_epi32(1*c, 2*c, 3*c, 4*c);
__m128i c1 = _mm_set1_epi32(1*c);
__m128i c2 = _mm_set1_epi32(2*c);
for(int i=0; i<n/4; i++) {
__m128i x1 = _mm_loadu_si128((__m128i*)&a[4*i]);
__m128i t1;
t1 = _mm_slli_si128(x1, 4);
t1 = _mm_sub_epi32 (t1, c1);
x1 = _mm_max_epi32 (x1, t1);
t1 = _mm_slli_si128(x1, 8);
t1 = _mm_sub_epi32 (t1, c2);
x1 = _mm_max_epi32 (x1, t1);
x1 = _mm_max_epi32(x1,offset);
_mm_storeu_si128((__m128i*)&a[4*i], x1);
offset = _mm_shuffle_epi32(x1, 0xff);
offset = _mm_sub_epi32(offset,s);
}
}
void fill_SSEv3(int *a, int n, int c) {
__m128i offset = _mm_setzero_si128();
__m128i s = _mm_setr_epi32(1*c, 2*c, 3*c, 4*c);
__m128i c1 = _mm_set1_epi32(1*c);
__m128i c2 = _mm_set1_epi32(2*c);
for(int i=0; i<n/8; i++) {
__m128i x1 = _mm_loadu_si128((__m128i*)&a[8*i]);
__m128i x2 = _mm_loadu_si128((__m128i*)&a[8*i+4]);
__m128i t1, t2;
t1 = _mm_slli_si128(x1, 4);
t1 = _mm_sub_epi32 (t1, c1);
x1 = _mm_max_epi32 (x1, t1);
t1 = _mm_slli_si128(x1, 8);
t1 = _mm_sub_epi32 (t1, c2);
x1 = _mm_max_epi32 (x1, t1);
t2 = _mm_slli_si128(x2, 4);
t2 = _mm_sub_epi32 (t2, c1);
x2 = _mm_max_epi32 (x2, t2);
t2 = _mm_slli_si128(x2, 8);
t2 = _mm_sub_epi32 (t2, c2);
x2 = _mm_max_epi32 (x2, t2);
x1 = _mm_max_epi32(x1,offset);
_mm_storeu_si128((__m128i*)&a[8*i], x1);
offset = _mm_shuffle_epi32(x1, 0xff);
offset = _mm_sub_epi32(offset,s);
x2 = _mm_max_epi32(x2,offset);
_mm_storeu_si128((__m128i*)&a[8*i+4], x2);
offset = _mm_shuffle_epi32(x2, 0xff);
offset = _mm_sub_epi32(offset,s);
}
}
int main(void) {
int n = 8, a[n], a1[n], a2[n];
for(int i=0; i<n; i++) a[i] = i;
/*
a[0] = 1, a[1] = 0;
a[2] = 2, a[3] = 0;
a[4] = 3, a[5] = 13;
a[6] = 4, a[7] = 0;
*/
a[0] = 5, a[1] = 6;
a[2] = 7, a[3] = 8;
a[4] = 1, a[5] = 2;
a[6] = 3, a[7] = 4;
for(int i=0; i<n; i++) printf("%2d ", a[i]); puts("");
for(int i=0; i<n; i++) a1[i] = a[i], a2[i] = a[i];
int c = 1;
foo(a1,n,c);
foo2(a2,n,c);
for(int i=0; i<n; i++) printf("%2d ", a1[i]); puts("");
for(int i=0; i<n; i++) printf("%2d ", a2[i]); puts("");
__m128i x1 = _mm_loadu_si128((__m128i*)&a[0]);
__m128i x2 = _mm_loadu_si128((__m128i*)&a[4]);
__m128i t, s;
s = _mm_setr_epi32(c, 2*c, 3*c, 4*c);
x1 = foo_SSE(x1,c);
x2 = foo_SSE(x2,c);
t = _mm_shuffle_epi32(x1, 0xff);
t = _mm_sub_epi32(t,s);
x2 = _mm_max_epi32(x2,t);
int a3[8];
_mm_storeu_si128((__m128i*)&a3[0], x1);
_mm_storeu_si128((__m128i*)&a3[4], x2);
for(int i=0; i<8; i++) printf("%2d ", a3[i]); puts("");
int w = 8;
n = w*1000;
int f1[n], f2[n];
for(int i=0; i<n; i++) f1[i] = rand()%1000;
for(int i=0; i<n; i++) f2[i] = f1[i];
//for(int i=0; i<n; i++) printf("%2d ", f1[i]); puts("");
foo(f1, n, c);
//fill(f2, n, 8, c);
fill_SSEv3(f2, n, c);
printf("%d\n", memcmp(f1,f2,sizeof(int)*n));
for(int i=0; i<n; i++) {
// if(f1[i] != f2[i]) printf("%d\n", i);
}
//for(int i=0; i<n; i++) printf("%2d ", f1[i]); puts("");
//for(int i=0; i<n; i++) printf("%2d ", f2[i]); puts("");
int r = 200000;
double dtime;
dtime = -omp_get_wtime();
for(int i=0; i<r; i++) fill_SSEv2(f2, n, c);
//for(int i=0; i<r; i++) foo(f1, n, c);
dtime += omp_get_wtime();
printf("time %f\n", dtime);
dtime = -omp_get_wtime();
for(int i=0; i<r; i++) fill_SSEv3(f2, n, c);
//for(int i=0; i<r; i++) foo(f1, n, c);
dtime += omp_get_wtime();
printf("time %f\n", dtime);
dtime = -omp_get_wtime();
for(int i=0; i<r; i++) foo(f1, n, c);
//for(int i=0; i<r; i++) fill_SSEv2(f2, n, c);
dtime += omp_get_wtime();
printf("time %f\n", dtime);
}
Based on a comment by Paul R I was able to fix my function to work with signed integers. However, it requires c>=0
. I am sure it could be fixed to work for c<0
.
void fill_SSEv2(int *a, int n, int c) {
__m128i offset = _mm_set1_epi32(0xf0000000);
__m128i s = _mm_setr_epi32(1*c, 2*c, 3*c, 4*c);
__m128i c1 = _mm_set1_epi32(1*c);
__m128i c2 = _mm_set1_epi32(2*c);
for(int i=0; i<n/4; i++) {
__m128i x1 = _mm_loadu_si128((__m128i*)&a[4*i]);
__m128i t1;
t1 = _mm_shuffle_epi32(x1, 0x90);
t1 = _mm_sub_epi32 (t1, c1);
x1 = _mm_max_epi32 (x1, t1);
t1 = _mm_shuffle_epi32(x1, 0x44);
t1 = _mm_sub_epi32 (t1, c2);
x1 = _mm_max_epi32 (x1, t1);
x1 = _mm_max_epi32(x1,offset);
_mm_storeu_si128((__m128i*)&a[4*i], x1);
offset = _mm_shuffle_epi32(x1, 0xff);
offset = _mm_sub_epi32(offset,s);
}
}
This method should be easily extended to floats now.