Реактивное программирование

Основные концепции

Синхронный vs Асинхронный

Синхронный - вызывающий поток ждет завершения операции перед продолжением Асинхронный - операция выполняется в фоне, результат приходит через callback/Future/Promise

// Синхронный подход
String result = httpClient.get("/api/data"); // Блокирует поток
processResult(result);

// Асинхронный подход
CompletableFuture<String> future = httpClient.getAsync("/api/data");
future.thenAccept(this::processResult); // Поток не блокируется

Блокирующий vs Неблокирующий

Блокирующий - операция останавливает выполнение потока до получения результата Неблокирующий - операция сразу возвращает управление, результат обрабатывается позже

// Блокирующий I/O
InputStream stream = socket.getInputStream();
int data = stream.read(); // Поток заморожен до получения данных

// Неблокирующий I/O (NIO)
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_READ);
selector.select(); // Возвращает готовые каналы

Push vs Pull модели

Pull модель - потребитель запрашивает данные когда готов их обработать Push модель - производитель отправляет данные потребителю когда они готовы

// Pull модель (Iterator)
Iterator<String> iterator = collection.iterator();
while (iterator.hasNext()) {
    String item = iterator.next(); // Явно запрашиваем следующий элемент
}

// Push модель (Observer/Reactive Streams)
publisher.subscribe(new Subscriber<String>() {
    public void onNext(String item) {
        // Данные приходят когда готовы
    }
});

Backpressure (Противодавление)

Что это: Механизм контроля потока данных когда производитель генерирует данные быстрее, чем потребитель может их обработать.

Зачем нужен: Предотвращает переполнение памяти, OutOfMemoryError, деградацию производительности.

Стратегии обработки:

  • Buffer - накапливаем данные в буфере
  • Drop - отбрасываем новые элементы
  • Latest - оставляем только последний элемент
  • Error - генерируем исключение
// Reactive Streams с backpressure
Flowable.range(1, 1000000)
    .onBackpressureBuffer(100) // Буфер на 100 элементов
    .observeOn(Schedulers.computation())
    .subscribe(item -> {
        Thread.sleep(100); // Медленная обработка
        System.out.println(item);
    });

Reactive Streams API

Основные интерфейсы

// Производитель данных
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

// Потребитель данных
public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

// Управление подпиской
public interface Subscription {
    void request(long n); // Запрос n элементов
    void cancel();        // Отмена подписки
}

Протокол взаимодействия

  1. Подписка: publisher.subscribe(subscriber)
  2. Подтверждение: subscriber.onSubscribe(subscription)
  3. Запрос данных: subscription.request(n)
  4. Получение данных: subscriber.onNext(item)
  5. Завершение: subscriber.onComplete() или subscriber.onError()

RxJava

Основные типы

Observable - Push-based, без backpressure, для UI событий Flowable - Push-based, с backpressure, для потоков данных Single - Один элемент или ошибка Maybe - Ноль или один элемент Completable - Только завершение или ошибка

// Observable - горячий поток
Observable<Long> timer = Observable.interval(1, TimeUnit.SECONDS);

// Flowable - холодный поток с backpressure
Flowable<String> data = Flowable.fromCallable(() -> 
    httpClient.get("/api/data")
);

// Single - одно значение
Single<User> user = Single.fromCallable(() -> 
    userService.findById(123)
);

Операторы трансформации

// map - преобразование каждого элемента
source.map(String::toUpperCase)

// flatMap - преобразование в поток + слияние
source.flatMap(id -> userService.findById(id))

// filter - фильтрация элементов
source.filter(item -> item.length() > 5)

// take - взять первые N элементов
source.take(10)

// skip - пропустить первые N элементов
source.skip(5)

Операторы комбинирования

// zip - комбинирование элементов по индексу
Observable.zip(source1, source2, (a, b) -> a + b)

// merge - объединение потоков в один
Observable.merge(source1, source2)

// concat - последовательное объединение
Observable.concat(source1, source2)

Project Reactor

Основные типы

Mono - 0 или 1 элемент Flux - 0 до N элементов

// Mono - асинхронный результат
Mono<String> result = webClient
    .get()
    .uri("/api/data")
    .retrieve()
    .bodyToMono(String.class);

// Flux - поток данных
Flux<User> users = userRepository
    .findAll()
    .filter(user -> user.isActive());

Обработка ошибок

// onErrorReturn - возврат значения по умолчанию
mono.onErrorReturn("default value")

// onErrorResume - переключение на другой поток
mono.onErrorResume(error -> Mono.just("fallback"))

// retry - повторная попытка
mono.retry(3)

// timeout - таймаут операции
mono.timeout(Duration.ofSeconds(5))

Schedulers (Планировщики)

Зачем нужны: Управление потоками выполнения в реактивных приложениях

// Schedulers.io() - для блокирующих I/O операций
Mono.fromCallable(() -> databaseCall())
    .subscribeOn(Schedulers.boundedElastic())

// Schedulers.computation() - для CPU-intensive задач
Flux.range(1, 1000)
    .map(this::heavyComputation)
    .subscribeOn(Schedulers.parallel())

// Schedulers.single() - один поток для всех задач
observable.observeOn(Schedulers.single())

Практические применения

Микросервисы

// Неблокирующие HTTP вызовы
@RestController
public class UserController {
    
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userService.findById(id)
            .timeout(Duration.ofSeconds(5))
            .onErrorReturn(User.empty());
    }
}

Обработка данных

// Потоковая обработка больших объемов данных
Flux.fromIterable(largeDataSet)
    .buffer(100)                    // Группировка по 100 элементов
    .flatMap(batch -> 
        processBatch(batch)
            .subscribeOn(Schedulers.boundedElastic())
    )
    .subscribe();

Event-driven архитектура

// Обработка событий из очереди
@EventListener
public void handleEvent(OrderCreatedEvent event) {
    return eventProcessor
        .process(event)
        .flatMap(this::sendNotification)
        .flatMap(this::updateInventory)
        .subscribe();
}

Ключевые принципы

"React to events over time"

Система реагирует на события по мере их поступления, а не ждет завершения всех операций

"Don't block – subscribe"

Вместо блокировки потоков используется подписка на события с callback-обработчиками

Композиция операций

Сложная логика строится из простых операторов через цепочки вызовов

Декларативность

Описывается ЧТО нужно сделать, а не КАК это сделать пошагово

Антипаттерны

// ❌ Блокировка в реактивном коде
mono.block(); // Убивает асинхронность

// ❌ Подписка внутри оператора
source.map(item -> {
    otherSource.subscribe(result -> {
        // Создает memory leak
    });
    return item;
});

// ✅ Правильное комбинирование
source.flatMap(item -> 
    otherSource.map(result -> combine(item, result))
);

Отладка и мониторинг

// Логирование в реактивных цепочках
flux.doOnNext(item -> log.info("Processing: {}", item))
    .doOnError(error -> log.error("Error occurred", error))
    .doOnComplete(() -> log.info("Stream completed"))
    .subscribe();

// Метрики производительности
flux.name("user-processing")
    .tag("service", "user-service")
    .metrics()
    .subscribe();

Reactive Streams спецификация

Что такое Reactive Streams

Reactive Streams - это стандарт для обработки асинхронных потоков данных с неблокирующим backpressure. Цель - обеспечить интероперабельность между различными реактивными библиотеками (RxJava, Project Reactor, Akka Streams).

Ключевая проблема: Как обрабатывать потоки данных, когда производитель генерирует данные быстрее, чем потребитель может их обработать, без блокировки потоков и переполнения памяти.

Основные интерфейсы

Publisher - Производитель данных

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

Назначение: Источник данных, который может производить потенциально неограниченное количество элементов.

Обязанности:

  • Принимает подписчиков через subscribe()
  • Может иметь множество подписчиков
  • Управляет жизненным циклом подписок

Subscriber - Потребитель данных

public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

Назначение: Получатель данных, который обрабатывает элементы по мере их поступления.

Методы:

  • onSubscribe() - получение объекта подписки для управления потоком
  • onNext() - обработка каждого элемента данных
  • onError() - обработка ошибок (терминальное событие)
  • onComplete() - сигнал о завершении потока (терминальное событие)

Subscription - Управление подпиской

public interface Subscription {
    void request(long n);
    void cancel();
}

Назначение: Связь между Publisher и Subscriber, позволяющая управлять потоком данных.

Методы:

  • request(n) - запрос следующих n элементов (реализация backpressure)
  • cancel() - отмена подписки и освобождение ресурсов

Processor<T,R> - Трансформатор данных

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    // Наследует методы от обоих интерфейсов
}

Назначение: Компонент, который одновременно является подписчиком и издателем. Получает данные типа T, трансформирует их и публикует как тип R.

Применение: Создание цепочек обработки данных, где один процессор может быть подключен к другому.

Контракт взаимодействия

Последовательность вызовов

// 1. Подписка
publisher.subscribe(subscriber);

// 2. Подтверждение подписки
subscriber.onSubscribe(subscription);

// 3. Запрос данных (backpressure)
subscription.request(10);

// 4. Получение данных
subscriber.onNext(item1);
subscriber.onNext(item2);
// ... до 10 элементов

// 5. Завершение (одно из двух)
subscriber.onComplete();  // ИЛИ
subscriber.onError(throwable);

Правила протокола

Правило 1: После onSubscribe() вызывается только один из терминальных методов (onComplete() или onError())

Правило 2: Количество вызовов onNext() не должно превышать запрошенное через request(n)

Правило 3: onNext(), onError(), onComplete() должны быть вызваны последовательно (не параллельно)

Правило 4: Если cancel() вызван, Publisher должен прекратить отправку данных

Управление Backpressure

Кто управляет

Subscriber управляет backpressure через вызов subscription.request(n). Это принцип pull-based backpressure.

class MySubscriber implements Subscriber<String> {
    private Subscription subscription;
    
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // Запрашиваем первый элемент
    }
    
    @Override
    public void onNext(String item) {
        processItem(item);
        subscription.request(1); // Запрашиваем следующий
    }
}

Зачем это важно

Предотвращение переполнения: Subscriber может контролировать скорость получения данных в соответствии со своей производительностью.

Управление ресурсами: Избегание OutOfMemoryError при обработке больших потоков данных.

Адаптивность: Возможность динамически изменять скорость обработки в зависимости от нагрузки.

Стратегии Backpressure

Unbounded Request

// Запрос всех доступных элементов
subscription.request(Long.MAX_VALUE);

Когда использовать: Когда уверены, что можем обработать все данные без переполнения памяти.

Bounded Request

// Запрос фиксированного количества
subscription.request(100);

Когда использовать: Для контроля размера буфера и предсказуемого потребления памяти.

Dynamic Request

class AdaptiveSubscriber implements Subscriber<String> {
    private int bufferSize = 10;
    private int processed = 0;
    
    @Override
    public void onNext(String item) {
        processItem(item);
        processed++;
        
        // Адаптивный запрос в зависимости от нагрузки
        if (processed >= bufferSize) {
            subscription.request(bufferSize);
            processed = 0;
        }
    }
}

Практические примеры

Простой Publisher

class NumberPublisher implements Publisher<Integer> {
    private final int count;
    
    public NumberPublisher(int count) {
        this.count = count;
    }
    
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new NumberSubscription(subscriber, count));
    }
}

class NumberSubscription implements Subscription {
    private final Subscriber<? super Integer> subscriber;
    private int current = 0;
    private final int max;
    private boolean cancelled = false;
    
    @Override
    public void request(long n) {
        if (cancelled) return;
        
        for (int i = 0; i < n && current < max; i++) {
            subscriber.onNext(current++);
        }
        
        if (current >= max) {
            subscriber.onComplete();
        }
    }
    
    @Override
    public void cancel() {
        cancelled = true;
    }
}

Обработка ошибок

class ErrorHandlingSubscriber implements Subscriber<String> {
    @Override
    public void onError(Throwable throwable) {
        // Логирование ошибки
        logger.error("Stream error occurred", throwable);
        
        // Очистка ресурсов
        cleanup();
        
        // Уведомление других компонентов
        notifyErrorHandlers(throwable);
    }
    
    @Override
    public void onComplete() {
        // Финализация обработки
        logger.info("Stream completed successfully");
        cleanup();
    }
}

Интеграция с существующими библиотеками

Project Reactor

// Создание Flux из Publisher
Flux<String> flux = Flux.from(customPublisher);

// Создание Publisher из Flux
Publisher<String> publisher = flux.share();

RxJava

// Создание Flowable из Publisher
Flowable<String> flowable = Flowable.fromPublisher(customPublisher);

// Создание Publisher из Flowable
Publisher<String> publisher = flowable.toFlowable(BackpressureStrategy.BUFFER);

Важные особенности реализации

Thread Safety

Требование: Все методы должны быть thread-safe, но вызовы onNext(), onError(), onComplete() должны быть последовательными.

Обработка Cancel

class CancellableSubscription implements Subscription {
    private volatile boolean cancelled = false;
    
    @Override
    public void request(long n) {
        if (cancelled) {
            return; // Не отправляем данные после отмены
        }
        // ... логика отправки
    }
    
    @Override
    public void cancel() {
        cancelled = true;
        // Освобождение ресурсов
        cleanup();
    }
}

Обработка некорректных запросов

@Override
public void request(long n) {
    if (n <= 0) {
        subscriber.onError(new IllegalArgumentException(
            "Request must be positive, but was: " + n
        ));
        return;
    }
    // ... нормальная обработка
}

Преимущества спецификации

Стандартизация: Единый интерфейс для всех реактивных библиотек в JVM экосистеме.

Интероперабельность: Компоненты разных библиотек могут работать вместе.

Backpressure из коробки: Встроенная поддержка управления потоком данных.

Асинхронность: Неблокирующая обработка потоков данных.

Частые ошибки

Нарушение контракта

// ❌ Вызов onNext после onComplete
subscriber.onComplete();
subscriber.onNext(item); // Ошибка!

// ❌ Превышение запрошенного количества
subscription.request(5);
// Отправляем 6 элементов - нарушение контракта

Неправильная обработка ошибок

// ❌ Продолжение после ошибки
subscriber.onError(new RuntimeException());
subscriber.onNext(item); // Ошибка!

// ✅ Правильная обработка
try {
    processItem(item);
    subscriber.onNext(item);
} catch (Exception e) {
    subscriber.onError(e);
    return; // Прекращаем обработку
}

Блокировка в обработчиках

// ❌ Блокирующие операции в onNext
@Override
public void onNext(String item) {
    Thread.sleep(1000); // Блокирует поток!
    processItem(item);
}

// ✅ Асинхронная обработка
@Override
public void onNext(String item) {
    CompletableFuture.supplyAsync(() -> processItem(item));
}

Project Reactor (Spring)

Что такое Project Reactor

Project Reactor - это реактивная библиотека для JVM, созданная компанией Pivotal (теперь VMware). Основа для Spring WebFlux и других реактивных компонентов Spring Framework. Полностью совместима со спецификацией Reactive Streams.

Ключевые особенности:

  • Неблокирующая обработка данных
  • Встроенная поддержка backpressure
  • Богатый набор операторов
  • Интеграция с Spring экосистемой

Основные типы данных

Mono - 0 или 1 элемент

Назначение: Представляет асинхронный результат, который может содержать максимум один элемент или ошибку.

Аналогия: Как Optional<T> но для асинхронных операций, похож на CompletableFuture<T>.

// Создание Mono
Mono<String> mono = Mono.just("Hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("Error"));

// Из Callable (ленивое выполнение)
Mono<String> fromCallable = Mono.fromCallable(() -> {
    return expensiveOperation(); // Выполнится только при подписке
});

// Из CompletableFuture
Mono<String> fromFuture = Mono.fromFuture(
    CompletableFuture.supplyAsync(() -> "Result")
);

Когда использовать:

  • HTTP запросы (один ответ)
  • Поиск записи в базе данных
  • Валидация данных
  • Любые операции с единичным результатом

Flux - 0 до N элементов

Назначение: Представляет поток данных, который может содержать множество элементов, приходящих асинхронно.

Аналогия: Как Stream<T> но для асинхронных операций, похож на Observable из RxJava.

// Создание Flux
Flux<String> flux = Flux.just("A", "B", "C");
Flux<Integer> range = Flux.range(1, 100);
Flux<String> fromIterable = Flux.fromIterable(Arrays.asList("X", "Y", "Z"));

// Бесконечный поток
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));

// Из массива
Flux<String> fromArray = Flux.fromArray(new String[]{"1", "2", "3"});

// Генерация элементов
Flux<String> generated = Flux.generate(
    () -> 0,  // начальное состояние
    (state, sink) -> {
        sink.next("Item " + state);
        if (state == 10) sink.complete();
        return state + 1;
    }
);

Когда использовать:

  • Потоки данных из базы данных
  • Обработка файлов
  • Server-Sent Events
  • Обработка сообщений из очереди

Операторы трансформации

map - Преобразование каждого элемента

Назначение: Применяет функцию к каждому элементу потока, создавая новый поток с преобразованными значениями.

