Корутины и асинхронность

Основы корутин

Suspend функции

Suspend функции - специальные функции, которые могут быть приостановлены и возобновлены без блокировки потока. Основа асинхронного программирования в Kotlin.

// Suspend функция - может вызывать другие suspend функции
suspend fun fetchUser(id: Long): User {
    delay(1000)  // приостанавливает выполнение на 1 секунду
    return userRepository.findById(id)
}

// Обычная функция НЕ может вызывать suspend функции напрямую
fun regularFunction() {
    // fetchUser(1)  // ОШИБКА - нужен coroutine scope
}

// Suspend функции можно вызывать только из:
// 1. Других suspend функций
// 2. Корутин (launch, async)
// 3. runBlocking (только для тестов/main)

CoroutineScope vs SupervisorScope

CoroutineScope и SupervisorScope определяют область жизни корутин и стратегию обработки ошибок.

// coroutineScope - все дочерние корутины связаны
suspend fun processData() = coroutineScope {
    val job1 = launch { task1() }
    val job2 = launch { task2() }
    // Если task1 упадет с ошибкой, task2 тоже будет отменена
}

// supervisorScope - дочерние корутины независимы
suspend fun processDataIndependently() = supervisorScope {
    val job1 = launch { task1() }
    val job2 = launch { task2() }
    // Если task1 упадет, task2 продолжит работу
}

// Практический пример: обработка списка независимых задач
suspend fun processUsers(users: List<User>) = supervisorScope {
    users.map { user ->
        launch {
            try {
                processUser(user)  // если обработка одного пользователя упадет
            } catch (e: Exception) {
                logger.error("Failed to process user ${user.id}", e)
            }
        }
    }.joinAll()  // дождаться завершения всех задач
}

Билдеры корутин

Launch - "fire and forget"

Launch создает корутину, которая выполняется параллельно и не возвращает результат. Используется для side effects.

// Простой launch
val job = GlobalScope.launch {
    println("Running in coroutine")
    delay(1000)
    println("Coroutine completed")
}

// Launch с обработкой ошибок
val job = CoroutineScope(Dispatchers.IO).launch {
    try {
        heavyOperation()
    } catch (e: Exception) {
        logger.error("Operation failed", e)
    }
}

job.cancel()  // отмена корутины
job.join()    // ожидание завершения

Async - возврат результата

Async создает корутину, которая возвращает результат через Deferred<T>. Используется когда нужен результат вычислений.

// Async для получения результата
suspend fun fetchUserData(userId: Long): UserData = coroutineScope {
    val userDeferred = async { userService.getUser(userId) }
    val profileDeferred = async { profileService.getProfile(userId) }
    
    // await() дожидается результата
    val user = userDeferred.await()
    val profile = profileDeferred.await()
    
    UserData(user, profile)
}

// Параллельное выполнение нескольких задач
suspend fun processMultipleUsers(ids: List<Long>): List<User> = coroutineScope {
    ids.map { id ->
        async { userService.getUser(id) }  // запускаем все параллельно
    }.awaitAll()  // дожидаемся всех результатов
}

WithContext - смена контекста

WithContext переключает выполнение на другой контекст и возвращает результат.

suspend fun saveToDatabase(data: Data): Long = withContext(Dispatchers.IO) {
    // Переключаемся на IO поток для работы с БД
    database.save(data).id
}

suspend fun computeHash(data: ByteArray): String = withContext(Dispatchers.Default) {
    // Переключаемся на CPU-интенсивный поток
    MessageDigest.getInstance("SHA-256").digest(data).toHexString()
}

// Комбинирование контекстов
suspend fun processFile(file: File): ProcessResult {
    val content = withContext(Dispatchers.IO) {
        file.readText()  // IO операция
    }
    
    val processed = withContext(Dispatchers.Default) {
        heavyProcessing(content)  // CPU операция
    }
    
    return withContext(Dispatchers.IO) {
        saveResult(processed)  // снова IO операция
    }
}

Контексты выполнения (Dispatchers)

Типы Dispatchers

Dispatchers определяют на каких потоках будут выполняться корутины.

// Dispatchers.Default - CPU-интенсивные задачи
launch(Dispatchers.Default) {
    val result = heavyComputation()  // вычисления, сортировка, парсинг
}

// Dispatchers.IO - блокирующие I/O операции
launch(Dispatchers.IO) {
    val data = database.query()     // БД запросы
    val response = httpClient.get() // HTTP запросы
    val content = file.readText()   // файловые операции
}

// Dispatchers.Unconfined - наследует поток вызывающего
launch(Dispatchers.Unconfined) {
    println("Thread: ${Thread.currentThread().name}")
    delay(100)
    println("Thread after delay: ${Thread.currentThread().name}")  // может измениться
}

// Dispatchers.Main - UI поток (Android/Desktop)
launch(Dispatchers.Main) {
    updateUI()  // обновление пользовательского интерфейса
}

Кастомные Dispatchers

// Ограниченный пул потоков для специфических задач
val dbDispatcher = Dispatchers.IO.limitedParallelism(5)

// Собственный Executor
val customDispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()

launch(dbDispatcher) {
    heavyDatabaseOperation()
}

// Не забывайте закрывать кастомные диспетчеры
customDispatcher.close()

Обработка исключений

Exception Propagation

Исключения в корутинах распространяются по иерархии и могут отменять связанные корутины.

// В coroutineScope исключение отменяет все дочерние корутины
suspend fun fragileOperation() = coroutineScope {
    launch { 
        delay(100)
        throw RuntimeException("Task 1 failed") 
    }
    launch { 
        delay(200)
        println("Task 2 completed")  // НЕ выполнится - будет отменена
    }
}

// В supervisorScope исключения изолированы
suspend fun resilientOperation() = supervisorScope {
    launch { 
        delay(100)
        throw RuntimeException("Task 1 failed") 
    }
    launch { 
        delay(200)
        println("Task 2 completed")  // выполнится независимо от Task 1
    }
}

Exception Handlers

// CoroutineExceptionHandler для необработанных исключений
val handler = CoroutineExceptionHandler { _, exception ->
    logger.error("Uncaught exception", exception)
}

// Применение на уровне scope
val scope = CoroutineScope(Dispatchers.Default + handler)
scope.launch {
    throw RuntimeException("This will be caught by handler")
}

// Try-catch в корутинах
suspend fun safeOperation() {
    try {
        riskyOperation()
    } catch (e: CustomException) {
        handleSpecificException(e)
    } catch (e: Exception) {
        logger.error("Unexpected error", e)
        throw e  // пробрасываем дальше если нужно
    }
}

Structured Concurrency

Принципы структурированного параллелизма

Structured Concurrency обеспечивает предсказуемое управление жизненным циклом корутин через иерархию scope'ов.

class UserService {
    private val serviceScope = CoroutineScope(
        Dispatchers.Default + SupervisorJob() + 
        CoroutineExceptionHandler { _, e -> logger.error("Service error", e) }
    )
    
    suspend fun processUser(userId: Long) = serviceScope.async {
        // Все дочерние корутины будут отменены при отмене родительской
        val userData = fetchUserData(userId)
        val notifications = fetchNotifications(userId)
        
        UserProcessingResult(userData, notifications)
    }
    
    fun shutdown() {
        serviceScope.cancel()  // отменяет все активные корутины
    }
}

// Автоматическая отмена при выходе из scope
suspend fun processWithTimeout() = withTimeout(5000) {
    // Если операция займет больше 5 секунд, все дочерние корутины будут отменены
    coroutineScope {
        launch { longOperation1() }
        launch { longOperation2() }
    }
}

Flow, Channel, SharedFlow/StateFlow

Flow - холодные асинхронные потоки

Flow - холодный поток данных, который начинает эмиссию только при подписке.

// Создание Flow
fun numbersFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(1000)
        emit(i)  // эмиссия значения
    }
}

// Операторы трансформации
fun processNumbers(): Flow<String> = numbersFlow()
    .filter { it % 2 == 0 }           // фильтрация
    .map { "Number: $it" }            // трансформация
    .flowOn(Dispatchers.Default)      // смена контекста

// Потребление Flow
suspend fun consumeFlow() {
    processNumbers().collect { value ->
        println(value)  // обработка каждого элемента
    }
}

// Flow для работы с БД
fun userUpdatesFlow(userId: Long): Flow<User> = flow {
    while (currentCoroutineContext().isActive) {
        emit(userRepository.findById(userId))
        delay(30000)  // обновление каждые 30 секунд
    }
}

Channel - горячие каналы связи

Channel - горячий канал для передачи данных между корутинами с буферизацией.

// Создание канала
val channel = Channel<String>(capacity = Channel.BUFFERED)

// Producer корутина
launch {
    for (i in 1..5) {
        channel.send("Message $i")
        delay(1000)
    }
    channel.close()  // закрытие канала
}

// Consumer корутина
launch {
    for (message in channel) {  // итерация до закрытия
        println("Received: $message")
    }
}

// Типы каналов по емкости
val unlimited = Channel<Int>(Channel.UNLIMITED)    // без ограничений
val conflated = Channel<Int>(Channel.CONFLATED)    // только последнее значение
val rendezvous = Channel<Int>(Channel.RENDEZVOUS)  // синхронная передача

SharedFlow и StateFlow - горячие потоки

SharedFlow - горячий поток для broadcasting событий множественным подписчикам. StateFlow - специализированный SharedFlow для состояния с conflation.

// SharedFlow для событий
class EventBus {
    private val _events = MutableSharedFlow<Event>(
        replay = 0,           // количество кешируемых событий
        extraBufferCapacity = 64  // дополнительный буфер
    )
    val events: SharedFlow<Event> = _events.asSharedFlow()
    
    fun publishEvent(event: Event) {
        _events.tryEmit(event)
    }
}

// StateFlow для состояния
class UserViewModel {
    private val _userState = MutableStateFlow<UserState>(UserState.Loading)
    val userState: StateFlow<UserState> = _userState.asStateFlow()
    
    suspend fun loadUser(id: Long) {
        _userState.value = UserState.Loading
        try {
            val user = userRepository.getUser(id)
            _userState.value = UserState.Success(user)
        } catch (e: Exception) {
            _userState.value = UserState.Error(e.message)
        }
    }
}

// Подписка на StateFlow
launch {
    userViewModel.userState.collect { state ->
        when (state) {
            is UserState.Loading -> showLoader()
            is UserState.Success -> displayUser(state.user)
            is UserState.Error -> showError(state.message)
        }
    }
}

Сравнение Flow vs Channel vs SharedFlow/StateFlow

Тип Холодный/Горячий Подписчики Использование
Flow Холодный Один Последовательная обработка данных
Channel Горячий Один-к-одному Связь между корутинами
SharedFlow Горячий Множественные События, уведомления
StateFlow Горячий Множественные Состояние UI, кеширование
// Выбор правильного инструмента:

// Flow - для трансформации данных
fun processFileFlow(file: File): Flow<ProcessedLine> = flow {
    file.useLines { lines ->
        lines.forEach { line ->
            emit(processLine(line))
        }
    }
}

// Channel - для producer-consumer паттерна
suspend fun processWithWorkers() {
    val workChannel = Channel<Work>(100)
    val resultChannel = Channel<Result>(100)
    
    // Workers
    repeat(5) {
        launch { worker(workChannel, resultChannel) }
    }
    
    // Producer
    launch { produceWork(workChannel) }
    
    // Results collector
    launch { collectResults(resultChannel) }
}

// SharedFlow - для событий системы
class NotificationService {
    private val _notifications = MutableSharedFlow<Notification>()
    val notifications = _notifications.asSharedFlow()
    
    fun sendNotification(notification: Notification) {
        _notifications.tryEmit(notification)
    }
}

// StateFlow - для состояния приложения
class AppState {
    private val _currentUser = MutableStateFlow<User?>(null)
    val currentUser = _currentUser.asStateFlow()
    
    fun setUser(user: User) {
        _currentUser.value = user
    }
}

Ключевые моменты для собеседования:

  • Suspend функции не блокируют потоки, а приостанавливают выполнение
  • CoroutineScope vs SupervisorScope определяют стратегию обработки ошибок
  • Dispatchers выбираются по типу задач: IO для блокирующих операций, Default для CPU
  • Structured Concurrency обеспечивает предсказуемое управление жизненным циклом
  • Flow холодный и один-к-одному, SharedFlow/StateFlow горячие и один-ко-многим

Основы корутин

Что такое корутина?

Корутина (Coroutine) — это легковесная единица асинхронного выполнения, которая может быть приостановлена (suspended) и возобновлена позже без блокировки потока. Корутины позволяют писать асинхронный код в последовательном стиле.

Ключевые концепции:

  • Suspension — возможность приостановить выполнение функции
  • Non-blocking — не блокируют поток во время ожидания
  • Structured concurrency — корутины организованы в иерархию с автоматической отменой
  • Cooperative multitasking — корутины добровольно отдают управление
// Простая корутина
suspend fun fetchUser(id: String): User {
    delay(1000) // приостанавливает корутину, НЕ блокирует поток
    return userRepository.findById(id)
}

Отличия корутин от потоков

Корутины как "Lightweight Threads"

Аспект Thread Coroutine
Вес ~1-8 MB стека ~100 байт объекта
Создание Дорогое (системный вызов) Очень дешёвое
Переключение контекста Дорогое (~µs) Быстрое (~ns)
Количество Тысячи Миллионы
Управление OS Scheduler Корутинный диспетчер

Практическое сравнение

// Потоки — дорого и ограниченно
repeat(100_000) {
    thread { 
        Thread.sleep(1000)
        println("Thread $it")
    }
} // OutOfMemoryError!

// Корутины — легко и масштабируемо
repeat(100_000) {
    GlobalScope.launch {
        delay(1000) // не блокирует поток
        println("Coroutine $it")
    }
} // Работает отлично!

Почему корутины легковесные:

  • Стек на куче — нет выделения OS стека
  • Кооперативная многозадачность — нет preemptive switching
  • Один поток может выполнять тысячи корутин
  • Suspension points — приостановка только в определённых местах

Сравнение с классическими подходами

Thread vs Coroutine

// Thread — блокирующий подход
class ThreadExample {
    fun processRequests() {
        repeat(1000) { id ->
            thread {
                val result = blockingHttpCall(id) // блокирует поток
                processResult(result)
            }
        }
    }
}

// Coroutine — неблокирующий подход
class CoroutineExample {
    suspend fun processRequests() {
        repeat(1000) { id ->
            launch {
                val result = suspendingHttpCall(id) // не блокирует поток
                processResult(result)
            }
        }
    }
}

