3

Context: I am using datafusion to build a data validator for a csv file input.

Requirement: I want to add row number where the error occurred in output report. In pandas, I have ability to add row index which can be used for this purpose. Is there a way to achieve similar result in datafusion.

waitingkuo
  • 89,478
  • 28
  • 112
  • 118
praveent
  • 562
  • 3
  • 10

2 Answers2

2

There doesn't appear to be any easy way to do this within datafusion after opening the CSV file. But you could instead open the CSV file directly with arrow, produce a new RecordBatch that incorporates the index column, and then feed this to datafusion using a MemTable. Here's the example assuming we are only processing one batch ...

use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow::array::{UInt32Array, Int64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::csv;

use std::fs::File;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {


    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);
    
    let file = File::open("tests/example.csv")?;
    
    let mut csv = csv::Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
    let batch = csv.next().unwrap()?;

    let length = batch.num_rows() as u32;
    let idx_array = UInt32Array::from((0..length).collect::<Vec<u32>>());
    let a_array = Int64Array::from(batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    let b_array = Int64Array::from(batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    let new_schema = Schema::new(vec![
        Field::new("idx", DataType::UInt32, true),
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);

    let new_batch = RecordBatch::try_new(Arc::new(new_schema),
        vec![Arc::new(idx_array), Arc::new(a_array), Arc::new(b_array)])?;
    let mem_table = MemTable::try_new(new_batch.schema(), vec![vec![new_batch]])?;
    
    let mut ctx = ExecutionContext::new();

    // create the dataframe
    let df = ctx.read_table(Arc::new(mem_table))?;

    let results = df.collect().await?;

    print_batches(&results).unwrap();

    // do whatever you need to do
    // do whatever you need to do
    // do whatever you need to do
    
    Ok(())
}

My example.csv looks like this ...

a,b
1,2
1,3
4,2
2,6
3,7

And the output should be ...

+-----+---+---+
| idx | a | b |
+-----+---+---+
| 0   | 1 | 2 |
| 1   | 1 | 3 |
| 2   | 4 | 2 |
| 3   | 2 | 6 |
| 4   | 3 | 7 |
+-----+---+---+

Though if you're really just in search of a crate with functionality like pandas in python, I'd urge you to checkout polars.

Ian Graham
  • 346
  • 1
  • 11
2

Given Ian Grahams advice to checkout polars, I thought I give an example on how this could be achieved in polars as well:

use polars::prelude::*;
use std::io::Cursor;

fn main() -> Result<()> {

    // use an in memory repr for the csv
    let csv = Cursor::new(
        "a,b
1,2
1,3
4,2
2,6
3,7
",
    );

    // parse the csv into a DataFrame
    let mut df = CsvReader::new(csv).finish()?;

    // create the index column based on the dataframes height
    // note that we use the `NoNull` wrapper to create from `T` instead of `Option<T>`
    let mut idx: NoNull<UInt32Chunked> = (0..df.height() as u32).collect();
    idx.rename("idx");

    // add the index column to the DataFrame
    df.insert_at_idx(0, idx.into_inner().into_series())?;


    // print output
    dbg!(df);

    Ok(())
}

Outputs:

+-----+-----+-----+
| idx | a   | b   |
| --- | --- | --- |
| u32 | i64 | i64 |
+=====+=====+=====+
| 0   | 1   | 2   |
+-----+-----+-----+
| 1   | 1   | 3   |
+-----+-----+-----+
| 2   | 4   | 2   |
+-----+-----+-----+
| 3   | 2   | 6   |
+-----+-----+-----+
| 4   | 3   | 7   |
+-----+-----+-----+

ritchie46
  • 10,405
  • 1
  • 24
  • 43