本篇目標
在前一篇中,我們已經完成了 tpool_request(),讓主執行緒能夠提交計算請求,並且讓工作執行緒(workers)可以從 task queue 取出任務並執行計算。
但是,主執行緒如果要讀取計算結果,它 不能在工作執行緒還沒完成時就去存取結果,否則會發生 race condition。因此,本篇的目標是 實作 tpool_synchronize(),讓主執行緒可以等待所有計算任務完成後再繼續執行。
最終程式碼:Cpp-Projects/thread-pool/thread-pool.c at main · loijilai/Cpp-Projects (github.com)
我們要解的問題
- 主執行緒怎麼知道所有task已經完成?
- 我們需要追蹤還剩下多少未完成的 task,這個變數在前面已經定義為
tp->pending_tasks - 當
pending_tasks == 0時,代表所有計算都完成,這時候tpool_synchronize()就可以讓主執行緒繼續執行。 - 我們要使用
pthread_cond_wait()讓主執行緒等待,直到pending_tasks == 0為止
- 我們需要追蹤還剩下多少未完成的 task,這個變數在前面已經定義為
- 如何確保
pending_tasks的正確性?- 主執行緒呼叫
tpool_request()會在每次新增一個任務時增加pending_tasks worker()會在 每次完成一個任務時減少pending_tasks
- 主執行緒呼叫
實作 tpool_synchronize()
我們希望 tpool_synchronize() 讓主執行緒「阻塞」直到所有計算都完成。這可以用pthread_cond_wait()來達成。
void tpool_synchronize(tpool_t *tp) {
pthread_mutex_lock(&tp->task_queue_mutex);
while (tp->pending_tasks > 0) {
pthread_cond_wait(&tp->sync_cond, &tp->task_queue_mutex);
}
pthread_mutex_unlock(&tp->task_queue_mutex);
}
- 加鎖因為
pending_tasks是共享變數,需要加鎖以確保 thread safety。 - 當
pending_tasks > 0,就等待pthread_cond_wait()會讓主執行緒 釋放鎖並進入等待狀態,直到工作執行緒通知它pending_tasks == 0,才會醒來
- 當主執行緒從這個function return,表示所有計算都已經完成,主執行緒就可以繼續執行
如何讓 worker() 在所有 task 完成後通知主執行緒?
上一篇的worker還沒寫完,我們在此加上:
- 每個worker完成一個計算時,它會減少
pending_tasks - 而在最後一個worker
pending_tasks == 0時,最後一個worker要負責通知主執行緒起來
也就是以下step 3的步驟。
// 不完整的worker,缺少tpool_destroy
// 目前可以
// 1. 取得task
// 2. 計算並存放結果
// 3. 紀錄pending_tasks,並且把主執行緒叫醒
void* worker(void * args) {
tpool_t *tp = (tpool_t *) args;
while(true) {
// Step 1. 取得task
pthread_mutex_lock(&tp->task_queue_mutex);
while(tp->task_queue_head == tp->task_queue_tail)
pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);
task_t *dummy = tp->task_queue_head;
tp->task_queue_head = dummy->next;
task_t *task = dummy->next;
pthread_mutex_unlock(&tp->task_queue_mutex);
// Step 2. 計算並存放結果
fprintf(stderr, "worker calculate row %d\\n", task->row_index);
for(int i = 0; i < tp->matrix_size; i++)
task->C[task->row_index][i] = inner_product(tp->matrix_size, task->A[task->row_index], task->B[i]);
// Step 3. 重新取得鎖,為了要更新pending_tasks並且signal主執行緒
pthread_mutex_lock(&tp->task_queue_mutex);
tp->pending_tasks--;
if (tp->pending_tasks == 0) {
pthread_cond_signal(&tp->sync_cond); // 喚醒主執行緒
}
pthread_mutex_unlock(&tp->task_queue_mutex);
free(dummy) ;
}
return NULL;
}下一篇
現在我們已經讓 主執行緒可以等待所有計算完成 (tpool_synchronize()),但我們還缺少 如何優雅地關閉執行緒池 (tpool_destroy())。
在下一篇學習執行緒池 5:如何結束執行緒池?中,我們會:
- 討論 如何讓 worker 安全退出
- 確保
tpool_destroy()釋放所有資源,不會發生 memory leak