Ключевые отличия:

  • Thread: 1 поток = 1 задача, блокировка при I/O
  • Coroutine: 1 поток = множество корутин, suspension при I/O

ExecutorService vs CoroutineScope

// ExecutorService — императивное управление потоками
class ExecutorExample {
    private val executor = Executors.newFixedThreadPool(10)
    
    fun processData(): CompletableFuture<String> {
        return CompletableFuture.supplyAsync({
            // сложная цепочка callbacks
            fetchDataFromApi()
                .thenCompose { data -> 
                    processData(data)
                }.thenApply { result ->
                    formatResult(result)
                }
        }, executor)
    }
    
    fun shutdown() {
        executor.shutdown() // ручное управление
    }
}

// CoroutineScope — декларативные корутины
class CoroutineExample : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    
    suspend fun processData(): String {
        // линейный код, как синхронный
        val data = fetchDataFromApi()
        val processed = processData(data)
        return formatResult(processed)
    }
    
    fun cleanup() {
        coroutineContext.cancel() // автоматическая отмена всех дочерних корутин
    }
}

Преимущества корутин:

  • Читаемость — линейный код вместо callback hell
  • Structured concurrency — автоматическое управление жизненным циклом
  • Exception handling — естественная обработка исключений
  • Cancellation — кооперативная отмена работы

Базовые строительные блоки

Suspend функции

// suspend функция может быть приостановлена
suspend fun loadUserData(userId: String): UserData {
    val profile = loadProfile(userId) // может приостановиться
    val settings = loadSettings(userId) // может приостановиться
    return UserData(profile, settings)
}

// Обычная функция НЕ может вызывать suspend функции
fun regularFunction() {
    // loadUserData() // ОШИБКА! Нужен корутинный контекст
}

Важно: suspend — это маркер для компилятора, что функция может быть приостановлена

Корутинные билдеры

// launch — fire-and-forget, возвращает Job
val job = GlobalScope.launch {
    doSomething()
} // Unit

// async — возвращает Deferred<T> для получения результата
val deferred = GlobalScope.async {
    computeSomething()
} // Deferred<String>
val result = deferred.await()

// runBlocking — блокирует текущий поток (только для тестов/main)
runBlocking {
    val result = suspendingOperation()
    println(result)
}

Dispatchers — где выполняются корутины

// Main — UI поток (Android/Desktop)
launch(Dispatchers.Main) {
    updateUI(data)
}

// IO — операции ввода-вывода (сеть, файлы, БД)
launch(Dispatchers.IO) {
    val data = loadFromDatabase()
}

// Default — CPU-интенсивные задачи
launch(Dispatchers.Default) {
    val result = heavyComputation()
}

// Unconfined — продолжает в том же потоке (осторожно!)
launch(Dispatchers.Unconfined) {
    quickOperation()
}

Structured Concurrency

Иерархия корутин

class UserService : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    suspend fun loadUserDashboard(userId: String): Dashboard {
        // Все корутины автоматически отменятся при отмене родительской
        val profile = async { loadProfile(userId) }
        val notifications = async { loadNotifications(userId) }
        val recommendations = async { loadRecommendations(userId) }
        
        return Dashboard(
            profile = profile.await(),
            notifications = notifications.await(),
            recommendations = recommendations.await()
        )
    }
    
    fun shutdown() {
        job.cancel() // отменяет ВСЕ дочерние корутины
    }
}

Принципы Structured Concurrency:

  • Родительская корутина ждёт завершения всех дочерних
  • Отмена распространяется сверху вниз
  • Исключения всплывают снизу вверх
  • Автоматическая очистка ресурсов

Обработка ошибок

// SupervisorJob — изолирует ошибки дочерних корутин
val supervisor = SupervisorJob()
val scope = CoroutineScope(Dispatchers.IO + supervisor)

scope.launch {
    try {
        val result1 = async { riskyOperation1() }
        val result2 = async { riskyOperation2() } // может упасть
        
        // Если result2 упадёт, result1 продолжит работу
        processResults(result1.await(), result2.await())
    } catch (e: Exception) {
        handleError(e)
    }
}

Практические паттерны для Backend

Параллельная обработка запросов

@RestController
class UserController(
    private val userService: UserService
) : CoroutineScope {
    
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    
    @GetMapping("/users/{id}/details")
    suspend fun getUserDetails(@PathVariable id: String): UserDetails {
        // Параллельное выполнение независимых операций
        val userDeferred = async { userService.findById(id) }
        val ordersDeferred = async { orderService.findByUserId(id) }
        val preferencesDeferred = async { preferencesService.findByUserId(id) }
        
        return UserDetails(
            user = userDeferred.await(),
            orders = ordersDeferred.await(),
            preferences = preferencesDeferred.await()
        )
    }
}

Batch обработка

suspend fun processBatchOfUsers(userIds: List<String>) {
    userIds.chunked(10) // группы по 10
        .forEach { batch ->
            batch.map { userId ->
                async { processUser(userId) } // параллельно внутри группы
            }.awaitAll() // ждём завершения группы
        }
}

Timeout и отмена

suspend fun fetchDataWithTimeout(): ApiResponse = withTimeout(5000) {
    val data = apiClient.fetchData() // автоматически отменится через 5 сек
    processData(data)
}

// Кооперативная отмена
suspend fun longRunningTask() {
    repeat(1000) { iteration ->
        ensureActive() // проверяет, не отменена ли корутина
        doSomeWork(iteration)
        delay(10) // suspension point — место для отмены
    }
}

Ключевые отличия для интервью

Memory Model

// Thread — каждый поток имеет свой стек
Thread.start() // ~1-8 MB памяти

// Coroutine — объект в куче
launch { } // ~100 байт

Blocking vs Suspension

// Blocking — поток заблокирован и не может делать другую работу
Thread.sleep(1000) // поток простаивает

// Suspension — поток освобождается для других корутин
delay(1000) // поток выполняет другие корутины

Scalability

// Thread pool — ограниченное количество потоков
val executor = Executors.newFixedThreadPool(100) // максимум 100 одновременных задач

// Coroutines — практически неограниченная масштабируемость
repeat(1_000_000) { launch { } } // миллион корутин на нескольких потоках

Частые вопросы на собеседовании

"Как корутины решают проблему callback hell?"

// Callback hell
fetchUser(userId) { user ->
    fetchOrders(user.id) { orders ->
        fetchPayments(orders) { payments ->
            updateUI(user, orders, payments)
        }
    }
}

// Корутины — линейный код
val user = fetchUser(userId)
val orders = fetchOrders(user.id)
val payments = fetchPayments(orders)
updateUI(user, orders, payments)

"Чем отличается launch от async?"

  • launch: fire-and-forget, возвращает Job для управления
  • async: возвращает Deferred для получения результата

"Как работает Structured Concurrency?"

  • Родительская корутина контролирует жизненный цикл дочерних
  • Автоматическая отмена при завершении родителя
  • Распространение исключений вверх по иерархии

"Производительность корутин vs потоков?"

  • Создание: корутины в 1000х раз быстрее
  • Memory overhead: корутины в 1000х меньше
  • Context switching: корутины на порядки быстрее
  • Throughput: корутины поддерживают миллионы одновременных операций

Главное преимущество: Корутины позволяют писать высокопроизводительный асинхронный код в синхронном стиле, что критично для современных backend систем с высокой нагрузкой.

Ключевые строительные блоки корутин

Suspend функции

Suspend функция — это функция, которая может быть приостановлена и возобновлена позже без блокировки потока. Ключевое слово suspend указывает компилятору, что функция может приостанавливать выполнение в suspension points.

Базовые принципы

// suspend функция может вызывать другие suspend функции
suspend fun fetchUserData(id: String): UserData {
    val profile = fetchProfile(id)    // suspension point
    val settings = fetchSettings(id)  // suspension point
    return UserData(profile, settings)
}

// Обычная функция НЕ может вызывать suspend функции напрямую
fun normalFunction() {
    // fetchUserData("123") // ОШИБКА компиляции!
}

Что происходит под капотом:

  • Компилятор преобразует suspend функцию в state machine
  • В каждом suspension point функция может сохранить состояние и освободить поток
  • При возобновлении состояние восстанавливается и выполнение продолжается

Suspension Points

suspend fun processData(): String {
    println("Начало обработки")
    delay(1000)              // suspension point - корутина приостанавливается
    println("Обработка завершена")
    return apiCall()         // suspension point - если apiCall() suspend
}

Важно: Приостановка происходит только в явных suspension points — вызовах других suspend функций.


CoroutineScope

CoroutineScope — это интерфейс, который определяет область жизни корутин. Обеспечивает structured concurrency — корутины организованы в иерархию с автоматическим управлением жизненным циклом.

Основные реализации

// GlobalScope — живёт всё время жизни приложения (осторожно!)
GlobalScope.launch { 
    // может пережить Activity/Fragment
}

// ViewModelScope — автоматически отменяется при очистке ViewModel
class UserViewModel : ViewModel() {
    fun loadData() {
        viewModelScope.launch {
            // автоматически отменится при onCleared()
        }
    }
}

// Кастомный scope с контролируемым жизненным циклом
class UserService : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    fun cleanup() {
        job.cancel() // отменяет ВСЕ дочерние корутины
    }
}

Принципы Structured Concurrency:

  • Родительская корутина ждёт завершения всех дочерних
  • Отмена распространяется от родителя к детям
  • Исключения всплывают от детей к родителю
  • Автоматическая очистка при завершении scope

Корутинные билдеры

launch - Fire and Forget

// launch возвращает Job для управления корутиной
val job: Job = scope.launch {
    repeat(1000) {
        doWork(it)
        delay(100)
    }
}

// Управление выполнением
job.cancel()           // отменить корутину
job.join()             // дождаться завершения
job.isActive           // проверить статус

Когда использовать launch:

  • Side effects — обновление UI, логирование, отправка уведомлений
  • Fire-and-forget операции
  • Когда результат выполнения не нужен

async - Параллельное выполнение с результатом

// async возвращает Deferred<T> для получения результата
val userDeferred: Deferred<User> = scope.async { fetchUser(id) }
val ordersDeferred: Deferred<List<Order>> = scope.async { fetchOrders(id) }

// Получение результатов (параллельно загружались)
val user = userDeferred.await()
val orders = ordersDeferred.await()

Когда использовать async:

  • Параллельные вычисления с возвратом результата
  • Композиция результатов из нескольких источников
  • Когда нужно дождаться результата выполнения

runBlocking - Мост между мирами

// Блокирует текущий поток до завершения всех корутин
fun main() = runBlocking {
    val result = async { heavyComputation() }
    println("Результат: ${result.await()}")
}

// В тестах
@Test
fun testSuspendFunction() = runBlocking {
    val result = suspendingFunction()
    assertEquals(expected, result)
}

Когда использовать runBlocking:

  • Точка входа в корутинный мир (main функция)
  • Тесты suspend функций
  • Интеграция с legacy кодом

Важно: НЕ используйте runBlocking в production коде — блокирует поток!


Job и Deferred

Job - Управление жизненным циклом

val job = launch {
    try {
        longRunningTask()
    } catch (e: CancellationException) {
        cleanup() // выполняется при отмене
        throw e   // важно перебросить
    }
}

// Управление Job
job.cancel()                    // отменить
job.cancelAndJoin()            // отменить и дождаться завершения
job.join()                     // дождаться завершения без отмены

// Состояния Job
job.isActive                   // выполняется ли
job.isCancelled               // отменена ли
job.isCompleted               // завершена ли

Deferred - Job с результатом

val deferred: Deferred<String> = async {
    expensiveComputation()
}

// Получение результата
val result = deferred.await()           // приостанавливает до получения результата
val resultOrNull = deferred.getCompleted() // не приостанавливает, может бросить исключение

// Deferred наследует Job
deferred.cancel()              // можно отменить
deferred.join()                // можно дождаться завершения

Ключевые отличия:

  • Job: управление выполнением, без результата
  • Deferred: Job + возможность получить результат через await()

CoroutineContext

CoroutineContext — это индексированный набор элементов, который определяет контекст выполнения корутины: диспетчер, Job, обработчик исключений и другие параметры.

Композиция контекста

// Элементы контекста
val context = Dispatchers.IO +                    // диспетчер
              SupervisorJob() +                   // Job
              CoroutineName("UserService") +      // имя для отладки
              CoroutineExceptionHandler { _, ex -> // обработчик ошибок
                  log.error("Coroutine failed", ex)
              }

// Создание scope с контекстом
val scope = CoroutineScope(context)

Наследование и переопределение

class ApiService : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    
    suspend fun fetchData(): Data {
        // Наследует IO dispatcher от родительского контекста
        val preliminaryData = loadFromCache()
        
        // Переключается на Default для CPU-интенсивной работы
        return withContext(Dispatchers.Default) {
            processData(preliminaryData)
        }
    }
}

Элементы CoroutineContext:

  • Dispatcher — где выполняется корутина
  • Job — управление жизненным циклом
  • CoroutineName — имя для отладки
  • CoroutineExceptionHandler — обработка необработанных исключений

withContext и смена контекста

withContext - Временное переключение контекста

suspend fun processUserData(userData: String): ProcessedData = 
    withContext(Dispatchers.Default) {
        // CPU-интенсивная обработка в Default dispatcher
        heavyDataProcessing(userData)
    } // автоматически возвращается в исходный контекст

suspend fun saveToDatabase(data: Data) {
    withContext(Dispatchers.IO) {
        // I/O операция в IO dispatcher
        database.save(data)
    }
}

Ключевые особенности:

  • Возвращает результат — в отличие от launch/async
  • Приостанавливает вызывающую корутину
  • Автоматически возвращается в исходный контекст
  • Thread-safe переключение между диспетчерами

Dispatchers - Где выполняются корутины

Типы диспетчеров

// Main — основной поток (UI в Android/Desktop)
launch(Dispatchers.Main) {
    updateUI(data)        // обновление интерфейса
    progressBar.visible = false
}

// IO — операции ввода-вывода (сеть, файлы, БД)
launch(Dispatchers.IO) {
    val data = database.findAll()     // чтение из БД
    val response = apiClient.fetch()  // HTTP запрос
    writeToFile(data)                 // запись в файл
}

// Default — CPU-интенсивные задачи
launch(Dispatchers.Default) {
    val result = complexCalculation()  // математические вычисления
    val processed = sortLargeList()    // сортировка больших данных
    val compressed = compressData()    // сжатие данных
}

// Unconfined — продолжает в том же потоке (осторожно!)
launch(Dispatchers.Unconfined) {
    quickOperation()     // выполняется в текущем потоке
    delay(100)          // после suspension может оказаться в другом потоке!
}