Flux<String> names = Flux.just("john", "jane", "bob");
Flux<String> upperCase = names.map(String::toUpperCase);
// Результат: "JOHN", "JANE", "BOB"

Mono<Integer> length = Mono.just("Hello").map(String::length);
// Результат: 5

flatMap - Асинхронное преобразование с объединением

Назначение: Преобразует каждый элемент в Publisher, затем объединяет все результирующие потоки в один. Не сохраняет порядок, выполняется параллельно.

Flux<String> ids = Flux.just("1", "2", "3");
Flux<User> users = ids.flatMap(id -> 
    userService.findById(id)  // Возвращает Mono<User>
);
// Результат: поток пользователей, порядок не гарантирован

// Ограничение concurrency
Flux<User> limited = ids.flatMap(
    id -> userService.findById(id),
    3  // Максимум 3 одновременных запроса
);

concatMap - Последовательное преобразование

Назначение: Как flatMap, но сохраняет порядок элементов, выполняя операции последовательно.

Flux<String> ids = Flux.just("1", "2", "3");
Flux<User> users = ids.concatMap(id -> 
    userService.findById(id)  // Выполняется по порядку
);
// Результат: пользователи в том же порядке, что и ID

switchMap - Переключение на новый поток

Назначение: При появлении нового элемента отменяет предыдущую операцию и переключается на новую.

Flux<String> searchQueries = Flux.just("a", "ab", "abc");
Flux<SearchResult> results = searchQueries.switchMap(query ->
    searchService.search(query)  // Отменяет предыдущий поиск
);
// Результат: только результаты последнего поиска

Операторы фильтрации и выборки

filter - Фильтрация элементов

Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);
// Результат: 2, 4, 6, 8, 10

take - Взять первые N элементов

Flux<String> first3 = Flux.just("A", "B", "C", "D", "E").take(3);
// Результат: "A", "B", "C"

// take с условием
Flux<Integer> untilCondition = Flux.range(1, 100)
    .takeWhile(n -> n < 5);
// Результат: 1, 2, 3, 4

skip - Пропустить первые N элементов

Flux<String> afterSkip = Flux.just("A", "B", "C", "D", "E").skip(2);
// Результат: "C", "D", "E"

Операторы комбинирования

zip - Комбинирование по парам

Назначение: Объединяет элементы из нескольких потоков в кортежи, ждет элементы от всех потоков.

Flux<String> names = Flux.just("John", "Jane");
Flux<Integer> ages = Flux.just(25, 30);
Flux<String> combined = Flux.zip(names, ages)
    .map(tuple -> tuple.getT1() + " is " + tuple.getT2());
// Результат: "John is 25", "Jane is 30"

// Zip с функцией комбинирования
Flux<String> result = Flux.zip(names, ages, (name, age) -> 
    name + " is " + age
);

combineLatest - Комбинирование последних значений

Назначение: Комбинирует последние значения из каждого потока при изменении любого из них.

Flux<String> source1 = Flux.just("A", "B").delayElements(Duration.ofSeconds(1));
Flux<String> source2 = Flux.just("1", "2").delayElements(Duration.ofSeconds(2));

Flux<String> combined = Flux.combineLatest(source1, source2, 
    (s1, s2) -> s1 + s2
);
// Результат: "A1", "B1", "B2"

Операторы жизненного цикла

doOnNext - Выполнение побочных действий

Назначение: Выполняет действие для каждого элемента, не изменяя поток (side-effect).

Flux<String> flux = Flux.just("A", "B", "C")
    .doOnNext(item -> logger.info("Processing: {}", item))
    .map(String::toLowerCase)
    .doOnNext(item -> logger.info("Transformed: {}", item));

doOnError - Обработка ошибок

Mono<String> mono = Mono.fromCallable(() -> riskyOperation())
    .doOnError(error -> logger.error("Operation failed", error))
    .doOnError(IllegalArgumentException.class, 
        error -> logger.warn("Invalid argument: {}", error.getMessage())
    );

Обработка ошибок

onErrorResume - Переключение на альтернативный поток

Назначение: При ошибке переключается на другой Publisher, позволяя продолжить обработку.

Mono<String> primary = Mono.fromCallable(() -> primaryService.getData())
    .onErrorResume(error -> {
        logger.warn("Primary service failed, using fallback");
        return fallbackService.getData();
    });

// Обработка конкретного типа ошибки
Mono<String> withSpecificError = primary
    .onErrorResume(TimeoutException.class, 
        error -> Mono.just("Timeout occurred")
    );

retry - Повторная попытка

Назначение: Повторяет операцию при ошибке заданное количество раз.

Mono<String> withRetry = Mono.fromCallable(() -> unreliableService.call())
    .retry(3);  // Повторить до 3 раз

// Retry с backoff
Mono<String> withBackoff = Mono.fromCallable(() -> unreliableService.call())
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));

timeout - Ограничение времени выполнения

Mono<String> withTimeout = webClient.get()
    .uri("/slow-endpoint")
    .retrieve()
    .bodyToMono(String.class)
    .timeout(Duration.ofSeconds(5))
    .onErrorResume(TimeoutException.class, 
        error -> Mono.just("Service unavailable")
    );

Сравнение flatMap vs concatMap

flatMap - Параллельное выполнение

Flux<String> urls = Flux.just("/api/1", "/api/2", "/api/3");

// flatMap - не гарантирует порядок, быстрее
Flux<String> responses = urls.flatMap(url -> 
    webClient.get().uri(url).retrieve().bodyToMono(String.class)
);

Характеристики:

  • Выполняется параллельно (по умолчанию до 256 одновременных операций)
  • Не сохраняет порядок элементов
  • Быстрее для независимых операций
  • Может использовать больше ресурсов

concatMap - Последовательное выполнение

Flux<String> urls = Flux.just("/api/1", "/api/2", "/api/3");

// concatMap - гарантирует порядок, медленнее
Flux<String> responses = urls.concatMap(url -> 
    webClient.get().uri(url).retrieve().bodyToMono(String.class)
);

Характеристики:

  • Выполняется последовательно
  • Сохраняет порядок элементов
  • Медленнее, но предсказуемее
  • Использует меньше ресурсов одновременно

Когда использовать каждый

flatMap:

  • Независимые операции
  • Скорость важнее порядка
  • Параллельные HTTP запросы
  • Обработка больших объемов данных

concatMap:

  • Порядок обработки важен
  • Операции зависят друг от друга
  • Ограниченные ресурсы
  • Последовательные операции с базой данных

Стратегии обработки ошибок

Fallback стратегии

// Значение по умолчанию
Mono<String> withDefault = risky()
    .onErrorReturn("default value");

// Альтернативный источник данных
Mono<String> withFallback = primary()
    .onErrorResume(error -> secondary())
    .onErrorResume(error -> tertiary());

// Conditional fallback
Mono<String> conditional = risky()
    .onErrorResume(TimeoutException.class, 
        error -> Mono.just("Timeout fallback")
    )
    .onErrorResume(IllegalArgumentException.class,
        error -> Mono.just("Invalid argument fallback")
    );

Error transformation

Mono<String> transformed = risky()
    .onErrorMap(SQLException.class, 
        sql -> new ServiceException("Database error", sql)
    )
    .onErrorMap(error -> new GenericException("Operation failed", error));

Составление цепочек операций

Простая цепочка

Mono<UserProfile> userProfile = Mono.just(userId)
    .flatMap(id -> userService.findById(id))          // Получаем пользователя
    .flatMap(user -> profileService.getProfile(user)) // Получаем профиль
    .map(profile -> profile.withTimestamp(Instant.now())) // Добавляем timestamp
    .doOnNext(profile -> logger.info("Profile loaded: {}", profile.getId()))
    .timeout(Duration.ofSeconds(10))
    .onErrorResume(error -> profileService.getDefaultProfile());

Сложная цепочка с комбинированием

Mono<OrderSummary> orderSummary = Mono.just(orderId)
    .flatMap(id -> orderService.findById(id))
    .flatMap(order -> {
        // Параллельно получаем данные
        Mono<Customer> customer = customerService.findById(order.getCustomerId());
        Mono<List<Item>> items = itemService.findByOrderId(order.getId());
        Mono<ShippingInfo> shipping = shippingService.getInfo(order.getId());
        
        // Комбинируем результаты
        return Mono.zip(customer, items, shipping)
            .map(tuple -> new OrderSummary(
                order, 
                tuple.getT1(), 
                tuple.getT2(), 
                tuple.getT3()
            ));
    })
    .doOnNext(summary -> logger.info("Order summary created"))
    .onErrorResume(error -> {
        logger.error("Failed to create order summary", error);
        return Mono.empty();
    });

Цепочки обработки ошибок

Многоуровневая обработка

Mono<Result> complexOperation = Mono.just(input)
    .flatMap(data -> step1(data))
    .onErrorResume(ValidationException.class, 
        error -> {
            logger.warn("Validation failed: {}", error.getMessage());
            return Mono.just(getDefaultData());
        }
    )
    .flatMap(data -> step2(data))
    .onErrorResume(NetworkException.class,
        error -> {
            logger.error("Network error, retrying...", error);
            return step2Retry(input);
        }
    )
    .flatMap(data -> step3(data))
    .onErrorMap(DatabaseException.class,
        db -> new ServiceException("Database operation failed", db)
    )
    .doOnError(error -> auditService.logError(error))
    .onErrorResume(error -> {
        logger.error("All recovery attempts failed", error);
        return Mono.just(getEmptyResult());
    });

Circuit Breaker паттерн

Mono<String> withCircuitBreaker = externalService.call()
    .transform(CircuitBreakerOperator.of(circuitBreaker))
    .onErrorResume(CallNotPermittedException.class,
        error -> Mono.just("Circuit breaker is open")
    )
    .onErrorResume(error -> {
        logger.warn("External service failed: {}", error.getMessage());
        return fallbackService.call();
    });

Практические рекомендации

Композиция операций

// ✅ Хорошо - четкая цепочка операций
Mono<ProcessedData> good = source
    .filter(this::isValid)
    .flatMap(this::enrich)
    .map(this::transform)
    .timeout(Duration.ofSeconds(30))
    .onErrorResume(this::handleError);

// ❌ Плохо - вложенные подписки
source.subscribe(data -> {
    enrichmentService.enrich(data).subscribe(enriched -> {
        // Создает memory leak и сложность
        processData(enriched);
    });
});

Обработка ресурсов

// Правильное управление ресурсами
Mono<String> withResources = Mono.using(
    () -> openConnection(),           // Создание ресурса
    connection -> processData(connection), // Использование
    connection -> connection.close()  // Очистка
);

Тестирование цепочек

@Test
public void testReactiveChain() {
    StepVerifier.create(
        userService.processUser(userId)
            .flatMap(user -> enrichmentService.enrich(user))
            .map(user -> user.withStatus(ACTIVE))
    )
    .expectNextMatches(user -> user.getStatus() == ACTIVE)
    .expectComplete()
    .verify();
}

Spring WebFlux

Что такое Spring WebFlux

Spring WebFlux - это реактивный веб-фреймворк, появившийся в Spring 5.0. Построен на Project Reactor и предназначен для создания неблокирующих веб-приложений с высокой пропускной способностью.

Ключевые особенности:

  • Неблокирующий I/O на базе Netty (по умолчанию)
  • Поддержка backpressure
  • Функциональный стиль программирования
  • Совместимость с Reactive Streams

Архитектура: Основан на event loop модели, где небольшое количество потоков обрабатывает множество запросов асинхронно.

Аннотационные контроллеры (@RestController)

Основные возвращаемые типы

Mono - для единичных результатов (один объект или ошибка) Flux - для потоков данных (множество объектов)

@RestController
@RequestMapping("/api/users")
public class UserController {
    
    // Mono для единичного результата
    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userService.findById(id)
            .switchIfEmpty(Mono.error(new UserNotFoundException(id)));
    }
    
    // Flux для списка
    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.findAll()
            .take(100); // Ограничение для безопасности
    }
    
    // Создание пользователя
    @PostMapping
    public Mono<User> createUser(@RequestBody Mono<CreateUserRequest> request) {
        return request
            .flatMap(userService::create)
            .map(user -> user.withTimestamp(Instant.now()));
    }
    
    // Обновление с обработкой ошибок
    @PutMapping("/{id}")
    public Mono<User> updateUser(@PathVariable String id, 
                                @RequestBody Mono<UpdateUserRequest> request) {
        return Mono.zip(Mono.just(id), request)
            .flatMap(tuple -> userService.update(tuple.getT1(), tuple.getT2()))
            .onErrorResume(UserNotFoundException.class, 
                error -> Mono.error(new ResponseStatusException(
                    HttpStatus.NOT_FOUND, "User not found"
                ))
            );
    }
}

Обработка параметров запроса

@RestController
public class SearchController {
    
    // Query параметры
    @GetMapping("/search")
    public Flux<Product> searchProducts(
            @RequestParam String query,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "10") int size) {
        
        return searchService.search(query, page, size)
            .timeout(Duration.ofSeconds(30))
            .onErrorResume(error -> Flux.empty());
    }
    
    // Path variables и headers
    @GetMapping("/categories/{category}/products")
    public Flux<Product> getProductsByCategory(
            @PathVariable String category,
            @RequestHeader(value = "X-User-Id", required = false) String userId) {
        
        return productService.findByCategory(category)
            .filter(product -> isAccessible(product, userId));
    }
}

Функциональный стиль (RouterFunction + HandlerFunction)

Основные концепции

RouterFunction - определяет маршрутизацию запросов, аналог @RequestMapping HandlerFunction - обрабатывает запрос, аналог методов контроллера ServerRequest - представляет HTTP запрос ServerResponse - представляет HTTP ответ

@Configuration
public class RouterConfig {
    
    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions
            .route(GET("/api/users/{id}"), handler::getUser)
            .andRoute(GET("/api/users"), handler::getAllUsers)
            .andRoute(POST("/api/users"), handler::createUser)
            .andRoute(PUT("/api/users/{id}"), handler::updateUser)
            .andRoute(DELETE("/api/users/{id}"), handler::deleteUser);
    }
    
    // Вложенные маршруты с общим префиксом
    @Bean
    public RouterFunction<ServerResponse> apiRoutes(
            UserHandler userHandler, 
            ProductHandler productHandler) {
        
        return RouterFunctions
            .nest(path("/api"),
                RouterFunctions
                    .nest(path("/users"), 
                        route(GET("/{id}"), userHandler::getUser)
                        .andRoute(GET("/"), userHandler::getAllUsers)
                    )
                    .andNest(path("/products"),
                        route(GET("/"), productHandler::getAllProducts)
                        .andRoute(GET("/{id}"), productHandler::getProduct)
                    )
            );
    }
}

Handler функции

@Component
public class UserHandler {
    
    private final UserService userService;
    
    // Получение пользователя
    public Mono<ServerResponse> getUser(ServerRequest request) {
        String id = request.pathVariable("id");
        
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }
    
    // Список пользователей с фильтрацией
    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        String status = request.queryParam("status").orElse("all");
        
        Flux<User> users = userService.findAll();
        if (!"all".equals(status)) {
            users = users.filter(user -> user.getStatus().equals(status));
        }
        
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(users, User.class);
    }
    
    // Создание пользователя
    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(CreateUserRequest.class)
            .flatMap(userService::create)
            .flatMap(user -> ServerResponse.status(HttpStatus.CREATED)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            .onErrorResume(ValidationException.class, 
                error -> ServerResponse.badRequest().bodyValue(error.getMessage())
            );
    }
    
    // Обработка multipart данных
    public Mono<ServerResponse> uploadAvatar(ServerRequest request) {
        String userId = request.pathVariable("userId");
        
        return request.multipartData()
            .map(parts -> parts.getFirst("avatar"))
            .cast(FilePart.class)
            .flatMap(filePart -> userService.updateAvatar(userId, filePart))
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            .onErrorResume(error -> 
                ServerResponse.badRequest().bodyValue("Upload failed")
            );
    }
}

Фильтры и middleware

@Configuration
public class WebConfig {
    
    @Bean
    public RouterFunction<ServerResponse> routesWithFilter() {
        return RouterFunctions
            .route(GET("/api/secure/**"), this::secureHandler)
            .filter(this::authenticationFilter)
            .filter(this::loggingFilter);
    }
    
    // Фильтр аутентификации
    private Mono<ServerResponse> authenticationFilter(
            ServerRequest request, 
            HandlerFunction<ServerResponse> next) {
        
        return request.headers()
            .firstHeader("Authorization")
            .map(this::validateToken)
            .map(isValid -> isValid ? next.handle(request) : 
                ServerResponse.status(HttpStatus.UNAUTHORIZED).build())
            .orElse(ServerResponse.status(HttpStatus.UNAUTHORIZED).build())
            .flatMap(response -> response);
    }
    
    // Фильтр логирования
    private Mono<ServerResponse> loggingFilter(
            ServerRequest request,
            HandlerFunction<ServerResponse> next) {
        
        long startTime = System.currentTimeMillis();
        
        return next.handle(request)
            .doOnNext(response -> {
                long duration = System.currentTimeMillis() - startTime;
                logger.info("Request {} {} completed in {}ms with status {}", 
                    request.method(), request.path(), duration, 
                    response.statusCode());
            });
    }
}

