1

Problem description

Using serde_json to deserialize a very long array of objects into a Vec<T> can take a long time, because the entire array must be read into memory up front. I'd like to iterate over the items in the array instead to avoid the up-front processing and memory requirements.

My approach so far

StreamDeserializer cannot be used directly, because it can only iterate over self-delimiting types placed back-to-back. So what I've done so far is to write a custom struct to implement Read, wrapping another Read but omitting the starting and ending square brackets, as well as any commas.

For example, the reader will transform the JSON [{"name": "foo"}, {"name": "bar"}, {"name": "baz"}] into {"name": "foo"} {"name": "bar"} {"name": "baz"} so it can be used with StreamDeserializer.

Here is the code in its entirety:

use std::io;

/// An implementation of `Read` that transforms JSON input where the outermost
/// structure is an array. The enclosing brackets and commas are removed,
/// causing the items to be adjacent to one another. This works with
/// [`serde_json::StreamDeserializer`].
pub(crate) struct ArrayStreamReader<T> {
    inner: T,
    depth: Option<usize>,
    inside_string: bool,
    escape_next: bool,
}

impl<T: io::Read> ArrayStreamReader<T> {
    pub(crate) fn new_buffered(inner: T) -> io::BufReader<Self> {
        io::BufReader::new(ArrayStreamReader {
            inner,
            depth: None,
            inside_string: false,
            escape_next: false,
        })
    }
}

#[inline]
fn do_copy(dst: &mut [u8], src: &[u8], len: usize) {
    if len == 1 {
        dst[0] = src[0]; // Avoids memcpy call.
    } else {
        dst[..len].copy_from_slice(&src[..len]);
    }
}

impl<T: io::Read> io::Read for ArrayStreamReader<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        let mut tmp = vec![0u8; buf.len()];

        // The outer loop is here in case every byte was skipped, which can happen
        // easily if `buf.len()` is 1. In this situation, the operation is retried
        // until either no bytes are read from the inner stream, or at least 1 byte
        // is written to `buf`.
        loop {
            let byte_count = self.inner.read(&mut tmp)?;
            if byte_count == 0 {
                return if self.depth.is_some() {
                    Err(io::ErrorKind::UnexpectedEof.into())
                } else {
                    Ok(0)
                };
            }

            let mut tmp_pos = 0;
            let mut buf_pos = 0;
            for (i, b) in tmp.iter().cloned().enumerate() {
                if self.depth.is_none() {
                    match b {
                        b'[' => {
                            tmp_pos = i + 1;
                            self.depth = Some(0);
                        },
                        b if b.is_ascii_whitespace() => {},
                        b'\0' => break,
                        _ => return Err(io::ErrorKind::InvalidData.into()),
                    }
                    continue;
                }

                if self.inside_string {
                    match b {
                        _ if self.escape_next => self.escape_next = false,
                        b'\\' => self.escape_next = true,
                        b'"' if !self.escape_next => self.inside_string = false,
                        _ => {},
                    }
                    continue;
                }

                let depth = self.depth.unwrap();
                match b {
                    b'[' | b'{' => self.depth = Some(depth + 1),
                    b']' | b'}' if depth > 0 => self.depth = Some(depth - 1),
                    b'"' => self.inside_string = true,
                    b'}' if depth == 0 => return Err(io::ErrorKind::InvalidData.into()),
                    b',' | b']' if depth == 0 => {
                        let len = i - tmp_pos;
                        do_copy(&mut buf[buf_pos..], &tmp[tmp_pos..], len);
                        tmp_pos = i + 1;
                        buf_pos += len;

                        // Then write a space to separate items.
                        buf[buf_pos] = b' ';
                        buf_pos += 1;

                        if b == b']' {
                            // Reached the end of outer array. If another array
                            // follows, the stream will continue.
                            self.depth = None;
                        }
                    },
                    _ => {},
                }
            }

            if tmp_pos < byte_count {
                let len = byte_count - tmp_pos;
                do_copy(&mut buf[buf_pos..], &tmp[tmp_pos..], len);
                buf_pos += len;
            }

            if buf_pos > 0 {
                // If at least some data was read, return with the amount. Otherwise, the outer
                // loop will try again.
                return Ok(buf_pos);
            }
        }
    }
}

It is used like so:

use std::io;

use serde::Deserialize;

#[derive(Deserialize)]
struct Item {
    name: String,
}

fn main() -> io::Result<()> {
    let json = br#"[{"name": "foo"}, {"name": "bar"}]"#;
    let wrapped = ArrayStreamReader::new_buffered(&json[..]);
    let first_item: Item = serde_json::Deserializer::from_reader(wrapped)
        .into_iter()
        .next()
        .unwrap()?;
    assert_eq!(first_item.name, "foo");
    Ok(())
}

At last, a question

There must be a better way to do this, right?

  • What are you doing with the items after deserialization? If you are only collecting them in a `Vec` you are not gaining anything memory wise. If you want to process them, e.g., to find the maximum value in the list, you can do that with just serde https://serde.rs/stream-array.html. – jonasbb Apr 22 '21 at 16:38
  • I'm dealing with them one-by-one in a loop or with `Iterator` methods. I looked into that example, and that sort of approach does save on memory. However it still has the drawback that it needs to process the entire array all at once. – Michael Morgan Apr 22 '21 at 16:51
  • It's not possible to pick a more suitable data format? JSON is depressingly bad for huge data sets. I've gone this route before (in Python) and, sure, there are workarounds but at the end of the day it's mostly lipstick on a pig. If you have influence over the data format, I strongly encourage you to push for something more suitable. – trent Apr 22 '21 at 16:54
  • I'm reading data published by a third party, so unfortunately I have no control over the format. – Michael Morgan Apr 22 '21 at 16:57
  • Looks like [this](https://serde.rs/stream-array.html) is what you want. – Jmb Apr 23 '21 at 06:58
  • Does this answer your question? [How can I stream elements from inside a JSON array using serde\_json?](https://stackoverflow.com/questions/68641157/how-can-i-stream-elements-from-inside-a-json-array-using-serde-json) – Marcono1234 Aug 25 '23 at 22:45

0 Answers0