Apache Kafka

Что такое Apache Kafka

Apache Kafka — это распределенная платформа для потоковой обработки данных, работающая как высокопроизводительная система обмена сообщениями. Kafka может обрабатывать миллионы сообщений в секунду с минимальной задержкой.

Основные концепции:

  • Topic — логический канал для сообщений (как таблица в БД)
  • Partition — физическое разделение топика для масштабирования
  • Producer — отправляет сообщения в топики
  • Consumer — читает сообщения из топиков
  • Broker — сервер Kafka (узел кластера)
  • Offset — уникальный номер сообщения в партиции

Архитектурные преимущества:

  • Высокая пропускная способность — миллионы сообщений/сек
  • Горизонтальное масштабирование — добавление брокеров
  • Отказоустойчивость — репликация данных
  • Долговременное хранение — сообщения хранятся на диске

Подключение к Spring Boot

Maven зависимости

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- Для Avro сериализации -->
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.4.0</version>
</dependency>

<!-- Для JSON сериализации -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

Базовая конфигурация

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all  # Ждать подтверждения от всех реплик
      retries: 3
      batch-size: 16384
      linger-ms: 5  # Ждать 5мс для батчинга
      buffer-memory: 33554432
    consumer:
      group-id: my-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest  # Читать с начала при первом подключении
      enable-auto-commit: false  # Ручное управление offset'ами
      properties:
        spring.json.trusted.packages: "com.example.model"

Пояснение:

  • acks=all — максимальная надежность доставки, но снижает производительность
  • auto-offset-reset=earliest — при отсутствии сохраненного offset'а читать с самого начала
  • enable-auto-commit=false — ручное подтверждение обработки для точного контроля

Producer (Отправка сообщений)

Простой Producer

@Service
public class OrderEventProducer {
    
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
    
    public void sendOrderEvent(OrderEvent orderEvent) {
        kafkaTemplate.send("order-events", orderEvent.getOrderId(), orderEvent)
            .addCallback(
                result -> log.info("Sent message: {}", orderEvent),
                failure -> log.error("Failed to send: {}", orderEvent, failure)
            );
    }
}

Пояснение: KafkaTemplate — основной класс для отправки. Ключ сообщения (orderId) определяет, в какую партицию попадет сообщение. Callback'и позволяют обрабатывать успех/неудачу отправки.

Асинхронная отправка

@Service
public class NotificationProducer {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    public CompletableFuture<SendResult<String, Object>> sendNotificationAsync(String userId, NotificationEvent event) {
        return kafkaTemplate.send("notifications", userId, event)
            .thenApply(result -> {
                log.info("Notification sent to user: {} at offset: {}", 
                    userId, result.getRecordMetadata().offset());
                return result;
            })
            .exceptionally(throwable -> {
                log.error("Failed to send notification to user: {}", userId, throwable);
                return null;
            });
    }
}

Синхронная отправка

@Service
public class CriticalEventProducer {
    
    private final KafkaTemplate<String, CriticalEvent> kafkaTemplate;
    
    public void sendCriticalEvent(CriticalEvent event) {
        try {
            SendResult<String, CriticalEvent> result = kafkaTemplate
                .send("critical-events", event.getId(), event)
                .get(10, TimeUnit.SECONDS);  // Таймаут 10 секунд
            
            log.info("Critical event sent successfully: offset {}", 
                result.getRecordMetadata().offset());
        } catch (Exception e) {
            log.error("Failed to send critical event", e);
            throw new EventSendingException("Critical event not sent", e);
        }
    }
}

Пояснение: Синхронная отправка блокирует поток до получения подтверждения. Используйте только для критически важных сообщений, так как снижает производительность.

Транзакционная отправка

@Service
@Transactional
public class TransactionalProducer {
    
    private final KafkaTransactionManager transactionManager;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    @KafkaTransactional
    public void processOrderWithEvents(Order order) {
        // Обновление в БД
        orderRepository.save(order);
        
        // Отправка событий в рамках транзакции
        kafkaTemplate.send("order-created", order.getId(), new OrderCreatedEvent(order));
        kafkaTemplate.send("inventory-update", order.getProductId(), 
            new InventoryUpdateEvent(order.getProductId(), -order.getQuantity()));
        
        // Если что-то пойдет не так, откатятся и БД, и Kafka сообщения
    }
}

Кастомный партиционер

public class CustomPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                        Object value, byte[] valueBytes, Cluster cluster) {
        
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        if (key instanceof String) {
            String stringKey = (String) key;
            // VIP пользователи идут в первую партицию
            if (stringKey.startsWith("VIP_")) {
                return 0;
            }
            // Остальные распределяются по hash
            return Math.abs(stringKey.hashCode()) % numPartitions;
        }
        
        return 0;
    }
}

Пояснение: Кастомный партиционер позволяет контролировать распределение сообщений по партициям. Например, важные сообщения можно направлять в специальную партицию.


Consumer (Получение сообщений)

Простой Consumer

@Component
public class OrderEventConsumer {
    
    @KafkaListener(topics = "order-events", groupId = "order-processing-group")
    public void handleOrderEvent(OrderEvent orderEvent, 
                                @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                @Header(KafkaHeaders.OFFSET) long offset) {
        
        log.info("Received order event: {} from topic: {}, partition: {}, offset: {}", 
            orderEvent, topic, partition, offset);
        
        try {
            orderService.processOrder(orderEvent);
        } catch (Exception e) {
            log.error("Failed to process order: {}", orderEvent.getOrderId(), e);
            // Здесь можно отправить в DLQ или retry topic
            throw e; // Повторная обработка
        }
    }
}

Пояснение: @KafkaListener автоматически десериализует сообщения. Аннотация @Header позволяет получить метаданные сообщения. При исключении сообщение будет повторно обработано.

Consumer с ручным управлением offset'ами

@Component
public class ManualAckConsumer {
    
    @KafkaListener(
        topics = "payment-events",
        containerFactory = "manualAckContainerFactory"
    )
    public void handlePaymentEvent(PaymentEvent event, Acknowledgment ack) {
        try {
            paymentService.processPayment(event);
            
            // Подтверждаем обработку только после успешного выполнения
            ack.acknowledge();
            log.info("Successfully processed payment: {}", event.getPaymentId());
            
        } catch (RetryableException e) {
            log.warn("Retryable error processing payment: {}", event.getPaymentId(), e);
            // Не подтверждаем - сообщение будет повторно обработано
        } catch (NonRetryableException e) {
            log.error("Non-retryable error: {}", event.getPaymentId(), e);
            // Подтверждаем, чтобы не зациклиться
            ack.acknowledge();
        }
    }
}

Batch Consumer

@Component
public class BatchConsumer {
    
    @KafkaListener(
        topics = "analytics-events",
        containerFactory = "batchContainerFactory"
    )
    public void handleAnalyticsBatch(List<AnalyticsEvent> events,
                                   List<Acknowledgment> acknowledgments) {
        
        log.info("Processing batch of {} analytics events", events.size());
        
        try {
            // Обработка пакета для улучшения производительности
            analyticsService.processBatch(events);
            
            // Подтверждение всего пакета
            acknowledgments.forEach(Acknowledgment::acknowledge);
            
        } catch (Exception e) {
            log.error("Failed to process analytics batch", e);
            // При ошибке можно подтвердить только часть сообщений
            handlePartialBatchFailure(events, acknowledgments, e);
        }
    }
}

Пояснение: Batch processing позволяет обрабатывать множество сообщений за раз, что повышает производительность при работе с аналитикой или ETL-процессами.

Consumer с фильтрацией

@Component
public class FilteredConsumer {
    
    @KafkaListener(
        topics = "user-events",
        filter = "userEventFilter"
    )
    public void handleUserEvent(UserEvent event) {
        // Обрабатываем только отфильтрованные события
        userService.processEvent(event);
    }
}

@Component("userEventFilter")
public class UserEventFilter implements RecordFilterStrategy<String, UserEvent> {
    
    @Override
    public boolean filter(ConsumerRecord<String, UserEvent> record) {
        UserEvent event = record.value();
        // Фильтруем только события активных пользователеай
        return !event.isActiveUser();
    }
}

Конфигурация Consumer'ов

Manual Acknowledgment

@Configuration
public class KafkaConsumerConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> manualAckContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setConcurrency(3); // 3 потока на партицию
        
        return factory;
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> batchContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
        
        return factory;
    }
}

Retry и Dead Letter Queue

@Configuration
public class KafkaRetryConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> retryContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory());
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(context -> {
            ConsumerRecord<String, Object> record = 
                (ConsumerRecord<String, Object>) context.getAttribute("record");
            
            // Отправляем в Dead Letter Queue
            kafkaTemplate.send("dlq-topic", record.key(), record.value());
            log.error("Sent to DLQ: {}", record.value());
            return null;
        });
        
        return factory;
    }
    
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000); // 1 секунда между попытками
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        return retryTemplate;
    }
}

Пояснение: Dead Letter Queue (DLQ) — это топик для сообщений, которые не удалось обработать после всех попыток retry. Это предотвращает блокировку обработки других сообщений.


Сериализация данных

JSON сериализация

// Модель данных
public class OrderEvent {
    private String orderId;
    private String customerId;
    private BigDecimal amount;
    private LocalDateTime timestamp;
    
    // Конструкторы, геттеры, сеттеры
}

// Producer конфигурация
@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    
    return new DefaultKafkaProducerFactory<>(configProps);
}

Avro сериализация

// Avro schema (order-event.avsc)
{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.example.avro",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"}
  ]
}
@Bean
public ProducerFactory<String, OrderEvent> avroProducerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081");
    
    return new DefaultKafkaProducerFactory<>(props);
}

Пояснение: Avro обеспечивает схему данных и эволюцию схемы. Сообщения компактнее JSON, но требует Schema Registry для управления схемами.


Мониторинг и метрики

Actuator endpoints

management:
  endpoints:
    web:
      exposure:
        include: health, metrics, kafka
  endpoint:
    health:
      show-details: always
    kafka:
      enabled: true

Кастомные метрики

@Component
public class KafkaMetrics {
    
    private final Counter messagesProduced;
    private final Counter messagesConsumed;
    private final Timer processingTime;
    
    public KafkaMetrics(MeterRegistry meterRegistry) {
        this.messagesProduced = Counter.builder("kafka.messages.produced")
            .description("Number of messages produced")
            .register(meterRegistry);
            
        this.messagesConsumed = Counter.builder("kafka.messages.consumed")
            .description("Number of messages consumed")
            .register(meterRegistry);
            
        this.processingTime = Timer.builder("kafka.processing.time")
            .description("Message processing time")
            .register(meterRegistry);
    }
    
    @EventListener
    public void handleProducerEvent(ProducerEvent event) {
        messagesProduced.increment(
            Tags.of("topic", event.getTopic(), "status", "success")
        );
    }
    
    public void recordProcessingTime(String topic, long timeMs) {
        processingTime.record(timeMs, TimeUnit.MILLISECONDS, 
            Tags.of("topic", topic));
    }
}

Health Check

@Component
public class KafkaHealthIndicator implements HealthIndicator {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    @Override
    public Health health() {
        try {
            // Проверяем доступность Kafka
            kafkaTemplate.partitionsFor("health-check-topic");
            return Health.up()
                .withDetail("kafka", "Available")
                .build();
        } catch (Exception e) {
            return Health.down()
                .withDetail("kafka", "Unavailable")
                .withDetail("error", e.getMessage())
                .build();
        }
    }
}

Patterns и Best Practices

Saga Pattern

@Component
public class OrderSagaOrchestrator {
    
    @KafkaListener(topics = "order-started")
    public void handleOrderStarted(OrderStartedEvent event) {
        try {
            // Шаг 1: Резервирование инвентаря
            kafkaTemplate.send("inventory-reserve", event.getOrderId(), 
                new ReserveInventoryCommand(event));
        } catch (Exception e) {
            // Компенсация
            kafkaTemplate.send("order-failed", event.getOrderId(), 
                new OrderFailedEvent(event.getOrderId(), e.getMessage()));
        }
    }
    
    @KafkaListener(topics = "inventory-reserved")
    public void handleInventoryReserved(InventoryReservedEvent event) {
        // Шаг 2: Обработка платежа
        kafkaTemplate.send("payment-process", event.getOrderId(),
            new ProcessPaymentCommand(event));
    }
    
    @KafkaListener(topics = "payment-failed")
    public void handlePaymentFailed(PaymentFailedEvent event) {
        // Компенсация: отмена резервирования
        kafkaTemplate.send("inventory-release", event.getOrderId(),
            new ReleaseInventoryCommand(event));
    }
}

Event Sourcing

@Component
public class EventStore {
    
    @KafkaListener(topics = "domain-events")
    public void storeEvent(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.OFFSET) long offset,
                          DomainEvent event) {
        
        // Сохранение события в Event Store
        EventRecord record = EventRecord.builder()
            .aggregateId(event.getAggregateId())
            .eventType(event.getClass().getSimpleName())
            .eventData(serializeEvent(event))
            .version(event.getVersion())
            .timestamp(event.getTimestamp())
            .streamOffset(offset)
            .build();
            
        eventRepository.save(record);
    }
    
    public List<DomainEvent> getEventsForAggregate(String aggregateId) {
        return eventRepository.findByAggregateIdOrderByVersion(aggregateId)
            .stream()
            .map(this::deserializeEvent)
            .collect(Collectors.toList());
    }
}

CQRS с проекциями

@Component
public class ProductProjectionHandler {
    
    @KafkaListener(topics = "product-events")
    public void handleProductEvent(ProductEvent event) {
        switch (event.getEventType()) {
            case "ProductCreated":
                createProductProjection((ProductCreatedEvent) event);
                break;
            case "ProductUpdated":
                updateProductProjection((ProductUpdatedEvent) event);
                break;
            case "ProductDeleted":
                deleteProductProjection((ProductDeletedEvent) event);
                break;
        }
    }
    
    private void createProductProjection(ProductCreatedEvent event) {
        ProductProjection projection = ProductProjection.builder()
            .id(event.getProductId())
            .name(event.getName())
            .price(event.getPrice())
            .category(event.getCategory())
            .lastUpdated(event.getTimestamp())
            .build();
            
        productProjectionRepository.save(projection);
    }
}

Пояснение: CQRS разделяет команды (запись) и запросы (чтение). События изменений проецируются в оптимизированные для чтения модели.


Безопасность

SSL/TLS конфигурация

spring:
  kafka:
    ssl:
      trust-store-location: classpath:kafka.client.truststore.jks
      trust-store-password: password
      key-store-location: classpath:kafka.client.keystore.jks
      key-store-password: password
      key-password: password
    properties:
      security.protocol: SSL

SASL аутентификация

spring:
  kafka:
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";

ACL и авторизация

// Пример настройки ACL через AdminClient
@Bean
public NewTopic createTopicWithAcl() {
    NewTopic topic = TopicBuilder.name("secure-topic")
        .partitions(3)
        .replicas(2)
        .build();
    
    // ACL настраивается на уровне брокера
    return topic;
}

Производительность и оптимизация

Producer оптимизация

spring:
  kafka:
    producer:
      # Батчинг для увеличения throughput
      batch-size: 65536  # 64KB
      linger-ms: 10      # Ждать 10мс для заполнения батча
      buffer-memory: 67108864  # 64MB буфер
      
      # Компрессия для экономии сети
      compression-type: snappy
      
      # Производительность vs надежность
      acks: 1  # Ждать подтверждения только от лидера
      retries: 0  # Без повторов для максимальной скорости

Consumer оптимизация

spring:
  kafka:
    consumer:
      # Размер fetch для увеличения throughput
      fetch-min-size: 1048576  # 1MB
      fetch-max-wait: 500      # Максимум 500мс ожидания
      
      # Обработка в память
      max-poll-records: 1000   # Читать до 1000 записей за раз
      
      # Параллелизм
    listener:
      concurrency: 10  # 10 потоков на слушатель

Кеширование и пулинг

@Configuration
public class KafkaPerformanceConfig {
    
    @Bean
    public ProducerFactory<String, Object> cachedProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        // ... базовая конфигурация
        