Server-Sent Events (SSE)

SSE - технология для отправки событий от сервера к клиенту в режиме реального времени через HTTP соединение.

@RestController
public class EventController {
    
    // Простой SSE поток
    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamEvents() {
        return Flux.interval(Duration.ofSeconds(1))
            .map(sequence -> "Event " + sequence)
            .take(Duration.ofMinutes(10)); // Ограничение по времени
    }
    
    // SSE с кастомными событиями
    @GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<NotificationData>> streamNotifications(
            @RequestParam String userId) {
        
        return notificationService.getUserNotifications(userId)
            .map(notification -> ServerSentEvent.<NotificationData>builder()
                .id(notification.getId())
                .event("notification")
                .data(notification)
                .retry(Duration.ofSeconds(3))
                .build())
            .doOnCancel(() -> logger.info("User {} disconnected", userId));
    }
    
    // Функциональный стиль SSE
    public Mono<ServerResponse> streamData(ServerRequest request) {
        String category = request.pathVariable("category");
        
        Flux<DataEvent> events = dataService.getEventStream(category)
            .map(data -> ServerSentEvent.builder(data)
                .event("data-update")
                .build());
        
        return ServerResponse.ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(BodyInserters.fromServerSentEvents(events));
    }
}

WebSocket поддержка

WebSocket - протокол для двустороннего общения между клиентом и сервером в режиме реального времени.

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new ChatWebSocketHandler(), "/chat")
                .setAllowedOrigins("*");
    }
}

@Component
public class ChatWebSocketHandler implements WebSocketHandler {
    
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String userId = getUserId(session);
        sessions.put(userId, session);
        
        // Обработка входящих сообщений
        Mono<Void> input = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .flatMap(message -> processMessage(userId, message))
            .then();
        
        // Отправка исходящих сообщений
        Flux<String> output = chatService.getMessagesForUser(userId);
        Mono<Void> outputMono = session.send(
            output.map(session::textMessage)
        );
        
        return Mono.zip(input, outputMono).then()
            .doFinally(signalType -> sessions.remove(userId));
    }
    
    private Mono<Void> processMessage(String userId, String message) {
        return chatService.processMessage(userId, message)
            .flatMap(this::broadcastMessage)
            .then();
    }
    
    private Mono<Void> broadcastMessage(ChatMessage message) {
        return Flux.fromIterable(sessions.values())
            .flatMap(session -> session.send(
                Mono.just(session.textMessage(message.toJson()))
            ))
            .then();
    }
}

Отличия от Spring MVC

Thread Model (Модель потоков)

Spring MVC:

  • Модель: One-thread-per-request (один поток на запрос)
  • Потоки: Servlet container управляет пулом потоков (обычно 200-400 потоков)
  • Блокировка: Поток блокируется на I/O операциях
  • Масштабирование: Вертикальное (больше потоков = больше памяти)
// Spring MVC - блокирующий подход
@GetMapping("/users/{id}")
public User getUser(@PathVariable String id) {
    return userService.findById(id); // Поток блокируется здесь
}

Spring WebFlux:

  • Модель: Event loop (несколько потоков обрабатывают множество запросов)
  • Потоки: Небольшое количество потоков (обычно количество CPU ядер)
  • Блокировка: Неблокирующий I/O, операции асинхронные
  • Масштабирование: Горизонтальное (больше запросов на тех же ресурсах)
// Spring WebFlux - неблокирующий подход
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
    return userService.findById(id); // Поток не блокируется
}

Производительность

Пропускная способность:

  • MVC: Хорошо для CPU-intensive задач, ограничен количеством потоков
  • WebFlux: Отлично для I/O-intensive задач, может обрабатывать десятки тысяч одновременных запросов

Потребление памяти:

  • MVC: ~1MB на поток, ограничение по памяти при большом количестве потоков
  • WebFlux: Минимальное потребление памяти, не зависит от количества запросов

Latency (задержка):

  • MVC: Стабильная задержка, предсказуемая производительность
  • WebFlux: Может быть выше задержка для простых операций из-за overhead реактивности

Когда использовать каждый

Spring MVC подходит для:

  • Традиционных CRUD приложений
  • Блокирующих операций с базой данных
  • CPU-intensive задач
  • Команд с опытом работы только с императивным программированием

Spring WebFlux подходит для:

  • Микросервисов с множеством внешних вызовов
  • Потоковой обработки данных
  • Real-time приложений (чаты, уведомления)
  • Высоконагруженных приложений с большим количеством I/O

WebClient - альтернатива RestTemplate

WebClient - неблокирующий, реактивный HTTP клиент, заменяющий RestTemplate в реактивных приложениях.

Создание и настройка

@Configuration
public class WebClientConfig {
    
    @Bean
    public WebClient webClient() {
        return WebClient.builder()
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0")
            .codecs(configurer -> configurer
                .defaultCodecs()
                .maxInMemorySize(1024 * 1024)) // 1MB buffer
            .clientConnector(new ReactorClientHttpConnector(
                HttpClient.create()
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                    .responseTimeout(Duration.ofSeconds(30))
                    .keepAlive(true)
            ))
            .build();
    }
    
    // WebClient с аутентификацией
    @Bean
    public WebClient authenticatedWebClient() {
        return WebClient.builder()
            .baseUrl("https://secure-api.example.com")
            .filter(ExchangeFilterFunction.ofRequestProcessor(request -> {
                return Mono.just(ClientRequest.from(request)
                    .header("Authorization", "Bearer " + getAccessToken())
                    .build());
            }))
            .build();
    }
}

Основные операции

@Service
public class ApiService {
    
    private final WebClient webClient;
    
    // GET запрос
    public Mono<User> getUser(String id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class)
            .timeout(Duration.ofSeconds(10))
            .onErrorResume(WebClientResponseException.class, error -> {
                if (error.getStatusCode() == HttpStatus.NOT_FOUND) {
                    return Mono.empty();
                }
                return Mono.error(new ServiceException("API call failed", error));
            });
    }
    
    // GET с query параметрами
    public Flux<Product> searchProducts(String query, int page, int size) {
        return webClient.get()
            .uri(uriBuilder -> uriBuilder
                .path("/products/search")
                .queryParam("q", query)
                .queryParam("page", page)
                .queryParam("size", size)
                .build())
            .retrieve()
            .bodyToFlux(Product.class);
    }
    
    // POST запрос
    public Mono<User> createUser(CreateUserRequest request) {
        return webClient.post()
            .uri("/users")
            .bodyValue(request)
            .retrieve()
            .bodyToMono(User.class)
            .onErrorMap(WebClientResponseException.BadRequest.class,
                error -> new ValidationException("Invalid user data", error));
    }
    
    // PUT с обработкой статусов
    public Mono<User> updateUser(String id, UpdateUserRequest request) {
        return webClient.put()
            .uri("/users/{id}", id)
            .bodyValue(request)
            .exchangeToMono(response -> {
                if (response.statusCode().is2xxSuccessful()) {
                    return response.bodyToMono(User.class);
                } else if (response.statusCode() == HttpStatus.NOT_FOUND) {
                    return Mono.error(new UserNotFoundException(id));
                } else {
                    return response.createException()
                        .flatMap(Mono::error);
                }
            });
    }
}

Обработка ошибок и retry

@Service
public class ResilientApiService {
    
    public Mono<Data> fetchDataWithRetry(String id) {
        return webClient.get()
            .uri("/data/{id}", id)
            .retrieve()
            .bodyToMono(Data.class)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .filter(throwable -> throwable instanceof WebClientRequestException))
            .timeout(Duration.ofSeconds(30))
            .onErrorResume(TimeoutException.class, 
                error -> Mono.just(getDefaultData(id)))
            .onErrorResume(WebClientResponseException.class, error -> {
                logger.error("API call failed for id {}: {}", id, error.getMessage());
                return Mono.empty();
            });
    }
    
    // Параллельные запросы
    public Mono<CombinedData> fetchCombinedData(String userId) {
        Mono<User> user = webClient.get()
            .uri("/users/{id}", userId)
            .retrieve()
            .bodyToMono(User.class);
            
        Mono<Profile> profile = webClient.get()
            .uri("/profiles/{id}", userId)
            .retrieve()
            .bodyToMono(Profile.class);
            
        Mono<List<Order>> orders = webClient.get()
            .uri("/users/{id}/orders", userId)
            .retrieve()
            .bodyToFlux(Order.class)
            .collectList();
        
        return Mono.zip(user, profile, orders)
            .map(tuple -> new CombinedData(
                tuple.getT1(), 
                tuple.getT2(), 
                tuple.getT3()
            ));
    }
}

Streaming ответов

// Обработка больших ответов потоком
public Flux<DataChunk> streamLargeData() {
    return webClient.get()
        .uri("/large-dataset")
        .accept(MediaType.APPLICATION_NDJSON) // Newline-delimited JSON
        .retrieve()
        .bodyToFlux(DataChunk.class)
        .buffer(100) // Группировка по 100 элементов
        .flatMap(this::processBatch);
}

// Загрузка файлов
public Mono<UploadResult> uploadFile(String filename, Resource file) {
    MultiValueMap<String, Object> parts = new LinkedMultiValueMap<>();
    parts.add("file", file);
    parts.add("filename", filename);
    
    return webClient.post()
        .uri("/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .bodyValue(parts)
        .retrieve()
        .bodyToMono(UploadResult.class);
}

Практические рекомендации

Обработка ошибок

// Глобальный обработчик ошибок
@ControllerAdvice
public class GlobalErrorHandler {
    
    @ExceptionHandler(ValidationException.class)
    public Mono<ResponseEntity<ErrorResponse>> handleValidation(ValidationException ex) {
        return Mono.just(ResponseEntity.badRequest()
            .body(new ErrorResponse("VALIDATION_ERROR", ex.getMessage())));
    }
    
    @ExceptionHandler(UserNotFoundException.class)
    public Mono<ResponseEntity<ErrorResponse>> handleNotFound(UserNotFoundException ex) {
        return Mono.just(ResponseEntity.notFound().build());
    }
}

Мониторинг и метрики

@RestController
public class MonitoredController {
    
    @GetMapping("/api/users/{id}")
    @Timed(name = "user.fetch", description = "Time taken to fetch user")
    public Mono<User> getUser(@PathVariable String id) {
        return userService.findById(id)
            .name("user-fetch")
            .tag("operation", "find-by-id")
            .metrics(); // Добавляет метрики Reactor
    }
}

Лучшие практики

// ✅ Правильно - неблокирующая цепочка
@GetMapping("/users/{id}/enriched")
public Mono<EnrichedUser> getEnrichedUser(@PathVariable String id) {
    return userService.findById(id)
        .flatMap(user -> 
            profileService.getProfile(user.getId())
                .map(profile -> user.withProfile(profile))
        )
        .timeout(Duration.ofSeconds(30));
}

// ❌ Неправильно - блокировка в реактивной цепочке
@GetMapping("/users/{id}/bad")
public Mono<User> getBadUser(@PathVariable String id) {
    return userService.findById(id)
        .map(user -> {
            // Блокирующий вызов убивает реактивность!
            Profile profile = profileService.getProfile(user.getId()).block();
            return user.withProfile(profile);
        });
}

R2DBC (Reactive DB Access)

Что такое R2DBC

R2DBC (Reactive Relational Database Connectivity) - это спецификация для реактивного доступа к реляционным базам данных в Java. Создана как альтернатива JDBC для неблокирующих приложений.

Ключевые принципы:

  • Полностью асинхронный API
  • Неблокирующий I/O
  • Backpressure support
  • Совместимость с Reactive Streams

Цель: Обеспечить реактивный доступ к базам данных без блокировки потоков, что критично для high-throughput приложений.

Отличия R2DBC от JDBC

JDBC - блокирующий подход

Модель работы: Thread-per-connection, каждое соединение блокирует поток до получения результата.

// JDBC - блокирующий код
@Repository
public class UserRepositoryJdbc {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    public User findById(Long id) {
        // Поток блокируется здесь до получения результата
        return jdbcTemplate.queryForObject(
            "SELECT * FROM users WHERE id = ?",
            new BeanPropertyRowMapper<>(User.class),
            id
        );
    }
    
    public List<User> findAll() {
        // Поток блокируется, загружает ВСЕ записи в память
        return jdbcTemplate.query(
            "SELECT * FROM users",
            new BeanPropertyRowMapper<>(User.class)
        );
    }
}

Проблемы JDBC:

  • Блокировка потоков на I/O операциях
  • Один поток = одно соединение
  • Загрузка всех результатов в память
  • Плохое масштабирование при высокой нагрузке

R2DBC - неблокирующий подход

Модель работы: Event-driven, асинхронная обработка с минимальным количеством потоков.

// R2DBC - неблокирующий код
@Repository
public class UserRepositoryR2dbc {
    
    @Autowired
    private R2dbcEntityTemplate template;
    
    public Mono<User> findById(Long id) {
        // Поток НЕ блокируется, возвращает Mono
        return template.select(User.class)
            .matching(Query.query(Criteria.where("id").is(id)))
            .one();
    }
    
    public Flux<User> findAll() {
        // Потоковая обработка, элементы приходят по мере готовности
        return template.select(User.class)
            .all()
            .take(1000); // Ограничение для безопасности
    }
}

Преимущества R2DBC:

  • Неблокирующий I/O
  • Потоковая обработка результатов
  • Эффективное использование ресурсов
  • Встроенная поддержка backpressure

Сравнение производительности

Характеристика JDBC R2DBC
Модель потоков Thread-per-connection Event loop
Блокировка Блокирующий I/O Неблокирующий I/O
Память Загружает все в память Потоковая обработка
Масштабируемость Ограничена потоками Высокая
Latency Предсказуемая Может быть выше для простых запросов
Throughput Средний Очень высокий

Настройка R2DBC

Зависимости и конфигурация

// application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: user
    password: password
    pool:
      enabled: true
      initial-size: 10
      max-size: 20
      max-idle-time: 30m
      max-acquire-time: 60s
      max-create-connection-time: 60s
@Configuration
@EnableR2dbcRepositories
public class R2dbcConfig extends AbstractR2dbcConfiguration {
    
    @Override
    public ConnectionFactory connectionFactory() {
        ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
            .option(DRIVER, "postgresql")
            .option(HOST, "localhost")
            .option(PORT, 5432)
            .option(USER, "user")
            .option(PASSWORD, "password")
            .option(DATABASE, "mydb")
            .build();
            
        return ConnectionFactories.get(options);
    }
    
    // Пул соединений для производительности
    @Bean
    public ConnectionFactory connectionFactory() {
        return ConnectionPoolConfiguration.builder(connectionFactory())
            .maxIdleTime(Duration.ofMinutes(30))
            .initialSize(10)
            .maxSize(20)
            .maxCreateConnectionTime(Duration.ofSeconds(60))
            .build();
    }
}

Работа с PostgreSQL

Базовые операции с R2dbcEntityTemplate

@Service
public class UserService {
    
    private final R2dbcEntityTemplate template;
    
    // Поиск по ID
    public Mono<User> findById(Long id) {
        return template.select(User.class)
            .matching(Query.query(Criteria.where("id").is(id)))
            .one()
            .switchIfEmpty(Mono.error(new UserNotFoundException(id)));
    }
    
    // Поиск с условиями
    public Flux<User> findActiveUsers() {
        return template.select(User.class)
            .matching(Query.query(
                Criteria.where("active").is(true)
                .and("created_at").greaterThan(LocalDateTime.now().minusYears(1))
            ))
            .all();
    }
    
    // Создание пользователя
    public Mono<User> create(User user) {
        user.setCreatedAt(LocalDateTime.now());
        return template.insert(User.class)
            .using(user)
            .map(savedUser -> savedUser.withGeneratedId());
    }
    
    // Обновление
    public Mono<User> update(User user) {
        return template.update(User.class)
            .matching(Query.query(Criteria.where("id").is(user.getId())))
            .apply(Update.update("name", user.getName())
                .set("email", user.getEmail())
                .set("updated_at", LocalDateTime.now()))
            .then(findById(user.getId()));
    }
    
    // Удаление
    public Mono<Void> deleteById(Long id) {
        return template.delete(User.class)
            .matching(Query.query(Criteria.where("id").is(id)))
            .all()
            .then();
    }
}

Сложные запросы с DatabaseClient

@Repository
public class CustomUserRepository {
    
    private final DatabaseClient databaseClient;
    
    // Сырой SQL запрос
    public Flux<User> findUsersByComplexCriteria(String namePattern, int minAge) {
        return databaseClient.sql("""
            SELECT u.*, p.age 
            FROM users u 
            JOIN profiles p ON u.id = p.user_id 
            WHERE u.name ILIKE :pattern 
            AND p.age >= :minAge 
            ORDER BY u.created_at DESC
            """)
            .bind("pattern", "%" + namePattern + "%")
            .bind("minAge", minAge)
            .map((row, metadata) -> User.builder()
                .id(row.get("id", Long.class))
                .name(row.get("name", String.class))
                .email(row.get("email", String.class))
                .age(row.get("age", Integer.class))
                .build())
            .all();
    }
    
    // Агрегатные запросы
    public Mono<UserStats> getUserStats() {
        return databaseClient.sql("""
            SELECT 
                COUNT(*) as total_users,
                COUNT(CASE WHEN active = true THEN 1 END) as active_users,
                AVG(age) as average_age,
                MAX(created_at) as last_registration
            FROM users u 
            LEFT JOIN profiles p ON u.id = p.user_id
            """)
            .map((row, metadata) -> UserStats.builder()
                .totalUsers(row.get("total_users", Long.class))
                .activeUsers(row.get("active_users", Long.class))
                .averageAge(row.get("average_age", Double.class))
                .lastRegistration(row.get("last_registration", LocalDateTime.class))
                .build())
            .one();
    }
    
    // Batch операции
    public Flux<User> createUsers(List<User> users) {
        return databaseClient.inConnectionMany(connection -> {
            Statement statement = connection.createStatement(
                "INSERT INTO users (name, email, created_at) VALUES ($1, $2, $3)"
            );
            
            return Flux.fromIterable(users)
                .flatMap(user -> {
                    statement.bind(0, user.getName())
                           .bind(1, user.getEmail())
                           .bind(2, LocalDateTime.now())
                           .add();
                    return statement.execute();
                })
                .flatMap(result -> result.map((row, metadata) -> 
                    User.builder()
                        .id(row.get("id", Long.class))
                        .name(row.get("name", String.class))
                        .email(row.get("email", String.class))
                        .build()));
        });
    }
}

Транзакции в R2DBC

@Service
@Transactional
public class TransactionalUserService {
    
    private final R2dbcEntityTemplate template;
    private final TransactionalOperator transactionalOperator;
    
    // Декларативные транзакции
    @Transactional
    public Mono<User> createUserWithProfile(CreateUserRequest request) {
        return template.insert(User.class)
            .using(User.from(request))
            .flatMap(user -> 
                template.insert(Profile.class)
                    .using(Profile.forUser(user))
                    .map(profile -> user.withProfile(profile))
            );
    }
    
    // Программные транзакции
    public Mono<TransferResult> transferFunds(Long fromUserId, Long toUserId, BigDecimal amount) {
        return transactionalOperator.transactional(
            findById(fromUserId)
                .flatMap(fromUser -> {
                    if (fromUser.getBalance().compareTo(amount) < 0) {
                        return Mono.error(new InsufficientFundsException());
                    }
                    return updateBalance(fromUserId, fromUser.getBalance().subtract(amount));
                })
                .flatMap(fromUser -> updateBalance(toUserId, amount))
                .map(toUser -> new TransferResult(fromUserId, toUserId, amount))
        );
    }
    
    // Rollback при ошибке
    @Transactional(rollbackFor = Exception.class)
    public Mono<OrderResult> processOrder(CreateOrderRequest request) {
        return createOrder(request)
            .flatMap(order -> updateInventory(order.getItems()))
            .flatMap(inventory -> processPayment(request.getPayment()))
            .flatMap(payment -> sendConfirmation(request.getEmail()))
            .onErrorResume(PaymentException.class, error -> {
                logger.error("Payment failed, order will be rolled back", error);
                return Mono.error(error);
            });
    }
}

Работа с MongoDB (Reactive)

Настройка Reactive MongoDB

@Configuration
@EnableReactiveMongoRepositories
public class MongoConfig extends AbstractReactiveMongoConfiguration {
    
    @Override
    protected String getDatabaseName() {
        return "reactive-app";
    }
    
    @Override
    public MongoClient reactiveMongoClient() {
        return MongoClients.create(
            MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString(
                    "mongodb://localhost:27017/reactive-app"
                ))
                .applyToConnectionPoolSettings(builder -> 
                    builder.maxSize(20)
                           .minSize(5)
                           .maxConnectionIdleTime(30, TimeUnit.SECONDS))
                .build()
        );
    }
}

