本篇目標

在前一篇中,我們已經完成了 tpool_request(),讓主執行緒能夠提交計算請求,並且讓工作執行緒(workers)可以從 task queue 取出任務並執行計算。

但是,主執行緒如果要讀取計算結果,它 不能在工作執行緒還沒完成時就去存取結果,否則會發生 race condition。因此,本篇的目標是 實作 tpool_synchronize(),讓主執行緒可以等待所有計算任務完成後再繼續執行。

最終程式碼:Cpp-Projects/thread-pool/thread-pool.c at main · loijilai/Cpp-Projects (github.com)

我們要解的問題

  1. 主執行緒怎麼知道所有task已經完成?
    • 我們需要追蹤還剩下多少未完成的 task,這個變數在前面已經定義為 tp->pending_tasks
    • pending_tasks == 0 時,代表所有計算都完成,這時候 tpool_synchronize() 就可以讓主執行緒繼續執行。
    • 我們要使用 pthread_cond_wait() 讓主執行緒等待,直到 pending_tasks == 0 為止
  2. 如何確保 pending_tasks 的正確性?
    • 主執行緒呼叫tpool_request() 會在每次新增一個任務時增加 pending_tasks
    • worker() 會在 每次完成一個任務時減少 pending_tasks

實作 tpool_synchronize()

我們希望 tpool_synchronize() 讓主執行緒「阻塞」直到所有計算都完成。這可以用pthread_cond_wait()來達成。

void tpool_synchronize(tpool_t *tp) {
    pthread_mutex_lock(&tp->task_queue_mutex);
    while (tp->pending_tasks > 0) {
        pthread_cond_wait(&tp->sync_cond, &tp->task_queue_mutex);
    }
    pthread_mutex_unlock(&tp->task_queue_mutex);
}
 
  1. 加鎖因為pending_tasks 是共享變數,需要加鎖以確保 thread safety。
  2. pending_tasks > 0,就等待
    • pthread_cond_wait() 會讓主執行緒 釋放鎖並進入等待狀態,直到工作執行緒通知它 pending_tasks == 0,才會醒來
  3. 當主執行緒從這個function return,表示所有計算都已經完成,主執行緒就可以繼續執行

如何讓 worker() 在所有 task 完成後通知主執行緒?

上一篇的worker還沒寫完,我們在此加上:

  1. 每個worker完成一個計算時,它會減少pending_tasks
  2. 而在最後一個workerpending_tasks == 0時,最後一個worker要負責通知主執行緒起來

也就是以下step 3的步驟。

// 不完整的worker,缺少tpool_destroy
// 目前可以
// 1. 取得task
// 2. 計算並存放結果
// 3. 紀錄pending_tasks,並且把主執行緒叫醒
void* worker(void * args) {
    tpool_t *tp = (tpool_t *) args;
    while(true) {
		    // Step 1. 取得task
        pthread_mutex_lock(&tp->task_queue_mutex);
        while(tp->task_queue_head == tp->task_queue_tail)
            pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);
 
        task_t *dummy = tp->task_queue_head;
        tp->task_queue_head = dummy->next;
        task_t *task = dummy->next;
 
        pthread_mutex_unlock(&tp->task_queue_mutex);
 
        // Step 2. 計算並存放結果
        fprintf(stderr, "worker calculate row %d\\n", task->row_index);
        for(int i = 0; i < tp->matrix_size; i++)
            task->C[task->row_index][i] = inner_product(tp->matrix_size, task->A[task->row_index], task->B[i]);
				
				// Step 3. 重新取得鎖,為了要更新pending_tasks並且signal主執行緒
				pthread_mutex_lock(&tp->task_queue_mutex);
				tp->pending_tasks--;
				if (tp->pending_tasks == 0) {
				    pthread_cond_signal(&tp->sync_cond);  // 喚醒主執行緒
				}
				pthread_mutex_unlock(&tp->task_queue_mutex);
 
        free(dummy) ;
    }
    return NULL;
}

下一篇

現在我們已經讓 主執行緒可以等待所有計算完成 (tpool_synchronize()),但我們還缺少 如何優雅地關閉執行緒池 (tpool_destroy())。

在下一篇學習執行緒池 5:如何結束執行緒池?中,我們會:

  1. 討論 如何讓 worker 安全退出
  2. 確保 tpool_destroy() 釋放所有資源,不會發生 memory leak