        // Включение кеширования соединений
        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 300000);
        
        return new DefaultKafkaProducerFactory<>(props);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> optimizedContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory());
        
        // Настройка пула потоков
        factory.getContainerProperties().setConsumerTaskExecutor(
            new SimpleAsyncTaskExecutor("kafka-consumer-")
        );
        
        return factory;
    }
}

Лучшие практики

1. Схема именования топиков

// Структурированное именование
public class TopicNames {
    public static final String ORDER_EVENTS = "orders.events.v1";
    public static final String USER_PROFILE_UPDATES = "users.profile.updates.v1";
    public static final String PAYMENT_COMMANDS = "payments.commands.v1";
    
    // DLQ топики
    public static final String ORDER_EVENTS_DLQ = "orders.events.v1.dlq";
}

2. Версионирование сообщений

public abstract class BaseEvent {
    @JsonProperty("eventVersion")
    private String eventVersion = "1.0";
    
    @JsonProperty("eventType")
    private String eventType;
    
    @JsonProperty("timestamp")
    private LocalDateTime timestamp = LocalDateTime.now();
    
    // Базовые поля для всех событий
}

public class OrderCreatedEvent extends BaseEvent {
    public OrderCreatedEvent() {
        setEventType("OrderCreated");
        setEventVersion("1.1"); // Новая версия
    }
    
    // Поля события
}

3. Идемпотентность

@Component
public class IdempotentConsumer {
    
    private final RedisTemplate<String, String> redisTemplate;
    
    @KafkaListener(topics = "payment-events")
    public void handlePayment(PaymentEvent event,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             @Header(KafkaHeaders.OFFSET) long offset) {
        
        String idempotencyKey = generateKey(topic, offset, event.getPaymentId());
        
        // Проверка, не обработано ли уже
        if (redisTemplate.hasKey(idempotencyKey)) {
            log.info("Payment already processed: {}", event.getPaymentId());
            return;
        }
        
        try {
            paymentService.processPayment(event);
            
            // Сохранение ключа идемпотентности
            redisTemplate.opsForValue().set(idempotencyKey, "processed", 
                Duration.ofHours(24));
                
        } catch (Exception e) {
            log.error("Payment processing failed: {}", event.getPaymentId(), e);
            throw e;
        }
    }
}

4. Graceful Shutdown

@Component
public class KafkaGracefulShutdown {
    
    private final KafkaListenerEndpointRegistry registry;
    
    @PreDestroy
    public void shutdown() {
        registry.getListenerContainers().forEach(container -> {
            container.pause();
            container.stop();
        });
        
        log.info("All Kafka consumers stopped gracefully");
    }
}

Заключение

Apache Kafka — мощная платформа для построения событийно-ориентированных архитектур. Ключевые принципы:

  1. Начинайте с простой конфигурации — добавляйте сложность по мере необходимости
  2. Правильно выбирайте ключи партиционирования — это влияет на производительность и порядок
  3. Используйте мониторинг — отслеживайте lag, throughput, ошибки
  4. Планируйте схему эволюции — данные в Kafka живут долго
  5. Обеспечивайте идемпотентность — сообщения могут дублироваться
  6. Настраивайте retry и DLQ — для обработки ошибок
  7. Тестируйте производительность — под реальной нагрузкой

Правильная настройка Kafka критически важна для надежности и производительности системы. Регулярно мониторьте метрики и адаптируйте конфигурацию под изменяющиеся требования.

Внутреннее устройству Apache Kafka

Архитектура кластера Kafka

Основные компоненты

Kafka Cluster состоит из нескольких типов узлов:

  • Broker — сервер Kafka, хранящий данные и обслуживающий клиентов
  • ZooKeeper (до версии 2.8) — координация кластера, метаданные, выборы лидеров
  • KRaft (с версии 2.8+) — замена ZooKeeper, встроенный consensus protocol

Роли брокеров

Controller Broker — специальная роль одного из брокеров:

  • Управляет метаданными кластера
  • Координирует выборы лидеров партиций
  • Отслеживает живые/мертвые брокеры
  • Распределяет партиции между брокерами

Обычные Brokers:

  • Хранят данные партиций
  • Обслуживают Producer и Consumer запросы
  • Участвуют в репликации
  • Могут быть лидерами или фолловерами партиций

Процесс bootstrap

При старте кластера происходит:

  1. Broker registration — каждый брокер регистрируется в ZooKeeper/KRaft
  2. Controller election — выбирается контроллер кластера
  3. Partition leader election — назначаются лидеры для каждой партиции
  4. Metadata propagation — распространение метаданных по кластеру

Пояснение: Controller отказоустойчив — при его падении автоматически выбирается новый из оставшихся брокеров.


Topics и Partitions

Структура Topic

Topic — это логический канал, физически разделенный на partitions:

  • Каждая партиция хранится на диске как набор segment файлов
  • Партиции распределены по разным брокерам для масштабируемости
  • Количество партиций задается при создании топика

Partition структура

Каждая партиция состоит из:

  • Log segments — файлы данных фиксированного размера (обычно 1GB)
  • Index files — индексы для быстрого поиска по offset
  • Timeindex files — индексы для поиска по timestamp

Сегментация данных

Active segment — текущий segment, в который записываются новые сообщения:

  • Когда segment достигает лимита размера или времени, он закрывается
  • Создается новый active segment
  • Старые segments могут удаляться согласно retention policy

Retention policies:

  • time-based — удаление по времени (например, 7 дней)
  • size-based — удаление по размеру (например, 1TB на партицию)
  • log compaction — сохранение только последних значений по ключу

Пояснение: Сегментация позволяет эффективно управлять большими объемами данных и оптимизировать производительность I/O операций.


Replication механизм

Leader-Follower модель

Каждая партиция имеет:

  • Leader replica — обрабатывает все read/write операции
  • Follower replicas — синхронизируются с лидером, готовы стать лидерами

In-Sync Replicas (ISR)

ISR — набор реплик, которые синхронизированы с лидером:

  • Только ISR могут стать новыми лидерами при failover
  • Follower исключается из ISR если отстает больше чем на replica.lag.time.max.ms
  • Если ISR содержит только лидер, это критическая ситуация

Процесс репликации

  1. Producer отправляет сообщение → Leader принимает и записывает в лог
  2. Follower fetch → Follower'ы периодически запрашивают новые данные
  3. Acknowledgment → Leader отправляет подтверждение Producer'у согласно acks
  4. ISR update → Controller обновляет список ISR при изменениях

Уровни подтверждения (acks)

  • acks=0 — Producer не ждет подтверждения (максимальная производительность)
  • acks=1 — Ждет подтверждения только от лидера
  • acks=all/-1 — Ждет подтверждения от всех ISR (максимальная надежность)

Пояснение: Выбор уровня acks — это компромисс между производительностью и надежностью. В критических системах используйте acks=all.


Log Storage и File Format

Физическое хранение

Данные хранятся в директориях вида: {topic}-{partition}/

/var/kafka-logs/
├── orders-0/
│   ├── 00000000000000000000.log    # Segment file
│   ├── 00000000000000000000.index  # Offset index
│   ├── 00000000000000000000.timeindex # Time index
│   └── leader-epoch-checkpoint     # Leader epoch info
├── orders-1/
└── orders-2/

Log segment format

Record format в segment файле:

  • Offset — уникальный номер сообщения в партиции
  • Message size — размер сообщения в байтах
  • CRC — контрольная сумма для проверки целостности
  • Magic byte — версия формата сообщения
  • Attributes — флаги (компрессия, timestamp type)
  • Timestamp — время создания/получения
  • Key — ключ сообщения (может быть null)
  • Value — содержимое сообщения

Index структура

Offset Index — sparse index для быстрого поиска:

  • Содержит mapping: relative_offset → physical_position
  • Не каждое сообщение индексируется (каждые 4KB по умолчанию)
  • Позволяет быстро найти позицию в log файле по offset

Time Index — для поиска по timestamp:

  • Mapping: timestamp → offset
  • Используется Consumer'ами для поиска с определенного времени

Пояснение: Sparse indexing экономит место, так как полный индекс был бы огромным при миллионах сообщений.


Producer internals

Батчинг и буферизация

Producer работает асинхронно:

  1. Accumulator — буферизует сообщения по топикам/партициям
  2. Batching — группирует сообщения в батчи для отправки
  3. Sender thread — отдельный поток отправляет батчи брокерам

Параметры батчинга

  • batch.size — максимальный размер батча в байтах
  • linger.ms — максимальное время ожидания заполнения батча
  • buffer.memory — общий размер буфера Producer'а

Partitioning алгоритм

Выбор партиции происходит по алгоритму:

  1. Если указан partition явно → используется он
  2. Если есть keyhash(key) % partitions_count
  3. Если нет key → round-robin или sticky partitioning

Sticky Partitioning (новый алгоритм):

  • Отправляет все сообщения без ключа в одну партицию
  • Переключается на следующую партицию только при заполнении батча
  • Улучшает производительность за счет более полных батчей

Idempotence и транзакции

Idempotent Producer (enable.idempotence=true):

  • Каждый Producer получает уникальный Producer ID
  • Каждое сообщение имеет sequence number
  • Broker отклоняет дублирующиеся sequence numbers

Транзакционный Producer:

  • Все сообщения в транзакции commit'ятся или rollback'аются атомарно
  • Использует Transaction Coordinator для управления состоянием
  • Consumer'ы могут читать только committed сообщения

Пояснение: Idempotence решает проблему дублирования при retry, а транзакции обеспечивают exactly-once семантику.


Consumer internals

Consumer Group координация

Group Coordinator — специальный процесс на одном из брокеров:

  • Управляет membership в consumer group
  • Координирует rebalancing партиций
  • Хранит consumer offsets в служебном топике __consumer_offsets

Rebalancing процесс

Типы rebalancing:

  1. Eager Rebalancing — все consumer'ы останавливаются, затем перераспределяются партиции
  2. Incremental Cooperative Rebalancing — только часть партиций перераспределяется

Этапы rebalancing:

  1. JoinGroup — consumer'ы присоединяются к группе
  2. SyncGroup — лидер группы распределяет партиции
  3. Heartbeat — поддержание активности в группе

Partition Assignment стратегии

Range Assignment:

  • Партиции назначаются последовательными диапазонами
  • Может привести к неравномерному распределению

RoundRobin Assignment:

  • Партиции распределяются циклически
  • Более равномерное распределение

Sticky Assignment:

  • Минимизирует изменения при rebalancing
  • Consumer'ы сохраняют максимум своих партиций

Cooperative Sticky:

  • Как Sticky, но с инкрементальным rebalancing
  • Меньше простоя при rebalancing

Offset management

Offset storage:

  • Современные версии хранят offset'ы в топике __consumer_offsets
  • Старые версии могли хранить в ZooKeeper
  • Offset commit может быть автоматическим или ручным

Commit стратегии:

  • Auto commit — offset'ы commit'ятся автоматически каждые 5 секунд
  • Manual commit — приложение явно commit'ит после обработки
  • Manual async commit — неблокирующий commit для производительности

Пояснение: Правильное управление offset'ами критично для exactly-once или at-least-once семантики обработки.


Network Layer и Protocols

Kafka Protocol

Kafka использует собственный бинарный протокол поверх TCP:

  • Request/Response модель
  • Каждый request имеет уникальный correlation ID
  • Multiplexing — несколько запросов могут быть in-flight одновременно

Основные API

Producer API:

  • ProduceRequest — отправка сообщений
  • MetadataRequest — получение информации о топиках/партициях

Consumer API:

  • FetchRequest — получение сообщений
  • OffsetCommitRequest — сохранение offset'ов
  • JoinGroupRequest — присоединение к consumer group

Admin API:

  • CreateTopicsRequest — создание топиков
  • DescribeConfigsRequest — получение конфигурации

Connection pooling

Kafka client'ы поддерживают connection pool:

  • Одно TCP соединение на broker
  • Pipelining — несколько запросов в одном соединении
  • Keep-alive для долгоживущих соединений

Пояснение: Эффективное использование сетевых соединений критично для производительности, особенно при высокой нагрузке.


ZooKeeper vs KRaft

ZooKeeper функции

В традиционной архитектуре ZooKeeper хранит:

  • Cluster membership — список активных брокеров
  • Topic metadata — конфигурация топиков и партиций
  • Partition leadership — кто является лидером каждой партиции
  • Consumer group metadata — информация о группах
  • Access Control Lists (ACL)

Проблемы ZooKeeper

Операционные сложности:

  • Дополнительная система для управления
  • Split-brain scenarios при сетевых разделениях
  • Ограничения масштабируемости (тысячи топиков)
  • Сложность deployment и monitoring

KRaft (Kafka Raft)

KRaft заменяет ZooKeeper начиная с версии 2.8:

  • Встроенный consensus algorithm на основе Raft
  • Метаданные хранятся в специальном Kafka топике
  • Упрощенная архитектура без внешних зависимостей

Преимущества KRaft:

  • Лучшая масштабируемость (миллионы партиций)
  • Более быстрое восстановление после сбоев
  • Упрощенная операционная модель
  • Более быстрые metadata операции

Metadata log в KRaft:

  • Все изменения метаданных записываются как события
  • Event sourcing модель для состояния кластера
  • Consistent snapshots для восстановления

Пояснение: Переход на KRaft — это эволюционный шаг, который делает Kafka самодостаточной системой без внешних зависимостей.


Performance и оптимизации

Disk I/O оптимизации

Sequential I/O:

  • Kafka пишет данные последовательно в лог файлы
  • Sequential writes в ~100 раз быстрее random writes на HDD
  • Использует page cache операционной системы

Zero-copy optimizations:

  • sendfile() system call для передачи данных с диска в сеть
  • Минимизация копирования данных в user space
  • DMA (Direct Memory Access) для передачи данных

Memory management

Page Cache usage:

  • Kafka полагается на OS page cache вместо собственного кеша
  • OS лучше знает паттерны доступа к данным
  • Автоматическое кеширование часто читаемых данных

Heap management:

  • Относительно небольшой JVM heap (6-8GB обычно достаточно)
  • Большая часть памяти отдается OS для page cache
  • G1GC или ZGC для минимизации пауз

Network optimizations

Batching:

  • Producer группирует сообщения в батчи
  • Consumer может получать несколько сообщений за один запрос
  • Уменьшение количества network round-trips

Compression:

  • GZIP, Snappy, LZ4, ZSTD поддерживаются
  • Компрессия применяется к целым батчам
  • Снижение network bandwidth и disk usage

Partitioning для производительности

Правильное количество партиций:

  • Больше партиций = больше параллелизма
  • Слишком много партиций = overhead на метаданные
  • Рекомендация: начинать с 2-3 партиций на broker

Hot partitions проблема:

  • Неравномерное распределение ключей
  • Одна партиция получает больше трафика
  • Решение: лучший выбор ключа партиционирования

Пояснение: Производительность Kafka зависит от понимания ее архитектуры. Правильная конфигурация может дать 10x улучшение производительности.


Monitoring и диагностика

Ключевые метрики Broker

Throughput метрики:

  • BytesInPerSec — входящий трафик
  • BytesOutPerSec — исходящий трафик
  • MessagesInPerSec — количество сообщений в секунду

Latency метрики:

  • ProduceRequestTime — время обработки Producer запросов
  • FetchRequestTime — время обработки Consumer запросов
  • RemoteTimeMs — время ожидания от followers

Resource utilization:

  • NetworkProcessorUtilization — загрузка network threads
  • RequestHandlerUtilization — загрузка request handler threads
  • LogFlushLatency — время fsync операций

Producer метрики

Performance метрики:

  • record-send-rate — скорость отправки сообщений
  • batch-size-avg — средний размер батча
  • compression-rate — коэффициент сжатия

Error метрики:

  • record-error-rate — частота ошибок отправки
  • record-retry-rate — частота повторных отправок

Consumer метрики

Lag метрики:

  • consumer-lag — отставание consumer'а от последнего сообщения
  • consumer-lag-sum — общее отставание по всем партициям