Reactive MongoDB операции

@Service
public class ProductService {
    
    private final ReactiveMongoTemplate mongoTemplate;
    
    // Поиск документов
    public Flux<Product> findByCategory(String category) {
        Query query = Query.query(Criteria.where("category").is(category));
        return mongoTemplate.find(query, Product.class)
            .sort(Sort.by(Sort.Direction.DESC, "createdAt"));
    }
    
    // Сложные запросы с агрегацией
    public Flux<CategoryStats> getCategoryStats() {
        Aggregation aggregation = Aggregation.newAggregation(
            Aggregation.group("category")
                .count().as("productCount")
                .avg("price").as("averagePrice")
                .max("price").as("maxPrice"),
            Aggregation.sort(Sort.Direction.DESC, "productCount")
        );
        
        return mongoTemplate.aggregate(
            aggregation, "products", CategoryStats.class
        );
    }
    
    // Текстовый поиск
    public Flux<Product> searchProducts(String searchTerm) {
        TextCriteria criteria = TextCriteria.forDefaultLanguage()
            .matchingAny(searchTerm);
        Query query = TextQuery.queryText(criteria)
            .sortByScore();
            
        return mongoTemplate.find(query, Product.class);
    }
    
    // Геопространственные запросы
    public Flux<Store> findNearbyStores(double longitude, double latitude, double maxDistance) {
        Point location = new Point(longitude, latitude);
        Distance distance = new Distance(maxDistance, Metrics.KILOMETERS);
        
        Query query = Query.query(
            Criteria.where("location").nearSphere(location).maxDistance(distance.getValue())
        );
        
        return mongoTemplate.find(query, Store.class);
    }
}

Потоковая обработка результатов

Интеграция с Flux для больших объемов данных

@Service
public class DataProcessingService {
    
    private final R2dbcEntityTemplate template;
    
    // Потоковая обработка миллионов записей
    public Flux<ProcessedData> processLargeDataset() {
        return template.select(RawData.class)
            .all()
            .buffer(1000)  // Группируем по 1000 записей
            .flatMap(batch -> processBatch(batch))
            .onBackpressureBuffer(10000)  // Буфер для backpressure
            .doOnNext(data -> logger.debug("Processed: {}", data.getId()));
    }
    
    // Параллельная обработка с ограничением concurrency
    public Flux<EnrichedUser> enrichUsers() {
        return template.select(User.class)
            .all()
            .flatMap(user -> enrichUser(user), 10)  // Максимум 10 параллельных операций
            .onErrorContinue((error, user) -> {
                logger.error("Failed to enrich user {}: {}", 
                    ((User) user).getId(), error.getMessage());
            });
    }
    
    // Обработка в реальном времени с window операциями
    public Flux<UserActivity> processUserActivities() {
        return template.select(ActivityEvent.class)
            .all()
            .window(Duration.ofMinutes(5))  // Окна по 5 минут
            .flatMap(window -> 
                window.groupBy(ActivityEvent::getUserId)
                      .flatMap(userEvents -> 
                          userEvents.collectList()
                                   .map(events -> aggregateUserActivity(events))
                      )
            );
    }
    
    // Экспорт данных в потоковом режиме
    public Flux<DataBuffer> exportToCsv() {
        return template.select(ExportData.class)
            .all()
            .map(this::convertToCsvLine)
            .startWith("id,name,email,created_at\n")  // CSV заголовок
            .map(line -> bufferFactory.wrap(line.getBytes()))
            .doOnComplete(() -> logger.info("Export completed"));
    }
}

Backpressure стратегии

@Service
public class BackpressureService {
    
    // Buffer стратегия - накапливаем элементы
    public Flux<ProcessedItem> bufferStrategy() {
        return dataSource.getAllItems()
            .onBackpressureBuffer(
                10000,  // Размер буфера
                item -> logger.warn("Dropping item: {}", item),  // При переполнении
                BufferOverflowStrategy.DROP_OLDEST
            )
            .flatMap(this::processItem);
    }
    
    // Drop стратегия - отбрасываем элементы
    public Flux<ProcessedItem> dropStrategy() {
        return dataSource.getAllItems()
            .onBackpressureDrop(item -> 
                logger.warn("Dropped item due to backpressure: {}", item)
            )
            .flatMap(this::processItem);
    }
    
    // Latest стратегия - только последний элемент
    public Flux<ProcessedItem> latestStrategy() {
        return dataSource.getAllItems()
            .onBackpressureLatest()
            .flatMap(this::processItem);
    }
}

Ошибки при смешении блокирующего и неблокирующего I/O

Типичные ошибки

// ❌ ОШИБКА 1: Блокировка в реактивной цепочке
@Service
public class BadService {
    
    public Mono<User> getBadUser(Long id) {
        return userRepository.findById(id)
            .map(user -> {
                // КАТАСТРОФА! Блокируем event loop поток
                Profile profile = profileService.getProfile(user.getId()).block();
                return user.withProfile(profile);
            });
    }
    
    // ❌ ОШИБКА 2: Использование JDBC в WebFlux приложении
    public Flux<User> getAllUsersBad() {
        // Блокирующий JDBC убивает всю реактивность
        List<User> users = jdbcTemplate.query(
            "SELECT * FROM users", 
            new BeanPropertyRowMapper<>(User.class)
        );
        return Flux.fromIterable(users);
    }
    
    // ❌ ОШИБКА 3: Синхронные операции в subscribe
    public void processBad() {
        userRepository.findAll()
            .subscribe(user -> {
                // Блокирующая операция в callback
                Thread.sleep(1000);
                processUser(user);
            });
    }
}

Правильные подходы

// ✅ ПРАВИЛЬНО: Полностью реактивная цепочка
@Service
public class GoodService {
    
    public Mono<User> getGoodUser(Long id) {
        return userRepository.findById(id)
            .flatMap(user -> 
                profileService.getProfile(user.getId())
                    .map(profile -> user.withProfile(profile))
            );
    }
    
    // ✅ ПРАВИЛЬНО: Использование R2DBC
    public Flux<User> getAllUsersGood() {
        return userRepository.findAll()
            .take(1000)  // Ограничение для безопасности
            .timeout(Duration.ofSeconds(30));
    }
    
    // ✅ ПРАВИЛЬНО: Асинхронная обработка
    public Mono<Void> processGood() {
        return userRepository.findAll()
            .flatMap(user -> 
                processUserAsync(user)
                    .subscribeOn(Schedulers.boundedElastic())
            )
            .then();
    }
}

Обнаружение блокирующих операций

// Включение детектора блокировок в application.yml
reactor:
  blockhound:
    enabled: true

// Программная настройка
@Configuration
public class ReactorConfig {
    
    @PostConstruct
    public void configureReactor() {
        // Включаем обнаружение блокировок
        BlockHound.install();
        
        // Исключения для известных блокирующих операций
        BlockHound.builder()
            .allowBlockingCallsInside("java.util.logging", "Logger")
            .allowBlockingCallsInside("org.slf4j", "Logger")
            .install();
    }
}

Миграция с JDBC на R2DBC

// Шаг 1: Dual stack подход
@Service
public class MigrationService {
    
    private final UserRepositoryJdbc jdbcRepository;
    private final UserRepositoryR2dbc r2dbcRepository;
    
    @Value("${app.use-r2dbc:false}")
    private boolean useR2dbc;
    
    public Mono<User> findById(Long id) {
        if (useR2dbc) {
            return r2dbcRepository.findById(id);
        } else {
            // Оборачиваем блокирующий вызов
            return Mono.fromCallable(() -> jdbcRepository.findById(id))
                .subscribeOn(Schedulers.boundedElastic());
        }
    }
}

// Шаг 2: Постепенная замена
@Service
public class HybridService {
    
    // Критичные операции - уже на R2DBC
    public Flux<User> getActiveUsers() {
        return r2dbcRepository.findByActive(true);
    }
    
    // Редкие операции - пока на JDBC
    public Mono<Report> generateReport() {
        return Mono.fromCallable(() -> jdbcRepository.generateComplexReport())
            .subscribeOn(Schedulers.boundedElastic())
            .timeout(Duration.ofMinutes(5));
    }
}

Мониторинг и отладка

Логирование и метрики

@Service
public class MonitoredService {
    
    public Flux<User> getUsers() {
        return userRepository.findAll()
            .name("user-fetch-all")
            .tag("operation", "find-all")
            .metrics()  // Добавляет метрики Micrometer
            .doOnSubscribe(sub -> logger.info("Starting user fetch"))
            .doOnNext(user -> logger.debug("Fetched user: {}", user.getId()))
            .doOnComplete(() -> logger.info("User fetch completed"))
            .doOnError(error -> logger.error("User fetch failed", error));
    }
    
    // Debugging сложных цепочек
    public Mono<EnrichedUser> debugChain(Long userId) {
        return userRepository.findById(userId)
            .checkpoint("after-user-fetch")
            .flatMap(user -> 
                profileService.getProfile(user.getId())
                    .checkpoint("after-profile-fetch")
                    .map(profile -> user.withProfile(profile))
            )
            .checkpoint("after-enrichment");
    }
}

Производительность

// Connection pooling мониторинг
@Component
public class ConnectionPoolMonitor {
    
    @EventListener
    public void handleConnectionEvent(ConnectionCreatedEvent event) {
        logger.info("New connection created: {}", event.getConnectionId());
    }
    
    @EventListener
    public void handleConnectionEvent(ConnectionReleasedEvent event) {
        logger.debug("Connection released: {}", event.getConnectionId());
    }
}

// Health checks
@Component
public class R2dbcHealthIndicator implements ReactiveHealthIndicator {
    
    private final DatabaseClient databaseClient;
    
    @Override
    public Mono<Health> health() {
        return databaseClient.sql("SELECT 1")
            .fetch()
            .one()
            .map(result -> Health.up().withDetail("database", "available").build())
            .onErrorResume(error -> 
                Mono.just(Health.down(error).withDetail("database", "unavailable").build())
            )
            .timeout(Duration.ofSeconds(10));
    }
}

Проектирование реактивных сервисов

Thread Model: Event Loop и Non-blocking I/O

Традиционная модель (Thread-per-request)

Принцип: Каждый запрос обрабатывается отдельным потоком из пула.

// Традиционный подход - блокирующий
@RestController
public class TraditionalController {
    
    @GetMapping("/users/{id}")
    public User getUser(@PathVariable String id) {
        // Поток блокируется здесь
        User user = userService.findById(id);        // ~50ms DB
        Profile profile = profileService.get(id);    // ~30ms HTTP
        Settings settings = settingsService.get(id); // ~20ms Cache
        
        return user.withProfile(profile).withSettings(settings);
        // Общее время: ~100ms, поток занят все это время
    }
}

Проблемы:

  • Один поток = один запрос
  • Поток простаивает во время I/O операций
  • Ограничение по количеству одновременных запросов (обычно 200-400)
  • Высокое потребление памяти (1MB на поток)

Event Loop модель (реактивная)

Принцип: Небольшое количество потоков обрабатывает множество запросов асинхронно.

// Реактивный подход - неблокирующий
@RestController
public class ReactiveController {
    
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        // Параллельное выполнение без блокировки потоков
        Mono<User> userMono = userService.findById(id);
        Mono<Profile> profileMono = profileService.get(id);
        Mono<Settings> settingsMono = settingsService.get(id);
        
        return Mono.zip(userMono, profileMono, settingsMono)
            .map(tuple -> tuple.getT1()
                .withProfile(tuple.getT2())
                .withSettings(tuple.getT3())
            );
        // Общее время: ~50ms (самая медленная операция)
        // Потоки не блокируются
    }
}

Преимущества:

  • Множество запросов на небольшом количестве потоков
  • Потоки не блокируются на I/O
  • Высокая пропускная способность
  • Эффективное использование ресурсов

Архитектура Event Loop

// Упрощенная схема работы
public class EventLoopExample {
    
    // Event Loop обрабатывает события
    public void eventLoopCycle() {
        while (running) {
            // 1. Проверяем готовые I/O операции
            List<ReadyOperation> readyOps = selector.select();
            
            // 2. Обрабатываем каждую готовую операцию
            for (ReadyOperation op : readyOps) {
                if (op.isReadReady()) {
                    processRead(op);
                } else if (op.isWriteReady()) {
                    processWrite(op);
                }
            }
            
            // 3. Выполняем задачи из очереди
            processTaskQueue();
        }
    }
    
    // Неблокирующая обработка запроса
    public void handleRequest(HttpRequest request) {
        // Не блокируем поток, а регистрируем callback
        databaseClient.findUser(request.getUserId())
            .whenComplete((user, error) -> {
                if (error != null) {
                    sendErrorResponse(request, error);
                } else {
                    sendSuccessResponse(request, user);
                }
            });
        // Поток сразу возвращается к обработке других событий
    }
}

Управление потоками: subscribeOn и publishOn

subscribeOn - Поток для подписки

Назначение: Определяет, на каком планировщике будет выполняться подписка и источник данных.

@Service
public class SubscribeOnService {
    
