I am working on a Rust project (as a side project, for training) to implement a HEIC (Apple iPhone picture format) to JPEG converter. Full code is already available here. Within this project I am trying to implement a simple thread pool to distribute conversion tasks over multiple CPU cores.
Right now, the code works perfectly well, except when the thread pool is joined. Let's say I have N worker threads in the pool. What I have found is that when the workers receive their last tasks, the first worker to complete joins successfully, but the N-1 remaining workers then seem to join before their images are fully written to disk (part of each picture is plain gray instead of containing the expected pixels). When I add a delay before thread shutdown (say 1000 ms), the pictures are written to disk properly.
What is going wrong? My intuition is that it is related to lifetimes or file-handling behavior.
Here is the code:
// threadpool.rs
use std::thread;
use std::process::exit;
use crossbeam::channel::{self, Receiver, Sender};
use log::{debug, error};

type Job = Box<dyn FnOnce() + Send + 'static>;

enum WorkerMessage {
    Terminate,
    Task(Job),
}

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Sender<WorkerMessage>,
}

impl ThreadPool {
    pub fn new(size: usize) -> Self {
        // Open channel
        let (sender, receiver) = channel::unbounded::<WorkerMessage>();
        // Build the set of threads
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, receiver.clone()));
        }
        ThreadPool { workers, sender }
    }

    pub fn spawn<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(WorkerMessage::Task(job)).expect("The thread pool has no thread");
    }

    pub fn join(&mut self) {
        // Send termination message to all workers
        for _ in &mut self.workers {
            self.sender.send(WorkerMessage::Terminate).unwrap();
        }
        // Wait until all workers join
        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
                debug!("Shutting down worker {}", worker.id);
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Receiver<WorkerMessage>) -> Self {
        debug!("Starting thread worker {}", id);
        let thread = thread::spawn(move || {
            loop {
                match receiver.recv() {
                    Ok(msg) => {
                        match msg {
                            WorkerMessage::Terminate => {
                                debug!("Terminating thread worker {} as end request was received", id);
                                // Sleeping is necessary to make sure all thread actually end
                                std::thread::sleep(std::time::Duration::from_millis(1000));
                                exit(0);
                            },
                            WorkerMessage::Task(job) => {
                                debug!("Worker {} starts running a new job", id);
                                job();
                            }
                        }
                    },
                    Err(_) => {
                        error!("Thread fails because the pool is destroyed");
                    }
                };
            }
        });
        Worker { id, thread: Some(thread) }
    }
}

// convert.rs
use std::fs::{self, File};
use std::io::BufWriter;
use std::path::PathBuf;
use image::DynamicImage;
use libheif_rs::{
    ColorSpace, HeifContext, LibHeif, RgbChroma,
};

pub fn convert_heic_to_jpeg(heic_file: &PathBuf, jpeg_file: &PathBuf) -> () {
    fs::create_dir_all(jpeg_file.parent().unwrap()).unwrap();
    // Some conversion code not relevant here
    let dyn_image = DynamicImage::ImageRgba8(rgb_image).to_rgb8();
    let mut file = BufWriter::new(File::create(jpeg_file).unwrap());
    dyn_image.write_to(&mut file, image::ImageFormat::Jpeg).unwrap();
}

// bin/heic2jpeg.rs
use std::path::PathBuf;
use std::process::exit;
use clap::Parser;
use log::{error, info, LevelFilter};
use heic2jpeg::{ThreadPool, convert_heic_to_jpeg, utils};

#[derive(Parser, Debug)]
#[command(author = "Alexis Tourneux", version, about = "Convert HEIF pictures to JPEG")]
struct Cli {
    #[arg(
        help = "Input target for conversion (either a file or a folder)",
    )]
    input: PathBuf,
    #[arg(
        help = "Target for output of conversion (type must match input option)",
    )]
    output: PathBuf,
    #[arg(
        short,
        long,
        help = "Number of workers to spawn for conversion",
        default_value_t = num_cpus::get(),
    )]
    workers: usize,
}

fn main() -> () {
    // Parse all command parameters
    let args = Cli::parse();
    // Sanity check to make sure input argument actually exists
    // If input argument does not exist (nor a folder or a file), it means there is
    // no target to start from, and processing cannot start
    if !args.input.exists() {
        error!("Input parameter must be a valid file/directory!");
        exit(1);
    }
    // Create the array of images to process
    let mut images: Vec<PathBuf> = Vec::new();
    // Some CLI args parsing code
    // Create the thread pool for parallel processing
    let mut pool = ThreadPool::new(args.workers);
    // Send all tasks to the thread pool
    for image in images {
        let input = args.input.clone();
        let output = args.output.clone();
        pool.spawn(move || {
            info!("Starting processing file : {:?}", image);
            if input.is_dir() {
                let jpeg_file = utils::generate_jpeg_filename_from_heif(&image, &output);
                convert_heic_to_jpeg(&image, &jpeg_file)
            } else {
                convert_heic_to_jpeg(&image, &output);
            }
        });
    }
    // Wait for all tasks to be executed
    pool.join();
    // Exit CLI
    exit(0);
}

I am nearly a beginner in Rust but willing to improve and learn!
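
std::process::exit() stops the entire process immediately, without running destructors on any thread. The first worker that reaches exit(0); in impl Worker therefore kills the entire process while the other workers may still be encoding or writing their images, which is why those files end up partly gray. Just break from the loop instead; the worker thread then returns normally, thread.join() in ThreadPool::join waits for it, and the 1000 ms sleep is no longer needed.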
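
A minimal sketch of that fix, under some assumptions: it uses std::sync::mpsc with a shared Arc<Mutex<Receiver>> instead of crossbeam so that it compiles without extra crates, it drops the Worker struct and logging for brevity, and spawn_worker and run_demo are hypothetical names that are not in the question. The relevant part is the match in the worker loop, where Terminate (or a closed channel) leads to break rather than exit(0).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

enum WorkerMessage {
    Terminate,
    Task(Job),
}

pub struct ThreadPool {
    workers: Vec<Option<JoinHandle<()>>>,
    sender: Sender<WorkerMessage>,
}

impl ThreadPool {
    pub fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<WorkerMessage>();
        // Assumption: std's Receiver cannot be cloned (crossbeam's can),
        // so the workers share it behind a mutex here.
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|_| Some(spawn_worker(Arc::clone(&receiver))))
            .collect();
        ThreadPool { workers, sender }
    }

    pub fn spawn<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender
            .send(WorkerMessage::Task(Box::new(f)))
            .expect("all workers have stopped");
    }

    pub fn join(&mut self) {
        // One Terminate per worker; the channel is FIFO, so every queued
        // task is received before the first Terminate.
        for _ in &self.workers {
            let _ = self.sender.send(WorkerMessage::Terminate);
        }
        // Block until each worker thread has returned from its loop.
        for slot in &mut self.workers {
            if let Some(handle) = slot.take() {
                handle.join().expect("worker panicked");
            }
        }
    }
}

// Hypothetical helper standing in for Worker::new in the question.
fn spawn_worker(receiver: Arc<Mutex<Receiver<WorkerMessage>>>) -> JoinHandle<()> {
    thread::spawn(move || loop {
        // The lock is held only while receiving, not while the job runs.
        let message = receiver.lock().unwrap().recv();
        match message {
            Ok(WorkerMessage::Task(job)) => job(),
            // Leave the loop so only this thread ends, not the process.
            Ok(WorkerMessage::Terminate) | Err(_) => break,
        }
    })
}

/// Hypothetical demo: runs `jobs` counter increments on `workers` threads
/// and returns how many had completed once `join` returned.
pub fn run_demo(workers: usize, jobs: usize) -> usize {
    let done = Arc::new(AtomicUsize::new(0));
    let mut pool = ThreadPool::new(workers);
    for _ in 0..jobs {
        let done = Arc::clone(&done);
        pool.spawn(move || {
            done.fetch_add(1, Ordering::SeqCst);
        });
    }
    pool.join();
    done.load(Ordering::SeqCst)
}
```

With this shape, a call such as run_demo(4, 100) returns only after every queued job has finished, because join waits for each thread to return. Breaking on Err(_) as well avoids a worker spinning forever once the sender has been dropped.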