Processing метрики:

  • fetch-rate — скорость чтения сообщений
  • fetch-size-avg — средний размер fetch запроса

Partition метрики

Replication health:

  • UnderReplicatedPartitions — партиции с недостаточным количеством реплик
  • OfflinePartitionsCount — недоступные партиции
  • LeaderElectionRateAndTimeMs — частота и время выборов лидера

Пояснение: Мониторинг должен покрывать весь pipeline: Producer → Broker → Consumer. Особое внимание уделяйте lag метрикам и under-replicated партициям.


Failure scenarios и восстановление

Broker failures

Single broker failure:

  • Партиции на failed broker становятся недоступными для записи
  • Controller инициирует leader election для affected партиций
  • Новые лидеры выбираются из ISR
  • Recovery время зависит от количества партиций

Multiple broker failures:

  • Если потеряны все ISR для партиции, данные могут быть потеряны
  • unclean.leader.election.enable=true позволяет выбрать лидера из non-ISR
  • Риск потери данных vs доступность

Controller failure

Controller failover process:

  1. ZooKeeper/KRaft детектирует отказ controller'а
  2. Выбирается новый controller из живых брокеров
  3. Новый controller читает состояние кластера
  4. Отправляет обновления метаданных всем брокерам

Network partitions

Split-brain prevention:

  • ZooKeeper quorum предотвращает split-brain
  • Minority partition теряет доступ к ZooKeeper
  • Брокеры в minority перестают обслуживать клиентов

Data corruption

Checksum verification:

  • CRC32 для каждого сообщения
  • Broker'ы проверяют checksum при чтении
  • Corrupted сообщения отклоняются

Log recovery:

  • При старте broker проверяет integrity всех log segments
  • Truncation corrupted segments до последней valid записи
  • Replication восстанавливает потерянные данные

Пояснение: Kafka спроектирована для высокой доступности, но правильная конфигурация replication factor и ISR критична для предотвращения потери данных.


Tuning и оптимизация

OS level tuning

File system recommendations:

  • XFS или ext4 для лучшей производительности
  • Отдельные диски для log directories и ZooKeeper
  • SSD для ZooKeeper, HDD достаточно для Kafka logs

Kernel parameters:

# Увеличение file descriptor limits
fs.file-max = 2097152
# TCP buffer sizes
net.core.rmem_max = 134217728
net.core.wmem_max = 134217728
# Disable swapping
vm.swappiness = 1

JVM tuning

Heap sizing:

  • 6-8GB heap обычно достаточно для большинства workloads
  • Больший heap может привести к длинным GC паузам
  • Остальная память отдается OS page cache

GC tuning:

# G1GC рекомендуется для Kafka
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35

Kafka configuration tuning

Broker settings:

# Network threads
num.network.threads=8
# I/O threads  
num.io.threads=16
# Socket buffer sizes
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# Log settings
log.segment.bytes=1073741824
log.retention.hours=168

Producer tuning:

# Batching for throughput
batch.size=65536
linger.ms=10
compression.type=snappy
# Memory allocation
buffer.memory=67108864

Consumer tuning:

# Fetch settings
fetch.min.bytes=50000
fetch.max.wait.ms=500
# Processing parallelism  
max.poll.records=2000

Security архитектура

Authentication mechanisms

SASL/PLAIN:

  • Простая username/password аутентификация
  • Credentials передаются в open text (требует SSL)

SASL/SCRAM:

  • Salted Challenge Response Authentication Mechanism
  • Пароли не передаются по сети
  • Поддержка SHA-256 и SHA-512

SSL/TLS:

  • Mutual TLS аутентификация
  • Client и server certificates
  • Encryption in transit

Authorization (ACLs)

ACL structure:

  • Principal (user/service account)
  • Resource (topic, consumer group, cluster)
  • Operation (read, write, create, delete)
  • Permission (allow/deny)

Resource types:

  • Topic ACLs — доступ к топикам
  • Consumer Group ACLs — membership в группах
  • Cluster ACLs — administrative operations

Encryption

Encryption at rest:

  • Kafka не предоставляет built-in encryption at rest
  • Используйте encrypted file systems или disk encryption
  • Application-level encryption сообщений

Encryption in transit:

  • SSL/TLS между clients и brokers
  • SSL/TLS между brokers (inter-broker)
  • SSL/TLS для ZooKeeper communication

Пояснение: Безопасность в Kafka многослойная. Комбинируйте authentication, authorization и encryption для полной защиты.


Заключение

Понимание внутреннего устройства Kafka критично для:

  1. Правильного дизайна топиков — выбор количества партиций и replication factor
  2. Оптимизации производительности — использование батчинга, compression, правильного partitioning
  3. Обеспечения надежности — понимание failure scenarios и recovery процессов
  4. Эффективного мониторинга — знание ключевых метрик и их значения
  5. Troubleshooting — диагностика проблем производительности и доступности

Kafka — это не просто message broker, а полноценная платформа для streaming данных. Ее архитектурные решения (log-based storage, leader-follower replication, consumer groups) обеспечивают уникальное сочетание высокой производительности, масштабируемости и надежности.

Главный принцип Kafka: "простые идеи, реализованные очень хорошо". Понимание этих идей поможет вам эффективно использовать все возможности платформы.

RabbitMQ

Что такое RabbitMQ

RabbitMQ — это надежный message broker, реализующий протокол AMQP (Advanced Message Queuing Protocol). Выступает посредником между приложениями, обеспечивая асинхронную доставку сообщений с гарантиями надежности.

Основные концепции:

  • Producer — отправляет сообщения
  • Consumer — получает и обрабатывает сообщения
  • Queue — буфер, хранящий сообщения
  • Exchange — маршрутизирует сообщения в очереди
  • Routing Key — ключ для маршрутизации
  • Binding — связь между exchange и queue

Преимущества:

  • Надежность — подтверждения доставки, персистентность
  • Гибкая маршрутизация — различные типы exchange
  • Кластеризация — высокая доступность
  • Множество протоколов — AMQP, STOMP, MQTT, WebSockets
  • Мониторинг — веб-интерфейс управления

Подключение к Spring Boot

Maven зависимости

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- Для JSON сериализации -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

Базовая конфигурация

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 60000
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true
      retry:
        enabled: true
        initial-interval: 1000
        max-attempts: 3
        multiplier: 2
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 10
        retry:
          enabled: true
          max-attempts: 3

Пояснение:

  • publisher-confirms — подтверждения доставки от broker'а
  • publisher-returns — уведомления о недоставленных сообщениях
  • acknowledge-mode: manual — ручное подтверждение обработки
  • prefetch: 10 — количество неподтвержденных сообщений на consumer'а

Архитектура RabbitMQ

Virtual Hosts

Virtual Host (vhost) — логическое разделение RabbitMQ:

  • Изолирует пользователей, exchanges, queues
  • Похоже на database в РСУБД
  • По умолчанию используется vhost "/"
  • Каждый vhost имеет собственные права доступа

Exchanges типы

Direct Exchange:

  • Маршрутизация по точному совпадению routing key
  • Routing key сообщения = binding key очереди
  • Подходит для point-to-point сообщений

Topic Exchange:

  • Маршрутизация по шаблону routing key
  • Wildcards: * (одно слово), # (ноль или больше слов)
  • Гибкая маршрутизация для сложных сценариев

Fanout Exchange:

  • Отправляет сообщения во все привязанные очереди
  • Игнорирует routing key
  • Подходит для broadcast сообщений

Headers Exchange:

  • Маршрутизация по заголовкам сообщения
  • Более гибкая альтернатива topic exchange
  • Использует x-match: all/any для логики

Default Exchange:

  • Встроенный direct exchange с именем ""
  • Автоматически привязан ко всем очередям
  • Routing key = имя очереди

Пояснение: Выбор типа exchange определяет логику маршрутизации сообщений. Direct для простой маршрутизации, Topic для сложных паттернов, Fanout для broadcasting.


Конфигурация инфраструктуры

Объявление очередей и exchanges

@Configuration
public class RabbitConfig {

    // Direct exchange для заказов
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order.exchange", true, false);
    }

    // Topic exchange для уведомлений  
    @Bean
    public TopicExchange notificationExchange() {
        return new TopicExchange("notification.exchange", true, false);
    }

    // Fanout exchange для логирования
    @Bean
    public FanoutExchange logExchange() {
        return new FanoutExchange("log.exchange", true, false);
    }

    // Очереди
    @Bean
    public Queue orderProcessingQueue() {
        return QueueBuilder.durable("order.processing")
            .withArgument("x-dead-letter-exchange", "order.dlx")
            .withArgument("x-message-ttl", 300000) // 5 минут TTL
            .build();
    }

    @Bean
    public Queue orderDlqQueue() {
        return QueueBuilder.durable("order.dlq").build();
    }

    // Привязки (Bindings)
    @Bean
    public Binding orderProcessingBinding() {
        return BindingBuilder
            .bind(orderProcessingQueue())
            .to(orderExchange())
            .with("order.process");
    }

    @Bean
    public Binding notificationBinding() {
        return BindingBuilder
            .bind(notificationQueue())
            .to(notificationExchange())
            .with("notification.*.email");
    }
}

Пояснение:

  • durable=true — exchange/queue переживают перезапуск сервера
  • x-dead-letter-exchange — перенаправление failed сообщений
  • x-message-ttl — время жизни сообщений в миллисекундах

Dead Letter Exchange (DLX)

@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange("order.dlx", true, false);
}

@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable("order.dlq").build();
}

@Bean
public Binding deadLetterBinding() {
    return BindingBuilder
        .bind(deadLetterQueue())
        .to(deadLetterExchange())
        .with("order.failed");
}

Пояснение: DLX обрабатывает сообщения, которые:

  • Отклонены consumer'ом с requeue=false
  • Истек TTL сообщения
  • Очередь переполнена (при установке x-max-length)

Producer (отправка сообщений)

Простая отправка

@Service
public class OrderService {

    private final RabbitTemplate rabbitTemplate;

    public void createOrder(Order order) {
        // Простая отправка
        rabbitTemplate.convertAndSend("order.exchange", "order.process", order);
        
        // С дополнительными свойствами
        rabbitTemplate.convertAndSend(
            "order.exchange", 
            "order.process", 
            order,
            message -> {
                message.getMessageProperties().setContentType("application/json");
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                message.getMessageProperties().setExpiration("60000"); // TTL 1 минута
                return message;
            }
        );
    }
}

Подтверждения и обработка ошибок

@Service
public class ReliableMessageService {

    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void setup() {
        // Callback для подтверждений
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("Message delivered successfully: {}", correlationData);
            } else {
                log.error("Message delivery failed: {}, cause: {}", correlationData, cause);
            }
        });

        // Callback для returned сообщений
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("Message returned: {}, reply: {}", 
                returned.getMessage(), returned.getReplyText());
        });
    }

    public void sendMessage(Object message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        
        rabbitTemplate.convertAndSend(
            "order.exchange",
            "order.process", 
            message,
            correlationData
        );
    }
}

Пояснение:

  • ConfirmCallback — уведомление о доставке в exchange
  • ReturnsCallback — уведомление о недоставленных в queue сообщениях
  • CorrelationData — связывает отправку с подтверждением

Транзакционная отправка

@Service
@Transactional
public class TransactionalOrderService {

    @RabbitTransactional
    public void processOrderWithNotification(Order order) {
        // Сохранение в БД
        orderRepository.save(order);
        
        // Отправка уведомления (в рамках транзакции)
        rabbitTemplate.convertAndSend("notification.exchange", 
            "notification.order.created", 
            new OrderCreatedEvent(order));
            
        // Если произойдет исключение, откатятся и БД, и сообщение
    }
}

Consumer (получение сообщений)

Простой consumer

@Component
public class OrderProcessingConsumer {

    @RabbitListener(queues = "order.processing")
    public void handleOrderProcessing(Order order, 
                                    @Header Map<String, Object> headers,
                                    Channel channel,
                                    @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        
        try {
            log.info("Processing order: {}", order.getId());
            orderService.processOrder(order);
            
            // Ручное подтверждение успешной обработки
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("Failed to process order: {}", order.getId(), e);
            
            try {
                // Отклонение с отправкой в DLQ
                channel.basicReject(deliveryTag, false);
            } catch (IOException ioException) {
                log.error("Failed to reject message", ioException);
            }
        }
    }
}

Пояснение:

  • @Header(AmqpHeaders.DELIVERY_TAG) — уникальный идентификатор сообщения
  • basicAck(deliveryTag, false) — подтверждение обработки одного сообщения
  • basicReject(deliveryTag, false) — отклонение без requeue

Retry механизм

@Component
public class RetryableConsumer {

    @RabbitListener(queues = "payment.processing")
    @Retryable(
        value = {RetryableException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void processPayment(PaymentRequest request) throws Exception {
        try {
            paymentService.processPayment(request);
        } catch (TemporaryServiceException e) {
            throw new RetryableException("Payment service temporarily unavailable", e);
        } catch (InvalidDataException e) {
            // Не retry для permanent ошибок
            log.error("Invalid payment data: {}", request, e);
        }
    }

    @Recover
    public void recoverPayment(RetryableException ex, PaymentRequest request) {
        log.error("Payment processing failed after all retries: {}", request, ex);
        // Отправка в DLQ или альтернативная обработка
    }
}

Batch processing

@Component
public class BatchConsumer {

    @RabbitListener(queues = "analytics.events", 
                   containerFactory = "batchListenerContainerFactory")
    public void processBatch(List<AnalyticsEvent> events) {
        log.info("Processing batch of {} events", events.size());
        
        try {
            analyticsService.processBatch(events);
        } catch (Exception e) {
            log.error("Batch processing failed", e);
            // При ошибке можно обработать события по одному
            events.forEach(this::processSingleEvent);
        }
    }

    private void processSingleEvent(AnalyticsEvent event) {
        try {
            analyticsService.processEvent(event);
        } catch (Exception e) {
            log.error("Failed to process event: {}", event, e);
        }
    }
}

Priority Queues

@Bean
public Queue priorityQueue() {
    return QueueBuilder.durable("priority.processing")
        .withArgument("x-max-priority", 10)
        .build();
}

// Отправка с приоритетом
public void sendPriorityMessage(Object message, int priority) {
    rabbitTemplate.convertAndSend("priority.exchange", "priority.key", message, 
        msg -> {
            msg.getMessageProperties().setPriority(priority);
            return msg;
        });
}

@RabbitListener(queues = "priority.processing")
public void handlePriorityMessage(Object message) {
    // Сообщения обрабатываются в порядке приоритета
    processMessage(message);
}

Пояснение: Priority queues обрабатывают сообщения с высоким приоритетом раньше. Приоритет от 0 до 255 (или установленного максимума).


Конфигурация Listener'ов

Кастомные Container Factory

@Configuration
public class RabbitListenerConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
        
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(10);
        factory.setConcurrentConsumers(2);
        factory.setMaxConcurrentConsumers(5);
        factory.setDefaultRequeueRejected(false);
        
        // Retry policy
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(context -> {
            Message failedMessage = (Message) context.getAttribute("message");
            // Отправка в DLQ
            rabbitTemplate.send("failed.exchange", "", failedMessage);
            return null;
        });
        
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
        
        factory.setConnectionFactory(connectionFactory);
        factory.setBatchListener(true);
        factory.setBatchSize(50);
        factory.setConsumerBatchEnabled(true);
        
        return factory;
    }

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        return retryTemplate;
    }
}

Пояснение:

  • setConcurrentConsumers — минимальное количество consumer'ов
  • setMaxConcurrentConsumers — максимальное (автоматическое масштабирование)
  • setPrefetchCount — количество неподтвержденных сообщений на consumer'а

Routing Patterns

Topic Exchange patterns

// Отправка с различными routing keys
rabbitTemplate.convertAndSend("notification.exchange", "notification.email.signup", emailNotification);
rabbitTemplate.convertAndSend("notification.exchange", "notification.sms.signup", smsNotification);
rabbitTemplate.convertAndSend("notification.exchange", "notification.push.order", pushNotification);

