Trabalhando com I/O

Antes de abordarmos I/O em Ruby, sugiro fortemente a leitura do módulo Trabalhando com I/O em C, onde eu trago conceitos importantes sobre I/O, suas limitações e funcionalidades de I/O não-bloqueante.

Para ilustrar como algumas operações de I/O são por natureza bloqueantes, vamos demonstrar a comunicação entre dois processos utilizando UNIX named pipes, ou FIFO (veja mais sobre arquivos FIFO no meu artigo).

Comunicação com FIFO

A primeira coisa que temos que fazer é criar um arquivo especial do tipo "pipe" com o comando mkfifo :

$ mkfifo queue

$ ls -l queue
prw-r--r-- 1 leandronsp leandronsp 0 Dec 31 23:59 queue

A saída começando com "p" indica justamente que este arquivo é especial, ou seja, um pipe FIFO.

Reader

Vamos representar em Ruby o leitor da fila:

# Abre o arquivo em modo somente leitura
File.open("queue", "r") do |file|
  # Lê o conteúdo do arquivo
  buffer = file.read(1024)

  # Exibe a mensagem recebida
  puts "Mensagem recebida: #{buffer}"
end

Ao executar o programa, note que ele fica bloqueado a espera que alguém escreva na fila.

Writer

Em outra janela do terminal, podemos escrever no FIFO utilizando o comando echo :

Copy

$ echo Hello! > queue

Veja na janela do writer que a mensagem apareceu na saída padrão! Superb!

Outra característica importante pra notar aqui: o writer também fica bloqueado de escrever, caso não haja nenhum reader disponível.

Isto é o puro suco do I/O bloqueante! Entretanto, I/O bloqueante apresenta algumas limitações, como já abordado no módulo em C

I/O não-bloqueante em Ruby

Vimos no tópico anterior que a chamada File.open por definição deixa o programa bloqueado até que os dados fiquem "prontos" no I/O durante a leitura. Para trabalharmos com I/O não-bloqueante em Ruby, não é muito diferente do que já vimos em C, pois basta chamar a syscall open com as devidas flags de RDONLY e NONBLOCK .

# Abre o arquivo no modo não-bloqueante
File.open("queue", File::RDONLY |File::NONBLOCK) do |file|
    # Lê o arquivo não-bloqueante e retorna os bytes lidos ou nil, caso o I/O não esteja pronto
    buffer = file.read(1024) 
    puts "Mensagem recebida: #{buffer}"
end

Este simples programa irá terminar com a mensagem:

Mensagem recebida:

Por ser não-bloqueante, o programa não fica bloqueado e continua sua execução rumo ao fim do programa.

Meio óbvio, não?

Agora vamos a um exemplo com loop, simulando um """""""loop de eventos""""""":

File.open("queue", File::RDONLY |File::NONBLOCK) do |file|
  loop do
    buffer = file.read(1024)

    if buffer
      puts "Mensagem recebida: #{buffer}"
    else
      puts "Nenhum dado disponível no FIFO agora."
      sleep(1)
    end
  end
end
Nenhum dado disponível no FIFO agora.
Nenhum dado disponível no FIFO agora.
Mensagem recebida: Barata
Nenhum dado disponível no FIFO agora.
Nenhum dado disponível no FIFO agora.
Nenhum dado disponível no FIFO agora.
Nenhum dado disponível no FIFO agora.

Meo deos do céoooo, que dia maravilhoso!!!!!11

Contudo, esta solução ainda é bastante rudimentar e ineficiente para monitorarmos descritores de arquivos. Assim como em C, conseguimos em Ruby ter acesso à syscall select do sistema operacional através do módulo IO .

Monitorando I/O com select

Monitorar descritores de arquivos com select em Ruby é flocos com morango:

fifo = File.open('queue', File::RDONLY | File::NONBLOCK) 

loop do
  # Usa IO.select para monitorar o descritor de arquivo
  ready = IO.select([fifo], nil, nil, 1) # Timeout de 1 segundo

  if ready.nil?
    puts "Nenhum descritor disponível. Continuando..."
    next
  end

  # Verifica se há dados no FIFO
  ready[0].each do |io|
    data = io.read(1024)
    puts "Mensagem recebida: #{data}"
  end
end
Nenhum descritor disponível. Continuando...
Nenhum descritor disponível. Continuando...
Mensagem recebida: Barata
enhum descritor disponível. Continuando...
Nenhum descritor disponível. Continuando...

OMG! Que dia incrível!


Até agora, exploramos os recursos que Ruby oferece para realizar operações de I/O de forma não-bloqueante. Essa característica muda fundamentalmente a maneira como escrevemos código, afastando-nos da abordagem tradicional síncrona para adotar uma abordagem assíncrona.

Normalmente, escrevemos código de forma síncrona, onde as funções e rotinas são executadas em sequência e os dados necessários já estão disponíveis na memória. Com I/O não-bloqueante, perdemos essa certeza. Não sabemos quando os dados estarão prontos, o que nos obriga a estruturar o código para lidar com a disponibilidade futura das informações.

