本篇目標
這篇要講的重頭戲是當主執行緒呼叫tpool_request傳入request之後,工作執行緒如何取任務並開始計算?
最終程式碼:Cpp-Projects/thread-pool/thread-pool.c at main · loijilai/Cpp-Projects (github.com)
我們要解的問題
tpool_request應該要能夠- 處理轉置(不可平行化的部分)
- 紀錄當前總共有多少tasks
- request拆分成tasks
- 擔任生產者enqueue到task queue
worker工作執行緒應該要能夠- 取得task
- 開始計算並存放結果
tpool_request
void tpool_request(tpool_t *tp, Matrix a, Matrix b, Matrix c);首先第一步是做轉置,因為這部分不可平行化,交給主執行緒來做。
// Step 1. transpose matrix b
for(int i = 0; i < tp->matrix_size; i++) {
for(int j = 0; j < tp->matrix_size; j++) {
if(i < j) {
int tmp = b[i][j];
b[i][j] = b[j][i];
b[j][i] = tmp;
}
}
}轉置之後C_ij = A.row(i) * B.row(j),這樣就可以統一取row向量了。
第二部分是紀錄當前有多少tasks,因為每個方陣都可以拆成n個task,所以直接加上方陣的大小。
// Step 2. update pending_tasks (for tpool_synchronize use)
pthread_mutex_lock(&tp->task_queue_mutex);
tp->pending_tasks += tp->matrix_size;
pthread_mutex_unlock(&tp->task_queue_mutex);最後一步就是主執行緒把request拆分成task,其實拆分的方式就是紀錄每個task要計算C矩陣的第幾的row,最後主執行緒再把結果放進task queue,這邊主執行緒扮演的是生產者的角色,當放入task後要記得呼叫pthread_cond_signal把可能在等待的執行緒叫醒。
這邊用pthread_cond_broadcast也可以,但是for-loop中每次只enqueue一個task,沒有必要把全部的thread叫醒,這樣應該會比較有效率,可以降低執行緒之間的競爭。
for(int i = 0; i < tp->matrix_size; i++) {
task_t *newTask = malloc(sizeof(task_t));
newTask->A = a;
newTask->B = b;
newTask->C = c;
newTask->row_index = i; // Step 3. split request into n tasks
newTask->next = NULL;
// Step 4. enqueue tasks into task queue
pthread_mutex_lock(&tp->task_queue_mutex);
tp->task_queue_tail->next = newTask;
tp->task_queue_tail = newTask;
pthread_cond_signal(&tp->task_queue_cond); // 記得signal workers
pthread_mutex_unlock(&tp->task_queue_mutex);
}一般的生產者-消費者問題還要考慮buffer滿的情況生產者要等待,但是這邊我用linked list實作queue所以不會滿,這樣enqueue就簡單很多,直接把東西放進去然後broadcast消費者就好了。
綜合在一起的樣子就是:
void tpool_request(tpool_t *tp, Matrix a, Matrix b, Matrix c) {
// Step 1. transpose matrix b
for(int i = 0; i < tp->matrix_size; i++) {
for(int j = 0; j < tp->matrix_size; j++) {
if(i < j) {
int tmp = b[i][j];
b[i][j] = b[j][i];
b[j][i] = tmp;
}
}
}
// Step 2. update pending_tasks (for tpool_synchronize use)
pthread_mutex_lock(&tp->task_queue_mutex);
tp->pending_tasks += tp->matrix_size;
pthread_mutex_unlock(&tp->task_queue_mutex);
for(int i = 0; i < tp->matrix_size; i++) {
task_t *newTask = malloc(sizeof(task_t));
newTask->A = a;
newTask->B = b;
newTask->C = c;
newTask->row_index = i; // Step 3. split request into n tasks
newTask->next = NULL;
// Step 4. enqueue tasks into task queue
pthread_mutex_lock(&tp->task_queue_mutex);
tp->task_queue_tail->next = newTask;
tp->task_queue_tail = newTask;
pthread_cond_signal(&tp->task_queue_cond); // 記得signal workers
pthread_mutex_unlock(&tp->task_queue_mutex);
}
}worker
worker就是執行緒池中的苦力,實際把task中下來計算,他們扮演的是消費者的角色,他要能夠
- 從task queue取得task
- 計算並存放結果
這邊把重點放在實作出這兩個功能,所以先寫一個不完整的版本,缺少tpool_synchronize和tpool_destroy的功能,但我認爲一次專注在一部分的功能比較好理解,因此想看最終的實作可以往下看剩下的文章。
從task queue取得task
pthread_mutex_lock(&tp->task_queue_mutex);
while(tp->task_queue_head == tp->task_queue_tail)
pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);
task_t *dummy = tp->task_queue_head;
tp->task_queue_head = dummy->next;
task_t *task = dummy->next;
pthread_mutex_unlock(&tp->task_queue_mutex);這部分就是生產者-消費者問題中的消費者,首先要先取得task queue的lock,然後檢查task queue內有沒有task,這邊檢查的方式是利用tp->task_queue_head == tp->task_queue_tail,這在上一篇設計一個執行緒池 — struct設計中有討論task queue的設計。
接著如果task queue為空,就用pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);釋放鎖並且在此等待主執行緒叫醒。
這部分用while迴圈是為了避免spurious wakeup,白話文就是在「我被叫醒」和「我實際被OS排班到」之間,條件又被其他執行緒改變了,也許是那個task又被其他執行緒取走了,所以在拿到鎖之後還要再檢查一次,才能從while迴圈中break掉。
從while迴圈break掉之後,就可以進入task queue的dequeue階段了,這部分也在上一篇有討論過。
task_t *dummy = tp->task_queue_head;
tp->task_queue_head = dummy->next;
task_t *task = dummy->next;拿到task之後就可以釋放鎖,和開始計算。注意要釋放鎖在開始計算,想像一下如果這個計算很花時間,拿著鎖會讓其他worker無法平行化的dequeue和工作,因此一定要記得先釋放鎖。
fprintf(stderr, "worker calculate row %d\\n", task->row_index);
for(int i = 0; i < tp->matrix_size; i++)
task->C[task->row_index][i] = inner_product(tp->matrix_size, task->A[task->row_index], task->B[i]);
free(dummy) ;這邊在做的事情就是把C矩陣中的第row_index列全部計算完,總共耗時O(n^2)。
完整的程式碼如下:
// 不完整的worker,缺少tpool_synchronize和tpool_destroy
// 目前可以
// 1. 取得task
// 2. 計算並存放結果
void* worker(void * args) {
tpool_t *tp = (tpool_t *) args;
while(true) {
// Step 1. 取得task
pthread_mutex_lock(&tp->task_queue_mutex);
while(tp->task_queue_head == tp->task_queue_tail)
pthread_cond_wait(&tp->task_queue_cond, &tp->task_queue_mutex);
task_t *dummy = tp->task_queue_head;
tp->task_queue_head = dummy->next;
task_t *task = dummy->next;
pthread_mutex_unlock(&tp->task_queue_mutex);
// Step 2. 計算並存放結果
fprintf(stderr, "worker calculate row %d\\n", task->row_index);
for(int i = 0; i < tp->matrix_size; i++)
task->C[task->row_index][i] = inner_product(tp->matrix_size, task->A[task->row_index], task->B[i]);
free(dummy) ;
}
return NULL;
}下一步
現在我們完成讓主執行緒呼叫tpool_request傳入request了,而工作執行緒可以取任務並開始計算。
但我們還缺少tpool_synchronize和tpool_destroy的功能,我們將會在下一篇講解如何實作出tpool_synchronize,讓主執行緒能夠確保所有的task都執行完畢。