// Binding patterns
@Bean
public Binding emailNotificationsBinding() {
    return BindingBuilder
        .bind(emailQueue())
        .to(notificationExchange())
        .with("notification.email.*"); // Все email уведомления
}

@Bean
public Binding allNotificationsBinding() {
    return BindingBuilder
        .bind(auditQueue())
        .to(notificationExchange()) 
        .with("notification.#"); // Все уведомления
}

@Bean
public Binding signupNotificationsBinding() {
    return BindingBuilder
        .bind(signupQueue())
        .to(notificationExchange())
        .with("notification.*.signup"); // Все уведомления о регистрации
}

Headers Exchange

@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headers.exchange");
}

@Bean 
public Binding headersBinding() {
    return BindingBuilder
        .bind(processQueue())
        .to(headersExchange())
        .whereAll(Map.of("type", "order", "priority", "high"))
        .match();
}

// Отправка с заголовками
public void sendWithHeaders(Object message) {
    rabbitTemplate.convertAndSend("headers.exchange", "", message,
        msg -> {
            msg.getMessageProperties().getHeaders().put("type", "order");
            msg.getMessageProperties().getHeaders().put("priority", "high");
            msg.getMessageProperties().getHeaders().put("region", "europe");
            return msg;
        });
}

Пояснение: Headers Exchange более гибкий чем Topic — может использовать множественные критерии маршрутизации с логикой AND/OR.


Мониторинг и управление

Management API

@Service
public class RabbitManagementService {

    private final RabbitAdmin rabbitAdmin;
    
    public QueueInfo getQueueInfo(String queueName) {
        Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
        if (queueProperties != null) {
            return QueueInfo.builder()
                .name(queueName)
                .messageCount((Integer) queueProperties.get("QUEUE_MESSAGE_COUNT"))
                .consumerCount((Integer) queueProperties.get("QUEUE_CONSUMER_COUNT"))
                .build();
        }
        return null;
    }

    public void purgeQueue(String queueName) {
        rabbitAdmin.purgeQueue(queueName, false);
    }

    public void createQueueDynamically(String queueName) {
        Queue queue = new Queue(queueName, true, false, false);
        rabbitAdmin.declareQueue(queue);
    }
}

Actuator endpoints

management:
  endpoints:
    web:
      exposure:
        include: health, rabbit, metrics
  endpoint:
    health:
      show-details: always
    rabbit:
      enabled: true

Health checks

@Component
public class RabbitHealthIndicator implements HealthIndicator {

    private final RabbitTemplate rabbitTemplate;

    @Override
    public Health health() {
        try {
            // Проверка соединения
            rabbitTemplate.execute(channel -> {
                channel.queueDeclarePassive("health.check");
                return null;
            });
            
            return Health.up()
                .withDetail("rabbit", "Available")
                .build();
        } catch (Exception e) {
            return Health.down()
                .withDetail("rabbit", "Unavailable")
                .withDetail("error", e.getMessage())
                .build();
        }
    }
}

Метрики

@Component
public class RabbitMetrics {

    private final Counter messagesProduced;
    private final Counter messagesConsumed;
    private final Timer processingTime;

    public RabbitMetrics(MeterRegistry meterRegistry) {
        this.messagesProduced = Counter.builder("rabbitmq.messages.produced")
            .description("Number of messages produced")
            .register(meterRegistry);
            
        this.messagesConsumed = Counter.builder("rabbitmq.messages.consumed")
            .description("Number of messages consumed")
            .register(meterRegistry);
            
        this.processingTime = Timer.builder("rabbitmq.processing.time")
            .description("Message processing time")
            .register(meterRegistry);
    }

    @EventListener
    public void handleMessageSent(MessageSentEvent event) {
        messagesProduced.increment(
            Tags.of("exchange", event.getExchange(), "routing-key", event.getRoutingKey())
        );
    }

    public void recordProcessingTime(String queue, long timeMs) {
        processingTime.record(timeMs, TimeUnit.MILLISECONDS, Tags.of("queue", queue));
    }
}

Пояснение: Мониторинг должен отслеживать основные метрики: количество сообщений, время обработки, размер очередей, количество consumer'ов.


High Availability и Clustering

Mirrored Queues (Classic)

# Настройка политики для HA
rabbitmqctl set_policy ha-orders "^order\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

Quorum Queues (Современный подход)

@Bean
public Queue quorumQueue() {
    return QueueBuilder.durable("order.processing.quorum")
        .withArgument("x-queue-type", "quorum")
        .withArgument("x-max-in-memory-length", 1000)
        .build();
}

Пояснение:

  • Classic mirrored queues — legacy подход с master-slave репликацией
  • Quorum queues — современный подход на основе Raft consensus, более надежный

Cluster configuration

# rabbitmq.conf
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
cluster_formation.classic_config.nodes.3 = rabbit@node3

# Политика для автоматического создания quorum queues
default_queue_type = quorum

Connection failure handling

@Configuration
public class RabbitConnectionConfig {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses("rabbit1:5672,rabbit2:5672,rabbit3:5672");
        factory.setUsername("user");
        factory.setPassword("password");
        
        // Настройки reconnection
        factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
        factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(5000);
        factory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(true);
        
        return factory;
    }
}

Пояснение: Automatic recovery восстанавливает соединения, channels, exchanges, queues и bindings при сбоях сети.


Message Patterns

Request-Reply Pattern

@Service
public class RequestReplyService {

    private final RabbitTemplate rabbitTemplate;

    // Синхронный request-reply
    public String syncRequest(String request) {
        return (String) rabbitTemplate.convertSendAndReceive(
            "request.exchange", 
            "request.process", 
            request
        );
    }

    // Асинхронный request-reply
    public void asyncRequest(String request, CorrelationData correlationData) {
        rabbitTemplate.convertAndSend(
            "request.exchange",
            "request.process", 
            request,
            message -> {
                message.getMessageProperties().setReplyTo("response.queue");
                message.getMessageProperties().setCorrelationId(correlationData.getId());
                return message;
            }
        );
    }
}

@Component
public class RequestProcessor {

    @RabbitListener(queues = "request.queue")
    public void processRequest(String request, 
                              @Header(name = "replyTo", required = false) String replyTo,
                              @Header(name = "correlationId", required = false) String correlationId) {
        
        String response = businessService.processRequest(request);
        
        if (replyTo != null) {
            rabbitTemplate.convertAndSend("", replyTo, response, message -> {
                if (correlationId != null) {
                    message.getMessageProperties().setCorrelationId(correlationId);
                }
                return message;
            });
        }
    }
}

Publish-Subscribe Pattern

@Service 
public class EventPublisher {

    public void publishEvent(DomainEvent event) {
        // Fanout exchange отправляет во все подписанные очереди
        rabbitTemplate.convertAndSend("events.fanout", "", event);
    }
}

@Component
public class EmailSubscriber {

    @RabbitListener(queues = "events.email")
    public void handleEvent(DomainEvent event) {
        if (event instanceof UserRegisteredEvent) {
            emailService.sendWelcomeEmail((UserRegisteredEvent) event);
        }
    }
}

@Component
public class AnalyticsSubscriber {

    @RabbitListener(queues = "events.analytics")
    public void handleEvent(DomainEvent event) {
        analyticsService.trackEvent(event);
    }
}

Competing Consumers Pattern

// Несколько consumer'ов читают из одной очереди
@Component
public class OrderProcessor1 {

    @RabbitListener(queues = "order.processing", id = "processor1")
    public void processOrder(Order order) {
        log.info("Processor 1 handling order: {}", order.getId());
        orderService.processOrder(order);
    }
}

@Component  
public class OrderProcessor2 {

    @RabbitListener(queues = "order.processing", id = "processor2")
    public void processOrder(Order order) {
        log.info("Processor 2 handling order: {}", order.getId());
        orderService.processOrder(order);
    }
}

Пояснение: Competing Consumers автоматически распределяет нагрузку между multiple consumers одной очереди, увеличивая throughput.


Безопасность

SSL/TLS конфигурация

spring:
  rabbitmq:
    ssl:
      enabled: true
      key-store: classpath:client_key.p12
      key-store-password: password
      key-store-type: PKCS12
      trust-store: classpath:trust_store.jks
      trust-store-password: password
      trust-store-type: JKS
      algorithm: TLSv1.2

Аутентификация и авторизация

# Создание пользователя
rabbitmqctl add_user myuser mypassword

# Назначение прав
rabbitmqctl set_permissions -p "/" myuser "order\..*" "order\..*" "order\..*"

# Роли
rabbitmqctl set_user_tags myuser management

Права доступа

# Формат: configure write read
rabbitmqctl set_permissions -p "/" producer "^$" "order\..*" "^$"        # Только отправка
rabbitmqctl set_permissions -p "/" consumer "^$" "^$" "order\..*"        # Только чтение  
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"                  # Полный доступ

Пояснение:

  • configure — создание/удаление exchanges и queues
  • write — отправка сообщений
  • read — чтение сообщений и привязка queues

Performance tuning

Producer оптимизация

@Bean
public RabbitTemplate optimizedRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    
    // Batching для увеличения throughput
    template.setUsePublisherConnection(true);
    
    // JSON converter для лучшей производительности
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    
    return template;
}

Consumer оптимизация

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 50              # Больше prefetch для высокой нагрузки
        concurrency: 5-10         # Диапазон consumer'ов
        batch-size: 10            # Batch processing
        idle-event-interval: 60s  # Частота idle events

Connection pooling

@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
    
    // Connection caching
    factory.setConnectionCacheSize(10);
    factory.setChannelCacheSize(50);
    
    // Publisher confirms
    factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    factory.setPublisherReturns(true);
    
    return factory;
}

Memory и disk управление

# rabbitmq.conf
vm_memory_high_watermark.relative = 0.6  # 60% RAM для RabbitMQ
disk_free_limit.relative = 2.0           # 2GB свободного места
vm_memory_high_watermark_paging_ratio = 0.5  # Paging при 50% от watermark

# Настройки очередей для экономии памяти
default_queue_type = classic
queue_master_locator = min-masters

Пояснение:

  • При превышении memory watermark RabbitMQ начинает блокировать publishers
  • Paging перемещает сообщения из RAM на диск для экономии памяти
  • Lazy queues хранят сообщения на диске по умолчанию

Troubleshooting и диагностика

Основные проблемы и решения

Memory alarms:

# Проверка состояния
rabbitmqctl status
rabbitmqctl environment

# Очистка memory alarm (осторожно!)
rabbitmqctl eval 'rabbit_alarm:clear_alarm({resource_limit, memory, node()}).'

Disk space issues:

# Проверка использования диска
rabbitmqctl status | grep disk
du -sh /var/lib/rabbitmq/mnesia/

# Принудительная очистка (emergency)
rabbitmqctl stop_app
rabbitmqctl reset  # ВНИМАНИЕ: удаляет все данные!
rabbitmqctl start_app

Connection leaks:

# Мониторинг соединений
rabbitmqctl list_connections
rabbitmqctl list_channels

# Закрытие зависших соединений
rabbitmqctl close_connection "<connection_name>" "Connection cleanup"

Логирование и отладка

# application.yml
logging:
  level:
    org.springframework.amqp: DEBUG
    org.springframework.rabbit: DEBUG
    com.rabbitmq: DEBUG
@Component
public class RabbitDebugListener {

    @RabbitListener(queues = "debug.queue")
    public void debugMessage(Message message, Channel channel) {
        log.debug("Received message: {}", message);
        log.debug("Headers: {}", message.getMessageProperties().getHeaders());
        log.debug("Routing key: {}", message.getMessageProperties().getReceivedRoutingKey());
        log.debug("Exchange: {}", message.getMessageProperties().getReceivedExchange());
    }
}

Performance monitoring

@Component
public class RabbitPerformanceMonitor {

    @EventListener
    public void onListenerConsumerStarted(ListenerContainerConsumerStartedEvent event) {
        log.info("Consumer started: {}", event.getContainer());
    }

    @EventListener  
    public void onListenerConsumerStopped(ListenerContainerConsumerStoppedEvent event) {
        log.warn("Consumer stopped: {}", event.getContainer());
    }

    @EventListener
    public void onListenerConsumerFailed(ListenerContainerConsumerFailedEvent event) {
        log.error("Consumer failed: {}", event.getContainer(), event.getReason());
    }
}

Best Practices

1. Правильное именование

public class NamingConventions {
    // Exchanges
    public static final String ORDER_EXCHANGE = "order.exchange";
    public static final String NOTIFICATION_EXCHANGE = "notification.exchange";
    
    // Queues (включают назначение)
    public static final String ORDER_PROCESSING_QUEUE = "order.processing";
    public static final String ORDER_EMAIL_QUEUE = "order.email";
    public static final String ORDER_DLQ = "order.dlq";
    
    // Routing keys (иерархические)
    public static final String ORDER_CREATED = "order.created";
    public static final String ORDER_CANCELLED = "order.cancelled";
    public static final String NOTIFICATION_EMAIL_SIGNUP = "notification.email.signup";
}

2. Idempotency

@Component
public class IdempotentConsumer {

    private final RedisTemplate<String, String> redisTemplate;

    @RabbitListener(queues = "payment.processing")
    public void processPayment(PaymentRequest request, 
                              @Header(AmqpHeaders.MESSAGE_ID) String messageId) {
        
        String idempotencyKey = "payment:" + messageId;
        
        // Проверка на дублирование
        if (redisTemplate.hasKey(idempotencyKey)) {
            log.info("Payment already processed: {}", request.getPaymentId());
            return;
        }
        
        try {
            paymentService.processPayment(request);
            
            // Сохранение ключа идемпотентности
            redisTemplate.opsForValue().set(idempotencyKey, "processed", 
                Duration.ofHours(24));
                
        } catch (Exception e) {
            log.error("Payment processing failed: {}", request.getPaymentId(), e);
            throw e;
        }
    }
}

3. Circuit Breaker pattern

@Component
public class CircuitBreakerConsumer {

    private final CircuitBreaker circuitBreaker;

    @RabbitListener(queues = "external.api.calls")
    public void handleApiCall(ApiCallRequest request) {
        
        Supplier<ApiResponse> protectedCall = CircuitBreaker
            .decorateSupplier(circuitBreaker, () -> externalApiService.call(request));
            
        Try<ApiResponse> result = Try.ofSupplier(protectedCall);
        
        if (result.isSuccess()) {
            // Успешная обработка
            log.info("API call successful: {}", request.getId());
        } else {
            // Fallback или отправка в retry queue
            log.error("API call failed: {}", request.getId(), result.getCause());
            rabbitTemplate.convertAndSend("external.api.retry", request);
        }
    }
}

4. Graceful shutdown

@Component
public class GracefulShutdown {

    private final SimpleMessageListenerContainer listenerContainer;

    @PreDestroy
    public void shutdown() {
        log.info("Shutting down RabbitMQ listeners...");
        
        // Остановка приема новых сообщений
        listenerContainer.shutdown();
        
        // Ожидание завершения обработки текущих сообщений
        try {
            if (!listenerContainer.isRunning()) {
                log.info("All messages processed, shutdown complete");
            }
        } catch (Exception e) {
            log.warn("Error during graceful shutdown", e);
        }
    }
}

5. Message versioning

public abstract class BaseMessage {
    @JsonProperty("messageVersion")
    private String messageVersion = "1.0";
    
    @JsonProperty("messageType") 
    private String messageType;
    
    @JsonProperty("timestamp")
    private LocalDateTime timestamp = LocalDateTime.now();
    
    // Геттеры и сеттеры
}

@JsonTypeName("OrderCreated")
public class OrderCreatedMessage extends BaseMessage {
    
    public OrderCreatedMessage() {
        setMessageType("OrderCreated");
        setMessageVersion("1.1"); // Обновленная версия
    }
    
    // Поля сообщения
}

@Component
public class VersionedMessageHandler {

    @RabbitListener(queues = "order.events")
    public void handleOrderEvent(BaseMessage message) {
        
        switch (message.getMessageVersion()) {
            case "1.0":
                handleV1Message(message);
                break;
            case "1.1":
                handleV2Message(message);
                break;
            default:
                log.warn("Unknown message version: {}", message.getMessageVersion());
        }
    }
}

