Arquitetura de software
Mensageria
Sistemas de mensageria permitem comunicação assíncrona entre serviços, desacoplando produtores e consumidores de mensagens.
Conceitos Fundamentais
| Conceito | Descrição |
|---|---|
| Producer | Envia mensagens |
| Consumer | Recebe mensagens |
| Broker | Intermediário que gerencia mensagens |
| Queue | Fila de mensagens (point-to-point) |
| Topic | Canal pub/sub (broadcast) |
| Exchange | Roteador de mensagens (RabbitMQ) |
Apache Kafka
Plataforma de streaming distribuído para pipelines de dados em tempo real.
Arquitetura
┌──────────┐     ┌─────────────────────┐     ┌──────────┐
│ Producer │────▶│        Kafka        │────▶│ Consumer │
└──────────┘     │  ┌───────────────┐  │     └──────────┘
                 │  │     Topic     │  │
                 │  │ ┌───────────┐ │  │
                 │  │ │Partition 0│ │  │
                 │  │ │Partition 1│ │  │
                 │  │ │Partition 2│ │  │
                 │  │ └───────────┘ │  │
                 │  └───────────────┘  │
                 └─────────────────────┘
Docker Compose
# Local single-broker Kafka stack: ZooKeeper, one Kafka broker and a web UI.
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two advertised listeners: kafka:29092 for containers on the Compose
      # network, localhost:9092 for clients running on the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Only one broker exists, so the offsets topic uses replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    # Start the UI after the broker it connects to.
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
Node.js (KafkaJS)
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs'
// Shared KafkaJS client; the producer and consumer are created from it.
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] })
// Producer
const producer: Producer = kafka.producer()

/**
 * Publishes `message` as a JSON-encoded record on `topic`.
 *
 * The record is keyed by the current timestamp and carries a freshly
 * generated `correlation-id` header. `producer.connect()` is awaited before
 * every send.
 *
 * @param topic - Kafka topic to publish to.
 * @param message - Payload; serialized with `JSON.stringify`.
 */
async function sendMessage(topic: string, message: object) {
  await producer.connect()

  const record = {
    key: String(Date.now()),
    value: JSON.stringify(message),
    // Uses the global `crypto` object to generate the id.
    headers: { 'correlation-id': crypto.randomUUID() },
  }
  await producer.send({ topic, messages: [record] })
}
// Consumer
const consumer: Consumer = kafka.consumer({ groupId: 'my-group' })

/**
 * Connects the shared consumer, subscribes it to `topic` (with
 * `fromBeginning: false`) and logs every JSON message it receives.
 *
 * Messages without a value are ignored. Payloads that are not valid JSON are
 * logged and skipped, so a single malformed message cannot make the handler
 * throw.
 *
 * @param topic - Kafka topic to consume from.
 */
async function startConsumer(topic: string): Promise<void> {
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: false })
  await consumer.run({
    // `topic` is aliased so it does not shadow the function parameter.
    eachMessage: async ({ topic: sourceTopic, partition, message }: EachMessagePayload) => {
      const value = message.value?.toString()
      if (!value) {
        return
      }

      let data: unknown
      try {
        data = JSON.parse(value)
      } catch (error: unknown) {
        console.error(
          `Skipping malformed message on ${sourceTopic}[${partition}] at offset ${message.offset}:`,
          error,
        )
        return
      }

      console.log(`Received: ${JSON.stringify(data)}`)
      // Process the message here
    },
  })
}
Spring Boot
// Configuration
/**
 * Kafka producer wiring for String keys and String values.
 */
@Configuration
public class KafkaConfig {

    /**
     * Builds the producer factory used by {@link #kafkaTemplate()}.
     *
     * <p>The factory is typed {@code <String, String>} and the values sent through
     * it are JSON text that has already been serialized, so the value serializer
     * is {@code StringSerializer}. A JSON serializer here would encode the
     * already-serialized text a second time.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    /** Template for sending String-keyed, String-valued records. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
// Producer
/**
 * Publishes orders to the {@code orders} topic as JSON text keyed by order id.
 */
@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Declared here because sendOrder uses it to serialize the order.
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Serializes {@code order} to JSON and sends it to the {@code orders} topic.
     *
     * @param order the order to publish; its id is used as the record key
     * @throws java.io.UncheckedIOException if the order cannot be serialized
     */
    public void sendOrder(Order order) {
        final String payload;
        try {
            payload = objectMapper.writeValueAsString(order);
        } catch (java.io.IOException e) {
            // Jackson's checked serialization failure is rethrown unchecked so the
            // method signature stays the same for callers.
            throw new java.io.UncheckedIOException("Failed to serialize order " + order.getId(), e);
        }
        kafkaTemplate.send("orders", order.getId(), payload);
    }
}
// Consumer
/**
 * Listens on the {@code orders} topic and turns each JSON record into an {@code Order}.
 */