    // subscribeOn влияет на источник данных
    public Mono<Data> fetchData() {
        return Mono.fromCallable(() -> {
            // Этот код выполнится на boundedElastic
            logger.info("Fetching on thread: {}", Thread.currentThread().getName());
            return expensiveDatabaseCall();
        })
        .subscribeOn(Schedulers.boundedElastic()) // Источник на elastic потоке
        .map(data -> {
            // Это тоже выполнится на boundedElastic
            logger.info("Mapping on thread: {}", Thread.currentThread().getName());
            return transformData(data);
        });
    }
    
    // Только первый subscribeOn имеет эффект
    public Mono<String> multipleSubscribeOn() {
        return Mono.just("data")
            .subscribeOn(Schedulers.parallel())      // Этот применится
            .subscribeOn(Schedulers.boundedElastic()) // Этот игнорируется
            .map(String::toUpperCase);
    }
}

publishOn - Поток для downstream операций

Назначение: Переключает выполнение downstream операций на указанный планировщик.

@Service
public class PublishOnService {
    
    // publishOn влияет на последующие операции
    public Flux<ProcessedData> processData() {
        return dataSource.getData()
            .map(data -> {
                // Выполняется на потоке источника данных
                logger.info("Initial processing: {}", Thread.currentThread().getName());
                return data;
            })
            .publishOn(Schedulers.parallel()) // Переключаемся на parallel
            .map(data -> {
                // Выполняется на parallel потоке
                logger.info("CPU processing: {}", Thread.currentThread().getName());
                return cpuIntensiveProcessing(data);
            })
            .publishOn(Schedulers.boundedElastic()) // Переключаемся на elastic
            .flatMap(data -> {
                // Выполняется на boundedElastic потоке
                logger.info("I/O operation: {}", Thread.currentThread().getName());
                return saveToDatabase(data);
            });
    }
    
    // Множественные publishOn работают последовательно
    public Mono<String> multiplePublishOn() {
        return Mono.just("data")
            .publishOn(Schedulers.parallel())
            .map(s -> s + "-parallel")           // На parallel потоке
            .publishOn(Schedulers.boundedElastic())
            .map(s -> s + "-elastic");           // На elastic потоке
    }
}

Комбинирование subscribeOn и publishOn

@Service
public class CombinedSchedulingService {
    
    public Flux<Result> complexProcessing() {
        return Mono.fromCallable(() -> loadFromFile()) // I/O операция
            .subscribeOn(Schedulers.boundedElastic())   // Источник на elastic
            .flatMapMany(data -> Flux.fromIterable(data.getItems()))
            .publishOn(Schedulers.parallel())           // CPU обработка на parallel
            .map(this::cpuIntensiveTransform)
            .buffer(100)
            .publishOn(Schedulers.boundedElastic())     // Обратно на elastic для DB
            .flatMap(batch -> saveBatch(batch));
    }
    
    // Оптимизация для разных типов операций
    public Mono<EnrichedUser> enrichUser(String userId) {
        return userRepository.findById(userId)
            .subscribeOn(Schedulers.boundedElastic())   // DB операция
            .publishOn(Schedulers.parallel())           // CPU обработка
            .map(user -> calculateScore(user))
            .publishOn(Schedulers.boundedElastic())     // HTTP вызов
            .flatMap(user -> externalService.enrich(user));
    }
}

Типы планировщиков (Schedulers)

Schedulers.parallel() - CPU-intensive задачи

Назначение: Оптимизирован для вычислительных задач, использует все доступные CPU ядра.

@Service
public class ParallelSchedulerService {
    
    // Подходит для математических вычислений
    public Flux<CalculationResult> performCalculations() {
        return Flux.range(1, 1000000)
            .publishOn(Schedulers.parallel())
            .map(this::complexMathOperation)  // CPU-intensive
            .filter(result -> result.isValid())
            .groupBy(result -> result.getCategory())
            .flatMap(group -> group.collectList());
    }
    
    // Параллельная обработка данных
    public Flux<ProcessedImage> processImages(List<Image> images) {
        return Flux.fromIterable(images)
            .publishOn(Schedulers.parallel())
            .map(this::resizeImage)       // CPU-intensive
            .map(this::applyFilters)      // CPU-intensive
            .map(this::compress);         // CPU-intensive
    }
}

Характеристики:

  • Количество потоков = количество CPU ядер
  • Неблокирующие операции
  • Оптимален для вычислений

Schedulers.boundedElastic() - I/O операции

Назначение: Для блокирующих I/O операций, создает потоки по требованию с ограничением.

@Service
public class BoundedElasticService {
    
    // Подходит для файловых операций
    public Mono<String> readFile(String filename) {
        return Mono.fromCallable(() -> {
            // Блокирующая операция
            return Files.readString(Paths.get(filename));
        })
        .subscribeOn(Schedulers.boundedElastic());
    }
    
    // HTTP вызовы к внешним сервисам
    public Flux<ApiResponse> callExternalApis(List<String> urls) {
        return Flux.fromIterable(urls)
            .flatMap(url -> 
                Mono.fromCallable(() -> restTemplate.getForObject(url, ApiResponse.class))
                    .subscribeOn(Schedulers.boundedElastic())
            , 10); // Максимум 10 одновременных вызовов
    }
    
    // Работа с legacy блокирующими библиотеками
    public Mono<LegacyResult> callLegacyService(Request request) {
        return Mono.fromCallable(() -> {
            // Блокирующий legacy код
            return legacyService.process(request);
        })
        .subscribeOn(Schedulers.boundedElastic())
        .timeout(Duration.ofSeconds(30));
    }
}

Характеристики:

  • Создает потоки по требованию (до 10x CPU cores)
  • TTL потоков = 60 секунд
  • Подходит для блокирующих операций

Schedulers.single() - Последовательная обработка

Назначение: Один поток для всех операций, гарантирует последовательность.

@Service
public class SingleSchedulerService {
    
    // Обработка событий в строгом порядке
    public Flux<EventResult> processEvents() {
        return eventSource.getEvents()
            .publishOn(Schedulers.single())  // Один поток = строгий порядок
            .map(this::processEvent)
            .doOnNext(this::logEvent);
    }
    
    // Обновление shared state
    private int counter = 0;
    
    public Mono<Integer> incrementCounter() {
        return Mono.fromCallable(() -> ++counter)
            .subscribeOn(Schedulers.single()); // Thread-safe без синхронизации
    }
}

Schedulers.immediate() - Текущий поток

Назначение: Выполняет операции на текущем потоке.

@Service
public class ImmediateSchedulerService {
    
    // Легкие операции без переключения потоков
    public Mono<String> lightProcessing(String input) {
        return Mono.just(input)
            .publishOn(Schedulers.immediate())  // Остается на текущем потоке
            .map(String::toUpperCase)
            .map(s -> s + "-processed");
    }
}

Проблемы Thread Leaks

Что такое Thread Leak

Thread Leak - ситуация, когда потоки создаются, но не освобождаются, что приводит к исчерпанию ресурсов.

// ❌ ОПАСНО: Thread leak через неправильное использование
@Service
public class LeakyService {
    
    // Создание кастомного планировщика без cleanup
    public Flux<Data> badSchedulerUsage() {
        Scheduler customScheduler = Schedulers.newParallel("custom", 10);
        
        return dataSource.getData()
            .publishOn(customScheduler)  // Scheduler никогда не освобождается!
            .map(this::processData);
    }
    
    // Infinite streams без proper disposal
    public void badInfiniteStream() {
        Flux.interval(Duration.ofSeconds(1))
            .publishOn(Schedulers.boundedElastic())
            .subscribe(tick -> {
                // Поток будет работать вечно
                processPeriodicTask(tick);
            });
        // Подписка никогда не отменяется!
    }
    
    // Блокирующие операции на неподходящих планировщиках
    public Mono<String> badBlockingCall() {
        return Mono.fromCallable(() -> {
            Thread.sleep(10000); // Блокируем parallel поток!
            return "result";
        })
        .subscribeOn(Schedulers.parallel()); // Неправильный планировщик
    }
}

Правильное управление ресурсами

@Service
public class ProperResourceManagement {
    
    private final Scheduler customScheduler;
    
    public ProperResourceManagement() {
        // Создаем кастомный планировщик
        this.customScheduler = Schedulers.newBoundedElastic(
            10, 100, "custom-scheduler");
    }
    
    @PreDestroy
    public void cleanup() {
        // ОБЯЗАТЕЛЬНО освобождаем планировщик
        customScheduler.dispose();
    }
    
    // Правильное использование с try-with-resources аналогом
    public Flux<Data> properSchedulerUsage() {
        return dataSource.getData()
            .publishOn(customScheduler)
            .map(this::processData)
            .doFinally(signalType -> {
                // Cleanup действия при завершении
                logger.info("Stream completed with signal: {}", signalType);
            });
    }
    
    // Правильная работа с infinite streams
    public Disposable properInfiniteStream() {
        return Flux.interval(Duration.ofSeconds(1))
            .publishOn(Schedulers.boundedElastic())
            .subscribe(
                tick -> processPeriodicTask(tick),
                error -> logger.error("Stream error", error)
            );
        // Возвращаем Disposable для возможности отмены
    }
    
    // Управление жизненным циклом подписок
    private final CompositeDisposable disposables = new CompositeDisposable();
    
    public void startPeriodicTasks() {
        Disposable task1 = Flux.interval(Duration.ofMinutes(1))
            .subscribe(this::hourlyTask);
        
        Disposable task2 = Flux.interval(Duration.ofMinutes(5))
            .subscribe(this::fiveMinuteTask);
        
        disposables.addAll(task1, task2);
    }
    
    @PreDestroy
    public void stopAllTasks() {
        disposables.dispose(); // Отменяем все подписки
    }
}

Мониторинг потоков

@Component
public class ThreadMonitor {
    
    @Scheduled(fixedRate = 60000) // Каждую минуту
    public void monitorThreads() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        int threadCount = threadBean.getThreadCount();
        
        logger.info("Active threads: {}", threadCount);
        
        // Проверяем на утечки
        if (threadCount > 1000) {
            logger.warn("Potential thread leak detected! Thread count: {}", threadCount);
            
            // Дополнительная диагностика
            ThreadInfo[] threads = threadBean.getThreadInfo(threadBean.getAllThreadIds());
            Map<String, Long> threadGroups = Arrays.stream(threads)
                .filter(Objects::nonNull)
                .collect(Collectors.groupingBy(
                    thread -> thread.getThreadName().split("-")[0],
                    Collectors.counting()
                ));
            
            threadGroups.forEach((name, count) -> 
                logger.info("Thread group '{}': {} threads", name, count)
            );
        }
    }
    
    // Health check для планировщиков
    @EventListener
    public void checkSchedulerHealth() {
        // Проверяем состояние планировщиков
        if (Schedulers.parallel().isDisposed()) {
            logger.error("Parallel scheduler is disposed!");
        }
        
        if (Schedulers.boundedElastic().isDisposed()) {
            logger.error("BoundedElastic scheduler is disposed!");
        }
    }
}

Механизмы управления нагрузкой

Rate Limiting - Ограничение частоты

Назначение: Контроль количества операций в единицу времени.

@Service
public class RateLimitingService {
    
    // Простое rate limiting
    public Flux<ApiResponse> rateLimitedCalls() {
        return requestSource.getRequests()
            .delayElements(Duration.ofMillis(100)) // Максимум 10 запросов/сек
            .flatMap(this::makeApiCall);
    }
    
    // Sliding window rate limiting
    public Flux<ProcessedData> slidingWindowRateLimit() {
        return dataSource.getData()
            .window(Duration.ofSeconds(1))        // Окно в 1 секунду
            .flatMap(window -> 
                window.take(100)                  // Максимум 100 элементов в окне
                      .flatMap(this::processData)
            );
    }
    
    // Adaptive rate limiting на основе нагрузки
    private volatile int currentRate = 10;
    
    public Flux<Result> adaptiveRateLimit() {
        return taskSource.getTasks()
            .delayElements(Duration.ofMillis(1000 / currentRate))
            .flatMap(task -> 
                processTask(task)
                    .doOnSuccess(result -> {
                        if (result.getProcessingTime() < 100) {
                            currentRate = Math.min(currentRate + 1, 100); // Увеличиваем rate
                        }
                    })
                    .doOnError(error -> {
                        currentRate = Math.max(currentRate - 5, 1);     // Уменьшаем rate
                    })
            );
    }
}

Window операторы - Группировка по времени

Назначение: Группировка элементов потока в окна для batch обработки.

@Service
public class WindowOperatorsService {
    
    // Временные окна для batch обработки
    public Flux<BatchResult> timeBasedWindows() {
        return eventStream.getEvents()
            .window(Duration.ofSeconds(5))       // Окна по 5 секунд
            .flatMap(window -> 
                window.collectList()             // Собираем элементы окна
                      .flatMap(this::processBatch)
            );
    }
    
    // Окна по количеству элементов
    public Flux<ProcessedBatch> countBasedWindows() {
        return dataStream.getData()
            .window(100)                         // Окна по 100 элементов
            .flatMap(window -> 
                window.buffer()                  // Преобразуем в List
                      .flatMap(this::optimizedBatchProcess)
            );
    }
    
    // Скользящие окна с перекрытием
    public Flux<Statistics> slidingWindows() {
        return metricStream.getMetrics()
            .window(Duration.ofMinutes(5), Duration.ofMinutes(1)) // 5-мин окна каждую минуту
            .flatMap(window -> 
                window.collectList()
                      .map(this::calculateStatistics)
            );
    }
    
    // Сессионные окна (по периодам неактивности)
    public Flux<SessionData> sessionWindows() {
        return userActivityStream.getActivities()
            .windowTimeout(100, Duration.ofMinutes(30)) // До 100 событий или 30 мин неактивности
            .flatMap(window -> 
                window.collectList()
                      .map(this::aggregateSession)
            );
    }
}

Sample операторы - Прореживание потока

Назначение: Уменьшение частоты элементов в потоке для снижения нагрузки.

@Service
public class SamplingService {
    
    // Временная выборка
    public Flux<Measurement> timeSampling() {
        return sensorStream.getMeasurements()
            .sample(Duration.ofSeconds(1))       // Берем одно измерение в секунду
            .filter(measurement -> measurement.isValid());
    }
    
    // Выборка по другому потоку (trigger-based)
    public Flux<Data> triggerBasedSampling() {
        Flux<Data> dataStream = dataSource.getData();
        Flux<Object> triggerStream = Flux.interval(Duration.ofSeconds(5));
        
        return dataStream.sample(triggerStream);  // Выборка каждые 5 секунд
    }
    
    // Throttling - ограничение частоты с задержкой
    public Flux<Event> throttleEvents() {
        return eventStream.getEvents()
            .throttleFirst(Duration.ofSeconds(2)) // Первый элемент, затем игнор на 2 сек
            .doOnNext(event -> logger.info("Processed event: {}", event.getId()));
    }
    
    // Debounce - убираем "дрожание"
    public Flux<SearchResult> debounceSearch() {
        return userInputService.getSearchQueries()
            .debounce(Duration.ofMillis(300))     // Ждем 300мс паузы
            .distinctUntilChanged()               // Игнорируем дубликаты
            .flatMap(query -> searchService.search(query));
    }
}

Buffer операторы - Накопление элементов

Назначение: Группировка элементов для более эффективной обработки.

@Service
public class BufferingService {
    
    // Буферизация по размеру
    public Flux<BatchWriteResult> sizeBasedBuffering() {
        return writeRequestStream.getRequests()
            .buffer(1000)                        // Группы по 1000 записей
            .flatMap(batch -> 
                databaseService.batchWrite(batch)
                    .retry(3)
                    .timeout(Duration.ofSeconds(30))
            );
    }
    
    // Буферизация по времени
    public Flux<Notification> timeBasedBuffering() {
        return notificationStream.getNotifications()
            .buffer(Duration.ofMinutes(5))       // Группы за 5 минут
            .filter(notifications -> !notifications.isEmpty())
            .flatMap(batch -> 
                notificationService.sendBulkNotifications(batch)
            );
    }
    
    // Буферизация с overflow стратегией
    public Flux<ProcessedData> bufferWithOverflow() {
        return highVolumeStream.getData()
            .onBackpressureBuffer(
                10000,                           // Размер буфера
                data -> logger.warn("Dropping data: {}", data.getId()), // При переполнении
                BufferOverflowStrategy.DROP_OLDEST
            )
            .buffer(100, Duration.ofSeconds(1))  // Группы по 100 или каждую секунду
            .flatMap(this::processBatch);
    }
    
    // Адаптивная буферизация
    public Flux<AdaptiveBatchResult> adaptiveBuffering() {
        return dataStream.getData()
            .bufferTimeout(
                getBatchSize(),                  // Динамический размер
                Duration.ofSeconds(getTimeout()) // Динамический таймаут
            )
            .flatMap(batch -> {
                long start = System.currentTimeMillis();
                return processBatch(batch)
                    .doOnSuccess(result -> {
                        long duration = System.currentTimeMillis() - start;
                        adjustBatchParameters(duration, batch.size());
                    });
            });
    }
    
    private int getBatchSize() {
        // Логика адаптации размера batch
        return Math.max(10, Math.min(currentSystemLoad() * 100, 1000));
    }
}

Комплексная стратегия управления нагрузкой

