multithreading image rust threadpool heic

Thread pool implementation does not end properly when joined


I am working on a Rust project (as a side training project) to implement a HEIC (Apple iPhone picture format) to JPEG converter. Full code is already available here. Within this project I am trying to implement a simple thread pool to distribute conversion tasks over multiple CPU cores.

Right now, the code works perfectly well, except when the thread pool is joined. Let's say I have N worker threads in the pool. What I have found is that when the workers receive their last tasks, the first worker to complete joins successfully, but the N-1 remaining workers then join before their images are fully written to disk (part of each image is solid gray instead of containing the expected pixels). When I add a delay before thread shutdown (say 1000 ms), the pictures are properly written to disk.

What is going wrong? My intuition is that it is related to lifetimes or file-handling behavior.

Here is the code:

// threadpool.rs
use std::thread;
use std::process::exit;

use crossbeam::channel::{self,Receiver,Sender};
use log::{debug,error};


type Job = Box<dyn FnOnce() + Send + 'static>;


enum WorkerMessage {
    Terminate,
    Task(Job),
}

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Sender<WorkerMessage>,
}

impl ThreadPool {
    pub fn new(size: usize) -> Self {

        // Open channel
        let (sender,receiver) = channel::unbounded::<WorkerMessage>();

        // Build the set of threads
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, receiver.clone()));
        }

        ThreadPool { workers, sender }
    }

    pub fn spawn<F>(&self, f: F)
        where F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);
        self.sender.send(WorkerMessage::Task(job)).expect("The thread pool has no thread");
    }

    pub fn join(&mut self) {
        // Send termination message to all workers
        for _ in &mut self.workers {
            self.sender.send(WorkerMessage::Terminate).unwrap();
        }

        // Wait until all workers join
        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
                debug!("Shutting down worker {}", worker.id);
            }
        }
    }
}


struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Receiver<WorkerMessage>) -> Self {
        debug!("Starting thread worker {}", id);
        let thread = thread::spawn(move || {
            loop {
                match receiver.recv() {
                    Ok(msg) => {
                        match msg {
                            WorkerMessage::Terminate => {
                                debug!("Terminating thread worker {} as end request was received", id);

                                // Sleeping is necessary to make sure all thread actually end
                                std::thread::sleep(std::time::Duration::from_millis(1000));
                                exit(0);
                            },
                            WorkerMessage::Task(job) => {
                                debug!("Worker {} starts running a new job", id);
                                job();
                            }
                        }
                    },
                    Err(_) => {
                        error!("Thread fails because the pool is destroyed");
                    }
                };
            }
        });

        Worker { id, thread: Some(thread) }
    }
}
// convert.rs
use std::fs::{self, File};
use std::io::BufWriter;
use std::path::PathBuf;

use image::DynamicImage;
use libheif_rs::{
    ColorSpace,HeifContext,LibHeif,RgbChroma,
};


pub fn convert_heic_to_jpeg(heic_file: &PathBuf, jpeg_file: &PathBuf) -> () {
    fs::create_dir_all(jpeg_file.parent().unwrap()).unwrap();

    // Some conversion code not relevant here

    let dyn_image = DynamicImage::ImageRgba8(rgb_image).to_rgb8();

    let mut file = BufWriter::new(File::create(jpeg_file).unwrap());
    dyn_image.write_to(&mut file, image::ImageFormat::Jpeg).unwrap();
}
// bin/heic2jpeg.rs
use std::path::PathBuf;
use std::process::exit;

use clap::Parser;
use log::{error,info,LevelFilter};

use heic2jpeg::{ThreadPool,convert_heic_to_jpeg,utils};


#[derive(Parser,Debug)]
#[command(author = "Alexis Tourneux", version, about = "Convert HEIF pictures to JPEG")]
struct Cli {
    #[arg(
        help = "Input target for conversion (either a file or a folder)",
    )]
    input: PathBuf, 
    #[arg(
        help = "Target for output of conversion (type must match input option)",
    )]
    output: PathBuf, 
    #[arg(
        short,
        long,
        help = "Number of workers to spawn for conversion",
        default_value_t = num_cpus::get(),
    )]
    workers: usize,
}

fn main() -> () {
    // Parse all command parameters
    let args = Cli::parse();

    // Sanity check to make sure input argument actually exists
    // If input argument does not exist (nor a folder or a file), it means there is
    // no target to start from, and processing cannot start
    if !args.input.exists() {
        error!("Input parameter must be a valid file/directory!");
        exit(1);
    }

    // Create the array of images to process
    let mut images: Vec<PathBuf> = Vec::new();
    // Some CLI args parsing code

    // Create the thread pool for parallel processing
    let mut pool = ThreadPool::new(args.workers);

    // Send all tasks to the thread pool
    for image in images {
        let input = args.input.clone();
        let output = args.output.clone();
        pool.spawn(move || {
            info!("Starting processing file : {:?}", image);
            if input.is_dir() {
                let jpeg_file = utils::generate_jpeg_filename_from_heif(&image, &output);
                convert_heic_to_jpeg(&image, &jpeg_file)
            } else {
                convert_heic_to_jpeg(&image, &output);
            }
        });
    }

    // Wait for all tasks to be executed
    pool.join();

    // Exit CLI
    exit(0);
}

I am nearly a beginner in Rust, but willing to improve and learn!


Solution

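  • std::process::exit() stops the entire process immediately: it does not wait for other threads and does not run destructors. The first worker that reaches exit(0); in impl Worker therefore kills the whole process while the other workers may still be writing their images, which is why the sleep appears to help. Just break from the loop instead, so that thread.join() in ThreadPool::join returns only once each worker has finished its last task. The Err(_) arm should break as well, otherwise a worker spins forever once the channel is closed.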
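
As an illustration, here is a minimal sketch of a pool whose workers leave their loop with break rather than calling exit(0). It is not the asker's exact code. To stay free of dependencies it swaps crossbeam for std::sync::mpsc, with the receiver shared through Arc<Mutex<...>> because the std receiver cannot be cloned. Logging is dropped, and the Worker struct is replaced by a plain vector of join handles. The worker_loop function and the run_demo helper are hypothetical names added for the example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

type Job = Box<dyn FnOnce() + Send + 'static>;

enum WorkerMessage {
    Terminate,
    Task(Job),
}

pub struct ThreadPool {
    workers: Vec<JoinHandle<()>>,
    sender: Sender<WorkerMessage>,
}

impl ThreadPool {
    pub fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<WorkerMessage>();
        // Assumption of this sketch: std's Receiver is not Clone, so all
        // workers share one receiver behind a mutex.
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || worker_loop(receiver))
            })
            .collect();
        ThreadPool { workers, sender }
    }

    pub fn spawn<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender
            .send(WorkerMessage::Task(Box::new(f)))
            .expect("The thread pool has no thread");
    }

    pub fn join(&mut self) {
        // One Terminate per worker, queued after every pending task.
        for _ in 0..self.workers.len() {
            self.sender.send(WorkerMessage::Terminate).unwrap();
        }
        // Each join returns only after that worker has finished its last job.
        for handle in self.workers.drain(..) {
            handle.join().unwrap();
        }
    }
}

fn worker_loop(receiver: Arc<Mutex<Receiver<WorkerMessage>>>) {
    loop {
        // The lock guard is a temporary, released at the end of this
        // statement, so jobs run without holding the mutex.
        let msg = receiver.lock().unwrap().recv();
        match msg {
            Ok(WorkerMessage::Task(job)) => job(),
            // Leave the loop (ending only this thread) instead of exit(0).
            Ok(WorkerMessage::Terminate) | Err(_) => break,
        }
    }
}

// Hypothetical helper: runs `tasks` short jobs on `workers` threads and
// returns how many of them completed before join() returned.
pub fn run_demo(tasks: usize, workers: usize) -> usize {
    let done = Arc::new(AtomicUsize::new(0));
    let mut pool = ThreadPool::new(workers);
    for _ in 0..tasks {
        let done = Arc::clone(&done);
        pool.spawn(move || {
            thread::sleep(Duration::from_millis(10)); // stand-in for a conversion
            done.fetch_add(1, Ordering::SeqCst);
        });
    }
    pool.join();
    done.load(Ordering::SeqCst)
}
```

With this shape, run_demo(8, 3) should report all 8 jobs as completed, since the channel delivers messages in order and a worker only sees Terminate after every task has been picked up. Calling exit(0) in main after pool.join() remains fine, because by then no worker is still running.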