rustparallel-processingrust-tokiofile-processing

Parallelly processing HUGE file by splitting it into logical shards


I'm trying to parallelly process a huge file ~15GB - ~60GB which contains 560 Million to 2 Billion records

the record looks something like the following

<id> <amount>
123, 6000
123, 4593
111, 1
111, 100
111, -50
111, 10000

there could be thousands of users contained within a file whose activity is recorded as series of transactions.

I processed this file sequentially. Not an issue.

This can be safely parallelized by processing every client data by same thread/task.

But when I try to process it parallelly for optimize other cores available based on creating logical group which will be processed by the same tokio task. For now I'm sticking to creating spawning a single task per available core. And the transaction goes to same task by looking at client id.

This approach is way slow than sequential.

Following is snippet of the approach

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let max_threads_supported = num_cpus::get();
    let mut account_state: HashMap<u16, Client> = HashMap::new();

    let (result_sender, mut result_receiver) =
        mpsc::channel::<HashMap<u16, Client>>(max_threads_supported);

    // repository of sender for each shard
    let mut sender_repository: HashMap<u16, Sender<Transaction>> = HashMap::new();

    for task_counter in 0..max_threads_supported {
        let result_sender_clone = result_sender.clone();
        // create separate mpsc channel for each processor
        let (sender, mut receiver) = mpsc::channel::<Transaction>(10_000);
        sender_repository.insert(task_counter as u16, sender);

        tokio::spawn(async move {
            let mut exec_engine = Engine::initialize();

            while let Some(tx) = receiver.recv().await {
                match exec_engine.execute_transaction(tx) {
                    Ok(_) => (),
                    Err(err) => ()
                }
            }
            result_sender_clone
                .send(exec_engine.get_account_state_owned())
                .await
        });
    }
    drop(result_sender);

        tokio::spawn(async move {
                // just getting reading tx from file sequential std::io::BufferedReader
                for result in reader.deserialize::<Transaction>() {
                    match result {
                        Ok(tx) => {
                            match sender_repository.get(&(&tx.get_client_id() % max_threads_supported)) {
                                Some(sender) => {
                                    sender.send(tx).await;
                                }
                                None => ()
                            }
                        }
                        _ =>()
                    }
                }
            });

            // accumulate result from all the processor
    while let Some(result) = result_receiver.recv().await {
        account_state.extend(result.into_iter());
    }
    // do what ever you like with result

    Ok(())
}

But this seems pretty slow than sequential approach. What am I doing wrong? Btw I've also tried to use broadcast approach but there is chance of lagging consumer and losing messages. So moved to mpsc.

How can I optimize this for better performance??


Solution

  • There are a couple of misconceptions here. The main one being that tokio is not meant for cpu-bound parallelism. It is meant for io-bound event based situations where short reaction times and good scalability is required, like web servers.

    What further reinforces my impression is that you "spawn one task per CPU core", which is the opposite of what you want in tokio. Tokio's strengh is that you can spawn a very large number of tasks, and tokio efficiently schedules them on the available CPU resources. I mean, some configurations of the tokio runtime are single-threaded! So spawning more tasks achieves absolutely no speedup whatsoever; spawning tasks is not for speedup, but for waiting at more await points at the same time. For example in a web server, if you are connected to 100 clients at the same time, you need 100 wait points to wait for a message from each of them. That's where you need one task per connection.

    What you actually want is not asynchronism but parallelism. The current go-to library for structured parallelism is rayon combined with the excellent crossbeam-channel library for dataflow.

    This should point you in the right direction:

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let max_threads_supported = num_cpus::get();
        let mut account_state: Mutex<HashMap<u16, Client>> = Mutex::new(HashMap::new());
    
        rayon::scope(|s| {
            let (work_queue_sender, work_queue_receiver) = crossbeam_channel::bounded(1000);
    
            for task_counter in 0..max_threads_supported {
                let work_receiver = work_queue_receiver.clone();
                s.spawn(|_| {
                    let mut exec_engine = Engine::initialize();
    
                    while let Ok(tx) = work_receiver.recv() {
                        // TODO: do some proper error handling
                        exec_engine.execute_transaction(tx).unwrap();
                    }
                    account_state
                        .lock()
                        .extend(exec_engine.get_account_state_owned().into_iter());
                });
            }
    
            let reader = Reader;
            for result in reader.deserialize::<Transaction>() {
                work_queue_sender.send(result.unwrap()).unwrap();
            }
            drop(work_queue_sender);
        });
    
        // Do whatever you want with the `account_state` HashMap.
    
        Ok(())
    }
    

    Although many imports were missing from your code (please provide a minimal reproducible example next time), so I wasn't able to test the code.

    But it should look somewhat similar to this.