@Service
public class OrderConsumer {

    // Declared here because consume uses it to parse the record value.
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Parses the record value as an {@code Order} and hands it to {@code processOrder}.
     *
     * @param record the Kafka record whose value is the order as JSON text
     * @throws java.io.UncheckedIOException if the value is not a valid order document
     */
    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void consume(ConsumerRecord<String, String> record) {
        final Order order;
        try {
            order = objectMapper.readValue(record.value(), Order.class);
        } catch (java.io.IOException e) {
            // Rethrown unchecked, with the record coordinates, so the failure is
            // visible instead of being an unhandled checked exception.
            throw new java.io.UncheckedIOException(
                "Failed to deserialize order at " + record.topic() + "-" + record.partition()
                    + "@" + record.offset(),
                e);
        }
        processOrder(order);
    }

    /** Placeholder for the order-handling logic. */
    private void processOrder(Order order) {
        // Process the order here
    }
}
RabbitMQ
Message broker tradicional com suporte a múltiplos protocolos.
Conceitos RabbitMQ
| Conceito | Descrição |
|---|---|
| Exchange | Recebe e roteia mensagens |
| Queue | Armazena mensagens |
| Binding | Conecta exchange a queue |
| Routing Key | Chave para roteamento |
Tipos de Exchange
| Tipo | Comportamento |
|---|---|
| Direct | Routing key exata |
| Topic | Pattern matching (*.logs.#) |
| Fanout | Broadcast para todas as queues |
| Headers | Match por headers |
Docker Compose
# RabbitMQ broker using the image variant that ships the management UI.
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      # Development-only credentials; change them outside local setups.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

# A named volume used by a service must also be declared at the top level.
volumes:
  rabbitmq_data:
Node.js (amqplib)
import amqp, { Connection, Channel, ConsumeMessage } from 'amqplib'
/**
 * Thin wrapper around one amqplib connection and one channel.
 *
 * Call `connect()` first. Every method that needs the channel rejects with an
 * error when the channel has not been opened, instead of silently doing
 * nothing.
 */
class RabbitMQService {
  private connection: Connection | null = null
  private channel: Channel | null = null

  /** Opens the AMQP connection and creates the channel used by the other methods. */
  async connect(): Promise<void> {
    this.connection = await amqp.connect('amqp://admin:admin@localhost:5672')
    this.channel = await this.connection.createChannel()
  }

  /**
   * Declares a durable exchange.
   *
   * @param exchange - Exchange name.
   * @param type - Exchange type passed to `assertExchange`.
   * @throws Error if `connect()` has not been called.
   */
  async setupExchange(exchange: string, type: string): Promise<void> {
    await this.requireChannel().assertExchange(exchange, type, { durable: true })
  }

  /**
   * Declares a durable queue and binds it to `exchange` with `routingKey`.
   *
   * @throws Error if `connect()` has not been called.
   */
  async setupQueue(queue: string, exchange: string, routingKey: string): Promise<void> {
    const channel = this.requireChannel()
    await channel.assertQueue(queue, { durable: true })
    await channel.bindQueue(queue, exchange, routingKey)
  }

  /**
   * Publishes `message` as JSON to `exchange` with `routingKey`, marked persistent.
   *
   * @throws Error if `connect()` has not been called, so a message is never
   *   dropped silently.
   */
  async publish(exchange: string, routingKey: string, message: object): Promise<void> {
    this.requireChannel().publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    )
  }

  /**
   * Consumes `queue`, parsing each message body as JSON and passing it to `handler`.
   *
   * A message is acked after `handler` resolves. If parsing or the handler
   * fails, the error is logged and the message is nacked without requeue.
   *
   * @throws Error if `connect()` has not been called.
   */
  async consume(queue: string, handler: (msg: object) => Promise<void>): Promise<void> {
    // Captured once so ack/nack go to the same channel that delivers the messages.
    const channel = this.requireChannel()
    await channel.consume(queue, async (msg: ConsumeMessage | null) => {
      if (!msg) {
        return
      }
      try {
        const content = JSON.parse(msg.content.toString())
        await handler(content)
        channel.ack(msg)
      } catch (error: unknown) {
        console.error('Failed to process message; rejecting without requeue:', error)
        channel.nack(msg, false, false) // Dead letter
      }
    })
  }

  /** Returns the open channel or throws when `connect()` has not been called yet. */
  private requireChannel(): Channel {
    if (this.channel === null) {
      throw new Error('RabbitMQService is not connected; call connect() first')
    }
    return this.channel
  }
}
// Usage: declare the topology, publish one order event, then start consuming.
const rabbit = new RabbitMQService()

