Thread Pool em C
Thread Pool (ou "piscina de threads") é uma abstração que resolve os problemas associados ao uso indiscriminado de threads que vimos no tópico anterior. Em vez de criar e destruir threads para cada tarefa, uma pool de threads mantém um conjunto fixo de threads, previamente criadas, que executam tarefas conforme necessário.
Assim, quando uma tarefa é concluída, a thread é devolvida ao pool e reutilizada para outras tarefas.
Isso mitiga problemas como overhead de criação de threads, limitação de recursos no sistema operacional e latência associada à troca de contexto e sincronização de threads.
Funcionamento de uma thread pool
O funcionamento de uma pool de threads é muito simples, e passa por:
criar um número fixo de threads
cria uma fila de tarefas compartilhada entre as threads
cada thread na pool:
retira uma tarefa da fila
executa a tarefa
volta a ficar disponível para a próxima tarefa
Em temos técnicos, cada thread fica em loop verificando se há tarefa nova na fila. Basicamente, o processo principal adiciona tarefas na fila (push), e depois cada uma das threads na pool fica retirando as tarefas da fila (pop):

Entretanto, o quê acontece se 2 ou mais threads tentarem fazer pop da fila ao mesmo tempo?

Pode dar ruim, né? Race condition! E o quê fazemos pra resolver race condition?
Você acertou, tá mandando bem hein? Mutex neles!

Se a gente protege o recurso com um mutex, basicamente sincronizamos o acesso entre as threads e evitamos a condição de corrida. Contudo, o quê acontece se não houver nenhuma tarefa na fila?

Quando não há nada na fila, a operação de pop da fila retorna NULL
, portanto a thread fica repetindo o loop até que uma nova tarefa seja adicionada na fila. Loops são CPU-intensive, portanto o uso da CPU ficaria bastante comprometido neste caso.
Mas a biblioteca padrão traz uma técnica primitiva para lidar com isso, que basicamente são variáveis condicionais, condvar, ou simplesmente condition.
Dada uma certa condição, a thread é colocada em estado de wait, e isto pausa a execução do loop da thread, evitando o consumo desnecessário de CPU. Este mecanismo usa por trás recursos de thread signaling, que basicamente é o envio de sinais para as threads.

Assim que uma nova tarefa é adicionada na fila, é enviado o sinal de wake para a thread que detém o mutex, portanto esta ganha prioridade no escalonador e volta a executar o loop, fazendo pop da tarefa da fila.