Характеристики диспетчеров

Dispatcher Количество потоков Назначение Примеры использования
Main 1 (UI поток) UI операции Обновление интерфейса
IO 64+ (expandable) I/O операции БД, сеть, файлы
Default CPU cores CPU-задачи Вычисления, обработка
Unconfined Любой Специальные случаи Тесты, legacy код

Практические паттерны

class UserRepository {
    // Чтение из БД — IO dispatcher
    suspend fun findUser(id: String): User = withContext(Dispatchers.IO) {
        database.findById(id)
    }
    
    // Обработка данных — Default dispatcher  
    suspend fun processUserStats(users: List<User>): Statistics = 
        withContext(Dispatchers.Default) {
            users.groupBy { it.region }
                 .mapValues { it.value.size }
                 .let { Statistics(it) }
        }
}

class UserController {
    suspend fun getUserDashboard(id: String): UserDashboard {
        // Параллельные I/O операции
        val user = async(Dispatchers.IO) { userRepo.findUser(id) }
        val orders = async(Dispatchers.IO) { orderRepo.findByUserId(id) }
        val stats = async(Dispatchers.Default) { 
            calculateUserStats(id) // CPU-интенсивная задача
        }
        
        return UserDashboard(
            user = user.await(),
            orders = orders.await(), 
            stats = stats.await()
        )
    }
}

Структурированная параллельность в действии

Комбинирование всех элементов

class OrderService : CoroutineScope {
    private val supervisorJob = SupervisorJob()
    override val coroutineContext = 
        Dispatchers.IO + supervisorJob + CoroutineName("OrderService")
    
    suspend fun processOrder(orderId: String): OrderResult = 
        withContext(coroutineContext) {
            try {
                // Параллельная загрузка данных
                val order = async { orderRepo.findById(orderId) }
                val user = async { userRepo.findById(order.await().userId) }
                val inventory = async(Dispatchers.IO) { 
                    inventoryService.checkAvailability(orderId) 
                }
                
                // CPU-интенсивная обработка
                val calculations = async(Dispatchers.Default) {
                    calculateOrderTotal(order.await(), inventory.await())
                }
                
                // Результат
                OrderResult(
                    order = order.await(),
                    user = user.await(),
                    total = calculations.await()
                )
            } catch (e: Exception) {
                handleOrderError(orderId, e)
                throw e
            }
        }
    
    fun shutdown() {
        supervisorJob.cancel() // отменяет все дочерние корутины
    }
}

Ключевые вопросы для интервью

"Как работает suspension?"

  • Компилятор создаёт state machine из suspend функции
  • В suspension points сохраняется состояние локальных переменных
  • Поток освобождается для выполнения других корутин
  • При возобновлении состояние восстанавливается

"Чем отличается launch от async?"

  • launch: fire-and-forget, возвращает Job, для side effects
  • async: возвращает Deferred, для получения результата

"Когда использовать какой Dispatcher?"

  • IO: сеть, БД, файлы (может блокироваться)
  • Default: CPU-задачи (количество потоков = CPU cores)
  • Main: UI операции (один поток)

"Что такое Structured Concurrency?"

  • Иерархия корутин с автоматическим управлением
  • Отмена родителя отменяет всех детей
  • Исключения детей всплывают к родителю
  • Автоматическая очистка ресурсов

Главное понимание: Эти строительные блоки работают вместе для создания высокопроизводительных, структурированных и безопасных асинхронных приложений, что критично для современной backend разработки.

Структурная конкуренция

Что такое Structured Concurrency?

Structured Concurrency (структурная конкуренция) — это парадигма программирования, которая организует асинхронные операции в строгую иерархическую структуру, где время жизни дочерних операций ограничено временем жизни родительских.

Основные принципы:

  • Иерархия корутин — чёткое родительско-дочернее отношение
  • Автоматическая отмена — отмена родителя отменяет всех детей
  • Распространение исключений — ошибки детей всплывают к родителю
  • Завершение — родитель ждёт завершения всех дочерних операций
// Structured concurrency в действии
suspend fun fetchUserDashboard(userId: String) = coroutineScope {
    // Все корутины автоматически отменятся, если функция будет отменена
    val profile = async { fetchProfile(userId) }
    val orders = async { fetchOrders(userId) }
    val recommendations = async { fetchRecommendations(userId) }
    
    Dashboard(
        profile = profile.await(),
        orders = orders.await(),
        recommendations = recommendations.await()
    )
    // Функция завершится только когда ВСЕ дочерние корутины завершатся
}

Зачем нужна структурная конкуренция?

Проблемы неструктурированного подхода

// ❌ Неструктурированный подход - проблемы
class BadUserService {
    fun loadUserData(userId: String) {
        GlobalScope.launch { fetchProfile(userId) }      // может "утечь"
        GlobalScope.launch { fetchOrders(userId) }       // никто не контролирует
        GlobalScope.launch { fetchNotifications(userId) } // нет связи между операциями
        
        // Проблемы:
        // 1. Невозможно отменить операции
        // 2. Нет гарантии завершения
        // 3. Утечки памяти
        // 4. Нет обработки ошибок
    }
}

Решение через Structured Concurrency

// ✅ Структурированный подход - решение
class GoodUserService : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    suspend fun loadUserData(userId: String): UserData = coroutineScope {
        val profile = async { fetchProfile(userId) }
        val orders = async { fetchOrders(userId) }
        val notifications = async { fetchNotifications(userId) }
        
        UserData(
            profile = profile.await(),
            orders = orders.await(),
            notifications = notifications.await()
        )
    }
    
    fun cleanup() {
        job.cancel() // отменяет ВСЕ операции
    }
}

Преимущества Structured Concurrency

1. Предсказуемое управление ресурсами

suspend fun processFiles(files: List<File>) = coroutineScope {
    files.map { file ->
        async {
            val data = readFile(file)        // открывает ресурс
            processData(data)
        }
    }.awaitAll()
    // Гарантия: все файлы будут закрыты, даже при отмене
}

2. Автоматическая обработка ошибок

suspend fun fetchAllData() = try {
    coroutineScope {
        val task1 = async { riskyOperation1() }
        val task2 = async { riskyOperation2() }  // может упасть
        val task3 = async { riskyOperation3() }
        
        // Если task2 упадёт, task1 и task3 автоматически отменятся
        Triple(task1.await(), task2.await(), task3.await())
    }
} catch (e: Exception) {
    handleError(e) // единая точка обработки ошибок
}

3. Простота отмены операций

class DownloadManager : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    fun startDownloads(urls: List<String>) = launch {
        urls.forEach { url ->
            launch { downloadFile(url) } // все привязаны к родительскому scope
        }
    }
    
    fun cancelAllDownloads() {
        job.cancel() // одна команда отменяет все загрузки
    }
}

CoroutineScope + SupervisorJob

Обычный Job vs SupervisorJob

// Обычный Job - ошибка одного ребёнка отменяет всех
class RegularJobExample : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + Job()
    
    fun processData() = launch {
        val task1 = launch { reliableOperation() }
        val task2 = launch { riskyOperation() }      // упадёт
        val task3 = launch { anotherOperation() }
        
        // Если task2 упадёт, task1 и task3 тоже отменятся!
    }
}

// SupervisorJob - изолирует ошибки детей
class SupervisorJobExample : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    
    fun processData() = launch {
        val task1 = launch { reliableOperation() }
        val task2 = launch { riskyOperation() }      // упадёт
        val task3 = launch { anotherOperation() }
        
        // task2 упадёт, но task1 и task3 продолжат работу
    }
}

Практические паттерны с SupervisorJob

1. Микросервисная архитектура

class ApiGateway : CoroutineScope {
    private val supervisorJob = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + supervisorJob
    
    suspend fun fetchDashboardData(userId: String): DashboardData {
        // Каждый сервис независим - падение одного не влияет на другие
        val userService = async { 
            try { userServiceClient.getUser(userId) }
            catch (e: Exception) { null } // fallback
        }
        val orderService = async { 
            try { orderServiceClient.getOrders(userId) }
            catch (e: Exception) { emptyList() } // fallback
        }
        val notificationService = async { 
            try { notificationServiceClient.getNotifications(userId) }
            catch (e: Exception) { emptyList() } // fallback
        }
        
        return DashboardData(
            user = userService.await(),
            orders = orderService.await(),
            notifications = notificationService.await()
        )
    }
}

2. Background обработка

class BackgroundTaskManager : CoroutineScope {
    private val supervisorJob = SupervisorJob()
    override val coroutineContext = Dispatchers.Default + supervisorJob
    
    fun startBackgroundTasks() {
        // Каждая задача независима
        launch { 
            while (isActive) {
                cleanupOldData()
                delay(TimeUnit.HOURS.toMillis(1))
            }
        }
        
        launch {
            while (isActive) {
                generateReports()
                delay(TimeUnit.HOURS.toMillis(6))
            }
        }
        
        launch {
            while (isActive) {
                syncWithExternalSystem()
                delay(TimeUnit.MINUTES.toMillis(30))
            }
        }
    }
}

coroutineScope vs supervisorScope

coroutineScope - строгая иерархия

suspend fun fetchCriticalData() = coroutineScope {
    // Все операции критичны - если одна упадёт, отменяем всё
    val userData = async { fetchUser() }          // критично
    val accountData = async { fetchAccount() }    // критично  
    val securityData = async { fetchSecurity() } // критично
    
    CriticalData(
        user = userData.await(),
        account = accountData.await(),
        security = securityData.await()
    )
    // Если любая операция упадёт, все остальные отменятся
}

supervisorScope - изолированные дочерние операции

suspend fun fetchOptionalData() = supervisorScope {
    // Операции независимы - ошибка одной не влияет на другие
    val profile = async { 
        try { fetchProfile() }
        catch (e: Exception) { DefaultProfile }
    }
    val preferences = async { 
        try { fetchPreferences() }
        catch (e: Exception) { DefaultPreferences }
    }
    val recommendations = async { 
        try { fetchRecommendations() }
        catch (e: Exception) { emptyList() }
    }
    
    OptionalData(
        profile = profile.await(),
        preferences = preferences.await(),
        recommendations = recommendations.await()
    )
    // Каждая операция обрабатывает свои ошибки независимо
}

Сравнительная таблица

Аспект coroutineScope supervisorScope
Ошибка дочерней корутины Отменяет всех siblings Изолирована от siblings
Обработка исключений Всплывает к родителю Требует явную обработку
Использование Критичные операции Независимые операции
Fallback стратегия Всё или ничего Частичный успех

Практические примеры выбора

Когда использовать coroutineScope:

// Финансовая транзакция - всё или ничего
suspend fun processPayment(payment: Payment) = coroutineScope {
    val validation = async { validatePayment(payment) }
    val authorization = async { authorizePayment(payment) }
    val processing = async { processTransaction(payment) }
    
    // Если любой шаг упадёт, вся транзакция отменяется
    PaymentResult(
        validation.await(),
        authorization.await(), 
        processing.await()
    )
}

Когда использовать supervisorScope:

// Загрузка dashboard - показываем что получилось
suspend fun loadDashboard(userId: String) = supervisorScope {
    val weather = async { 
        try { weatherService.getCurrentWeather() }
        catch (e: Exception) { "Погода недоступна" }
    }
    val news = async { 
        try { newsService.getLatestNews() }
        catch (e: Exception) { emptyList() }
    }
    val stocks = async { 
        try { stockService.getPortfolio(userId) }
        catch (e: Exception) { emptyList() }
    }
    
    // Показываем то, что удалось загрузить
    Dashboard(weather.await(), news.await(), stocks.await())
}

Продвинутые паттерны

Комбинирование подходов

class OrderProcessingService : CoroutineScope {
    private val supervisorJob = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + supervisorJob
    
    suspend fun processOrder(order: Order): OrderResult = supervisorScope {
        // Критичная часть - обработка заказа
        val coreProcessing = async {
            coroutineScope { // строгая иерархия для критичных операций
                val validation = async { validateOrder(order) }
                val inventory = async { reserveInventory(order) }
                val payment = async { processPayment(order) }
                
                CoreOrderData(
                    validation.await(),
                    inventory.await(),
                    payment.await()
                )
            }
        }
        
        // Дополнительные сервисы - могут упасть без критичных последствий
        val notifications = async { 
            try { sendOrderNotification(order) }
            catch (e: Exception) { log.warn("Notification failed", e); null }
        }
        val analytics = async { 
            try { trackOrderEvent(order) }
            catch (e: Exception) { log.warn("Analytics failed", e); null }
        }
        val recommendations = async { 
            try { updateRecommendations(order) }
            catch (e: Exception) { log.warn("Recommendations failed", e); null }
        }
        
        OrderResult(
            core = coreProcessing.await(),
            notificationSent = notifications.await() != null,
            analyticsTracked = analytics.await() != null,
            recommendationsUpdated = recommendations.await() != null
        )
    }
}

Exception Handling Strategy

class DataAggregationService {
    suspend fun aggregateUserData(userId: String): AggregatedData = 
        supervisorScope {
            val results = listOf(
                async { fetchFromSource1(userId) },
                async { fetchFromSource2(userId) },
                async { fetchFromSource3(userId) }
            ).map { deferred ->
                try {
                    deferred.await()
                } catch (e: Exception) {
                    log.warn("Data source failed", e)
                    null // fallback значение
                }
            }
            
            AggregatedData(
                source1Data = results[0],
                source2Data = results[1], 
                source3Data = results[2],
                completeness = results.count { it != null } / results.size.toDouble()
            )
        }
}

Ключевые вопросы для интервью

"Зачем нужна Structured Concurrency?"

  • Предсказуемость: чёткие границы жизненного цикла операций
  • Надёжность: автоматическая очистка ресурсов
  • Простота отладки: иерархическая структура легче для понимания
  • Безопасность: нет утечек памяти и ресурсов

"Когда использовать SupervisorJob?"

  • Независимые операции: ошибка одной не должна влиять на другие
  • Микросервисная архитектура: каждый сервис обрабатывает свои ошибки
  • Background задачи: изоляция различных фоновых процессов

"Разница между coroutineScope и supervisorScope?"

  • coroutineScope: строгая иерархия, "всё или ничего"
  • supervisorScope: изолированные дочерние операции, "частичный успех"

"Как обрабатывать исключения в structured concurrency?"

  • Regular scope: исключения всплывают автоматически
  • Supervisor scope: требует явную обработку в каждой корутине
  • Комбинированный подход: критичные операции в regular, опциональные в supervisor