@Service
public class LoadManagementService {
    
    private final AtomicInteger activeConnections = new AtomicInteger(0);
    private final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("external-service");
    
    public Flux<ProcessedRequest> comprehensiveLoadManagement() {
        return requestStream.getRequests()
            // 1. Rate limiting
            .delayElements(Duration.ofMillis(10)) // Максимум 100 req/sec
            
            // 2. Backpressure handling
            .onBackpressureBuffer(
                1000,
                request -> logger.warn("Dropping request: {}", request.getId()),
                BufferOverflowStrategy.DROP_LATEST
            )
            
            // 3. Batching для эффективности
            .buffer(50, Duration.ofSeconds(1))
            
            // 4. Circuit breaker protection
            .flatMap(batch -> 
                Mono.fromCallable(() -> circuitBreaker.executeSupplier(() -> 
                    processBatch(batch)
                ))
                .subscribeOn(Schedulers.boundedElastic())
            )
            
            // 5. Retry с exponential backoff
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            
            // 6. Timeout protection
            .timeout(Duration.ofSeconds(30))
            
            // 7. Concurrency limiting
            .flatMap(result -> 
                processResult(result), 
                10  // Максимум 10 одновременных обработок
            )
            
            // 8. Monitoring
            .doOnSubscribe(sub -> activeConnections.incrementAndGet())
            .doFinally(signal -> activeConnections.decrementAndGet())
            .doOnNext(result -> updateMetrics(result));
    }
    
    // Health check и adaptive parameters
    @Scheduled(fixedRate = 30000)
    public void adjustLoadParameters() {
        int connections = activeConnections.get();
        double systemLoad = getSystemLoad();
        
        if (systemLoad > 0.8 || connections > 1000) {
            // Снижаем нагрузку
            adjustRateLimit(0.5);
            adjustBatchSize(0.8);
        } else if (systemLoad < 0.3 && connections < 100) {
            // Увеличиваем пропускную способность
            adjustRateLimit(1.2);
            adjustBatchSize(1.1);
        }
    }
}

Интеграция реактивных технологий

Kafka + Reactive (reactor-kafka)

Что такое reactor-kafka

Reactor-kafka - это реактивная библиотека для работы с Apache Kafka, построенная на Project Reactor. Предоставляет неблокирующий API для производства и потребления сообщений с поддержкой backpressure.

Преимущества перед обычным Kafka Client:

  • Неблокирующие операции
  • Встроенная поддержка backpressure
  • Интеграция с реактивными цепочками
  • Эффективное управление ресурсами

Настройка и конфигурация

@Configuration
public class ReactiveKafkaConfig {
    
    @Bean
    public ReactiveKafkaProducerTemplate<String, Object> reactiveKafkaProducerTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        return new ReactiveKafkaProducerTemplate<>(
            SenderOptions.create(props)
        );
    }
    
    @Bean
    public ReceiverOptions<String, Object> kafkaReceiverOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        return ReceiverOptions.create(props);
    }
}

Reactive Producer - отправка сообщений

@Service
public class ReactiveKafkaProducerService {
    
    private final ReactiveKafkaProducerTemplate<String, Object> template;
    private final KafkaSender<String, Object> sender;
    
    // Простая отправка сообщения
    public Mono<SenderResult<Void>> sendMessage(String topic, String key, Object value) {
        return template.send(topic, key, value)
            .doOnSuccess(result -> logger.info("Message sent to topic {}: {}", topic, key))
            .doOnError(error -> logger.error("Failed to send message", error));
    }
    
    // Batch отправка для производительности
    public Flux<SenderResult<String>> sendBatch(Flux<UserEvent> events) {
        return events
            .map(event -> SenderRecord.create(
                new ProducerRecord<>("user-events", event.getUserId(), event),
                event.getId() // correlation metadata
            ))
            .buffer(100) // Группируем по 100 сообщений
            .flatMap(batch -> sender.send(Flux.fromIterable(batch)))
            .doOnNext(result -> {
                if (result.exception() != null) {
                    logger.error("Failed to send batch message: {}", 
                        result.correlationMetadata(), result.exception());
                }
            });
    }
    
    // Transactional отправка
    public Mono<Void> sendTransactional(List<OrderEvent> events) {
        return sender.sendTransactionally(
            Flux.fromIterable(events)
                .map(event -> SenderRecord.create(
                    new ProducerRecord<>("orders", event.getOrderId(), event),
                    null
                ))
        )
        .then()
        .doOnSuccess(v -> logger.info("Transactional batch sent successfully"))
        .doOnError(error -> logger.error("Transactional send failed", error));
    }
}

Reactive Consumer - потребление сообщений

@Service
public class ReactiveKafkaConsumerService {
    
    private final ReceiverOptions<String, Object> receiverOptions;
    
    // Простое потребление сообщений
    public Flux<ProcessedMessage> consumeMessages(String topic) {
        return KafkaReceiver.create(
            receiverOptions.subscription(Collections.singleton(topic))
        )
        .receive()
        .flatMap(record -> 
            processMessage(record.value())
                .doOnSuccess(result -> record.receiverOffset().acknowledge())
                .doOnError(error -> {
                    logger.error("Failed to process message: {}", record.key(), error);
                    // Можно реализовать DLQ или retry логику
                })
                .onErrorResume(error -> Mono.empty()) // Пропускаем неудачные сообщения
        );
    }
    
    // Потребление с backpressure и manual commit
    public Flux<BatchResult> consumeWithBackpressure(String topic) {
        return KafkaReceiver.create(
            receiverOptions
                .subscription(Collections.singleton(topic))
                .commitInterval(Duration.ofSeconds(5)) // Commit каждые 5 секунд
                .commitBatchSize(100) // Или каждые 100 сообщений
        )
        .receive()
        .buffer(50) // Группируем по 50 сообщений для batch обработки
        .flatMap(records -> {
            List<Object> messages = records.stream()
                .map(ConsumerRecord::value)
                .collect(Collectors.toList());
            
            return processBatch(messages)
                .doOnSuccess(result -> {
                    // Acknowledge всех сообщений в batch
                    records.forEach(record -> 
                        record.receiverOffset().acknowledge()
                    );
                })
                .onErrorResume(error -> {
                    logger.error("Batch processing failed", error);
                    return Mono.just(BatchResult.failed());
                });
        })
        .onBackpressureBuffer(1000); // Буфер для управления нагрузкой
    }
    
    // Потребление с retry и dead letter queue
    public Flux<ProcessedMessage> consumeWithRetry(String topic) {
        return KafkaReceiver.create(
            receiverOptions.subscription(Collections.singleton(topic))
        )
        .receive()
        .flatMap(record -> 
            processMessage(record.value())
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                    .filter(error -> !(error instanceof ValidationException))
                )
                .doOnSuccess(result -> record.receiverOffset().acknowledge())
                .onErrorResume(error -> {
                    // Отправляем в DLQ после всех retry
                    return sendToDLQ(record, error)
                        .doOnSuccess(v -> record.receiverOffset().acknowledge())
                        .then(Mono.empty());
                })
        );
    }
}

Паттерны интеграции с Kafka

@Service
public class KafkaIntegrationPatterns {
    
    // Event Sourcing паттерн
    public Flux<AggregateState> eventSourcingConsumer() {
        return KafkaReceiver.create(receiverOptions.subscription(Set.of("domain-events")))
            .receive()
            .scan(AggregateState.empty(), (state, record) -> {
                DomainEvent event = (DomainEvent) record.value();
                return state.apply(event); // Применяем событие к состоянию
            })
            .doOnNext(state -> record.receiverOffset().acknowledge());
    }
    
    // CQRS - разделение команд и запросов
    public Mono<Void> cqrsCommandHandler(Command command) {
        return validateCommand(command)
            .flatMap(this::processCommand)
            .flatMap(events -> 
                // Публикуем события для read models
                Flux.fromIterable(events)
                    .flatMap(event -> kafkaProducer.send("read-model-events", event))
                    .then()
            );
    }
    
    // Saga паттерн для распределенных транзакций
    public Flux<SagaStep> sagaOrchestrator() {
        return KafkaReceiver.create(receiverOptions.subscription(Set.of("saga-events")))
            .receive()
            .flatMap(record -> {
                SagaEvent event = (SagaEvent) record.value();
                return sagaManager.processEvent(event)
                    .flatMapMany(nextSteps -> 
                        Flux.fromIterable(nextSteps)
                            .flatMap(step -> kafkaProducer.send(step.getTargetTopic(), step))
                    );
            });
    }
}

Redis PubSub

Реактивный Redis клиент

Spring Data Redis Reactive предоставляет реактивный API для работы с Redis, включая pub/sub операции.

@Configuration
public class ReactiveRedisConfig {
    
    @Bean
    public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate() {
        LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(
            new RedisStandaloneConfiguration("localhost", 6379)
        );
        
        RedisSerializationContext<String, Object> context = RedisSerializationContext
            .<String, Object>newSerializationContext()
            .key(StringRedisSerializer.UTF_8)
            .value(new GenericJackson2JsonRedisSerializer())
            .build();
            
        return new ReactiveRedisTemplate<>(connectionFactory, context);
    }
    
    @Bean
    public ReactiveRedisMessageListenerContainer redisMessageListenerContainer(
            ReactiveRedisConnectionFactory connectionFactory) {
        return new ReactiveRedisMessageListenerContainer(connectionFactory);
    }
}

Publisher - отправка сообщений

@Service
public class ReactiveRedisPublisher {
    
    private final ReactiveRedisTemplate<String, Object> redisTemplate;
    
    // Простая публикация сообщения
    public Mono<Long> publishMessage(String channel, Object message) {
        return redisTemplate.convertAndSend(channel, message)
            .doOnSuccess(subscribers -> 
                logger.info("Message published to {} subscribers on channel {}", 
                    subscribers, channel)
            )
            .doOnError(error -> 
                logger.error("Failed to publish message to channel {}", channel, error)
            );
    }
    
    // Batch публикация для производительности
    public Flux<Long> publishBatch(String channel, Flux<Notification> notifications) {
        return notifications
            .buffer(100, Duration.ofSeconds(5)) // Группируем сообщения
            .flatMap(batch -> 
                Flux.fromIterable(batch)
                    .flatMap(notification -> 
                        redisTemplate.convertAndSend(channel, notification)
                    )
                    .collectList()
                    .map(results -> results.stream().mapToLong(Long::longValue).sum())
            );
    }
    
    // Условная публикация с проверками
    public Mono<Void> publishUserEvent(UserEvent event) {
        return checkUserSubscription(event.getUserId())
            .filter(isSubscribed -> isSubscribed)
            .flatMap(isSubscribed -> 
                redisTemplate.convertAndSend("user-events:" + event.getUserId(), event)
            )
            .then()
            .doOnSuccess(v -> updateEventMetrics(event))
            .onErrorResume(error -> {
                logger.warn("Failed to publish user event: {}", event.getId(), error);
                return Mono.empty();
            });
    }
}

Subscriber - получение сообщений

@Service
public class ReactiveRedisSubscriber {
    
    private final ReactiveRedisMessageListenerContainer listenerContainer;
    private final ReactiveRedisTemplate<String, Object> redisTemplate;
    
    // Подписка на канал
    public Flux<Message<String, Object>> subscribeToChannel(String channelPattern) {
        return listenerContainer
            .receive(ChannelTopic.of(channelPattern))
            .doOnSubscribe(sub -> logger.info("Subscribed to channel: {}", channelPattern))
            .doOnNext(message -> 
                logger.debug("Received message on channel {}: {}", 
                    message.getChannel(), message.getMessage())
            )
            .doOnError(error -> 
                logger.error("Error in subscription to {}", channelPattern, error)
            );
    }
    
    // Подписка с обработкой и backpressure
    public Flux<ProcessedMessage> subscribeWithProcessing(String channel) {
        return listenerContainer
            .receive(ChannelTopic.of(channel))
            .cast(Message.class)
            .flatMap(message -> 
                processMessage(message.getMessage())
                    .onErrorResume(error -> {
                        logger.error("Failed to process message: {}", 
                            message.getMessage(), error);
                        return Mono.empty();
                    })
            )
            .onBackpressureBuffer(1000)
            .doOnNext(result -> updateProcessingMetrics());
    }
    
    // Pattern-based подписка (wildcard)
    public Flux<RoutedMessage> subscribeToPattern(String pattern) {
        return listenerContainer
            .receive(PatternTopic.of(pattern))
            .map(message -> new RoutedMessage(
                message.getChannel(),
                message.getMessage(),
                extractRoutingKey(message.getChannel())
            ))
            .groupBy(RoutedMessage::getRoutingKey)
            .flatMap(group -> 
                group.flatMap(routedMessage -> 
                    routeMessage(routedMessage)
                )
            );
    }
}

Реактивные паттерны с Redis

@Service
public class RedisReactivePatterns {
    
    // Cache-aside паттерн
    public Mono<User> getCachedUser(String userId) {
        return redisTemplate.opsForValue().get("user:" + userId)
            .cast(User.class)
            .switchIfEmpty(
                userRepository.findById(userId)
                    .flatMap(user -> 
                        redisTemplate.opsForValue()
                            .set("user:" + userId, user, Duration.ofHours(1))
                            .thenReturn(user)
                    )
            );
    }
    
    // Distributed locking
    public <T> Mono<T> withDistributedLock(String lockKey, Mono<T> operation) {
        String lockValue = UUID.randomUUID().toString();
        
        return redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockValue, Duration.ofMinutes(5))
            .flatMap(acquired -> {
                if (acquired) {
                    return operation
                        .doFinally(signal -> 
                            // Освобождаем lock
                            redisTemplate.delete(lockKey).subscribe()
                        );
                } else {
                    return Mono.error(new LockAcquisitionException(lockKey));
                }
            });
    }
    
    // Rate limiting с Redis
    public Mono<Boolean> checkRateLimit(String userId, int maxRequests, Duration window) {
        String key = "rate_limit:" + userId;
        long windowSeconds = window.getSeconds();
        
        return redisTemplate.execute(connection -> {
            return connection.eval(
                // Lua script для атомарной операции
                "local current = redis.call('GET', KEYS[1]) " +
                "if current == false then " +
                "  redis.call('SETEX', KEYS[1], ARGV[2], 1) " +
                "  return 1 " +
                "else " +
                "  local count = tonumber(current) " +
                "  if count < tonumber(ARGV[1]) then " +
                "    redis.call('INCR', KEYS[1]) " +
                "    return count + 1 " +
                "  else " +
                "    return -1 " +
                "  end " +
                "end",
                ReturnType.INTEGER,
                Collections.singletonList(key),
                maxRequests, windowSeconds
            );
        })
        .map(result -> (Long) result > 0);
    }
}

WebSocket + Backpressure

Реактивные WebSocket с Spring WebFlux

WebSocket в Spring WebFlux предоставляет реактивный API для real-time коммуникации с встроенной поддержкой backpressure.

@Configuration
@EnableWebSocket
public class ReactiveWebSocketConfig implements WebSocketConfigurer {
    
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new ReactiveWebSocketHandler(), "/ws")
                .setAllowedOrigins("*");
    }
}

@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
    
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    private final Sinks.Many<ChatMessage> messageSink = Sinks.many().multicast().onBackpressureBuffer();
    
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String sessionId = session.getId();
        sessions.put(sessionId, session);
        
        // Входящие сообщения от клиента
        Mono<Void> input = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .flatMap(payload -> processIncomingMessage(sessionId, payload))
            .doOnError(error -> logger.error("Error processing message", error))
            .onErrorResume(error -> Mono.empty())
            .then();
        
        // Исходящие сообщения к клиенту с backpressure
        Mono<Void> output = messageSink.asFlux()
            .filter(message -> shouldSendToSession(sessionId, message))
            .onBackpressureBuffer(1000) // Буфер для каждой сессии
            .map(message -> session.textMessage(message.toJson()))
            .as(session::send);
        
        return Mono.zip(input, output)
            .doFinally(signal -> {
                sessions.remove(sessionId);
                logger.info("Session {} disconnected", sessionId);
            })
            .then();
    }
    
    // Обработка входящих сообщений
    private Mono<Void> processIncomingMessage(String sessionId, String payload) {
        return Mono.fromCallable(() -> parseMessage(payload))
            .flatMap(message -> {
                message.setSenderId(sessionId);
                message.setTimestamp(Instant.now());
                
                // Валидация и сохранение
                return validateMessage(message)
                    .flatMap(this::saveMessage)
                    .doOnSuccess(savedMessage -> {
                        // Распространяем сообщение всем подписчикам
                        messageSink.tryEmitNext(savedMessage);
                    });
            })
            .then();
    }
}

Управление backpressure в WebSocket

@Service
public class WebSocketBackpressureService {
    