6. Dead Letter Queue обработка

@Component
public class DlqProcessor {

    @RabbitListener(queues = "order.dlq")
    public void processDlqMessage(Message message, Channel channel,
                                 @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        
        // Анализ причины попадания в DLQ
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> xDeathHeader = 
            (List<Map<String, Object>>) headers.get("x-death");
            
        if (xDeathHeader != null && !xDeathHeader.isEmpty()) {
            Map<String, Object> death = xDeathHeader.get(0);
            String reason = (String) death.get("reason");
            Long count = (Long) death.get("count");
            
            log.error("Message in DLQ. Reason: {}, Count: {}, Message: {}", 
                reason, count, message);
            
            // Логика обработки в зависимости от причины
            if ("rejected".equals(reason) && count < 3) {
                // Повторная отправка в основную очередь
                reprocessMessage(message);
            } else {
                // Отправка в архив или уведомление администратора
                archiveMessage(message);
            }
        }
        
        try {
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            log.error("Failed to ack DLQ message", e);
        }
    }
}

Production considerations

1. Monitoring setup

# Prometheus метрики
management:
  metrics:
    export:
      prometheus:
        enabled: true
  endpoints:
    web:
      exposure:
        include: prometheus, health, rabbit
@Component
public class RabbitMetricsCollector {

    @Scheduled(fixedRate = 30000) // Каждые 30 секунд
    public void collectMetrics() {
        try {
            // Получение статистики очередей через Management API
            Map<String, Object> queueStats = rabbitAdmin.getQueueProperties("order.processing");
            
            if (queueStats != null) {
                int messageCount = (Integer) queueStats.get("QUEUE_MESSAGE_COUNT");
                int consumerCount = (Integer) queueStats.get("QUEUE_CONSUMER_COUNT");
                
                Metrics.gauge("rabbitmq.queue.messages", messageCount, "queue", "order.processing");
                Metrics.gauge("rabbitmq.queue.consumers", consumerCount, "queue", "order.processing");
                
                // Алерт при большой очереди
                if (messageCount > 1000) {
                    log.warn("High message count in queue: {}", messageCount);
                    alertService.sendAlert("RabbitMQ queue backlog", 
                        "Queue order.processing has " + messageCount + " messages");
                }
            }
        } catch (Exception e) {
            log.error("Failed to collect RabbitMQ metrics", e);
        }
    }
}

2. Deployment patterns

# Docker Compose для разработки
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3.11-management
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
      RABBITMQ_DEFAULT_VHOST: /
    ports:

      - "5672:5672"   # AMQP порт
      - "15672:15672" # Management UI
    volumes:

      - rabbitmq_data:/var/lib/rabbitmq
    networks:

      - app-network

volumes:
  rabbitmq_data:

networks:
  app-network:
# Kubernetes deployment
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  serviceName: rabbitmq
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:

      - name: rabbitmq
        image: rabbitmq:3.11-management
        ports:

        - containerPort: 5672
        - containerPort: 15672
        env:

        - name: RABBITMQ_ERLANG_COOKIE
          value: "mycookie"
        - name: RABBITMQ_DEFAULT_USER
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: username
        volumeMounts:

        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
  volumeClaimTemplates:

  - metadata:
      name: rabbitmq-data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi

3. Backup и восстановление

#!/bin/bash
# Backup script

# Экспорт конфигурации
rabbitmqctl export_definitions /backup/definitions-$(date +%Y%m%d).json

# Экспорт сообщений (если нужно)
rabbitmqctl list_queues name messages | while read queue messages; do
    if [ "$messages" -gt 0 ]; then
        echo "Backing up queue: $queue with $messages messages"
        # Используйте shovel plugin для миграции сообщений
    fi
done

# Backup Mnesia database (при остановленном RabbitMQ)
# systemctl stop rabbitmq-server
# tar -czf /backup/mnesia-$(date +%Y%m%d).tar.gz /var/lib/rabbitmq/mnesia/
# systemctl start rabbitmq-server

4. Capacity planning

@Component
public class CapacityMonitor {

    @Scheduled(fixedRate = 300000) // Каждые 5 минут
    public void monitorCapacity() {
        // Проверка throughput
        long currentTime = System.currentTimeMillis();
        long messagesPerMinute = getMessagesPerMinute();
        
        // Прогнозирование нагрузки
        if (messagesPerMinute > THRESHOLD_HIGH) {
            log.warn("High message throughput detected: {} messages/min", messagesPerMinute);
            
            // Автоматическое масштабирование consumer'ов
            scaleConsumers("order.processing", messagesPerMinute);
        }
        
        // Проверка размера очередей
        checkQueueSizes();
        
        // Проверка memory usage
        checkMemoryUsage();
    }
    
    private void scaleConsumers(String queueName, long throughput) {
        int currentConsumers = getCurrentConsumerCount(queueName);
        int optimalConsumers = calculateOptimalConsumers(throughput);
        
        if (optimalConsumers > currentConsumers) {
            log.info("Scaling consumers for {}: {} -> {}", 
                queueName, currentConsumers, optimalConsumers);
            // Логика масштабирования
        }
    }
}

Заключение

RabbitMQ предоставляет мощные возможности для построения надежных distributed систем. Ключевые принципы:

  1. Выбирайте правильный тип exchange — Direct для простой маршрутизации, Topic для сложных паттернов, Fanout для broadcasting
  2. Используйте Dead Letter Queues — для обработки failed сообщений
  3. Настраивайте acknowledgments правильно — Manual для критических данных, Auto для высокой производительности
  4. Мониторьте ключевые метрики — размер очередей, throughput, latency, error rates
  5. Планируйте capacity — исходя из expected load и growth
  6. Обеспечивайте идемпотентность — для exactly-once обработки
  7. Используйте retry с exponential backoff — для временных сбоев
  8. Настраивайте HA правильно — Quorum queues для критических данных

RabbitMQ отлично подходит для traditional messaging patterns и scenarios где важны гарантии доставки, сложная маршрутизация и operational simplicity. Для high-throughput streaming используйте Apache Kafka, для простых use cases рассмотрите cloud-native solutions.

Главное правило: начинайте с простой конфигурации и усложняйте по мере роста требований. RabbitMQ очень гибкий, но эта гибкость может привести к overengineering если не контролировать сложность.

Сравнение Kafka vs RabbitMQ

Краткое описание

Apache Kafka

Kafka — это распределенная платформа для потоковой обработки данных, изначально созданная для обработки больших объемов логов. Работает как распределенный commit log, где сообщения хранятся в партиционированных топиках.

Ключевые особенности:

  • High throughput (миллионы сообщений/сек)
  • Долговременное хранение сообщений
  • Streaming processing capabilities
  • Горизонтальное масштабирование

RabbitMQ

RabbitMQ — это традиционный message broker, реализующий протокол AMQP. Работает как умный посредник, который принимает, маршрутизирует и доставляет сообщения.

Ключевые особенности:

  • Гибкая маршрутизация сообщений
  • Множество протоколов (AMQP, STOMP, MQTT)
  • Rich feature set для messaging
  • Простота использования

Архитектурные различия

Модель хранения данных

Kafka: Log-based storage

Topic: user-events
Partition 0: [msg1][msg2][msg3][msg4] → (append-only log)
Partition 1: [msg5][msg6][msg7][msg8] →
Partition 2: [msg9][msg10][msg11]    →

Пояснение: Kafka сохраняет все сообщения в append-only логах на диске. Сообщения не удаляются после чтения, а хранятся согласно retention policy (по времени или размеру).

RabbitMQ: Queue-based storage

Queue: order-processing
[msg1] → [msg2] → [msg3] → Consumer (msg удаляется после ack)

Пояснение: RabbitMQ работает как традиционная очередь — сообщение доставляется consumer'у и удаляется из очереди после подтверждения.

Consumer модели

Kafka: Pull-based consumers

  • Consumer'ы сами запрашивают данные у broker'ов
  • Каждый consumer отслеживает свой offset
  • Возможность replay сообщений с любой позиции
  • Consumer group автоматически распределяет партиции

RabbitMQ: Push-based delivery

  • Broker активно доставляет сообщения consumer'ам
  • Flexible routing через exchanges
  • Acknowledgments для подтверждения обработки
  • Competing consumers pattern

Масштабирование

Kafka: Горизонтальное партиционирование

  • Партиции распределяются по broker'ам
  • Один лидер на партицию для записи
  • Follower'ы реплицируют данные
  • Масштабирование через добавление партиций/broker'ов

RabbitMQ: Clustering и mirroring

  • Cluster из множества узлов
  • Quorum queues для HA
  • Sharding через federation/shovel
  • Масштабирование ограничено сложностью routing

Производительность

Throughput сравнение

Kafka превосходит в:

  • Пропускная способность: 100k-1M+ сообщений/сек на commodity hardware
  • Batch processing: Efficient batching на producer и consumer стороне
  • Sequential I/O: Оптимизированная запись на диск
  • Zero-copy: Минимальное копирование данных
# Типичные показатели Kafka
Producer: 800,000 msgs/sec (100 byte messages)
Consumer: 900,000 msgs/sec
Latency: 2-5ms (end-to-end)

RabbitMQ превосходит в:

  • Низкая latency: Sub-millisecond для small messages
  • Flexible routing: Сложная логика маршрутизации
  • Small message optimization: Эффективность для коротких сообщений
  • Memory-based queues: Быстрый доступ для транзиентных данных
# Типичные показатели RabbitMQ
Producer: 20,000-50,000 msgs/sec
Consumer: 30,000-70,000 msgs/sec  
Latency: 0.1-1ms (для simple routing)

Latency профили

Kafka:

  • Выше latency из-за batching
  • Predictable latency под нагрузкой
  • Лучше для high-throughput scenarios

RabbitMQ:

  • Ниже latency для individual messages
  • Latency может расти под высокой нагрузкой
  • Лучше для real-time communication

Пояснение: Kafka оптимизирована для throughput, RabbitMQ — для latency. Выбор зависит от приоритетов вашей системы.


Надежность и гарантии доставки

Delivery semantics

Kafka:

  • At-least-once: По умолчанию с правильной конфигурацией
  • Exactly-once: Поддерживается для Kafka Streams и транзакционных producers
  • At-most-once: При acks=0 или consumer errors
# Конфигурация для exactly-once
spring.kafka.producer:
  acks: all
  enable-idempotence: true
  transactional-id: my-transactional-id
spring.kafka.consumer:
  isolation-level: read_committed

RabbitMQ:

  • At-least-once: Стандартный режим с acknowledgments
  • At-most-once: Без acknowledgments
  • Exactly-once: Требует application-level idempotency
# Конфигурация для надежности
spring.rabbitmq:
  publisher-confirms: true
  publisher-returns: true
  listener.simple.acknowledge-mode: manual

Durability

Kafka:

  • Все сообщения записываются на диск
  • Configurable replication factor
  • Automatic leader election
  • Retention по времени/размеру

RabbitMQ:

  • Durable queues и persistent messages
  • Quorum queues для HA
  • Clustering с synchronous replication
  • TTL и DLQ для failed messages

Пояснение: Обе системы могут обеспечить высокую надежность, но Kafka делает это "по умолчанию", а RabbitMQ требует explicit конфигурации.


Особенности использования

Kafka: Stream processing

Идеальна для:

  • Event sourcing архитектур
  • Real-time analytics и ETL
  • Log aggregation
  • Activity tracking
// Kafka Streams example
@Component
public class OrderAnalytics {
    
    @Autowired
    private StreamsBuilder streamsBuilder;
    
    @Bean
    public KStream<String, Order> processOrders() {
        return streamsBuilder
            .stream("orders")
            .filter((key, order) -> order.getAmount() > 100)
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .aggregate(OrderStats::new, this::updateStats)
            .toStream()
            .to("order-analytics");
    }
}

Пояснение: Kafka Streams позволяет создавать сложные stream processing pipelines прямо внутри приложения без внешних frameworks.

RabbitMQ: Traditional messaging

Идеальна для:

  • Request-reply patterns
  • Task queues с приоритетами
  • Сложная маршрутизация сообщений
  • Микросервисная коммуникация
// RabbitMQ routing example
@RabbitListener(queues = "order.processing.high-priority")
public void processHighPriorityOrder(Order order) {
    // Обработка приоритетных заказов
}

@RabbitListener(queues = "order.processing.normal")  
public void processNormalOrder(Order order) {
    // Обработка обычных заказов
}

Пояснение: RabbitMQ excel в scenarios где нужна гибкая маршрутизация и traditional messaging patterns.


Operational complexity

Kafka: Сложность инфраструктуры

Плюсы:

  • Простая концептуальная модель (logs)
  • Automatic failover и recovery
  • Built-in monitoring metrics
  • KRaft устраняет ZooKeeper dependency

Минусы:

  • Требует понимания partitioning strategy
  • Сложность в настройке producer/consumer
  • Monitoring множества метрик
  • Retention policy management

RabbitMQ: Operational overhead

Плюсы:

  • Intuitive web management UI
  • Rich ecosystem и plugins
  • Flexible deployment options
  • Extensive documentation

Минусы:

  • Memory management complexity
  • Clustering configuration challenges
  • Queue mirroring overhead
  • More moving parts для HA setup

Пояснение: Kafka требует глубокого понимания distributed systems, RabbitMQ — больше operational expertise для production deployment.


Use cases и рекомендации

Выбирайте Kafka если:

1. High-throughput scenarios

✅ Обработка миллионов событий в секунду
✅ Analytics и ETL pipelines
✅ Activity tracking (clicks, page views)
✅ IoT data ingestion

2. Event sourcing архитектуры

✅ Нужно replay events с любого момента времени
✅ Audit trails и compliance требования
✅ Stream processing и real-time analytics
✅ CQRS implementations

3. Integration patterns

✅ Data pipeline между системами
✅ CDC (Change Data Capture)
✅ Log aggregation от множества services
✅ Backup и disaster recovery

Примеры реальных сценариев:

  • Netflix: Tracking user behavior, recommendations
  • LinkedIn: Activity feeds, messaging infrastructure
  • Uber: Location tracking, trip events
  • Spotify: Music streaming analytics

Выбирайте RabbitMQ если:

1. Traditional messaging patterns

✅ Request-reply communication
✅ Task queues с приоритетами
✅ RPC calls между микросервисами
✅ Workflow orchestration

2. Сложная маршрутизация

✅ Routing по multiple criteria
✅ Fan-out broadcasts
✅ Content-based routing
✅ Protocol translation (AMQP/STOMP/MQTT)

3. Low-latency requirements

✅ Real-time notifications
✅ Chat applications
✅ Gaming backends
✅ Financial trading systems

Примеры реальных сценариев:

  • WhatsApp: Message delivery infrastructure
  • Reddit: Comment processing, notifications
  • Mozilla: Build system coordination
  • Goldman Sachs: Trading system messaging

Сценарии выбора

Микросервисная архитектура

Kafka подходит для:

Service A → [Event Store] ← Service B
                ↓
    [Event-driven processing]
                ↓
         [Projections/Views]
  • Event-driven communication
  • Service decoupling через events
  • Audit logs для compliance
  • Analytics across services

RabbitMQ подходит для:

Service A → [Queue] → Service B
     ↑                   ↓
[Response Queue] ←── [ACK/NACK]
  • Direct service-to-service calls
  • Task distribution
  • Request-reply patterns
  • Circuit breaker implementations

Data pipeline архитектуры

Kafka: Streaming ETL

Sources → Kafka Connect → Kafka Streams → Sinks
  (DB)      (Ingestion)   (Processing)    (DWH)

Идеально для:

  • Real-time data warehousing
  • Stream joining и aggregations
  • CDC processing
  • Machine learning feature pipelines

RabbitMQ: Batch processing

Scheduler → [Job Queue] → Workers → [Result Queue] → Aggregator

Идеально для:

  • Batch job processing
  • Image/video processing
  • Report generation
  • Email sending campaigns

Hybrid подходы

Часто используются вместе:

External APIs → RabbitMQ → Service → Kafka → Analytics
    (Fast)      (Routing)  (Process)  (Store)   (Insights)