Essa mudança nos leva ao assincronismo, onde não esperamos pela conclusão de uma operação, mas configuramos o código para reagir quando a operação for concluída.

I/O assíncrono em Ruby

Uma vez que vimos formas de lidar com I/O não-bloqueante em Ruby e monitorar descritores, chegou o momento de explorar I/O assíncrono em Ruby.

Antes de avançar, vamos retomar o exemplo da leitura do FIFO de forma não-bloqueante com select:

fifo = File.open('queue', File::RDONLY | File::NONBLOCK) 

loop do
  # Usa IO.select para monitorar o descritor de arquivo
  ready = IO.select([fifo], nil, nil, 1) # Timeout de 1 segundo

  if ready.nil?
    puts "Nenhum descritor disponível. Continuando..."
    next
  end

  # Verifica se há dados no FIFO
  ready[0].each do |io|
    data = io.read(1024)
    puts "Mensagem recebida: #{data}"
  end
end

O bloco entre a linha 13 e 16 indica que temos uma lógica arbitrária para lidar com a mensagem que chega no FIFO, no caso uma lógica simples que imprime no STDOUT a mensagem recebida.

Repare que esta lógica pode ser aplicada para qualquer I/O que fique pronto, portanto podemos levar esta lógica para outra estrutura de lazy evaluation, funcionando como um callback. Em Ruby, podemos recorrer ao uso de lambdas.

fifo = File.open('queue', File::RDONLY | File::NONBLOCK) 

# Callback para quando dados estiverem disponíveis
on_data_available = ->(data) do 
  puts "Mensagem recebida: #{data}"
end

loop do
  # Usa IO.select para monitorar o descritor de arquivo
  ready = IO.select([fifo], nil, nil, 1) # Timeout de 1 segundo

  if ready.nil?
    puts "Nenhum descritor disponível. Continuando..."
    next
  end

  # Verifica se há dados no FIFO
  ready[0].each do |io|
    data = io.read(1024)
    on_data_available.call(data) # Uso do callback
  end
end

O problema dos callbacks

Embora callbacks sejam uma solução funcional para lidar com I/O assíncrono, eles podem rapidamente se tornar difíceis de gerenciar à medida que a complexidade do código cresce. Esse problema é conhecido como callback hell, onde o código tem um certo nível de indireção, tornando a lógica fica fragmentada e difícil de entender.

fifo = File.open('queue', File::RDONLY | File::NONBLOCK)

# CALLBACK HELL!!!!!1
on_data_available = ->(data) do 
  process_message(data) do |processed_data|
    log_message(processed_data) do |log_status|
      notify_clients(log_status)
    end
  end
end

loop do
  ready = IO.select([fifo], nil, nil, 1)
  next if ready.nil?

  ready[0].each do |io|
    data = io.read(1024)
    on_data_available.call(data)
  end
end

O problema começa a se tornar evidente:

  • Cada callback aninhado adiciona uma nova camada de indireção

  • O código fica menos intuitivo e mais difícil de depurar

  • O fluxo de controle não é óbvio, tornando a manutenção mais complexa

Para evitar o callback hell, temos de pensar numa solução que permita um certo tipo de controle sobre o código, trazendo mais direção, ou seja, permitir lidar com código assíncrono de maneira mais linear e retomar a execução do código de maneira mais explícita, de forma cooperativa.

Fibers to the rescue.

Escalonamento cooperativo com Fibers

Se você leu a primeira parte do guia e também o módulo em C, já está familiarizado com o conceito de escalonamento cooperativo.

Fibers são estrutuas leves de concorrência em Ruby que permitem que sejam pausadas e retomadas explicitamente no código. Diferente das Threads, as Fibers nunca são escalonadas preemptivamente, e portanto seguem um modelo cooperativo de concorrência.

Apesar de serem utilizadas para diversos propósitos, como por exemplo processamento de fluxo de dados e simulação de corrotinas, Fibers são muito úteis no contexo de I/O não-bloqueante pois trazem esta característica de cooperação, resolvendo o problema de indireção no código assíncrono.

No código de exemplo com callbacks, vamos refatorar para o uso de Fibers:

fifo = File.open('queue', File::RDONLY | File::NONBLOCK) 

monitor = Fiber.new do
  loop do
    # Usa IO.select para monitorar o descritor de arquivo
    ready = IO.select([fifo], nil, nil, 1) # Timeout de 1 segundo

    if ready.nil?
      # Retorna o controle para o loop principal até que haja dados disponíveis
      puts "Nenhum descritor disponível. Continuando..."
      Fiber.yield 
    else
      ready[0].each do |io|
        data = io.read(1024)
        # Retorna o controle para o loop principal com os dados lidos
        Fiber.yield(data) if data
      end
    end
  end
end

# Loop principal
loop do
  data = monitor.resume
  next unless data

  puts "Mensagem recebida: #{data}"