    // Стратегии обработки backpressure
    public Flux<ServerSentEvent<GameState>> gameStateStream(String gameId) {
        return gameStateService.getStateUpdates(gameId)
            // Стратегия 1: Sample - берем только последнее состояние
            .sample(Duration.ofMillis(100)) // Максимум 10 обновлений в секунду
            
            // Стратегия 2: ConflateMap - объединяем быстрые обновления
            .onBackpressureLatest() // Оставляем только последнее
            
            .map(gameState -> ServerSentEvent.<GameState>builder()
                .id(gameState.getId())
                .event("game-update")
                .data(gameState)
                .build())
            
            .doOnNext(event -> updateClientMetrics())
            .doOnError(error -> logger.error("Game state stream error", error));
    }
    
    // Adaptive backpressure на основе client capability
    public Flux<MarketData> adaptiveMarketDataStream(String clientId) {
        ClientCapability capability = getClientCapability(clientId);
        
        return marketDataService.getPriceUpdates()
            .onBackpressureBuffer(
                capability.getBufferSize(),
                data -> logger.warn("Dropping market data for slow client {}", clientId),
                getOverflowStrategy(capability)
            )
            .sample(Duration.ofMillis(capability.getSampleRate()))
            .doOnNext(data -> trackClientPerformance(clientId, data));
    }
    
    // WebSocket с different QoS levels
    public Mono<Void> handleWebSocketWithQoS(WebSocketSession session) {
        String qosLevel = extractQoSLevel(session);
        
        Flux<Message> messageStream = getMessageStream()
            .transform(flux -> applyQoSStrategy(flux, qosLevel));
        
        return session.send(
            messageStream
                .map(message -> session.textMessage(message.toJson()))
                .onErrorResume(error -> {
                    logger.error("Send error for session {}", session.getId(), error);
                    return Flux.empty();
                })
        );
    }
    
    private Flux<Message> applyQoSStrategy(Flux<Message> flux, String qosLevel) {
        return switch (qosLevel) {
            case "HIGH" -> flux.onBackpressureBuffer(10000);
            case "MEDIUM" -> flux.onBackpressureBuffer(1000)
                .sample(Duration.ofMillis(100));
            case "LOW" -> flux.onBackpressureLatest()
                .sample(Duration.ofMillis(500));
            default -> flux.onBackpressureDrop();
        };
    }
}

WebSocket connection pooling и scaling

@Service
public class WebSocketConnectionManager {
    
    private final Map<String, Set<WebSocketSession>> topicSubscriptions = new ConcurrentHashMap<>();
    private final LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
    
    // Управление подписками по топикам
    public void subscribeToTopic(WebSocketSession session, String topic) {
        topicSubscriptions.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet())
            .add(session);
        
        logger.info("Session {} subscribed to topic {}", session.getId(), topic);
    }
    
    // Broadcast сообщений с backpressure management
    public Mono<Void> broadcastToTopic(String topic, Object message) {
        Set<WebSocketSession> sessions = topicSubscriptions.get(topic);
        if (sessions == null || sessions.isEmpty()) {
            return Mono.empty();
        }
        
        return Flux.fromIterable(sessions)
            .filter(session -> session.isOpen())
            .flatMap(session -> 
                sendToSession(session, message)
                    .onErrorResume(error -> {
                        logger.warn("Failed to send to session {}", session.getId(), error);
                        removeInactiveSession(session, topic);
                        return Mono.empty();
                    })
            , 100) // Максимум 100 одновременных отправок
            .then();
    }
    
    private Mono<Void> sendToSession(WebSocketSession session, Object message) {
        return Mono.fromCallable(() -> session.textMessage(objectMapper.writeValueAsString(message)))
            .flatMap(webSocketMessage -> 
                Mono.fromCallable(() -> {
                    session.send(Mono.just(webSocketMessage)).subscribe();
                    return null;
                })
            )
            .timeout(Duration.ofSeconds(5))
            .subscribeOn(Schedulers.boundedElastic());
    }
    
    // Health monitoring для WebSocket connections
    @Scheduled(fixedRate = 30000)
    public void monitorConnections() {
        topicSubscriptions.forEach((topic, sessions) -> {
            int activeSessions = (int) sessions.stream()
                .filter(WebSocketSession::isOpen)
                .count();
            
            logger.info("Topic {}: {} active connections", topic, activeSessions);
            
            // Удаляем неактивные сессии
            sessions.removeIf(session -> !session.isOpen());
        });
    }
}

MongoDB Reactive Streams

Настройка Reactive MongoDB

@Configuration
@EnableReactiveMongoRepositories
public class ReactiveMongoConfig extends AbstractReactiveMongoConfiguration {
    
    @Override
    protected String getDatabaseName() {
        return "reactive-app";
    }
    
    @Override
    public MongoClient reactiveMongoClient() {
        ConnectionString connectionString = new ConnectionString(
            "mongodb://localhost:27017/reactive-app?maxPoolSize=20&minPoolSize=5"
        );
        
        return MongoClients.create(
            MongoClientSettings.builder()
                .applyConnectionString(connectionString)
                .applyToConnectionPoolSettings(builder ->
                    builder.maxSize(50)
                           .minSize(10)
                           .maxConnectionIdleTime(30, TimeUnit.SECONDS)
                           .maxWaitTime(10, TimeUnit.SECONDS)
                )
                .codecRegistry(fromProviders(
                    PojoCodecProvider.builder().automatic(true).build(),
                    new JacksonCodecProvider(objectMapper)
                ))
                .build()
        );
    }
}

Reactive CRUD операции

@Service
public class ReactiveMongoService {
    
    private final ReactiveMongoTemplate mongoTemplate;
    
    // Потоковое чтение больших коллекций
    public Flux<Document> streamLargeCollection(String collectionName) {
        Query query = new Query();
        query.limit(0); // Убираем лимит для потокового чтения
        
        return mongoTemplate.find(query, Document.class, collectionName)
            .buffer(1000) // Группируем для эффективности
            .flatMap(batch -> 
                processBatch(batch)
                    .onErrorResume(error -> {
                        logger.error("Batch processing failed", error);
                        return Flux.empty();
                    })
            )
            .onBackpressureBuffer(10000);
    }
    
    // Реактивные агрегации
    public Flux<AggregationResult> complexAggregation() {
        Aggregation aggregation = Aggregation.newAggregation(
            Aggregation.match(Criteria.where("status").is("active")),
            Aggregation.group("category")
                .count().as("count")
                .avg("price").as("avgPrice")
                .max("createdAt").as("lastUpdate"),
            Aggregation.sort(Sort.Direction.DESC, "count"),
            Aggregation.limit(100)
        );
        
        return mongoTemplate.aggregate(aggregation, "products", AggregationResult.class)
            .doOnNext(result -> logger.debug("Aggregation result: {}", result))
            .timeout(Duration.ofMinutes(5));
    }
    
    // Batch операции с backpressure
    public Flux<BulkWriteResult> batchInsert(Flux<Product> products) {
        return products
            .buffer(1000) // Группируем по 1000 документов
            .flatMap(batch -> {
                List<WriteModel<Product>> writeModels = batch.stream()
                    .map(product -> new InsertOneModel<>(product))
                    .collect(Collectors.toList());
                
                return mongoTemplate.getCollection("products")
                    .flatMap(collection -> 
                        Mono.fromPublisher(collection.bulkWrite(writeModels))
                    );
            })
            .onErrorResume(error -> {
                logger.error("Bulk insert failed", error);
                return Mono.empty();
            });
    }
}

Change Streams - реактивные обновления

@Service
public class MongoChangeStreamService {
    
    private final ReactiveMongoTemplate mongoTemplate;
    
    // Отслеживание изменений в коллекции
    public Flux<ChangeStreamEvent<Document>> watchCollection(String collectionName) {
        ChangeStreamOptions options = ChangeStreamOptions.builder()
            .filter(Aggregation.newAggregation(
                Aggregation.match(Criteria.where("operationType").in("insert", "update", "delete"))
            ))
            .fullDocument(FullDocument.UPDATE_LOOKUP)
            .build();
        
        return mongoTemplate.changeStream(collectionName, options, Document.class)
            .doOnNext(event -> 
                logger.info("Change detected: {} on {}", 
                    event.getOperationType(), event.getDocumentKey())
            )
            .doOnError(error -> 
                logger.error("Change stream error for collection {}", collectionName, error)
            )
            .retry(3)
            .onErrorResume(error -> {
                logger.error("Change stream failed permanently", error);
                return Flux.empty();
            });
    }
    
    // Real-time синхронизация данных
    public Flux<SyncEvent> synchronizeData() {
        return mongoTemplate.changeStream("users", ChangeStreamOptions.empty(), User.class)
            .flatMap(changeEvent -> 
                switch (changeEvent.getOperationType()) {
                    case INSERT -> handleUserInsert(changeEvent.getBody());
                    case UPDATE -> handleUserUpdate(changeEvent.getBody());
                    case DELETE -> handleUserDelete(changeEvent.getDocumentKey());
                    default -> Mono.empty();
                }
            )
            .onBackpressureBuffer(1000)
            .share(); // Делаем hot stream для multiple subscribers
    }
    
    // Distributed cache invalidation через change streams
    public Flux<CacheInvalidation> cacheInvalidationStream() {
        return mongoTemplate.changeStream("cache_invalidation", 
            ChangeStreamOptions.empty(), CacheInvalidation.class)
            .map(ChangeStreamEvent::getBody)
            .groupBy(CacheInvalidation::getCacheRegion)
            .flatMap(group -> 
                group.buffer(Duration.ofSeconds(1)) // Batch invalidations
                     .filter(batch -> !batch.isEmpty())
                     .flatMap(this::processCacheInvalidationBatch)
            );
    }
}

Reactive Transactions

@Service
public class ReactiveMongoTransactionService {
    
    private final ReactiveMongoTransactionManager transactionManager;
    private final ReactiveMongoTemplate mongoTemplate;
    
    // Декларативные транзакции
    @Transactional
    public Mono<OrderResult> processOrderTransactional(CreateOrderRequest request) {
        return createOrder(request)
            .flatMap(order -> updateInventory(order.getItems()))
            .flatMap(inventory -> processPayment(request.getPayment()))
            .flatMap(payment -> updateOrderStatus(request.getOrderId(), "COMPLETED"))
            .onErrorResume(error -> {
                logger.error("Order processing failed, rolling back", error);
                return Mono.error(error);
            });
    }
    
    // Программные транзакции
    public Mono<TransferResult> transferFunds(String fromAccount, String toAccount, BigDecimal amount) {
        TransactionalOperator transactionalOperator = 
            TransactionalOperator.create(transactionManager);
        
        return mongoTemplate.findById(fromAccount, Account.class)
            .flatMap(from -> {
                if (from.getBalance().compareTo(amount) < 0) {
                    return Mono.error(new InsufficientFundsException());
                }
                return mongoTemplate.findById(toAccount, Account.class)
                    .flatMap(to -> {
                        from.setBalance(from.getBalance().subtract(amount));
                        to.setBalance(to.getBalance().add(amount));
                        
                        return mongoTemplate.save(from)
                            .then(mongoTemplate.save(to))
                            .map(savedTo -> new TransferResult(fromAccount, toAccount, amount));
                    });
            })
            .as(transactionalOperator::transactional);
    }
}

MongoDB Reactive Patterns

@Service
public class MongoReactivePatterns {
    
    // Event Sourcing с MongoDB
    public Flux<AggregateSnapshot> eventSourcingReplay(String aggregateId) {
        return mongoTemplate.find(
            Query.query(Criteria.where("aggregateId").is(aggregateId))
                .with(Sort.by("timestamp")),
            DomainEvent.class
        )
        .scan(AggregateSnapshot.empty(), (snapshot, event) -> 
            snapshot.apply(event)
        )
        .skip(1); // Пропускаем начальное пустое состояние
    }
    
    // CQRS Read Model projections
    public Flux<ReadModelUpdate> projectToReadModel() {
        return mongoTemplate.changeStream("domain_events", 
            ChangeStreamOptions.empty(), DomainEvent.class)
            .map(ChangeStreamEvent::getBody)
            .flatMap(event -> 
                switch (event.getEventType()) {
                    case "UserCreated" -> updateUserReadModel(event);
                    case "OrderPlaced" -> updateOrderReadModel(event);
                    case "PaymentProcessed" -> updatePaymentReadModel(event);
                    default -> Mono.empty();
                }
            )
            .onErrorContinue((error, event) -> 
                logger.error("Failed to project event: {}", event, error)
            );
    }
    
    // Optimistic locking с версионированием
    public Mono<Document> optimisticUpdate(String id, Function<Document, Document> updateFunction) {
        return mongoTemplate.findById(id, Document.class)
            .flatMap(document -> {
                long currentVersion = document.getLong("version");
                Document updated = updateFunction.apply(document);
                updated.put("version", currentVersion + 1);
                
                Query query = Query.query(
                    Criteria.where("_id").is(id)
                           .and("version").is(currentVersion)
                );
                
                return mongoTemplate.findAndReplace(query, updated)
                    .switchIfEmpty(Mono.error(new OptimisticLockingException(id)));
            })
            .retry(3);
    }
}

Интеграционные паттерны

Orchestration Service - координация между технологиями

@Service
public class IntegrationOrchestrationService {
    
    private final ReactiveKafkaProducerService kafkaProducer;
    private final ReactiveRedisPublisher redisPublisher;
    private final WebSocketConnectionManager wsManager;
    private final ReactiveMongoTemplate mongoTemplate;
    
    // Комплексная обработка события
    public Mono<EventProcessingResult> processComplexEvent(BusinessEvent event) {
        return Mono.just(event)
            // 1. Сохраняем событие в MongoDB
            .flatMap(e -> mongoTemplate.save(e, "business_events"))
            
            // 2. Публикуем в Kafka для downstream processing
            .flatMap(savedEvent -> 
                kafkaProducer.sendMessage("business-events", event.getId(), event)
                    .map(result -> savedEvent)
            )
            
            // 3. Уведомляем через Redis PubSub
            .flatMap(savedEvent -> 
                redisPublisher.publishMessage("event-notifications", 
                    new EventNotification(event.getType(), event.getId()))
                    .map(subscribers -> savedEvent)
            )
            
            // 4. Отправляем real-time уведомления через WebSocket
            .flatMap(savedEvent -> 
                wsManager.broadcastToTopic("events:" + event.getType(), 
                    new RealtimeEvent(event))
                    .thenReturn(savedEvent)
            )
            
            // 5. Формируем результат
            .map(savedEvent -> new EventProcessingResult(
                savedEvent.getId(), 
                ProcessingStatus.COMPLETED,
                Instant.now()
            ))
            
            .doOnSuccess(result -> 
                logger.info("Complex event processing completed: {}", result)
            )
            .onErrorResume(error -> {
                logger.error("Complex event processing failed", error);
                return handleEventProcessingError(event, error);
            });
    }
    
    // Saga pattern для распределенных операций
    public Mono<SagaResult> executeSaga(SagaDefinition saga) {
        return Flux.fromIterable(saga.getSteps())
            .concatMap(step -> executeStep(step)
                .onErrorResume(error -> 
                    compensateStep(step)
                        .then(Mono.error(new SagaExecutionException(step, error)))
                )
            )
            .then(Mono.just(SagaResult.success(saga.getId())))
            .onErrorResume(SagaExecutionException.class, error -> 
                compensateExecutedSteps(saga, error.getFailedStep())
                    .then(Mono.just(SagaResult.failed(saga.getId(), error)))
            );
    }
    
    private Mono<StepResult> executeStep(SagaStep step) {
        return switch (step.getType()) {
            case KAFKA_PUBLISH -> executeKafkaStep(step);
            case MONGO_UPDATE -> executeMongoStep(step);
            case REDIS_CACHE -> executeRedisStep(step);
            case WEBSOCKET_NOTIFY -> executeWebSocketStep(step);
            default -> Mono.error(new UnsupportedStepException(step.getType()));
        };
    }
}

Error Handling и Circuit Breaker

@Service
public class IntegrationResilienceService {
    
    private final CircuitBreaker kafkaCircuitBreaker = CircuitBreaker.ofDefaults("kafka");
    private final CircuitBreaker redisCircuitBreaker = CircuitBreaker.ofDefaults("redis");
    private final CircuitBreaker mongoCircuitBreaker = CircuitBreaker.ofDefaults("mongo");
    
    // Resilient integration с fallback стратегиями
    public Mono<NotificationResult> sendResilientNotification(Notification notification) {
        return sendPrimaryNotification(notification)
            .onErrorResume(this::sendFallbackNotification)
            .timeout(Duration.ofSeconds(30))
            .doOnSuccess(result -> 
                logger.info("Notification sent successfully: {}", result.getMethod())
            );
    }
    
    private Mono<NotificationResult> sendPrimaryNotification(Notification notification) {
        // Пробуем отправить через WebSocket (primary)
        return Mono.fromCallable(() -> 
            kafkaCircuitBreaker.executeSupplier(() -> 
                wsManager.broadcastToTopic("notifications", notification)
            )
        )
        .flatMap(broadcast -> Mono.just(NotificationResult.websocket()))
        .onErrorResume(error -> 
            // Fallback к Redis PubSub
            sendViaRedis(notification)
        );
    }
    
