本篇目標
在前面幾篇中,我們已經完成了:
tpool_request()讓主執行緒可以提交計算請求。worker()讓工作執行緒可以取出 task 並執行計算。tpool_synchronize()讓主執行緒可以等待所有計算完成。
本篇的最後一步,就是如何關閉執行緒池 (tpool_destroy()),確保:
- 所有 worker thread 正確結束,不會產生 zombie thread
- 所有分配的記憶體都被釋放,避免 memory leak
- 執行緒池的狀態變數 (
task queue、mutex、condition variable等)都被正確清理。
最終程式碼:Cpp-Projects/thread-pool/thread-pool.c at main · loijilai/Cpp-Projects (github.com)
我們要解決的問題
-
如何讓 worker threads 知道該結束?
Worker 目前仍然在無限迴圈等待新工作:
while (true) { pthread_mutex_lock(&tp->task_queue_mutex); while (tp->task_queue_head == tp->task_queue_tail) pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex); ... }如果沒有新任務,worker會永遠等待,這時候
tpool_destroy()需要有機制通知所有 worker 該結束。 -
如何避免 worker 還在跑時,主執行緒就釋放記憶體?
舉例來講,Worker可能正在計算某個任務,這時候如果
tpool_destroy()直接free(tp),是不正確的。所以正確的流程應該是:
- 先通知所有 worker 退出
- 確保所有 worker 都結束後,才釋放記憶體
如何實作 tpool_destroy()
這邊先想一個有趣的問題:如何讓 worker threads 知道該結束?
直觀上來看,我想enqueue一種特別的task,如果worker收到了就知道要結束,但是沒有這麼容易,因為「我要怎麼確保每個worker都會收到這個task?」畢竟他們是競爭在搶task的關係,不可能保證這些task能夠平分送到每個worker。
打個比方好了,這就好像如果沒辦法每個worker都寄一封信給他,那不如我就立個旗子好了,每個worker都可以自己來查看旗子的狀況。
就是基於這個想法,所以我們要添加一個所有worker都可以看到的變數來扮演旗子,叫做stop,worker要去檢查這個變數去得知現在是不是要結束執行了,這樣就不用真的把某種特殊的work放進queue裡。
pthread_mutex_lock(&tp->task_queue_mutex);
tp->stop = true; // 把旗子設起來
pthread_cond_broadcast(&tp->task_queue_cond); // 通知worker來查看,讓worker threads知道該結束
pthread_mutex_unlock(&tp->task_queue_mutex);所以雖然我們不需要實際執行enqueue,取而代之的是設立stop = true,但我們還是要記得broadcast所有的worker起來查看。
接著就可以等待所有 worker 結束 (pthread_join()),並釋放所有分配的記憶體。
void tpool_destroy(tpool_t *tp) {
// Step 1. 通知所有worker threads該結束
pthread_mutex_lock(&tp->task_queue_mutex);
tp->stop = true;
pthread_cond_broadcast(&tp->task_queue_cond);
pthread_mutex_unlock(&tp->task_queue_mutex);
// Step 2. 等待所有worker結束
for(int i = 0; i < tp->num_threads; i++) {
pthread_join(tp->threads[i], NULL);
}
// Step 3. 釋放task queue的所有任務(應該要只剩下dummy node)
task_t *current = tp->task_queue_head;
while (current) {
task_t *next = current->next;
free(current);
current = next;
}
// Step 4. 銷毀mutex和condition variable
pthread_mutex_destroy(&tp->task_queue_mutex);
pthread_cond_destroy(&tp->task_queue_cond);
pthread_cond_destroy(&tp->sync_cond);
// Step 5. 釋放thread陣列與tpool_t本體
free(tp->threads);
free(tp);
}修正worker()讓它能夠正確結束
接著我們要思考當主執行緒把stop設成true的當下,worker可能在什麼狀態?
- 一種可能是這個worker無所事事,正在等新的工作進來,但因為tpool_destroy是最後一個呼叫的API,後面不可能再有工作了,所以這種執行緒在耍冗,叫醒來之後應該要結束
- 另一種可能是task queue還很多東西,不能因為stop被設起來每個worker就要結束,應該要把task queue裡面的所有task都執行完才能結束
所以結束的條件是「當stop == true而且queue已空時」,這樣第一種會正確結束,而如果task queue還有很多東西便不會結束。
// 當stop == true而且queue已空時,不可能有新的task所以worker應該結束
if (tp->task_queue_head == tp->task_queue_tail && tp->stop) {
pthread_mutex_unlock(&tp->task_queue_mutex); // 結束前記得釋放鎖
break;
}另外要注意等待的條件也要加上當!tp->stop才等待,這樣被叫醒時就可以跳出這個while迴圈
// 更改:只有!tp->stop時,task queue為空才要等待
while(tp->task_queue_head == tp->task_queue_tail && !tp->stop)
pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);完整的worker如下:
// 完整的worker
// 1. 取得task
// 2. 計算並存放結果
// 3. 紀錄pending_tasks,並且把主執行緒叫醒
// 4. 支援tpool_destroy()
void* worker(void * args) {
tpool_t *tp = (tpool_t *) args;
while(true) {
pthread_mutex_lock(&tp->task_queue_mutex);
// 更改:只有!tp->stop時,task queue為空才要等待
while(tp->task_queue_head == tp->task_queue_tail && !tp->stop)
pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);
// 當 stop == true 而且queue已空時,不可能有新的task所以worker應該結束
if (tp->task_queue_head == tp->task_queue_tail && tp->stop) {
pthread_mutex_unlock(&tp->task_queue_mutex); // 結束前記得釋放鎖
break;
}
// ... 其餘沒有變動
}
return NULL;
}本系列完結
我們從設計API 開始,一步步建立了一個能夠並行計算矩陣乘法的執行緒池:
- 設計API,並行計算 O(n^3) 的方陣乘法 (
tpool_request,tpool_synchronize,tpool_destroy) - 設計執行緒池結構 (
tpool_t,task_t) - 主執行緒提交請求 (
tpool_request()) - Worker 取出 task 計算 (
worker()) - 主執行緒等待所有計算完成 (
tpool_synchronize()) - 優雅地結束執行緒池 (
tpool_destroy())
這個系列已經涵蓋了執行緒池的核心概念,希望對你有幫助,有興趣可以到[我的github](Cpp-Projects/thread-pool at main · loijilai/Cpp-Projects (github.com))上查看程式碼,README內有附上如何測試程式碼,相信能夠幫助理解!