multithreading rust parallel-processing simulation

Improving performance of a threadpool


I am trying to write my own threadpool. Currently, using this custom-made threadpool is slower than doing the exact same work in a single-threaded fashion, and I don't understand why.

This is the threadpool:

use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};
use std::thread;

use concurrent_queue::ConcurrentQueue;

pub struct TaskInfo
{
    should_stop: bool,
}

pub struct TaskBatchInfo
{
    pub task_index: usize,
    pub current_task_count: Arc<AtomicUsize>,
}

pub struct Task
{
    func: Box<dyn FnMut() -> Option<TaskInfo> + Send + Sync + 'static>,
}

impl Task
{
    fn new(func: impl FnMut() -> Option<TaskInfo> + Send + Sync + Clone + 'static)
    -> Self
    {
        Self {
            func: Box::new(func),
        }
    }

    fn run(mut self) -> Option<TaskInfo> { (self.func)() }
}

pub struct ThreadPool
{
    queue: Arc<ConcurrentQueue<Task>>,
    pub should_stop: Arc<AtomicBool>,
    concurrency: usize,
}

impl ThreadPool
{
    pub fn new(size: usize) -> Self
    {
        Self {
            queue: Arc::new(ConcurrentQueue::unbounded()),
            should_stop: Arc::new(AtomicBool::new(false)),
            concurrency: size,
        }
    }

    pub fn start(&self)
    {
        let queue = self.queue.clone();
        let concurrency = self.concurrency;
        let should_stop = self.should_stop.clone();

        // Spawn the entire thread pool loop on a new thread
        thread::spawn(move || {
            let mut handles = Vec::new();

            for _task_id in 0..concurrency
            {
                let _queue = queue.clone();
                let _should_stop = should_stop.clone();
                let handle = thread::spawn(move || {
                    loop
                    {
                        if _should_stop.load(Ordering::Relaxed)
                        {
                            break;
                        }

                        let task = _queue.pop();
                        if let Ok(task) = task
                        {
                            let info = task.run();

                            if let Some(info) = info
                            {
                                if info.should_stop
                                {
                                    _should_stop.store(true, Ordering::Relaxed);
                                    break;
                                }
                            }
                        } else {
                            thread::sleep(std::time::Duration::from_micros(500));
                        }
                    }
                });

                handles.push(handle);
            }

            for handle in handles
            {
                handle.join().unwrap();
            }
        });
    }

    pub fn add_task<F>(&self, task: F)
    where
        F: FnMut() -> Option<TaskInfo> + Send + Sync + Clone + 'static,
    {
        let _ = self.queue.push(Task::new(task));
    }

    pub fn task_batch<F, T>(pool: Arc<Self>, task_count: usize, task: F, termination: T)
    where
        F: FnMut(TaskBatchInfo) -> Option<TaskInfo> + Send + Sync + Clone + 'static,
        T: FnMut() + Send + Sync + Clone + 'static,
    {
        let task_counter = Arc::new(AtomicUsize::new(task_count));
        Self::inner_task_batch(pool, task, task_count, termination, task_counter)
    }

    pub fn task_batch_with_barrier<F, T, C>(
        pool: Arc<Self>,
        task_count: usize,
        mut task: F,
        mut termination: T,
        context: C,
    ) where
        F: FnMut(TaskBatchInfo, C) + Send + Sync + Clone + 'static,
        T: FnMut() + Send + Sync + Clone + 'static,
        C: Clone + Send + Sync + 'static,
    {
        let wait_flag = Arc::new(AtomicU32::new(0));
        let _wait_flag = wait_flag.clone();

        let context = Arc::new(context);
        ThreadPool::task_batch(
            pool.clone(),
            task_count,
            move |info| {
                task(info, context.clone().as_ref().clone());

                None
            },
            move || {
                termination();
                _wait_flag.store(1, Ordering::Relaxed);
                atomic_wait::wake_all(_wait_flag.as_ref());
            },
        );
        atomic_wait::wait(&wait_flag, 0);
    }

    pub fn task_batch_with_barrier_contextless<F>(
        pool: Arc<Self>,
        task_count: usize,
        mut task: F,
    ) where
        F: FnMut(TaskBatchInfo) + Send + Sync + Clone + 'static,
    {
        let wait_flag = Arc::new(AtomicU32::new(0));
        let _wait_flag = wait_flag.clone();

        ThreadPool::task_batch(
            pool.clone(),
            task_count,
            move |info| {
                task(info);

                None
            },
            move || {
                _wait_flag.store(1, Ordering::Relaxed);
                atomic_wait::wake_all(_wait_flag.as_ref());
            },
        );
        atomic_wait::wait(&wait_flag, 0);
    }

    fn inner_task_batch<F, T>(
        pool: Arc<Self>,
        task: F,
        mut task_count: usize,
        termination: T,
        current_task_counter: Arc<AtomicUsize>,
    ) where
        F: FnMut(TaskBatchInfo) -> Option<TaskInfo> + Send + Sync + Clone + 'static,
        T: FnMut() + Send + Sync + Clone + 'static,
    {
        let task_chunk_size = (task_count / pool.as_ref().concurrency).max(1);
        let quotient = task_count / task_chunk_size;
        let remainder = task_count % task_chunk_size;
        for i in 0..quotient + (remainder > 0usize) as usize
        {
            let mut _task = task.clone();
            let mut _termination = termination.clone();
            let mut _current_task_counter = current_task_counter.clone();
            let _pool = pool.clone();

            pool.add_task(Box::new(move || {
                for j in i * task_chunk_size..((i+1) * task_chunk_size).min(task_count)
                {
                    _task(TaskBatchInfo {
                        task_index: j,
                        current_task_count: _current_task_counter.clone(),
                    });
                    let val = _current_task_counter.fetch_sub(1, Ordering::Relaxed);
                    if val == 1
                    {
                        _termination()
                    }
                }

                None
            }));
        }
    }
}

This is a criterion benchmark file I made:

use std::hint::black_box;
use std::sync::{Arc, atomic::Ordering};

use atomic_float::AtomicF64;
use criterion::{Criterion, criterion_group, criterion_main};
use thread_pool::*;


fn st_addition(input: &mut Vec<f64>)
{
    for v in input
    {
        *v += 1.0;
    }
}

fn mt_addition(thread_pool: Arc<ThreadPool>, input: Arc<Vec<AtomicF64>>)
{
    ThreadPool::task_batch_with_barrier_contextless(
        thread_pool.clone(),
        input.len(),
        move |info: TaskBatchInfo| {
            let input = input.clone();
            let i = info.task_index;
            input[i].fetch_add(1., Ordering::Relaxed);
        },
    );
}

fn threadpool_benchmark(c: &mut Criterion)
{
    let mut test_bed = vec![0.; 50_000_000];
    c.bench_function("single thread", |b| {
        b.iter(|| st_addition(black_box(&mut test_bed)))
    });

    let thread_num: usize = std::thread::available_parallelism().unwrap().into();
    let threadpool = Arc::new(ThreadPool::new(thread_num));

    threadpool.start();

    let mut test_bed = vec![];
    for _ in 0..50_000_000
    {
        test_bed.push(AtomicF64::new(0.0));
    }
    let test_bed = Arc::new(test_bed);

    let pool = threadpool.clone();
    let data = test_bed.clone();
    c.bench_function("multi thread", |b| {
        b.iter({
            let value = data.clone();
            let pool = pool.clone();

            move || mt_addition(pool.clone(), value.clone())
        })
    });
}

fn custom_criterion() -> Criterion
{
    Criterion::default()
        .measurement_time(std::time::Duration::from_secs(300))
        .warm_up_time(std::time::Duration::from_secs(2))
}

criterion_group! {
    name = benches;
    config = custom_criterion();
    targets = threadpool_benchmark
}
criterion_main!(benches);

Which currently reports:

single thread           time:   [33.355 ms 33.528 ms 33.728 ms]
                        change: [+0.6652% +1.7882% +2.8456%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 13 outliers among 100 measurements (13.00%)
  6 (6.00%) high mild
  7 (7.00%) high severe

multi thread            time:   [2.7053 s 2.7124 s 2.7198 s]
                        change: [+9.0915% +10.740% +12.440%] (p = 0.00 < 0.05)
                        Performance has regressed.

So, somehow, the MT version is slower than the ST version for the same amount of work.


Solution

  • Your threadpool implementation is fine; it is your benchmarks and expectations that are skewed.

    NOTE: This answer is based only on benchmarking; I didn't go through and pick at your code. Consider Code Review for that.


    On my system I get these numbers, which mimic what you report (I also threw in a rayon equivalent, as mentioned in the comments):

    single thread                24.2 ms
    makogan's thread pool     2,669.0 ms
    rayon's thread pool      35,147.0 ms
    

    This looks horrendous even though your threadpool appears to do much better than rayon's. Why does it look so bad?


    First, we should be comparing apples to apples. Your single-threaded benchmark is doing += 1.0 while the multi-threaded version is doing .fetch_add(1.0, Ordering::Relaxed). These are mathematically equivalent, but they are not the same operation. The atomic version is almost certainly doing more work. If we use the atomic operations in the single-threaded benchmark, I get this difference:

    fn st_addition(input: &mut Vec<AtomicF64>) {
        for v in input {
            v.fetch_add(1., Ordering::Relaxed);
        }
    }
    
    single thread                24.2 ms
    single thread (atomic)       92.6 ms
    

    I do not know whether this difference comes from cache effects of atomic operations, or from using multiple instructions compared to a single CPU fadd instruction, or a combination.

    There are certainly ways to avoid requiring atomic operations on disjoint slices, but I'll leave that out of scope for this answer since the benchmark is synthetic anyway.
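
    For illustration only, here is one way the disjoint-slice idea could look. This is a minimal sketch that uses std::thread::scope and chunks_mut from the standard library rather than your pool (the pool requires 'static closures, so it cannot borrow the slice directly). The function name scoped_addition and the workers parameter are hypothetical, and it spawns fresh threads on every call, so it is not a benchmark-ready replacement.

```rust
use std::thread;

/// Adds 1.0 to every element, giving each worker its own disjoint sub-slice.
/// `scoped_addition` and `workers` are hypothetical names, not part of the pool.
fn scoped_addition(input: &mut [f64], workers: usize) {
    // Round up so that at most `workers` chunks cover the whole slice;
    // the final `.max(1)` keeps `chunks_mut` from panicking on empty input.
    let chunk_len = input.len().div_ceil(workers.max(1)).max(1);
    thread::scope(|s| {
        for chunk in input.chunks_mut(chunk_len) {
            // Each thread owns an exclusive `&mut [f64]`, so no atomics are needed.
            s.spawn(move || {
                for v in chunk {
                    *v += 1.0;
                }
            });
        }
        // All spawned threads are joined when the scope ends.
    });
}
```

    Because the borrow checker can see that the chunks do not overlap, the inner loop is the same plain += 1.0 as in the single-threaded version.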


    That still doesn't come close to explaining the roughly 30x gap that remains. The primary reason is that your pool's per-task overhead is massive compared to the task size.

    In the single-threaded case, the overhead is negligible - all it has beyond the floating-point addition is just the loop.

    In the multi-threaded case, every element pays for an Arc::clone (useless, by the way), which at minimum is an atomic increment, plus an index bounds check (which the single-threaded version trivially avoids). Looking at the inner loop of inner_task_batch, there is also an additional atomic update, another Arc::clone, another if check, and maybe more on top of the loop itself. All that just to do one floating-point addition.

    This kind of nano-task doesn't make sense in real-world scenarios. If you actually do more work per-task, then the cost melts away:

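    fn mt_addition(thread_pool: Arc<ThreadPool>, input: Arc<Vec<AtomicF64>>) {
        // Elements handled per task. This assumes input.len() is a multiple of
        // TASK_SIZE; otherwise the trailing elements would be skipped.
        const TASK_SIZE: usize = 1_000;
        ThreadPool::task_batch_with_barrier_contextless(
            thread_pool.clone(),
            input.len() / TASK_SIZE,
            move |info: TaskBatchInfo| {
                let input = input.clone(); // still useless
                // Each task covers one contiguous block of TASK_SIZE elements.
                for j in 0..TASK_SIZE {
                    let i = j + TASK_SIZE * info.task_index;
                    input[i].fetch_add(1., Ordering::Relaxed);
                }
            },
        );
    }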
    
    single thread                24.2 ms
    single thread (atomic)       92.6 ms
    makogan's thread pool     2,669.0 ms
    makogan's thread pool (1000) 23.3 ms
    

    And even this, I would say, is still too small for the real world. Roughly 2us per task (1000 fetch_adds at the single-threaded atomic rate of 92.6 ms / 50 million, or about 1.9ns each) is extremely small. However, even increasing the task size to what I would call more reasonable, a million, doesn't really change the numbers (rayon again for good measure):

    single thread                   24.2 ms
    single thread (atomic)          92.6 ms
    makogan's thread pool        2,669.0 ms
    rayon's thread pool         35,147.0 ms
    makogan's thread pool (1000)    23.3 ms
    makogan's thread pool (1000000) 23.5 ms
    rayon's thread pool (1000000)   22.9 ms
    

    The reason all seem to converge on 24-ish ms is because...


    The benchmark (now unleashed) is memory-bound. The 24-ish ms is simply a reflection of how fast my system can shuffle 50 million f64s around. The performance of the threadpool is not going to make this any lower. For a synthetic benchmark like this, you'd want to do CPU-bound work instead (throw some Fibonacci or Collatz at it).
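
    As a sketch of what such CPU-bound work might look like, the following counts Collatz steps. Both collatz_steps and collatz_workload are hypothetical helpers that I am making up for illustration; they assume seeds small enough that 3 * n + 1 does not overflow a u64.

```rust
/// Number of Collatz steps needed to reach 1 from `n` (0 when n <= 1).
fn collatz_steps(mut n: u64) -> u32 {
    let mut steps = 0;
    while n > 1 {
        // Halve even numbers, otherwise apply 3n + 1.
        n = if n % 2 == 0 { n / 2 } else { 3 * n + 1 };
        steps += 1;
    }
    steps
}

/// Hypothetical per-task workload: total step count over the seeds `start..end`.
fn collatz_workload(start: u64, end: u64) -> u64 {
    (start..end).map(|n| u64::from(collatz_steps(n))).sum()
}
```

    Each task could call collatz_workload on its own range of seeds and pass the result through std::hint::black_box, so the work touches almost no memory and cannot be optimized away.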

    As a final note, I believe your implementation handles the nano-task case much better than rayon's because task_batch_with_barrier_contextless pre-splits the task indices into roughly one contiguous chunk per thread instead of having workers pull each tiny task from a shared pool, so there is very little contention on the queue. This may or may not be desirable depending on your anticipated workloads and the variance between tasks.
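
    To make the pre-splitting concrete, here is a small sketch that mirrors the chunk arithmetic in inner_task_batch. The helper chunk_ranges is hypothetical (it is not in your code), and the concurrency.max(1) guard is my addition to avoid dividing by zero.

```rust
use std::ops::Range;

/// Splits the indices `0..task_count` into contiguous chunks using the same
/// arithmetic as `inner_task_batch`. `chunk_ranges` is a hypothetical helper.
fn chunk_ranges(task_count: usize, concurrency: usize) -> Vec<Range<usize>> {
    // Same as `(task_count / concurrency).max(1)` in the pool, plus a zero guard.
    let chunk_size = (task_count / concurrency.max(1)).max(1);
    // Equivalent to `quotient + (remainder > 0) as usize` in the original loop.
    let chunks = task_count.div_ceil(chunk_size);
    (0..chunks)
        .map(|i| i * chunk_size..((i + 1) * chunk_size).min(task_count))
        .collect()
}
```

    With 50 million tasks and 8 threads this yields exactly 8 chunks of 6,250,000 indices, so only 8 closures ever go through the queue. When the count does not divide evenly, for example 7 tasks on 3 threads, it yields 4 chunks, one more than the number of threads.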

    That all being said, your goal was to measure the threadpool itself and not the code running on it, so your nano-task benchmark is actually a good way to measure the overhead (eyeballing about 50ns per task). It was just that your comparison to the single-threaded version was way off the mark.
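
    The eyeballed figure comes from simple division of the timings above. A minimal sketch of that arithmetic follows (ns_per_item is a hypothetical helper; the result is average wall-clock time per element across all worker threads, not CPU time on one thread):

```rust
/// Average wall-clock cost per element, in nanoseconds.
/// `ns_per_item` is a hypothetical helper for the arithmetic only.
fn ns_per_item(total_ms: f64, items: u64) -> f64 {
    // 1 ms = 1_000_000 ns
    total_ms * 1.0e6 / items as f64
}
```

    Plugging in the numbers from my runs, ns_per_item(2669.0, 50_000_000) is about 53.4 and ns_per_item(92.6, 50_000_000) is about 1.9, so nearly all of the per-element time in the nano-task benchmark is pool overhead rather than the fetch_add itself.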