    private Mono<NotificationResult> sendViaRedis(Notification notification) {
        return Mono.fromCallable(() ->
            redisCircuitBreaker.executeSupplier(() ->
                redisPublisher.publishMessage("notifications", notification)
            )
        )
        .flatMap(result -> Mono.just(NotificationResult.redis()))
        .onErrorResume(error ->
            // Последний fallback - сохранение в MongoDB для retry
            saveForLaterDelivery(notification)
        );
    }
    
    private Mono<NotificationResult> saveForLaterDelivery(Notification notification) {
        return mongoTemplate.save(
            new PendingNotification(notification, Instant.now()),
            "pending_notifications"
        )
        .map(saved -> NotificationResult.deferred());
    }
    
    // Retry логика с exponential backoff
    public <T> Mono<T> withRetry(Mono<T> operation, String operationName) {
        return operation
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .maxBackoff(Duration.ofSeconds(10))
                .filter(error -> isRetryableError(error))
                .doBeforeRetry(retrySignal -> 
                    logger.warn("Retrying {} (attempt {})", 
                        operationName, retrySignal.totalRetries() + 1)
                )
            )
            .doOnError(error -> 
                logger.error("Operation {} failed after retries", operationName, error)
            );
    }
    
    // Health checks для всех интеграций
    @Scheduled(fixedRate = 60000)
    public void performHealthChecks() {
        Mono.zip(
            checkKafkaHealth(),
            checkRedisHealth(), 
            checkMongoHealth(),
            checkWebSocketHealth()
        )
        .subscribe(
            tuple -> {
                HealthStatus overallHealth = calculateOverallHealth(
                    tuple.getT1(), tuple.getT2(), tuple.getT3(), tuple.getT4()
                );
                updateHealthMetrics(overallHealth);
            },
            error -> logger.error("Health check failed", error)
        );
    }
}

Мониторинг и метрики

@Component
public class IntegrationMonitoringService {
    
    private final MeterRegistry meterRegistry;
    private final Counter kafkaMessages;
    private final Counter redisOperations;
    private final Timer mongoOperations;
    private final Gauge activeWebSocketConnections;
    
    public IntegrationMonitoringService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.kafkaMessages = Counter.builder("kafka.messages.total")
            .description("Total Kafka messages processed")
            .register(meterRegistry);
        this.redisOperations = Counter.builder("redis.operations.total")
            .description("Total Redis operations")
            .register(meterRegistry);
        this.mongoOperations = Timer.builder("mongo.operations.duration")
            .description("MongoDB operation duration")
            .register(meterRegistry);
        this.activeWebSocketConnections = Gauge.builder("websocket.connections.active")
            .description("Active WebSocket connections")
            .register(meterRegistry, this, IntegrationMonitoringService::getActiveConnections);
    }
    
    // Метрики для Kafka
    public <T> Mono<T> monitorKafkaOperation(Mono<T> operation, String operationType) {
        return operation
            .doOnSubscribe(sub -> kafkaMessages.increment(Tags.of("operation", operationType, "status", "started")))
            .doOnSuccess(result -> kafkaMessages.increment(Tags.of("operation", operationType, "status", "success")))
            .doOnError(error -> kafkaMessages.increment(Tags.of("operation", operationType, "status", "error")))
            .name("kafka." + operationType)
            .metrics();
    }
    
    // Метрики для MongoDB с timing
    public <T> Mono<T> monitorMongoOperation(Mono<T> operation, String collection, String operationType) {
        Timer.Sample sample = Timer.start(meterRegistry);
        
        return operation
            .doOnTerminate(() -> 
                sample.stop(Timer.builder("mongo.operation.duration")
                    .tag("collection", collection)
                    .tag("operation", operationType)
                    .register(meterRegistry))
            )
            .name("mongo." + collection + "." + operationType)
            .metrics();
    }
    
    // Distributed tracing для комплексных операций
    public <T> Mono<T> withTracing(Mono<T> operation, String operationName) {
        return operation
            .doOnSubscribe(sub -> 
                logger.info("Starting operation: {} [trace-id: {}]", 
                    operationName, getCurrentTraceId())
            )
            .doOnSuccess(result -> 
                logger.info("Completed operation: {} [trace-id: {}]", 
                    operationName, getCurrentTraceId())
            )
            .doOnError(error -> 
                logger.error("Failed operation: {} [trace-id: {}]", 
                    operationName, getCurrentTraceId(), error)
            );
    }
    
    private double getActiveConnections() {
        // Реализация подсчета активных WebSocket соединений
        return wsManager.getActiveConnectionsCount();
    }
    
    // Пользовательские метрики бизнес-логики
    public void recordBusinessMetric(String eventType, String status, double value) {
        Timer.builder("business.event.processing")
            .tag("event.type", eventType)
            .tag("status", status)
            .register(meterRegistry)
            .record(value, TimeUnit.MILLISECONDS);
    }
}

Лучшие практики интеграции

Конфигурация и настройка

// Оптимальные настройки для production
@ConfigurationProperties(prefix = "app.reactive.integration")
@Data
public class ReactiveIntegrationProperties {
    
    private KafkaProperties kafka = new KafkaProperties();
    private RedisProperties redis = new RedisProperties();
    private MongoProperties mongo = new MongoProperties();
    private WebSocketProperties websocket = new WebSocketProperties();
    
    @Data
    public static class KafkaProperties {
        private int bufferSize = 1000;
        private Duration commitInterval = Duration.ofSeconds(5);
        private int maxConcurrency = 10;
        private Duration timeout = Duration.ofSeconds(30);
    }
    
    @Data
    public static class RedisProperties {
        private int maxConnections = 20;
        private Duration commandTimeout = Duration.ofSeconds(5);
        private int bufferSize = 1000;
    }
    
    @Data
    public static class MongoProperties {
        private int maxPoolSize = 50;
        private int minPoolSize = 10;
        private Duration maxConnectionIdleTime = Duration.ofMinutes(30);
        private int bufferSize = 1000;
    }
    
    @Data
    public static class WebSocketProperties {
        private int maxSessions = 10000;
        private int bufferSize = 1000;
        private Duration heartbeatInterval = Duration.ofSeconds(30);
    }
}

Graceful Shutdown

@Component
public class ReactiveIntegrationShutdownManager {
    
    private final List<Disposable> disposables = new CopyOnWriteArrayList<>();
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    
    @PreDestroy
    public void gracefulShutdown() {
        logger.info("Starting graceful shutdown of reactive integrations");
        shuttingDown.set(true);
        
        // 1. Останавливаем прием новых сообщений
        stopAcceptingNewMessages();
        
        // 2. Завершаем обработку текущих сообщений
        waitForCurrentProcessingToComplete();
        
        // 3. Освобождаем все ресурсы
        disposables.forEach(Disposable::dispose);
        
        logger.info("Graceful shutdown completed");
    }
    
    public void registerDisposable(Disposable disposable) {
        if (!shuttingDown.get()) {
            disposables.add(disposable);
        }
    }
    
    private void stopAcceptingNewMessages() {
        // Логика остановки consumers
    }
    
    private void waitForCurrentProcessingToComplete() {
        // Ожидание завершения текущих операций
    }
}

Расширенные темы Reactor

Hot vs Cold Publishers

Cold Publishers (Холодные издатели)

Определение: Каждый подписчик получает собственный поток данных с начала. Данные генерируются по требованию при подписке.

Характеристики:

  • Ленивые (lazy) - не производят данные без подписчиков
  • Каждый subscriber получает полный набор данных
  • Данные воспроизводятся для каждого подписчика заново
// Cold Publisher - каждый подписчик получает свою копию
Flux<Integer> coldFlux = Flux.range(1, 5)
    .doOnSubscribe(s -> System.out.println("Cold: Подписка"))
    .map(i -> {
        System.out.println("Cold: Обработка " + i);
        return i * 2;
    });

// Первый подписчик получает: 2, 4, 6, 8, 10
coldFlux.subscribe(i -> System.out.println("Sub1: " + i));

// Второй подписчик СНОВА получает: 2, 4, 6, 8, 10
coldFlux.subscribe(i -> System.out.println("Sub2: " + i));

Hot Publishers (Горячие издатели)

Определение: Излучают данные независимо от подписчиков. Подписчики получают только данные, излучаемые после момента подписки.

Характеристики:

  • Активные (eager) - производят данные даже без подписчиков
  • Подписчики получают только "текущие" данные
  • Данные могут быть потеряны если нет подписчиков
// Hot Publisher - данные излучаются независимо от подписчиков
Flux<Long> hotFlux = Flux.interval(Duration.ofSeconds(1))
    .doOnNext(i -> System.out.println("Hot: Излучение " + i))
    .share(); // Превращаем в hot

// Подписчик 1 получает: 0, 1, 2, 3...
hotFlux.subscribe(i -> System.out.println("Sub1: " + i));

Thread.sleep(2500); // Ждем 2.5 секунды

// Подписчик 2 получает только: 3, 4, 5... (пропустил 0, 1, 2)
hotFlux.subscribe(i -> System.out.println("Sub2: " + i));

ConnectableFlux, share(), cache()

ConnectableFlux

Назначение: Позволяет контролировать момент начала излучения данных в hot publisher.

Принцип работы:

  • Несколько подписчиков могут подписаться
  • Данные начинают излучаться только после вызова connect()
  • Все подписчики получают одинаковые данные
// ConnectableFlux - контролируемый hot publisher
ConnectableFlux<Long> connectableFlux = Flux.interval(Duration.ofSeconds(1))
    .doOnNext(i -> System.out.println("Излучение: " + i))
    .publish(); // Создаем ConnectableFlux

// Подписываемся, но данные еще не излучаются
connectableFlux.subscribe(i -> System.out.println("Sub1: " + i));
connectableFlux.subscribe(i -> System.out.println("Sub2: " + i));

// Теперь запускаем излучение для всех подписчиков
Disposable connection = connectableFlux.connect();

// Остановка излучения
connection.dispose();

share()

Назначение: Превращает cold publisher в hot, разделяя один поток между несколькими подписчиками.

Поведение:

  • Первый подписчик запускает upstream
  • Последующие подписчики получают те же данные
  • Когда все отписываются, upstream останавливается
Flux<Integer> shared = Flux.range(1, 5)
    .doOnSubscribe(s -> System.out.println("Upstream запущен"))
    .doOnCancel(() -> System.out.println("Upstream остановлен"))
    .share();

// Первый подписчик запускает upstream
Disposable sub1 = shared.subscribe(i -> System.out.println("Sub1: " + i));

// Второй получает те же данные
Disposable sub2 = shared.subscribe(i -> System.out.println("Sub2: " + i));

// Когда все отписываются, upstream останавливается
sub1.dispose();
sub2.dispose();

cache()

Назначение: Кэширует излученные значения и воспроизводит их для новых подписчиков.

Варианты использования:

  • cache() - кэширует все значения
  • cache(int) - кэширует последние N значений
  • cache(Duration) - кэширует на определенное время
// Кэширование всех значений
Flux<Integer> cached = Flux.range(1, 3)
    .doOnSubscribe(s -> System.out.println("Выполнение upstream"))
    .map(i -> i * 2)
    .cache();

// Первый подписчик - выполняется upstream
cached.subscribe(i -> System.out.println("Sub1: " + i));

// Второй подписчик - получает кэшированные значения
cached.subscribe(i -> System.out.println("Sub2: " + i));

// Кэширование с ограничениями
Flux<Integer> timedCache = source.cache(Duration.ofSeconds(5)); // 5 секунд
Flux<Integer> limitedCache = source.cache(10); // Последние 10 элементов

Schedulers.fromExecutor() для кастомных пулов

Зачем нужны кастомные пулы

Проблемы стандартных Schedulers:

  • Ограниченный контроль над параметрами пула
  • Невозможность настройки специфичных для приложения характеристик
  • Сложность мониторинга и отладки

Создание кастомного Scheduler

// Создание ExecutorService с кастомными параметрами
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
    5,                          // corePoolSize
    20,                         // maximumPoolSize
    60L, TimeUnit.SECONDS,      // keepAliveTime
    new LinkedBlockingQueue<>(100), // workQueue
    new ThreadFactory() {       // threadFactory
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "custom-reactor-" + threadNumber.getAndIncrement());
            t.setDaemon(true);
            return t;
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy() // rejectedExecutionHandler
);

// Создание Scheduler из ExecutorService
Scheduler customScheduler = Schedulers.fromExecutor(customExecutor);

// Использование кастомного scheduler
Flux.range(1, 10)
    .subscribeOn(customScheduler)
    .publishOn(customScheduler)
    .subscribe(i -> System.out.println("Thread: " + 
        Thread.currentThread().getName() + ", Value: " + i));

Специализированные пулы для разных задач

public class SchedulerConfig {
    
    // Для CPU-интенсивных задач
    public static final Scheduler CPU_INTENSIVE = Schedulers.fromExecutor(
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
    );
    
    // Для I/O операций
    public static final Scheduler IO_OPERATIONS = Schedulers.fromExecutor(
        new ThreadPoolExecutor(10, 100, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>())
    );
    
    // Для быстрых задач
    public static final Scheduler FAST_TASKS = Schedulers.fromExecutor(
        new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2)
    );
}

// Использование специализированных пулов
Flux.fromIterable(data)
    .subscribeOn(SchedulerConfig.IO_OPERATIONS)      // Чтение данных
    .map(this::processData)                          // CPU обработка
    .publishOn(SchedulerConfig.CPU_INTENSIVE)        
    .flatMap(this::saveToDatabase)                   // I/O операция
    .publishOn(SchedulerConfig.IO_OPERATIONS)
    .subscribe();

Контекст: Context API, передача данных между операторами

Context API - основы

Назначение: Неизменяемая структура данных для передачи метаданных через reactive chain без изменения сигнатуры методов.

Принципы работы:

  • Иммутабельный (неизменяемый)
  • Распространяется upstream (снизу вверх)
  • Доступен во всех операторах
// Создание и использование Context
Mono<String> mono = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx -> {
        String userId = ctx.get("userId");
        return Mono.just(s + " " + userId);
    }))
    .contextWrite(Context.of("userId", "user123"));

mono.subscribe(System.out::println); // Вывод: Hello user123

Передача данных между операторами

// Комплексный пример с Context
Flux<String> processWithContext = Flux.just("data1", "data2")
    .doOnNext(data -> {
        // Доступ к контексту в операторе
        String traceId = ContextView.current().get("traceId");
        System.out.println("Processing " + data + " with trace: " + traceId);
    })
    .flatMap(data -> 
        Mono.deferContextual(ctx -> {
            String userId = ctx.get("userId");
            String traceId = ctx.get("traceId");
            return processData(data, userId, traceId);
        })
    )
    .contextWrite(ctx -> ctx.put("timestamp", Instant.now())) // Добавляем timestamp
    .contextWrite(Context.of("userId", "user123", "traceId", "trace-456"));

Практические применения Context

1. Трассировка запросов:

// Добавление trace ID в контекст
public Mono<ResponseEntity<String>> handleRequest(String traceId) {
    return processRequest()
        .contextWrite(Context.of("traceId", traceId))
        .doOnNext(result -> logResult(result)) // traceId доступен в логгере
        .map(ResponseEntity::ok);
}

// Логирование с контекстом
private void logResult(String result) {
    Mono.deferContextual(ctx -> {
        String traceId = ctx.get("traceId");
        log.info("Request {} completed with result: {}", traceId, result);
        return Mono.empty();
    }).subscribe();
}

2. Аутентификация и авторизация:

// Передача информации о пользователе
public Mono<String> secureOperation() {
    return Mono.deferContextual(ctx -> {
        User user = ctx.get("currentUser");
        if (user.hasPermission("READ")) {
            return performOperation();
        }
        return Mono.error(new SecurityException("Access denied"));
    });
}

// Использование
secureOperation()
    .contextWrite(Context.of("currentUser", getCurrentUser()))
    .subscribe();

3. Управление транзакциями:

// Передача транзакционного контекста
public Mono<Void> transactionalOperation() {
    return Mono.deferContextual(ctx -> {
        DatabaseConnection connection = ctx.get("dbConnection");
        return performDatabaseOperation(connection);
    });
}

// Использование в транзакции
transactionManager.withTransaction(connection -> 
    transactionalOperation()
        .contextWrite(Context.of("dbConnection", connection))
);

Паттерны работы с Context

Композиция контекстов:

// Объединение нескольких источников контекста
Context baseContext = Context.of("app", "myApp");
Context userContext = Context.of("userId", "123");
Context requestContext = Context.of("requestId", "req-456");

// Объединение контекстов
Context fullContext = baseContext
    .putAll(userContext)
    .putAll(requestContext);

Условное добавление в контекст:

Mono<String> conditionalContext = Mono.just("data")
    .contextWrite(ctx -> {
        if (needsTracing()) {
            return ctx.put("traceId", generateTraceId());
        }
        return ctx;
    });

Важные моменты для собеседования

  1. Context распространяется upstream - от подписчика к источнику
  2. Context иммутабелен - каждое изменение создает новый экземпляр
  3. Context не заменяет параметры методов - используется для метаданных
  4. Доступ к Context через deferContextual() - для reactive доступа
  5. Context доступен во всех операторах - через contextWrite()