Главное понимание: Structured Concurrency обеспечивает предсказуемое и надёжное управление асинхронными операциями, что критично для production систем с высокими требованиями к стабильности.

Отмена и обработка исключений в корутинах

Механизм отмены корутин

Отмена корутин — это кооперативный процесс, где корутина добровольно проверяет свой статус и прекращает работу. В отличие от Thread.interrupt(), отмена корутин безопасна и предсказуема.

Ключевые принципы отмены

  • Кооперативность — корутина сама решает, когда остановиться
  • Безопасность — отмена не прерывает операцию посередине
  • Структурированность — отмена распространяется по иерархии
  • Исключение как сигнал — CancellationException указывает на отмену

cancel() - инициация отмены

Способы отмены

val job = launch {
    repeat(1000) { i ->
        println("Работаю: $i")
        delay(500) // suspension point - место для проверки отмены
    }
}

// Различные способы отмены
job.cancel()                           // простая отмена
job.cancel("Превышен таймаут")         // отмена с сообщением
job.cancelAndJoin()                    // отмена + ожидание завершения

// Отмена с причиной
job.cancel(TimeoutCancellationException("Операция слишком долгая"))

Проверка статуса отмены

suspend fun longRunningTask() {
    repeat(1000) { iteration ->
        // Методы проверки отмены:
        
        ensureActive()           // бросает CancellationException если отменена
        
        if (!isActive) {         // мягкая проверка без исключения
            cleanup()
            return
        }
        
        yield()                  // проверяет отмену + даёт возможность другим корутинам
        
        doWork(iteration)
    }
}

Важно: Отмена проверяется только в suspension points или при явной проверке!


isActive и кооперативная отмена

isActive - проверка без исключений

class DataProcessor : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.Default + job
    
    fun processLargeDataset(data: List<Data>) = launch {
        data.forEachIndexed { index, item ->
            // Кооперативная проверка отмены
            if (!isActive) {
                saveProgress(index) // сохраняем прогресс
                return@launch      // корректный выход
            }
            
            processItem(item)
            
            // Альтернатива через ensureActive()
            ensureActive() // бросит CancellationException если отменена
        }
    }
}

Правильная обработка отмены

suspend fun downloadFile(url: String, file: File) {
    var connection: HttpURLConnection? = null
    var outputStream: FileOutputStream? = null
    
    try {
        connection = createConnection(url)
        outputStream = FileOutputStream(file)
        
        val buffer = ByteArray(1024)
        while (isActive) { // проверяем отмену в цикле
            val bytesRead = connection.inputStream.read(buffer)
            if (bytesRead == -1) break
            
            outputStream.write(buffer, 0, bytesRead)
            yield() // suspension point для отмены
        }
    } catch (e: CancellationException) {
        file.delete() // очищаем частично скачанный файл
        throw e       // ВАЖНО: перебрасываем CancellationException
    } finally {
        connection?.disconnect()
        outputStream?.close()
    }
}

CancellationException - специальное исключение

Особенности CancellationException

suspend fun handlingCancellation() {
    try {
        longRunningOperation()
    } catch (e: CancellationException) {
        // CancellationException НЕ считается ошибкой
        cleanup() // выполняем очистку
        throw e   // ОБЯЗАТЕЛЬНО перебрасываем!
    } catch (e: Exception) {
        // Обычные исключения обрабатываем
        handleError(e)
        throw e
    }
}

// CancellationException не ломает структуру
suspend fun parentOperation() = coroutineScope {
    try {
        val job1 = async { operation1() }
        val job2 = async { operation2() }
        
        job1.cancel() // отменяем job1
        
        // job2 продолжит работу, отмена job1 не влияет на него
        job2.await()
    } catch (e: CancellationException) {
        // Родительский scope получит CancellationException
        // только если отменён сам родитель
    }
}

Timeout как отмена

suspend fun operationWithTimeout() {
    try {
        withTimeout(5000) { // автоматическая отмена через 5 секунд
            slowOperation()
        }
    } catch (e: TimeoutCancellationException) {
        // TimeoutCancellationException наследует CancellationException
        handleTimeout()
    }
}

// Мягкий timeout
suspend fun operationWithSoftTimeout() {
    val result = withTimeoutOrNull(5000) {
        slowOperation()
    }
    
    if (result == null) {
        handleTimeout()
    } else {
        processResult(result)
    }
}

Распространение исключений: launch vs async

launch - исключения всплывают немедленно

fun launchExceptionPropagation() = runBlocking {
    val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        println("Перехвачено в handler: ${exception.message}")
    }
    
    val scope = CoroutineScope(SupervisorJob() + exceptionHandler)
    
    scope.launch {
        throw RuntimeException("Ошибка в launch") // всплывает немедленно
    }
    
    scope.launch {
        delay(1000)
        println("Эта корутина продолжит работу") // выполнится
    }
    
    delay(2000)
}

async - исключения всплывают при await()

fun asyncExceptionPropagation() = runBlocking {
    val scope = CoroutineScope(SupervisorJob())
    
    val deferred1 = scope.async {
        throw RuntimeException("Ошибка в async") // НЕ всплывает сразу
    }
    
    val deferred2 = scope.async {
        delay(1000)
        "Успешный результат"
    }
    
    try {
        // Исключение всплывёт здесь, при вызове await()
        val result1 = deferred1.await()
    } catch (e: Exception) {
        println("Перехвачено при await(): ${e.message}")
    }
    
    // deferred2 завершится успешно
    val result2 = deferred2.await()
    println("Результат: $result2")
}

Практический пример: обработка множественных async

suspend fun fetchMultipleResources(): CombinedResult = supervisorScope {
    val resource1 = async { 
        try { fetchResource1() }
        catch (e: Exception) { null } // локальная обработка
    }
    
    val resource2 = async { 
        try { fetchResource2() }
        catch (e: Exception) { null }
    }
    
    val resource3 = async { 
        try { fetchResource3() }
        catch (e: Exception) { null }
    }
    
    CombinedResult(
        resource1 = resource1.await(),
        resource2 = resource2.await(),
        resource3 = resource3.await()
    )
}

SupervisorJob vs обычный Job

Обычный Job - все связаны

class RegularJobBehavior {
    fun demonstrateRegularJob() = runBlocking {
        val parentJob = Job()
        val scope = CoroutineScope(Dispatchers.Default + parentJob)
        
        val child1 = scope.launch {
            try {
                delay(2000)
                println("Child 1 завершён")
            } catch (e: CancellationException) {
                println("Child 1 отменён") // выполнится
            }
        }
        
        val child2 = scope.launch {
            delay(500)
            throw RuntimeException("Child 2 упал") // ломает всех
        }
        
        val child3 = scope.launch {
            try {
                delay(1000)
                println("Child 3 завершён") // НЕ выполнится
            } catch (e: CancellationException) {
                println("Child 3 отменён") // выполнится
            }
        }
        
        delay(3000)
        // Результат: child2 ломает child1 и child3
    }
}

SupervisorJob - изоляция ошибок

class SupervisorJobBehavior {
    fun demonstrateSupervisorJob() = runBlocking {
        val supervisorJob = SupervisorJob()
        val scope = CoroutineScope(Dispatchers.Default + supervisorJob)
        
        val child1 = scope.launch {
            delay(2000)
            println("Child 1 завершён") // выполнится
        }
        
        val child2 = scope.launch {
            delay(500)
            throw RuntimeException("Child 2 упал") // изолированная ошибка
        }
        
        val child3 = scope.launch {
            delay(1000)
            println("Child 3 завершён") // выполнится
        }
        
        delay(3000)
        // Результат: только child2 упадёт, остальные работают
    }
}

Сравнительная таблица поведения

Сценарий Job SupervisorJob
Ошибка в дочерней корутине Отменяет всех siblings Изолирует ошибку
Отмена родителя Отменяет всех детей Отменяет всех детей
Обработка исключений Всплывает к родителю Требует локальную обработку
Use case Критичные операции Независимые операции

Практические паттерны обработки исключений

Паттерн 1: Graceful degradation

class ApiService : CoroutineScope {
    private val supervisorJob = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + supervisorJob + 
        CoroutineExceptionHandler { _, ex -> log.error("Unhandled exception", ex) }
    
    suspend fun loadDashboard(userId: String): Dashboard = supervisorScope {
        // Каждый компонент обрабатывает свои ошибки
        val profile = async {
            try { profileService.getProfile(userId) }
            catch (e: Exception) { 
                log.warn("Profile service failed", e)
                DefaultProfile(userId)
            }
        }
        
        val orders = async {
            try { orderService.getOrders(userId) }
            catch (e: Exception) {
                log.warn("Order service failed", e)
                emptyList<Order>()
            }
        }
        
        Dashboard(
            profile = profile.await(),
            orders = orders.await()
        )
    }
}

Паттерн 2: Circuit breaker

class ResilientApiClient {
    private var failureCount = 0
    private var lastFailureTime = 0L
    private val circuitBreakerTimeout = 30_000L // 30 секунд
    
    suspend fun makeRequest(request: ApiRequest): ApiResponse {
        // Проверяем circuit breaker
        if (isCircuitOpen()) {
            throw CircuitBreakerException("Service temporarily unavailable")
        }
        
        return try {
            val response = withTimeout(5000) {
                httpClient.execute(request)
            }
            resetFailureCount() // успех - сбрасываем счётчик
            response
        } catch (e: Exception) {
            incrementFailureCount()
            when (e) {
                is TimeoutCancellationException -> throw ServiceTimeoutException(e)
                is CancellationException -> throw e // не ломаем отмену
                else -> throw ServiceException("API call failed", e)
            }
        }
    }
    
    private fun isCircuitOpen(): Boolean {
        return failureCount >= 5 && 
               (System.currentTimeMillis() - lastFailureTime) < circuitBreakerTimeout
    }
}

Паттерн 3: Retry с экспоненциальным backoff

suspend fun <T> withRetry(
    times: Int = 3,
    initialDelay: Long = 100,
    maxDelay: Long = 1000,
    factor: Double = 2.0,
    block: suspend () -> T
): T {
    var currentDelay = initialDelay
    
    repeat(times - 1) { attempt ->
        try {
            return block()
        } catch (e: CancellationException) {
            throw e // не ретраим отмену
        } catch (e: Exception) {
            log.warn("Attempt ${attempt + 1} failed", e)
            delay(currentDelay)
            currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
        }
    }
    
    return block() // последняя попытка без ретрая
}

// Использование
suspend fun reliableApiCall(): ApiResponse = withRetry(times = 3) {
    apiClient.fetchData()
}

Продвинутые техники

Селективная отмена

class BatchProcessor {
    suspend fun processBatch(items: List<Item>): BatchResult = supervisorScope {
        val results = mutableListOf<ProcessingResult>()
        val activeJobs = mutableListOf<Job>()
        
        items.chunked(10).forEach { chunk ->
            val chunkJob = launch {
                chunk.forEach { item ->
                    if (isActive) { // проверяем отмену для каждого элемента
                        val result = processItem(item)
                        synchronized(results) { results.add(result) }
                    }
                }
            }
            activeJobs.add(chunkJob)
        }
        
        try {
            activeJobs.joinAll()
        } catch (e: CancellationException) {
            // Отменяем все активные задачи
            activeJobs.forEach { it.cancel() }
            throw e
        }
        
        BatchResult(results)
    }
}

Composite exception handling

class CompositeExceptionHandler {
    suspend fun processMultipleSources(): ProcessingResult = supervisorScope {
        val exceptions = mutableListOf<Exception>()
        val results = mutableListOf<SourceResult>()
        
        val jobs = dataSources.map { source ->
            async {
                try {
                    val result = source.fetchData()
                    synchronized(results) { results.add(result) }
                } catch (e: Exception) {
                    synchronized(exceptions) { exceptions.add(e) }
                }
            }
        }
        
        jobs.awaitAll()
        
        ProcessingResult(
            results = results,
            partialFailures = exceptions,
            success = exceptions.isEmpty()
        )
    }
}

Ключевые вопросы для интервью

"Как работает отмена корутин?"

  • Кооперативный механизм — корутина проверяет статус в suspension points
  • CancellationException — специальное исключение, не считается ошибкой
  • Структурированное распространение — от родителя к детям

"Чем отличается поведение исключений в launch и async?"

  • launch: исключения всплывают немедленно
  • async: исключения всплывают при вызове await()

"Когда использовать SupervisorJob?"

  • Независимые операции — ошибка одной не должна влиять на другие
  • Graceful degradation — частичный успех лучше полного провала
  • Микросервисная архитектура — изоляция ошибок сервисов

"Как правильно обрабатывать CancellationException?"

  • Всегда перебрасывать — не поглощать отмену
  • Выполнять cleanup — очищать ресурсы перед перебросом
  • Не логировать как ошибку — отмена это нормальное поведение

Главное понимание: Корректная обработка отмены и исключений критична для стабильности и предсказуемости асинхронных систем в production environment.

Контекст корутины (CoroutineContext)

Что такое CoroutineContext?

CoroutineContext — это индексированный набор элементов, который определяет окружение выполнения корутины. Это immutable коллекция, где каждый элемент имеет уникальный ключ (Key) и может быть получен по этому ключу.

Основные принципы:

  • Композиция — элементы объединяются через оператор +
  • Наследование — дочерние корутины наследуют контекст родителя
  • Переопределение — новые элементы заменяют старые с тем же ключом
  • Неизменяемость — создаётся новый контекст при изменениях
// Базовая структура CoroutineContext
val context: CoroutineContext = 
    Dispatchers.IO +                    // где выполняется
    SupervisorJob() +                   // управление жизненным циклом  
    CoroutineName("ApiService") +       // имя для отладки
    CoroutineExceptionHandler { _, ex -> // обработка исключений
        log.error("Unhandled exception", ex)
    }

Элементы CoroutineContext

1. Dispatcher - где выполняется корутина

Dispatcher определяет какие потоки используются для выполнения корутины. Это основной элемент для управления ресурсами и производительностью.

// Встроенные диспетчеры
val ioContext = Dispatchers.IO              // I/O операции (64+ потоков)
val defaultContext = Dispatchers.Default    // CPU задачи (CPU cores потоков)
val mainContext = Dispatchers.Main          // UI поток (1 поток)
val unconfinedContext = Dispatchers.Unconfined // текущий поток

// Кастомный диспетчер
val customDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
val customContext = customDispatcher + CoroutineName("CustomPool")

Характеристики диспетчеров:

  • IO: оптимизирован для блокирующих операций, expandable pool
  • Default: оптимизирован для CPU-задач, fixed pool по количеству ядер
  • Main: единственный UI поток, для обновления интерфейса
  • Unconfined: продолжает в том потоке где приостановился