Пример архитектуры:

  1. RabbitMQ для immediate processing (orders, payments)
  2. Kafka для long-term storage и analytics
  3. Services читают из RabbitMQ и пишут в Kafka

Пояснение: Hybrid approach позволяет использовать strengths каждой системы — RabbitMQ для operational messaging, Kafka для data pipelines.


Технические ограничения

Kafka ограничения

Message size:

  • Default max: 1MB per message
  • Можно увеличить, но affects performance
  • Large messages лучше передавать по reference

Ordering guarantees:

  • Гарантируется только внутри партиции
  • Cross-partition ordering невозможен
  • Требует careful key selection

Consumer limitations:

  • Нельзя selective consumption (skip messages)
  • Rebalancing может быть медленным
  • Memory overhead для large consumer groups

RabbitMQ ограничения

Scalability ceiling:

  • Clustering complexity растет с nodes
  • Memory usage может быть проблемой
  • Network partitions сложно обрабатывать

Message ordering:

  • Нет ordering guarantees across queues
  • Single queue = single point of bottleneck
  • Priority queues нарушают FIFO order

Storage limitations:

  • Не предназначен для long-term storage
  • Large queues влияют на performance
  • Memory-based queues ограничены RAM

Cost considerations

Total Cost of Ownership (TCO)

Kafka:

Infrastructure: Средняя (нужны диски, память)
Operational:    Высокая (требует expertise)
Licensing:      Бесплатная (OSS) / Expensive (Confluent)
Monitoring:     Комплексная (много метрик)

RabbitMQ:

Infrastructure: Низкая (меньше ресурсов)
Operational:    Средняя (проще в управлении)  
Licensing:      Бесплатная (OSS) / Moderate (VMware)
Monitoring:     Простая (веб UI)

Cloud offerings

Managed Kafka:

  • AWS MSK, Confluent Cloud, Azure Event Hubs
  • Дороже self-hosted, но проще в управлении
  • Automatic scaling и patching

Managed RabbitMQ:

  • AWS AmazonMQ, CloudAMQP, Google Cloud Pub/Sub
  • Значительно дешевле Kafka equivalents
  • Faster time-to-market

Пояснение: Для startups и small teams managed services часто экономически выгоднее self-hosted solutions, особенно для RabbitMQ.


Migration considerations

От RabbitMQ к Kafka

Причины миграции:

  • Рост throughput requirements
  • Потребность в event sourcing
  • Analytics и real-time processing
  • Cost optimization для high volume

Стратегия миграции:

Phase 1: Dual write (RabbitMQ + Kafka)
Phase 2: Migrate consumers to Kafka  
Phase 3: Migrate producers to Kafka
Phase 4: Decommission RabbitMQ

От Kafka к RabbitMQ

Причины миграции:

  • Упрощение архитектуры
  • Потребность в complex routing
  • Low-latency requirements
  • Team expertise limitations

Стратегия миграции:

Phase 1: Implement RabbitMQ alongside
Phase 2: Route new flows через RabbitMQ
Phase 3: Migrate existing consumers
Phase 4: Retire Kafka components

Пояснение: Миграция — это major undertaking. Часто лучше использовать hybrid approach или выбирать правильную технологию с самого начала.


Матрица принятия решений

Выбор по критериям

Критерий Kafka RabbitMQ Комментарий
Throughput ⭐⭐⭐⭐⭐ ⭐⭐⭐ Kafka в 10-100x быстрее
Latency ⭐⭐⭐ ⭐⭐⭐⭐⭐ RabbitMQ лучше для real-time
Reliability ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ Обе надежны при правильной настройке
Scalability ⭐⭐⭐⭐⭐ ⭐⭐⭐ Kafka scales горизонтально
Operational complexity ⭐⭐ ⭐⭐⭐⭐ RabbitMQ проще в управлении
Feature richness ⭐⭐⭐ ⭐⭐⭐⭐⭐ RabbitMQ более feature-rich
Learning curve ⭐⭐ ⭐⭐⭐⭐ RabbitMQ проще в изучении

Decision tree

Нужен high throughput (>100k msg/sec)?
├─ YES → Kafka
└─ NO → Нужна сложная маршрутизация?
    ├─ YES → RabbitMQ  
    └─ NO → Нужно долговременное хранение?
        ├─ YES → Kafka
        └─ NO → Нужна низкая latency?
            ├─ YES → RabbitMQ
            └─ NO → Есть expertise для Kafka?
                ├─ YES → Kafka
                └─ NO → RabbitMQ

Заключение

Ключевые выводы

Kafka — это правильный выбор когда:

  • Приоритет на throughput и scalability
  • Нужно event sourcing или stream processing
  • Данные должны храниться долгосрочно
  • Team имеет expertise в distributed systems

RabbitMQ — это правильный выбор когда:

  • Приоритет на simplicity и operational ease
  • Нужна гибкая маршрутизация сообщений
  • Low latency критична
  • Traditional messaging patterns достаточно

Hybrid approach рассмотрите когда:

  • Разные parts системы имеют разные requirements
  • Migration от одной технологии к другой
  • Need best of both worlds

Главное правило

Не выбирайте технологию based on hype. Анализируйте your specific requirements:

  1. Объем данных и growth projections
  2. Latency requirements для вашего use case
  3. Team expertise и operational capabilities
  4. Cost constraints и budget limitations
  5. Integration requirements с существующими системами

Remember: правильная простая технология лучше неправильной сложной. Многие проблемы можно решить с RabbitMQ без необходимости в Kafka complexity.

Start simple, scale when needed — это мудрый подход к выбору message broker'а.

Kafka Streams

Основные концепции

Kafka Streams - это библиотека для создания приложений обработки потоковых данных в реальном времени. Работает поверх Apache Kafka и позволяет создавать микросервисы для обработки данных.

Ключевые особенности:

  • Stateless и Stateful операции - можно обрабатывать данные без сохранения состояния или с ним
  • Exactly-once семантика - гарантирует, что каждое сообщение обрабатывается ровно один раз
  • Fault tolerance - автоматическое восстановление после сбоев
  • Horizontal scaling - можно масштабировать добавлением новых инстансов

Архитектура и компоненты

Stream Processing Topology

Topology - это граф вычислений, где узлы представляют операции обработки, а рёбра - потоки данных между ними.

// Создание топологии
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");

KStream vs KTable vs GlobalKTable

KStream - поток записей (каждое сообщение - это событие):

KStream<String, String> stream = builder.stream("events-topic");
stream.filter((key, value) -> value.contains("error"))
      .to("error-topic");

KTable - таблица изменений (changelog stream). Представляет текущее состояние:

KTable<String, Long> table = builder.table("users-topic");
// Каждая запись перезаписывает предыдущую с тем же ключом

GlobalKTable - реплицируется на всех инстансах приложения. Используется для reference data:

GlobalKTable<String, String> globalTable = builder.globalTable("reference-data");

Операции обработки

Stateless операции (без состояния)

Filter - фильтрация записей:

stream.filter((key, value) -> value.length() > 5);

Map - преобразование записей:

stream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value.toLowerCase()));

FlatMap - один элемент может породить несколько:

stream.flatMapValues(value -> Arrays.asList(value.split(" ")));

Branch - разделение потока по условиям:

KStream<String, String>[] branches = stream.branch(
    (key, value) -> value.startsWith("A"),
    (key, value) -> value.startsWith("B"),
    (key, value) -> true  // остальные
);

Stateful операции (с состоянием)

Aggregation - агрегирование данных:

KTable<String, Long> counts = stream
    .groupByKey()
    .aggregate(
        () -> 0L,  // initializer
        (key, value, aggregate) -> aggregate + 1,  // aggregator
        Materialized.as("counts-store")  // state store
    );

Join операции - соединение потоков:

// Stream-Stream Join (windowed)
KStream<String, String> joined = leftStream.join(
    rightStream,
    (leftValue, rightValue) -> leftValue + ":" + rightValue,
    JoinWindows.of(Duration.ofMinutes(5))
);

// Stream-Table Join
KStream<String, String> enriched = stream.join(
    table,
    (streamValue, tableValue) -> streamValue + ":" + tableValue
);

Windowing (Оконные операции)

Windows - механизм группировки записей по времени для агрегации.

Типы окон

Tumbling Windows - неперекрывающиеся окна фиксированного размера:

stream.groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
      .count();

Hopping Windows - перекрывающиеся окна:

// Окно 10 минут, сдвиг каждые 2 минуты
stream.groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofMinutes(10))
                              .advanceBy(Duration.ofMinutes(2)))
      .count();

Session Windows - окна на основе периодов активности:

stream.groupByKey()
      .windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
      .count();

Время в Kafka Streams

Event Time - время, когда событие произошло (берётся из timestamp сообщения) Processing Time - время обработки сообщения приложением

// Извлечение времени из payload
stream.transformValues(() -> new ValueTransformer<String, String>() {
    @Override
    public String transform(String value) {
        // Логика извлечения времени
        return value;
    }
});

State Stores (Хранилища состояния)

State Store - локальное хранилище для сохранения состояния между обработкой сообщений.

Типы State Stores

Key-Value Store - простое хранилище ключ-значение:

StoreBuilder<KeyValueStore<String, Long>> storeBuilder = 
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"),
        Serdes.String(),
        Serdes.Long()
    );

Window Store - для хранения оконных данных:

StoreBuilder<WindowStore<String, Long>> windowStoreBuilder = 
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore("window-store", 
                                   Duration.ofDays(1), 
                                   Duration.ofMinutes(5), 
                                   false),
        Serdes.String(),
        Serdes.Long()
    );

Session Store - для session windows:

StoreBuilder<SessionStore<String, Long>> sessionStoreBuilder = 
    Stores.sessionStoreBuilder(
        Stores.persistentSessionStore("session-store", Duration.ofDays(1)),
        Serdes.String(),
        Serdes.Long()
    );

Конфигурация и запуск

Основные настройки

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Exactly-once processing
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
          StreamsConfig.EXACTLY_ONCE_V2);

// Commit interval
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);

// State directory
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

Запуск приложения

StreamsBuilder builder = new StreamsBuilder();
// Построение топологии...

KafkaStreams streams = new KafkaStreams(builder.build(), props);

// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

streams.start();

Обработка ошибок

Exception Handlers

Deserialization Handler - обработка ошибок десериализации:

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);

Production Exception Handler - обработка ошибок записи:

props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          DefaultProductionExceptionHandler.class);

Uncaught Exception Handler - глобальная обработка:

streams.setUncaughtExceptionHandler((thread, exception) -> {
    logger.error("Uncaught exception in thread " + thread.getName(), exception);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});

Сериализация/Десериализация

Serdes (Serializer/Deserializer)

Встроенные Serdes:

Serdes.String()    // для String
Serdes.Long()      // для Long
Serdes.Integer()   // для Integer
Serdes.Bytes()     // для byte[]

Custom Serde для JSON:

public class JsonSerde<T> implements Serde<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> clazz;
    
    public JsonSerde(Class<T> clazz) {
        this.clazz = clazz;
    }
    
    @Override
    public Serializer<T> serializer() {
        return (topic, data) -> {
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing", e);
            }
        };
    }
    
    @Override
    public Deserializer<T> deserializer() {
        return (topic, data) -> {
            try {
                return objectMapper.readValue(data, clazz);
            } catch (Exception e) {
                throw new SerializationException("Error deserializing", e);
            }
        };
    }
}

Мониторинг и метрики

Ключевые метрики

Application-level:

  • commit-latency-avg - среднее время коммита
  • poll-latency-avg - среднее время poll операций
  • process-latency-avg - среднее время обработки

Thread-level:

  • process-ratio - отношение времени обработки к общему времени
  • commit-ratio - отношение времени коммита к общему времени

Task-level:

  • dropped-records-total - количество отброшенных записей
  • process-latency-max - максимальное время обработки

Получение метрик

streams.metrics().forEach((metricName, metric) -> {
    System.out.println(metricName.name() + ": " + metric.metricValue());
});

Практические паттерны

Event Sourcing

// Восстановление состояния из событий
KTable<String, Account> accounts = builder
    .stream("account-events")
    .groupByKey()
    .aggregate(
        Account::new,
        (key, event, account) -> account.apply(event),
        Materialized.as("accounts-store")
    );

CQRS (Command Query Responsibility Segregation)

// Разделение команд и запросов
KStream<String, Command> commands = builder.stream("commands");
KStream<String, Event> events = commands.mapValues(Command::execute);
events.to("events");

// Создание read-модели
KTable<String, ReadModel> readModel = builder
    .stream("events")
    .groupByKey()
    .aggregate(
        ReadModel::new,
        (key, event, model) -> model.update(event),
        Materialized.as("read-model-store")
    );

Важные концепции для собеседования

Rebalancing

Rebalancing - перераспределение партиций между инстансами приложения при добавлении/удалении инстансов.

Strategies:

  • RangeAssignor - назначает непрерывные диапазоны партиций
  • RoundRobinAssignor - круговое назначение партиций
  • StickyAssignor - минимизирует перемещения партиций

Fault Tolerance

Standby Replicas - резервные копии state stores:

props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

Changelog Topics - Kafka Streams автоматически создаёт changelog топики для восстановления состояния после сбоев.

Exactly-Once Semantics

Transactional Processing - использует Kafka транзакции для гарантии exactly-once:

props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
          StreamsConfig.EXACTLY_ONCE_V2);

Idempotent Producer - предотвращает дубликаты при retry:

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

Оптимизация производительности

Batch Processing

// Увеличение batch size
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 1024);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);

Parallel Processing

// Увеличение количества stream threads
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

Memory Management

// Настройка RocksDB для state stores
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, 
          CustomRocksDBConfigSetter.class);

Эта шпаргалка покрывает основные концепции Kafka Streams, необходимые для собеседования на позицию Senior Java Backend разработчика. Важно понимать не только API, но и архитектурные принципы, паттерны обработки данных и способы оптимизации производительности.

Kafka: Стратегии доставки сообщений

At-Least-Once (Минимум один раз)

Суть: Сообщение гарантированно доставляется, но может быть продублировано.

Принцип работы: Producer отправляет сообщение и ждет подтверждения (acknowledgment) от брокера. Если подтверждение не приходит в течение таймаута, сообщение отправляется повторно.

Конфигурация Producer:

props.put("acks", "all"); // Ждать подтверждения от всех реплик
props.put("retries", Integer.MAX_VALUE); // Максимальное количество повторов
props.put("enable.idempotence", false); // Отключаем идемпотентность
props.put("max.in.flight.requests.per.connection", 5);

Конфигурация Consumer:

props.put("enable.auto.commit", false); // Ручной коммит офсетов
// Коммитим офсет только после успешной обработки
consumer.commitSync();

Когда использовать: Когда потеря данных критична, а дубликаты можно обработать на уровне приложения.

Проблемы: Дубликаты сообщений при сетевых проблемах или таймаутах.


At-Most-Once (Максимум один раз)

Суть: Сообщение либо доставляется один раз, либо теряется. Дубликатов нет.

Принцип работы: Producer отправляет сообщение и не ждет подтверждения, либо не повторяет отправку при ошибках. Consumer коммитит офсет до обработки сообщения.

Конфигурация Producer:

props.put("acks", "0"); // Не ждать подтверждения
props.put("retries", 0); // Не повторять отправку
props.put("batch.size", 16384); // Отправка батчами для производительности

Конфигурация Consumer:

props.put("enable.auto.commit", true); // Автоматический коммит
props.put("auto.commit.interval.ms", 1000); // Интервал коммита
// Коммит происходит до обработки сообщения

Когда использовать: Когда важна производительность, а потеря отдельных сообщений допустима (метрики, логи).

Проблемы: Возможная потеря данных при сбоях.


Exactly-Once (Ровно один раз)

Суть: Каждое сообщение доставляется и обрабатывается ровно один раз.

Принцип работы: Комбинация идемпотентности Producer и транзакций. Используется уникальный Producer ID и sequence numbers для дедупликации.

Идемпотентный Producer

props.put("enable.idempotence", true); // Включаем идемпотентность
props.put("acks", "all"); // Обязательно для идемпотентности
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5); // ≤5 для идемпотентности

