Реактивное программирование
Основные концепции
Синхронный vs Асинхронный
Синхронный - вызывающий поток ждет завершения операции перед продолжением Асинхронный - операция выполняется в фоне, результат приходит через callback/Future/Promise
// Синхронный подход
String result = httpClient.get("/api/data"); // Блокирует поток
processResult(result);
// Асинхронный подход
CompletableFuture<String> future = httpClient.getAsync("/api/data");
future.thenAccept(this::processResult); // Поток не блокируется
Блокирующий vs Неблокирующий
Блокирующий - операция останавливает выполнение потока до получения результата Неблокирующий - операция сразу возвращает управление, результат обрабатывается позже
// Блокирующий I/O
InputStream stream = socket.getInputStream();
int data = stream.read(); // Поток заморожен до получения данных
// Неблокирующий I/O (NIO)
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_READ);
selector.select(); // Возвращает готовые каналы
Push vs Pull модели
Pull модель - потребитель запрашивает данные когда готов их обработать Push модель - производитель отправляет данные потребителю когда они готовы
// Pull модель (Iterator)
Iterator<String> iterator = collection.iterator();
while (iterator.hasNext()) {
String item = iterator.next(); // Явно запрашиваем следующий элемент
}
// Push модель (Observer/Reactive Streams)
publisher.subscribe(new Subscriber<String>() {
public void onNext(String item) {
// Данные приходят когда готовы
}
});
Backpressure (Противодавление)
Что это: Механизм контроля потока данных когда производитель генерирует данные быстрее, чем потребитель может их обработать.
Зачем нужен: Предотвращает переполнение памяти, OutOfMemoryError, деградацию производительности.
Стратегии обработки:
- Buffer - накапливаем данные в буфере
- Drop - отбрасываем новые элементы
- Latest - оставляем только последний элемент
- Error - генерируем исключение
// Reactive Streams с backpressure
Flowable.range(1, 1000000)
.onBackpressureBuffer(100) // Буфер на 100 элементов
.observeOn(Schedulers.computation())
.subscribe(item -> {
Thread.sleep(100); // Медленная обработка
System.out.println(item);
});
Reactive Streams API
Основные интерфейсы
// Производитель данных
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
// Потребитель данных
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
// Управление подпиской
public interface Subscription {
void request(long n); // Запрос n элементов
void cancel(); // Отмена подписки
}
Протокол взаимодействия
- Подписка:
publisher.subscribe(subscriber)
- Подтверждение:
subscriber.onSubscribe(subscription)
- Запрос данных:
subscription.request(n)
- Получение данных:
subscriber.onNext(item)
- Завершение:
subscriber.onComplete()
илиsubscriber.onError()
RxJava
Основные типы
Observable - Push-based, без backpressure, для UI событий Flowable - Push-based, с backpressure, для потоков данных Single - Один элемент или ошибка Maybe - Ноль или один элемент Completable - Только завершение или ошибка
// Observable - горячий поток
Observable<Long> timer = Observable.interval(1, TimeUnit.SECONDS);
// Flowable - холодный поток с backpressure
Flowable<String> data = Flowable.fromCallable(() ->
httpClient.get("/api/data")
);
// Single - одно значение
Single<User> user = Single.fromCallable(() ->
userService.findById(123)
);
Операторы трансформации
// map - преобразование каждого элемента
source.map(String::toUpperCase)
// flatMap - преобразование в поток + слияние
source.flatMap(id -> userService.findById(id))
// filter - фильтрация элементов
source.filter(item -> item.length() > 5)
// take - взять первые N элементов
source.take(10)
// skip - пропустить первые N элементов
source.skip(5)
Операторы комбинирования
// zip - комбинирование элементов по индексу
Observable.zip(source1, source2, (a, b) -> a + b)
// merge - объединение потоков в один
Observable.merge(source1, source2)
// concat - последовательное объединение
Observable.concat(source1, source2)
Project Reactor
Основные типы
Mono - 0 или 1 элемент Flux - 0 до N элементов
// Mono - асинхронный результат
Mono<String> result = webClient
.get()
.uri("/api/data")
.retrieve()
.bodyToMono(String.class);
// Flux - поток данных
Flux<User> users = userRepository
.findAll()
.filter(user -> user.isActive());
Обработка ошибок
// onErrorReturn - возврат значения по умолчанию
mono.onErrorReturn("default value")
// onErrorResume - переключение на другой поток
mono.onErrorResume(error -> Mono.just("fallback"))
// retry - повторная попытка
mono.retry(3)
// timeout - таймаут операции
mono.timeout(Duration.ofSeconds(5))
Schedulers (Планировщики)
Зачем нужны: Управление потоками выполнения в реактивных приложениях
// Schedulers.io() - для блокирующих I/O операций
Mono.fromCallable(() -> databaseCall())
.subscribeOn(Schedulers.boundedElastic())
// Schedulers.computation() - для CPU-intensive задач
Flux.range(1, 1000)
.map(this::heavyComputation)
.subscribeOn(Schedulers.parallel())
// Schedulers.single() - один поток для всех задач
observable.observeOn(Schedulers.single())
Практические применения
Микросервисы
// Неблокирующие HTTP вызовы
@RestController
public class UserController {
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userService.findById(id)
.timeout(Duration.ofSeconds(5))
.onErrorReturn(User.empty());
}
}
Обработка данных
// Потоковая обработка больших объемов данных
Flux.fromIterable(largeDataSet)
.buffer(100) // Группировка по 100 элементов
.flatMap(batch ->
processBatch(batch)
.subscribeOn(Schedulers.boundedElastic())
)
.subscribe();
Event-driven архитектура
// Обработка событий из очереди
@EventListener
public void handleEvent(OrderCreatedEvent event) {
return eventProcessor
.process(event)
.flatMap(this::sendNotification)
.flatMap(this::updateInventory)
.subscribe();
}
Ключевые принципы
"React to events over time"
Система реагирует на события по мере их поступления, а не ждет завершения всех операций
"Don't block – subscribe"
Вместо блокировки потоков используется подписка на события с callback-обработчиками
Композиция операций
Сложная логика строится из простых операторов через цепочки вызовов
Декларативность
Описывается ЧТО нужно сделать, а не КАК это сделать пошагово
Антипаттерны
// ❌ Блокировка в реактивном коде
mono.block(); // Убивает асинхронность
// ❌ Подписка внутри оператора
source.map(item -> {
otherSource.subscribe(result -> {
// Создает memory leak
});
return item;
});
// ✅ Правильное комбинирование
source.flatMap(item ->
otherSource.map(result -> combine(item, result))
);
Отладка и мониторинг
// Логирование в реактивных цепочках
flux.doOnNext(item -> log.info("Processing: {}", item))
.doOnError(error -> log.error("Error occurred", error))
.doOnComplete(() -> log.info("Stream completed"))
.subscribe();
// Метрики производительности
flux.name("user-processing")
.tag("service", "user-service")
.metrics()
.subscribe();
Reactive Streams спецификация
Что такое Reactive Streams
Reactive Streams - это стандарт для обработки асинхронных потоков данных с неблокирующим backpressure. Цель - обеспечить интероперабельность между различными реактивными библиотеками (RxJava, Project Reactor, Akka Streams).
Ключевая проблема: Как обрабатывать потоки данных, когда производитель генерирует данные быстрее, чем потребитель может их обработать, без блокировки потоков и переполнения памяти.
Основные интерфейсы
Publisher - Производитель данных
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Назначение: Источник данных, который может производить потенциально неограниченное количество элементов.
Обязанности:
- Принимает подписчиков через
subscribe()
- Может иметь множество подписчиков
- Управляет жизненным циклом подписок
Subscriber - Потребитель данных
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
Назначение: Получатель данных, который обрабатывает элементы по мере их поступления.
Методы:
onSubscribe()
- получение объекта подписки для управления потокомonNext()
- обработка каждого элемента данныхonError()
- обработка ошибок (терминальное событие)onComplete()
- сигнал о завершении потока (терминальное событие)
Subscription - Управление подпиской
public interface Subscription {
void request(long n);
void cancel();
}
Назначение: Связь между Publisher и Subscriber, позволяющая управлять потоком данных.
Методы:
request(n)
- запрос следующих n элементов (реализация backpressure)cancel()
- отмена подписки и освобождение ресурсов
Processor<T,R> - Трансформатор данных
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
// Наследует методы от обоих интерфейсов
}
Назначение: Компонент, который одновременно является подписчиком и издателем. Получает данные типа T, трансформирует их и публикует как тип R.
Применение: Создание цепочек обработки данных, где один процессор может быть подключен к другому.
Контракт взаимодействия
Последовательность вызовов
// 1. Подписка
publisher.subscribe(subscriber);
// 2. Подтверждение подписки
subscriber.onSubscribe(subscription);
// 3. Запрос данных (backpressure)
subscription.request(10);
// 4. Получение данных
subscriber.onNext(item1);
subscriber.onNext(item2);
// ... до 10 элементов
// 5. Завершение (одно из двух)
subscriber.onComplete(); // ИЛИ
subscriber.onError(throwable);
Правила протокола
Правило 1: После onSubscribe()
вызывается только один из терминальных методов (onComplete()
или onError()
)
Правило 2: Количество вызовов onNext()
не должно превышать запрошенное через request(n)
Правило 3: onNext()
, onError()
, onComplete()
должны быть вызваны последовательно (не параллельно)
Правило 4: Если cancel()
вызван, Publisher должен прекратить отправку данных
Управление Backpressure
Кто управляет
Subscriber управляет backpressure через вызов subscription.request(n)
. Это принцип pull-based backpressure.
class MySubscriber implements Subscriber<String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // Запрашиваем первый элемент
}
@Override
public void onNext(String item) {
processItem(item);
subscription.request(1); // Запрашиваем следующий
}
}
Зачем это важно
Предотвращение переполнения: Subscriber может контролировать скорость получения данных в соответствии со своей производительностью.
Управление ресурсами: Избегание OutOfMemoryError при обработке больших потоков данных.
Адаптивность: Возможность динамически изменять скорость обработки в зависимости от нагрузки.
Стратегии Backpressure
Unbounded Request
// Запрос всех доступных элементов
subscription.request(Long.MAX_VALUE);
Когда использовать: Когда уверены, что можем обработать все данные без переполнения памяти.
Bounded Request
// Запрос фиксированного количества
subscription.request(100);
Когда использовать: Для контроля размера буфера и предсказуемого потребления памяти.
Dynamic Request
class AdaptiveSubscriber implements Subscriber<String> {
private int bufferSize = 10;
private int processed = 0;
@Override
public void onNext(String item) {
processItem(item);
processed++;
// Адаптивный запрос в зависимости от нагрузки
if (processed >= bufferSize) {
subscription.request(bufferSize);
processed = 0;
}
}
}
Практические примеры
Простой Publisher
class NumberPublisher implements Publisher<Integer> {
private final int count;
public NumberPublisher(int count) {
this.count = count;
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new NumberSubscription(subscriber, count));
}
}
class NumberSubscription implements Subscription {
private final Subscriber<? super Integer> subscriber;
private int current = 0;
private final int max;
private boolean cancelled = false;
@Override
public void request(long n) {
if (cancelled) return;
for (int i = 0; i < n && current < max; i++) {
subscriber.onNext(current++);
}
if (current >= max) {
subscriber.onComplete();
}
}
@Override
public void cancel() {
cancelled = true;
}
}
Обработка ошибок
class ErrorHandlingSubscriber implements Subscriber<String> {
@Override
public void onError(Throwable throwable) {
// Логирование ошибки
logger.error("Stream error occurred", throwable);
// Очистка ресурсов
cleanup();
// Уведомление других компонентов
notifyErrorHandlers(throwable);
}
@Override
public void onComplete() {
// Финализация обработки
logger.info("Stream completed successfully");
cleanup();
}
}
Интеграция с существующими библиотеками
Project Reactor
// Создание Flux из Publisher
Flux<String> flux = Flux.from(customPublisher);
// Создание Publisher из Flux
Publisher<String> publisher = flux.share();
RxJava
// Создание Flowable из Publisher
Flowable<String> flowable = Flowable.fromPublisher(customPublisher);
// Создание Publisher из Flowable
Publisher<String> publisher = flowable.toFlowable(BackpressureStrategy.BUFFER);
Важные особенности реализации
Thread Safety
Требование: Все методы должны быть thread-safe, но вызовы onNext()
, onError()
, onComplete()
должны быть последовательными.
Обработка Cancel
class CancellableSubscription implements Subscription {
private volatile boolean cancelled = false;
@Override
public void request(long n) {
if (cancelled) {
return; // Не отправляем данные после отмены
}
// ... логика отправки
}
@Override
public void cancel() {
cancelled = true;
// Освобождение ресурсов
cleanup();
}
}
Обработка некорректных запросов
@Override
public void request(long n) {
if (n <= 0) {
subscriber.onError(new IllegalArgumentException(
"Request must be positive, but was: " + n
));
return;
}
// ... нормальная обработка
}
Преимущества спецификации
Стандартизация: Единый интерфейс для всех реактивных библиотек в JVM экосистеме.
Интероперабельность: Компоненты разных библиотек могут работать вместе.
Backpressure из коробки: Встроенная поддержка управления потоком данных.
Асинхронность: Неблокирующая обработка потоков данных.
Частые ошибки
Нарушение контракта
// ❌ Вызов onNext после onComplete
subscriber.onComplete();
subscriber.onNext(item); // Ошибка!
// ❌ Превышение запрошенного количества
subscription.request(5);
// Отправляем 6 элементов - нарушение контракта
Неправильная обработка ошибок
// ❌ Продолжение после ошибки
subscriber.onError(new RuntimeException());
subscriber.onNext(item); // Ошибка!
// ✅ Правильная обработка
try {
processItem(item);
subscriber.onNext(item);
} catch (Exception e) {
subscriber.onError(e);
return; // Прекращаем обработку
}
Блокировка в обработчиках
// ❌ Блокирующие операции в onNext
@Override
public void onNext(String item) {
Thread.sleep(1000); // Блокирует поток!
processItem(item);
}
// ✅ Асинхронная обработка
@Override
public void onNext(String item) {
CompletableFuture.supplyAsync(() -> processItem(item));
}
Project Reactor (Spring)
Что такое Project Reactor
Project Reactor - это реактивная библиотека для JVM, созданная компанией Pivotal (теперь VMware). Основа для Spring WebFlux и других реактивных компонентов Spring Framework. Полностью совместима со спецификацией Reactive Streams.
Ключевые особенности:
- Неблокирующая обработка данных
- Встроенная поддержка backpressure
- Богатый набор операторов
- Интеграция с Spring экосистемой
Основные типы данных
Mono - 0 или 1 элемент
Назначение: Представляет асинхронный результат, который может содержать максимум один элемент или ошибку.
Аналогия: Как Optional<T>
но для асинхронных операций, похож на CompletableFuture<T>
.
// Создание Mono
Mono<String> mono = Mono.just("Hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("Error"));
// Из Callable (ленивое выполнение)
Mono<String> fromCallable = Mono.fromCallable(() -> {
return expensiveOperation(); // Выполнится только при подписке
});
// Из CompletableFuture
Mono<String> fromFuture = Mono.fromFuture(
CompletableFuture.supplyAsync(() -> "Result")
);
Когда использовать:
- HTTP запросы (один ответ)
- Поиск записи в базе данных
- Валидация данных
- Любые операции с единичным результатом
Flux - 0 до N элементов
Назначение: Представляет поток данных, который может содержать множество элементов, приходящих асинхронно.
Аналогия: Как Stream<T>
но для асинхронных операций, похож на Observable
из RxJava.
// Создание Flux
Flux<String> flux = Flux.just("A", "B", "C");
Flux<Integer> range = Flux.range(1, 100);
Flux<String> fromIterable = Flux.fromIterable(Arrays.asList("X", "Y", "Z"));
// Бесконечный поток
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
// Из массива
Flux<String> fromArray = Flux.fromArray(new String[]{"1", "2", "3"});
// Генерация элементов
Flux<String> generated = Flux.generate(
() -> 0, // начальное состояние
(state, sink) -> {
sink.next("Item " + state);
if (state == 10) sink.complete();
return state + 1;
}
);
Когда использовать:
- Потоки данных из базы данных
- Обработка файлов
- Server-Sent Events
- Обработка сообщений из очереди
Операторы трансформации
map - Преобразование каждого элемента
Назначение: Применяет функцию к каждому элементу потока, создавая новый поток с преобразованными значениями.
Flux<String> names = Flux.just("john", "jane", "bob");
Flux<String> upperCase = names.map(String::toUpperCase);
// Результат: "JOHN", "JANE", "BOB"
Mono<Integer> length = Mono.just("Hello").map(String::length);
// Результат: 5
flatMap - Асинхронное преобразование с объединением
Назначение: Преобразует каждый элемент в Publisher, затем объединяет все результирующие потоки в один. Не сохраняет порядок, выполняется параллельно.
Flux<String> ids = Flux.just("1", "2", "3");
Flux<User> users = ids.flatMap(id ->
userService.findById(id) // Возвращает Mono<User>
);
// Результат: поток пользователей, порядок не гарантирован
// Ограничение concurrency
Flux<User> limited = ids.flatMap(
id -> userService.findById(id),
3 // Максимум 3 одновременных запроса
);
concatMap - Последовательное преобразование
Назначение: Как flatMap, но сохраняет порядок элементов, выполняя операции последовательно.
Flux<String> ids = Flux.just("1", "2", "3");
Flux<User> users = ids.concatMap(id ->
userService.findById(id) // Выполняется по порядку
);
// Результат: пользователи в том же порядке, что и ID
switchMap - Переключение на новый поток
Назначение: При появлении нового элемента отменяет предыдущую операцию и переключается на новую.
Flux<String> searchQueries = Flux.just("a", "ab", "abc");
Flux<SearchResult> results = searchQueries.switchMap(query ->
searchService.search(query) // Отменяет предыдущий поиск
);
// Результат: только результаты последнего поиска
Операторы фильтрации и выборки
filter - Фильтрация элементов
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);
// Результат: 2, 4, 6, 8, 10
take - Взять первые N элементов
Flux<String> first3 = Flux.just("A", "B", "C", "D", "E").take(3);
// Результат: "A", "B", "C"
// take с условием
Flux<Integer> untilCondition = Flux.range(1, 100)
.takeWhile(n -> n < 5);
// Результат: 1, 2, 3, 4
skip - Пропустить первые N элементов
Flux<String> afterSkip = Flux.just("A", "B", "C", "D", "E").skip(2);
// Результат: "C", "D", "E"
Операторы комбинирования
zip - Комбинирование по парам
Назначение: Объединяет элементы из нескольких потоков в кортежи, ждет элементы от всех потоков.
Flux<String> names = Flux.just("John", "Jane");
Flux<Integer> ages = Flux.just(25, 30);
Flux<String> combined = Flux.zip(names, ages)
.map(tuple -> tuple.getT1() + " is " + tuple.getT2());
// Результат: "John is 25", "Jane is 30"
// Zip с функцией комбинирования
Flux<String> result = Flux.zip(names, ages, (name, age) ->
name + " is " + age
);
combineLatest - Комбинирование последних значений
Назначение: Комбинирует последние значения из каждого потока при изменении любого из них.
Flux<String> source1 = Flux.just("A", "B").delayElements(Duration.ofSeconds(1));
Flux<String> source2 = Flux.just("1", "2").delayElements(Duration.ofSeconds(2));
Flux<String> combined = Flux.combineLatest(source1, source2,
(s1, s2) -> s1 + s2
);
// Результат: "A1", "B1", "B2"
Операторы жизненного цикла
doOnNext - Выполнение побочных действий
Назначение: Выполняет действие для каждого элемента, не изменяя поток (side-effect).
Flux<String> flux = Flux.just("A", "B", "C")
.doOnNext(item -> logger.info("Processing: {}", item))
.map(String::toLowerCase)
.doOnNext(item -> logger.info("Transformed: {}", item));
doOnError - Обработка ошибок
Mono<String> mono = Mono.fromCallable(() -> riskyOperation())
.doOnError(error -> logger.error("Operation failed", error))
.doOnError(IllegalArgumentException.class,
error -> logger.warn("Invalid argument: {}", error.getMessage())
);
Обработка ошибок
onErrorResume - Переключение на альтернативный поток
Назначение: При ошибке переключается на другой Publisher, позволяя продолжить обработку.
Mono<String> primary = Mono.fromCallable(() -> primaryService.getData())
.onErrorResume(error -> {
logger.warn("Primary service failed, using fallback");
return fallbackService.getData();
});
// Обработка конкретного типа ошибки
Mono<String> withSpecificError = primary
.onErrorResume(TimeoutException.class,
error -> Mono.just("Timeout occurred")
);
retry - Повторная попытка
Назначение: Повторяет операцию при ошибке заданное количество раз.
Mono<String> withRetry = Mono.fromCallable(() -> unreliableService.call())
.retry(3); // Повторить до 3 раз
// Retry с backoff
Mono<String> withBackoff = Mono.fromCallable(() -> unreliableService.call())
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
timeout - Ограничение времени выполнения
Mono<String> withTimeout = webClient.get()
.uri("/slow-endpoint")
.retrieve()
.bodyToMono(String.class)
.timeout(Duration.ofSeconds(5))
.onErrorResume(TimeoutException.class,
error -> Mono.just("Service unavailable")
);
Сравнение flatMap vs concatMap
flatMap - Параллельное выполнение
Flux<String> urls = Flux.just("/api/1", "/api/2", "/api/3");
// flatMap - не гарантирует порядок, быстрее
Flux<String> responses = urls.flatMap(url ->
webClient.get().uri(url).retrieve().bodyToMono(String.class)
);
Характеристики:
- Выполняется параллельно (по умолчанию до 256 одновременных операций)
- Не сохраняет порядок элементов
- Быстрее для независимых операций
- Может использовать больше ресурсов
concatMap - Последовательное выполнение
Flux<String> urls = Flux.just("/api/1", "/api/2", "/api/3");
// concatMap - гарантирует порядок, медленнее
Flux<String> responses = urls.concatMap(url ->
webClient.get().uri(url).retrieve().bodyToMono(String.class)
);
Характеристики:
- Выполняется последовательно
- Сохраняет порядок элементов
- Медленнее, но предсказуемее
- Использует меньше ресурсов одновременно
Когда использовать каждый
flatMap:
- Независимые операции
- Скорость важнее порядка
- Параллельные HTTP запросы
- Обработка больших объемов данных
concatMap:
- Порядок обработки важен
- Операции зависят друг от друга
- Ограниченные ресурсы
- Последовательные операции с базой данных
Стратегии обработки ошибок
Fallback стратегии
// Значение по умолчанию
Mono<String> withDefault = risky()
.onErrorReturn("default value");
// Альтернативный источник данных
Mono<String> withFallback = primary()
.onErrorResume(error -> secondary())
.onErrorResume(error -> tertiary());
// Conditional fallback
Mono<String> conditional = risky()
.onErrorResume(TimeoutException.class,
error -> Mono.just("Timeout fallback")
)
.onErrorResume(IllegalArgumentException.class,
error -> Mono.just("Invalid argument fallback")
);
Error transformation
Mono<String> transformed = risky()
.onErrorMap(SQLException.class,
sql -> new ServiceException("Database error", sql)
)
.onErrorMap(error -> new GenericException("Operation failed", error));
Составление цепочек операций
Простая цепочка
Mono<UserProfile> userProfile = Mono.just(userId)
.flatMap(id -> userService.findById(id)) // Получаем пользователя
.flatMap(user -> profileService.getProfile(user)) // Получаем профиль
.map(profile -> profile.withTimestamp(Instant.now())) // Добавляем timestamp
.doOnNext(profile -> logger.info("Profile loaded: {}", profile.getId()))
.timeout(Duration.ofSeconds(10))
.onErrorResume(error -> profileService.getDefaultProfile());
Сложная цепочка с комбинированием
Mono<OrderSummary> orderSummary = Mono.just(orderId)
.flatMap(id -> orderService.findById(id))
.flatMap(order -> {
// Параллельно получаем данные
Mono<Customer> customer = customerService.findById(order.getCustomerId());
Mono<List<Item>> items = itemService.findByOrderId(order.getId());
Mono<ShippingInfo> shipping = shippingService.getInfo(order.getId());
// Комбинируем результаты
return Mono.zip(customer, items, shipping)
.map(tuple -> new OrderSummary(
order,
tuple.getT1(),
tuple.getT2(),
tuple.getT3()
));
})
.doOnNext(summary -> logger.info("Order summary created"))
.onErrorResume(error -> {
logger.error("Failed to create order summary", error);
return Mono.empty();
});
Цепочки обработки ошибок
Многоуровневая обработка
Mono<Result> complexOperation = Mono.just(input)
.flatMap(data -> step1(data))
.onErrorResume(ValidationException.class,
error -> {
logger.warn("Validation failed: {}", error.getMessage());
return Mono.just(getDefaultData());
}
)
.flatMap(data -> step2(data))
.onErrorResume(NetworkException.class,
error -> {
logger.error("Network error, retrying...", error);
return step2Retry(input);
}
)
.flatMap(data -> step3(data))
.onErrorMap(DatabaseException.class,
db -> new ServiceException("Database operation failed", db)
)
.doOnError(error -> auditService.logError(error))
.onErrorResume(error -> {
logger.error("All recovery attempts failed", error);
return Mono.just(getEmptyResult());
});
Circuit Breaker паттерн
Mono<String> withCircuitBreaker = externalService.call()
.transform(CircuitBreakerOperator.of(circuitBreaker))
.onErrorResume(CallNotPermittedException.class,
error -> Mono.just("Circuit breaker is open")
)
.onErrorResume(error -> {
logger.warn("External service failed: {}", error.getMessage());
return fallbackService.call();
});
Практические рекомендации
Композиция операций
// ✅ Хорошо - четкая цепочка операций
Mono<ProcessedData> good = source
.filter(this::isValid)
.flatMap(this::enrich)
.map(this::transform)
.timeout(Duration.ofSeconds(30))
.onErrorResume(this::handleError);
// ❌ Плохо - вложенные подписки
source.subscribe(data -> {
enrichmentService.enrich(data).subscribe(enriched -> {
// Создает memory leak и сложность
processData(enriched);
});
});
Обработка ресурсов
// Правильное управление ресурсами
Mono<String> withResources = Mono.using(
() -> openConnection(), // Создание ресурса
connection -> processData(connection), // Использование
connection -> connection.close() // Очистка
);
Тестирование цепочек
@Test
public void testReactiveChain() {
StepVerifier.create(
userService.processUser(userId)
.flatMap(user -> enrichmentService.enrich(user))
.map(user -> user.withStatus(ACTIVE))
)
.expectNextMatches(user -> user.getStatus() == ACTIVE)
.expectComplete()
.verify();
}
Spring WebFlux
Что такое Spring WebFlux
Spring WebFlux - это реактивный веб-фреймворк, появившийся в Spring 5.0. Построен на Project Reactor и предназначен для создания неблокирующих веб-приложений с высокой пропускной способностью.
Ключевые особенности:
- Неблокирующий I/O на базе Netty (по умолчанию)
- Поддержка backpressure
- Функциональный стиль программирования
- Совместимость с Reactive Streams
Архитектура: Основан на event loop модели, где небольшое количество потоков обрабатывает множество запросов асинхронно.
Аннотационные контроллеры (@RestController)
Основные возвращаемые типы
Mono
@RestController
@RequestMapping("/api/users")
public class UserController {
// Mono для единичного результата
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userService.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException(id)));
}
// Flux для списка
@GetMapping
public Flux<User> getAllUsers() {
return userService.findAll()
.take(100); // Ограничение для безопасности
}
// Создание пользователя
@PostMapping
public Mono<User> createUser(@RequestBody Mono<CreateUserRequest> request) {
return request
.flatMap(userService::create)
.map(user -> user.withTimestamp(Instant.now()));
}
// Обновление с обработкой ошибок
@PutMapping("/{id}")
public Mono<User> updateUser(@PathVariable String id,
@RequestBody Mono<UpdateUserRequest> request) {
return Mono.zip(Mono.just(id), request)
.flatMap(tuple -> userService.update(tuple.getT1(), tuple.getT2()))
.onErrorResume(UserNotFoundException.class,
error -> Mono.error(new ResponseStatusException(
HttpStatus.NOT_FOUND, "User not found"
))
);
}
}
Обработка параметров запроса
@RestController
public class SearchController {
// Query параметры
@GetMapping("/search")
public Flux<Product> searchProducts(
@RequestParam String query,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "10") int size) {
return searchService.search(query, page, size)
.timeout(Duration.ofSeconds(30))
.onErrorResume(error -> Flux.empty());
}
// Path variables и headers
@GetMapping("/categories/{category}/products")
public Flux<Product> getProductsByCategory(
@PathVariable String category,
@RequestHeader(value = "X-User-Id", required = false) String userId) {
return productService.findByCategory(category)
.filter(product -> isAccessible(product, userId));
}
}
Функциональный стиль (RouterFunction + HandlerFunction)
Основные концепции
RouterFunction - определяет маршрутизацию запросов, аналог @RequestMapping
HandlerFunction - обрабатывает запрос, аналог методов контроллера
ServerRequest - представляет HTTP запрос
ServerResponse - представляет HTTP ответ
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions
.route(GET("/api/users/{id}"), handler::getUser)
.andRoute(GET("/api/users"), handler::getAllUsers)
.andRoute(POST("/api/users"), handler::createUser)
.andRoute(PUT("/api/users/{id}"), handler::updateUser)
.andRoute(DELETE("/api/users/{id}"), handler::deleteUser);
}
// Вложенные маршруты с общим префиксом
@Bean
public RouterFunction<ServerResponse> apiRoutes(
UserHandler userHandler,
ProductHandler productHandler) {
return RouterFunctions
.nest(path("/api"),
RouterFunctions
.nest(path("/users"),
route(GET("/{id}"), userHandler::getUser)
.andRoute(GET("/"), userHandler::getAllUsers)
)
.andNest(path("/products"),
route(GET("/"), productHandler::getAllProducts)
.andRoute(GET("/{id}"), productHandler::getProduct)
)
);
}
}
Handler функции
@Component
public class UserHandler {
private final UserService userService;
// Получение пользователя
public Mono<ServerResponse> getUser(ServerRequest request) {
String id = request.pathVariable("id");
return userService.findById(id)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
// Список пользователей с фильтрацией
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
String status = request.queryParam("status").orElse("all");
Flux<User> users = userService.findAll();
if (!"all".equals(status)) {
users = users.filter(user -> user.getStatus().equals(status));
}
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(users, User.class);
}
// Создание пользователя
public Mono<ServerResponse> createUser(ServerRequest request) {
return request.bodyToMono(CreateUserRequest.class)
.flatMap(userService::create)
.flatMap(user -> ServerResponse.status(HttpStatus.CREATED)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.onErrorResume(ValidationException.class,
error -> ServerResponse.badRequest().bodyValue(error.getMessage())
);
}
// Обработка multipart данных
public Mono<ServerResponse> uploadAvatar(ServerRequest request) {
String userId = request.pathVariable("userId");
return request.multipartData()
.map(parts -> parts.getFirst("avatar"))
.cast(FilePart.class)
.flatMap(filePart -> userService.updateAvatar(userId, filePart))
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.onErrorResume(error ->
ServerResponse.badRequest().bodyValue("Upload failed")
);
}
}
Фильтры и middleware
@Configuration
public class WebConfig {
@Bean
public RouterFunction<ServerResponse> routesWithFilter() {
return RouterFunctions
.route(GET("/api/secure/**"), this::secureHandler)
.filter(this::authenticationFilter)
.filter(this::loggingFilter);
}
// Фильтр аутентификации
private Mono<ServerResponse> authenticationFilter(
ServerRequest request,
HandlerFunction<ServerResponse> next) {
return request.headers()
.firstHeader("Authorization")
.map(this::validateToken)
.map(isValid -> isValid ? next.handle(request) :
ServerResponse.status(HttpStatus.UNAUTHORIZED).build())
.orElse(ServerResponse.status(HttpStatus.UNAUTHORIZED).build())
.flatMap(response -> response);
}
// Фильтр логирования
private Mono<ServerResponse> loggingFilter(
ServerRequest request,
HandlerFunction<ServerResponse> next) {
long startTime = System.currentTimeMillis();
return next.handle(request)
.doOnNext(response -> {
long duration = System.currentTimeMillis() - startTime;
logger.info("Request {} {} completed in {}ms with status {}",
request.method(), request.path(), duration,
response.statusCode());
});
}
}
Server-Sent Events (SSE)
SSE - технология для отправки событий от сервера к клиенту в режиме реального времени через HTTP соединение.
@RestController
public class EventController {
// Простой SSE поток
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Event " + sequence)
.take(Duration.ofMinutes(10)); // Ограничение по времени
}
// SSE с кастомными событиями
@GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<NotificationData>> streamNotifications(
@RequestParam String userId) {
return notificationService.getUserNotifications(userId)
.map(notification -> ServerSentEvent.<NotificationData>builder()
.id(notification.getId())
.event("notification")
.data(notification)
.retry(Duration.ofSeconds(3))
.build())
.doOnCancel(() -> logger.info("User {} disconnected", userId));
}
// Функциональный стиль SSE
public Mono<ServerResponse> streamData(ServerRequest request) {
String category = request.pathVariable("category");
Flux<DataEvent> events = dataService.getEventStream(category)
.map(data -> ServerSentEvent.builder(data)
.event("data-update")
.build());
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(BodyInserters.fromServerSentEvents(events));
}
}
WebSocket поддержка
WebSocket - протокол для двустороннего общения между клиентом и сервером в режиме реального времени.
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new ChatWebSocketHandler(), "/chat")
.setAllowedOrigins("*");
}
}
@Component
public class ChatWebSocketHandler implements WebSocketHandler {
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
@Override
public Mono<Void> handle(WebSocketSession session) {
String userId = getUserId(session);
sessions.put(userId, session);
// Обработка входящих сообщений
Mono<Void> input = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(message -> processMessage(userId, message))
.then();
// Отправка исходящих сообщений
Flux<String> output = chatService.getMessagesForUser(userId);
Mono<Void> outputMono = session.send(
output.map(session::textMessage)
);
return Mono.zip(input, outputMono).then()
.doFinally(signalType -> sessions.remove(userId));
}
private Mono<Void> processMessage(String userId, String message) {
return chatService.processMessage(userId, message)
.flatMap(this::broadcastMessage)
.then();
}
private Mono<Void> broadcastMessage(ChatMessage message) {
return Flux.fromIterable(sessions.values())
.flatMap(session -> session.send(
Mono.just(session.textMessage(message.toJson()))
))
.then();
}
}
Отличия от Spring MVC
Thread Model (Модель потоков)
Spring MVC:
- Модель: One-thread-per-request (один поток на запрос)
- Потоки: Servlet container управляет пулом потоков (обычно 200-400 потоков)
- Блокировка: Поток блокируется на I/O операциях
- Масштабирование: Вертикальное (больше потоков = больше памяти)
// Spring MVC - блокирующий подход
@GetMapping("/users/{id}")
public User getUser(@PathVariable String id) {
return userService.findById(id); // Поток блокируется здесь
}
Spring WebFlux:
- Модель: Event loop (несколько потоков обрабатывают множество запросов)
- Потоки: Небольшое количество потоков (обычно количество CPU ядер)
- Блокировка: Неблокирующий I/O, операции асинхронные
- Масштабирование: Горизонтальное (больше запросов на тех же ресурсах)
// Spring WebFlux - неблокирующий подход
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userService.findById(id); // Поток не блокируется
}
Производительность
Пропускная способность:
- MVC: Хорошо для CPU-intensive задач, ограничен количеством потоков
- WebFlux: Отлично для I/O-intensive задач, может обрабатывать десятки тысяч одновременных запросов
Потребление памяти:
- MVC: ~1MB на поток, ограничение по памяти при большом количестве потоков
- WebFlux: Минимальное потребление памяти, не зависит от количества запросов
Latency (задержка):
- MVC: Стабильная задержка, предсказуемая производительность
- WebFlux: Может быть выше задержка для простых операций из-за overhead реактивности
Когда использовать каждый
Spring MVC подходит для:
- Традиционных CRUD приложений
- Блокирующих операций с базой данных
- CPU-intensive задач
- Команд с опытом работы только с императивным программированием
Spring WebFlux подходит для:
- Микросервисов с множеством внешних вызовов
- Потоковой обработки данных
- Real-time приложений (чаты, уведомления)
- Высоконагруженных приложений с большим количеством I/O
WebClient - альтернатива RestTemplate
WebClient - неблокирующий, реактивный HTTP клиент, заменяющий RestTemplate в реактивных приложениях.
Создание и настройка
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0")
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(1024 * 1024)) // 1MB buffer
.clientConnector(new ReactorClientHttpConnector(
HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.responseTimeout(Duration.ofSeconds(30))
.keepAlive(true)
))
.build();
}
// WebClient с аутентификацией
@Bean
public WebClient authenticatedWebClient() {
return WebClient.builder()
.baseUrl("https://secure-api.example.com")
.filter(ExchangeFilterFunction.ofRequestProcessor(request -> {
return Mono.just(ClientRequest.from(request)
.header("Authorization", "Bearer " + getAccessToken())
.build());
}))
.build();
}
}
Основные операции
@Service
public class ApiService {
private final WebClient webClient;
// GET запрос
public Mono<User> getUser(String id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class)
.timeout(Duration.ofSeconds(10))
.onErrorResume(WebClientResponseException.class, error -> {
if (error.getStatusCode() == HttpStatus.NOT_FOUND) {
return Mono.empty();
}
return Mono.error(new ServiceException("API call failed", error));
});
}
// GET с query параметрами
public Flux<Product> searchProducts(String query, int page, int size) {
return webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/products/search")
.queryParam("q", query)
.queryParam("page", page)
.queryParam("size", size)
.build())
.retrieve()
.bodyToFlux(Product.class);
}
// POST запрос
public Mono<User> createUser(CreateUserRequest request) {
return webClient.post()
.uri("/users")
.bodyValue(request)
.retrieve()
.bodyToMono(User.class)
.onErrorMap(WebClientResponseException.BadRequest.class,
error -> new ValidationException("Invalid user data", error));
}
// PUT с обработкой статусов
public Mono<User> updateUser(String id, UpdateUserRequest request) {
return webClient.put()
.uri("/users/{id}", id)
.bodyValue(request)
.exchangeToMono(response -> {
if (response.statusCode().is2xxSuccessful()) {
return response.bodyToMono(User.class);
} else if (response.statusCode() == HttpStatus.NOT_FOUND) {
return Mono.error(new UserNotFoundException(id));
} else {
return response.createException()
.flatMap(Mono::error);
}
});
}
}
Обработка ошибок и retry
@Service
public class ResilientApiService {
public Mono<Data> fetchDataWithRetry(String id) {
return webClient.get()
.uri("/data/{id}", id)
.retrieve()
.bodyToMono(Data.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(throwable -> throwable instanceof WebClientRequestException))
.timeout(Duration.ofSeconds(30))
.onErrorResume(TimeoutException.class,
error -> Mono.just(getDefaultData(id)))
.onErrorResume(WebClientResponseException.class, error -> {
logger.error("API call failed for id {}: {}", id, error.getMessage());
return Mono.empty();
});
}
// Параллельные запросы
public Mono<CombinedData> fetchCombinedData(String userId) {
Mono<User> user = webClient.get()
.uri("/users/{id}", userId)
.retrieve()
.bodyToMono(User.class);
Mono<Profile> profile = webClient.get()
.uri("/profiles/{id}", userId)
.retrieve()
.bodyToMono(Profile.class);
Mono<List<Order>> orders = webClient.get()
.uri("/users/{id}/orders", userId)
.retrieve()
.bodyToFlux(Order.class)
.collectList();
return Mono.zip(user, profile, orders)
.map(tuple -> new CombinedData(
tuple.getT1(),
tuple.getT2(),
tuple.getT3()
));
}
}
Streaming ответов
// Обработка больших ответов потоком
public Flux<DataChunk> streamLargeData() {
return webClient.get()
.uri("/large-dataset")
.accept(MediaType.APPLICATION_NDJSON) // Newline-delimited JSON
.retrieve()
.bodyToFlux(DataChunk.class)
.buffer(100) // Группировка по 100 элементов
.flatMap(this::processBatch);
}
// Загрузка файлов
public Mono<UploadResult> uploadFile(String filename, Resource file) {
MultiValueMap<String, Object> parts = new LinkedMultiValueMap<>();
parts.add("file", file);
parts.add("filename", filename);
return webClient.post()
.uri("/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.bodyValue(parts)
.retrieve()
.bodyToMono(UploadResult.class);
}
Практические рекомендации
Обработка ошибок
// Глобальный обработчик ошибок
@ControllerAdvice
public class GlobalErrorHandler {
@ExceptionHandler(ValidationException.class)
public Mono<ResponseEntity<ErrorResponse>> handleValidation(ValidationException ex) {
return Mono.just(ResponseEntity.badRequest()
.body(new ErrorResponse("VALIDATION_ERROR", ex.getMessage())));
}
@ExceptionHandler(UserNotFoundException.class)
public Mono<ResponseEntity<ErrorResponse>> handleNotFound(UserNotFoundException ex) {
return Mono.just(ResponseEntity.notFound().build());
}
}
Мониторинг и метрики
@RestController
public class MonitoredController {
@GetMapping("/api/users/{id}")
@Timed(name = "user.fetch", description = "Time taken to fetch user")
public Mono<User> getUser(@PathVariable String id) {
return userService.findById(id)
.name("user-fetch")
.tag("operation", "find-by-id")
.metrics(); // Добавляет метрики Reactor
}
}
Лучшие практики
// ✅ Правильно - неблокирующая цепочка
@GetMapping("/users/{id}/enriched")
public Mono<EnrichedUser> getEnrichedUser(@PathVariable String id) {
return userService.findById(id)
.flatMap(user ->
profileService.getProfile(user.getId())
.map(profile -> user.withProfile(profile))
)
.timeout(Duration.ofSeconds(30));
}
// ❌ Неправильно - блокировка в реактивной цепочке
@GetMapping("/users/{id}/bad")
public Mono<User> getBadUser(@PathVariable String id) {
return userService.findById(id)
.map(user -> {
// Блокирующий вызов убивает реактивность!
Profile profile = profileService.getProfile(user.getId()).block();
return user.withProfile(profile);
});
}
R2DBC (Reactive DB Access)
Что такое R2DBC
R2DBC (Reactive Relational Database Connectivity) - это спецификация для реактивного доступа к реляционным базам данных в Java. Создана как альтернатива JDBC для неблокирующих приложений.
Ключевые принципы:
- Полностью асинхронный API
- Неблокирующий I/O
- Backpressure support
- Совместимость с Reactive Streams
Цель: Обеспечить реактивный доступ к базам данных без блокировки потоков, что критично для high-throughput приложений.
Отличия R2DBC от JDBC
JDBC - блокирующий подход
Модель работы: Thread-per-connection, каждое соединение блокирует поток до получения результата.
// JDBC - блокирующий код
@Repository
public class UserRepositoryJdbc {
@Autowired
private JdbcTemplate jdbcTemplate;
public User findById(Long id) {
// Поток блокируется здесь до получения результата
return jdbcTemplate.queryForObject(
"SELECT * FROM users WHERE id = ?",
new BeanPropertyRowMapper<>(User.class),
id
);
}
public List<User> findAll() {
// Поток блокируется, загружает ВСЕ записи в память
return jdbcTemplate.query(
"SELECT * FROM users",
new BeanPropertyRowMapper<>(User.class)
);
}
}
Проблемы JDBC:
- Блокировка потоков на I/O операциях
- Один поток = одно соединение
- Загрузка всех результатов в память
- Плохое масштабирование при высокой нагрузке
R2DBC - неблокирующий подход
Модель работы: Event-driven, асинхронная обработка с минимальным количеством потоков.
// R2DBC - неблокирующий код
@Repository
public class UserRepositoryR2dbc {
@Autowired
private R2dbcEntityTemplate template;
public Mono<User> findById(Long id) {
// Поток НЕ блокируется, возвращает Mono
return template.select(User.class)
.matching(Query.query(Criteria.where("id").is(id)))
.one();
}
public Flux<User> findAll() {
// Потоковая обработка, элементы приходят по мере готовности
return template.select(User.class)
.all()
.take(1000); // Ограничение для безопасности
}
}
Преимущества R2DBC:
- Неблокирующий I/O
- Потоковая обработка результатов
- Эффективное использование ресурсов
- Встроенная поддержка backpressure
Сравнение производительности
Характеристика | JDBC | R2DBC |
---|---|---|
Модель потоков | Thread-per-connection | Event loop |
Блокировка | Блокирующий I/O | Неблокирующий I/O |
Память | Загружает все в память | Потоковая обработка |
Масштабируемость | Ограничена потоками | Высокая |
Latency | Предсказуемая | Может быть выше для простых запросов |
Throughput | Средний | Очень высокий |
Настройка R2DBC
Зависимости и конфигурация
// application.yml
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/mydb
username: user
password: password
pool:
enabled: true
initial-size: 10
max-size: 20
max-idle-time: 30m
max-acquire-time: 60s
max-create-connection-time: 60s
@Configuration
@EnableR2dbcRepositories
public class R2dbcConfig extends AbstractR2dbcConfiguration {
@Override
public ConnectionFactory connectionFactory() {
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.option(DRIVER, "postgresql")
.option(HOST, "localhost")
.option(PORT, 5432)
.option(USER, "user")
.option(PASSWORD, "password")
.option(DATABASE, "mydb")
.build();
return ConnectionFactories.get(options);
}
// Пул соединений для производительности
@Bean
public ConnectionFactory connectionFactory() {
return ConnectionPoolConfiguration.builder(connectionFactory())
.maxIdleTime(Duration.ofMinutes(30))
.initialSize(10)
.maxSize(20)
.maxCreateConnectionTime(Duration.ofSeconds(60))
.build();
}
}
Работа с PostgreSQL
Базовые операции с R2dbcEntityTemplate
@Service
public class UserService {
private final R2dbcEntityTemplate template;
// Поиск по ID
public Mono<User> findById(Long id) {
return template.select(User.class)
.matching(Query.query(Criteria.where("id").is(id)))
.one()
.switchIfEmpty(Mono.error(new UserNotFoundException(id)));
}
// Поиск с условиями
public Flux<User> findActiveUsers() {
return template.select(User.class)
.matching(Query.query(
Criteria.where("active").is(true)
.and("created_at").greaterThan(LocalDateTime.now().minusYears(1))
))
.all();
}
// Создание пользователя
public Mono<User> create(User user) {
user.setCreatedAt(LocalDateTime.now());
return template.insert(User.class)
.using(user)
.map(savedUser -> savedUser.withGeneratedId());
}
// Обновление
public Mono<User> update(User user) {
return template.update(User.class)
.matching(Query.query(Criteria.where("id").is(user.getId())))
.apply(Update.update("name", user.getName())
.set("email", user.getEmail())
.set("updated_at", LocalDateTime.now()))
.then(findById(user.getId()));
}
// Удаление
public Mono<Void> deleteById(Long id) {
return template.delete(User.class)
.matching(Query.query(Criteria.where("id").is(id)))
.all()
.then();
}
}
Сложные запросы с DatabaseClient
@Repository
public class CustomUserRepository {
private final DatabaseClient databaseClient;
// Сырой SQL запрос
public Flux<User> findUsersByComplexCriteria(String namePattern, int minAge) {
return databaseClient.sql("""
SELECT u.*, p.age
FROM users u
JOIN profiles p ON u.id = p.user_id
WHERE u.name ILIKE :pattern
AND p.age >= :minAge
ORDER BY u.created_at DESC
""")
.bind("pattern", "%" + namePattern + "%")
.bind("minAge", minAge)
.map((row, metadata) -> User.builder()
.id(row.get("id", Long.class))
.name(row.get("name", String.class))
.email(row.get("email", String.class))
.age(row.get("age", Integer.class))
.build())
.all();
}
// Агрегатные запросы
public Mono<UserStats> getUserStats() {
return databaseClient.sql("""
SELECT
COUNT(*) as total_users,
COUNT(CASE WHEN active = true THEN 1 END) as active_users,
AVG(age) as average_age,
MAX(created_at) as last_registration
FROM users u
LEFT JOIN profiles p ON u.id = p.user_id
""")
.map((row, metadata) -> UserStats.builder()
.totalUsers(row.get("total_users", Long.class))
.activeUsers(row.get("active_users", Long.class))
.averageAge(row.get("average_age", Double.class))
.lastRegistration(row.get("last_registration", LocalDateTime.class))
.build())
.one();
}
// Batch операции
public Flux<User> createUsers(List<User> users) {
return databaseClient.inConnectionMany(connection -> {
Statement statement = connection.createStatement(
"INSERT INTO users (name, email, created_at) VALUES ($1, $2, $3)"
);
return Flux.fromIterable(users)
.flatMap(user -> {
statement.bind(0, user.getName())
.bind(1, user.getEmail())
.bind(2, LocalDateTime.now())
.add();
return statement.execute();
})
.flatMap(result -> result.map((row, metadata) ->
User.builder()
.id(row.get("id", Long.class))
.name(row.get("name", String.class))
.email(row.get("email", String.class))
.build()));
});
}
}
Транзакции в R2DBC
@Service
@Transactional
public class TransactionalUserService {
private final R2dbcEntityTemplate template;
private final TransactionalOperator transactionalOperator;
// Декларативные транзакции
@Transactional
public Mono<User> createUserWithProfile(CreateUserRequest request) {
return template.insert(User.class)
.using(User.from(request))
.flatMap(user ->
template.insert(Profile.class)
.using(Profile.forUser(user))
.map(profile -> user.withProfile(profile))
);
}
// Программные транзакции
public Mono<TransferResult> transferFunds(Long fromUserId, Long toUserId, BigDecimal amount) {
return transactionalOperator.transactional(
findById(fromUserId)
.flatMap(fromUser -> {
if (fromUser.getBalance().compareTo(amount) < 0) {
return Mono.error(new InsufficientFundsException());
}
return updateBalance(fromUserId, fromUser.getBalance().subtract(amount));
})
.flatMap(fromUser -> updateBalance(toUserId, amount))
.map(toUser -> new TransferResult(fromUserId, toUserId, amount))
);
}
// Rollback при ошибке
@Transactional(rollbackFor = Exception.class)
public Mono<OrderResult> processOrder(CreateOrderRequest request) {
return createOrder(request)
.flatMap(order -> updateInventory(order.getItems()))
.flatMap(inventory -> processPayment(request.getPayment()))
.flatMap(payment -> sendConfirmation(request.getEmail()))
.onErrorResume(PaymentException.class, error -> {
logger.error("Payment failed, order will be rolled back", error);
return Mono.error(error);
});
}
}
Работа с MongoDB (Reactive)
Настройка Reactive MongoDB
@Configuration
@EnableReactiveMongoRepositories
public class MongoConfig extends AbstractReactiveMongoConfiguration {
@Override
protected String getDatabaseName() {
return "reactive-app";
}
@Override
public MongoClient reactiveMongoClient() {
return MongoClients.create(
MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(
"mongodb://localhost:27017/reactive-app"
))
.applyToConnectionPoolSettings(builder ->
builder.maxSize(20)
.minSize(5)
.maxConnectionIdleTime(30, TimeUnit.SECONDS))
.build()
);
}
}
Reactive MongoDB операции
@Service
public class ProductService {
private final ReactiveMongoTemplate mongoTemplate;
// Поиск документов
public Flux<Product> findByCategory(String category) {
Query query = Query.query(Criteria.where("category").is(category));
return mongoTemplate.find(query, Product.class)
.sort(Sort.by(Sort.Direction.DESC, "createdAt"));
}
// Сложные запросы с агрегацией
public Flux<CategoryStats> getCategoryStats() {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.group("category")
.count().as("productCount")
.avg("price").as("averagePrice")
.max("price").as("maxPrice"),
Aggregation.sort(Sort.Direction.DESC, "productCount")
);
return mongoTemplate.aggregate(
aggregation, "products", CategoryStats.class
);
}
// Текстовый поиск
public Flux<Product> searchProducts(String searchTerm) {
TextCriteria criteria = TextCriteria.forDefaultLanguage()
.matchingAny(searchTerm);
Query query = TextQuery.queryText(criteria)
.sortByScore();
return mongoTemplate.find(query, Product.class);
}
// Геопространственные запросы
public Flux<Store> findNearbyStores(double longitude, double latitude, double maxDistance) {
Point location = new Point(longitude, latitude);
Distance distance = new Distance(maxDistance, Metrics.KILOMETERS);
Query query = Query.query(
Criteria.where("location").nearSphere(location).maxDistance(distance.getValue())
);
return mongoTemplate.find(query, Store.class);
}
}
Потоковая обработка результатов
Интеграция с Flux для больших объемов данных
@Service
public class DataProcessingService {
private final R2dbcEntityTemplate template;
// Потоковая обработка миллионов записей
public Flux<ProcessedData> processLargeDataset() {
return template.select(RawData.class)
.all()
.buffer(1000) // Группируем по 1000 записей
.flatMap(batch -> processBatch(batch))
.onBackpressureBuffer(10000) // Буфер для backpressure
.doOnNext(data -> logger.debug("Processed: {}", data.getId()));
}
// Параллельная обработка с ограничением concurrency
public Flux<EnrichedUser> enrichUsers() {
return template.select(User.class)
.all()
.flatMap(user -> enrichUser(user), 10) // Максимум 10 параллельных операций
.onErrorContinue((error, user) -> {
logger.error("Failed to enrich user {}: {}",
((User) user).getId(), error.getMessage());
});
}
// Обработка в реальном времени с window операциями
public Flux<UserActivity> processUserActivities() {
return template.select(ActivityEvent.class)
.all()
.window(Duration.ofMinutes(5)) // Окна по 5 минут
.flatMap(window ->
window.groupBy(ActivityEvent::getUserId)
.flatMap(userEvents ->
userEvents.collectList()
.map(events -> aggregateUserActivity(events))
)
);
}
// Экспорт данных в потоковом режиме
public Flux<DataBuffer> exportToCsv() {
return template.select(ExportData.class)
.all()
.map(this::convertToCsvLine)
.startWith("id,name,email,created_at\n") // CSV заголовок
.map(line -> bufferFactory.wrap(line.getBytes()))
.doOnComplete(() -> logger.info("Export completed"));
}
}
Backpressure стратегии
@Service
public class BackpressureService {
// Buffer стратегия - накапливаем элементы
public Flux<ProcessedItem> bufferStrategy() {
return dataSource.getAllItems()
.onBackpressureBuffer(
10000, // Размер буфера
item -> logger.warn("Dropping item: {}", item), // При переполнении
BufferOverflowStrategy.DROP_OLDEST
)
.flatMap(this::processItem);
}
// Drop стратегия - отбрасываем элементы
public Flux<ProcessedItem> dropStrategy() {
return dataSource.getAllItems()
.onBackpressureDrop(item ->
logger.warn("Dropped item due to backpressure: {}", item)
)
.flatMap(this::processItem);
}
// Latest стратегия - только последний элемент
public Flux<ProcessedItem> latestStrategy() {
return dataSource.getAllItems()
.onBackpressureLatest()
.flatMap(this::processItem);
}
}
Ошибки при смешении блокирующего и неблокирующего I/O
Типичные ошибки
// ❌ ОШИБКА 1: Блокировка в реактивной цепочке
@Service
public class BadService {
public Mono<User> getBadUser(Long id) {
return userRepository.findById(id)
.map(user -> {
// КАТАСТРОФА! Блокируем event loop поток
Profile profile = profileService.getProfile(user.getId()).block();
return user.withProfile(profile);
});
}
// ❌ ОШИБКА 2: Использование JDBC в WebFlux приложении
public Flux<User> getAllUsersBad() {
// Блокирующий JDBC убивает всю реактивность
List<User> users = jdbcTemplate.query(
"SELECT * FROM users",
new BeanPropertyRowMapper<>(User.class)
);
return Flux.fromIterable(users);
}
// ❌ ОШИБКА 3: Синхронные операции в subscribe
public void processBad() {
userRepository.findAll()
.subscribe(user -> {
// Блокирующая операция в callback
Thread.sleep(1000);
processUser(user);
});
}
}
Правильные подходы
// ✅ ПРАВИЛЬНО: Полностью реактивная цепочка
@Service
public class GoodService {
public Mono<User> getGoodUser(Long id) {
return userRepository.findById(id)
.flatMap(user ->
profileService.getProfile(user.getId())
.map(profile -> user.withProfile(profile))
);
}
// ✅ ПРАВИЛЬНО: Использование R2DBC
public Flux<User> getAllUsersGood() {
return userRepository.findAll()
.take(1000) // Ограничение для безопасности
.timeout(Duration.ofSeconds(30));
}
// ✅ ПРАВИЛЬНО: Асинхронная обработка
public Mono<Void> processGood() {
return userRepository.findAll()
.flatMap(user ->
processUserAsync(user)
.subscribeOn(Schedulers.boundedElastic())
)
.then();
}
}
Обнаружение блокирующих операций
// Включение детектора блокировок в application.yml
reactor:
blockhound:
enabled: true
// Программная настройка
@Configuration
public class ReactorConfig {
@PostConstruct
public void configureReactor() {
// Включаем обнаружение блокировок
BlockHound.install();
// Исключения для известных блокирующих операций
BlockHound.builder()
.allowBlockingCallsInside("java.util.logging", "Logger")
.allowBlockingCallsInside("org.slf4j", "Logger")
.install();
}
}
Миграция с JDBC на R2DBC
// Шаг 1: Dual stack подход
@Service
public class MigrationService {
private final UserRepositoryJdbc jdbcRepository;
private final UserRepositoryR2dbc r2dbcRepository;
@Value("${app.use-r2dbc:false}")
private boolean useR2dbc;
public Mono<User> findById(Long id) {
if (useR2dbc) {
return r2dbcRepository.findById(id);
} else {
// Оборачиваем блокирующий вызов
return Mono.fromCallable(() -> jdbcRepository.findById(id))
.subscribeOn(Schedulers.boundedElastic());
}
}
}
// Шаг 2: Постепенная замена
@Service
public class HybridService {
// Критичные операции - уже на R2DBC
public Flux<User> getActiveUsers() {
return r2dbcRepository.findByActive(true);
}
// Редкие операции - пока на JDBC
public Mono<Report> generateReport() {
return Mono.fromCallable(() -> jdbcRepository.generateComplexReport())
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofMinutes(5));
}
}
Мониторинг и отладка
Логирование и метрики
@Service
public class MonitoredService {
public Flux<User> getUsers() {
return userRepository.findAll()
.name("user-fetch-all")
.tag("operation", "find-all")
.metrics() // Добавляет метрики Micrometer
.doOnSubscribe(sub -> logger.info("Starting user fetch"))
.doOnNext(user -> logger.debug("Fetched user: {}", user.getId()))
.doOnComplete(() -> logger.info("User fetch completed"))
.doOnError(error -> logger.error("User fetch failed", error));
}
// Debugging сложных цепочек
public Mono<EnrichedUser> debugChain(Long userId) {
return userRepository.findById(userId)
.checkpoint("after-user-fetch")
.flatMap(user ->
profileService.getProfile(user.getId())
.checkpoint("after-profile-fetch")
.map(profile -> user.withProfile(profile))
)
.checkpoint("after-enrichment");
}
}
Производительность
// Connection pooling мониторинг
@Component
public class ConnectionPoolMonitor {
@EventListener
public void handleConnectionEvent(ConnectionCreatedEvent event) {
logger.info("New connection created: {}", event.getConnectionId());
}
@EventListener
public void handleConnectionEvent(ConnectionReleasedEvent event) {
logger.debug("Connection released: {}", event.getConnectionId());
}
}
// Health checks
@Component
public class R2dbcHealthIndicator implements ReactiveHealthIndicator {
private final DatabaseClient databaseClient;
@Override
public Mono<Health> health() {
return databaseClient.sql("SELECT 1")
.fetch()
.one()
.map(result -> Health.up().withDetail("database", "available").build())
.onErrorResume(error ->
Mono.just(Health.down(error).withDetail("database", "unavailable").build())
)
.timeout(Duration.ofSeconds(10));
}
}
Проектирование реактивных сервисов
Thread Model: Event Loop и Non-blocking I/O
Традиционная модель (Thread-per-request)
Принцип: Каждый запрос обрабатывается отдельным потоком из пула.
// Традиционный подход - блокирующий
@RestController
public class TraditionalController {
@GetMapping("/users/{id}")
public User getUser(@PathVariable String id) {
// Поток блокируется здесь
User user = userService.findById(id); // ~50ms DB
Profile profile = profileService.get(id); // ~30ms HTTP
Settings settings = settingsService.get(id); // ~20ms Cache
return user.withProfile(profile).withSettings(settings);
// Общее время: ~100ms, поток занят все это время
}
}
Проблемы:
- Один поток = один запрос
- Поток простаивает во время I/O операций
- Ограничение по количеству одновременных запросов (обычно 200-400)
- Высокое потребление памяти (1MB на поток)
Event Loop модель (реактивная)
Принцип: Небольшое количество потоков обрабатывает множество запросов асинхронно.
// Реактивный подход - неблокирующий
@RestController
public class ReactiveController {
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
// Параллельное выполнение без блокировки потоков
Mono<User> userMono = userService.findById(id);
Mono<Profile> profileMono = profileService.get(id);
Mono<Settings> settingsMono = settingsService.get(id);
return Mono.zip(userMono, profileMono, settingsMono)
.map(tuple -> tuple.getT1()
.withProfile(tuple.getT2())
.withSettings(tuple.getT3())
);
// Общее время: ~50ms (самая медленная операция)
// Потоки не блокируются
}
}
Преимущества:
- Множество запросов на небольшом количестве потоков
- Потоки не блокируются на I/O
- Высокая пропускная способность
- Эффективное использование ресурсов
Архитектура Event Loop
// Упрощенная схема работы
public class EventLoopExample {
// Event Loop обрабатывает события
public void eventLoopCycle() {
while (running) {
// 1. Проверяем готовые I/O операции
List<ReadyOperation> readyOps = selector.select();
// 2. Обрабатываем каждую готовую операцию
for (ReadyOperation op : readyOps) {
if (op.isReadReady()) {
processRead(op);
} else if (op.isWriteReady()) {
processWrite(op);
}
}
// 3. Выполняем задачи из очереди
processTaskQueue();
}
}
// Неблокирующая обработка запроса
public void handleRequest(HttpRequest request) {
// Не блокируем поток, а регистрируем callback
databaseClient.findUser(request.getUserId())
.whenComplete((user, error) -> {
if (error != null) {
sendErrorResponse(request, error);
} else {
sendSuccessResponse(request, user);
}
});
// Поток сразу возвращается к обработке других событий
}
}
Управление потоками: subscribeOn и publishOn
subscribeOn - Поток для подписки
Назначение: Определяет, на каком планировщике будет выполняться подписка и источник данных.
@Service
public class SubscribeOnService {
// subscribeOn влияет на источник данных
public Mono<Data> fetchData() {
return Mono.fromCallable(() -> {
// Этот код выполнится на boundedElastic
logger.info("Fetching on thread: {}", Thread.currentThread().getName());
return expensiveDatabaseCall();
})
.subscribeOn(Schedulers.boundedElastic()) // Источник на elastic потоке
.map(data -> {
// Это тоже выполнится на boundedElastic
logger.info("Mapping on thread: {}", Thread.currentThread().getName());
return transformData(data);
});
}
// Только первый subscribeOn имеет эффект
public Mono<String> multipleSubscribeOn() {
return Mono.just("data")
.subscribeOn(Schedulers.parallel()) // Этот применится
.subscribeOn(Schedulers.boundedElastic()) // Этот игнорируется
.map(String::toUpperCase);
}
}
publishOn - Поток для downstream операций
Назначение: Переключает выполнение downstream операций на указанный планировщик.
@Service
public class PublishOnService {
// publishOn влияет на последующие операции
public Flux<ProcessedData> processData() {
return dataSource.getData()
.map(data -> {
// Выполняется на потоке источника данных
logger.info("Initial processing: {}", Thread.currentThread().getName());
return data;
})
.publishOn(Schedulers.parallel()) // Переключаемся на parallel
.map(data -> {
// Выполняется на parallel потоке
logger.info("CPU processing: {}", Thread.currentThread().getName());
return cpuIntensiveProcessing(data);
})
.publishOn(Schedulers.boundedElastic()) // Переключаемся на elastic
.flatMap(data -> {
// Выполняется на boundedElastic потоке
logger.info("I/O operation: {}", Thread.currentThread().getName());
return saveToDatabase(data);
});
}
// Множественные publishOn работают последовательно
public Mono<String> multiplePublishOn() {
return Mono.just("data")
.publishOn(Schedulers.parallel())
.map(s -> s + "-parallel") // На parallel потоке
.publishOn(Schedulers.boundedElastic())
.map(s -> s + "-elastic"); // На elastic потоке
}
}
Комбинирование subscribeOn и publishOn
@Service
public class CombinedSchedulingService {
public Flux<Result> complexProcessing() {
return Mono.fromCallable(() -> loadFromFile()) // I/O операция
.subscribeOn(Schedulers.boundedElastic()) // Источник на elastic
.flatMapMany(data -> Flux.fromIterable(data.getItems()))
.publishOn(Schedulers.parallel()) // CPU обработка на parallel
.map(this::cpuIntensiveTransform)
.buffer(100)
.publishOn(Schedulers.boundedElastic()) // Обратно на elastic для DB
.flatMap(batch -> saveBatch(batch));
}
// Оптимизация для разных типов операций
public Mono<EnrichedUser> enrichUser(String userId) {
return userRepository.findById(userId)
.subscribeOn(Schedulers.boundedElastic()) // DB операция
.publishOn(Schedulers.parallel()) // CPU обработка
.map(user -> calculateScore(user))
.publishOn(Schedulers.boundedElastic()) // HTTP вызов
.flatMap(user -> externalService.enrich(user));
}
}
Типы планировщиков (Schedulers)
Schedulers.parallel() - CPU-intensive задачи
Назначение: Оптимизирован для вычислительных задач, использует все доступные CPU ядра.
@Service
public class ParallelSchedulerService {
// Подходит для математических вычислений
public Flux<CalculationResult> performCalculations() {
return Flux.range(1, 1000000)
.publishOn(Schedulers.parallel())
.map(this::complexMathOperation) // CPU-intensive
.filter(result -> result.isValid())
.groupBy(result -> result.getCategory())
.flatMap(group -> group.collectList());
}
// Параллельная обработка данных
public Flux<ProcessedImage> processImages(List<Image> images) {
return Flux.fromIterable(images)
.publishOn(Schedulers.parallel())
.map(this::resizeImage) // CPU-intensive
.map(this::applyFilters) // CPU-intensive
.map(this::compress); // CPU-intensive
}
}
Характеристики:
- Количество потоков = количество CPU ядер
- Неблокирующие операции
- Оптимален для вычислений
Schedulers.boundedElastic() - I/O операции
Назначение: Для блокирующих I/O операций, создает потоки по требованию с ограничением.
@Service
public class BoundedElasticService {
// Подходит для файловых операций
public Mono<String> readFile(String filename) {
return Mono.fromCallable(() -> {
// Блокирующая операция
return Files.readString(Paths.get(filename));
})
.subscribeOn(Schedulers.boundedElastic());
}
// HTTP вызовы к внешним сервисам
public Flux<ApiResponse> callExternalApis(List<String> urls) {
return Flux.fromIterable(urls)
.flatMap(url ->
Mono.fromCallable(() -> restTemplate.getForObject(url, ApiResponse.class))
.subscribeOn(Schedulers.boundedElastic())
, 10); // Максимум 10 одновременных вызовов
}
// Работа с legacy блокирующими библиотеками
public Mono<LegacyResult> callLegacyService(Request request) {
return Mono.fromCallable(() -> {
// Блокирующий legacy код
return legacyService.process(request);
})
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(30));
}
}
Характеристики:
- Создает потоки по требованию (до 10x CPU cores)
- TTL потоков = 60 секунд
- Подходит для блокирующих операций
Schedulers.single() - Последовательная обработка
Назначение: Один поток для всех операций, гарантирует последовательность.
@Service
public class SingleSchedulerService {
// Обработка событий в строгом порядке
public Flux<EventResult> processEvents() {
return eventSource.getEvents()
.publishOn(Schedulers.single()) // Один поток = строгий порядок
.map(this::processEvent)
.doOnNext(this::logEvent);
}
// Обновление shared state
private int counter = 0;
public Mono<Integer> incrementCounter() {
return Mono.fromCallable(() -> ++counter)
.subscribeOn(Schedulers.single()); // Thread-safe без синхронизации
}
}
Schedulers.immediate() - Текущий поток
Назначение: Выполняет операции на текущем потоке.
@Service
public class ImmediateSchedulerService {
// Легкие операции без переключения потоков
public Mono<String> lightProcessing(String input) {
return Mono.just(input)
.publishOn(Schedulers.immediate()) // Остается на текущем потоке
.map(String::toUpperCase)
.map(s -> s + "-processed");
}
}
Проблемы Thread Leaks
Что такое Thread Leak
Thread Leak - ситуация, когда потоки создаются, но не освобождаются, что приводит к исчерпанию ресурсов.
// ❌ ОПАСНО: Thread leak через неправильное использование
@Service
public class LeakyService {
// Создание кастомного планировщика без cleanup
public Flux<Data> badSchedulerUsage() {
Scheduler customScheduler = Schedulers.newParallel("custom", 10);
return dataSource.getData()
.publishOn(customScheduler) // Scheduler никогда не освобождается!
.map(this::processData);
}
// Infinite streams без proper disposal
public void badInfiniteStream() {
Flux.interval(Duration.ofSeconds(1))
.publishOn(Schedulers.boundedElastic())
.subscribe(tick -> {
// Поток будет работать вечно
processPeriodicTask(tick);
});
// Подписка никогда не отменяется!
}
// Блокирующие операции на неподходящих планировщиках
public Mono<String> badBlockingCall() {
return Mono.fromCallable(() -> {
Thread.sleep(10000); // Блокируем parallel поток!
return "result";
})
.subscribeOn(Schedulers.parallel()); // Неправильный планировщик
}
}
Правильное управление ресурсами
@Service
public class ProperResourceManagement {
private final Scheduler customScheduler;
public ProperResourceManagement() {
// Создаем кастомный планировщик
this.customScheduler = Schedulers.newBoundedElastic(
10, 100, "custom-scheduler");
}
@PreDestroy
public void cleanup() {
// ОБЯЗАТЕЛЬНО освобождаем планировщик
customScheduler.dispose();
}
// Правильное использование с try-with-resources аналогом
public Flux<Data> properSchedulerUsage() {
return dataSource.getData()
.publishOn(customScheduler)
.map(this::processData)
.doFinally(signalType -> {
// Cleanup действия при завершении
logger.info("Stream completed with signal: {}", signalType);
});
}
// Правильная работа с infinite streams
public Disposable properInfiniteStream() {
return Flux.interval(Duration.ofSeconds(1))
.publishOn(Schedulers.boundedElastic())
.subscribe(
tick -> processPeriodicTask(tick),
error -> logger.error("Stream error", error)
);
// Возвращаем Disposable для возможности отмены
}
// Управление жизненным циклом подписок
private final CompositeDisposable disposables = new CompositeDisposable();
public void startPeriodicTasks() {
Disposable task1 = Flux.interval(Duration.ofMinutes(1))
.subscribe(this::hourlyTask);
Disposable task2 = Flux.interval(Duration.ofMinutes(5))
.subscribe(this::fiveMinuteTask);
disposables.addAll(task1, task2);
}
@PreDestroy
public void stopAllTasks() {
disposables.dispose(); // Отменяем все подписки
}
}
Мониторинг потоков
@Component
public class ThreadMonitor {
@Scheduled(fixedRate = 60000) // Каждую минуту
public void monitorThreads() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
int threadCount = threadBean.getThreadCount();
logger.info("Active threads: {}", threadCount);
// Проверяем на утечки
if (threadCount > 1000) {
logger.warn("Potential thread leak detected! Thread count: {}", threadCount);
// Дополнительная диагностика
ThreadInfo[] threads = threadBean.getThreadInfo(threadBean.getAllThreadIds());
Map<String, Long> threadGroups = Arrays.stream(threads)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(
thread -> thread.getThreadName().split("-")[0],
Collectors.counting()
));
threadGroups.forEach((name, count) ->
logger.info("Thread group '{}': {} threads", name, count)
);
}
}
// Health check для планировщиков
@EventListener
public void checkSchedulerHealth() {
// Проверяем состояние планировщиков
if (Schedulers.parallel().isDisposed()) {
logger.error("Parallel scheduler is disposed!");
}
if (Schedulers.boundedElastic().isDisposed()) {
logger.error("BoundedElastic scheduler is disposed!");
}
}
}
Механизмы управления нагрузкой
Rate Limiting - Ограничение частоты
Назначение: Контроль количества операций в единицу времени.
@Service
public class RateLimitingService {
// Простое rate limiting
public Flux<ApiResponse> rateLimitedCalls() {
return requestSource.getRequests()
.delayElements(Duration.ofMillis(100)) // Максимум 10 запросов/сек
.flatMap(this::makeApiCall);
}
// Sliding window rate limiting
public Flux<ProcessedData> slidingWindowRateLimit() {
return dataSource.getData()
.window(Duration.ofSeconds(1)) // Окно в 1 секунду
.flatMap(window ->
window.take(100) // Максимум 100 элементов в окне
.flatMap(this::processData)
);
}
// Adaptive rate limiting на основе нагрузки
private volatile int currentRate = 10;
public Flux<Result> adaptiveRateLimit() {
return taskSource.getTasks()
.delayElements(Duration.ofMillis(1000 / currentRate))
.flatMap(task ->
processTask(task)
.doOnSuccess(result -> {
if (result.getProcessingTime() < 100) {
currentRate = Math.min(currentRate + 1, 100); // Увеличиваем rate
}
})
.doOnError(error -> {
currentRate = Math.max(currentRate - 5, 1); // Уменьшаем rate
})
);
}
}
Window операторы - Группировка по времени
Назначение: Группировка элементов потока в окна для batch обработки.
@Service
public class WindowOperatorsService {
// Временные окна для batch обработки
public Flux<BatchResult> timeBasedWindows() {
return eventStream.getEvents()
.window(Duration.ofSeconds(5)) // Окна по 5 секунд
.flatMap(window ->
window.collectList() // Собираем элементы окна
.flatMap(this::processBatch)
);
}
// Окна по количеству элементов
public Flux<ProcessedBatch> countBasedWindows() {
return dataStream.getData()
.window(100) // Окна по 100 элементов
.flatMap(window ->
window.buffer() // Преобразуем в List
.flatMap(this::optimizedBatchProcess)
);
}
// Скользящие окна с перекрытием
public Flux<Statistics> slidingWindows() {
return metricStream.getMetrics()
.window(Duration.ofMinutes(5), Duration.ofMinutes(1)) // 5-мин окна каждую минуту
.flatMap(window ->
window.collectList()
.map(this::calculateStatistics)
);
}
// Сессионные окна (по периодам неактивности)
public Flux<SessionData> sessionWindows() {
return userActivityStream.getActivities()
.windowTimeout(100, Duration.ofMinutes(30)) // До 100 событий или 30 мин неактивности
.flatMap(window ->
window.collectList()
.map(this::aggregateSession)
);
}
}
Sample операторы - Прореживание потока
Назначение: Уменьшение частоты элементов в потоке для снижения нагрузки.
@Service
public class SamplingService {
// Временная выборка
public Flux<Measurement> timeSampling() {
return sensorStream.getMeasurements()
.sample(Duration.ofSeconds(1)) // Берем одно измерение в секунду
.filter(measurement -> measurement.isValid());
}
// Выборка по другому потоку (trigger-based)
public Flux<Data> triggerBasedSampling() {
Flux<Data> dataStream = dataSource.getData();
Flux<Object> triggerStream = Flux.interval(Duration.ofSeconds(5));
return dataStream.sample(triggerStream); // Выборка каждые 5 секунд
}
// Throttling - ограничение частоты с задержкой
public Flux<Event> throttleEvents() {
return eventStream.getEvents()
.throttleFirst(Duration.ofSeconds(2)) // Первый элемент, затем игнор на 2 сек
.doOnNext(event -> logger.info("Processed event: {}", event.getId()));
}
// Debounce - убираем "дрожание"
public Flux<SearchResult> debounceSearch() {
return userInputService.getSearchQueries()
.debounce(Duration.ofMillis(300)) // Ждем 300мс паузы
.distinctUntilChanged() // Игнорируем дубликаты
.flatMap(query -> searchService.search(query));
}
}
Buffer операторы - Накопление элементов
Назначение: Группировка элементов для более эффективной обработки.
@Service
public class BufferingService {
// Буферизация по размеру
public Flux<BatchWriteResult> sizeBasedBuffering() {
return writeRequestStream.getRequests()
.buffer(1000) // Группы по 1000 записей
.flatMap(batch ->
databaseService.batchWrite(batch)
.retry(3)
.timeout(Duration.ofSeconds(30))
);
}
// Буферизация по времени
public Flux<Notification> timeBasedBuffering() {
return notificationStream.getNotifications()
.buffer(Duration.ofMinutes(5)) // Группы за 5 минут
.filter(notifications -> !notifications.isEmpty())
.flatMap(batch ->
notificationService.sendBulkNotifications(batch)
);
}
// Буферизация с overflow стратегией
public Flux<ProcessedData> bufferWithOverflow() {
return highVolumeStream.getData()
.onBackpressureBuffer(
10000, // Размер буфера
data -> logger.warn("Dropping data: {}", data.getId()), // При переполнении
BufferOverflowStrategy.DROP_OLDEST
)
.buffer(100, Duration.ofSeconds(1)) // Группы по 100 или каждую секунду
.flatMap(this::processBatch);
}
// Адаптивная буферизация
public Flux<AdaptiveBatchResult> adaptiveBuffering() {
return dataStream.getData()
.bufferTimeout(
getBatchSize(), // Динамический размер
Duration.ofSeconds(getTimeout()) // Динамический таймаут
)
.flatMap(batch -> {
long start = System.currentTimeMillis();
return processBatch(batch)
.doOnSuccess(result -> {
long duration = System.currentTimeMillis() - start;
adjustBatchParameters(duration, batch.size());
});
});
}
private int getBatchSize() {
// Логика адаптации размера batch
return Math.max(10, Math.min(currentSystemLoad() * 100, 1000));
}
}
Комплексная стратегия управления нагрузкой
@Service
public class LoadManagementService {
private final AtomicInteger activeConnections = new AtomicInteger(0);
private final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("external-service");
public Flux<ProcessedRequest> comprehensiveLoadManagement() {
return requestStream.getRequests()
// 1. Rate limiting
.delayElements(Duration.ofMillis(10)) // Максимум 100 req/sec
// 2. Backpressure handling
.onBackpressureBuffer(
1000,
request -> logger.warn("Dropping request: {}", request.getId()),
BufferOverflowStrategy.DROP_LATEST
)
// 3. Batching для эффективности
.buffer(50, Duration.ofSeconds(1))
// 4. Circuit breaker protection
.flatMap(batch ->
Mono.fromCallable(() -> circuitBreaker.executeSupplier(() ->
processBatch(batch)
))
.subscribeOn(Schedulers.boundedElastic())
)
// 5. Retry с exponential backoff
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
// 6. Timeout protection
.timeout(Duration.ofSeconds(30))
// 7. Concurrency limiting
.flatMap(result ->
processResult(result),
10 // Максимум 10 одновременных обработок
)
// 8. Monitoring
.doOnSubscribe(sub -> activeConnections.incrementAndGet())
.doFinally(signal -> activeConnections.decrementAndGet())
.doOnNext(result -> updateMetrics(result));
}
// Health check и adaptive parameters
@Scheduled(fixedRate = 30000)
public void adjustLoadParameters() {
int connections = activeConnections.get();
double systemLoad = getSystemLoad();
if (systemLoad > 0.8 || connections > 1000) {
// Снижаем нагрузку
adjustRateLimit(0.5);
adjustBatchSize(0.8);
} else if (systemLoad < 0.3 && connections < 100) {
// Увеличиваем пропускную способность
adjustRateLimit(1.2);
adjustBatchSize(1.1);
}
}
}
Интеграция реактивных технологий
Kafka + Reactive (reactor-kafka)
Что такое reactor-kafka
Reactor-kafka - это реактивная библиотека для работы с Apache Kafka, построенная на Project Reactor. Предоставляет неблокирующий API для производства и потребления сообщений с поддержкой backpressure.
Преимущества перед обычным Kafka Client:
- Неблокирующие операции
- Встроенная поддержка backpressure
- Интеграция с реактивными цепочками
- Эффективное управление ресурсами
Настройка и конфигурация
@Configuration
public class ReactiveKafkaConfig {
@Bean
public ReactiveKafkaProducerTemplate<String, Object> reactiveKafkaProducerTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new ReactiveKafkaProducerTemplate<>(
SenderOptions.create(props)
);
}
@Bean
public ReceiverOptions<String, Object> kafkaReceiverOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return ReceiverOptions.create(props);
}
}
Reactive Producer - отправка сообщений
@Service
public class ReactiveKafkaProducerService {
private final ReactiveKafkaProducerTemplate<String, Object> template;
private final KafkaSender<String, Object> sender;
// Простая отправка сообщения
public Mono<SenderResult<Void>> sendMessage(String topic, String key, Object value) {
return template.send(topic, key, value)
.doOnSuccess(result -> logger.info("Message sent to topic {}: {}", topic, key))
.doOnError(error -> logger.error("Failed to send message", error));
}
// Batch отправка для производительности
public Flux<SenderResult<String>> sendBatch(Flux<UserEvent> events) {
return events
.map(event -> SenderRecord.create(
new ProducerRecord<>("user-events", event.getUserId(), event),
event.getId() // correlation metadata
))
.buffer(100) // Группируем по 100 сообщений
.flatMap(batch -> sender.send(Flux.fromIterable(batch)))
.doOnNext(result -> {
if (result.exception() != null) {
logger.error("Failed to send batch message: {}",
result.correlationMetadata(), result.exception());
}
});
}
// Transactional отправка
public Mono<Void> sendTransactional(List<OrderEvent> events) {
return sender.sendTransactionally(
Flux.fromIterable(events)
.map(event -> SenderRecord.create(
new ProducerRecord<>("orders", event.getOrderId(), event),
null
))
)
.then()
.doOnSuccess(v -> logger.info("Transactional batch sent successfully"))
.doOnError(error -> logger.error("Transactional send failed", error));
}
}
Reactive Consumer - потребление сообщений
@Service
public class ReactiveKafkaConsumerService {
private final ReceiverOptions<String, Object> receiverOptions;
// Простое потребление сообщений
public Flux<ProcessedMessage> consumeMessages(String topic) {
return KafkaReceiver.create(
receiverOptions.subscription(Collections.singleton(topic))
)
.receive()
.flatMap(record ->
processMessage(record.value())
.doOnSuccess(result -> record.receiverOffset().acknowledge())
.doOnError(error -> {
logger.error("Failed to process message: {}", record.key(), error);
// Можно реализовать DLQ или retry логику
})
.onErrorResume(error -> Mono.empty()) // Пропускаем неудачные сообщения
);
}
// Потребление с backpressure и manual commit
public Flux<BatchResult> consumeWithBackpressure(String topic) {
return KafkaReceiver.create(
receiverOptions
.subscription(Collections.singleton(topic))
.commitInterval(Duration.ofSeconds(5)) // Commit каждые 5 секунд
.commitBatchSize(100) // Или каждые 100 сообщений
)
.receive()
.buffer(50) // Группируем по 50 сообщений для batch обработки
.flatMap(records -> {
List<Object> messages = records.stream()
.map(ConsumerRecord::value)
.collect(Collectors.toList());
return processBatch(messages)
.doOnSuccess(result -> {
// Acknowledge всех сообщений в batch
records.forEach(record ->
record.receiverOffset().acknowledge()
);
})
.onErrorResume(error -> {
logger.error("Batch processing failed", error);
return Mono.just(BatchResult.failed());
});
})
.onBackpressureBuffer(1000); // Буфер для управления нагрузкой
}
// Потребление с retry и dead letter queue
public Flux<ProcessedMessage> consumeWithRetry(String topic) {
return KafkaReceiver.create(
receiverOptions.subscription(Collections.singleton(topic))
)
.receive()
.flatMap(record ->
processMessage(record.value())
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(error -> !(error instanceof ValidationException))
)
.doOnSuccess(result -> record.receiverOffset().acknowledge())
.onErrorResume(error -> {
// Отправляем в DLQ после всех retry
return sendToDLQ(record, error)
.doOnSuccess(v -> record.receiverOffset().acknowledge())
.then(Mono.empty());
})
);
}
}
Паттерны интеграции с Kafka
@Service
public class KafkaIntegrationPatterns {
// Event Sourcing паттерн
public Flux<AggregateState> eventSourcingConsumer() {
return KafkaReceiver.create(receiverOptions.subscription(Set.of("domain-events")))
.receive()
.scan(AggregateState.empty(), (state, record) -> {
DomainEvent event = (DomainEvent) record.value();
return state.apply(event); // Применяем событие к состоянию
})
.doOnNext(state -> record.receiverOffset().acknowledge());
}
// CQRS - разделение команд и запросов
public Mono<Void> cqrsCommandHandler(Command command) {
return validateCommand(command)
.flatMap(this::processCommand)
.flatMap(events ->
// Публикуем события для read models
Flux.fromIterable(events)
.flatMap(event -> kafkaProducer.send("read-model-events", event))
.then()
);
}
// Saga паттерн для распределенных транзакций
public Flux<SagaStep> sagaOrchestrator() {
return KafkaReceiver.create(receiverOptions.subscription(Set.of("saga-events")))
.receive()
.flatMap(record -> {
SagaEvent event = (SagaEvent) record.value();
return sagaManager.processEvent(event)
.flatMapMany(nextSteps ->
Flux.fromIterable(nextSteps)
.flatMap(step -> kafkaProducer.send(step.getTargetTopic(), step))
);
});
}
}
Redis PubSub
Реактивный Redis клиент
Spring Data Redis Reactive предоставляет реактивный API для работы с Redis, включая pub/sub операции.
@Configuration
public class ReactiveRedisConfig {
@Bean
public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate() {
LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(
new RedisStandaloneConfiguration("localhost", 6379)
);
RedisSerializationContext<String, Object> context = RedisSerializationContext
.<String, Object>newSerializationContext()
.key(StringRedisSerializer.UTF_8)
.value(new GenericJackson2JsonRedisSerializer())
.build();
return new ReactiveRedisTemplate<>(connectionFactory, context);
}
@Bean
public ReactiveRedisMessageListenerContainer redisMessageListenerContainer(
ReactiveRedisConnectionFactory connectionFactory) {
return new ReactiveRedisMessageListenerContainer(connectionFactory);
}
}
Publisher - отправка сообщений
@Service
public class ReactiveRedisPublisher {
private final ReactiveRedisTemplate<String, Object> redisTemplate;
// Простая публикация сообщения
public Mono<Long> publishMessage(String channel, Object message) {
return redisTemplate.convertAndSend(channel, message)
.doOnSuccess(subscribers ->
logger.info("Message published to {} subscribers on channel {}",
subscribers, channel)
)
.doOnError(error ->
logger.error("Failed to publish message to channel {}", channel, error)
);
}
// Batch публикация для производительности
public Flux<Long> publishBatch(String channel, Flux<Notification> notifications) {
return notifications
.buffer(100, Duration.ofSeconds(5)) // Группируем сообщения
.flatMap(batch ->
Flux.fromIterable(batch)
.flatMap(notification ->
redisTemplate.convertAndSend(channel, notification)
)
.collectList()
.map(results -> results.stream().mapToLong(Long::longValue).sum())
);
}
// Условная публикация с проверками
public Mono<Void> publishUserEvent(UserEvent event) {
return checkUserSubscription(event.getUserId())
.filter(isSubscribed -> isSubscribed)
.flatMap(isSubscribed ->
redisTemplate.convertAndSend("user-events:" + event.getUserId(), event)
)
.then()
.doOnSuccess(v -> updateEventMetrics(event))
.onErrorResume(error -> {
logger.warn("Failed to publish user event: {}", event.getId(), error);
return Mono.empty();
});
}
}
Subscriber - получение сообщений
@Service
public class ReactiveRedisSubscriber {
private final ReactiveRedisMessageListenerContainer listenerContainer;
private final ReactiveRedisTemplate<String, Object> redisTemplate;
// Подписка на канал
public Flux<Message<String, Object>> subscribeToChannel(String channelPattern) {
return listenerContainer
.receive(ChannelTopic.of(channelPattern))
.doOnSubscribe(sub -> logger.info("Subscribed to channel: {}", channelPattern))
.doOnNext(message ->
logger.debug("Received message on channel {}: {}",
message.getChannel(), message.getMessage())
)
.doOnError(error ->
logger.error("Error in subscription to {}", channelPattern, error)
);
}
// Подписка с обработкой и backpressure
public Flux<ProcessedMessage> subscribeWithProcessing(String channel) {
return listenerContainer
.receive(ChannelTopic.of(channel))
.cast(Message.class)
.flatMap(message ->
processMessage(message.getMessage())
.onErrorResume(error -> {
logger.error("Failed to process message: {}",
message.getMessage(), error);
return Mono.empty();
})
)
.onBackpressureBuffer(1000)
.doOnNext(result -> updateProcessingMetrics());
}
// Pattern-based подписка (wildcard)
public Flux<RoutedMessage> subscribeToPattern(String pattern) {
return listenerContainer
.receive(PatternTopic.of(pattern))
.map(message -> new RoutedMessage(
message.getChannel(),
message.getMessage(),
extractRoutingKey(message.getChannel())
))
.groupBy(RoutedMessage::getRoutingKey)
.flatMap(group ->
group.flatMap(routedMessage ->
routeMessage(routedMessage)
)
);
}
}
Реактивные паттерны с Redis
@Service
public class RedisReactivePatterns {
// Cache-aside паттерн
public Mono<User> getCachedUser(String userId) {
return redisTemplate.opsForValue().get("user:" + userId)
.cast(User.class)
.switchIfEmpty(
userRepository.findById(userId)
.flatMap(user ->
redisTemplate.opsForValue()
.set("user:" + userId, user, Duration.ofHours(1))
.thenReturn(user)
)
);
}
// Distributed locking
public <T> Mono<T> withDistributedLock(String lockKey, Mono<T> operation) {
String lockValue = UUID.randomUUID().toString();
return redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, Duration.ofMinutes(5))
.flatMap(acquired -> {
if (acquired) {
return operation
.doFinally(signal ->
// Освобождаем lock
redisTemplate.delete(lockKey).subscribe()
);
} else {
return Mono.error(new LockAcquisitionException(lockKey));
}
});
}
// Rate limiting с Redis
public Mono<Boolean> checkRateLimit(String userId, int maxRequests, Duration window) {
String key = "rate_limit:" + userId;
long windowSeconds = window.getSeconds();
return redisTemplate.execute(connection -> {
return connection.eval(
// Lua script для атомарной операции
"local current = redis.call('GET', KEYS[1]) " +
"if current == false then " +
" redis.call('SETEX', KEYS[1], ARGV[2], 1) " +
" return 1 " +
"else " +
" local count = tonumber(current) " +
" if count < tonumber(ARGV[1]) then " +
" redis.call('INCR', KEYS[1]) " +
" return count + 1 " +
" else " +
" return -1 " +
" end " +
"end",
ReturnType.INTEGER,
Collections.singletonList(key),
maxRequests, windowSeconds
);
})
.map(result -> (Long) result > 0);
}
}
WebSocket + Backpressure
Реактивные WebSocket с Spring WebFlux
WebSocket в Spring WebFlux предоставляет реактивный API для real-time коммуникации с встроенной поддержкой backpressure.
@Configuration
@EnableWebSocket
public class ReactiveWebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new ReactiveWebSocketHandler(), "/ws")
.setAllowedOrigins("*");
}
}
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final Sinks.Many<ChatMessage> messageSink = Sinks.many().multicast().onBackpressureBuffer();
@Override
public Mono<Void> handle(WebSocketSession session) {
String sessionId = session.getId();
sessions.put(sessionId, session);
// Входящие сообщения от клиента
Mono<Void> input = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(payload -> processIncomingMessage(sessionId, payload))
.doOnError(error -> logger.error("Error processing message", error))
.onErrorResume(error -> Mono.empty())
.then();
// Исходящие сообщения к клиенту с backpressure
Mono<Void> output = messageSink.asFlux()
.filter(message -> shouldSendToSession(sessionId, message))
.onBackpressureBuffer(1000) // Буфер для каждой сессии
.map(message -> session.textMessage(message.toJson()))
.as(session::send);
return Mono.zip(input, output)
.doFinally(signal -> {
sessions.remove(sessionId);
logger.info("Session {} disconnected", sessionId);
})
.then();
}
// Обработка входящих сообщений
private Mono<Void> processIncomingMessage(String sessionId, String payload) {
return Mono.fromCallable(() -> parseMessage(payload))
.flatMap(message -> {
message.setSenderId(sessionId);
message.setTimestamp(Instant.now());
// Валидация и сохранение
return validateMessage(message)
.flatMap(this::saveMessage)
.doOnSuccess(savedMessage -> {
// Распространяем сообщение всем подписчикам
messageSink.tryEmitNext(savedMessage);
});
})
.then();
}
}
Управление backpressure в WebSocket
@Service
public class WebSocketBackpressureService {
// Стратегии обработки backpressure
public Flux<ServerSentEvent<GameState>> gameStateStream(String gameId) {
return gameStateService.getStateUpdates(gameId)
// Стратегия 1: Sample - берем только последнее состояние
.sample(Duration.ofMillis(100)) // Максимум 10 обновлений в секунду
// Стратегия 2: ConflateMap - объединяем быстрые обновления
.onBackpressureLatest() // Оставляем только последнее
.map(gameState -> ServerSentEvent.<GameState>builder()
.id(gameState.getId())
.event("game-update")
.data(gameState)
.build())
.doOnNext(event -> updateClientMetrics())
.doOnError(error -> logger.error("Game state stream error", error));
}
// Adaptive backpressure на основе client capability
public Flux<MarketData> adaptiveMarketDataStream(String clientId) {
ClientCapability capability = getClientCapability(clientId);
return marketDataService.getPriceUpdates()
.onBackpressureBuffer(
capability.getBufferSize(),
data -> logger.warn("Dropping market data for slow client {}", clientId),
getOverflowStrategy(capability)
)
.sample(Duration.ofMillis(capability.getSampleRate()))
.doOnNext(data -> trackClientPerformance(clientId, data));
}
// WebSocket с different QoS levels
public Mono<Void> handleWebSocketWithQoS(WebSocketSession session) {
String qosLevel = extractQoSLevel(session);
Flux<Message> messageStream = getMessageStream()
.transform(flux -> applyQoSStrategy(flux, qosLevel));
return session.send(
messageStream
.map(message -> session.textMessage(message.toJson()))
.onErrorResume(error -> {
logger.error("Send error for session {}", session.getId(), error);
return Flux.empty();
})
);
}
private Flux<Message> applyQoSStrategy(Flux<Message> flux, String qosLevel) {
return switch (qosLevel) {
case "HIGH" -> flux.onBackpressureBuffer(10000);
case "MEDIUM" -> flux.onBackpressureBuffer(1000)
.sample(Duration.ofMillis(100));
case "LOW" -> flux.onBackpressureLatest()
.sample(Duration.ofMillis(500));
default -> flux.onBackpressureDrop();
};
}
}
WebSocket connection pooling и scaling
@Service
public class WebSocketConnectionManager {
private final Map<String, Set<WebSocketSession>> topicSubscriptions = new ConcurrentHashMap<>();
private final LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
// Управление подписками по топикам
public void subscribeToTopic(WebSocketSession session, String topic) {
topicSubscriptions.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet())
.add(session);
logger.info("Session {} subscribed to topic {}", session.getId(), topic);
}
// Broadcast сообщений с backpressure management
public Mono<Void> broadcastToTopic(String topic, Object message) {
Set<WebSocketSession> sessions = topicSubscriptions.get(topic);
if (sessions == null || sessions.isEmpty()) {
return Mono.empty();
}
return Flux.fromIterable(sessions)
.filter(session -> session.isOpen())
.flatMap(session ->
sendToSession(session, message)
.onErrorResume(error -> {
logger.warn("Failed to send to session {}", session.getId(), error);
removeInactiveSession(session, topic);
return Mono.empty();
})
, 100) // Максимум 100 одновременных отправок
.then();
}
private Mono<Void> sendToSession(WebSocketSession session, Object message) {
return Mono.fromCallable(() -> session.textMessage(objectMapper.writeValueAsString(message)))
.flatMap(webSocketMessage ->
Mono.fromCallable(() -> {
session.send(Mono.just(webSocketMessage)).subscribe();
return null;
})
)
.timeout(Duration.ofSeconds(5))
.subscribeOn(Schedulers.boundedElastic());
}
// Health monitoring для WebSocket connections
@Scheduled(fixedRate = 30000)
public void monitorConnections() {
topicSubscriptions.forEach((topic, sessions) -> {
int activeSessions = (int) sessions.stream()
.filter(WebSocketSession::isOpen)
.count();
logger.info("Topic {}: {} active connections", topic, activeSessions);
// Удаляем неактивные сессии
sessions.removeIf(session -> !session.isOpen());
});
}
}
MongoDB Reactive Streams
Настройка Reactive MongoDB
@Configuration
@EnableReactiveMongoRepositories
public class ReactiveMongoConfig extends AbstractReactiveMongoConfiguration {
@Override
protected String getDatabaseName() {
return "reactive-app";
}
@Override
public MongoClient reactiveMongoClient() {
ConnectionString connectionString = new ConnectionString(
"mongodb://localhost:27017/reactive-app?maxPoolSize=20&minPoolSize=5"
);
return MongoClients.create(
MongoClientSettings.builder()
.applyConnectionString(connectionString)
.applyToConnectionPoolSettings(builder ->
builder.maxSize(50)
.minSize(10)
.maxConnectionIdleTime(30, TimeUnit.SECONDS)
.maxWaitTime(10, TimeUnit.SECONDS)
)
.codecRegistry(fromProviders(
PojoCodecProvider.builder().automatic(true).build(),
new JacksonCodecProvider(objectMapper)
))
.build()
);
}
}
Reactive CRUD операции
@Service
public class ReactiveMongoService {
private final ReactiveMongoTemplate mongoTemplate;
// Потоковое чтение больших коллекций
public Flux<Document> streamLargeCollection(String collectionName) {
Query query = new Query();
query.limit(0); // Убираем лимит для потокового чтения
return mongoTemplate.find(query, Document.class, collectionName)
.buffer(1000) // Группируем для эффективности
.flatMap(batch ->
processBatch(batch)
.onErrorResume(error -> {
logger.error("Batch processing failed", error);
return Flux.empty();
})
)
.onBackpressureBuffer(10000);
}
// Реактивные агрегации
public Flux<AggregationResult> complexAggregation() {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("status").is("active")),
Aggregation.group("category")
.count().as("count")
.avg("price").as("avgPrice")
.max("createdAt").as("lastUpdate"),
Aggregation.sort(Sort.Direction.DESC, "count"),
Aggregation.limit(100)
);
return mongoTemplate.aggregate(aggregation, "products", AggregationResult.class)
.doOnNext(result -> logger.debug("Aggregation result: {}", result))
.timeout(Duration.ofMinutes(5));
}
// Batch операции с backpressure
public Flux<BulkWriteResult> batchInsert(Flux<Product> products) {
return products
.buffer(1000) // Группируем по 1000 документов
.flatMap(batch -> {
List<WriteModel<Product>> writeModels = batch.stream()
.map(product -> new InsertOneModel<>(product))
.collect(Collectors.toList());
return mongoTemplate.getCollection("products")
.flatMap(collection ->
Mono.fromPublisher(collection.bulkWrite(writeModels))
);
})
.onErrorResume(error -> {
logger.error("Bulk insert failed", error);
return Mono.empty();
});
}
}
Change Streams - реактивные обновления
@Service
public class MongoChangeStreamService {
private final ReactiveMongoTemplate mongoTemplate;
// Отслеживание изменений в коллекции
public Flux<ChangeStreamEvent<Document>> watchCollection(String collectionName) {
ChangeStreamOptions options = ChangeStreamOptions.builder()
.filter(Aggregation.newAggregation(
Aggregation.match(Criteria.where("operationType").in("insert", "update", "delete"))
))
.fullDocument(FullDocument.UPDATE_LOOKUP)
.build();
return mongoTemplate.changeStream(collectionName, options, Document.class)
.doOnNext(event ->
logger.info("Change detected: {} on {}",
event.getOperationType(), event.getDocumentKey())
)
.doOnError(error ->
logger.error("Change stream error for collection {}", collectionName, error)
)
.retry(3)
.onErrorResume(error -> {
logger.error("Change stream failed permanently", error);
return Flux.empty();
});
}
// Real-time синхронизация данных
public Flux<SyncEvent> synchronizeData() {
return mongoTemplate.changeStream("users", ChangeStreamOptions.empty(), User.class)
.flatMap(changeEvent ->
switch (changeEvent.getOperationType()) {
case INSERT -> handleUserInsert(changeEvent.getBody());
case UPDATE -> handleUserUpdate(changeEvent.getBody());
case DELETE -> handleUserDelete(changeEvent.getDocumentKey());
default -> Mono.empty();
}
)
.onBackpressureBuffer(1000)
.share(); // Делаем hot stream для multiple subscribers
}
// Distributed cache invalidation через change streams
public Flux<CacheInvalidation> cacheInvalidationStream() {
return mongoTemplate.changeStream("cache_invalidation",
ChangeStreamOptions.empty(), CacheInvalidation.class)
.map(ChangeStreamEvent::getBody)
.groupBy(CacheInvalidation::getCacheRegion)
.flatMap(group ->
group.buffer(Duration.ofSeconds(1)) // Batch invalidations
.filter(batch -> !batch.isEmpty())
.flatMap(this::processCacheInvalidationBatch)
);
}
}
Reactive Transactions
@Service
public class ReactiveMongoTransactionService {
private final ReactiveMongoTransactionManager transactionManager;
private final ReactiveMongoTemplate mongoTemplate;
// Декларативные транзакции
@Transactional
public Mono<OrderResult> processOrderTransactional(CreateOrderRequest request) {
return createOrder(request)
.flatMap(order -> updateInventory(order.getItems()))
.flatMap(inventory -> processPayment(request.getPayment()))
.flatMap(payment -> updateOrderStatus(request.getOrderId(), "COMPLETED"))
.onErrorResume(error -> {
logger.error("Order processing failed, rolling back", error);
return Mono.error(error);
});
}
// Программные транзакции
public Mono<TransferResult> transferFunds(String fromAccount, String toAccount, BigDecimal amount) {
TransactionalOperator transactionalOperator =
TransactionalOperator.create(transactionManager);
return mongoTemplate.findById(fromAccount, Account.class)
.flatMap(from -> {
if (from.getBalance().compareTo(amount) < 0) {
return Mono.error(new InsufficientFundsException());
}
return mongoTemplate.findById(toAccount, Account.class)
.flatMap(to -> {
from.setBalance(from.getBalance().subtract(amount));
to.setBalance(to.getBalance().add(amount));
return mongoTemplate.save(from)
.then(mongoTemplate.save(to))
.map(savedTo -> new TransferResult(fromAccount, toAccount, amount));
});
})
.as(transactionalOperator::transactional);
}
}
MongoDB Reactive Patterns
@Service
public class MongoReactivePatterns {
// Event Sourcing с MongoDB
public Flux<AggregateSnapshot> eventSourcingReplay(String aggregateId) {
return mongoTemplate.find(
Query.query(Criteria.where("aggregateId").is(aggregateId))
.with(Sort.by("timestamp")),
DomainEvent.class
)
.scan(AggregateSnapshot.empty(), (snapshot, event) ->
snapshot.apply(event)
)
.skip(1); // Пропускаем начальное пустое состояние
}
// CQRS Read Model projections
public Flux<ReadModelUpdate> projectToReadModel() {
return mongoTemplate.changeStream("domain_events",
ChangeStreamOptions.empty(), DomainEvent.class)
.map(ChangeStreamEvent::getBody)
.flatMap(event ->
switch (event.getEventType()) {
case "UserCreated" -> updateUserReadModel(event);
case "OrderPlaced" -> updateOrderReadModel(event);
case "PaymentProcessed" -> updatePaymentReadModel(event);
default -> Mono.empty();
}
)
.onErrorContinue((error, event) ->
logger.error("Failed to project event: {}", event, error)
);
}
// Optimistic locking с версионированием
public Mono<Document> optimisticUpdate(String id, Function<Document, Document> updateFunction) {
return mongoTemplate.findById(id, Document.class)
.flatMap(document -> {
long currentVersion = document.getLong("version");
Document updated = updateFunction.apply(document);
updated.put("version", currentVersion + 1);
Query query = Query.query(
Criteria.where("_id").is(id)
.and("version").is(currentVersion)
);
return mongoTemplate.findAndReplace(query, updated)
.switchIfEmpty(Mono.error(new OptimisticLockingException(id)));
})
.retry(3);
}
}
Интеграционные паттерны
Orchestration Service - координация между технологиями
@Service
public class IntegrationOrchestrationService {
private final ReactiveKafkaProducerService kafkaProducer;
private final ReactiveRedisPublisher redisPublisher;
private final WebSocketConnectionManager wsManager;
private final ReactiveMongoTemplate mongoTemplate;
// Комплексная обработка события
public Mono<EventProcessingResult> processComplexEvent(BusinessEvent event) {
return Mono.just(event)
// 1. Сохраняем событие в MongoDB
.flatMap(e -> mongoTemplate.save(e, "business_events"))
// 2. Публикуем в Kafka для downstream processing
.flatMap(savedEvent ->
kafkaProducer.sendMessage("business-events", event.getId(), event)
.map(result -> savedEvent)
)
// 3. Уведомляем через Redis PubSub
.flatMap(savedEvent ->
redisPublisher.publishMessage("event-notifications",
new EventNotification(event.getType(), event.getId()))
.map(subscribers -> savedEvent)
)
// 4. Отправляем real-time уведомления через WebSocket
.flatMap(savedEvent ->
wsManager.broadcastToTopic("events:" + event.getType(),
new RealtimeEvent(event))
.thenReturn(savedEvent)
)
// 5. Формируем результат
.map(savedEvent -> new EventProcessingResult(
savedEvent.getId(),
ProcessingStatus.COMPLETED,
Instant.now()
))
.doOnSuccess(result ->
logger.info("Complex event processing completed: {}", result)
)
.onErrorResume(error -> {
logger.error("Complex event processing failed", error);
return handleEventProcessingError(event, error);
});
}
// Saga pattern для распределенных операций
public Mono<SagaResult> executeSaga(SagaDefinition saga) {
return Flux.fromIterable(saga.getSteps())
.concatMap(step -> executeStep(step)
.onErrorResume(error ->
compensateStep(step)
.then(Mono.error(new SagaExecutionException(step, error)))
)
)
.then(Mono.just(SagaResult.success(saga.getId())))
.onErrorResume(SagaExecutionException.class, error ->
compensateExecutedSteps(saga, error.getFailedStep())
.then(Mono.just(SagaResult.failed(saga.getId(), error)))
);
}
private Mono<StepResult> executeStep(SagaStep step) {
return switch (step.getType()) {
case KAFKA_PUBLISH -> executeKafkaStep(step);
case MONGO_UPDATE -> executeMongoStep(step);
case REDIS_CACHE -> executeRedisStep(step);
case WEBSOCKET_NOTIFY -> executeWebSocketStep(step);
default -> Mono.error(new UnsupportedStepException(step.getType()));
};
}
}
Error Handling и Circuit Breaker
@Service
public class IntegrationResilienceService {
private final CircuitBreaker kafkaCircuitBreaker = CircuitBreaker.ofDefaults("kafka");
private final CircuitBreaker redisCircuitBreaker = CircuitBreaker.ofDefaults("redis");
private final CircuitBreaker mongoCircuitBreaker = CircuitBreaker.ofDefaults("mongo");
// Resilient integration с fallback стратегиями
public Mono<NotificationResult> sendResilientNotification(Notification notification) {
return sendPrimaryNotification(notification)
.onErrorResume(this::sendFallbackNotification)
.timeout(Duration.ofSeconds(30))
.doOnSuccess(result ->
logger.info("Notification sent successfully: {}", result.getMethod())
);
}
private Mono<NotificationResult> sendPrimaryNotification(Notification notification) {
// Пробуем отправить через WebSocket (primary)
return Mono.fromCallable(() ->
kafkaCircuitBreaker.executeSupplier(() ->
wsManager.broadcastToTopic("notifications", notification)
)
)
.flatMap(broadcast -> Mono.just(NotificationResult.websocket()))
.onErrorResume(error ->
// Fallback к Redis PubSub
sendViaRedis(notification)
);
}
private Mono<NotificationResult> sendViaRedis(Notification notification) {
return Mono.fromCallable(() ->
redisCircuitBreaker.executeSupplier(() ->
redisPublisher.publishMessage("notifications", notification)
)
)
.flatMap(result -> Mono.just(NotificationResult.redis()))
.onErrorResume(error ->
// Последний fallback - сохранение в MongoDB для retry
saveForLaterDelivery(notification)
);
}
private Mono<NotificationResult> saveForLaterDelivery(Notification notification) {
return mongoTemplate.save(
new PendingNotification(notification, Instant.now()),
"pending_notifications"
)
.map(saved -> NotificationResult.deferred());
}
// Retry логика с exponential backoff
public <T> Mono<T> withRetry(Mono<T> operation, String operationName) {
return operation
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.filter(error -> isRetryableError(error))
.doBeforeRetry(retrySignal ->
logger.warn("Retrying {} (attempt {})",
operationName, retrySignal.totalRetries() + 1)
)
)
.doOnError(error ->
logger.error("Operation {} failed after retries", operationName, error)
);
}
// Health checks для всех интеграций
@Scheduled(fixedRate = 60000)
public void performHealthChecks() {
Mono.zip(
checkKafkaHealth(),
checkRedisHealth(),
checkMongoHealth(),
checkWebSocketHealth()
)
.subscribe(
tuple -> {
HealthStatus overallHealth = calculateOverallHealth(
tuple.getT1(), tuple.getT2(), tuple.getT3(), tuple.getT4()
);
updateHealthMetrics(overallHealth);
},
error -> logger.error("Health check failed", error)
);
}
}
Мониторинг и метрики
@Component
public class IntegrationMonitoringService {
private final MeterRegistry meterRegistry;
private final Counter kafkaMessages;
private final Counter redisOperations;
private final Timer mongoOperations;
private final Gauge activeWebSocketConnections;
public IntegrationMonitoringService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.kafkaMessages = Counter.builder("kafka.messages.total")
.description("Total Kafka messages processed")
.register(meterRegistry);
this.redisOperations = Counter.builder("redis.operations.total")
.description("Total Redis operations")
.register(meterRegistry);
this.mongoOperations = Timer.builder("mongo.operations.duration")
.description("MongoDB operation duration")
.register(meterRegistry);
this.activeWebSocketConnections = Gauge.builder("websocket.connections.active")
.description("Active WebSocket connections")
.register(meterRegistry, this, IntegrationMonitoringService::getActiveConnections);
}
// Метрики для Kafka
public <T> Mono<T> monitorKafkaOperation(Mono<T> operation, String operationType) {
return operation
.doOnSubscribe(sub -> kafkaMessages.increment(Tags.of("operation", operationType, "status", "started")))
.doOnSuccess(result -> kafkaMessages.increment(Tags.of("operation", operationType, "status", "success")))
.doOnError(error -> kafkaMessages.increment(Tags.of("operation", operationType, "status", "error")))
.name("kafka." + operationType)
.metrics();
}
// Метрики для MongoDB с timing
public <T> Mono<T> monitorMongoOperation(Mono<T> operation, String collection, String operationType) {
Timer.Sample sample = Timer.start(meterRegistry);
return operation
.doOnTerminate(() ->
sample.stop(Timer.builder("mongo.operation.duration")
.tag("collection", collection)
.tag("operation", operationType)
.register(meterRegistry))
)
.name("mongo." + collection + "." + operationType)
.metrics();
}
// Distributed tracing для комплексных операций
public <T> Mono<T> withTracing(Mono<T> operation, String operationName) {
return operation
.doOnSubscribe(sub ->
logger.info("Starting operation: {} [trace-id: {}]",
operationName, getCurrentTraceId())
)
.doOnSuccess(result ->
logger.info("Completed operation: {} [trace-id: {}]",
operationName, getCurrentTraceId())
)
.doOnError(error ->
logger.error("Failed operation: {} [trace-id: {}]",
operationName, getCurrentTraceId(), error)
);
}
private double getActiveConnections() {
// Реализация подсчета активных WebSocket соединений
return wsManager.getActiveConnectionsCount();
}
// Пользовательские метрики бизнес-логики
public void recordBusinessMetric(String eventType, String status, double value) {
Timer.builder("business.event.processing")
.tag("event.type", eventType)
.tag("status", status)
.register(meterRegistry)
.record(value, TimeUnit.MILLISECONDS);
}
}
Лучшие практики интеграции
Конфигурация и настройка
// Оптимальные настройки для production
@ConfigurationProperties(prefix = "app.reactive.integration")
@Data
public class ReactiveIntegrationProperties {
private KafkaProperties kafka = new KafkaProperties();
private RedisProperties redis = new RedisProperties();
private MongoProperties mongo = new MongoProperties();
private WebSocketProperties websocket = new WebSocketProperties();
@Data
public static class KafkaProperties {
private int bufferSize = 1000;
private Duration commitInterval = Duration.ofSeconds(5);
private int maxConcurrency = 10;
private Duration timeout = Duration.ofSeconds(30);
}
@Data
public static class RedisProperties {
private int maxConnections = 20;
private Duration commandTimeout = Duration.ofSeconds(5);
private int bufferSize = 1000;
}
@Data
public static class MongoProperties {
private int maxPoolSize = 50;
private int minPoolSize = 10;
private Duration maxConnectionIdleTime = Duration.ofMinutes(30);
private int bufferSize = 1000;
}
@Data
public static class WebSocketProperties {
private int maxSessions = 10000;
private int bufferSize = 1000;
private Duration heartbeatInterval = Duration.ofSeconds(30);
}
}
Graceful Shutdown
@Component
public class ReactiveIntegrationShutdownManager {
private final List<Disposable> disposables = new CopyOnWriteArrayList<>();
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
@PreDestroy
public void gracefulShutdown() {
logger.info("Starting graceful shutdown of reactive integrations");
shuttingDown.set(true);
// 1. Останавливаем прием новых сообщений
stopAcceptingNewMessages();
// 2. Завершаем обработку текущих сообщений
waitForCurrentProcessingToComplete();
// 3. Освобождаем все ресурсы
disposables.forEach(Disposable::dispose);
logger.info("Graceful shutdown completed");
}
public void registerDisposable(Disposable disposable) {
if (!shuttingDown.get()) {
disposables.add(disposable);
}
}
private void stopAcceptingNewMessages() {
// Логика остановки consumers
}
private void waitForCurrentProcessingToComplete() {
// Ожидание завершения текущих операций
}
}
Расширенные темы Reactor
Hot vs Cold Publishers
Cold Publishers (Холодные издатели)
Определение: Каждый подписчик получает собственный поток данных с начала. Данные генерируются по требованию при подписке.
Характеристики:
- Ленивые (lazy) - не производят данные без подписчиков
- Каждый subscriber получает полный набор данных
- Данные воспроизводятся для каждого подписчика заново
// Cold Publisher - каждый подписчик получает свою копию
Flux<Integer> coldFlux = Flux.range(1, 5)
.doOnSubscribe(s -> System.out.println("Cold: Подписка"))
.map(i -> {
System.out.println("Cold: Обработка " + i);
return i * 2;
});
// Первый подписчик получает: 2, 4, 6, 8, 10
coldFlux.subscribe(i -> System.out.println("Sub1: " + i));
// Второй подписчик СНОВА получает: 2, 4, 6, 8, 10
coldFlux.subscribe(i -> System.out.println("Sub2: " + i));
Hot Publishers (Горячие издатели)
Определение: Излучают данные независимо от подписчиков. Подписчики получают только данные, излучаемые после момента подписки.
Характеристики:
- Активные (eager) - производят данные даже без подписчиков
- Подписчики получают только "текущие" данные
- Данные могут быть потеряны если нет подписчиков
// Hot Publisher - данные излучаются независимо от подписчиков
Flux<Long> hotFlux = Flux.interval(Duration.ofSeconds(1))
.doOnNext(i -> System.out.println("Hot: Излучение " + i))
.share(); // Превращаем в hot
// Подписчик 1 получает: 0, 1, 2, 3...
hotFlux.subscribe(i -> System.out.println("Sub1: " + i));
Thread.sleep(2500); // Ждем 2.5 секунды
// Подписчик 2 получает только: 3, 4, 5... (пропустил 0, 1, 2)
hotFlux.subscribe(i -> System.out.println("Sub2: " + i));
ConnectableFlux, share(), cache()
ConnectableFlux
Назначение: Позволяет контролировать момент начала излучения данных в hot publisher.
Принцип работы:
- Несколько подписчиков могут подписаться
- Данные начинают излучаться только после вызова
connect()
- Все подписчики получают одинаковые данные
// ConnectableFlux - контролируемый hot publisher
ConnectableFlux<Long> connectableFlux = Flux.interval(Duration.ofSeconds(1))
.doOnNext(i -> System.out.println("Излучение: " + i))
.publish(); // Создаем ConnectableFlux
// Подписываемся, но данные еще не излучаются
connectableFlux.subscribe(i -> System.out.println("Sub1: " + i));
connectableFlux.subscribe(i -> System.out.println("Sub2: " + i));
// Теперь запускаем излучение для всех подписчиков
Disposable connection = connectableFlux.connect();
// Остановка излучения
connection.dispose();
share()
Назначение: Превращает cold publisher в hot, разделяя один поток между несколькими подписчиками.
Поведение:
- Первый подписчик запускает upstream
- Последующие подписчики получают те же данные
- Когда все отписываются, upstream останавливается
Flux<Integer> shared = Flux.range(1, 5)
.doOnSubscribe(s -> System.out.println("Upstream запущен"))
.doOnCancel(() -> System.out.println("Upstream остановлен"))
.share();
// Первый подписчик запускает upstream
Disposable sub1 = shared.subscribe(i -> System.out.println("Sub1: " + i));
// Второй получает те же данные
Disposable sub2 = shared.subscribe(i -> System.out.println("Sub2: " + i));
// Когда все отписываются, upstream останавливается
sub1.dispose();
sub2.dispose();
cache()
Назначение: Кэширует излученные значения и воспроизводит их для новых подписчиков.
Варианты использования:
cache()
- кэширует все значенияcache(int)
- кэширует последние N значенийcache(Duration)
- кэширует на определенное время
// Кэширование всех значений
Flux<Integer> cached = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("Выполнение upstream"))
.map(i -> i * 2)
.cache();
// Первый подписчик - выполняется upstream
cached.subscribe(i -> System.out.println("Sub1: " + i));
// Второй подписчик - получает кэшированные значения
cached.subscribe(i -> System.out.println("Sub2: " + i));
// Кэширование с ограничениями
Flux<Integer> timedCache = source.cache(Duration.ofSeconds(5)); // 5 секунд
Flux<Integer> limitedCache = source.cache(10); // Последние 10 элементов
Schedulers.fromExecutor() для кастомных пулов
Зачем нужны кастомные пулы
Проблемы стандартных Schedulers:
- Ограниченный контроль над параметрами пула
- Невозможность настройки специфичных для приложения характеристик
- Сложность мониторинга и отладки
Создание кастомного Scheduler
// Создание ExecutorService с кастомными параметрами
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
5, // corePoolSize
20, // maximumPoolSize
60L, TimeUnit.SECONDS, // keepAliveTime
new LinkedBlockingQueue<>(100), // workQueue
new ThreadFactory() { // threadFactory
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "custom-reactor-" + threadNumber.getAndIncrement());
t.setDaemon(true);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // rejectedExecutionHandler
);
// Создание Scheduler из ExecutorService
Scheduler customScheduler = Schedulers.fromExecutor(customExecutor);
// Использование кастомного scheduler
Flux.range(1, 10)
.subscribeOn(customScheduler)
.publishOn(customScheduler)
.subscribe(i -> System.out.println("Thread: " +
Thread.currentThread().getName() + ", Value: " + i));
Специализированные пулы для разных задач
public class SchedulerConfig {
// Для CPU-интенсивных задач
public static final Scheduler CPU_INTENSIVE = Schedulers.fromExecutor(
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
);
// Для I/O операций
public static final Scheduler IO_OPERATIONS = Schedulers.fromExecutor(
new ThreadPoolExecutor(10, 100, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>())
);
// Для быстрых задач
public static final Scheduler FAST_TASKS = Schedulers.fromExecutor(
new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2)
);
}
// Использование специализированных пулов
Flux.fromIterable(data)
.subscribeOn(SchedulerConfig.IO_OPERATIONS) // Чтение данных
.map(this::processData) // CPU обработка
.publishOn(SchedulerConfig.CPU_INTENSIVE)
.flatMap(this::saveToDatabase) // I/O операция
.publishOn(SchedulerConfig.IO_OPERATIONS)
.subscribe();
Контекст: Context API, передача данных между операторами
Context API - основы
Назначение: Неизменяемая структура данных для передачи метаданных через reactive chain без изменения сигнатуры методов.
Принципы работы:
- Иммутабельный (неизменяемый)
- Распространяется upstream (снизу вверх)
- Доступен во всех операторах
// Создание и использование Context
Mono<String> mono = Mono.just("Hello")
.flatMap(s -> Mono.deferContextual(ctx -> {
String userId = ctx.get("userId");
return Mono.just(s + " " + userId);
}))
.contextWrite(Context.of("userId", "user123"));
mono.subscribe(System.out::println); // Вывод: Hello user123
Передача данных между операторами
// Комплексный пример с Context
Flux<String> processWithContext = Flux.just("data1", "data2")
.doOnNext(data -> {
// Доступ к контексту в операторе
String traceId = ContextView.current().get("traceId");
System.out.println("Processing " + data + " with trace: " + traceId);
})
.flatMap(data ->
Mono.deferContextual(ctx -> {
String userId = ctx.get("userId");
String traceId = ctx.get("traceId");
return processData(data, userId, traceId);
})
)
.contextWrite(ctx -> ctx.put("timestamp", Instant.now())) // Добавляем timestamp
.contextWrite(Context.of("userId", "user123", "traceId", "trace-456"));
Практические применения Context
1. Трассировка запросов:
// Добавление trace ID в контекст
public Mono<ResponseEntity<String>> handleRequest(String traceId) {
return processRequest()
.contextWrite(Context.of("traceId", traceId))
.doOnNext(result -> logResult(result)) // traceId доступен в логгере
.map(ResponseEntity::ok);
}
// Логирование с контекстом
private void logResult(String result) {
Mono.deferContextual(ctx -> {
String traceId = ctx.get("traceId");
log.info("Request {} completed with result: {}", traceId, result);
return Mono.empty();
}).subscribe();
}
2. Аутентификация и авторизация:
// Передача информации о пользователе
public Mono<String> secureOperation() {
return Mono.deferContextual(ctx -> {
User user = ctx.get("currentUser");
if (user.hasPermission("READ")) {
return performOperation();
}
return Mono.error(new SecurityException("Access denied"));
});
}
// Использование
secureOperation()
.contextWrite(Context.of("currentUser", getCurrentUser()))
.subscribe();
3. Управление транзакциями:
// Передача транзакционного контекста
public Mono<Void> transactionalOperation() {
return Mono.deferContextual(ctx -> {
DatabaseConnection connection = ctx.get("dbConnection");
return performDatabaseOperation(connection);
});
}
// Использование в транзакции
transactionManager.withTransaction(connection ->
transactionalOperation()
.contextWrite(Context.of("dbConnection", connection))
);
Паттерны работы с Context
Композиция контекстов:
// Объединение нескольких источников контекста
Context baseContext = Context.of("app", "myApp");
Context userContext = Context.of("userId", "123");
Context requestContext = Context.of("requestId", "req-456");
// Объединение контекстов
Context fullContext = baseContext
.putAll(userContext)
.putAll(requestContext);
Условное добавление в контекст:
Mono<String> conditionalContext = Mono.just("data")
.contextWrite(ctx -> {
if (needsTracing()) {
return ctx.put("traceId", generateTraceId());
}
return ctx;
});
Важные моменты для собеседования
- Context распространяется upstream - от подписчика к источнику
- Context иммутабелен - каждое изменение создает новый экземпляр
- Context не заменяет параметры методов - используется для метаданных
- Доступ к Context через
deferContextual()
- для reactive доступа - Context доступен во всех операторах - через
contextWrite()