本篇目標

這篇要講的重頭戲是當主執行緒呼叫tpool_request傳入request之後,工作執行緒如何取任務並開始計算?

最終程式碼:Cpp-Projects/thread-pool/thread-pool.c at main · loijilai/Cpp-Projects (github.com)

我們要解的問題

  1. tpool_request應該要能夠
    1. 處理轉置(不可平行化的部分)
    2. 紀錄當前總共有多少tasks
    3. request拆分成tasks
    4. 擔任生產者enqueue到task queue
  2. worker工作執行緒應該要能夠
    1. 取得task
    2. 開始計算並存放結果

tpool_request

void tpool_request(tpool_t *tp, Matrix a, Matrix b, Matrix c);

首先第一步是做轉置,因為這部分不可平行化,交給主執行緒來做。

    // Step 1. transpose matrix b
    for(int i = 0; i < tp->matrix_size; i++) {
        for(int j = 0; j < tp->matrix_size; j++) {
            if(i < j) {
                int tmp = b[i][j];
                b[i][j] = b[j][i];
                b[j][i] = tmp;
            }
        }
    }

轉置之後C_ij = A.row(i) * B.row(j),這樣就可以統一取row向量了。

第二部分是紀錄當前有多少tasks,因為每個方陣都可以拆成n個task,所以直接加上方陣的大小。

    // Step 2. update pending_tasks (for tpool_synchronize use)
    pthread_mutex_lock(&tp->task_queue_mutex);
    tp->pending_tasks += tp->matrix_size;
    pthread_mutex_unlock(&tp->task_queue_mutex);

最後一步就是主執行緒把request拆分成task,其實拆分的方式就是紀錄每個task要計算C矩陣的第幾的row,最後主執行緒再把結果放進task queue,這邊主執行緒扮演的是生產者的角色,當放入task後要記得呼叫pthread_cond_signal把可能在等待的執行緒叫醒。

這邊用pthread_cond_broadcast也可以,但是for-loop中每次只enqueue一個task,沒有必要把全部的thread叫醒,這樣應該會比較有效率,可以降低執行緒之間的競爭。

    for(int i = 0; i < tp->matrix_size; i++) {
        task_t *newTask = malloc(sizeof(task_t));
        newTask->A = a;
        newTask->B = b;
        newTask->C = c;
        newTask->row_index = i; // Step 3. split request into n tasks
        newTask->next = NULL;
 
        // Step 4. enqueue tasks into task queue
        pthread_mutex_lock(&tp->task_queue_mutex);
        tp->task_queue_tail->next = newTask;
        tp->task_queue_tail = newTask;
        pthread_cond_signal(&tp->task_queue_cond); // 記得signal workers
        pthread_mutex_unlock(&tp->task_queue_mutex);
    }

一般的生產者-消費者問題還要考慮buffer滿的情況生產者要等待,但是這邊我用linked list實作queue所以不會滿,這樣enqueue就簡單很多,直接把東西放進去然後broadcast消費者就好了。

綜合在一起的樣子就是:

void tpool_request(tpool_t *tp, Matrix a, Matrix b, Matrix c) {
    // Step 1. transpose matrix b
    for(int i = 0; i < tp->matrix_size; i++) {
        for(int j = 0; j < tp->matrix_size; j++) {
            if(i < j) {
                int tmp = b[i][j];
                b[i][j] = b[j][i];
                b[j][i] = tmp;
            }
        }
    }
 
    // Step 2. update pending_tasks (for tpool_synchronize use)
    pthread_mutex_lock(&tp->task_queue_mutex);
    tp->pending_tasks += tp->matrix_size;
    pthread_mutex_unlock(&tp->task_queue_mutex);
 
    for(int i = 0; i < tp->matrix_size; i++) {
        task_t *newTask = malloc(sizeof(task_t));
        newTask->A = a;
        newTask->B = b;
        newTask->C = c;
        newTask->row_index = i; // Step 3. split request into n tasks
        newTask->next = NULL;
 
        // Step 4. enqueue tasks into task queue
        pthread_mutex_lock(&tp->task_queue_mutex);
        tp->task_queue_tail->next = newTask;
        tp->task_queue_tail = newTask;
        pthread_cond_signal(&tp->task_queue_cond); // 記得signal workers
        pthread_mutex_unlock(&tp->task_queue_mutex);
    }
}