Транзакционный Producer

props.put("transactional.id", "my-transaction-id"); // Уникальный ID транзакции

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Инициализация транзакций

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic", "key", "value"));
    producer.commitTransaction(); // Атомарный коммит
} catch (Exception e) {
    producer.abortTransaction(); // Откат транзакции
}

Consumer с изоляцией транзакций

props.put("isolation.level", "read_committed"); // Читать только закоммиченные сообщения
props.put("enable.auto.commit", false);

// Обработка в рамках транзакции
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            // Обработка сообщения
            processMessage(record);
            // Отправка результата в другой топик
            producer.send(new ProducerRecord<>("output-topic", record.value()));
        }
        
        // Коммит офсетов в рамках транзакции
        producer.sendOffsetsToTransaction(getOffsets(records), consumer.groupMetadata());
        producer.commitTransaction();
        
    } catch (Exception e) {
        producer.abortTransaction();
    }
}

Когда использовать: Критически важные системы, где недопустимы ни потери, ни дубликаты (финансовые операции, заказы).

Ограничения:

  • Работает только внутри одного кластера Kafka
  • Снижение производительности из-за overhead транзакций
  • Требует координации между Producer и Consumer

Ключевые параметры конфигурации

Producer

  • acks: "0" (at-most-once), "1" (лидер), "all" (все реплики)
  • retries: Количество попыток повторной отправки
  • enable.idempotence: Включение идемпотентности
  • transactional.id: ID для транзакций
  • max.in.flight.requests.per.connection: Количество неподтвержденных запросов

Consumer

  • enable.auto.commit: Автоматический/ручной коммит офсетов
  • isolation.level: "read_uncommitted" / "read_committed"
  • auto.commit.interval.ms: Интервал автокоммита

Брокер

  • min.insync.replicas: Минимальное количество синхронных реплик
  • unclean.leader.election.enable: Разрешить лидерство несинхронным репликам

Практические рекомендации

At-Least-Once: Реализуйте идемпотентность на уровне приложения (уникальные ключи, дедупликация по ID).

At-Most-Once: Используйте для некритичных данных с высокими требованиями к производительности.

Exactly-Once: Планируйте архитектуру с учетом ограничений и накладных расходов. Рассмотрите альтернативы на уровне приложения.

Мониторинг: Отслеживайте метрики producer-throttle-time, consumer-lag, failed-fetch-requests для выявления проблем с доставкой.

Kafka: Обеспечение идемпотентности

Что такое идемпотентность

Идемпотентность — свойство операции, при котором многократное выполнение дает тот же результат, что и однократное. В контексте Kafka это означает, что повторная отправка одного и того же сообщения не создаст дубликатов.

Проблема: При сетевых сбоях Producer может не получить acknowledgment от брокера и повторно отправить сообщение, создавая дубликаты.


Идемпотентный Producer (встроенный механизм)

Как работает: Kafka присваивает каждому Producer уникальный ID и каждому сообщению sequence number. Брокер отслеживает эти номера и отбрасывает дубликаты.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", true); // Включаем идемпотентность
props.put("acks", "all"); // Обязательно для идемпотентности
props.put("retries", Integer.MAX_VALUE); // Автоматические повторы
props.put("max.in.flight.requests.per.connection", 5); // ≤5 для порядка сообщений

Внутренний механизм:

  • Producer ID (PID): Уникальный идентификатор Producer, выдается брокером при инициализации
  • Sequence Number: Монотонно возрастающий номер для каждого сообщения в рамках топика-партиции
  • Дедупликация: Брокер ведет таблицу последних sequence numbers для каждого PID

Ограничения:

  • Работает только в рамках одной сессии Producer (при перезапуске PID меняется)
  • Гарантии только в пределах одного топика-партиции
  • Небольшое снижение производительности из-за overhead

Транзакционная идемпотентность

Расширение идемпотентного Producer: Использует стабильный Transaction ID вместо временного PID, что обеспечивает идемпотентность между перезапусками приложения.

props.put("enable.idempotence", true);
props.put("transactional.id", "unique-app-id-123"); // Стабильный ID

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Регистрация транзакционного ID

producer.beginTransaction();
try {
    producer.send(new ProducerRecord<>("topic", "key", "value"));
    producer.commitTransaction(); // Атомарное подтверждение
} catch (Exception e) {
    producer.abortTransaction(); // Откат при ошибке
}

Epoch механизм: При каждом перезапуске Producer с тем же transaction ID получает новый epoch. Брокер отклоняет сообщения от Producer с устаревшим epoch, предотвращая zombie producers.


Идемпотентность на уровне приложения

Уникальные ключи сообщений

Самый простой и надежный способ — использовать уникальные идентификаторы сообщений.

// Генерация уникального ключа
String messageId = UUID.randomUUID().toString();
ProducerRecord<String, String> record = new ProducerRecord<>("topic", messageId, payload);

// Consumer: проверка дубликатов
Set<String> processedIds = new HashSet<>(); // В реальности — база данных

if (!processedIds.contains(record.key())) {
    processMessage(record.value());
    processedIds.add(record.key());
}

Версионирование записей

Использование номеров версий для предотвращения конфликтов при обновлении данных.

// Структура сообщения с версией
public class VersionedMessage {
    private String id;
    private long version;
    private String data;
    private long timestamp;
}

// Обработка с проверкой версии
if (incomingMessage.getVersion() > currentVersion) {
    updateRecord(incomingMessage);
} else {
    // Игнорируем устаревшее сообщение
    log.info("Ignoring outdated message version");
}

Таблица дедупликации

Ведение отдельной таблицы для отслеживания обработанных сообщений.

@Entity
public class ProcessedMessage {
    @Id
    private String messageId;
    private LocalDateTime processedAt;
    private String topic;
    private int partition;
    private long offset;
}

// Проверка при обработке
@Transactional
public void processMessage(ConsumerRecord<String, String> record) {
    String messageId = extractMessageId(record.value());
    
    if (processedMessageRepository.existsById(messageId)) {
        log.info("Message {} already processed, skipping", messageId);
        return;
    }
    
    // Атомарная обработка + сохранение факта обработки
    businessLogic.process(record.value());
    processedMessageRepository.save(new ProcessedMessage(messageId, LocalDateTime.now()));
}

Идемпотентность Consumer

Ручное управление офсетами

Коммит офсета только после успешной обработки предотвращает повторную обработку при сбоях.

props.put("enable.auto.commit", false); // Отключаем автокоммит

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            processMessageIdempotent(record); // Идемпотентная обработка
            consumer.commitSync(); // Коммит после успешной обработки
        } catch (Exception e) {
            log.error("Processing failed for offset {}", record.offset());
            // Не коммитим офсет, сообщение будет обработано повторно
        }
    }
}

Exactly-Once Semantics (EOS)

Комбинация транзакционного Producer и Consumer для сквозной идемпотентности.

// Consumer с изоляцией транзакций
consumerProps.put("isolation.level", "read_committed");

// Producer-Consumer в одной транзакции
producer.beginTransaction();
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        String result = processMessage(record.value());
        producer.send(new ProducerRecord<>("output-topic", result));
    }
    
    // Коммит офсетов в рамках транзакции
    Map<TopicPartition, OffsetAndMetadata> offsets = buildOffsets(records);
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

Практические стратегии

Комбинированный подход

Лучшая практика — сочетание встроенных механизмов Kafka с логикой приложения.

// 1. Включаем идемпотентность Producer
props.put("enable.idempotence", true);

// 2. Используем уникальные ключи
String businessKey = order.getId() + "-" + order.getVersion();
ProducerRecord<String, Order> record = new ProducerRecord<>("orders", businessKey, order);

// 3. Идемпотентная обработка в Consumer
@Transactional
public void handleOrder(Order order) {
    if (orderRepository.existsByIdAndVersion(order.getId(), order.getVersion())) {
        return; // Уже обработан
    }
    
    processOrder(order);
    markAsProcessed(order.getId(), order.getVersion());
}

Мониторинг идемпотентности

Метрики для отслеживания:

  • kafka.producer:type=producer-metrics,client-id=*,producer-duplicate-requests-total — количество дубликатов, отброшенных брокером
  • kafka.producer:type=producer-metrics,client-id=*,producer-outgoing-byte-rate — пропускная способность
  • Пользовательские метрики дедупликации на уровне приложения

Логирование:

// Логирование дубликатов для анализа
if (isDuplicate(messageId)) {
    duplicateCounter.increment();
    log.warn("Duplicate message detected: {} in topic: {}", messageId, topic);
}

Компромиссы и рекомендации

Производительность vs Надежность:

  • Идемпотентный Producer: ~5-10% снижение throughput
  • Транзакции: ~20-30% снижение производительности
  • Логика приложения: минимальное влияние при правильной реализации

Выбор стратегии:

  • Высокая нагрузка: Идемпотентность на уровне приложения + простой Producer
  • Критичные данные: Транзакционный Producer + EOS
  • Смешанная нагрузка: Идемпотентный Producer + дедупликация ключевых операций

На собеседовании важно понимать:

  • Разницу между механизмами Kafka и логикой приложения
  • Ограничения встроенной идемпотентности Kafka
  • Когда использовать транзакции, а когда достаточно простой дедупликации
  • Влияние на производительность и способы мониторинга

Kafka vs RabbitMQ: Реализация Dead Letter Queue

Что такое Dead Letter Queue (DLQ)

Dead Letter Queue — специальная очередь для сообщений, которые не удалось обработать после нескольких попыток. Это паттерн для обработки ошибок, позволяющий избежать бесконечных циклов повторной обработки и потери данных.

Основные сценарии попадания в DLQ:

  • Исключения при обработке сообщения
  • Превышение максимального количества попыток retry
  • Истечение TTL (Time To Live) сообщения
  • Валидационные ошибки данных

RabbitMQ: Встроенная поддержка DLQ

RabbitMQ имеет нативную поддержку Dead Letter Exchange (DLX) — более мощную концепцию, чем простая DLQ.

Dead Letter Exchange (DLX)

Dead Letter Exchange — специальный exchange, куда RabbitMQ автоматически перенаправляет "мертвые" сообщения.

// Настройка основной очереди с DLX
@Bean
public Queue mainQueue() {
    return QueueBuilder.durable("main.queue")
        .withArgument("x-dead-letter-exchange", "dlx.exchange") // DLX exchange
        .withArgument("x-dead-letter-routing-key", "failed") // routing key для DLQ
        .withArgument("x-message-ttl", 60000) // TTL сообщений (60 сек)
        .build();
}

@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange("dlx.exchange");
}

@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable("dead.letter.queue").build();
}

@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue())
        .to(deadLetterExchange())
        .with("failed");
}

Обработка с reject/nack

Consumer должен явно отклонить сообщение для отправки в DLX.

@RabbitListener(queues = "main.queue")
public void processMessage(String message, Channel channel, 
                          @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    try {
        // Бизнес-логика обработки
        businessLogic.process(message);
        channel.basicAck(deliveryTag, false); // Подтверждение успешной обработки
        
    } catch (BusinessException e) {
        // Отклоняем с requeue=false для отправки в DLX
        channel.basicNack(deliveryTag, false, false);
        log.error("Message rejected and sent to DLX: {}", message);
        
    } catch (RetryableException e) {
        // Отклоняем с requeue=true для повторной попытки
        channel.basicNack(deliveryTag, false, true);
    }
}

Retry с счетчиком попыток

Реализация ограниченного количества повторов через headers.

@RabbitListener(queues = "main.queue")
public void processWithRetry(String message, Channel channel,
                           @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                           @Header(value = "x-retry-count", required = false) Integer retryCount) {
    
    int currentRetry = (retryCount != null) ? retryCount : 0;
    
    try {
        businessLogic.process(message);
        channel.basicAck(deliveryTag, false);
        
    } catch (Exception e) {
        if (currentRetry >= MAX_RETRIES) {
            // Превышено количество попыток — в DLX
            channel.basicNack(deliveryTag, false, false);
            log.error("Max retries exceeded, sending to DLX: {}", message);
        } else {
            // Повторная отправка с увеличенным счетчиком
            republishWithDelay(message, currentRetry + 1);
            channel.basicAck(deliveryTag, false);
        }
    }
}

private void republishWithDelay(String message, int retryCount) {
    rabbitTemplate.convertAndSend("retry.exchange", "retry.key", message, msg -> {
        msg.getMessageProperties().setHeader("x-retry-count", retryCount);
        msg.getMessageProperties().setExpiration(String.valueOf(retryCount * 1000)); // Экспоненциальная задержка
        return msg;
    });
}

Kafka: Реализация DLQ паттерна

Kafka не имеет встроенной поддержки DLQ, поэтому требуется реализация на уровне приложения.

Простая реализация с отдельным топиком

Создание отдельного топика для "мертвых" сообщений.

@Component
public class KafkaMessageProcessor {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    private static final String DLQ_TOPIC = "orders.dlq";
    private static final int MAX_RETRIES = 3;
    
    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void processOrder(ConsumerRecord<String, String> record) {
        
        try {
            businessLogic.processOrder(record.value());
            
        } catch (Exception e) {
            handleFailedMessage(record, e);
        }
    }
    
    private void handleFailedMessage(ConsumerRecord<String, String> record, Exception error) {
        // Извлекаем счетчик попыток из headers
        Integer retryCount = extractRetryCount(record.headers());
        
        if (retryCount >= MAX_RETRIES) {
            // Отправляем в DLQ с метаданными об ошибке
            sendToDLQ(record, error, retryCount);
        } else {
            // Повторная отправка в тот же топик с увеличенным счетчиком
            retryMessage(record, retryCount + 1);
        }
    }
    
    private void sendToDLQ(ConsumerRecord<String, String> record, Exception error, int retryCount) {
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(DLQ_TOPIC, record.key(), record.value());
        
        // Добавляем метаданные об ошибке
        dlqRecord.headers().add("original.topic", record.topic().getBytes());
        dlqRecord.headers().add("original.partition", String.valueOf(record.partition()).getBytes());
        dlqRecord.headers().add("original.offset", String.valueOf(record.offset()).getBytes());
        dlqRecord.headers().add("error.message", error.getMessage().getBytes());
        dlqRecord.headers().add("retry.count", String.valueOf(retryCount).getBytes());
        dlqRecord.headers().add("failed.timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
        
        kafkaTemplate.send(dlqRecord);
        log.error("Message sent to DLQ after {} retries: {}", retryCount, record.value());
    }
}

Kafka Connect с Dead Letter Queue

Использование Kafka Connect для автоматической обработки ошибок.

{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:postgresql://localhost:5432/orders",
    
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "orders.dlq",
    "errors.deadletterqueue.topic.replication.factor": "3",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}

Kafka Streams с обработкой ошибок

Реализация DLQ в Kafka Streams через branch операцию.

@Component
public class OrderProcessingStream {
    
    @Bean
    public KStream<String, String> processOrders(StreamsBuilder builder) {
        
        KStream<String, String> ordersStream = builder.stream("orders");
        
        // Разделяем поток на успешные и неуспешные сообщения
        KStream<String, String>[] branches = ordersStream.branch(
            (key, value) -> {
                try {
                    validateOrder(value);
                    return true; // Валидные сообщения
                } catch (Exception e) {
                    return false; // Невалидные — в DLQ
                }
            }
        );
        
        // Обработка валидных сообщений
        branches[0]
            .mapValues(this::processValidOrder)
            .to("processed.orders");
            
        // Отправка невалидных в DLQ с метаданными
        branches[1]
            .map((key, value) -> {
                // Создаем запись с метаданными об ошибке
                DLQRecord dlqRecord = new DLQRecord(value, System.currentTimeMillis(), "Validation failed");
                return KeyValue.pair(key, dlqRecord.toJson());
            })
            .to("orders.dlq");
            
        return ordersStream;
    }
}

Сравнение подходов

RabbitMQ преимущества

  • Встроенная поддержка: Нативный DLX механизм без дополнительной разработки
  • Автоматическое перенаправление: Сообщения автоматически попадают в DLX при reject/nack
  • Гибкая маршрутизация: DLX может использовать любой тип exchange (direct, topic, fanout)
  • TTL поддержка: Автоматическое перемещение в DLX по истечении времени жизни

Kafka преимущества

