DEV Community

Cover image for Streaming em tempo real: implementando Server-Sent Events com IA generativa em chatbots
Thais Soares
Thais Soares

Posted on

Streaming em tempo real: implementando Server-Sent Events com IA generativa em chatbots

🎯 Introdução

A experiência do usuário com chatbots é diretamente impactada pelo tempo de espera. Quando um usuário envia uma mensagem e fica observando um indicador de carregamento sem feedback imediato, a sensação de lentidão no sistema é inevitável - mesmo que o tempo total de resposta seja OK.

É aqui que entra o streaming de respostas: em vez de aguardar que a IA termine de gerar a resposta completa, é possível exibir o conteúdo progressivamente, o que proporciona uma experiência mais dinâmica para o usuário.

Neste tutorial, vamos implementar streaming em tempo real para respostas de IA usando server-sent events (SSE) com FastAPI e Vue.js. Vamos transformar o uso de um chatbot comum em uma interação fluida.

Nosso exemplo é um assistente virtual da Dunder Mifflin Paper Company, empresa da série "The Office". E, caso você não seja fã de "The Office", um aviso sobre a resposta na imagem abaixo: o chatbot não é burro - é só uma piada da série. 🤓

 

Chatbot respondendo

 

📚 Entendendo conceitos fundamentais

O chatbot usa uma tecnologia de resposta em tempo real que torna a conversa mais natural e fluida.

 

🌊 Como funciona o streaming no chatbot

Quando a usuária envia uma mensagem pelo chatbot:

  1. O frontend estabelece uma conexão que permanece aberta com o backend
  2. O backend inicia uma conexão que também permanece aberta com o LLM
  3. Conforme o LLM gera o texto, cada pedaço é enviado imediatamente: LLM → Backend → Frontend → Tela do usuário

Esta "ponte" de dados contínua elimina esperas e cria a experiência de ver o texto sendo digitado em tempo real.

 

🔄 Streaming vs. requisições tradicionais

Requisições tradicionais:

  • O cliente envia um pedido e espera a resposta completa
  • A usuária vê uma indicação de carregamento ou espera
  • Só depois de pronta, a resposta inteira aparece

Com streaming:

  • O texto começa a aparecer em segundos
  • A usuária vê a resposta sendo construída em tempo real
  • A interface nunca fica "travada" esperando

 

📡 O que são server-sent events (SSE)?

SSE é a tecnologia que usamos para criar o streaming entre o backend e o frontend. Com o SSE:

  • O backend envia atualizações contínuas ao cliente
  • Uma única conexão HTTP permanece aberta
  • O navegador gerencia a conexão (reconexão automática, manutenção do canal)
  • O frontend processa e formata o conteúdo dos eventos
  • Os dados fluem em um formato simples e padronizado

 

Cada pequeno pedaço de texto gerado pelo LLM é formatado como um "evento" SSE:

data: {"text": "Olá, como"}\n\n data: {"text": " posso ajudar?"}\n\n data: [DONE] 
Enter fullscreen mode Exit fullscreen mode

 

🚀 Benefícios do streaming com IAs

O chatbot aproveita o streaming oferecido pelos LLMs modernos para:

  • Resposta imediata: O usuário vê o texto aparecendo em segundos
  • Experiência mais natural: Simula uma pessoa digitando em tempo real
  • Controle melhorado: É possível interromper respostas inadequadas rapidamente
  • Leitura antecipada: O usuário lê o início da resposta enquanto o restante ainda está sendo gerado

 

Esta abordagem cria uma experiência de conversa muito mais dinâmica e interativa do que sistemas de chat tradicionais.

 

Fluxo de streaming com SSE

Fluxo de streaming com SSE

 

🛠️ Implementação do backend (FastAPI)

🔧 Estrutura do Backend

O backend está organizado da seguinte forma:

dunder_mifflin_py/ ├── app/ │ ├── api/ # Endpoints da API │ ├── core/ # Configurações centrais │ ├── data/ # Dados e prompts │ └── services/ # Serviços de negócio └── requirements.txt 
Enter fullscreen mode Exit fullscreen mode

 