worker

worker就是執行緒池中的苦力,實際把task中下來計算,他們扮演的是消費者的角色,他要能夠

  1. 從task queue取得task
  2. 計算並存放結果

這邊把重點放在實作出這兩個功能,所以先寫一個不完整的版本,缺少tpool_synchronizetpool_destroy的功能,但我認爲一次專注在一部分的功能比較好理解,因此想看最終的實作可以往下看剩下的文章。

從task queue取得task

        pthread_mutex_lock(&tp->task_queue_mutex);
        while(tp->task_queue_head == tp->task_queue_tail)
            pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);
 
        task_t *dummy = tp->task_queue_head;
        tp->task_queue_head = dummy->next;
        task_t *task = dummy->next;
 
        pthread_mutex_unlock(&tp->task_queue_mutex);

這部分就是生產者-消費者問題中的消費者,首先要先取得task queue的lock,然後檢查task queue內有沒有task,這邊檢查的方式是利用tp->task_queue_head == tp->task_queue_tail,這在上一篇設計一個執行緒池 — struct設計中有討論task queue的設計。

接著如果task queue為空,就用pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);釋放鎖並且在此等待主執行緒叫醒。

這部分用while迴圈是為了避免spurious wakeup,白話文就是在「我被叫醒」和「我實際被OS排班到」之間,條件又被其他執行緒改變了,也許是那個task又被其他執行緒取走了,所以在拿到鎖之後還要再檢查一次,才能從while迴圈中break掉。

從while迴圈break掉之後,就可以進入task queue的dequeue階段了,這部分也在上一篇有討論過。

        task_t *dummy = tp->task_queue_head;
        tp->task_queue_head = dummy->next;
        task_t *task = dummy->next;

拿到task之後就可以釋放鎖,和開始計算。注意要釋放鎖在開始計算,想像一下如果這個計算很花時間,拿著鎖會讓其他worker無法平行化的dequeue和工作,因此一定要記得先釋放鎖。

        fprintf(stderr, "worker calculate row %d\\n", task->row_index);
        for(int i = 0; i < tp->matrix_size; i++)
            task->C[task->row_index][i] = inner_product(tp->matrix_size, task->A[task->row_index], task->B[i]);
 
        free(dummy) ;

這邊在做的事情就是把C矩陣中的第row_index列全部計算完,總共耗時O(n^2)。

完整的程式碼如下:

// 不完整的worker,缺少tpool_synchronize和tpool_destroy
// 目前可以
// 1. 取得task
// 2. 計算並存放結果
void* worker(void * args) {
    tpool_t *tp = (tpool_t *) args;
    while(true) {
		    // Step 1. 取得task
        pthread_mutex_lock(&tp->task_queue_mutex);
        while(tp->task_queue_head == tp->task_queue_tail)
            pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);
 
        task_t *dummy = tp->task_queue_head;
        tp->task_queue_head = dummy->next;
        task_t *task = dummy->next;
 
        pthread_mutex_unlock(&tp->task_queue_mutex);
 
        // Step 2. 計算並存放結果
        fprintf(stderr, "worker calculate row %d\\n", task->row_index);
        for(int i = 0; i < tp->matrix_size; i++)
            task->C[task->row_index][i] = inner_product(tp->matrix_size, task->A[task->row_index], task->B[i]);
 
        free(dummy) ;
    }
    return NULL;
}

下一步

現在我們完成讓主執行緒呼叫tpool_request傳入request了,而工作執行緒可以取任務並開始計算。

但我們還缺少tpool_synchronizetpool_destroy的功能,我們將會在下一篇講解如何實作出tpool_synchronize,讓主執行緒能夠確保所有的task都執行完畢。

下一篇:學習執行緒池 4:如何讓主執行緒等待計算完成?