2. Job - управление жизненным циклом

Job представляет жизненный цикл корутины и позволяет управлять её выполнением: отменять, ожидать завершения, проверять статус.

// Типы Job
val regularJob = Job()                    // обычный Job
val supervisorJob = SupervisorJob()       // изолирует ошибки детей

// Иерархия Job
val parentJob = SupervisorJob()
val childContext = Dispatchers.IO + Job(parent = parentJob)

// Управление Job
val job = launch(childContext) { 
    longRunningTask() 
}

job.cancel()        // отменить
job.join()          // дождаться завершения  
job.isActive        // проверить статус

Ключевые особенности:

  • Иерархия: дочерние Job отменяются при отмене родительского
  • SupervisorJob: изолирует ошибки дочерних Job
  • Автоматическое создание: если не указан, создаётся автоматически

3. CoroutineName - отладка и мониторинг

CoroutineName добавляет человекочитаемое имя корутине для упрощения отладки, логирования и профилирования.

// Именование корутин
val namedContext = CoroutineName("UserDataLoader") + Dispatchers.IO

launch(namedContext) {
    // В логах и профайлере будет видно имя "UserDataLoader"
    loadUserData()
}

// Получение имени из контекста
suspend fun getCurrentCoroutineName(): String? {
    return coroutineContext[CoroutineName]?.name
}

// Вложенные имена
val parentContext = CoroutineName("ApiService")
val childContext = parentContext + CoroutineName("DatabaseQuery")
// Результат: "ApiService.DatabaseQuery"

4. CoroutineExceptionHandler - обработка исключений

CoroutineExceptionHandler определяет как обрабатывать необработанные исключения в корутинах. Работает только для root корутин (запущенных с launch).

// Глобальный обработчик ошибок
val exceptionHandler = CoroutineExceptionHandler { context, exception ->
    val coroutineName = context[CoroutineName]?.name ?: "Unknown"
    log.error("Exception in coroutine '$coroutineName'", exception)
    
    // Можно отправить метрики, уведомления и т.д.
    metricsService.recordException(exception)
}

// Использование в контексте
val serviceContext = Dispatchers.IO + 
                    SupervisorJob() + 
                    CoroutineName("BackgroundService") +
                    exceptionHandler

class BackgroundService : CoroutineScope {
    override val coroutineContext = serviceContext
    
    fun startProcessing() = launch {
        // Любые необработанные исключения попадут в exceptionHandler
        processData()
    }
}

Важно: CoroutineExceptionHandler НЕ работает с async — там исключения всплывают при await().

5. ThreadContextElement - передача данных

ThreadContextElement позволяет передавать thread-local данные между корутинами, даже при смене потоков.

// Кастомный элемент контекста
class UserContext(val userId: String) : AbstractCoroutineContextElement(UserContext) {
    companion object Key : CoroutineContext.Key<UserContext>
    
    override fun toString(): String = "UserContext($userId)"
}

// Использование
val userContext = UserContext("user123") + Dispatchers.IO

launch(userContext) {
    val currentUser = coroutineContext[UserContext]?.userId
    println("Processing for user: $currentUser")
    
    withContext(Dispatchers.Default) {
        // UserContext автоматически передастся в новый поток
        val stillSameUser = coroutineContext[UserContext]?.userId
        println("Still processing for user: $stillSameUser")
    }
}

Композиция и манипуляция контекста

Оператор + (плюс) - объединение элементов

// Базовый контекст
val baseContext = Dispatchers.IO + SupervisorJob()

// Добавление элементов
val enrichedContext = baseContext + 
                     CoroutineName("ApiService") +
                     CoroutineExceptionHandler { _, ex -> handleError(ex) }

// Переопределение элементов (новый заменяет старый)
val newContext = enrichedContext + Dispatchers.Default  // заменяет Dispatchers.IO

Методы доступа к элементам

suspend fun analyzeCurrentContext() {
    val context = coroutineContext
    
    // Получение элементов по ключу
    val dispatcher = context[ContinuationInterceptor] as? CoroutineDispatcher
    val job = context[Job]
    val name = context[CoroutineName]?.name
    
    // Проверка наличия элемента
    val hasExceptionHandler = context[CoroutineExceptionHandler] != null
    
    println("Dispatcher: $dispatcher")
    println("Job: $job")
    println("Name: $name")
    println("Has exception handler: $hasExceptionHandler")
}

Удаление элементов

// Удаление элемента из контекста
val contextWithoutName = originalContext.minusKey(CoroutineName)

// Фильтрация контекста
val filteredContext = originalContext.fold(EmptyCoroutineContext) { acc, element ->
    when (element) {
        is CoroutineName -> acc  // пропускаем CoroutineName
        else -> acc + element    // оставляем остальные элементы
    }
}

Наследование контекста

Автоматическое наследование

class ParentService : CoroutineScope {
    // Родительский контекст
    override val coroutineContext = Dispatchers.IO + 
                                   SupervisorJob() + 
                                   CoroutineName("ParentService")
    
    suspend fun processData() {
        // Дочерняя корутина наследует родительский контекст
        launch {
            // Автоматически получает: Dispatchers.IO + SupervisorJob + CoroutineName
            doWork()
        }
        
        // Переопределение элементов для дочерней корутины
        launch(Dispatchers.Default + CoroutineName("ChildWorker")) {
            // Получает: Dispatchers.Default + SupervisorJob + CoroutineName("ChildWorker")
            doCpuIntensiveWork()
        }
    }
}

Явное управление наследованием

suspend fun controlledInheritance() = coroutineScope {
    val parentContext = coroutineContext
    println("Parent context: $parentContext")
    
    // Полное наследование
    launch {
        println("Child 1 context: $coroutineContext") // идентичен родительскому
    }
    
    // Частичное переопределение
    launch(CoroutineName("CustomChild")) {
        println("Child 2 context: $coroutineContext") // изменено только имя
    }
    
    // Полное переопределение
    launch(Dispatchers.Unconfined + Job() + CoroutineName("IndependentChild")) {
        println("Child 3 context: $coroutineContext") // полностью новый контекст
    }
}

Практические паттерны

Паттерн 1: Сервисные контексты

// Базовые контексты для разных типов сервисов
object ServiceContexts {
    // Для I/O операций
    val ioService = Dispatchers.IO + 
                   SupervisorJob() + 
                   CoroutineExceptionHandler { _, ex -> 
                       log.error("IO Service error", ex)
                   }
    
    // Для CPU-интенсивных задач
    val computeService = Dispatchers.Default + 
                        SupervisorJob() + 
                        CoroutineExceptionHandler { _, ex -> 
                            log.error("Compute Service error", ex)
                        }
    
    // Для UI операций (Android/Desktop)
    val uiService = Dispatchers.Main + 
                   SupervisorJob() + 
                   CoroutineExceptionHandler { _, ex -> 
                       showErrorDialog(ex)
                   }
}

// Использование
class UserRepository : CoroutineScope {
    override val coroutineContext = ServiceContexts.ioService + 
                                   CoroutineName("UserRepository")
}

Паттерн 2: Request-scoped контекст

// Контекст привязанный к HTTP запросу
class RequestContext(
    val requestId: String,
    val userId: String?,
    val traceId: String
) : AbstractCoroutineContextElement(RequestContext) {
    companion object Key : CoroutineContext.Key<RequestContext>
}

@RestController
class UserController {
    
    @GetMapping("/users/{id}")
    suspend fun getUser(@PathVariable id: String): UserDto = withContext(
        RequestContext(
            requestId = UUID.randomUUID().toString(),
            userId = id,
            traceId = MDC.get("traceId")
        )
    ) {
        // Весь код в этом блоке имеет доступ к RequestContext
        userService.findById(id)
    }
}

class UserService {
    suspend fun findById(id: String): UserDto {
        val requestContext = coroutineContext[RequestContext]
        log.info("Processing request ${requestContext?.requestId} for user $id")
        
        return userRepository.findById(id)
    }
}

Паттерн 3: Контекст с метриками

class MetricsContext(
    val operationName: String,
    private val startTime: Long = System.currentTimeMillis()
) : AbstractCoroutineContextElement(MetricsContext) {
    companion object Key : CoroutineContext.Key<MetricsContext>
    
    fun recordDuration() {
        val duration = System.currentTimeMillis() - startTime
        metricsRegistry.timer(operationName).record(duration, TimeUnit.MILLISECONDS)
    }
}

// Автоматическое измерение времени выполнения
suspend fun <T> withMetrics(operationName: String, block: suspend () -> T): T {
    val metricsContext = MetricsContext(operationName)
    return try {
        withContext(metricsContext) {
            block()
        }
    } finally {
        metricsContext.recordDuration()
    }
}

// Использование
suspend fun processOrder(orderId: String) = withMetrics("order.processing") {
    // Время выполнения автоматически измеряется
    orderRepository.findById(orderId)
        .let { validateOrder(it) }
        .let { processPayment(it) }
        .let { updateInventory(it) }
}

Продвинутые техники

Динамическое изменение контекста

class AdaptiveService : CoroutineScope {
    private var _context = Dispatchers.IO + SupervisorJob()
    override val coroutineContext: CoroutineContext get() = _context
    
    fun adaptToLoad(currentLoad: Int) {
        val newDispatcher = when {
            currentLoad > 80 -> Dispatchers.IO.limitedParallelism(32)
            currentLoad > 50 -> Dispatchers.IO.limitedParallelism(16)
            else -> Dispatchers.IO
        }
        
        _context = newDispatcher + 
                  _context[Job]!! + 
                  CoroutineName("AdaptiveService-Load$currentLoad")
    }
}

Контекст-зависимая логика

suspend fun smartOperation() {
    val currentDispatcher = coroutineContext[ContinuationInterceptor]
    
    when (currentDispatcher) {
        Dispatchers.Main -> {
            // Уже в UI потоке - можно обновлять UI
            updateProgressBar()
        }
        Dispatchers.IO -> {
            // В IO потоке - можно делать блокирующие операции
            performBlockingIO()
        }
        else -> {
            // Переключаемся в подходящий контекст
            withContext(Dispatchers.IO) {
                performBlockingIO()
            }
        }
    }
}

Ключевые вопросы для интервью

"Что такое CoroutineContext и зачем он нужен?"

  • Окружение выполнения корутины - где, как и с какими параметрами
  • Композиция элементов - Dispatcher + Job + дополнительные элементы
  • Наследование - дочерние корутины получают контекст родителя

"Как работает наследование контекста?"

  • Автоматическое копирование всех элементов от родителя к ребёнку
  • Переопределение через оператор + заменяет элементы с тем же ключом
  • Сохранение иерархии Job для структурированной конкуренции

"Чем отличается Job от SupervisorJob в контексте?"

  • Job: ошибка дочерней корутины отменяет siblings
  • SupervisorJob: изолирует ошибки дочерних корутин

"Как добавить кастомные элементы в контекст?"

  • Наследование от AbstractCoroutineContextElement
  • Определение уникального Key
  • Реализация логики передачи между потоками

"Когда использовать CoroutineExceptionHandler?"

  • Root корутины запущенные с launch (не async)
  • Глобальная обработка ошибок на уровне сервиса
  • Логирование и мониторинг необработанных исключений

Главное понимание: CoroutineContext обеспечивает декларативное управление всеми аспектами выполнения корутин, что критично для построения maintainable и observable асинхронных систем.

Kotlin Flow

Что такое Flow?

Flow — это асинхронная последовательность значений в Kotlin, которая излучает (emit) данные over time. Это корутинная альтернатива RxJava/Reactive Streams, интегрированная в экосистему корутин с поддержкой structured concurrency.

Ключевые принципы:

  • Асинхронность — работает с suspend функциями
  • Cold по умолчанию — выполняется только при подписке (collect)
  • Отменяемость — автоматически отменяется при отмене корутины
  • Backpressure — поддерживает управление потоком данных
// Базовый пример Flow
val numbersFlow = flow {
    for (i in 1..5) {
        delay(1000)    // асинхронная операция
        emit(i)        // излучение значения
    }
}

// Использование
numbersFlow.collect { value ->
    println("Received: $value") // получение каждого значения
}

Cold vs Hot Streams

Cold Streams - выполняются при подписке

Cold Flow начинает работу только при вызове collect() и создаёт независимый поток данных для каждого подписчика.

val coldFlow = flow {
    println("Starting cold flow") // выполнится для каждого collect
    repeat(3) { i ->
        delay(1000)
        emit(i)
    }
}

// Каждый collect запускает flow заново
launch { coldFlow.collect { println("Collector 1: $it") } }
launch { coldFlow.collect { println("Collector 2: $it") } }
// Результат: "Starting cold flow" напечатается дважды

// Cold flow как factory функция
fun createDataStream(userId: String) = flow {
    val data = fetchUserData(userId) // выполняется при каждом collect
    emit(data)
}

Характеристики Cold Flow:

  • Lazy — не выполняется без подписки
  • Unicast — каждый подписчик получает собственный поток
  • Reproducible — повторный collect даёт тот же результат
  • Resource efficient — создаётся только когда нужен

Hot Streams - выполняются независимо от подписчиков

Hot Flow работает независимо от подписчиков и разделяет данные между всеми активными наблюдателями.

// SharedFlow - hot stream
val hotFlow = MutableSharedFlow<Int>()

// Запуск производителя данных
launch {
    repeat(5) { i ->
        delay(1000)
        hotFlow.emit(i) // излучает независимо от подписчиков
    }
}

// Подписчики получают только новые данные
launch {
    delay(2500) // подписываемся через 2.5 секунды
    hotFlow.collect { println("Late subscriber: $it") } // получит только 3, 4
}

Характеристики Hot Flow:

  • Eager — работает независимо от подписчиков
  • Multicast — все подписчики получают одни данные
  • Stateful — может хранить последние значения
  • Always active — продолжает работу даже без подписчиков

Сравнительная таблица

Аспект Cold Flow Hot Flow
Активация При collect() Сразу при создании
Подписчики Независимые потоки Общий поток данных
Ресурсы Создаются по требованию Постоянно активны
Use case API вызовы, БД запросы UI состояния, события

Основные операторы Flow

map - преобразование значений

val numbers = flowOf(1, 2, 3, 4, 5)

val doubled = numbers.map { it * 2 }
// Результат: 2, 4, 6, 8, 10

val userNames = userIds.map { id ->
    userRepository.findById(id).name // suspend функция
}

Назначение: Преобразование каждого элемента потока с возможностью вызова suspend функций.

filter - фильтрация значений

