3

I'm trying to write elegant audio processing code in Rust that can consume chunks of audio from N channels (mono, stereo or surround, known at compile-time) in lock-step, reduce them and send the reduced value (or flat-mapped value) off to another stream for processing.

In Clojure, the sequence abstractions and higher-order transducers make it easy to bite chunks off a stream and post a processed result to another core.async channel, but I'm struggling to get this working in Rust, especially since Rust complains about generic iterators not being sized at compile time.

Specifically, how do I consume equal-sized chunks of audio from multiple channels in lock-step, to e.g. calculate the summed square of all values, and then do something with that value? I'm aware of crossbeam.

Here is the pseudo-code I'm reaching for:

type AudioSample = f64;
struct Signal<S, const N: usize> {
    sample_rate: f64,
    channels: [Iterator<S>; N], // I know this won't compile
}

fn process_signal(signal: Signal<AudioSample, 5>) -> f64 {
    let mut sum_squared = 0.0;
    let chunk_size = 0.1 * signal.sample_rate; // 100ms of audio from each channel
    for channel in signal.channels {  // how to parallelize this blocking call?
        let chunk = channel.take(chunk_size); // assuming this blocks until 100ms of signal is available
        sum_squared += chunk.fold(|0., sample| sample * sample);
    }

    sum_squared
}

Bonus points if you can show a pragmatic way to make the process_signal() function "incremental", i.e. asynchronous and parallelizable for N-channels.

Jonas
  • 121,568
  • 97
  • 310
  • 388
Petrus Theron
  • 27,855
  • 36
  • 153
  • 287
  • https://doc.rust-lang.org/std/primitive.slice.html#method.chunks_exact ? not clear what you want – Stargateur Aug 07 '19 at 09:29
  • 1
    Since you mention parallelize and blocking calls: do you expect the "iterators" to be receiving data from another thread while you are pulling data from them, or is all this single-threaded? – Matthieu M. Aug 07 '19 at 09:29
  • I expect it would have to be multi-threaded, but it would be pretty cool if it could be single-threaded, assuming there is a way to peek/select at N buffers that are being populated by a hardware interrupt. – Petrus Theron Aug 07 '19 at 09:32
  • @Stargateur I want to consolidate N channels in parallel by taking equal-sized chunks from N audio channels, map over those chunks and reduce them to a single value which gets sent off for processing elsewhere. Any concrete solution to that general problem will do. Extra bonus points if N does not need to be known at compile-time. – Petrus Theron Aug 07 '19 at 09:36
  • I've now found this DSP crate, which might be more appropriate for what I'm trying to do - haven't tried yet, though: http://rustaudio.github.io/dsp-chain/dsp/ – Petrus Theron Aug 07 '19 at 09:45
  • @PetrusTheron How about losing a bit performance, yet being able to both **avoid blocking** + **run concurrently** (a true-[PARALLEL] process-flow is not possible(or rather is an anti-pattern for cases)where asynchronous,potentially blocking inputs dis-coordinate otherwise potentially true-[PARALLEL] processes). A per-channel *parallel*, principally non-blocking loop, .poll()-only pre-testing,if anything arrived and just reading if ready-to-read input channel byte-by-byte, producing sum_squared fully ***incrementally***,as the chunking logic/windowing now becomes controlled in my loop code. – user3666197 Aug 07 '19 at 11:16
  • Not a Rust-specific approach, yet working this way in high-performance, low-latency, non-blocking multi-channel processing distributed-system designs, where each locally-processed channel has also a different priority and different payload-processing. Byte-sized reading may seem awfully on a first look, yet may help in deterministic sizing for compile-time, may help avoid both the blocking and other problems with skewed synchronicity / slippage / jitter / temporal LoS / {source|transport}-failure-detection / spurious events / flooding ingress. – user3666197 Aug 07 '19 at 11:23
  • *especially since Rust complains about generic iterators not being sized* — [How do I create a heterogeneous collection of objects?](https://stackoverflow.com/q/27957103/155423) – Shepmaster Aug 07 '19 at 12:42
  • *to another `core.async` channel* — are you aware that iterators are not the equivalent to this? You want [futures](https://rust-lang-nursery.github.io/futures-rs/) / streams. – Shepmaster Aug 07 '19 at 13:33

1 Answers1

0

Perhaps you can have another thread responsible for processing, rather than try to synchronize buffer reads? This seems like a nice opportunity to use a mpsc channel.

Since you'll likely only be able to react to the callback from the audio scheduler being called, it's worry you'll spend a lot of time waiting/blocking for buffers to be ready if you try to for processing to happen in unison.

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
Danny Meyer
  • 321
  • 6
  • 11