end

Primeiramente criamos uma fiber chamada monitor, que fica em loop infinito monitorando o I/O com select. Em seguida, temos o loop principal, que controla a execução da fiber de monitoramento.

  • Dentro da fiber monitor, quando não há dados disponíveis, o controle é devolvido ao loop principal com Fiber.yield

  • Ainda na fiber monitor, quando de fato há dados disponíveis, o controle também é devolvido ao loop mas é enviado junto os dados do I/O

  • No loop principal, é feita a exeução da Fiber (resume), e caso haja dados disponíveis, estes são enviados para o STDOUT com puts

Nenhum descritor disponível. Continuando...
Nenhum descritor disponível. Continuando...
Nenhum descritor disponível. Continuando...
Nenhum descritor disponível. Continuando...
Nenhum descritor disponível. Continuando...
Nenhum descritor disponível. Continuando...
Mensagem recebida: Barata
Nenhum descritor disponível. Continuando...
Nenhum descritor disponível. Continuando...
Mensagem recebida: Barata
Nenhum descritor disponível. Continuando...

Que dia M.A.R.A.V.I.L.H.O.S.O!

Com Fibers, o código fica mais linear, tornando o fluxo de execução mais próximo de um código síncrono, facilitando a leitura e manutenção. E temos também menos indireção, evitando o callback hell.

No entanto Fibers não eliminam completamente a complexidade do código assíncrono, mas ajudam a tornar a estrutura mais legível e menos fragmentada. Com o auxílio de um padrão de loop de eventos, podemos deixar o código ainda mais legível e fácil de entender.

Event Loop

Quando combinadas com event loops, Fibers fornecem um excelente modelo para I/O assíncrono em Ruby, sem precisar recorrer a threads pesadas.

Não é difícil criarmos nosso próprio event loop que fica registrando de escalonando Fibers de forma cooperativa.

Basicamente, o loop precisa de:

  • Uma estrutura de fila que guarda as fibers registradas

  • Um método para registrar fibers na fila

  • Outro método que consome fibers da fila e as executa enquanto estiverem em execução

Lembrando do modelo cooperativo: a fiber devolve o controle com yield, e o código fora da fiber (no caso o event loop) executa a fiber com resume

class EventLoop
  def initialize
    @queue = []
  end

  def schedule(&block)
    @queue << Fiber.new(&block)
  end

  def run
    while @queue.any?
      fiber = @queue.shift # Pega a próxima Fiber da fila

      if fiber.alive?
        fiber.resume # Executa a fiber ou continua a execução
        @queue << fiber # Reinsere no final da fila se ainda estiver ativa
      end
    end
  end
end

Agora, para adaptar nosso exemplo de leitura assíncrona do FIFO, podemos de forma super simples utilizar o loop de eventos:

event_loop = EventLoop.new

event_loop.schedule do
  fifo = File.open('queue', File::RDONLY | File::NONBLOCK)
  loop do
    ready = IO.select([fifo], nil, nil, 1)

    if ready.nil?
      puts "Nenhum descritor disponível. Continuando..."
      next
    else 
      ready[0].each do |io|
        data = io.read(1024)
        puts "Mensagem recebida: #{data}"
      end
    end
  end
end

event_loop.schedule do
  loop do
    puts "Monitorando status do sistema..."
  end
end

event_loop.run

Na linha 3, o método schedule cria e registra a fiber de monitoramento no loop de eventos. E na linha 20, o schedule cria e registra outra fiber que pode representar outra operação qualquer, tudo de forma assíncrona!

E pra finalizar, o método run da linha 26 é simplesmente o gatilho para iniciar o loop e escalonamento das fibers.

Lindo, não? Quem diria, e você aí pensando que loop de eventos era uma invenção super inovadora do Javascript no backend, né?

Escalonador de Fibers no Ruby 3+

Com o lançamento do Ruby 3, o core da linguagem introduziu um novo recurso: Fiber Scheduler, uma interface nativa para escalonamento de Fibers. Esse recurso permite que I/O assíncrono ocorra de forma transparente, sem necessidade de manipular diretamente event loops ou utilizar o select.

Esse avanço coloca Ruby mais próximo de outras linguagens que já possuem suporte nativo para concorrência assíncrona, como JavaScript (async/await) e Go (goroutines).

A implementação de um escalonador de Fibers permite que operações de I/O sejam automaticamente não bloqueantes quando usamos Fibers. Isto significa que, quando uma Fiber precisar esperar por I/O, o Ruby pode suspender automaticamente e permitir que outras tarefas rodem na mesmo thread.

Antes do Ruby 3, para conseguir esse comportamento, precisávamos criar nosso próprio event loop - como fizemos anteriormente -, mas agora, basta fornecer um escalonador customizado, e o runtime cuida do restante.


No próximo e último tópico de Ruby, iremos abordar algumas gems que lidam com concorrência em Ruby, incluindo Celulloid, parallel e Async.

Fiquem ligades!

Last updated