// Handler passed to consume(); it only logs the received order.
const logOrder = async (order: object): Promise<void> => {
  console.log('Processing order:', order)
}

await rabbit.connect()
await rabbit.setupExchange('orders', 'topic')
await rabbit.setupQueue('order-processor', 'orders', 'order.*')
await rabbit.publish('orders', 'order.created', { id: '123', total: 99.99 })
await rabbit.consume('order-processor', logOrder)
Dead Letter Queue
// DLQ setup: exchange `dlx` is bound to queue `dlq` with routing key `dead-letter`.
await channel.assertExchange('dlx', 'direct', { durable: true })
await channel.assertQueue('dlq', { durable: true })
await channel.bindQueue('dlq', 'dlx', 'dead-letter')

// Main queue declared with the dead-letter exchange/routing key and a message TTL.
const mainQueueArguments = {
  'x-dead-letter-exchange': 'dlx',
  'x-dead-letter-routing-key': 'dead-letter',
  'x-message-ttl': 60000, // 60 seconds
}
await channel.assertQueue('main-queue', { durable: true, arguments: mainQueueArguments })
Redis Streams
Estrutura de dados para mensageria no Redis.
import Redis from 'ioredis'
const redis = new Redis()

// Producer
/**
 * Appends `data` to `stream` as a single entry, passing `*` as the entry id.
 *
 * @param stream - Stream key.
 * @param data - Field/value pairs; flattened into the XADD argument list.
 */
async function addToStream(stream: string, data: Record<string, string>) {
  const fieldValuePairs = Object.entries(data).flatMap(([field, value]) => [field, value])
  await redis.xadd(stream, '*', ...fieldValuePairs)
}
// Consumer Group
/**
 * Creates consumer group `group` on `stream` starting at id `0`, with
 * `MKSTREAM` so the stream is created when it does not exist.
 *
 * An already-existing group is treated as success; any other failure is
 * rethrown instead of being swallowed.
 *
 * @param stream - Stream key.
 * @param group - Consumer group name.
 */
async function createConsumerGroup(stream: string, group: string): Promise<void> {
  try {
    await redis.xgroup('CREATE', stream, group, '0', 'MKSTREAM')
  } catch (error: unknown) {
    // Redis replies with a BUSYGROUP error when the group already exists;
    // that is the only failure that is safe to ignore here.
    const alreadyExists = error instanceof Error && error.message.includes('BUSYGROUP')
    if (!alreadyExists) {
      throw error
    }
  }
}
// Consumer
/**
 * Loops forever reading entries for `consumer` in `group` from `stream`
 * (up to 10 per call, blocking up to 5000 ms), passing each one to `handler`
 * and acknowledging it once the handler resolves.
 *
 * @param stream - Stream key.
 * @param group - Consumer group name.
 * @param consumer - Consumer name within the group.
 * @param handler - Called with the entry id and its fields as an object.
 */
async function consumeStream(
  stream: string,
  group: string,
  consumer: string,
  handler: (id: string, data: Record<string, string>) => Promise<void>
) {
  for (;;) {
    const results = await redis.xreadgroup(
      'GROUP', group, consumer,
      'COUNT', 10,
      'BLOCK', 5000,
      'STREAMS', stream, '>'
    )
    if (!results) {
      // Nothing was returned for this call; read again.
      continue
    }

    for (const [, messages] of results) {
      for (const [id, fields] of messages as [string, string[]][]) {
        // `fields` is a flat [name, value, name, value, ...] list.
        const data: Record<string, string> = {}
        let index = 0
        while (index < fields.length) {
          data[fields[index]] = fields[index + 1]
          index += 2
        }
        await handler(id, data)
        await redis.xack(stream, group, id)
      }
    }
  }
}
Padroes de Mensageria
Saga Pattern
Coordenação de transações distribuídas.
// Orchestrator
/**
 * Runs the order saga: reserve inventory, process payment, create shipment.
 *
 * When a step fails, only the steps that already completed are compensated,
 * in reverse order, and the original error is rethrown.
 *
 * NOTE(review): `publish` is not defined in this snippet; presumably it is
 * supplied elsewhere (base class or mixin) — confirm.
 */
