
I'm currently working on a copy function that fills a destination byte array from a source byte array, replicating the source as many times as needed until the destination is filled (some call it MemCpyReplicate or similar). The length of the destination array is always a multiple of the length of the source array. My first attempt was a simple copy via the Unsafe.CopyBlockUnaligned intrinsic, which simply emits a rep movsb:

public static unsafe void CopyRepeat(byte* destination, byte* source, int byteCount, int count) {
  while(count-- > 0) {
    // Copy one instance of the pattern, then advance the destination pointer.
    Unsafe.CopyBlockUnaligned(destination, source, (uint)byteCount);
    destination += byteCount;
  }
}

Since the results were not satisfactory, I now want to use SIMD, more precisely the Vector<T> API. But I don't know how to handle unaligned addresses and byte patterns smaller than the vector length. This would be my ideal solution: source array -> 10 bytes, vector -> 32 bytes = 3 x the byte pattern.
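
To show the replication step I have in mind, here is a rough sketch (BuildRepeatedVector is just a made-up name; it assumes byteCount <= Vector<byte>.Count and uses System.Numerics plus System.Runtime.CompilerServices):

public static unsafe Vector<byte> BuildRepeatedVector(byte* source, int byteCount) {
    // Fill a temporary buffer so that byte i equals source[i % byteCount],
    // then load the whole buffer as a single vector.
    byte* tmp = stackalloc byte[Vector<byte>.Count];
    for(int i = 0; i < Vector<byte>.Count; i++)
        tmp[i] = source[i % byteCount];
    return Unsafe.ReadUnaligned<Vector<byte>>(tmp);
}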

The byte sequences are mostly in the range of 1 to 64 bytes. The number of repetitions ranges from 1 to 500. Is there a better solution or are there sample implementations for similar functions?

UPDATE: I have built two vectorized variants of the original version. The first repeats the pattern inside the vector so that the vector contains n copies of the pattern; if the pattern is too large for the vector, CopyBlock is used instead. The second variant repeats the pattern until more than a vector's worth of bytes has been written to the destination and then always copies vector-sized blocks (moving the source window along the already-written data) without using CopyBlock.

Source code of the vectorized variants (below the benchmark table):

However, I now get weird runtime results for pattern sizes between 2 and 32 (the vector size in my case). I suspect it is related to reading from the moving source window, since doubling the window halved the execution time. For sizes larger than the vector size, I get the expected results:

| Method             | byteCount | count | Mean      | Error    | StdDev   |
|--------------------|----------:|------:|----------:|---------:|---------:|
| Repeat_CopyBlock   |         3 |    16 |  19.38 ns | 0.002 ns | 0.002 ns |
| Repeat_NoCopyBlock |         3 |    16 |  13.90 ns | 0.106 ns | 0.100 ns |
| Repeat_CopyBlock   |         3 |   128 |  25.00 ns | 0.005 ns | 0.005 ns |
| Repeat_NoCopyBlock |         3 |   128 |  39.31 ns | 0.135 ns | 0.126 ns |
| Repeat_CopyBlock   |        12 |    16 |  10.64 ns | 0.037 ns | 0.031 ns |
| Repeat_NoCopyBlock |        12 |    16 |  13.35 ns | 0.024 ns | 0.023 ns |
| Repeat_CopyBlock   |        12 |   128 |  25.56 ns | 0.020 ns | 0.019 ns |
| Repeat_NoCopyBlock |        12 |   128 | 108.61 ns | 0.164 ns | 0.154 ns |
| Repeat_CopyBlock   |        16 |    16 |  68.74 ns | 0.010 ns | 0.009 ns |
| Repeat_NoCopyBlock |        16 |    16 |  13.50 ns | 0.002 ns | 0.002 ns |
| Repeat_CopyBlock   |        16 |   128 |  81.41 ns | 0.024 ns | 0.022 ns |
| Repeat_NoCopyBlock |        16 |   128 |  81.52 ns | 0.067 ns | 0.062 ns |
| Repeat_CopyBlock   |        48 |    16 |  48.84 ns | 0.045 ns | 0.042 ns |
| Repeat_NoCopyBlock |        48 |    16 |  23.80 ns | 0.089 ns | 0.083 ns |
| Repeat_CopyBlock   |        48 |   128 | 364.76 ns | 0.053 ns | 0.045 ns |
| Repeat_NoCopyBlock |        48 |   128 | 165.34 ns | 0.145 ns | 0.136 ns |

public static unsafe void Repeat_NoCopyBlock(byte* destination, byte* source, int byteCount, int count) {
    if(byteCount == 1) {
        Unsafe.InitBlockUnaligned(destination, *source, (uint)count);
        return;
    }

    var absoluteByteCount = byteCount * count;
    var dst = destination;
    var offset = 0;

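    // Phase 1: write the pattern with scalar stores until either the whole
    // destination is filled or at least two vector widths of correct bytes exist.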
    do
    {
        if(offset == absoluteByteCount) return;

        offset += byteCount;

        var src = source;
        var remaining = byteCount;

        while((remaining & -4) != 0) {
            *((uint*)dst) = *((uint*)src);
            dst += 4;
            src += 4;
            remaining -= 4;
        }

        if((remaining & 2) != 0) {
            *((ushort*)dst) = *((ushort*)src);
            dst += 2;
            src += 2;
            remaining -= 2;
        }

        if((remaining & 1) != 0)
            *dst++ = *src;
    } while((offset & (2 * -Vector<byte>.Count)) == 0); // & -Vector<byte>.Count was 2x slower.

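    // Phase 2: copy vector-sized blocks, reading from the already-written start of
    // the destination (the read pointer trails the write pointer by at least two vectors).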
    var stopLoopAtOffset = absoluteByteCount - Vector<byte>.Count;
    var from = destination;
    // var buffer = offset;

    while(offset <= stopLoopAtOffset) {
        Unsafe.WriteUnaligned(dst, Unsafe.ReadUnaligned<Vector<byte>>(from));
        offset += Vector<byte>.Count;
        from   += Vector<byte>.Count;
        dst    += Vector<byte>.Count;
    }

    var rep = (offset / byteCount) * byteCount; // Closest pattern end.

    if(offset != absoluteByteCount) {
        // next line is the replacement for (buffer is the offset from destination before the loop above):
        // destination + buffer - Vector<byte>.Count
        var repEnd = destination + rep - Vector<byte>.Count;
        var dstEnd = destination + stopLoopAtOffset;
        Unsafe.WriteUnaligned(dstEnd, Unsafe.ReadUnaligned<Vector<byte>>(repEnd));
    }
}

public static unsafe void Repeat_CopyBlock(byte* destination, byte* source, int byteCount, int count) {
    if(count == 0) return;
    if(byteCount == 0) return;

    if(byteCount == 1) {
        Unsafe.InitBlockUnaligned(destination, *source, (uint)count);
        return;
    }

    var numElements = Vector<byte>.Count / byteCount;
    var numElementsByteCount = numElements * byteCount;

    var i = 0;
    var dst = destination;

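    // Write up to numElements repetitions (at most one vector's worth of bytes)
    // with scalar stores; the vector loop below re-reads this prefix as its source.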
    do
    {
        var remaining = byteCount;
        var src = source;

        while(remaining >= 4) {
            *((uint*)dst) = *((uint*)src);
            dst += 4;
            src += 4;
            remaining -= 4;
        }

        if((remaining & 2) != 0) {
            *((ushort*)dst) = *((ushort*)src);
            dst += 2;
            src += 2;
            remaining -= 2;
        }

        if((remaining & 1) != 0)
            *dst++ = *src;

        ++i; --count;
    } while(count != 0 && i < numElements);

    if(numElements > 0) { // Skip byteCounts larger than Vector<byte>.Count.
        var src = Unsafe.ReadUnaligned<Vector<byte>>(destination);

        while(count > numElements) {
            Unsafe.WriteUnaligned(dst, src);
            count -= numElements;
            dst += numElementsByteCount;
        }
    }

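    // Tail: copy the remaining repetitions one pattern at a time; this is also the
    // path taken for patterns larger than Vector<byte>.Count (numElements == 0).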
    while(count > 0) {
        Unsafe.CopyBlockUnaligned(dst, destination, (uint)byteCount);
        dst += byteCount;
        --count;
    }
}
  • Your question should directly include the source for your vectorized variants, as well as the sharplab link for the asm. Off-site links can rot. You're probably not hitting the 30k limit on number of characters in a post. – Peter Cordes Dec 11 '22 at 14:43
  • @Peter Cordes: Added the source code. – ListigerLurch Dec 11 '22 at 14:49

1 Answer


In asm, it's fast to do overlapping stores, e.g. for a 10-byte pattern, you would do a 16-byte SIMD store and increment the pointer by 10.
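
In the C# terms the question already uses, that idea would look roughly like this (untested sketch, RepeatOverlapping is my naming; assumes byteCount <= 16 and that vec was prepared so that byte i of it equals source[i % byteCount]; Vector128 comes from System.Runtime.Intrinsics):

public static unsafe void RepeatOverlapping(byte* destination, byte* source, int byteCount, int count, Vector128<byte> vec) {
    long total = (long)byteCount * count;
    long offset = 0;
    // Each 16-byte store starts at a multiple of byteCount, so every byte it writes
    // is the correct pattern byte; consecutive stores overlap by 16 - byteCount bytes.
    while(offset + 16 <= total) {
        Unsafe.WriteUnaligned(destination + offset, vec);
        offset += byteCount;
    }
    // Scalar tail for the last bytes a full store would overrun; it may rewrite a
    // few already-correct bytes, which is harmless.
    for(; offset < total; offset++)
        destination[offset] = source[offset % byteCount];
}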

But it's even more efficient to unroll the pattern across multiple registers and unroll the loop some. Ideally unroll to lowest_common_multiple(pattern, vector_width), but even just unrolling 3x to fill most of a 32-byte vector is good. (Or, without AVX, across a pair of 16-byte vectors, so two stores that don't overlap each other, for a total of 32 bytes.) That matters especially when the repeat count isn't huge, so you can't spend forever setting up vectors.
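
For example, with a 10-byte pattern and 32-byte vectors, lcm(10, 32) = 160, so five 32-byte stores cover exactly 16 repetitions and the unrolled loop can advance by 160 bytes with no overlap between iterations. A small helper (my naming) for that:

public static int Lcm(int patternLength, int vectorWidth) {
    int a = patternLength, b = vectorWidth;
    while(b != 0) { (a, b) = (b, a % b); } // a ends up as gcd(patternLength, vectorWidth)
    return patternLength / a * vectorWidth;
}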

Or to make the setup easier for longer patterns (without reading outside the boundaries of the src buffer): borrow glibc memcpy's strategy of doing for example a 30-byte copy with two overlapping 16-byte loads, one that starts at the start, one that ends at the end. So in the main loop you'd be doing a sequence of N stores with potential overlap, and then the next 30 bytes would be stored without overlapping the first.
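
A sketch of that building block in the question's style (my naming; valid for lengths from 16 to 32 bytes):

public static unsafe void CopyUpTo32(byte* dst, byte* src, int len) {
    // Two 16-byte loads/stores that may overlap in the middle, but never touch
    // memory outside [src, src + len) or [dst, dst + len).
    var head = Unsafe.ReadUnaligned<Vector128<byte>>(src);
    var tail = Unsafe.ReadUnaligned<Vector128<byte>>(src + len - 16);
    Unsafe.WriteUnaligned(dst, head);
    Unsafe.WriteUnaligned(dst + len - 16, tail);
}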

Hmm, but a variable number of registers isn't easy to loop with; that would require different loops. Maybe always use 4 vector registers but with variable offsets between them, so a single loop can use indexed addressing modes and a pointer increment. (That's not ideal for the store AGUs on Intel before Ice Lake, where the port 7 AGU only handles 1-register addressing modes, but the stores aren't competing with any loads from this logical core, so it's probably fine.) Maybe some of the offsets can be fixed at the vector width, with only the last vector potentially partially overlapping the 3rd.

So it would be up to the setup code to figure out how many repeats of the pattern fit into 3 to 4x the vector width, and with what overlap within that. Unfortunately palignr is only available with an immediate count, and there'd be a store-forwarding stall if you use narrower stores to do the first few iterations of the pattern into the destination buffer your current way and then reload from there into XMM or YMM registers. (And multiple SF stalls can't overlap their delay.)


IDK how easy it is to get C#'s JIT to emit asm like that, either with Vector<> intrinsics or Sse2.whatever / Avx.whatever; I haven't used C# for anything except SO answers. I'm just trying to point you in the direction of a good goal.

Peter Cordes
  • Thanks, that is definitely a direction in which I can do more research. What do you think about a log(n) copy function (like [here](https://stackoverflow.com/questions/28551153/how-to-quickly-replicate-a-6-byte-unsigned-integer-into-a-memory-region)), similar to my second solution, where the data already written is copied again (and the copied length grows accordingly)? To me it looks like the cost of an unaligned movsb would be quite high in this case. – ListigerLurch Oct 07 '22 at 10:15
  • @ListigerLurch: Hrm. If you can't get the C# JIT to do anything better than `rep movsb` for block copies, at least you're reducing the startup overhead. I'd have to double check on unaligned `rep movsb` across different microarchitectures, but IIRC it still benefits from "fast strings" microcode. Maybe even higher startup overhead, and maybe not as fast once it gets going, but still better than starting `n` small rep movsb. ("fast short rep movs" is a new feature on Ice Lake which I think makes startup significantly lower for stuff like this, perhaps because Intel saw C# emitting this asm) – Peter Cordes Oct 07 '22 at 10:43
  • I can use the direct SIMD intrinsics (like AVX and SSE), but `Vector` has the advantage of using the largest possible vector length at the expense of fewer available functions. I can reduce the cost per vector a bit more by using `Unsafe.SkipInit`. For writing only `Unsafe.Write` or `Unsafe.WriteUnaligned` can be used. – ListigerLurch Oct 07 '22 at 11:11
  • @ListigerLurch: Being forced to use the widest possible vector length could be a *dis*advantage. If the LCM(pattern, 16) is say 112, you might consider a strategy of three 32-byte stores and one 16-byte store, instead of having to prepare a 4th 32-byte vector that can correctly overlap, or that always stores garbage in the top 16 bytes that the next group overlaps. Inconvenient at the end of the loop, maybe have to stop sooner for cleanup. And maybe an extra store-forwarding stall and/or cache-line split load getting it initialized. – Peter Cordes Oct 07 '22 at 11:18
  • @ListigerLurch: But IDK if that corner case would come up for runtime-variable pattern lengths and buffer lengths; hard to take advantage of, like maybe branch to a version of the loop with one narrower vector, or a version that uses 16-byte vectors for everything for small patterns and buffers? Likely just finding a good tradeoff for max-width Vector<> is a good plan. This is likely going to be a case where "fast enough" is sufficient, and trying to get "optimal" would be really really hard. Pick a strategy that does well for sizes that are typical in your use-case. – Peter Cordes Oct 07 '22 at 11:21
  • I had some time to develop better variants, but now I get strange results with AVX. If you have time I would be grateful if you could read the update (source code and the generated assembly are provided). – ListigerLurch Dec 11 '22 at 14:09