📊 Classe de Serviço SSE

Primeiro, criamos um serviço dedicado para lidar com as respostas SSE:

# app/services/sse_service.py  from fastapi.responses import StreamingResponse import json from typing import AsyncGenerator class SSEService: """Service for managing Server-Sent Events (SSE).""" def create_stream_response(self, generator: AsyncGenerator) -> StreamingResponse: """ Creates an SSE streaming response from an async generator. Args: generator: An async generator producing text fragments Returns: A configured StreamingResponse object for SSE """ return StreamingResponse( generator, media_type="text/event-stream" ) async def create_error_event(self, message: str) -> AsyncGenerator[str, None]: """ Creates error events to send via SSE. Args: message: Error message Returns: An async generator with the error event """ async def error_generator(): yield f"data: {json.dumps({'error': message})}\n\n" yield "data: [DONE]\n\n" return error_generator() 
Enter fullscreen mode Exit fullscreen mode

 

Essa classe encapsula a criação de respostas SSE, o que facilita gerar tanto respostas normais quanto de erro no formato SSE.

 

🧠 Serviço de IA com Streaming

Agora, vamos olhar para o cerne da implementação: a integração com a API de IA da Google com suporte a streaming.

Escolhi a API da Google porque ela oferece um limite alto pra teste gratuito sem a necessidade de inserir um cartão de crédito. Mas é importante ter em mente que existe um motivo pelo qual é gratuito.

 

⚠️ Lembre-se: Se você não paga pelo produto, você é o produto.

 

# app/services/ai_service.py - versão simplificada  async def generate_response(self, message: str, company_data: Dict[str, Any]): try: # Configuração do streaming com a API do Google  response_stream = self.model.generate_content( contents=[/* configuração */], stream=True # Ativa o streaming!  ) async def response_generator(): try: for chunk in response_stream: if hasattr(chunk, 'text') and chunk.text: # Formata cada pedaço como um evento SSE  yield f"data: {json.dumps({'text': chunk.text})}\n\n" # Sinaliza o fim do stream  yield "data: [DONE]\n\n" except Exception as e: # Tratamento de erro omitido  return response_generator() except Exception as e: # Tratamento de erro omitido 
Enter fullscreen mode Exit fullscreen mode

 

Pontos importantes:

  1. Definir stream=True ao chamar a API de IA
  2. Criar um gerador assíncrono que converte cada fragmento de resposta em um evento SSE
  3. Sinalizar o fim do stream com uma mensagem [DONE]

 

🌐 Endpoint de chat com SSE

Finalmente, vamos juntas as peças no endpoint da API:

# app/api/chat.py  @router.post("/", summary="Send a message to the Dunder Mifflin chatbot" ) async def chat(request: ChatRequest): """ Process user messages and return chatbot responses via SSE. """ try: if not request.message: raise HTTPException(status_code=400, detail="Message cannot be empty") dunder_mifflin_data = get_dunder_mifflin_data() response_generator = await ai_service.generate_response( request.message, dunder_mifflin_data ) return sse_service.create_stream_response(response_generator) except Exception as e: logger.error(f"Error processing chat request: {str(e)}") error_generator = await sse_service.create_error_event(f"Error: {str(e)}") return sse_service.create_stream_response(error_generator) 
Enter fullscreen mode Exit fullscreen mode

Este endpoint recebe a mensagem do usuário, passa a mensagem para o serviço de IA e retorna a resposta como um stream SSE.

Bom demais! Python entregando tudo e mais um pouco!

 

🖥️ Implementação do frontend (Vue.js)

Agora, vamos consumir o stream no lado do cliente usando meu amado Vue.js.

 

🔧 Estrutura do Frontend

O frontend está organizado da seguinte forma:

dunder_mifflin_vue/ ├── src/ │ ├── assets/ # Arquivos estáticos e estilos CSS │ ├── components/ # Componentes Vue reutilizáveis │ ├── composables/ # Lógica de negócios e gerenciamento de estado │ └── ultils/ # Funções utilitárias (como o parser SSE) ├── public/ # Arquivos estáticos públicos └── index.html # Ponto de entrada HTML 
Enter fullscreen mode Exit fullscreen mode

 

🔍 Parser de SSE

Primeiro, criamos uma classe para processar os eventos (SSE) recebidos:

// src/ultils/sseParser.js - versão simplificada export class SSEParser { constructor() { this.buffer = ''; this.fullResponse = ''; } processChunk(chunk, onMessage, onComplete, onError) { try { this.buffer += chunk; // Dividir o buffer em mensagens individuais (separadas por \n\n) const messages = this.buffer.split('\n\n'); this.buffer = messages.pop() || ''; // Processar cada mensagem for (const message of messages) { if (!message.trim()) continue; const result = this.parseSSEMessage(message); if (result === '[DONE]') { if (onComplete) onComplete(this.fullResponse); } else if (result) { if (onMessage) onMessage(result); this.fullResponse += result; } } return this.fullResponse; } catch (error) { // Tratamento de erro simplificado if (onError) onError('Error processing server response'); return this.fullResponse; } } // Resto da implementação omitido } 
Enter fullscreen mode Exit fullscreen mode

 

Este parser:

  1. Acumula dados recebidos em um buffer
  2. Divide o buffer em mensagens SSE individuais (separadas por \n\n)
  3. Processa cada mensagem, extraindo seu conteúdo
  4. Fornece callbacks para diferentes eventos (nova mensagem, conclusão, erro)

 

🔌 Serviço de API

Em seguida, precisamos de um serviço para lidar com as chamadas de API e o processamento de stream:

// src/composables/useAPI.js - trecho principal const sendMessage = async (message, onChunk, onComplete, onError) => { try { // Configuração da requisição omitida const response = await fetch(apiUrl, { /* configuração */ }); // Importante: configuração do streaming const reader = response.body.getReader(); const decoder = new TextDecoder(); const parser = new SSEParser(); // Loop de leitura do stream while (true) { const { value, done } = await reader.read(); if (done) break; const chunk = decoder.decode(value, { stream: true }); parser.processChunk(chunk, onChunk, onComplete, onError); } } catch (err) { // Tratamento de erro omitido } }; 
Enter fullscreen mode Exit fullscreen mode

 

Este serviço:

  1. Aproveita o response.body, que já é um ReadableStream retornado pela API Fetch
  2. Usa o método getReader() para consumir o stream chunk por chunk
  3. Utiliza TextDecoder para converter os bytes recebidos em texto, mantendo contexto entre chunks com o parâmetro { stream: true }
  4. Processa cada fragmento com o parser SSE

 

Essa abordagem elimina a necessidade de esperar a resposta completa, e permite o processamento dos dados à medida que chegam do servidor.

 

📱 Composable de chat

Finalmente, vamos gerenciar o chat com um composable:

// src/composables/useChat.js import { ref } from 'vue' import { useAPI } from './useAPI' export function useChat() { const { isLoading, error, sendMessage: apiSendMessage } = useAPI() const messages = ref([]) const userInput = ref('') const isChatOpen = ref(false) if (messages.value.length === 0) { messages.value.push({ sender: 'bot', text: 'Sou o chatbot da Dunder Mifflin. O que podemos fazer por você hoje?' }) } const toggleChat = () => { isChatOpen.value = !isChatOpen.value } const sendMessage = async () => { if (!userInput.value.trim() || isLoading.value) return messages.value.push({ sender: 'user', text: userInput.value }) const messageToSend = userInput.value userInput.value = '' const botMsgIndex = messages.value.length messages.value.push({ sender: 'bot', text: '', streaming: true }) await apiSendMessage( messageToSend, (chunk) => { // Callback para cada novo fragmento recebido if (messages.value[botMsgIndex]) { messages.value[botMsgIndex].text += chunk; } }, () => { // Callback para conclusão do stream if (messages.value[botMsgIndex]) { messages.value[botMsgIndex].streaming = false; } }, (errorMsg) => { // Callback para erros if (messages.value[botMsgIndex]) { messages.value[botMsgIndex].text = 'Connection error. Please try again later.'; messages.value[botMsgIndex].streaming = false; } } ) } return { messages, userInput, isLoading, error, isChatOpen, toggleChat, sendMessage } } 
Enter fullscreen mode Exit fullscreen mode

 