class OrderSaga {
  /**
   * @param order - Order whose id, items, total and address are published.
   * @throws Rethrows the error of the failing step after compensating.
   */
  async execute(order: Order) {
    // Undo actions for the steps completed so far, in execution order.
    const compensations: Array<() => Promise<unknown>> = []
    try {
      // Step 1: Reserve inventory
      await this.publish('inventory.reserve', { orderId: order.id, items: order.items })
      compensations.push(() => this.publish('inventory.release', { orderId: order.id }))

      // Step 2: Process payment
      await this.publish('payment.process', { orderId: order.id, amount: order.total })
      compensations.push(() => this.publish('payment.refund', { orderId: order.id }))

      // Step 3: Ship order
      await this.publish('shipping.create', { orderId: order.id, address: order.address })
    } catch (error) {
      // Compensating transactions: newest completed step first, and nothing
      // for steps that never ran.
      for (const compensate of compensations.reverse()) {
        await compensate()
      }
      throw error
    }
  }
}
Outbox Pattern
Garante a entrega de mensagens junto com a transação de banco.
// Atomic transaction: the order row and its outbox row are written together.
await prisma.$transaction(async (tx) => {
  // 1. Save the order
  const order = await tx.order.create({ data: orderData })

  // 2. Save the matching event in the outbox table
  const outboxEntry = {
    aggregateType: 'Order',
    aggregateId: order.id,
    eventType: 'OrderCreated',
    payload: JSON.stringify(order),
  }
  await tx.outbox.create({ data: outboxEntry })
})
// Separate process reads the outbox and publishes
/**
 * Sends every unprocessed outbox row, oldest first, and stamps each row's
 * `processedAt` after its send resolves.
 */
async function processOutbox() {
  const pending = await prisma.outbox.findMany({
    where: { processedAt: null },
    orderBy: { createdAt: 'asc' },
  })

  for (const entry of pending) {
    const { id, eventType, payload } = entry
    await kafka.send(eventType, JSON.parse(payload))
    await prisma.outbox.update({ where: { id }, data: { processedAt: new Date() } })
  }
}
Event Sourcing
/** Shape of the events consumed by `OrderAggregate.apply`. */
interface Event {
id: string
aggregateId: string
// Discriminator: `OrderAggregate.apply` switches on this value.
type: string
// Event payload; spread into the order state for `OrderCreated` events.
data: object
timestamp: Date
}
/**
 * Order aggregate whose state is derived from the events applied to it.
 *
 * Every applied event is also recorded and exposed through
 * `getUncommittedEvents()`.
 */
class OrderAggregate {
  private events: Event[] = []
  private state: Order = {} as Order

  /**
   * Updates the state for the known event types and records the event.
   * Events of any other type leave the state untouched but are still recorded.
   */
  apply(event: Event) {
    const { type } = event
    if (type === 'OrderCreated') {
      this.state = { ...event.data, status: 'pending' }
    } else if (type === 'OrderPaid') {
      this.state.status = 'paid'
    } else if (type === 'OrderShipped') {
      this.state.status = 'shipped'
    }
    this.events.push(event)
  }

  /** Returns the current order state object. */
  getState() {
    return this.state
  }

  /** Returns the list of events recorded by `apply`. */
  getUncommittedEvents() {
    return this.events
  }
}
Comparativo
| Feature | Kafka | RabbitMQ | Redis Streams |
|---|---|---|---|
| Throughput | Muito Alto | Alto | Alto |
| Latência | Baixa-Média | Muito Baixa | Muito Baixa |
| Persistência | Sim (log) | Sim (queue) | Opcional |
| Replay | Sim | Não | Limitado |
| Ordenação | Por partição | Por queue | Por stream |
| Complexidade | Alta | Média | Baixa |
| Caso de Uso | Event streaming | Task queues | Cache + messaging |