I'm receiving RecordBatches serialized as bytes and I'm trying to deserialize them back into RecordBatches. Calling StreamReader::try_new() with the byte data and an empty Vec<usize> satisfies the compiler, but when I call reader.next() I get an error.

I'm stuck because I'm not sure what the second parameter (the projection) is supposed to be. It is typed as Option<Vec<usize>>. Printing reader.schema() shows the correct schema, but I seem to be doing something wrong when reading the rest of the data into RecordBatch form.

let buf: Vec<usize> = Vec::new();
let mut reader = StreamReader::try_new(data.data.as_slice(), Some(buf))?;
while !reader.is_finished() {
    println!("schema: {}", reader.schema());
    println!("next batch: {:?}", reader.next());
}

Output:

schema: Field { name: "my_int64_column", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }
next batch: Some(Err(InvalidArgumentError("at least one column must be defined to create a record batch")))

When I change buf to be non-empty, the error message changes to:

SchemaError("project index 1 out of bounds, max field 1")',
omgirok

1 Answer

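From reading the answer to the question asked here and experimenting with different Vec<usize> values in try_new(), I've figured out that the projection is a list of zero-based column indices to read, not a buffer to fill. Every index must be smaller than the number of columns in the schema, so the largest valid index is the number of columns minus 1. That explains both errors: an empty Vec selects no columns ("at least one column must be defined"), and index 1 is out of bounds for a one-column schema. Passing None instead of Some(...) reads every column. I'm now able to read the bytes and convert them to a RecordBatch using this projection: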

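// Zero-based index of the last column (0 for a single-column schema).
let num_cols = batches.num_columns() - 1;
// The projection lists the column indices to read; here, only the last column.
let projection: Vec<usize> = Vec::from([num_cols]);
let mut reader = StreamReader::try_new(data.data.as_slice(), Some(projection))?;

This gives the output: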

schema: Field { name: "vals", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }
next batch: Some(Ok(RecordBatch { schema: Schema { fields: [Field { name: "vals", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {} }, columns: [PrimitiveArray<Int64>
[
  2,
  4,
  6,
  8,
  10,
  12,
  14,
]] }))
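
To make the index semantics concrete, here is a minimal std-only sketch of how a projection selects columns. It is a toy model, not the arrow crate's implementation: the `Column` alias, the `project` function, and the error strings are all made up for illustration. It only assumes that a projection is a list of zero-based column indices and that `None` means "keep everything".

```rust
// Toy model of column projection; this is NOT the arrow crate's code.
// A "batch" here is just a list of named i64 columns (hypothetical type).
type Column = (String, Vec<i64>);

/// Keep only the columns whose zero-based indices appear in `projection`.
/// `None` keeps every column.
fn project(columns: &[Column], projection: Option<&[usize]>) -> Result<Vec<Column>, String> {
    // Resolve the projection into an explicit list of indices.
    let indices: Vec<usize> = match projection {
        Some(p) => p.to_vec(),
        None => (0..columns.len()).collect(),
    };
    // An empty selection leaves nothing to build a batch from.
    if indices.is_empty() {
        return Err("at least one column must be selected".to_string());
    }
    let mut out = Vec::with_capacity(indices.len());
    for &i in &indices {
        match columns.get(i) {
            Some(col) => out.push(col.clone()),
            // Valid indices run from 0 to columns.len() - 1.
            None => {
                return Err(format!(
                    "index {} out of bounds for {} column(s)",
                    i,
                    columns.len()
                ))
            }
        }
    }
    Ok(out)
}

fn main() {
    let columns: Vec<Column> = vec![("vals".to_string(), vec![2, 4, 6, 8, 10, 12, 14])];

    let empty: &[usize] = &[];
    let too_large: &[usize] = &[1];
    let first_only: &[usize] = &[0];

    println!("{:?}", project(&columns, Some(empty))); // error: nothing selected
    println!("{:?}", project(&columns, Some(too_large))); // error: index out of bounds
    println!("{:?}", project(&columns, Some(first_only))); // the single column
    println!("{:?}", project(&columns, None)); // all columns
}
```

In this model, an empty projection and an out-of-range index fail for the same reasons as the two errors in the question, while `Some(&[0])` and `None` both return the single `vals` column.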
omgirok