O composable de chat:

  1. Prepara o estado inicial do chat
  2. Adiciona uma mensagem do bot quando criado
  3. Gerencia o estado de streaming para cada mensagem
  4. Atualiza progressivamente o texto da mensagem conforme novos fragmentos chegam
  5. Altera o status de streaming quando a resposta completa é recebida

 

⌨️ Bônus: componente de efeito de digitação

Para melhorar a experiência do usuário, vamos criar um efeito que faz com que o texto pareça estar sendo digitado em tempo real.

// src/components/TypeWriter.vue <script setup> // Imports e setup inicial omitidos const typeNextCharacter = () => { if (textIndex.value < props.text.length) { // Exibe o texto progressivamente displayedText.value = props.text.substring(0, textIndex.value + 1) textIndex.value += 1 // Programa a próxima letra timer.value = setTimeout(typeNextCharacter, props.speed) } } // Resto omitido </script> <template> <span class="typewriter-text">{{ displayedText }}</span> </template> 
Enter fullscreen mode Exit fullscreen mode

 

Este componente:

  1. Recebe o texto completo do stream
  2. Exibe o texto gradualmente, simulando alguém digitando
  3. Emite eventos de atualização para permitir ajustes de UI (como rolar para baixo)

 

Resumo do fluxo no frontend

  1. Entrada do usuário: No componente de chat, o usuário digita uma mensagem e clica no botão de envio.

  2. Processamento da mensagem: O composable de chat adiciona imediatamente a mensagem do usuário ao array de mensagens e cria uma mensagem vazia do bot com flag streaming: true, para indicar que a resposta está em andamento.

  3. Preparação da requisição: No composable da API, a requisição HTTP POST para o endpoint do backend é preparada.

  4. Estabelecimento de conexão SSE: O frontend abre uma conexão HTTP com o backend e configura o modo de stream para ficar escutando eventos. O parser de SSE, utilitário, permanece ativo, monitorando continuamente o fluxo de dados recebidos.

  5. Processamento do stream: À medida que o backend envia eventos no formato SSE, o parser de SSE captura cada chunk de dados, processa as informações e mantém a conexão aberta para continuar recebendo mais dados.

  6. Atualização gradual da interface: Cada fragmento de texto extraído é imediatamente adicionado à mensagem do bot no array messages, sem esperar a resposta completa.

  7. Renderização em tempo real: O componente de digitação renderiza cada fragmento de texto recebido na interface, criando o efeito de digitação progressiva que simula uma resposta sendo digitada em tempo real.

  8. Finalização da conexão: Quando o backend envia o evento especial [DONE], o parser detecta o fim da transmissão, a flag streaming é alterada para false e a conexão HTTP é encerrada.

 

📝 Conclusão

Implementar streaming de respostas com SSE para chatbots de IA traz benefícios significativos para a experiência do usuário, tempo de resposta percebido e eficiência geral do sistema. Neste tutorial, exploramos uma implementação completa usando:

  • 🐍 FastAPI
  • 🟢 Vue.js
  • 🌊 Streaming eficiente de respostas em tempo real

 

O código completo está disponível no repositório do projeto. Se você tiver dúvidas ou sugestões, deixe um comentário abaixo!

Esta foi a minha contribuição para a Dunder Mifflin Infinity 3.0.

📄 Papel ilimitado em um mundo sem papel...

E agora com respostas em streaming!


Top comments (0)