concorrencia101
  • Introdução
  • First things first
  • Agradecimentos
  • Parte I - Concorrência no sistema operacional
    • O que é o programa no sistema operacional
    • Escalonador preemptivo de tarefas
    • Uma nota sobre escalonamento cooperativo
    • Propriedades de um processo
    • Clone de processo (forking)
    • Clone leve de processo (thread)
    • Todo processo tem uma thread principal
    • Uma nota sobre paralelismo
    • Principais desafios em cenário de concorrência
      • Race condition
      • Sincronização com locks
      • Modelo de atores
    • E o I/O?
      • Latência de CPU vs Latência de I/O
      • Chamadas bloqueantes
      • Chamadas não-bloqueantes
      • Assincronismo e escalonamento cooperativo
    • Vamos colocar em prática...
  • PARTE II - Concorrência em diferentes linguagens
    • Definindo ambientes de execução
    • Concorrência em C
      • Forking de processos
      • Threads
      • Race condition e sincronização de threads com mutex
      • Desafios com o uso de threads
      • Thread Pool em C
      • Green threads
      • Modelo de Atores
      • Trabalhando com I/O
    • Concorrência em Ruby
      • Forking de processos
      • Threads
      • Race condition, YARV, GVL e paralelismo em Ruby
      • Modelo de Atores
      • Trabalhando com I/O
Powered by GitBook
On this page
  • Funcionamento de uma thread pool
  • Mutex e condvar em C
  • A implementação final
  1. PARTE II - Concorrência em diferentes linguagens
  2. Concorrência em C

Thread Pool em C

PreviousDesafios com o uso de threadsNextGreen threads

Last updated 4 months ago

Thread Pool (ou "piscina de threads") é uma abstração que resolve os problemas associados ao uso indiscriminado de threads que vimos no tópico anterior. Em vez de criar e destruir threads para cada tarefa, uma pool de threads mantém um conjunto fixo de threads, previamente criadas, que executam tarefas conforme necessário.

Assim, quando uma tarefa é concluída, a thread é devolvida ao pool e reutilizada para outras tarefas.

Isso mitiga problemas como overhead de criação de threads, limitação de recursos no sistema operacional e latência associada à troca de contexto e sincronização de threads.

Funcionamento de uma thread pool

O funcionamento de uma pool de threads é muito simples, e passa por:

  • criar um número fixo de threads

  • cria uma fila de tarefas compartilhada entre as threads

  • cada thread na pool:

    • retira uma tarefa da fila

    • executa a tarefa

    • volta a ficar disponível para a próxima tarefa

Em temos técnicos, cada thread fica em loop verificando se há tarefa nova na fila. Basicamente, o processo principal adiciona tarefas na fila (push), e depois cada uma das threads na pool fica retirando as tarefas da fila (pop):

Entretanto, o quê acontece se 2 ou mais threads tentarem fazer pop da fila ao mesmo tempo?

Pode dar ruim, né? Race condition! E o quê fazemos pra resolver race condition?

Você acertou, tá mandando bem hein? Mutex neles!

Se a gente protege o recurso com um mutex, basicamente sincronizamos o acesso entre as threads e evitamos a condição de corrida. Contudo, o quê acontece se não houver nenhuma tarefa na fila?

Quando não há nada na fila, a operação de pop da fila retorna NULL , portanto a thread fica repetindo o loop até que uma nova tarefa seja adicionada na fila. Loops são CPU-intensive, portanto o uso da CPU ficaria bastante comprometido neste caso.

Mas a biblioteca padrão traz uma técnica primitiva para lidar com isso, que basicamente são variáveis condicionais, condvar, ou simplesmente condition.

Dada uma certa condição, a thread é colocada em estado de wait, e isto pausa a execução do loop da thread, evitando o consumo desnecessário de CPU. Este mecanismo usa por trás recursos de thread signaling, que basicamente é o envio de sinais para as threads.

Assim que uma nova tarefa é adicionada na fila, é enviado o sinal de wake para a thread que detém o mutex, portanto esta ganha prioridade no escalonador e volta a executar o loop, fazendo pop da tarefa da fila.