O uso de condvar deve ser feito em conjunto com mutex, pois a única forma de saber "qual thread acordar" é justamente a thread que detém o controle do mutex naquele exato momento.
Mutex e condvar em C
Para nossa sorte, a biblioteca padrão traz suporte ao uso de condvars, através das funções pthread_cond_wait
e pthread_cond_signal
Agora, vamos abordar a implementação completa de uma thread pool em C, em pequenos passos. Primeiramente, incluímos os cabeçalhos necessários para o uso das funções do programa:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
Definimos também as constantes do programa:
#define NUM_THREADS 4 // Número de threads no pool
#define NUM_TASKS 10 // Número total de tarefas
A fila de tarefas será representada por uma simples struct, onde uma Task
contém um task_id
:
// Fila de tarefas
typedef struct {
int task_id;
} Task;
Task queue[NUM_TASKS];
// Contador e índice de tarefas
int task_count = 0;
int task_index = 0;
Agora, definimos as variáveis de sincronização, no caso o mutex e condvar:
// Variáveis de sincronização
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;
Okay, próximo passo é a implementação da função handle
, que será executada dentro de cada thread:
// Função executada pelas threads do pool
void* handle(void* arg) {
int thread_id = *((int*)arg);
while (1) {
// Bloqueia o mutex para acessar a fila de tarefas
pthread_mutex_lock(&mutex);
// Espera até que uma tarefa esteja disponível
while (task_index >= task_count) {
pthread_cond_wait(&condvar, &mutex);
}
// Retira a próxima tarefa da fila
Task task = queue[task_index];
task_index++;
// Executa a tarefa
printf("Thread %d processando tarefa %d...\n", thread_id, task.task_id);
sleep(1); // Simula o processamento da tarefa
printf("Thread %d completou tarefa %d.\n", thread_id, task.task_id);
pthread_mutex_unlock(&mutex); // Desbloqueia o mutex
}
return NULL;
}
Quando a função handle for executada:
o ID da thread é enviado nos argumentos
inicia-se um loop infinito. Então dentro do loop:
bloqueia o mutex
verifica se há tarefas na fila. Se houver:
retira a tarefa da fila
executa a tarefa
desbloqueia o mutex
volta ao início do loop
Caso não há tarefas na fila (fila vazia):
envia um sinal condicional de wait para a thread
Simples, não?
Próximo passo é implementar a função que adiciona a tarefa na fila, a função add_task:
// Adiciona uma tarefa à fila
void add_task(int task_id) {
pthread_mutex_lock(&mutex);
if (task_count < NUM_TASKS) {
queue[task_count].task_id = task_id;
task_count++;
pthread_cond_signal(&condvar); // Notifica as threads
} else {
printf("Fila de tarefas cheia! Não foi possível adicionar tarefa %d.\n", task_id);
}
pthread_mutex_unlock(&mutex);
}
Repare que, depois de adicionar, é chamada a função pthread_cond_signal
que envia um sinal de wake para a thread que detém o mutex. Isto faz com que a thread "acorde" e continue e execução depois do ponto do pthread_cond_wait
localizado na função handle do exemplo anterior.
Por último, nos resta implementar a função main, que deve ser bastante simples:
cria as 4 threads na pool
cada thread irá iniciar um loop infinito e ficar em estado de wait até que a primeira tarefa seja adicionada na fila (como explicado anteriormente)
adiciona tarefas à fila, simulando um intervalo de tempo entre cada inserção de tarefa
int main() {
pthread_t threads[NUM_THREADS];
// Cria as threads no pool
for (int i = 0; i < NUM_THREADS; i++) {
int* thread_id = malloc(sizeof(int));
*thread_id = i + 1;
pthread_create(&threads[i], NULL, handle, thread_id);
}
// Adiciona tarefas à fila
for (int i = 0; i < NUM_TASKS; i++) {
printf("Adicionando tarefa %d\n", i + 1);
add_task(i + 1);
sleep(0.5); // Simula um intervalo entre tarefas
}
// Em um sistema real, seria necessário um mecanismo para finalizar as threads.
// Aqui, como as threads ficam em loop infinito, use Ctrl+C para encerrar.
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
Atenção para o último bloco entre as linhas 20 e 22: é preciso fazer o join das threads, entretanto as threads estão em loop infinito, portanto o programa nunca irá terminar, a não ser que seja utilizado o Ctrl+C
.
A implementação final
Agora sim, vamos colar tudo e mostrar para a galera no churrasco:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define NUM_THREADS 4 // Número de threads no pool
#define NUM_TASKS 10 // Número total de tarefas
// Fila de tarefas
typedef struct {
int task_id;
} Task;
Task queue[NUM_TASKS];
int task_count = 0;
int task_index = 0;
// Variáveis de sincronização
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;
// Função executada pelas threads do pool
void* handle(void* arg) {
int thread_id = *((int*)arg);
free(arg); // Libera a memória alocada para o ID da thread
while (1) {
// Bloqueia o mutex para acessar a fila de tarefas
pthread_mutex_lock(&mutex);
// Espera até que uma tarefa esteja disponível
while (task_index >= task_count) {
pthread_cond_wait(&condvar, &mutex);
}
// Retira a próxima tarefa da fila
Task task = queue[task_index];
task_index++;
// Executa a tarefa
printf("Thread %d processando tarefa %d...\n", thread_id, task.task_id);
sleep(1); // Simula o processamento da tarefa
printf("Thread %d completou tarefa %d.\n", thread_id, task.task_id);
pthread_mutex_unlock(&mutex); // Desbloqueia o mutex
}
return NULL;
}
// Adiciona uma tarefa à fila
void add_task(int task_id) {
pthread_mutex_lock(&mutex);
if (task_count < NUM_TASKS) {
queue[task_count].task_id = task_id;
task_count++;
pthread_cond_signal(&condvar); // Notifica as threads
} else {
printf("Fila de tarefas cheia! Não foi possível adicionar tarefa %d.\n", task_id);
}
pthread_mutex_unlock(&mutex);
}
int main() {
pthread_t threads[NUM_THREADS];
// Cria as threads no pool
for (int i = 0; i < NUM_THREADS; i++) {
int* thread_id = malloc(sizeof(int));
*thread_id = i + 1;
pthread_create(&threads[i], NULL, handle, thread_id);
}
// Adiciona tarefas à fila
for (int i = 0; i < NUM_TASKS; i++) {
printf("Adicionando tarefa %d\n", i + 1);
add_task(i + 1);
sleep(0.5); // Simula um intervalo entre tarefas
}
// Em um sistema real, seria necessário um mecanismo para finalizar as threads.
// Aqui, como as threads ficam em loop infinito, use Ctrl+C para encerrar.
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
...
Thread 3 processando tarefa 3...
Adicionando tarefa 4
Thread 3 completou tarefa 3.
Thread 4 processando tarefa 4...
Adicionando tarefa 5
Thread 4 completou tarefa 4.
Thread 2 processando tarefa 5...
Adicionando tarefa 6
Thread 2 completou tarefa 5.
Thread 1 processando tarefa 6...
Adicionando tarefa 7
Thread 1 completou tarefa 6.
Thread 3 processando tarefa 7...
Adicionando tarefa 8
Thread 3 completou tarefa 7.
Thread 4 processando tarefa 8...
Adicionando tarefa 9
...
Simplesmente maravilhoso, não?
Last updated