I am trying to implement the fastest possible join of two datasets and taking the product of the values when the indices match. I have a scalar approach to this but I believe using SIMD I can accelerate this algorithm. I have two Span<int>
which are the keys that must match to take the product: aKeys
and bKeys
. Where the value in aKeys
matches the value in bKeys
, the corresponding values in aValues
and bValues
should be multiplied and stored.
I am looking to use SIMD instruction to be able to compare a single key from aKeys
to many values in bKeys
. The trick is that once the values for bKeys
are greater than the aKey
value I am testing, I need to move to the next aKey
value. This is a follow up to this question. A full example with benchmark code can be found in this repo.
The algorithm for IndexOf<T>
in SpanHelpers is close but it is only for looking up a single value. I'm trying to take advantage of the fact that I have several values to lookup, all unique and in ascending order.
// NOTE: The length of aKeys and aValues is the same.
// The length of bKeys and bValues is the same.
// The values in `aKeys` and `bKeys` are unique and sorted but not every value
// in aKeys will be present in bKeys and vice versa.
let hadamardProduct (aKeys: Span<int>, aValues: Span<float>, bKeys: Span<int>, bValues: Span<float>) =
let maxN = Math.Min (aKeys.Length, bKeys.Length)
let outKeys = Array.zeroCreate maxN
let outValues = Array.zeroCreate maxN
let mutable aIdx = 0
let mutable bIdx = 0
let mutable outIdx = 0
while aIdx < aKeys.Length && bIdx < bKeys.Length do
if aKeys.[aIdx] = bKeys.[bIdx] then
outKeys.[outIdx] <- aKeys.[aIdx]
outValues.[outIdx] <- aValues.[aIdx] * bValues.[bIdx]
outIdx <- outIdx + 1
aIdx <- aIdx + 1
bIdx <- bIdx + 1
elif aKeys.[aIdx] < bKeys.[bIdx] then
aIdx <- aIdx + 1
else
bIdx <- bIdx + 1
let resultKeys = Memory (outKeys, 0, outIdx)
let resultValues = Memory (outValues, 0, outIdx)
resultKeys, resultValues
UPDATE 2021-08-23 09:26
After working on this some more I have now come up with the following approach. I am curious if there is anything to be done to make the code go faster?
#nowarn "9" "51" "20" // Don't want warnings about pointers
open System
open FSharp.NativeInterop
open System.Numerics
open System.Runtime.Intrinsics.X86
open System.Runtime.Intrinsics
let hadamardProduct (aKeys: Span<int>, aValues: Span<float>, bKeys: Span<int>, bValues: Span<float>) =
let maxN = Math.Min (aKeys.Length, bKeys.Length)
let outKeys = Array.zeroCreate maxN
let outValues = Array.zeroCreate maxN
let mutable aIdx = 0
let mutable bIdx = 0
let mutable outIdx = 0
if bKeys.Length > 4 then
let lastBlockIdx = bKeys.Length - (bKeys.Length % 4)
let bPointer = && (bKeys.GetPinnableReference ())
let mutable bVector = Sse2.LoadVector128 (NativePtr.add bPointer bIdx)
while aIdx < aKeys.Length && bIdx < lastBlockIdx do
let aVector = Vector128.Create aKeys.[aIdx]
let comparison = Sse2.CompareEqual (aVector, bVector)
let matches = Sse2.MoveMask (comparison.AsByte ())
if matches > 0 then
let bIdxOffset = (BitOperations.TrailingZeroCount matches) / 4 // Convert byte offset to index
outKeys.[outIdx] <- aKeys.[aIdx]
outValues.[outIdx] <- aValues.[aIdx] * bValues.[bIdx + bIdxOffset]
aIdx <- aIdx + 1
outIdx <- outIdx + 1
// REMEMBER, bIdx is testing 4 values at a time so we don't always want to jump
elif aKeys.[aIdx] > bKeys.[bIdx + 3] then
// REMEMBER!! bIdx needs to stride, not increment
bIdx <- bIdx + 4
// We only want to load new values when necessary
if bIdx < lastBlockIdx then
bVector <- Sse2.LoadVector128 (NativePtr.add bPointer bIdx)
else
aIdx <- aIdx + 1
// Final pass for data that didn't fit the Vector128 size
while aIdx < aKeys.Length && bIdx < bKeys.Length do
if aKeys.[aIdx] = bKeys.[bIdx] then
outKeys.[outIdx] <- aKeys.[aIdx]
outValues.[outIdx] <- aValues.[aIdx] * bValues.[bIdx]
outIdx <- outIdx + 1
aIdx <- aIdx + 1
bIdx <- bIdx + 1
elif aKeys.[aIdx] < bKeys.[bIdx] then
aIdx <- aIdx + 1
else
bIdx <- bIdx + 1
let resultKeys = Memory (outKeys, 0, outIdx)
let resultValues = Memory (outValues, 0, outIdx)
resultKeys, resultValues