val evenNumbers = numbers.filter { it % 2 == 0 }
// Результат: 2, 4

val activeUsers = users.filter { user ->
    userService.isActive(user.id) // suspend функция
}

Назначение: Отбор элементов по условию, может использовать suspend функции.

flatMapConcat - последовательное преобразование

val userOrders = userIds.flatMapConcat { userId ->
    orderRepository.getOrdersFlow(userId) // возвращает Flow<Order>
}
// Ждёт завершения каждого потока перед переходом к следующему

// Практический пример
fun getUserDataFlow(userIds: List<String>) = userIds.asFlow()
    .flatMapConcat { userId ->
        flow {
            val profile = userService.getProfile(userId)
            val orders = orderService.getOrders(userId)
            emit(UserData(profile, orders))
        }
    }

Назначение: Преобразование каждого элемента в Flow и последовательное объединение результатов.

collect - подписка и получение данных

// Простая подписка
flow.collect { value ->
    println("Received: $value")
}

// Обработка с сохранением в список
val results = mutableListOf<String>()
flow.collect { results.add(it) }

// Collect с обработкой ошибок
try {
    flow.collect { processData(it) }
} catch (e: Exception) {
    handleError(e)
}

Назначение: Терминальный оператор для подписки на Flow и получения всех значений.

onEach - выполнение действий без изменения потока

val processedFlow = dataFlow
    .onEach { data -> 
        log.info("Processing data: $data") // side effect
    }
    .onEach { data ->
        metricsService.recordDataPoint(data) // ещё один side effect
    }
    .map { processData(it) } // основная трансформация

// onEach не изменяет сам поток данных
processedFlow.collect { result ->
    // получаем обработанные данные
}

Назначение: Выполнение side effects (логирование, метрики, валидация) без изменения самих данных.

catch - обработка исключений

val resilientFlow = riskyDataFlow
    .map { data -> 
        riskyTransformation(data) // может бросить исключение
    }
    .catch { exception ->
        when (exception) {
            is NetworkException -> emit(CachedData) // fallback значение
            is ValidationException -> { 
                log.warn("Validation failed", exception)
                // не emit - пропускаем невалидные данные
            }
            else -> throw exception // перебрасываем неожиданные ошибки
        }
    }

Назначение: Перехват и обработка исключений в upstream операторах с возможностью emit fallback значений.

flowOn - смена контекста выполнения

val optimizedFlow = databaseFlow
    .flowOn(Dispatchers.IO)        // upstream операции в IO
    .map { data -> 
        heavyComputation(data)     // CPU-интенсивная обработка
    }
    .flowOn(Dispatchers.Default)   // эта операция в Default
    .onEach { result ->
        updateUI(result)           // UI операции остаются в текущем контексте
    }

// Практический пример
class DataProcessor {
    fun processDataStream() = flow {
        // Эмиссия в IO контексте
        val rawData = loadFromDatabase()
        emit(rawData)
    }
    .flowOn(Dispatchers.IO)
    .map { data ->
        // Обработка в Default контексте  
        processData(data)
    }
    .flowOn(Dispatchers.Default)
}

Назначение: Изменение контекста выполнения для upstream операций без влияния на downstream.


StateFlow, SharedFlow, MutableStateFlow

StateFlow - состояние с последним значением

StateFlow — это hot flow, который хранит текущее состояние и сразу отдаёт его новым подписчикам.

class UserViewModel {
    private val _userState = MutableStateFlow<UiState>(UiState.Loading)
    val userState: StateFlow<UiState> = _userState.asStateFlow()
    
    suspend fun loadUser(id: String) {
        _userState.value = UiState.Loading
        try {
            val user = userRepository.findById(id)
            _userState.value = UiState.Success(user)
        } catch (e: Exception) {
            _userState.value = UiState.Error(e.message)
        }
    }
}

// Подписка
userViewModel.userState.collect { state ->
    when (state) {
        is UiState.Loading -> showProgress()
        is UiState.Success -> showUser(state.user)
        is UiState.Error -> showError(state.message)
    }
}

Характеристики StateFlow:

  • Conflated — хранит только последнее значение
  • Hot — излучает независимо от подписчиков
  • Replay 1 — новые подписчики сразу получают текущее состояние
  • Distinct — не излучает одинаковые значения подряд

SharedFlow - события и уведомления

SharedFlow — это hot flow для событий и уведомлений, которые не имеют "текущего состояния".

class EventBus {
    private val _events = MutableSharedFlow<AppEvent>()
    val events: SharedFlow<AppEvent> = _events.asSharedFlow()
    
    suspend fun publishEvent(event: AppEvent) {
        _events.emit(event)
    }
}

// Разные типы событий
sealed class AppEvent {
    data class UserLoggedIn(val userId: String) : AppEvent()
    data class OrderCreated(val orderId: String) : AppEvent()
    object NetworkError : AppEvent()
}

// Подписка на события
eventBus.events.collect { event ->
    when (event) {
        is AppEvent.UserLoggedIn -> handleUserLogin(event.userId)
        is AppEvent.OrderCreated -> sendNotification(event.orderId)
        is AppEvent.NetworkError -> showNetworkError()
    }
}

MutableStateFlow vs MutableSharedFlow конфигурация

// MutableStateFlow - простое состояние
val simpleState = MutableStateFlow("initial")

// MutableSharedFlow с настройками
val configuredSharedFlow = MutableSharedFlow<String>(
    replay = 2,           // последние 2 значения для новых подписчиков
    extraBufferCapacity = 10, // дополнительный буфер
    onBufferOverflow = BufferOverflow.DROP_OLDEST // стратегия при переполнении
)

// Сложные конфигурации для разных сценариев
class NotificationService {
    // Критичные уведомления - не теряем
    private val criticalEvents = MutableSharedFlow<CriticalEvent>(
        replay = 0,
        extraBufferCapacity = Int.MAX_VALUE,
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    
    // Обычные события - можем потерять старые
    private val regularEvents = MutableSharedFlow<RegularEvent>(
        replay = 1,
        extraBufferCapacity = 64,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
}

Flow vs LiveData (Android контекст)

Основные отличия

Аспект Flow LiveData
Платформа Multiplatform Android only
Корутины Нативная поддержка Требует адаптеры
Операторы Богатый набор Ограниченный набор
Threading Любой Dispatcher Main thread только
Lifecycle Ручное управление Автоматическое

Миграция с LiveData на Flow

// LiveData подход
class UserViewModelLiveData : ViewModel() {
    private val _users = MutableLiveData<List<User>>()
    val users: LiveData<List<User>> = _users
    
    fun loadUsers() {
        viewModelScope.launch {
            _users.value = userRepository.getUsers()
        }
    }
}

// Flow подход
class UserViewModelFlow : ViewModel() {
    val users: StateFlow<List<User>> = userRepository.getUsersFlow()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
}

// Подписка в UI (Android)
class UserFragment : Fragment() {
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        // LiveData подписка
        viewModel.users.observe(viewLifecycleOwner) { users ->
            updateUI(users)
        }
        
        // Flow подписка
        viewLifecycleOwner.lifecycleScope.launch {
            viewModel.users.collect { users ->
                updateUI(users)
            }
        }
    }
}

Преимущества Flow:

  • Композиция операторов — сложные трансформации данных
  • Корутинная интеграция — естественная работа с suspend функциями
  • Multiplatform — код работает на всех платформах
  • Backpressure — контроль скорости обработки данных

Подписка и отмена

Подписка с жизненным циклом

class DataService : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    fun startListening() {
        // Подписка привязана к scope службы
        launch {
            dataFlow.collect { data ->
                processData(data)
            }
        }
    }
    
    fun stop() {
        job.cancel() // автоматически отменяет все подписки
    }
}

Условная отмена подписки

suspend fun conditionalCollection() {
    dataFlow
        .takeWhile { data -> data.isValid } // автоматическая отмена при условии
        .collect { data ->
            processValidData(data)
        }
    // collect завершится когда встретится невалидный элемент
}

// Отмена по времени
suspend fun timedCollection() = withTimeout(30_000) {
    dataFlow.collect { data ->
        processData(data)
    }
    // автоматическая отмена через 30 секунд
}

Обработка отмены в производителе

fun createCancellableFlow() = flow {
    try {
        repeat(100) { i ->
            ensureActive() // проверка отмены
            val data = fetchData(i)
            emit(data)
            delay(1000)
        }
    } catch (e: CancellationException) {
        cleanup() // очистка ресурсов при отмене
        throw e   // важно перебросить
    }
}

Практические паттерны

Паттерн 1: Reactive Repository

class UserRepository {
    private val _users = MutableStateFlow<List<User>>(emptyList())
    val users: StateFlow<List<User>> = _users.asStateFlow()
    
    private val _events = MutableSharedFlow<UserEvent>()
    val events: SharedFlow<UserEvent> = _events.asSharedFlow()
    
    suspend fun addUser(user: User) {
        val updatedUsers = _users.value + user
        _users.value = updatedUsers
        _events.emit(UserEvent.UserAdded(user))
    }
    
    suspend fun removeUser(userId: String) {
        val updatedUsers = _users.value.filterNot { it.id == userId }
        _users.value = updatedUsers
        _events.emit(UserEvent.UserRemoved(userId))
    }
}

Паттерн 2: Flow для real-time данных

class StockPriceService {
    fun getPriceUpdates(symbol: String): Flow<Price> = flow {
        val webSocket = createWebSocket(symbol)
        try {
            webSocket.messages.collect { message ->
                val price = parsePrice(message)
                emit(price)
            }
        } finally {
            webSocket.close()
        }
    }
    .flowOn(Dispatchers.IO)
    .retry(3) { exception ->
        exception is NetworkException
    }
}

Паттерн 3: Combining multiple flows

class DashboardService(
    private val userService: UserService,
    private val orderService: OrderService,
    private val notificationService: NotificationService
) {
    fun getDashboardData(userId: String): Flow<DashboardData> = 
        combine(
            userService.getUserFlow(userId),
            orderService.getOrdersFlow(userId),
            notificationService.getNotificationsFlow(userId)
        ) { user, orders, notifications ->
            DashboardData(
                user = user,
                orders = orders,
                notifications = notifications,
                lastUpdated = System.currentTimeMillis()
            )
        }
}

Ключевые вопросы для интервью

"Чем отличается Cold от Hot Flow?"

  • Cold: выполняется при collect, каждый подписчик получает новый поток
  • Hot: работает независимо, подписчики получают общие данные

"Когда использовать StateFlow vs SharedFlow?"

  • StateFlow: для состояния UI, конфигурации, текущих значений
  • SharedFlow: для событий, уведомлений, actions

"Как работает backpressure в Flow?"

  • Suspension — производитель приостанавливается если потребитель не успевает
  • Buffer — промежуточное хранение для сглаживания скачков
  • Overflow strategies — поведение при переполнении буфера

"В чём преимущества Flow перед RxJava?"

  • Корутинная интеграция — естественная работа с suspend функциями
  • Structured concurrency — автоматическая отмена
  • Null safety — типобезопасность Kotlin
  • Меньше boilerplate — более простой API

"Как правильно обрабатывать ошибки в Flow?"

  • catch оператор для upstream ошибок
  • try-catch в collect для downstream ошибок
  • retry для автоматических повторов
  • onCompletion для финализации

Главное понимание: Flow предоставляет реактивную парадигму с корутинной интеграцией, что делает асинхронную обработку потоков данных естественной частью Kotlin экосистемы.

Корутины и Spring

Интеграция корутин в Spring Framework

Spring Framework с версии 5.2 предоставляет нативную поддержку корутин Kotlin, позволяя писать неблокирующие реактивные приложения в императивном стиле. Это дает возможность использовать знакомый синтаксис вместо сложных reactive chains.

Ключевые преимущества:

  • Императивный код вместо callback chains
  • Structured concurrency — автоматическое управление жизненным циклом
  • Высокая производительность — неблокирующие I/O операции
  • Простота отладки — линейный flow выполнения
// Традиционный Spring MVC (блокирующий)
@RestController
class UserController(private val userService: UserService) {
    
    @GetMapping("/users/{id}")
    fun getUser(@PathVariable id: String): User {
        return userService.findById(id) // блокирует поток
    }
}

// Spring WebFlux с корутинами (неблокирующий)
@RestController
class UserControllerCoroutines(private val userService: UserService) {
    
    @GetMapping("/users/{id}")
    suspend fun getUser(@PathVariable id: String): User {
        return userService.findById(id) // не блокирует поток
    }
}

Spring WebFlux + корутины

Controller с suspend функциями

Suspend controllers позволяют обрабатывать HTTP запросы неблокирующим способом, автоматически интегрируясь с reactive stack Spring WebFlux.

