Kaique Mitsuo Silva Yamamoto
Software Architecture

Messaging

Messaging systems enable asynchronous communication between services, decoupling message producers from consumers.

Core Concepts

Concept     Description
Producer    Sends messages
Consumer    Receives messages
Broker      Intermediary that manages messages
Queue       Message queue (point-to-point)
Topic       Pub/sub channel (broadcast)
Exchange    Message router (RabbitMQ)
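The queue vs. topic distinction above can be sketched in a few lines of TypeScript. This is an illustrative in-memory model, not a real broker: a queue delivers each message to exactly one consumer (round-robin here), while a topic broadcasts every message to all subscribers.

```typescript
type Handler = (msg: string) => void

class Queue {
  private handlers: Handler[] = []
  private next = 0

  subscribe(handler: Handler) {
    this.handlers.push(handler)
  }

  // Point-to-point: each message goes to exactly one consumer
  // (assumes at least one subscriber is registered)
  send(msg: string) {
    const handler = this.handlers[this.next++ % this.handlers.length]
    handler(msg)
  }
}

class Topic {
  private handlers: Handler[] = []

  subscribe(handler: Handler) {
    this.handlers.push(handler)
  }

  // Pub/sub: every subscriber receives every message
  publish(msg: string) {
    this.handlers.forEach((handler) => handler(msg))
  }
}
```

With two queue consumers, `m1` and `m2` end up on different consumers; with two topic subscribers, both receive every published event.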

Apache Kafka

A distributed streaming platform for real-time data pipelines.

Architecture

┌──────────┐     ┌─────────────────────┐     ┌──────────┐
│ Producer │────▶│       Kafka         │────▶│ Consumer │
└──────────┘     │  ┌───────────────┐  │     └──────────┘
                 │  │    Topic      │  │
                 │  │ ┌───────────┐ │  │
                 │  │ │Partition 0│ │  │
                 │  │ │Partition 1│ │  │
                 │  │ │Partition 2│ │  │
                 │  │ └───────────┘ │  │
                 │  └───────────────┘  │
                 └─────────────────────┘
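The partitions in the diagram are where Kafka's ordering guarantee lives: the producer derives a partition from the message key, so all messages with the same key land on the same partition and stay ordered. A minimal sketch of that assignment (real clients use a murmur2 hash; this simplified string hash is for illustration only):

```typescript
// Hypothetical sketch of key-based partition assignment.
// Same key -> same partition -> per-key ordering.
function assignPartition(key: string, numPartitions: number): number {
  let hash = 0
  for (const ch of key) {
    // Simple 31-based rolling hash, kept in unsigned 32-bit range
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0
  }
  return hash % numPartitions
}
```

Messages without a key are instead spread across partitions (round-robin or sticky, depending on the client), trading ordering for balance.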

Docker Compose

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

Node.js (KafkaJS)

import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
})

// Producer
const producer: Producer = kafka.producer()

async function sendMessage(topic: string, message: object) {
  await producer.connect() // safe to call per send, but in production connect once at startup
  await producer.send({
    topic,
    messages: [
      {
        key: String(Date.now()),
        value: JSON.stringify(message),
        headers: {
          'correlation-id': crypto.randomUUID(), // global crypto requires Node 19+; otherwise import randomUUID from 'node:crypto'
        },
      },
    ],
  })
}

// Consumer
const consumer: Consumer = kafka.consumer({ groupId: 'my-group' })

async function startConsumer(topic: string) {
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: false })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
      const value = message.value?.toString()
      if (value) {
        const data = JSON.parse(value)
        console.log(`Received: ${JSON.stringify(data)}`)
        // Process the message here
      }
    },
  })
}

Spring Boot

// Configuration
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // values are pre-serialized JSON strings
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

// Producer
@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private final ObjectMapper objectMapper = new ObjectMapper();

    public void sendOrder(Order order) {
        try {
            kafkaTemplate.send("orders", order.getId(), objectMapper.writeValueAsString(order));
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize order", e);
        }
    }
}

// Consumer
@Service
public class OrderConsumer {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void consume(ConsumerRecord<String, String> record) throws JsonProcessingException {
        Order order = objectMapper.readValue(record.value(), Order.class);
        processOrder(order);
    }
}

RabbitMQ

A traditional message broker with support for multiple protocols.

RabbitMQ Concepts

Concept       Description
Exchange      Receives and routes messages
Queue         Stores messages
Binding       Connects an exchange to a queue
Routing Key   Key used for routing

Exchange Types

Type      Behavior
Direct    Exact routing key match
Topic     Pattern matching (*.logs.#)
Fanout    Broadcast to all bound queues
Headers   Match on message headers
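The topic exchange's pattern matching can be sketched as a small matcher. This is a hypothetical helper for illustration (the real matching happens inside the broker): `*` matches exactly one dot-separated word, and `#` matches zero or more words.

```typescript
// Illustrative topic-exchange matcher; assumes routing keys never contain '§',
// which is used as a temporary placeholder for '#'.
function matchesTopic(pattern: string, routingKey: string): boolean {
  const body = pattern
    .split('.')
    .map((word) =>
      word === '#'
        ? '§' // placeholder, expanded below
        : word === '*'
        ? '[^.]+' // exactly one word
        : word.replace(/[-[\]{}()*+?.,\\^$|#\s]/g, '\\$&') // escape literals
    )
    .join('\\.')
    .replace(/§\\\./g, '(?:[^.]+\\.)*') // '#' before more words: zero or more words
    .replace(/\\\.§/g, '(?:\\.[^.]+)*') // '#' after words: zero or more words
    .replace(/§/g, '.*') // pattern is just '#'
  return new RegExp(`^${body}$`).test(routingKey)
}
```

So `order.*` matches `order.created` but not `order.created.v2`, while `*.logs.#` matches both `app.logs` and `app.logs.error.fatal`.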

Docker Compose

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

Node.js (amqplib)

import amqp, { Connection, Channel, ConsumeMessage } from 'amqplib'

class RabbitMQService {
  private connection: Connection | null = null
  private channel: Channel | null = null

  async connect() {
    this.connection = await amqp.connect('amqp://admin:admin@localhost:5672')
    this.channel = await this.connection.createChannel()
  }

  async setupExchange(exchange: string, type: string) {
    await this.channel?.assertExchange(exchange, type, { durable: true })
  }

  async setupQueue(queue: string, exchange: string, routingKey: string) {
    await this.channel?.assertQueue(queue, { durable: true })
    await this.channel?.bindQueue(queue, exchange, routingKey)
  }

  async publish(exchange: string, routingKey: string, message: object) {
    this.channel?.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    )
  }

  async consume(queue: string, handler: (msg: object) => Promise<void>) {
    await this.channel?.consume(queue, async (msg: ConsumeMessage | null) => {
      if (msg) {
        try {
          const content = JSON.parse(msg.content.toString())
          await handler(content)
          this.channel?.ack(msg)
        } catch (error) {
          this.channel?.nack(msg, false, false) // no requeue: message goes to the DLX if configured
        }
      }
    })
  }
}

// Usage
const rabbit = new RabbitMQService()
await rabbit.connect()
await rabbit.setupExchange('orders', 'topic')
await rabbit.setupQueue('order-processor', 'orders', 'order.*')

await rabbit.publish('orders', 'order.created', { id: '123', total: 99.99 })

await rabbit.consume('order-processor', async (order) => {
  console.log('Processing order:', order)
})

Dead Letter Queue

// Setup DLQ
await channel.assertExchange('dlx', 'direct', { durable: true })
await channel.assertQueue('dlq', { durable: true })
await channel.bindQueue('dlq', 'dlx', 'dead-letter')

// Queue com DLQ
await channel.assertQueue('main-queue', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'dead-letter',
    'x-message-ttl': 60000, // 60 seconds
  },
})

Redis Streams

A Redis data structure for messaging.

import Redis from 'ioredis'

const redis = new Redis()

// Producer
async function addToStream(stream: string, data: Record<string, string>) {
  await redis.xadd(stream, '*', ...Object.entries(data).flat())
}

// Consumer Group
async function createConsumerGroup(stream: string, group: string) {
  try {
    await redis.xgroup('CREATE', stream, group, '0', 'MKSTREAM')
  } catch (e) {
    // Group already exists
  }
}

// Consumer
async function consumeStream(
  stream: string,
  group: string,
  consumer: string,
  handler: (id: string, data: Record<string, string>) => Promise<void>
) {
  while (true) {
    const results = await redis.xreadgroup(
      'GROUP', group, consumer,
      'COUNT', 10,
      'BLOCK', 5000,
      'STREAMS', stream, '>'
    )

    if (results) {
      for (const [, messages] of results) {
        for (const [id, fields] of messages as [string, string[]][]) {
          const data: Record<string, string> = {}
          for (let i = 0; i < fields.length; i += 2) {
            data[fields[i]] = fields[i + 1]
          }

          await handler(id, data)
          await redis.xack(stream, group, id)
        }
      }
    }
  }
}

Messaging Patterns

Saga Pattern

Coordinates a distributed transaction as a sequence of local steps; if a step fails, compensating transactions undo the steps already completed.

// Orchestrator
class OrderSaga {
  async execute(order: Order) {
    try {
      // Step 1: Reserve inventory
      await this.publish('inventory.reserve', { orderId: order.id, items: order.items })

      // Step 2: Process payment
      await this.publish('payment.process', { orderId: order.id, amount: order.total })

      // Step 3: Ship order
      await this.publish('shipping.create', { orderId: order.id, address: order.address })

    } catch (error) {
      // Compensating transactions
      await this.publish('inventory.release', { orderId: order.id })
      await this.publish('payment.refund', { orderId: order.id })
      throw error
    }
  }
}

Outbox Pattern

Guarantees message delivery together with the database transaction: the event is written to an outbox table atomically with the business data.

// Atomic transaction
await prisma.$transaction(async (tx) => {
  // 1. Save to the database
  const order = await tx.order.create({ data: orderData })

  // 2. Save the message to the outbox table
  await tx.outbox.create({
    data: {
      aggregateType: 'Order',
      aggregateId: order.id,
      eventType: 'OrderCreated',
      payload: JSON.stringify(order),
    },
  })
})

// A separate process reads the outbox and publishes
async function processOutbox() {
  const messages = await prisma.outbox.findMany({
    where: { processedAt: null },
    orderBy: { createdAt: 'asc' },
  })

  for (const msg of messages) {
    // Publish via a connected KafkaJS producer (the Kafka client itself has no send())
    await producer.send({
      topic: msg.eventType,
      messages: [{ key: msg.aggregateId, value: msg.payload }],
    })
    await prisma.outbox.update({
      where: { id: msg.id },
      data: { processedAt: new Date() },
    })
  }
}

Event Sourcing

interface Event {
  id: string
  aggregateId: string
  type: string
  data: object
  timestamp: Date
}

class OrderAggregate {
  private events: Event[] = []
  private state: Order = {} as Order

  apply(event: Event) {
    switch (event.type) {
      case 'OrderCreated':
        this.state = { ...(event.data as Order), status: 'pending' }
        break
      case 'OrderPaid':
        this.state.status = 'paid'
        break
      case 'OrderShipped':
        this.state.status = 'shipped'
        break
    }
    this.events.push(event)
  }

  getState() {
    return this.state
  }

  getUncommittedEvents() {
    return this.events
  }
}
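Current state is never stored directly; it is rebuilt by replaying the event log. A minimal sketch of that fold (a standalone `replay` helper introduced here for illustration, mirroring `OrderAggregate.apply` above):

```typescript
interface OrderEvent {
  type: string
  data?: Record<string, unknown>
}

// Fold the event log into the current state. Replaying the same
// events always yields the same state, which is what makes
// event-sourced systems auditable and reproducible.
function replay(events: OrderEvent[]): Record<string, unknown> {
  return events.reduce<Record<string, unknown>>((state, event) => {
    switch (event.type) {
      case 'OrderCreated':
        return { ...event.data, status: 'pending' }
      case 'OrderPaid':
        return { ...state, status: 'paid' }
      case 'OrderShipped':
        return { ...state, status: 'shipped' }
      default:
        return state // unknown events are ignored
    }
  }, {})
}
```

Replaying `[OrderCreated, OrderPaid]` yields a state with `status: 'paid'` and the original order fields preserved.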

Comparison

Feature       Kafka            RabbitMQ       Redis Streams
Throughput    Very high        High           High
Latency       Low to medium    Very low       Very low
Persistence   Yes (log)        Yes (queue)    Optional
Replay        Yes              No             Limited
Ordering      Per partition    Per queue      Per stream
Complexity    High             Medium         Low
Use case      Event streaming  Task queues    Cache + messaging

Resources