  • Персистентность: DLQ сообщения хранятся как обычные топики с настраиваемым retention
  • Порядок сообщений: Сохраняется порядок в рамках партиции
  • Масштабируемость: Лучше подходит для высоконагруженных систем
  • Replay возможности: Можно повторно обработать сообщения из DLQ

Таблица сравнения

Аспект RabbitMQ Kafka
Встроенная поддержка ✅ Dead Letter Exchange ❌ Требует реализации
Простота настройки ✅ Конфигурация через аргументы очереди ❌ Разработка логики приложения
Автоматическое TTL ✅ x-message-ttl ❌ Ручная реализация
Retry механизмы ✅ Встроенные + плагины ❌ Ручная реализация
Персистентность DLQ ⚠️ Зависит от настроек durability ✅ Стандартная персистентность топиков
Производительность ⚠️ Ограничена архитектурой ✅ Высокая пропускная способность
Replay из DLQ ❌ Сложно реализовать ✅ Стандартные возможности Consumer

Практические рекомендации

Мониторинг DLQ

Обязательный мониторинг для production систем.

// RabbitMQ метрики
@Component
public class DLQMonitoring {
    
    @Autowired
    private RabbitAdmin rabbitAdmin;
    
    @Scheduled(fixedRate = 60000) // Каждую минуту
    public void monitorDLQ() {
        Properties queueInfo = rabbitAdmin.getQueueProperties("dead.letter.queue");
        Integer messageCount = (Integer) queueInfo.get("QUEUE_MESSAGE_COUNT");
        
        if (messageCount > THRESHOLD) {
            alertingService.sendAlert("DLQ overflow: " + messageCount + " messages");
        }
    }
}

// Kafka метрики через JMX или custom metrics
@Component
public class KafkaDLQMonitoring {
    
    @EventListener
    public void handleDLQMessage(DLQMessageEvent event) {
        Metrics.counter("dlq.messages.total", 
            "topic", event.getOriginalTopic(),
            "error.type", event.getErrorType()
        ).increment();
    }
}

Стратегии обработки DLQ

Периодическая повторная обработка: Scheduled job для retry сообщений из DLQ после исправления проблем.

Manual intervention: Dashboard для ручного просмотра и повторной отправки критичных сообщений.

Dead letter analysis: Анализ типов ошибок для улучшения обработки и предотвращения будущих проблем.

Выбор решения

  • RabbitMQ: Когда нужна простота настройки и встроенные механизмы обработки ошибок
  • Kafka: Когда важна производительность, персистентность и возможность replay сообщений

На собеседовании важно знать:

  • Различия в реализации DLQ между брокерами
  • Стратегии retry и escalation
  • Мониторинг и алертинг по DLQ
  • Выбор подходящего решения в зависимости от требований

Apache ActiveMQ Artemis

Обзор и архитектура

Apache ActiveMQ Artemis — высокопроизводительный message broker нового поколения, созданный для замены классического ActiveMQ. Основан на проекте HornetQ и написан с нуля для обеспечения максимальной производительности и надежности.

Ключевые особенности

  • Высокая производительность: Обработка сотен тысяч сообщений в секунду
  • Persistent storage: Журналирование с использованием собственного формата файлов
  • Clustering: Встроенная поддержка кластеризации для высокой доступности
  • Protocol support: JMS 2.0, AMQP, STOMP, MQTT, OpenWire
  • Zero copy: Минимизация копирования данных в памяти

Архитектура компонентов

Broker — основной сервер, который принимает, маршрутизирует и доставляет сообщения.

Journal — файловая система для персистентного хранения сообщений. Использует append-only файлы для максимальной производительности записи.

Paging — механизм выгрузки сообщений на диск при превышении лимитов памяти.

Address Model — новая модель адресации, объединяющая queue и topic в единую концепцию.

Установка и конфигурация

Docker запуск

docker run -it --rm \
  -p 8161:8161 \
  -p 61616:61616 \
  apache/activemq-artemis:latest

Базовая конфигурация broker.xml

<configuration>
   <core>
      <name>artemis-broker</name>
      
      <!-- Настройки журнала -->
      <journal-type>ASYNCIO</journal-type>
      <journal-file-size>10M</journal-file-size>
      <journal-min-files>2</journal-min-files>
      
      <!-- Лимиты памяти -->
      <global-max-size>1GB</global-max-size>
      <max-disk-usage>90</max-disk-usage>
      
      <!-- Сетевые настройки -->
      <acceptors>
         <acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
      </acceptors>
      
      <!-- Адреса и очереди -->
      <addresses>
         <address name="orders">
            <anycast>
               <queue name="order-processing"/>
            </anycast>
         </address>
      </addresses>
   </core>
</configuration>

Модель адресации

Address vs Queue vs Topic

Address — логическое имя места назначения сообщений. Может содержать несколько очередей.

Anycast — point-to-point семантика (эквивалент JMS Queue). Сообщение доставляется только одному consumer'у.

Multicast — publish-subscribe семантика (эквивалент JMS Topic). Сообщение доставляется всем подписчикам.

Конфигурация адресов

<addresses>
   <!-- Point-to-point -->
   <address name="orders.processing">
      <anycast>
         <queue name="high-priority"/>
         <queue name="low-priority"/>
      </anycast>
   </address>
   
   <!-- Pub-sub -->
   <address name="notifications">
      <multicast/>
   </address>
   
   <!-- Смешанный режим -->
   <address name="mixed.address">
      <anycast>
         <queue name="queue1"/>
      </anycast>
      <multicast/>
   </address>
</addresses>

Программирование с JMS 2.0

Настройка Connection Factory

@Configuration
public class ArtemisConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(
            "tcp://localhost:61616"
        );
    }
    
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory factory) {
        JmsTemplate template = new JmsTemplate(factory);
        template.setDeliveryPersistent(true);
        template.setTimeToLive(300000); // 5 минут
        return template;
    }
}

Producer для отправки сообщений

@Service
public class OrderMessageProducer {
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    // Отправка в очередь (anycast)
    public void sendOrderToQueue(Order order) {
        jmsTemplate.convertAndSend("order-processing", order, message -> {
            message.setJMSPriority(9); // Высокий приоритет
            message.setStringProperty("orderType", order.getType());
            return message;
        });
    }
    
    // Отправка в топик (multicast) 
    public void publishOrderEvent(OrderEvent event) {
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend("order.events", event);
    }
}

Consumer для обработки сообщений

@Component
public class OrderMessageConsumer {
    
    // Listener для очереди
    @JmsListener(destination = "order-processing")
    public void processOrder(Order order, Message message) {
        try {
            String orderType = message.getStringProperty("orderType");
            processOrderByType(order, orderType);
            
        } catch (Exception e) {
            // Сообщение автоматически пойдет в DLQ
            throw new RuntimeException("Processing failed", e);
        }
    }
    
    // Listener с селектором
    @JmsListener(destination = "order-processing", 
                 selector = "orderType = 'URGENT'")
    public void processUrgentOrder(Order order) {
        // Обработка только срочных заказов
    }
}

Управление транзакциями

Локальные транзакции JMS

@Service
@Transactional
public class TransactionalOrderService {
    
    @JmsListener(destination = "orders")
    @Transactional(rollbackFor = Exception.class)
    public void processOrderTransactionally(Order order) {
        // 1. Сохранение в БД
        orderRepository.save(order);
        
        // 2. Отправка уведомления
        jmsTemplate.convertAndSend("notifications", 
            new OrderProcessedEvent(order.getId()));
        
        // При исключении - откат всей транзакции
        if (order.getAmount() < 0) {
            throw new IllegalArgumentException("Invalid amount");
        }
    }
}

XA транзакции (двухфазный коммит)

@Configuration
public class XAConfig {
    
    @Bean
    public XAConnectionFactory xaConnectionFactory() {
        ActiveMQXAConnectionFactory factory = 
            new ActiveMQXAConnectionFactory("tcp://localhost:61616");
        return factory;
    }
    
    @Bean
    public JtaTransactionManager transactionManager() {
        return new JtaTransactionManager();
    }
}

Надежность и обработка ошибок

Dead Letter Queue (DLQ)

<address-settings>
   <address-setting match="orders.#">
      <!-- Максимум попыток доставки -->
      <max-delivery-attempts>3</max-delivery-attempts>
      
      <!-- Задержка между попытками -->
      <redelivery-delay>5000</redelivery-delay>
      <redelivery-delay-multiplier>2</redelivery-delay-multiplier>
      <max-redelivery-delay>30000</max-redelivery-delay>
      
      <!-- DLQ настройки -->
      <dead-letter-address>DLQ</dead-letter-address>
      <auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
   </address-setting>
</address-settings>

Expiry Queue для истекших сообщений

<address-setting match="orders.#">
   <expiry-address>ExpiryQueue</expiry-address>
   <auto-create-expiry-resources>true</auto-create-expiry-resources>
</address-setting>

Обработчик DLQ сообщений

@Component
public class DLQHandler {
    
    @JmsListener(destination = "DLQ")
    public void handleFailedMessages(Message message) {
        try {
            String originalDest = message.getStringProperty("_AMQ_ORIG_ADDRESS");
            int deliveryCount = message.getIntProperty("_AMQ_DELIVERY_COUNT");
            
            logger.error("Message failed {} times from {}", 
                deliveryCount, originalDest);
                
            // Логирование, алертинг, ручная обработка
            alertService.sendAlert("DLQ message received", message);
            
        } catch (JMSException e) {
            logger.error("Error processing DLQ message", e);
        }
    }
}

Кластеризация и высокая доступность

Cluster Configuration

<cluster-connections>
   <cluster-connection name="artemis-cluster">
      <connector-ref>artemis</connector-ref>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <message-load-balancing>STRICT</message-load-balancing>
      <max-hops>1</max-hops>
      <discovery-group-ref>dg-group1</discovery-group-ref>
   </cluster-connection>
</cluster-connections>

<discovery-groups>
   <discovery-group name="dg-group1">
      <jgroups-file>jgroups-ping.xml</jgroups-file>
      <jgroups-channel>artemis_broadcast</jgroups-channel>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>

High Availability (HA) пара

<!-- Master broker -->
<ha-policy>
   <replication>
      <master>
         <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

<!-- Slave broker -->
<ha-policy>
   <replication>
      <slave>
         <allow-failback>true</allow-failback>
      </slave>
   </replication>
</ha-policy>

Мониторинг и метрики

JMX мониторинг

@Component
public class ArtemisMonitor {
    
    @Autowired
    private MBeanServer mBeanServer;
    
    public QueueMetrics getQueueMetrics(String queueName) {
        try {
            ObjectName objectName = new ObjectName(
                "org.apache.activemq.artemis:broker=\"artemis\",component=addresses,address=\"" 
                + queueName + "\",subcomponent=queues,routing-type=\"anycast\",queue=\"" 
                + queueName + "\""
            );
            
            Long messageCount = (Long) mBeanServer.getAttribute(objectName, "MessageCount");
            Long consumerCount = (Long) mBeanServer.getAttribute(objectName, "ConsumerCount");
            
            return new QueueMetrics(messageCount, consumerCount);
            
        } catch (Exception e) {
            throw new RuntimeException("Failed to get metrics", e);
        }
    }
}

Настройка логирования

<!-- logging.properties -->
logger.org.apache.activemq.artemis.level=INFO
logger.org.apache.activemq.artemis.core.server.level=DEBUG

# Аудит сообщений
logger.org.apache.activemq.audit.level=INFO
logger.org.apache.activemq.audit.handlers=AUDIT_FILE

Производительность и оптимизация

Настройки производительности

<configuration>
   <core>
      <!-- Batch настройки -->
      <journal-buffer-size>490KiB</journal-buffer-size>
      <journal-max-io>4096</journal-max-io>
      <journal-sync-transactional>false</journal-sync-transactional>
      
      <!-- Memory optimization -->
      <page-size-bytes>10M</page-size-bytes>
      <page-max-cache-size>5</page-max-cache-size>
      
      <!-- Network tuning -->
      <netty-threads>8</netty-threads>
      <scheduled-thread-pool-max-size>5</scheduled-thread-pool-max-size>
   </core>
</configuration>

Address настройки для производительности

<address-settings>
   <address-setting match="high-throughput.#">
      <!-- Увеличение размера страниц -->
      <page-size-bytes>10M</page-size-bytes>
      <max-size-bytes>100M</max-size-bytes>
      
      <!-- Отключение персистентности для временных данных -->
      <default-non-destructive>false</default-non-destructive>
      
      <!-- Batch обработка -->
      <redistribution-delay>0</redistribution-delay>
   </address-setting>
</address-settings>

Безопасность

Базовая аутентификация

<security-settings>
   <security-setting match="#">
      <permission type="createNonDurableQueue" roles="user"/>
      <permission type="deleteNonDurableQueue" roles="user"/>
      <permission type="createDurableQueue" roles="admin"/>
      <permission type="deleteDurableQueue" roles="admin"/>
      <permission type="createAddress" roles="admin"/>
      <permission type="deleteAddress" roles="admin"/>
      <permission type="consume" roles="user"/>
      <permission type="browse" roles="user"/>
      <permission type="send" roles="user"/>
      <permission type="manage" roles="admin"/>
   </security-setting>
</security-settings>

LDAP интеграция

<security-manager>
   <ldap-security-setting>
      <initial-context-factory>com.sun.jndi.ldap.LdapCtxFactory</initial-context-factory>
      <connection-url>ldap://localhost:389</connection-url>
      <connection-username>uid=admin,ou=system</connection-username>
      <connection-password>secret</connection-password>
      <user-base>ou=people,dc=example,dc=com</user-base>
      <user-search-filter>uid={0}</user-search-filter>
      <role-base>ou=groups,dc=example,dc=com</role-base>
      <role-search-filter>member={0}</role-search-filter>
   </ldap-security-setting>
</security-manager>

Полезные команды CLI

Управление через Artemis CLI

# Создание брокера
artemis create mybroker --user admin --password admin

# Запуск/остановка
artemis run --home mybroker
artemis stop --home mybroker

# Управление очередями
artemis queue create --name orders --anycast --home mybroker
artemis queue stat --name orders --home mybroker

# Отправка тестового сообщения
artemis producer --message-count 100 --url tcp://localhost:61616 --destination orders

# Мониторинг
artemis browser --destination orders --home mybroker

Интеграция с Spring Boot

Spring Boot Starter

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
</dependency>

Application.yml настройки

spring:
  artemis:
    mode: native
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    pool:
      enabled: true
      max-connections: 10
    embedded:
      enabled: false
      
  jms:
    template:
      default-destination: orders
      delivery-mode: persistent
      time-to-live: 300000

Health Check и метрики

@Component
public class ArtemisHealthIndicator implements HealthIndicator {
    
    @Autowired
    private ConnectionFactory connectionFactory;
    
    @Override
    public Health health() {
        try (Connection connection = connectionFactory.createConnection()) {
            connection.start();
            return Health.up()
                .withDetail("broker", "artemis")
                .withDetail("status", "connected")
                .build();
        } catch (Exception e) {
            return Health.down()
                .withDetail("error", e.getMessage())
                .build();
        }
    }
}

Ключевые термины для собеседования

Journal — система персистентного хранения, использующая append-only файлы для высокой производительности записи.

Paging — механизм выгрузки сообщений на диск при превышении memory limits.

Address Model — новая модель адресации, где Address может содержать как anycast (queue), так и multicast (topic) семантику.

Message Groups — группировка сообщений для обеспечения порядка обработки в рамках группы.

Flow Control — механизм контроля потока для предотвращения переполнения producer'ов.

Clustering — распределение нагрузки между несколькими broker'ами с автоматическим failover.

Diverts — перенаправление копий сообщений в другие адреса для аудита или репликации.

Эта шпаргалка покрывает основные аспекты работы с Apache ActiveMQ Artemis, необходимые для Senior Java Backend разработчика на собеседовании и в повседневной работе.