@RestController
@RequestMapping("/api/v1")
class ApiController(
    private val userService: UserService,
    private val orderService: OrderService
) {
    
    // Простой suspend endpoint
    @GetMapping("/users/{id}")
    suspend fun getUser(@PathVariable id: String): UserDto {
        return userService.findById(id)
    }
    
    // Параллельная загрузка данных
    @GetMapping("/users/{id}/dashboard")
    suspend fun getUserDashboard(@PathVariable id: String): DashboardDto {
        // Параллельное выполнение запросов
        val userDeferred = async { userService.findById(id) }
        val ordersDeferred = async { orderService.findByUserId(id) }
        val preferencesDeferred = async { preferencesService.findByUserId(id) }
        
        return DashboardDto(
            user = userDeferred.await(),
            orders = ordersDeferred.await(),
            preferences = preferencesDeferred.await()
        )
    }
    
    // Flow как response
    @GetMapping("/users/{id}/notifications", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun getNotificationStream(@PathVariable id: String): Flow<NotificationDto> {
        return notificationService.getNotificationFlow(id)
            .map { it.toDto() }
    }
}

Обработка исключений в корутинах

@ControllerAdvice
class CoroutineExceptionHandler {
    
    @ExceptionHandler(IllegalArgumentException::class)
    suspend fun handleValidationException(ex: IllegalArgumentException): ResponseEntity<ErrorDto> {
        // Может выполнять suspend операции для логирования/метрик
        auditService.logError(ex)
        
        return ResponseEntity.badRequest()
            .body(ErrorDto("Validation failed: ${ex.message}"))
    }
    
    @ExceptionHandler(TimeoutCancellationException::class)
    suspend fun handleTimeout(ex: TimeoutCancellationException): ResponseEntity<ErrorDto> {
        return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
            .body(ErrorDto("Request timeout"))
    }
}

Spring Data + корутины

R2DBC с корутинами

R2DBC (Reactive Relational Database Connectivity) обеспечивает неблокирующий доступ к реляционным БД с поддержкой корутин через Spring Data R2DBC.

// Репозиторий с suspend функциями
interface UserRepository : CoroutineCrudRepository<User, String> {
    
    // Автоматически генерируемые suspend методы
    // suspend fun findById(id: String): User?
    // suspend fun save(user: User): User
    // suspend fun deleteById(id: String)
    
    // Кастомные suspend запросы
    @Query("SELECT * FROM users WHERE email = :email")
    suspend fun findByEmail(email: String): User?
    
    @Query("SELECT * FROM users WHERE status = :status")
    fun findByStatus(status: UserStatus): Flow<User>
    
    @Query("SELECT * FROM users WHERE created_at > :date")
    suspend fun findRecentUsers(date: LocalDateTime): List<User>
}

// Использование в сервисе
@Service
class UserService(private val userRepository: UserRepository) {
    
    suspend fun createUser(request: CreateUserRequest): User {
        // Проверка существования пользователя
        val existingUser = userRepository.findByEmail(request.email)
        if (existingUser != null) {
            throw UserAlreadyExistsException("User with email ${request.email} already exists")
        }
        
        // Создание нового пользователя
        val user = User(
            id = UUID.randomUUID().toString(),
            email = request.email,
            name = request.name,
            status = UserStatus.ACTIVE,
            createdAt = LocalDateTime.now()
        )
        
        return userRepository.save(user)
    }
    
    // Streaming результатов
    fun getActiveUsersStream(): Flow<User> {
        return userRepository.findByStatus(UserStatus.ACTIVE)
    }
}

MongoDB Reactive с корутинами

// Reactive MongoDB репозиторий
interface ProductRepository : ReactiveMongoRepository<Product, String> {
    
    fun findByCategory(category: String): Flow<Product>
    
    @Query("{ 'price': { \$gte: ?0, \$lte: ?1 } }")
    fun findByPriceRange(minPrice: BigDecimal, maxPrice: BigDecimal): Flow<Product>
}

// Сервис с корутинами
@Service
class ProductService(private val productRepository: ProductRepository) {
    
    suspend fun findProductById(id: String): Product? {
        return productRepository.findById(id).awaitFirstOrNull()
    }
    
    suspend fun saveProduct(product: Product): Product {
        return productRepository.save(product).awaitFirst()
    }
    
    // Batch обработка через Flow
    suspend fun updatePrices(updates: List<PriceUpdate>) {
        updates.asFlow()
            .flatMapConcat { update ->
                productRepository.findById(update.productId)
                    .map { product -> 
                        product.copy(price = update.newPrice)
                    }
                    .flatMap { productRepository.save(it) }
            }
            .collect() // Выполняем все обновления
    }
}

Spring Security + корутины

Reactive Security с корутинами

Spring Security WebFlux поддерживает корутины для неблокирующей аутентификации и авторизации.

@Configuration
@EnableWebFluxSecurity
class SecurityConfig {
    
    @Bean
    fun securityWebFilterChain(http: ServerHttpSecurity): SecurityWebFilterChain {
        return http
            .authorizeExchange { exchanges ->
                exchanges
                    .pathMatchers("/api/public/**").permitAll()
                    .pathMatchers("/api/admin/**").hasRole("ADMIN")
                    .anyExchange().authenticated()
            }
            .oauth2ResourceServer { oauth2 ->
                oauth2.jwt()
            }
            .build()
    }
}

// Controller с безопасностью
@RestController
@RequestMapping("/api/secure")
class SecureController(private val orderService: OrderService) {
    
    @GetMapping("/orders")
    suspend fun getMyOrders(authentication: Authentication): List<OrderDto> {
        val userId = authentication.name
        return orderService.findByUserId(userId)
    }
    
    @PreAuthorize("hasRole('ADMIN')")
    @GetMapping("/admin/orders")
    suspend fun getAllOrders(): List<OrderDto> {
        return orderService.findAll()
    }
    
    // Получение текущего пользователя в корутине
    @GetMapping("/profile")
    suspend fun getProfile(): UserProfileDto {
        val authentication = currentCoroutineContext()[ReactiveSecurityContextHolder.CONTEXT_KEY]
            ?.authentication ?: throw UnauthorizedException()
        
        return userService.getProfile(authentication.name)
    }
}

Кастомная аутентификация с корутинами

@Component
class CoroutineAuthenticationManager(
    private val userService: UserService,
    private val tokenService: TokenService
) : ReactiveAuthenticationManager {
    
    override fun authenticate(authentication: Authentication): Mono<Authentication> {
        return mono {
            authenticateCoroutine(authentication)
        }
    }
    
    private suspend fun authenticateCoroutine(authentication: Authentication): Authentication {
        val token = authentication.credentials as String
        
        // Неблокирующая проверка токена
        val claims = tokenService.validateToken(token)
        val user = userService.findById(claims.userId)
            ?: throw BadCredentialsException("User not found")
        
        return UsernamePasswordAuthenticationToken(
            user.email, 
            null, 
            user.authorities
        )
    }
}

Конфигурация и производительность

Настройка корутинного контекста

@Configuration
class CoroutineConfig {
    
    @Bean
    @Primary
    fun applicationCoroutineScope(): CoroutineScope {
        return CoroutineScope(
            SupervisorJob() + 
            Dispatchers.Default + 
            CoroutineName("SpringApp") +
            CoroutineExceptionHandler { _, exception ->
                log.error("Unhandled coroutine exception", exception)
            }
        )
    }
    
    @Bean
    fun ioCoroutineScope(): CoroutineScope {
        return CoroutineScope(
            SupervisorJob() + 
            Dispatchers.IO + 
            CoroutineName("SpringIO")
        )
    }
}

// Использование кастомных scope
@Service
class BackgroundTaskService(
    @Qualifier("ioCoroutineScope") private val ioScope: CoroutineScope
) {
    
    fun startBackgroundProcessing() {
        ioScope.launch {
            while (isActive) {
                processDataBatch()
                delay(TimeUnit.MINUTES.toMillis(5))
            }
        }
    }
}

Мониторинг и метрики корутин

@Component
class CoroutineMetrics(private val meterRegistry: MeterRegistry) {
    
    private val activeCoroutines = Gauge.builder("coroutines.active")
        .description("Number of active coroutines")
        .register(meterRegistry)
    
    private val coroutineTimer = Timer.builder("coroutines.execution.time")
        .description("Coroutine execution time")
        .register(meterRegistry)
    
    suspend fun <T> withMetrics(operation: String, block: suspend () -> T): T {
        return Timer.Sample.start(meterRegistry).use { sample ->
            activeCoroutines.incrementAndGet()
            try {
                block()
            } finally {
                activeCoroutines.decrementAndGet()
                sample.stop(Timer.builder("coroutines.$operation.time").register(meterRegistry))
            }
        }
    }
}

// Использование в контроллере
@RestController
class MetricsController(private val metrics: CoroutineMetrics) {
    
    @GetMapping("/data")
    suspend fun getData(): DataDto = metrics.withMetrics("getData") {
        loadDataFromMultipleSources()
    }
}

Практические паттерны

Паттерн 1: Reactive Gateway

@RestController
@RequestMapping("/api/gateway")
class ApiGateway(
    private val userService: UserServiceClient,
    private val orderService: OrderServiceClient,
    private val inventoryService: InventoryServiceClient
) {
    
    @GetMapping("/users/{id}/complete-profile")
    suspend fun getCompleteProfile(@PathVariable id: String): CompleteProfileDto = 
        supervisorScope {
            // Параллельные вызовы микросервисов
            val userProfile = async { 
                try { userService.getProfile(id) }
                catch (e: Exception) { null } // graceful degradation
            }
            
            val orderHistory = async { 
                try { orderService.getOrderHistory(id) }
                catch (e: Exception) { emptyList() }
            }
            
            val recommendations = async { 
                try { inventoryService.getRecommendations(id) }
                catch (e: Exception) { emptyList() }
            }
            
            CompleteProfileDto(
                profile = userProfile.await(),
                orders = orderHistory.await(),
                recommendations = recommendations.await()
            )
        }
}

Паттерн 2: Event-driven processing

@Service
class OrderEventProcessor(
    private val orderRepository: OrderRepository,
    private val notificationService: NotificationService,
    private val auditService: AuditService
) : CoroutineScope {
    
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.Default + job
    
    private val orderEvents = MutableSharedFlow<OrderEvent>()
    
    @PostConstruct
    fun startProcessing() {
        // Обработка событий заказов
        launch {
            orderEvents.collect { event ->
                when (event) {
                    is OrderEvent.Created -> handleOrderCreated(event)
                    is OrderEvent.Cancelled -> handleOrderCancelled(event)
                    is OrderEvent.Completed -> handleOrderCompleted(event)
                }
            }
        }
    }
    
    suspend fun publishEvent(event: OrderEvent) {
        orderEvents.emit(event)
    }
    
    private suspend fun handleOrderCreated(event: OrderEvent.Created) = supervisorScope {
        // Параллельная обработка
        launch { notificationService.sendOrderConfirmation(event.order) }
        launch { auditService.logOrderCreation(event.order) }
        launch { updateInventory(event.order) }
    }
    
    @PreDestroy
    fun stop() {
        job.cancel()
    }
}

Паттерн 3: Batch processing

@Service
class DataSyncService(
    private val externalApiClient: ExternalApiClient,
    private val localRepository: DataRepository
) {
    
    @Scheduled(fixedDelay = 300_000) // каждые 5 минут
    suspend fun syncData() {
        log.info("Starting data synchronization")
        
        try {
            externalApiClient.getUpdatedRecords()
                .chunked(100) // обрабатываем пачками по 100
                .collect { batch ->
                    processBatch(batch)
                }
        } catch (e: Exception) {
            log.error("Data sync failed", e)
        }
    }
    
    private suspend fun processBatch(records: List<ExternalRecord>) = supervisorScope {
        records.map { record ->
            async {
                try {
                    val localRecord = record.toLocalRecord()
                    localRepository.save(localRecord)
                } catch (e: Exception) {
                    log.warn("Failed to process record ${record.id}", e)
                }
            }
        }.awaitAll()
    }
}

Лучшие практики и подводные камни

Конфигурация thread pool

@Configuration
class WebFluxConfig : WebFluxConfigurer {
    
    override fun configureHttpMessageCodecs(configurer: ServerCodecConfigurer) {
        // Увеличиваем лимиты для streaming
        configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024) // 16MB
    }
}

// Настройка R2DBC connection pool
@Configuration
class R2dbcConfig {
    
    @Bean
    fun connectionFactory(): ConnectionFactory {
        return ConnectionFactories.get(
            ConnectionFactoryOptions.builder()
                .option(DRIVER, "postgresql")
                .option(HOST, "localhost")
                .option(PORT, 5432)
                .option(USER, "app")
                .option(PASSWORD, "password")
                .option(DATABASE, "appdb")
                .option(INITIAL_SIZE, 10)
                .option(MAX_SIZE, 50)
                .option(MAX_IDLE_TIME, Duration.ofMinutes(30))
                .build()
        )
    }
}

Обработка ошибок в WebFlux

@Component
@Order(-2) // Высокий приоритет
class GlobalErrorWebExceptionHandler : WebExceptionHandler {
    
    override fun handle(exchange: ServerWebExchange, ex: Throwable): Mono<Void> {
        return when (ex) {
            is CancellationException -> {
                // Корутина была отменена - это нормально
                exchange.response.statusCode = HttpStatus.NO_CONTENT
                exchange.response.setComplete()
            }
            is TimeoutCancellationException -> {
                exchange.response.statusCode = HttpStatus.REQUEST_TIMEOUT
                writeErrorResponse(exchange, "Request timeout")
            }
            else -> {
                exchange.response.statusCode = HttpStatus.INTERNAL_SERVER_ERROR
                writeErrorResponse(exchange, "Internal server error")
            }
        }
    }
    
    private fun writeErrorResponse(exchange: ServerWebExchange, message: String): Mono<Void> {
        val buffer = exchange.response.bufferFactory().wrap(message.toByteArray())
        return exchange.response.writeWith(Mono.just(buffer))
    }
}

Ключевые вопросы для интервью

"Как Spring интегрируется с корутинами?"

  • Автоматическое bridge между suspend функциями и Mono/Flux
  • Reactive stack остаётся неизменным под капотом
  • Императивный API поверх реактивной архитектуры

"В чём преимущества корутин над традиционными Mono/Flux?"

  • Читаемость — линейный код вместо reactive chains
  • Отладка — стандартные инструменты отладки работают
  • Exception handling — обычные try-catch блоки
  • Composition — легко комбинировать операции

"Как работает backpressure с корутинами в Spring?"

  • Автоматическое suspension — корутина приостанавливается при медленном consumer
  • Flow интеграция — поддержка backpressure на уровне Kotlin Flow
  • Buffer management — настройка буферизации через Spring конфигурацию

"Какие ограничения у корутин в Spring?"

  • Reactive stack — необходим WebFlux, не работает с обычным MVC
  • Database drivers — требуются R2DBC драйверы вместо JDBC
  • Learning curve — команда должна понимать корутины и reactive programming

"Как мониторить производительность корутин в Spring?"

  • Micrometer интеграция — метрики через стандартные Spring механизмы
  • Structured logging — correlation ID через coroutine context
  • Health checks — мониторинг активных корутин и connection pools

Главное понимание: Корутины в Spring обеспечивают высокую производительность реактивных приложений с простотой императивного кода, что критично для modern high-load backend систем.

Производительность и ошибки в корутинах

Потенциальные утечки памяти

GlobalScope - источник утечек

GlobalScope — это глобальная область видимости корутин, которая живёт всё время жизни приложения. Использование GlobalScope может привести к утечкам памяти и неконтролируемому выполнению корутин.

// ❌ ПЛОХО - утечка памяти
class UserService {
    fun loadUserData(userId: String) {
        GlobalScope.launch {
            val userData = fetchUserFromApi(userId) // может выполняться вечно
            updateCache(userData)
        }
        // Нет способа отменить эту корутину!
        // Если UserService уничтожается, корутина продолжает работать
    }
}

// ❌ ПЛОХО - долгоживущие ссылки
class MainActivity {
    fun startBackgroundWork() {
        GlobalScope.launch {
            while (true) {
                processData() // бесконечный цикл
                delay(1000)
                // Активность может быть уничтожена, но корутина продолжит работу
                // и будет держать ссылку на контекст через замыкание
            }
        }
    }
}

Правильное управление жизненным циклом

