rust apache-arrow apache-arrow-datafusion

Indexing in datafusion


Context: I am using datafusion to build a data validator for CSV file input.

Requirement: I want to include the row number where each error occurred in the output report. In pandas, I can add a row index and use it for this purpose. Is there a way to achieve a similar result in datafusion?


Solution

  • There doesn't appear to be an easy way to do this within datafusion after opening the CSV file. Instead, you could open the CSV file directly with arrow, build a new RecordBatch that includes an index column, and then feed it to datafusion using a MemTable. Here's an example, assuming we are only processing one batch ...

    use datafusion::prelude::*;
    use datafusion::datasource::MemTable;
    use arrow::util::pretty::print_batches;
    use arrow::record_batch::RecordBatch;
    use arrow::array::{UInt32Array, Int64Array};
    use arrow::datatypes::{Schema, Field, DataType};
    use arrow::csv;
    
    use std::fs::File;
    use std::sync::Arc;
    
    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
    
    
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
        ]);
        
        let file = File::open("tests/example.csv")?;
        
        // Read the CSV (with a header row) in batches of up to 1024 rows;
        // only the first batch is taken here
        let mut csv = csv::Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
        let batch = csv.next().unwrap()?;
    
        // Build a 0-based row index column, then copy the existing columns
        let length = batch.num_rows() as u32;
        let idx_array = UInt32Array::from((0..length).collect::<Vec<u32>>());
        let a_array = Int64Array::from(batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
        let b_array = Int64Array::from(batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
        let new_schema = Schema::new(vec![
            Field::new("idx", DataType::UInt32, true),
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
        ]);
    
        let new_batch = RecordBatch::try_new(Arc::new(new_schema),
            vec![Arc::new(idx_array), Arc::new(a_array), Arc::new(b_array)])?;
        let mem_table = MemTable::try_new(new_batch.schema(), vec![vec![new_batch]])?;
        
        let mut ctx = ExecutionContext::new();
    
        // create the dataframe
        let df = ctx.read_table(Arc::new(mem_table))?;
    
        let results = df.collect().await?;
    
        print_batches(&results).unwrap();
    
        // do whatever you need to do
        
        Ok(())
    }
    
    

    My example.csv looks like this ...

    a,b
    1,2
    1,3
    4,2
    2,6
    3,7
    

    And the output should be ...

    +-----+---+---+
    | idx | a | b |
    +-----+---+---+
    | 0   | 1 | 2 |
    | 1   | 1 | 3 |
    | 2   | 4 | 2 |
    | 3   | 2 | 6 |
    | 4   | 3 | 7 |
    +-----+---+---+
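
    The example above only handles a single batch. If the file spans several batches, the index has to continue from where the previous batch stopped. The following is a minimal, std-only sketch of that bookkeeping, not part of arrow or datafusion: the helper names running_row_indices and csv_line_number are hypothetical, and the line-number conversion assumes one CSV record per physical line (no quoted fields containing newlines).

```rust
/// Hypothetical helper: given the row counts of consecutive batches,
/// return the values of the index column for each batch, so that
/// numbering continues across batch boundaries.
fn running_row_indices(batch_lengths: &[usize]) -> Vec<Vec<u32>> {
    let mut offset: u32 = 0;
    batch_lengths
        .iter()
        .map(|&len| {
            let len = len as u32;
            // Indices for this batch start where the previous batch ended
            let idx: Vec<u32> = (offset..offset + len).collect();
            offset += len;
            idx
        })
        .collect()
}

/// Hypothetical helper: convert a 0-based row index into a 1-based line
/// number in the CSV file, skipping the header line if there is one.
/// Assumes every record occupies exactly one line.
fn csv_line_number(idx: u32, has_header: bool) -> u32 {
    idx + 1 + if has_header { 1 } else { 0 }
}
```

    Each inner vector could then be passed to UInt32Array::from in place of (0..length) when building the index column for the corresponding batch, and csv_line_number(idx, true) gives the line to quote in the error report for a file with a header.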
    

    Though if you're really just looking for a crate with pandas-like functionality, I'd urge you to check out polars.