O uso de condvar deve ser feito em conjunto com mutex, pois a única forma de saber "qual thread acordar" é justamente a thread que detém o controle do mutex naquele exato momento.

Mutex e condvar em C

Para nossa sorte, a biblioteca padrão traz suporte ao uso de condvars, através das funções pthread_cond_wait e pthread_cond_signal

Agora, vamos abordar a implementação completa de uma thread pool em C, em pequenos passos. Primeiramente, incluímos os cabeçalhos necessários para o uso das funções do programa:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

Definimos também as constantes do programa:

#define NUM_THREADS 4    // Número de threads no pool
#define NUM_TASKS 10     // Número total de tarefas

A fila de tarefas será representada por uma simples struct, onde uma Task contém um task_id :

// Fila de tarefas
typedef struct {
	int task_id;
} Task;

Task queue[NUM_TASKS];

// Contador e índice de tarefas
int task_count = 0;
int task_index = 0;

Agora, definimos as variáveis de sincronização, no caso o mutex e condvar:

// Variáveis de sincronização
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;

Okay, próximo passo é a implementação da função handle , que será executada dentro de cada thread:

// Função executada pelas threads do pool
void* handle(void* arg) {
	int thread_id = *((int*)arg);

	while (1) {
		// Bloqueia o mutex para acessar a fila de tarefas
		pthread_mutex_lock(&mutex);

		// Espera até que uma tarefa esteja disponível
		while (task_index >= task_count) {
			pthread_cond_wait(&condvar, &mutex);
		}

		// Retira a próxima tarefa da fila
		Task task = queue[task_index];
		task_index++;

		// Executa a tarefa
		printf("Thread %d processando tarefa %d...\n", thread_id, task.task_id);
		sleep(1); // Simula o processamento da tarefa
		printf("Thread %d completou tarefa %d.\n", thread_id, task.task_id);
		
		pthread_mutex_unlock(&mutex); // Desbloqueia o mutex		
	}

	return NULL;
}

Quando a função handle for executada:

  • o ID da thread é enviado nos argumentos

  • inicia-se um loop infinito. Então dentro do loop:

    • bloqueia o mutex

    • verifica se há tarefas na fila. Se houver:

      • retira a tarefa da fila

      • executa a tarefa

      • desbloqueia o mutex

      • volta ao início do loop

    • Caso não há tarefas na fila (fila vazia):

      • envia um sinal condicional de wait para a thread

Simples, não?

Próximo passo é implementar a função que adiciona a tarefa na fila, a função add_task:

// Adiciona uma tarefa à fila
void add_task(int task_id) {
	pthread_mutex_lock(&mutex);

	if (task_count < NUM_TASKS) {
		queue[task_count].task_id = task_id;
		task_count++;
		pthread_cond_signal(&condvar); // Notifica as threads
	} else {
		printf("Fila de tarefas cheia! Não foi possível adicionar tarefa %d.\n", task_id);
	}

	pthread_mutex_unlock(&mutex);
}

Repare que, depois de adicionar, é chamada a função pthread_cond_signal que envia um sinal de wake para a thread que detém o mutex. Isto faz com que a thread "acorde" e continue e execução depois do ponto do pthread_cond_wait localizado na função handle do exemplo anterior.

Por último, nos resta implementar a função main, que deve ser bastante simples:

  • cria as 4 threads na pool

    • cada thread irá iniciar um loop infinito e ficar em estado de wait até que a primeira tarefa seja adicionada na fila (como explicado anteriormente)

  • adiciona tarefas à fila, simulando um intervalo de tempo entre cada inserção de tarefa