// ✅ ХОРОШО - контролируемый scope
class UserService : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    fun loadUserData(userId: String) {
        launch { // привязано к scope сервиса
            val userData = fetchUserFromApi(userId)
            updateCache(userData)
        }
    }
    
    fun cleanup() {
        job.cancel() // отменяет ВСЕ корутины сервиса
    }
}

// ✅ ХОРОШО - scope привязанный к lifecycle
class BackgroundTaskManager {
    private var serviceJob: Job? = null
    
    fun start() {
        serviceJob = CoroutineScope(Dispatchers.Default).launch {
            while (isActive) { // проверяет отмену
                processData()
                delay(1000)
            }
        }
    }
    
    fun stop() {
        serviceJob?.cancel() // явная отмена
        serviceJob = null
    }
}

Утечки в ViewModels (Android пример)

// ❌ ПЛОХО - утечка ViewModel
class UserViewModel : ViewModel() {
    fun loadData() {
        GlobalScope.launch { // переживёт ViewModel
            val data = repository.loadData()
            // Попытка обновить UI после уничтожения ViewModel
            _uiState.value = UiState.Success(data)
        }
    }
}

// ✅ ХОРОШО - viewModelScope
class UserViewModel : ViewModel() {
    fun loadData() {
        viewModelScope.launch { // автоматически отменяется в onCleared()
            val data = repository.loadData()
            _uiState.value = UiState.Success(data)
        }
    }
}

Заморозка UI при неправильном использовании Dispatcher

Неправильное использование Main Dispatcher

Main Dispatcher предназначен для UI операций и имеет один поток. Выполнение тяжёлых операций в Main Dispatcher заблокирует UI.

// ❌ ПЛОХО - блокировка UI
class DataProcessor {
    suspend fun processLargeDataset(data: List<DataItem>) {
        // Выполняется в Main dispatcher по умолчанию
        data.forEach { item ->
            val result = performHeavyComputation(item) // БЛОКИРУЕТ UI!
            updateProgressBar(result.progress)
        }
    }
}

// ❌ ПЛОХО - синхронные операции в Main
suspend fun loadDataFromNetwork(): String {
    // Если это выполняется в Main dispatcher
    return URL("https://api.example.com/data")
        .readText() // Блокирующий I/O вызов! UI зависнет
}

Правильное переключение контекста

// ✅ ХОРОШО - правильное использование dispatchers
class DataProcessor {
    suspend fun processLargeDataset(data: List<DataItem>) = withContext(Dispatchers.Main) {
        data.forEach { item ->
            // Тяжёлые вычисления в background
            val result = withContext(Dispatchers.Default) {
                performHeavyComputation(item)
            }
            // Обновление UI в Main
            updateProgressBar(result.progress)
        }
    }
}

// ✅ ХОРОШО - правильная архитектура
class NetworkService {
    suspend fun loadData(): String = withContext(Dispatchers.IO) {
        // I/O операции в IO dispatcher
        httpClient.get("https://api.example.com/data")
    }
    
    suspend fun processData(rawData: String): ProcessedData = withContext(Dispatchers.Default) {
        // CPU-интенсивные операции в Default dispatcher
        parseAndProcess(rawData)
    }
}

class UIController {
    suspend fun refreshData() {
        // UI код остаётся в Main dispatcher
        showLoading()
        
        try {
            val rawData = networkService.loadData()
            val processedData = networkService.processData(rawData)
            
            // Обновление UI в Main dispatcher
            displayData(processedData)
        } catch (e: Exception) {
            showError(e.message)
        } finally {
            hideLoading()
        }
    }
}

Избежание блокирующих операций

// ❌ ПЛОХО - блокирующие операции в корутинах
suspend fun badDatabaseAccess() {
    // Блокирующий JDBC в корутине
    val connection = DriverManager.getConnection(jdbcUrl)
    val resultSet = connection.createStatement()
        .executeQuery("SELECT * FROM users") // блокирует поток!
}

// ✅ ХОРОШО - неблокирующие операции
suspend fun goodDatabaseAccess() {
    // R2DBC или другие неблокирующие драйверы
    val users = userRepository.findAll() // suspend функция
    return users
}

// ✅ ХОРОШО - изоляция блокирующих операций
suspend fun legacySystemIntegration() = withContext(Dispatchers.IO) {
    // Изолируем блокирующий код в IO dispatcher
    val legacyClient = LegacyJdbcClient()
    legacyClient.blockingOperation() // ограничено IO pool
}

Разница между launch и async

launch - Fire and Forget

launch создаёт корутину для side effects (побочных эффектов) и не возвращает результат. Используется когда результат выполнения не важен.

// launch для side effects
class NotificationService {
    fun sendNotifications(users: List<User>) {
        users.forEach { user ->
            // Не ждём завершения каждой отправки
            scope.launch {
                emailService.sendEmail(user.email, "Welcome!")
                analyticsService.trackEmailSent(user.id)
            }
        }
        // Функция завершается сразу, уведомления отправляются в background
    }
}

// ❌ ПЛОХО - неправильное использование launch
class DataService {
    suspend fun getUserData(id: String): UserData {
        var userData: UserData? = null
        
        launch { // launch НЕ блокирует выполнение
            userData = fetchUserData(id)
        }
        
        return userData!! // ОШИБКА! userData всегда null
    }
}

async - Параллельные вычисления с результатом

async создаёт корутину, которая возвращает результат через Deferred. Используется для параллельного выполнения операций с последующим ожиданием результатов.

// async для параллельных операций
class UserProfileService {
    suspend fun loadCompleteProfile(userId: String): CompleteProfile {
        // Запускаем операции параллельно
        val profileDeferred = async { userService.getProfile(userId) }
        val ordersDeferred = async { orderService.getUserOrders(userId) }
        val preferencesDeferred = async { preferencesService.getUserPreferences(userId) }
        
        // Ждём завершения всех операций
        return CompleteProfile(
            profile = profileDeferred.await(),
            orders = ordersDeferred.await(),
            preferences = preferencesDeferred.await()
        )
    }
}

Сравнительная таблица launch vs async

Аспект launch async
Возвращаемое значение Job Deferred
Получение результата Не предусмотрено await()
Назначение Side effects Параллельные вычисления
Исключения Всплывают к родителю Заморожены до await()
Use case Логирование, уведомления API вызовы, вычисления

Практические примеры

class OrderProcessor {
    // launch для независимых действий
    suspend fun processOrder(order: Order) {
        // Основная логика обработки заказа
        val processedOrder = validateAndProcessOrder(order)
        
        // Независимые side effects - не ждём их завершения
        launch { sendOrderConfirmation(order.customerEmail) }
        launch { updateInventory(order.items) }
        launch { logOrderProcessing(order.id) }
        launch { updateAnalytics(order) }
        
        // Возвращаем результат сразу
        return processedOrder
    }
    
    // async для агрегации данных
    suspend fun createOrderSummary(orderId: String): OrderSummary {
        // Параллельная загрузка связанных данных
        val orderData = async { orderRepository.findById(orderId) }
        val customerData = async { customerRepository.findById(orderData.await().customerId) }
        val paymentData = async { paymentService.getPaymentInfo(orderId) }
        val shippingData = async { shippingService.getShippingInfo(orderId) }
        
        // Ждём все результаты для создания summary
        return OrderSummary(
            order = orderData.await(),
            customer = customerData.await(),
            payment = paymentData.await(),
            shipping = shippingData.await()
        )
    }
}

Корутины vs RxJava/Project Reactor

Сравнение парадигм

Корутины представляют императивный подход к асинхронному программированию, в то время как RxJava/Reactor следуют функциональной реактивной парадигме.

// RxJava - функциональная цепочка
class RxUserService {
    fun loadUserProfile(userId: String): Single<UserProfile> {
        return userRepository.findById(userId)
            .flatMap { user ->
                ordersRepository.findByUserId(user.id)
                    .map { orders -> UserProfile(user, orders) }
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .timeout(30, TimeUnit.SECONDS)
            .retry(3)
    }
}

// Корутины - императивный подход
class CoroutineUserService {
    suspend fun loadUserProfile(userId: String): UserProfile = withContext(Dispatchers.IO) {
        withTimeout(30_000) {
            retry(3) {
                val user = userRepository.findById(userId)
                val orders = ordersRepository.findByUserId(user.id)
                UserProfile(user, orders)
            }
        }
    }
}

Преимущества корутин

1. Читаемость и отладка

// RxJava - сложно отлаживать
fun complexRxChain(): Observable<Result> {
    return dataSource1.loadData()
        .flatMap { data1 ->
            dataSource2.loadData(data1.id)
                .flatMap { data2 ->
                    dataSource3.processData(data1, data2)
                        .map { processedData ->
                            Result(data1, data2, processedData)
                        }
                }
        }
        .onErrorResumeNext { error ->
            handleError(error)
                .andThen(Observable.just(defaultResult))
        }
}

// Корутины - линейный код
suspend fun complexCoroutineChain(): Result {
    return try {
        val data1 = dataSource1.loadData()
        val data2 = dataSource2.loadData(data1.id)
        val processedData = dataSource3.processData(data1, data2)
        Result(data1, data2, processedData)
    } catch (error: Exception) {
        handleError(error)
        defaultResult
    }
}

2. Exception handling

// RxJava - сложная обработка ошибок
fun rxErrorHandling(): Single<String> {
    return networkCall()
        .onErrorResumeNext { throwable ->
            when (throwable) {
                is NetworkException -> cacheCall()
                is TimeoutException -> Single.error(UserFriendlyException("Timeout"))
                else -> Single.error(throwable)
            }
        }
        .doOnError { logError(it) }
}

// Корутины - естественная обработка
suspend fun coroutineErrorHandling(): String {
    return try {
        networkCall()
    } catch (e: NetworkException) {
        cacheCall()
    } catch (e: TimeoutException) {
        logError(e)
        throw UserFriendlyException("Timeout")
    } catch (e: Exception) {
        logError(e)
        throw e
    }
}

Преимущества RxJava/Reactor

1. Богатые операторы

// RxJava - мощные операторы для потоков
fun rxStreamProcessing(): Observable<ProcessedData> {
    return dataStream
        .debounce(300, TimeUnit.MILLISECONDS)  // защита от частых событий
        .distinctUntilChanged()               // только изменившиеся значения
        .flatMap { data ->
            processData(data).toObservable()
        }
        .scan { previous, current ->          // аккумуляция состояния
            combineData(previous, current)
        }
        .sample(1, TimeUnit.SECONDS)          // сэмплирование каждую секунду
}

// Корутины - требуют больше кода для той же логики
fun coroutineStreamProcessing(): Flow<ProcessedData> {
    return dataFlow
        .debounce(300)
        .distinctUntilChanged()
        .flatMapLatest { data ->
            flow { emit(processData(data)) }
        }
        .scan(initialValue) { previous, current ->
            combineData(previous, current)
        }
        .sample(1000)
}

2. Backpressure стратегии

// RxJava - встроенные стратегии backpressure
fun rxBackpressure(): Flowable<String> {
    return dataProducer
        .onBackpressureBuffer(1000)           // буферизация
        .onBackpressureDrop { dropped ->      // пропуск элементов
            log.warn("Dropped item: $dropped")
        }
        .observeOn(Schedulers.computation(), false, 128) // настройка буфера
}

Сравнительная таблица

Аспект Корутины RxJava/Reactor
Парадигма Императивная Функциональная
Читаемость Высокая Средняя
Отладка Простая Сложная
Exception handling Естественная Сложная
Операторы Базовые Богатые
Backpressure Автоматическая Ручная настройка
Learning curve Низкая Высокая
Performance Высокая Высокая

Когда использовать что

Корутины выбирать для:

  • Backend API — простота императивного кода
  • Mobile приложения — легкость понимания и отладки
  • Новые проекты — современный подход
  • Команды без RX опыта — низкий порог входа

RxJava/Reactor выбирать для:

  • Сложная stream обработка — богатые операторы
  • Legacy системы — уже используется RX
  • Специфичные backpressure требования — точный контроль
  • Команды с RX экспертизой — используем существующий опыт

Performance оптимизации

Правильное использование Dispatchers

// ❌ ПЛОХО - неэффективное использование
class BadPerformanceService {
    suspend fun processData(items: List<Item>): List<Result> {
        return items.map { item ->
            // Каждая операция создаёт новый context switch
            withContext(Dispatchers.Default) {
                heavyComputation(item)
            }
        }
    }
}

// ✅ ХОРОШО - батчевая обработка
class GoodPerformanceService {
    suspend fun processData(items: List<Item>): List<Result> = 
        withContext(Dispatchers.Default) {
            // Один context switch для всей обработки
            items.map { item ->
                heavyComputation(item)
            }
        }
}

Оптимизация создания корутин

// ❌ ПЛОХО - создание множества корутин
class IneffientProcessor {
    suspend fun processLargeDataset(data: List<DataItem>) {
        data.forEach { item ->
            launch { // создаём корутину для каждого элемента
                processItem(item)
            }
        }
    }
}

// ✅ ХОРОШО - батчевая обработка
class EfficientProcessor {
    suspend fun processLargeDataset(data: List<DataItem>) {
        data.chunked(100) // группы по 100 элементов
            .map { chunk ->
                async {
                    chunk.forEach { item ->
                        processItem(item) // обрабатываем в одной корутине
                    }
                }
            }
            .awaitAll()
    }
}

Ключевые вопросы для интервью

"Какие основные источники утечек в корутинах?"

  • GlobalScope — корутины переживают создавший их объект
  • Неконтролируемые scope — отсутствие явного управления жизненным циклом
  • Циклические ссылки — корутины держат ссылки на UI/контекст

"Как избежать заморозки UI в корутинах?"

  • Правильные Dispatchers — IO для сети/БД, Default для вычислений
  • withContext для переключения контекста тяжёлых операций
  • Избегать блокирующих операций в корутинах

"В чём разница между launch и async?"

  • launch: fire-and-forget, для side effects, исключения всплывают сразу
  • async: для получения результата, исключения при await()

"Когда выбрать корутины вместо RxJava?"

  • Простота кода — императивный стиль понятнее
  • Exception handling — естественная обработка ошибок
  • Отладка — стандартные инструменты работают
  • Team expertise — команда знакома с императивным программированием

"Какие performance оптимизации важны для корутин?"

  • Батчевая обработка — группировка операций для уменьшения overhead
  • Правильное использование Dispatchers — избегать ненужных переключений
  • Structured concurrency — эффективное управление ресурсами

Главное понимание: Корутины требуют осознанного подхода к управлению жизненным циклом и правильного выбора dispatchers для достижения максимальной производительности без ущерба для user experience.