Context: I am using datafusion to build a data validator for CSV file input.
Requirement: I want to include the row number where each error occurred in the output report. In pandas, I can add a row index that serves this purpose. Is there a way to achieve a similar result in datafusion?
There doesn't appear to be an easy way to do this within datafusion after opening the CSV file. Instead, you could open the CSV file directly with arrow, build a new RecordBatch that includes an index column, and then feed it to datafusion through a MemTable. Here's an example, assuming we are only processing one batch:
use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow::array::{UInt32Array, Int64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::csv;
use std::fs::File;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // schema of the input CSV
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);
    // read the first (and, in this example, only) batch with arrow
    let file = File::open("tests/example.csv")?;
    let mut csv = csv::Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
    let batch = csv.next().unwrap()?;
    // build the index column: one entry per row, starting at 0
    let length = batch.num_rows() as u32;
    let idx_array = UInt32Array::from((0..length).collect::<Vec<u32>>());
    // copy the original columns out of the batch
    let a_array = Int64Array::from(batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    let b_array = Int64Array::from(batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    // new schema with the index column first
    let new_schema = Schema::new(vec![
        Field::new("idx", DataType::UInt32, true),
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);
    let new_batch = RecordBatch::try_new(Arc::new(new_schema),
        vec![Arc::new(idx_array), Arc::new(a_array), Arc::new(b_array)])?;
    // expose the new batch to datafusion as an in-memory table
    let mem_table = MemTable::try_new(new_batch.schema(), vec![vec![new_batch]])?;
    let mut ctx = ExecutionContext::new();
    // create the dataframe
    let df = ctx.read_table(Arc::new(mem_table))?;
    let results = df.collect().await?;
    print_batches(&results).unwrap();
    // do whatever you need to do
    Ok(())
}
My example.csv looks like this ...
a,b
1,2
1,3
4,2
2,6
3,7
And the output should be ...
+-----+---+---+
| idx | a | b |
+-----+---+---+
| 0 | 1 | 2 |
| 1 | 1 | 3 |
| 2 | 4 | 2 |
| 3 | 2 | 6 |
| 4 | 3 | 7 |
+-----+---+---+
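The example above reads a single batch. If the file has more rows than the batch size (1024 here), the reader yields several batches, and the index has to continue from one batch to the next instead of restarting at 0. Here is a minimal sketch of that bookkeeping using only the standard library. Note that `batch_row_indices` is a hypothetical helper made up for illustration, not part of arrow or datafusion, and it takes the row count of each batch as input:

```rust
/// Hypothetical helper (not an arrow/datafusion API): given the number of
/// rows in each batch, return the global row indices for every batch.
fn batch_row_indices(batch_sizes: &[usize]) -> Vec<Vec<u32>> {
    // running count of rows seen in earlier batches
    let mut offset: u32 = 0;
    let mut out = Vec::with_capacity(batch_sizes.len());
    for &n in batch_sizes {
        let n = n as u32;
        // this batch covers the half-open range [offset, offset + n)
        out.push((offset..offset + n).collect());
        offset += n;
    }
    out
}
```

For batches of 2 and 3 rows this gives the indices 0, 1 for the first batch and 2, 3, 4 for the second. Each inner vector could then be passed to UInt32Array::from in place of the (0..length) range used above.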
Though if you're really just looking for a crate with functionality like pandas in Python, I'd urge you to check out polars.