rust iterator bufferedreader send rayon

How to make par_bridge() work for a BufReader?


I wanted to use rayon's par_bridge() to parallelize an Iterator, but the compiler rejects it with the error "the method par_bridge exists for struct MyReader, but its trait bounds were not satisfied; the following trait bounds were not satisfied: MyReader: Send...".

Below is the serial version that worked:

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

struct MyReader {
    buf: Box<dyn BufRead>,
}

impl MyReader {
    fn new(filename: &str) -> Result<MyReader, Box<dyn std::error::Error>> {
        let path = Path::new(filename);
        let file = File::open(path)?;
        let buf = Box::new(BufReader::new(file));
        Ok(MyReader { buf })
    }
}

impl Iterator for MyReader {
    type Item = String;
    fn next(&mut self) -> Option<Self::Item> {
        let mut line = String::new();
        if self.buf.read_line(&mut line).unwrap() > 0 {
            return Some(line.trim_end().to_owned());
        } else {
            return None;
        }
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>>{
    let filename = "test/lines.txt";
    let my_reader = MyReader::new(filename)?;
    // serial version
    my_reader.for_each(|x| println!("{}", x));
    Ok(())
}

The results:

$ cat test/lines.txt
01
02
03

$ cargo run
01
02
03

Below is the one that failed:

use rayon::prelude::*; // brings par_bridge() and ParallelIterator into scope
... // same as the serial
fn main() -> Result<(), Box<dyn std::error::Error>>{
    let filename = "test/lines.txt";
    let my_reader = MyReader::new(filename)?;
    // parallel version
    my_reader.par_bridge().for_each(|x| println!("{}", x));
    Ok(())
}

PS. I understand the above example is clumsy, but in the data I'm actually processing each record spans multiple lines. That is why I have to implement my own Iterator instead of using BufRead's lines() to create one.

PS2. The goal is to read a very big file and process each record independently, in parallel.

PS3. I'm trying rayon's ParallelIterator just for the sake of simplicity. If anybody could show me alternative ways, in particular ones using only the std library, that would also be appreciated.

Thanks.


Solution

  • The issue is that dyn BufRead isn't Send. A trait object type only implements the traits named in it (plus their supertraits), so an auto trait such as Send has to be added to the type explicitly.

    struct MyReader {
        buf: Box<dyn BufRead + Send>,
    }
    

    This will work, but it is not ideal: it is neither the most performant nor the most idiomatic option. Method calls on a trait object are dispatched at runtime through a vtable, while generics are resolved at compile time, which gives the compiler much more opportunity for optimization (inlining, for example). You should use a generic here.

    struct MyReader<R> {
        buf: R,
    }
    

    Then modify your impl blocks.

    use std::io::Read; // needed in addition to the imports from the question

    /// A generic constructor that handles any implementor of `Read`.
    // You may also want a constructor that takes an implementor of `BufRead`.
    impl<R: Read> MyReader<BufReader<R>> {
        fn new(read: R) -> Self {
            Self { buf: BufReader::new(read) }
        }
    }
    
    impl MyReader<BufReader<File>> {
        fn from_file(filename: &str) -> Result<Self, Box<dyn std::error::Error>> {
            let path = Path::new(filename);
            let file = File::open(path)?;
            Ok(Self::new(file))
        }
    }
    // This is how you would make another kind of `MyReader`.
    // Note that you can't store a `StdinLock` since that does not implement `Send`.
    use std::io::{stdin, Stdin};
    impl MyReader<BufReader<Stdin>> {
        fn from_stdin() -> Self {
            Self::new(stdin())
        }
    }
    
    impl<R: BufRead> Iterator for MyReader<R> { ... }
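
    The elided body of that last impl can be carried over from the serial version in the question almost unchanged. A minimal sketch, using an in-memory Cursor in place of a file so that it runs on its own (lines_of is a hypothetical helper for the demo, not part of the original code):

    ```rust
    use std::io::{BufRead, Cursor};

    struct MyReader<R> {
        buf: R,
    }

    impl<R: BufRead> Iterator for MyReader<R> {
        type Item = String;

        fn next(&mut self) -> Option<Self::Item> {
            let mut line = String::new();
            // read_line returns Ok(0) at end of input. Like the version in the
            // question, this sketch panics on an I/O error.
            match self.buf.read_line(&mut line).unwrap() {
                0 => None,
                _ => Some(line.trim_end().to_owned()),
            }
        }
    }

    // Hypothetical helper for the demo: iterate over an in-memory buffer.
    fn lines_of(text: &str) -> Vec<String> {
        MyReader { buf: Cursor::new(text.to_owned()) }.collect()
    }

    fn main() {
        for line in lines_of("01\n02\n03\n") {
            println!("{}", line);
        }
    }
    ```

    Because the impl only asks for R: BufRead, the same code serves a file, stdin, or a test buffer.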
    

    Since BufReader<File> is Send, MyReader<BufReader<File>> is Send as well.
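
    One way to check this at compile time, rather than waiting for par_bridge() to complain, is a small helper function with a Send bound. This is only a sketch: assert_send is a hypothetical name, and the struct is repeated so that the snippet compiles on its own.

    ```rust
    use std::fs::File;
    use std::io::{BufRead, BufReader};

    #[allow(dead_code)]
    struct MyReader<R> {
        buf: R,
    }

    // Compiles only when T: Send; it does nothing at runtime.
    fn assert_send<T: Send>() {}

    fn main() {
        assert_send::<MyReader<BufReader<File>>>();
        assert_send::<MyReader<Box<dyn BufRead + Send>>>();
        // Uncommenting the next line should fail to compile, because a plain
        // `dyn BufRead` does not promise Send:
        // assert_send::<MyReader<Box<dyn BufRead>>>();
        println!("Send checks compiled");
    }
    ```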

    Then, I would suggest putting all your processing logic into a generic function. Since you already have a struct at hand, this can be a method, but do whatever works best for organization.

    use rayon::prelude::*; // for par_bridge() and for_each()

    impl<R: BufRead + Send> MyReader<R> {
        fn process_parallel(self) {
            self.par_bridge().for_each(|x| println!("{}", x));
        }
    }
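
    Roughly speaking, par_bridge() lets worker threads pull items from the one sequential iterator behind a lock, which is why the iterator itself has to be Send. The same idea can be sketched with the standard library only (as asked in PS3), assuming Rust 1.63 or later for std::thread::scope. process_shared and its parameters are hypothetical names; this illustrates the idea and is not rayon's actual implementation (no work stealing, no batching).

    ```rust
    use std::sync::Mutex;
    use std::thread;

    // Hypothetical helper: apply `work` to every item of `iter`, using
    // `n_workers` threads that all pull from the same iterator.
    fn process_shared<I, F>(iter: I, n_workers: usize, work: F)
    where
        I: Iterator + Send,
        F: Fn(I::Item) + Sync,
    {
        // fuse() makes it safe for several workers to call next() after the end.
        let shared = Mutex::new(iter.fuse());
        thread::scope(|s| {
            for _ in 0..n_workers {
                s.spawn(|| loop {
                    // The lock guard is a temporary that is dropped at the end of
                    // this statement, so the lock is not held while `work` runs.
                    let item = shared.lock().unwrap().next();
                    match item {
                        Some(x) => work(x),
                        None => break,
                    }
                });
            }
        });
    }

    fn main() {
        let records = vec!["01", "02", "03"].into_iter().map(String::from);
        process_shared(records, 4, |x| println!("{}", x));
    }
    ```

    The output order is not deterministic. Reading stays sequential in this sketch; only the per-record processing runs in parallel.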
    

    Then you can call it on each kind of MyReader you want to create.

    if use_stdin {
        MyReader::from_stdin().process_parallel();
    } else {
        MyReader::from_file(filename)?.process_parallel();
    }
    

    Note that you can still create MyReader<Box<dyn BufRead + Send>> if necessary, so using the generic is a pure upgrade in flexibility.
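
    As an illustration of that last point, the sketch below picks the input at runtime and still goes through the same generic struct. open_boxed and count_lines are hypothetical helpers, and an in-memory Cursor stands in for a file.

    ```rust
    use std::io::{stdin, BufRead, BufReader, Cursor};

    struct MyReader<R> {
        buf: R,
    }

    // Hypothetical helper: choose the source at runtime, erasing its concrete type.
    fn open_boxed(use_stdin: bool, fallback: &str) -> MyReader<Box<dyn BufRead + Send>> {
        let buf: Box<dyn BufRead + Send> = if use_stdin {
            Box::new(BufReader::new(stdin()))
        } else {
            Box::new(Cursor::new(fallback.to_owned()))
        };
        MyReader { buf }
    }

    // Generic code accepts the boxed reader like any other `R: BufRead`,
    // because `Box<B>` implements `BufRead` whenever `B` does.
    fn count_lines<R: BufRead>(reader: MyReader<R>) -> usize {
        reader.buf.lines().count()
    }

    fn main() {
        let reader = open_boxed(false, "01\n02\n03\n");
        println!("{} lines", count_lines(reader));
    }
    ```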
