//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
// Copyright (c) 2026 Michael Vandeberg
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/capy
//

#include <boost/capy/ex/thread_pool.hpp>
#include <boost/capy/detail/intrusive.hpp>
#include <boost/capy/test/thread_name.hpp>
#include <condition_variable>
#include <coroutine>
#include <cstddef>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string_view>
#include <thread>
#include <vector>

/*
    Thread pool implementation using a shared work queue.

    Work items are coroutine handles wrapped in intrusive list nodes, stored
    in a single queue protected by a mutex. Worker threads wait on a
    condition_variable until work is available or stop is requested.

    Threads are started lazily on first post() via std::call_once to avoid
    spawning threads for pools that are constructed but never used. Each
    thread is named with a configurable prefix plus index for debugger
    visibility.

    Shutdown sequence: stop() sets the stop flag and wakes all threads.
    Workers keep resuming queued work until the queue is empty and only
    then exit. join(), which the destructor calls, performs stop() and
    waits for the threads to finish. Work still queued after the threads
    have exited (for example, work posted after shutdown) is discarded:
    its queue node is freed without resuming the coroutine.
*/

namespace boost {
37  
namespace boost {
45  
namespace capy {
38  
namespace capy {
46  

39  

47  
//------------------------------------------------------------------------------
40  
//------------------------------------------------------------------------------
48  

41  

49  
class thread_pool::impl
42  
class thread_pool::impl
50  
{
43  
{
51  
    struct work : detail::intrusive_queue<work>::node
44  
    struct work : detail::intrusive_queue<work>::node
52  
    {
45  
    {
53  
        std::coroutine_handle<> h_;
46  
        std::coroutine_handle<> h_;
54  

47  

55  
        explicit work(std::coroutine_handle<> h) noexcept
48  
        explicit work(std::coroutine_handle<> h) noexcept
56  
            : h_(h)
49  
            : h_(h)
57  
        {
50  
        {
58  
        }
51  
        }
59  

52  

60  
        void run()
53  
        void run()
61  
        {
54  
        {
62  
            auto h = h_;
55  
            auto h = h_;
63  
            delete this;
56  
            delete this;
64  
            h.resume();
57  
            h.resume();
65  
        }
58  
        }
66  

59  

67  
        void destroy()
60  
        void destroy()
68 -
            auto h = h_;
 
69  
        {
61  
        {
70 -
            if(h && h != std::noop_coroutine())
 
71 -
                h.destroy();
 
72  
            delete this;
62  
            delete this;
73  
        }
63  
        }
74  
    };
64  
    };
75  

65  

76  
    std::mutex mutex_;
66  
    std::mutex mutex_;
77  
    std::condition_variable cv_;
67  
    std::condition_variable cv_;
78  
    detail::intrusive_queue<work> q_;
68  
    detail::intrusive_queue<work> q_;
79 -
    std::atomic<std::size_t> outstanding_work_{0};
 
80  
    std::vector<std::thread> threads_;
69  
    std::vector<std::thread> threads_;
81 -
    bool joined_{false};
 
82  
    bool stop_{false};
70  
    bool stop_{false};
83  
    std::size_t num_threads_;
71  
    std::size_t num_threads_;
84  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
72  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
85  
    std::once_flag start_flag_;
73  
    std::once_flag start_flag_;
86  

74  

87  
public:
75  
public:
88  
    ~impl()
76  
    ~impl()
89  
    {
77  
    {
 
78 +
        stop();
 
79 +
        for(auto& t : threads_)
 
80 +
            if(t.joinable())
 
81 +
                t.join();
 
82 +

90  
        while(auto* w = q_.pop())
83  
        while(auto* w = q_.pop())
91  
            w->destroy();
84  
            w->destroy();
92  
    }
85  
    }
93  

86  

94  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
87  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
95  
        : num_threads_(num_threads)
88  
        : num_threads_(num_threads)
96  
    {
89  
    {
97  
        if(num_threads_ == 0)
90  
        if(num_threads_ == 0)
98 -
            num_threads_ = std::max(
91 +
            num_threads_ = std::thread::hardware_concurrency();
99 -
                std::thread::hardware_concurrency(), 1u);
92 +
        if(num_threads_ == 0)
 
93 +
            num_threads_ = 1;
100  

94  

101  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
95  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
102  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
96  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
103  
        thread_name_prefix_[n] = '\0';
97  
        thread_name_prefix_[n] = '\0';
104  
    }
98  
    }
105  

99  

106  
    void
100  
    void
107  
    post(std::coroutine_handle<> h)
101  
    post(std::coroutine_handle<> h)
108  
    {
102  
    {
109  
        ensure_started();
103  
        ensure_started();
110  
        auto* w = new work(h);
104  
        auto* w = new work(h);
111  
        {
105  
        {
112  
            std::lock_guard<std::mutex> lock(mutex_);
106  
            std::lock_guard<std::mutex> lock(mutex_);
113  
            q_.push(w);
107  
            q_.push(w);
114  
        }
108  
        }
115  
        cv_.notify_one();
109  
        cv_.notify_one();
116  
    }
110  
    }
117  

111  

118 -
    on_work_started() noexcept
 
119 -
    {
 
120 -
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
 
121 -
    }
 
122 -

 
123 -
    void
 
124 -
    on_work_finished() noexcept
 
125 -
    {
 
126 -
        if(outstanding_work_.fetch_sub(
 
127 -
            1, std::memory_order_acq_rel) == 1)
 
128 -
        {
 
129 -
            std::lock_guard<std::mutex> lock(mutex_);
 
130 -
            if(joined_ && !stop_)
 
131 -
                stop_ = true;
 
132 -
            cv_.notify_all();
 
133 -
        }
 
134 -
    }
 
135 -

 
136 -
    void
 
137  
    void
112  
    void
138  
    join() noexcept
113  
    join() noexcept
139  
    {
114  
    {
140 -
        {
115 +
        stop();
141 -
            std::unique_lock<std::mutex> lock(mutex_);
 
142 -
            if(joined_)
 
143 -
                return;
 
144 -
            joined_ = true;
 
145 -

 
146 -
            if(outstanding_work_.load(
 
147 -
                std::memory_order_acquire) == 0)
 
148 -
            {
 
149 -
                stop_ = true;
 
150 -
                cv_.notify_all();
 
151 -
            }
 
152 -
            else
 
153 -
            {
 
154 -
                cv_.wait(lock, [this]{
 
155 -
                    return stop_;
 
156 -
                });
 
157 -
            }
 
158 -
        }
 
159 -

 
160  
        for(auto& t : threads_)
116  
        for(auto& t : threads_)
161  
            if(t.joinable())
117  
            if(t.joinable())
162  
                t.join();
118  
                t.join();
163  
    }
119  
    }
164  

120  

165  
    void
121  
    void
166  
    stop() noexcept
122  
    stop() noexcept
167  
    {
123  
    {
168  
        {
124  
        {
169  
            std::lock_guard<std::mutex> lock(mutex_);
125  
            std::lock_guard<std::mutex> lock(mutex_);
170  
            stop_ = true;
126  
            stop_ = true;
171  
        }
127  
        }
172  
        cv_.notify_all();
128  
        cv_.notify_all();
173  
    }
129  
    }
174  

130  

175  
private:
131  
private:
176  
    void
132  
    void
177  
    ensure_started()
133  
    ensure_started()
178  
    {
134  
    {
179  
        std::call_once(start_flag_, [this]{
135  
        std::call_once(start_flag_, [this]{
180  
            threads_.reserve(num_threads_);
136  
            threads_.reserve(num_threads_);
181  
            for(std::size_t i = 0; i < num_threads_; ++i)
137  
            for(std::size_t i = 0; i < num_threads_; ++i)
182  
                threads_.emplace_back([this, i]{ run(i); });
138  
                threads_.emplace_back([this, i]{ run(i); });
183  
        });
139  
        });
184  
    }
140  
    }
185  

141  

186  
    void
142  
    void
187  
    run(std::size_t index)
143  
    run(std::size_t index)
188  
    {
144  
    {
189  
        // Build name; set_current_thread_name truncates to platform limits.
145  
        // Build name; set_current_thread_name truncates to platform limits.
190  
        char name[16];
146  
        char name[16];
191  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
147  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
192  
        set_current_thread_name(name);
148  
        set_current_thread_name(name);
193  

149  

194  
        for(;;)
150  
        for(;;)
195  
        {
151  
        {
196  
            work* w = nullptr;
152  
            work* w = nullptr;
197  
            {
153  
            {
198  
                std::unique_lock<std::mutex> lock(mutex_);
154  
                std::unique_lock<std::mutex> lock(mutex_);
199  
                cv_.wait(lock, [this]{
155  
                cv_.wait(lock, [this]{
200  
                    return !q_.empty() ||
156  
                    return !q_.empty() ||
201  
                        stop_;
157  
                        stop_;
202  
                });
158  
                });
203 -
                if(stop_)
159 +
                if(stop_ && q_.empty())
204  
                    return;
160  
                    return;
205  
                w = q_.pop();
161  
                w = q_.pop();
206  
            }
162  
            }
207  
            if(w)
163  
            if(w)
208  
                w->run();
164  
                w->run();
209  
        }
165  
        }
210  
    }
166  
    }
211  
};
167  
};
212  

168  

213  
//------------------------------------------------------------------------------
169  
//------------------------------------------------------------------------------
214  

170  

215  
thread_pool::
171  
thread_pool::
216  
~thread_pool()
172  
~thread_pool()
217 -
    impl_->stop();
 
218  
{
173  
{
219  
    impl_->join();
174  
    impl_->join();
220  
    shutdown();
175  
    shutdown();
221  
    destroy();
176  
    destroy();
222  
    delete impl_;
177  
    delete impl_;
223  
}
178  
}
224  

179  

225  
thread_pool::
180  
thread_pool::
226  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
181  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
227  
    : impl_(new impl(num_threads, thread_name_prefix))
182  
    : impl_(new impl(num_threads, thread_name_prefix))
228  
{
183  
{
229  
    this->set_frame_allocator(std::allocator<void>{});
184  
    this->set_frame_allocator(std::allocator<void>{});
230  
}
185  
}
231  

186  

232  
void
187  
void
233 -
join() noexcept
 
234 -
{
 
235 -
    impl_->join();
 
236 -
}
 
237 -

 
238 -
void
 
239 -
thread_pool::
 
240  
thread_pool::
188  
thread_pool::
241  
stop() noexcept
189  
stop() noexcept
242  
{
190  
{
243  
    impl_->stop();
191  
    impl_->stop();
244  
}
192  
}
245  

193  

246  
//------------------------------------------------------------------------------
194  
//------------------------------------------------------------------------------
247  

195  

248  
thread_pool::executor_type
196  
thread_pool::executor_type
249  
thread_pool::
197  
thread_pool::
250  
get_executor() const noexcept
198  
get_executor() const noexcept
251  
{
199  
{
252  
    return executor_type(
200  
    return executor_type(
253 -
}
 
254 -

 
255 -
void
 
256 -
thread_pool::executor_type::
 
257 -
on_work_started() const noexcept
 
258 -
{
 
259 -
    pool_->impl_->on_work_started();
 
260 -
}
 
261 -

 
262 -
void
 
263 -
thread_pool::executor_type::
 
264 -
on_work_finished() const noexcept
 
265 -
{
 
266 -
    pool_->impl_->on_work_finished();
 
267  
        const_cast<thread_pool&>(*this));
201  
        const_cast<thread_pool&>(*this));
268  
}
202  
}
269  

203  

270  
void
204  
void
271  
thread_pool::executor_type::
205  
thread_pool::executor_type::
272  
post(std::coroutine_handle<> h) const
206  
post(std::coroutine_handle<> h) const
273  
{
207  
{
274  
    pool_->impl_->post(h);
208  
    pool_->impl_->post(h);
275  
}
209  
}
276  

210  

277  
} // capy
211  
} // capy
278  
} // boost
212  
} // boost