int main() {
	pthread_t threads[NUM_THREADS];

	// Cria as threads no pool
	for (int i = 0; i < NUM_THREADS; i++) {
		int* thread_id = malloc(sizeof(int));
		*thread_id = i + 1;
		pthread_create(&threads[i], NULL, handle, thread_id);
	}

	// Adiciona tarefas à fila
	for (int i = 0; i < NUM_TASKS; i++) {
		printf("Adicionando tarefa %d\n", i + 1);
		add_task(i + 1);
		sleep(0.5); // Simula um intervalo entre tarefas
	}

	// Em um sistema real, seria necessário um mecanismo para finalizar as threads.
	// Aqui, como as threads ficam em loop infinito, use Ctrl+C para encerrar.
	for (int i = 0; i < NUM_THREADS; i++) {
		pthread_join(threads[i], NULL);
	}

	return 0;
}

Atenção para o último bloco entre as linhas 20 e 22: é preciso fazer o join das threads, entretanto as threads estão em loop infinito, portanto o programa nunca irá terminar, a não ser que seja utilizado o Ctrl+C .

A implementação final

Agora sim, vamos colar tudo e mostrar para a galera no churrasco:

thread-pool.c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#define NUM_THREADS 4    // Número de threads no pool
#define NUM_TASKS 10     // Número total de tarefas

// Fila de tarefas
typedef struct {
	int task_id;
} Task;

Task queue[NUM_TASKS];
int task_count = 0;
int task_index = 0;

// Variáveis de sincronização
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;

// Função executada pelas threads do pool
void* handle(void* arg) {
	int thread_id = *((int*)arg);
	free(arg); // Libera a memória alocada para o ID da thread

	while (1) {
		// Bloqueia o mutex para acessar a fila de tarefas
		pthread_mutex_lock(&mutex);

		// Espera até que uma tarefa esteja disponível
		while (task_index >= task_count) {
			pthread_cond_wait(&condvar, &mutex);
		}

		// Retira a próxima tarefa da fila
		Task task = queue[task_index];
		task_index++;

		// Executa a tarefa
		printf("Thread %d processando tarefa %d...\n", thread_id, task.task_id);
		sleep(1); // Simula o processamento da tarefa
		printf("Thread %d completou tarefa %d.\n", thread_id, task.task_id);

		pthread_mutex_unlock(&mutex); // Desbloqueia o mutex
	}

	return NULL;
}

// Adiciona uma tarefa à fila
void add_task(int task_id) {
	pthread_mutex_lock(&mutex);

	if (task_count < NUM_TASKS) {
		queue[task_count].task_id = task_id;
		task_count++;
		pthread_cond_signal(&condvar); // Notifica as threads
	} else {
		printf("Fila de tarefas cheia! Não foi possível adicionar tarefa %d.\n", task_id);
	}

	pthread_mutex_unlock(&mutex);
}

int main() {
	pthread_t threads[NUM_THREADS];

	// Cria as threads no pool
	for (int i = 0; i < NUM_THREADS; i++) {
		int* thread_id = malloc(sizeof(int));
		*thread_id = i + 1;
		pthread_create(&threads[i], NULL, handle, thread_id);
	}

	// Adiciona tarefas à fila
	for (int i = 0; i < NUM_TASKS; i++) {
		printf("Adicionando tarefa %d\n", i + 1);
		add_task(i + 1);
		sleep(0.5); // Simula um intervalo entre tarefas
	}

	// Em um sistema real, seria necessário um mecanismo para finalizar as threads.
	// Aqui, como as threads ficam em loop infinito, use Ctrl+C para encerrar.
	for (int i = 0; i < NUM_THREADS; i++) {
		pthread_join(threads[i], NULL);
	}

	return 0;
}
...

Thread 3 processando tarefa 3...
Adicionando tarefa 4
Thread 3 completou tarefa 3.
Thread 4 processando tarefa 4...
Adicionando tarefa 5
Thread 4 completou tarefa 4.
Thread 2 processando tarefa 5...
Adicionando tarefa 6
Thread 2 completou tarefa 5.
Thread 1 processando tarefa 6...
Adicionando tarefa 7
Thread 1 completou tarefa 6.
Thread 3 processando tarefa 7...
Adicionando tarefa 8
Thread 3 completou tarefa 7.
Thread 4 processando tarefa 8...
Adicionando tarefa 9

...

Simplesmente maravilhoso, não?