Корутины и асинхронность
Основы корутин
Suspend функции
Suspend функции - специальные функции, которые могут быть приостановлены и возобновлены без блокировки потока. Основа асинхронного программирования в Kotlin.
// Suspend функция - может вызывать другие suspend функции
suspend fun fetchUser(id: Long): User {
delay(1000) // приостанавливает выполнение на 1 секунду
return userRepository.findById(id)
}
// Обычная функция НЕ может вызывать suspend функции напрямую
fun regularFunction() {
// fetchUser(1) // ОШИБКА - нужен coroutine scope
}
// Suspend функции можно вызывать только из:
// 1. Других suspend функций
// 2. Корутин (launch, async)
// 3. runBlocking (только для тестов/main)
CoroutineScope vs SupervisorScope
CoroutineScope и SupervisorScope определяют область жизни корутин и стратегию обработки ошибок.
// coroutineScope - все дочерние корутины связаны
suspend fun processData() = coroutineScope {
val job1 = launch { task1() }
val job2 = launch { task2() }
// Если task1 упадет с ошибкой, task2 тоже будет отменена
}
// supervisorScope - дочерние корутины независимы
suspend fun processDataIndependently() = supervisorScope {
val job1 = launch { task1() }
val job2 = launch { task2() }
// Если task1 упадет, task2 продолжит работу
}
// Практический пример: обработка списка независимых задач
suspend fun processUsers(users: List<User>) = supervisorScope {
users.map { user ->
launch {
try {
processUser(user) // если обработка одного пользователя упадет
} catch (e: Exception) {
logger.error("Failed to process user ${user.id}", e)
}
}
}.joinAll() // дождаться завершения всех задач
}
Билдеры корутин
Launch - "fire and forget"
Launch создает корутину, которая выполняется параллельно и не возвращает результат. Используется для side effects.
// Простой launch
val job = GlobalScope.launch {
println("Running in coroutine")
delay(1000)
println("Coroutine completed")
}
// Launch с обработкой ошибок
val job = CoroutineScope(Dispatchers.IO).launch {
try {
heavyOperation()
} catch (e: Exception) {
logger.error("Operation failed", e)
}
}
job.cancel() // отмена корутины
job.join() // ожидание завершения
Async - возврат результата
Async создает корутину, которая возвращает результат через Deferred<T>
. Используется когда нужен результат вычислений.
// Async для получения результата
suspend fun fetchUserData(userId: Long): UserData = coroutineScope {
val userDeferred = async { userService.getUser(userId) }
val profileDeferred = async { profileService.getProfile(userId) }
// await() дожидается результата
val user = userDeferred.await()
val profile = profileDeferred.await()
UserData(user, profile)
}
// Параллельное выполнение нескольких задач
suspend fun processMultipleUsers(ids: List<Long>): List<User> = coroutineScope {
ids.map { id ->
async { userService.getUser(id) } // запускаем все параллельно
}.awaitAll() // дожидаемся всех результатов
}
WithContext - смена контекста
WithContext переключает выполнение на другой контекст и возвращает результат.
suspend fun saveToDatabase(data: Data): Long = withContext(Dispatchers.IO) {
// Переключаемся на IO поток для работы с БД
database.save(data).id
}
suspend fun computeHash(data: ByteArray): String = withContext(Dispatchers.Default) {
// Переключаемся на CPU-интенсивный поток
MessageDigest.getInstance("SHA-256").digest(data).toHexString()
}
// Комбинирование контекстов
suspend fun processFile(file: File): ProcessResult {
val content = withContext(Dispatchers.IO) {
file.readText() // IO операция
}
val processed = withContext(Dispatchers.Default) {
heavyProcessing(content) // CPU операция
}
return withContext(Dispatchers.IO) {
saveResult(processed) // снова IO операция
}
}
Контексты выполнения (Dispatchers)
Типы Dispatchers
Dispatchers определяют на каких потоках будут выполняться корутины.
// Dispatchers.Default - CPU-интенсивные задачи
launch(Dispatchers.Default) {
val result = heavyComputation() // вычисления, сортировка, парсинг
}
// Dispatchers.IO - блокирующие I/O операции
launch(Dispatchers.IO) {
val data = database.query() // БД запросы
val response = httpClient.get() // HTTP запросы
val content = file.readText() // файловые операции
}
// Dispatchers.Unconfined - наследует поток вызывающего
launch(Dispatchers.Unconfined) {
println("Thread: ${Thread.currentThread().name}")
delay(100)
println("Thread after delay: ${Thread.currentThread().name}") // может измениться
}
// Dispatchers.Main - UI поток (Android/Desktop)
launch(Dispatchers.Main) {
updateUI() // обновление пользовательского интерфейса
}
Кастомные Dispatchers
// Ограниченный пул потоков для специфических задач
val dbDispatcher = Dispatchers.IO.limitedParallelism(5)
// Собственный Executor
val customDispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
launch(dbDispatcher) {
heavyDatabaseOperation()
}
// Не забывайте закрывать кастомные диспетчеры
customDispatcher.close()
Обработка исключений
Exception Propagation
Исключения в корутинах распространяются по иерархии и могут отменять связанные корутины.
// В coroutineScope исключение отменяет все дочерние корутины
suspend fun fragileOperation() = coroutineScope {
launch {
delay(100)
throw RuntimeException("Task 1 failed")
}
launch {
delay(200)
println("Task 2 completed") // НЕ выполнится - будет отменена
}
}
// В supervisorScope исключения изолированы
suspend fun resilientOperation() = supervisorScope {
launch {
delay(100)
throw RuntimeException("Task 1 failed")
}
launch {
delay(200)
println("Task 2 completed") // выполнится независимо от Task 1
}
}
Exception Handlers
// CoroutineExceptionHandler для необработанных исключений
val handler = CoroutineExceptionHandler { _, exception ->
logger.error("Uncaught exception", exception)
}
// Применение на уровне scope
val scope = CoroutineScope(Dispatchers.Default + handler)
scope.launch {
throw RuntimeException("This will be caught by handler")
}
// Try-catch в корутинах
suspend fun safeOperation() {
try {
riskyOperation()
} catch (e: CustomException) {
handleSpecificException(e)
} catch (e: Exception) {
logger.error("Unexpected error", e)
throw e // пробрасываем дальше если нужно
}
}
Structured Concurrency
Принципы структурированного параллелизма
Structured Concurrency обеспечивает предсказуемое управление жизненным циклом корутин через иерархию scope'ов.
class UserService {
private val serviceScope = CoroutineScope(
Dispatchers.Default + SupervisorJob() +
CoroutineExceptionHandler { _, e -> logger.error("Service error", e) }
)
suspend fun processUser(userId: Long) = serviceScope.async {
// Все дочерние корутины будут отменены при отмене родительской
val userData = fetchUserData(userId)
val notifications = fetchNotifications(userId)
UserProcessingResult(userData, notifications)
}
fun shutdown() {
serviceScope.cancel() // отменяет все активные корутины
}
}
// Автоматическая отмена при выходе из scope
suspend fun processWithTimeout() = withTimeout(5000) {
// Если операция займет больше 5 секунд, все дочерние корутины будут отменены
coroutineScope {
launch { longOperation1() }
launch { longOperation2() }
}
}
Flow, Channel, SharedFlow/StateFlow
Flow - холодные асинхронные потоки
Flow - холодный поток данных, который начинает эмиссию только при подписке.
// Создание Flow
fun numbersFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(1000)
emit(i) // эмиссия значения
}
}
// Операторы трансформации
fun processNumbers(): Flow<String> = numbersFlow()
.filter { it % 2 == 0 } // фильтрация
.map { "Number: $it" } // трансформация
.flowOn(Dispatchers.Default) // смена контекста
// Потребление Flow
suspend fun consumeFlow() {
processNumbers().collect { value ->
println(value) // обработка каждого элемента
}
}
// Flow для работы с БД
fun userUpdatesFlow(userId: Long): Flow<User> = flow {
while (currentCoroutineContext().isActive) {
emit(userRepository.findById(userId))
delay(30000) // обновление каждые 30 секунд
}
}
Channel - горячие каналы связи
Channel - горячий канал для передачи данных между корутинами с буферизацией.
// Создание канала
val channel = Channel<String>(capacity = Channel.BUFFERED)
// Producer корутина
launch {
for (i in 1..5) {
channel.send("Message $i")
delay(1000)
}
channel.close() // закрытие канала
}
// Consumer корутина
launch {
for (message in channel) { // итерация до закрытия
println("Received: $message")
}
}
// Типы каналов по емкости
val unlimited = Channel<Int>(Channel.UNLIMITED) // без ограничений
val conflated = Channel<Int>(Channel.CONFLATED) // только последнее значение
val rendezvous = Channel<Int>(Channel.RENDEZVOUS) // синхронная передача
SharedFlow и StateFlow - горячие потоки
SharedFlow - горячий поток для broadcasting событий множественным подписчикам. StateFlow - специализированный SharedFlow для состояния с conflation.
// SharedFlow для событий
class EventBus {
private val _events = MutableSharedFlow<Event>(
replay = 0, // количество кешируемых событий
extraBufferCapacity = 64 // дополнительный буфер
)
val events: SharedFlow<Event> = _events.asSharedFlow()
fun publishEvent(event: Event) {
_events.tryEmit(event)
}
}
// StateFlow для состояния
class UserViewModel {
private val _userState = MutableStateFlow<UserState>(UserState.Loading)
val userState: StateFlow<UserState> = _userState.asStateFlow()
suspend fun loadUser(id: Long) {
_userState.value = UserState.Loading
try {
val user = userRepository.getUser(id)
_userState.value = UserState.Success(user)
} catch (e: Exception) {
_userState.value = UserState.Error(e.message)
}
}
}
// Подписка на StateFlow
launch {
userViewModel.userState.collect { state ->
when (state) {
is UserState.Loading -> showLoader()
is UserState.Success -> displayUser(state.user)
is UserState.Error -> showError(state.message)
}
}
}
Сравнение Flow vs Channel vs SharedFlow/StateFlow
Тип | Холодный/Горячий | Подписчики | Использование |
---|---|---|---|
Flow | Холодный | Один | Последовательная обработка данных |
Channel | Горячий | Один-к-одному | Связь между корутинами |
SharedFlow | Горячий | Множественные | События, уведомления |
StateFlow | Горячий | Множественные | Состояние UI, кеширование |
// Выбор правильного инструмента:
// Flow - для трансформации данных
fun processFileFlow(file: File): Flow<ProcessedLine> = flow {
file.useLines { lines ->
lines.forEach { line ->
emit(processLine(line))
}
}
}
// Channel - для producer-consumer паттерна
suspend fun processWithWorkers() {
val workChannel = Channel<Work>(100)
val resultChannel = Channel<Result>(100)
// Workers
repeat(5) {
launch { worker(workChannel, resultChannel) }
}
// Producer
launch { produceWork(workChannel) }
// Results collector
launch { collectResults(resultChannel) }
}
// SharedFlow - для событий системы
class NotificationService {
private val _notifications = MutableSharedFlow<Notification>()
val notifications = _notifications.asSharedFlow()
fun sendNotification(notification: Notification) {
_notifications.tryEmit(notification)
}
}
// StateFlow - для состояния приложения
class AppState {
private val _currentUser = MutableStateFlow<User?>(null)
val currentUser = _currentUser.asStateFlow()
fun setUser(user: User) {
_currentUser.value = user
}
}
Ключевые моменты для собеседования:
- Suspend функции не блокируют потоки, а приостанавливают выполнение
- CoroutineScope vs SupervisorScope определяют стратегию обработки ошибок
- Dispatchers выбираются по типу задач: IO для блокирующих операций, Default для CPU
- Structured Concurrency обеспечивает предсказуемое управление жизненным циклом
- Flow холодный и один-к-одному, SharedFlow/StateFlow горячие и один-ко-многим
Основы корутин
Что такое корутина?
Корутина (Coroutine) — это легковесная единица асинхронного выполнения, которая может быть приостановлена (suspended) и возобновлена позже без блокировки потока. Корутины позволяют писать асинхронный код в последовательном стиле.
Ключевые концепции:
- Suspension — возможность приостановить выполнение функции
- Non-blocking — не блокируют поток во время ожидания
- Structured concurrency — корутины организованы в иерархию с автоматической отменой
- Cooperative multitasking — корутины добровольно отдают управление
// Простая корутина
suspend fun fetchUser(id: String): User {
delay(1000) // приостанавливает корутину, НЕ блокирует поток
return userRepository.findById(id)
}
Отличия корутин от потоков
Корутины как "Lightweight Threads"
Аспект | Thread | Coroutine |
---|---|---|
Вес | ~1-8 MB стека | ~100 байт объекта |
Создание | Дорогое (системный вызов) | Очень дешёвое |
Переключение контекста | Дорогое (~µs) | Быстрое (~ns) |
Количество | Тысячи | Миллионы |
Управление | OS Scheduler | Корутинный диспетчер |
Практическое сравнение
// Потоки — дорого и ограниченно
repeat(100_000) {
thread {
Thread.sleep(1000)
println("Thread $it")
}
} // OutOfMemoryError!
// Корутины — легко и масштабируемо
repeat(100_000) {
GlobalScope.launch {
delay(1000) // не блокирует поток
println("Coroutine $it")
}
} // Работает отлично!
Почему корутины легковесные:
- Стек на куче — нет выделения OS стека
- Кооперативная многозадачность — нет preemptive switching
- Один поток может выполнять тысячи корутин
- Suspension points — приостановка только в определённых местах
Сравнение с классическими подходами
Thread vs Coroutine
// Thread — блокирующий подход
class ThreadExample {
fun processRequests() {
repeat(1000) { id ->
thread {
val result = blockingHttpCall(id) // блокирует поток
processResult(result)
}
}
}
}
// Coroutine — неблокирующий подход
class CoroutineExample {
suspend fun processRequests() {
repeat(1000) { id ->
launch {
val result = suspendingHttpCall(id) // не блокирует поток
processResult(result)
}
}
}
}
Ключевые отличия:
- Thread: 1 поток = 1 задача, блокировка при I/O
- Coroutine: 1 поток = множество корутин, suspension при I/O
ExecutorService vs CoroutineScope
// ExecutorService — императивное управление потоками
class ExecutorExample {
private val executor = Executors.newFixedThreadPool(10)
fun processData(): CompletableFuture<String> {
return CompletableFuture.supplyAsync({
// сложная цепочка callbacks
fetchDataFromApi()
.thenCompose { data ->
processData(data)
}.thenApply { result ->
formatResult(result)
}
}, executor)
}
fun shutdown() {
executor.shutdown() // ручное управление
}
}
// CoroutineScope — декларативные корутины
class CoroutineExample : CoroutineScope {
override val coroutineContext = Dispatchers.IO + SupervisorJob()
suspend fun processData(): String {
// линейный код, как синхронный
val data = fetchDataFromApi()
val processed = processData(data)
return formatResult(processed)
}
fun cleanup() {
coroutineContext.cancel() // автоматическая отмена всех дочерних корутин
}
}
Преимущества корутин:
- Читаемость — линейный код вместо callback hell
- Structured concurrency — автоматическое управление жизненным циклом
- Exception handling — естественная обработка исключений
- Cancellation — кооперативная отмена работы
Базовые строительные блоки
Suspend функции
// suspend функция может быть приостановлена
suspend fun loadUserData(userId: String): UserData {
val profile = loadProfile(userId) // может приостановиться
val settings = loadSettings(userId) // может приостановиться
return UserData(profile, settings)
}
// Обычная функция НЕ может вызывать suspend функции
fun regularFunction() {
// loadUserData() // ОШИБКА! Нужен корутинный контекст
}
Важно: suspend
— это маркер для компилятора, что функция может быть приостановлена
Корутинные билдеры
// launch — fire-and-forget, возвращает Job
val job = GlobalScope.launch {
doSomething()
} // Unit
// async — возвращает Deferred<T> для получения результата
val deferred = GlobalScope.async {
computeSomething()
} // Deferred<String>
val result = deferred.await()
// runBlocking — блокирует текущий поток (только для тестов/main)
runBlocking {
val result = suspendingOperation()
println(result)
}
Dispatchers — где выполняются корутины
// Main — UI поток (Android/Desktop)
launch(Dispatchers.Main) {
updateUI(data)
}
// IO — операции ввода-вывода (сеть, файлы, БД)
launch(Dispatchers.IO) {
val data = loadFromDatabase()
}
// Default — CPU-интенсивные задачи
launch(Dispatchers.Default) {
val result = heavyComputation()
}
// Unconfined — продолжает в том же потоке (осторожно!)
launch(Dispatchers.Unconfined) {
quickOperation()
}
Structured Concurrency
Иерархия корутин
class UserService : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
suspend fun loadUserDashboard(userId: String): Dashboard {
// Все корутины автоматически отменятся при отмене родительской
val profile = async { loadProfile(userId) }
val notifications = async { loadNotifications(userId) }
val recommendations = async { loadRecommendations(userId) }
return Dashboard(
profile = profile.await(),
notifications = notifications.await(),
recommendations = recommendations.await()
)
}
fun shutdown() {
job.cancel() // отменяет ВСЕ дочерние корутины
}
}
Принципы Structured Concurrency:
- Родительская корутина ждёт завершения всех дочерних
- Отмена распространяется сверху вниз
- Исключения всплывают снизу вверх
- Автоматическая очистка ресурсов
Обработка ошибок
// SupervisorJob — изолирует ошибки дочерних корутин
val supervisor = SupervisorJob()
val scope = CoroutineScope(Dispatchers.IO + supervisor)
scope.launch {
try {
val result1 = async { riskyOperation1() }
val result2 = async { riskyOperation2() } // может упасть
// Если result2 упадёт, result1 продолжит работу
processResults(result1.await(), result2.await())
} catch (e: Exception) {
handleError(e)
}
}
Практические паттерны для Backend
Параллельная обработка запросов
@RestController
class UserController(
private val userService: UserService
) : CoroutineScope {
override val coroutineContext = Dispatchers.IO + SupervisorJob()
@GetMapping("/users/{id}/details")
suspend fun getUserDetails(@PathVariable id: String): UserDetails {
// Параллельное выполнение независимых операций
val userDeferred = async { userService.findById(id) }
val ordersDeferred = async { orderService.findByUserId(id) }
val preferencesDeferred = async { preferencesService.findByUserId(id) }
return UserDetails(
user = userDeferred.await(),
orders = ordersDeferred.await(),
preferences = preferencesDeferred.await()
)
}
}
Batch обработка
suspend fun processBatchOfUsers(userIds: List<String>) {
userIds.chunked(10) // группы по 10
.forEach { batch ->
batch.map { userId ->
async { processUser(userId) } // параллельно внутри группы
}.awaitAll() // ждём завершения группы
}
}
Timeout и отмена
suspend fun fetchDataWithTimeout(): ApiResponse = withTimeout(5000) {
val data = apiClient.fetchData() // автоматически отменится через 5 сек
processData(data)
}
// Кооперативная отмена
suspend fun longRunningTask() {
repeat(1000) { iteration ->
ensureActive() // проверяет, не отменена ли корутина
doSomeWork(iteration)
delay(10) // suspension point — место для отмены
}
}
Ключевые отличия для интервью
Memory Model
// Thread — каждый поток имеет свой стек
Thread.start() // ~1-8 MB памяти
// Coroutine — объект в куче
launch { } // ~100 байт
Blocking vs Suspension
// Blocking — поток заблокирован и не может делать другую работу
Thread.sleep(1000) // поток простаивает
// Suspension — поток освобождается для других корутин
delay(1000) // поток выполняет другие корутины
Scalability
// Thread pool — ограниченное количество потоков
val executor = Executors.newFixedThreadPool(100) // максимум 100 одновременных задач
// Coroutines — практически неограниченная масштабируемость
repeat(1_000_000) { launch { } } // миллион корутин на нескольких потоках
Частые вопросы на собеседовании
"Как корутины решают проблему callback hell?"
// Callback hell
fetchUser(userId) { user ->
fetchOrders(user.id) { orders ->
fetchPayments(orders) { payments ->
updateUI(user, orders, payments)
}
}
}
// Корутины — линейный код
val user = fetchUser(userId)
val orders = fetchOrders(user.id)
val payments = fetchPayments(orders)
updateUI(user, orders, payments)
"Чем отличается launch от async?"
- launch: fire-and-forget, возвращает Job для управления
- async: возвращает Deferred
для получения результата
"Как работает Structured Concurrency?"
- Родительская корутина контролирует жизненный цикл дочерних
- Автоматическая отмена при завершении родителя
- Распространение исключений вверх по иерархии
"Производительность корутин vs потоков?"
- Создание: корутины в 1000х раз быстрее
- Memory overhead: корутины в 1000х меньше
- Context switching: корутины на порядки быстрее
- Throughput: корутины поддерживают миллионы одновременных операций
Главное преимущество: Корутины позволяют писать высокопроизводительный асинхронный код в синхронном стиле, что критично для современных backend систем с высокой нагрузкой.
Ключевые строительные блоки корутин
Suspend функции
Suspend функция — это функция, которая может быть приостановлена и возобновлена позже без блокировки потока. Ключевое слово suspend
указывает компилятору, что функция может приостанавливать выполнение в suspension points.
Базовые принципы
// suspend функция может вызывать другие suspend функции
suspend fun fetchUserData(id: String): UserData {
val profile = fetchProfile(id) // suspension point
val settings = fetchSettings(id) // suspension point
return UserData(profile, settings)
}
// Обычная функция НЕ может вызывать suspend функции напрямую
fun normalFunction() {
// fetchUserData("123") // ОШИБКА компиляции!
}
Что происходит под капотом:
- Компилятор преобразует
suspend
функцию в state machine - В каждом suspension point функция может сохранить состояние и освободить поток
- При возобновлении состояние восстанавливается и выполнение продолжается
Suspension Points
suspend fun processData(): String {
println("Начало обработки")
delay(1000) // suspension point - корутина приостанавливается
println("Обработка завершена")
return apiCall() // suspension point - если apiCall() suspend
}
Важно: Приостановка происходит только в явных suspension points — вызовах других suspend функций.
CoroutineScope
CoroutineScope — это интерфейс, который определяет область жизни корутин. Обеспечивает structured concurrency — корутины организованы в иерархию с автоматическим управлением жизненным циклом.
Основные реализации
// GlobalScope — живёт всё время жизни приложения (осторожно!)
GlobalScope.launch {
// может пережить Activity/Fragment
}
// ViewModelScope — автоматически отменяется при очистке ViewModel
class UserViewModel : ViewModel() {
fun loadData() {
viewModelScope.launch {
// автоматически отменится при onCleared()
}
}
}
// Кастомный scope с контролируемым жизненным циклом
class UserService : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
fun cleanup() {
job.cancel() // отменяет ВСЕ дочерние корутины
}
}
Принципы Structured Concurrency:
- Родительская корутина ждёт завершения всех дочерних
- Отмена распространяется от родителя к детям
- Исключения всплывают от детей к родителю
- Автоматическая очистка при завершении scope
Корутинные билдеры
launch - Fire and Forget
// launch возвращает Job для управления корутиной
val job: Job = scope.launch {
repeat(1000) {
doWork(it)
delay(100)
}
}
// Управление выполнением
job.cancel() // отменить корутину
job.join() // дождаться завершения
job.isActive // проверить статус
Когда использовать launch:
- Side effects — обновление UI, логирование, отправка уведомлений
- Fire-and-forget операции
- Когда результат выполнения не нужен
async - Параллельное выполнение с результатом
// async возвращает Deferred<T> для получения результата
val userDeferred: Deferred<User> = scope.async { fetchUser(id) }
val ordersDeferred: Deferred<List<Order>> = scope.async { fetchOrders(id) }
// Получение результатов (параллельно загружались)
val user = userDeferred.await()
val orders = ordersDeferred.await()
Когда использовать async:
- Параллельные вычисления с возвратом результата
- Композиция результатов из нескольких источников
- Когда нужно дождаться результата выполнения
runBlocking - Мост между мирами
// Блокирует текущий поток до завершения всех корутин
fun main() = runBlocking {
val result = async { heavyComputation() }
println("Результат: ${result.await()}")
}
// В тестах
@Test
fun testSuspendFunction() = runBlocking {
val result = suspendingFunction()
assertEquals(expected, result)
}
Когда использовать runBlocking:
- Точка входа в корутинный мир (main функция)
- Тесты suspend функций
- Интеграция с legacy кодом
Важно: НЕ используйте runBlocking в production коде — блокирует поток!
Job и Deferred
Job - Управление жизненным циклом
val job = launch {
try {
longRunningTask()
} catch (e: CancellationException) {
cleanup() // выполняется при отмене
throw e // важно перебросить
}
}
// Управление Job
job.cancel() // отменить
job.cancelAndJoin() // отменить и дождаться завершения
job.join() // дождаться завершения без отмены
// Состояния Job
job.isActive // выполняется ли
job.isCancelled // отменена ли
job.isCompleted // завершена ли
Deferred - Job с результатом
val deferred: Deferred<String> = async {
expensiveComputation()
}
// Получение результата
val result = deferred.await() // приостанавливает до получения результата
val resultOrNull = deferred.getCompleted() // не приостанавливает, может бросить исключение
// Deferred наследует Job
deferred.cancel() // можно отменить
deferred.join() // можно дождаться завершения
Ключевые отличия:
- Job: управление выполнением, без результата
- Deferred: Job + возможность получить результат через
await()
CoroutineContext
CoroutineContext — это индексированный набор элементов, который определяет контекст выполнения корутины: диспетчер, Job, обработчик исключений и другие параметры.
Композиция контекста
// Элементы контекста
val context = Dispatchers.IO + // диспетчер
SupervisorJob() + // Job
CoroutineName("UserService") + // имя для отладки
CoroutineExceptionHandler { _, ex -> // обработчик ошибок
log.error("Coroutine failed", ex)
}
// Создание scope с контекстом
val scope = CoroutineScope(context)
Наследование и переопределение
class ApiService : CoroutineScope {
override val coroutineContext = Dispatchers.IO + SupervisorJob()
suspend fun fetchData(): Data {
// Наследует IO dispatcher от родительского контекста
val preliminaryData = loadFromCache()
// Переключается на Default для CPU-интенсивной работы
return withContext(Dispatchers.Default) {
processData(preliminaryData)
}
}
}
Элементы CoroutineContext:
- Dispatcher — где выполняется корутина
- Job — управление жизненным циклом
- CoroutineName — имя для отладки
- CoroutineExceptionHandler — обработка необработанных исключений
withContext и смена контекста
withContext - Временное переключение контекста
suspend fun processUserData(userData: String): ProcessedData =
withContext(Dispatchers.Default) {
// CPU-интенсивная обработка в Default dispatcher
heavyDataProcessing(userData)
} // автоматически возвращается в исходный контекст
suspend fun saveToDatabase(data: Data) {
withContext(Dispatchers.IO) {
// I/O операция в IO dispatcher
database.save(data)
}
}
Ключевые особенности:
- Возвращает результат — в отличие от launch/async
- Приостанавливает вызывающую корутину
- Автоматически возвращается в исходный контекст
- Thread-safe переключение между диспетчерами
Dispatchers - Где выполняются корутины
Типы диспетчеров
// Main — основной поток (UI в Android/Desktop)
launch(Dispatchers.Main) {
updateUI(data) // обновление интерфейса
progressBar.visible = false
}
// IO — операции ввода-вывода (сеть, файлы, БД)
launch(Dispatchers.IO) {
val data = database.findAll() // чтение из БД
val response = apiClient.fetch() // HTTP запрос
writeToFile(data) // запись в файл
}
// Default — CPU-интенсивные задачи
launch(Dispatchers.Default) {
val result = complexCalculation() // математические вычисления
val processed = sortLargeList() // сортировка больших данных
val compressed = compressData() // сжатие данных
}
// Unconfined — продолжает в том же потоке (осторожно!)
launch(Dispatchers.Unconfined) {
quickOperation() // выполняется в текущем потоке
delay(100) // после suspension может оказаться в другом потоке!
}
Характеристики диспетчеров
Dispatcher | Количество потоков | Назначение | Примеры использования |
---|---|---|---|
Main | 1 (UI поток) | UI операции | Обновление интерфейса |
IO | 64+ (expandable) | I/O операции | БД, сеть, файлы |
Default | CPU cores | CPU-задачи | Вычисления, обработка |
Unconfined | Любой | Специальные случаи | Тесты, legacy код |
Практические паттерны
class UserRepository {
// Чтение из БД — IO dispatcher
suspend fun findUser(id: String): User = withContext(Dispatchers.IO) {
database.findById(id)
}
// Обработка данных — Default dispatcher
suspend fun processUserStats(users: List<User>): Statistics =
withContext(Dispatchers.Default) {
users.groupBy { it.region }
.mapValues { it.value.size }
.let { Statistics(it) }
}
}
class UserController {
suspend fun getUserDashboard(id: String): UserDashboard {
// Параллельные I/O операции
val user = async(Dispatchers.IO) { userRepo.findUser(id) }
val orders = async(Dispatchers.IO) { orderRepo.findByUserId(id) }
val stats = async(Dispatchers.Default) {
calculateUserStats(id) // CPU-интенсивная задача
}
return UserDashboard(
user = user.await(),
orders = orders.await(),
stats = stats.await()
)
}
}
Структурированная параллельность в действии
Комбинирование всех элементов
class OrderService : CoroutineScope {
private val supervisorJob = SupervisorJob()
override val coroutineContext =
Dispatchers.IO + supervisorJob + CoroutineName("OrderService")
suspend fun processOrder(orderId: String): OrderResult =
withContext(coroutineContext) {
try {
// Параллельная загрузка данных
val order = async { orderRepo.findById(orderId) }
val user = async { userRepo.findById(order.await().userId) }
val inventory = async(Dispatchers.IO) {
inventoryService.checkAvailability(orderId)
}
// CPU-интенсивная обработка
val calculations = async(Dispatchers.Default) {
calculateOrderTotal(order.await(), inventory.await())
}
// Результат
OrderResult(
order = order.await(),
user = user.await(),
total = calculations.await()
)
} catch (e: Exception) {
handleOrderError(orderId, e)
throw e
}
}
fun shutdown() {
supervisorJob.cancel() // отменяет все дочерние корутины
}
}
Ключевые вопросы для интервью
"Как работает suspension?"
- Компилятор создаёт state machine из suspend функции
- В suspension points сохраняется состояние локальных переменных
- Поток освобождается для выполнения других корутин
- При возобновлении состояние восстанавливается
"Чем отличается launch от async?"
- launch: fire-and-forget, возвращает Job, для side effects
- async: возвращает Deferred
, для получения результата
"Когда использовать какой Dispatcher?"
- IO: сеть, БД, файлы (может блокироваться)
- Default: CPU-задачи (количество потоков = CPU cores)
- Main: UI операции (один поток)
"Что такое Structured Concurrency?"
- Иерархия корутин с автоматическим управлением
- Отмена родителя отменяет всех детей
- Исключения детей всплывают к родителю
- Автоматическая очистка ресурсов
Главное понимание: Эти строительные блоки работают вместе для создания высокопроизводительных, структурированных и безопасных асинхронных приложений, что критично для современной backend разработки.
Структурная конкуренция
Что такое Structured Concurrency?
Structured Concurrency (структурная конкуренция) — это парадигма программирования, которая организует асинхронные операции в строгую иерархическую структуру, где время жизни дочерних операций ограничено временем жизни родительских.
Основные принципы:
- Иерархия корутин — чёткое родительско-дочернее отношение
- Автоматическая отмена — отмена родителя отменяет всех детей
- Распространение исключений — ошибки детей всплывают к родителю
- Завершение — родитель ждёт завершения всех дочерних операций
// Structured concurrency в действии
suspend fun fetchUserDashboard(userId: String) = coroutineScope {
// Все корутины автоматически отменятся, если функция будет отменена
val profile = async { fetchProfile(userId) }
val orders = async { fetchOrders(userId) }
val recommendations = async { fetchRecommendations(userId) }
Dashboard(
profile = profile.await(),
orders = orders.await(),
recommendations = recommendations.await()
)
// Функция завершится только когда ВСЕ дочерние корутины завершатся
}
Зачем нужна структурная конкуренция?
Проблемы неструктурированного подхода
// ❌ Неструктурированный подход - проблемы
class BadUserService {
fun loadUserData(userId: String) {
GlobalScope.launch { fetchProfile(userId) } // может "утечь"
GlobalScope.launch { fetchOrders(userId) } // никто не контролирует
GlobalScope.launch { fetchNotifications(userId) } // нет связи между операциями
// Проблемы:
// 1. Невозможно отменить операции
// 2. Нет гарантии завершения
// 3. Утечки памяти
// 4. Нет обработки ошибок
}
}
Решение через Structured Concurrency
// ✅ Структурированный подход - решение
class GoodUserService : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
suspend fun loadUserData(userId: String): UserData = coroutineScope {
val profile = async { fetchProfile(userId) }
val orders = async { fetchOrders(userId) }
val notifications = async { fetchNotifications(userId) }
UserData(
profile = profile.await(),
orders = orders.await(),
notifications = notifications.await()
)
}
fun cleanup() {
job.cancel() // отменяет ВСЕ операции
}
}
Преимущества Structured Concurrency
1. Предсказуемое управление ресурсами
suspend fun processFiles(files: List<File>) = coroutineScope {
files.map { file ->
async {
val data = readFile(file) // открывает ресурс
processData(data)
}
}.awaitAll()
// Гарантия: все файлы будут закрыты, даже при отмене
}
2. Автоматическая обработка ошибок
suspend fun fetchAllData() = try {
coroutineScope {
val task1 = async { riskyOperation1() }
val task2 = async { riskyOperation2() } // может упасть
val task3 = async { riskyOperation3() }
// Если task2 упадёт, task1 и task3 автоматически отменятся
Triple(task1.await(), task2.await(), task3.await())
}
} catch (e: Exception) {
handleError(e) // единая точка обработки ошибок
}
3. Простота отмены операций
class DownloadManager : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
fun startDownloads(urls: List<String>) = launch {
urls.forEach { url ->
launch { downloadFile(url) } // все привязаны к родительскому scope
}
}
fun cancelAllDownloads() {
job.cancel() // одна команда отменяет все загрузки
}
}
CoroutineScope + SupervisorJob
Обычный Job vs SupervisorJob
// Обычный Job - ошибка одного ребёнка отменяет всех
class RegularJobExample : CoroutineScope {
override val coroutineContext = Dispatchers.IO + Job()
fun processData() = launch {
val task1 = launch { reliableOperation() }
val task2 = launch { riskyOperation() } // упадёт
val task3 = launch { anotherOperation() }
// Если task2 упадёт, task1 и task3 тоже отменятся!
}
}
// SupervisorJob - изолирует ошибки детей
class SupervisorJobExample : CoroutineScope {
override val coroutineContext = Dispatchers.IO + SupervisorJob()
fun processData() = launch {
val task1 = launch { reliableOperation() }
val task2 = launch { riskyOperation() } // упадёт
val task3 = launch { anotherOperation() }
// task2 упадёт, но task1 и task3 продолжат работу
}
}
Практические паттерны с SupervisorJob
1. Микросервисная архитектура
class ApiGateway : CoroutineScope {
private val supervisorJob = SupervisorJob()
override val coroutineContext = Dispatchers.IO + supervisorJob
suspend fun fetchDashboardData(userId: String): DashboardData {
// Каждый сервис независим - падение одного не влияет на другие
val userService = async {
try { userServiceClient.getUser(userId) }
catch (e: Exception) { null } // fallback
}
val orderService = async {
try { orderServiceClient.getOrders(userId) }
catch (e: Exception) { emptyList() } // fallback
}
val notificationService = async {
try { notificationServiceClient.getNotifications(userId) }
catch (e: Exception) { emptyList() } // fallback
}
return DashboardData(
user = userService.await(),
orders = orderService.await(),
notifications = notificationService.await()
)
}
}
2. Background обработка
class BackgroundTaskManager : CoroutineScope {
private val supervisorJob = SupervisorJob()
override val coroutineContext = Dispatchers.Default + supervisorJob
fun startBackgroundTasks() {
// Каждая задача независима
launch {
while (isActive) {
cleanupOldData()
delay(TimeUnit.HOURS.toMillis(1))
}
}
launch {
while (isActive) {
generateReports()
delay(TimeUnit.HOURS.toMillis(6))
}
}
launch {
while (isActive) {
syncWithExternalSystem()
delay(TimeUnit.MINUTES.toMillis(30))
}
}
}
}
coroutineScope vs supervisorScope
coroutineScope - строгая иерархия
suspend fun fetchCriticalData() = coroutineScope {
// Все операции критичны - если одна упадёт, отменяем всё
val userData = async { fetchUser() } // критично
val accountData = async { fetchAccount() } // критично
val securityData = async { fetchSecurity() } // критично
CriticalData(
user = userData.await(),
account = accountData.await(),
security = securityData.await()
)
// Если любая операция упадёт, все остальные отменятся
}
supervisorScope - изолированные дочерние операции
suspend fun fetchOptionalData() = supervisorScope {
// Операции независимы - ошибка одной не влияет на другие
val profile = async {
try { fetchProfile() }
catch (e: Exception) { DefaultProfile }
}
val preferences = async {
try { fetchPreferences() }
catch (e: Exception) { DefaultPreferences }
}
val recommendations = async {
try { fetchRecommendations() }
catch (e: Exception) { emptyList() }
}
OptionalData(
profile = profile.await(),
preferences = preferences.await(),
recommendations = recommendations.await()
)
// Каждая операция обрабатывает свои ошибки независимо
}
Сравнительная таблица
Аспект | coroutineScope | supervisorScope |
---|---|---|
Ошибка дочерней корутины | Отменяет всех siblings | Изолирована от siblings |
Обработка исключений | Всплывает к родителю | Требует явную обработку |
Использование | Критичные операции | Независимые операции |
Fallback стратегия | Всё или ничего | Частичный успех |
Практические примеры выбора
Когда использовать coroutineScope:
// Финансовая транзакция - всё или ничего
suspend fun processPayment(payment: Payment) = coroutineScope {
val validation = async { validatePayment(payment) }
val authorization = async { authorizePayment(payment) }
val processing = async { processTransaction(payment) }
// Если любой шаг упадёт, вся транзакция отменяется
PaymentResult(
validation.await(),
authorization.await(),
processing.await()
)
}
Когда использовать supervisorScope:
// Загрузка dashboard - показываем что получилось
suspend fun loadDashboard(userId: String) = supervisorScope {
val weather = async {
try { weatherService.getCurrentWeather() }
catch (e: Exception) { "Погода недоступна" }
}
val news = async {
try { newsService.getLatestNews() }
catch (e: Exception) { emptyList() }
}
val stocks = async {
try { stockService.getPortfolio(userId) }
catch (e: Exception) { emptyList() }
}
// Показываем то, что удалось загрузить
Dashboard(weather.await(), news.await(), stocks.await())
}
Продвинутые паттерны
Комбинирование подходов
class OrderProcessingService : CoroutineScope {
private val supervisorJob = SupervisorJob()
override val coroutineContext = Dispatchers.IO + supervisorJob
suspend fun processOrder(order: Order): OrderResult = supervisorScope {
// Критичная часть - обработка заказа
val coreProcessing = async {
coroutineScope { // строгая иерархия для критичных операций
val validation = async { validateOrder(order) }
val inventory = async { reserveInventory(order) }
val payment = async { processPayment(order) }
CoreOrderData(
validation.await(),
inventory.await(),
payment.await()
)
}
}
// Дополнительные сервисы - могут упасть без критичных последствий
val notifications = async {
try { sendOrderNotification(order) }
catch (e: Exception) { log.warn("Notification failed", e); null }
}
val analytics = async {
try { trackOrderEvent(order) }
catch (e: Exception) { log.warn("Analytics failed", e); null }
}
val recommendations = async {
try { updateRecommendations(order) }
catch (e: Exception) { log.warn("Recommendations failed", e); null }
}
OrderResult(
core = coreProcessing.await(),
notificationSent = notifications.await() != null,
analyticsTracked = analytics.await() != null,
recommendationsUpdated = recommendations.await() != null
)
}
}
Exception Handling Strategy
class DataAggregationService {
suspend fun aggregateUserData(userId: String): AggregatedData =
supervisorScope {
val results = listOf(
async { fetchFromSource1(userId) },
async { fetchFromSource2(userId) },
async { fetchFromSource3(userId) }
).map { deferred ->
try {
deferred.await()
} catch (e: Exception) {
log.warn("Data source failed", e)
null // fallback значение
}
}
AggregatedData(
source1Data = results[0],
source2Data = results[1],
source3Data = results[2],
completeness = results.count { it != null } / results.size.toDouble()
)
}
}
Ключевые вопросы для интервью
"Зачем нужна Structured Concurrency?"
- Предсказуемость: чёткие границы жизненного цикла операций
- Надёжность: автоматическая очистка ресурсов
- Простота отладки: иерархическая структура легче для понимания
- Безопасность: нет утечек памяти и ресурсов
"Когда использовать SupervisorJob?"
- Независимые операции: ошибка одной не должна влиять на другие
- Микросервисная архитектура: каждый сервис обрабатывает свои ошибки
- Background задачи: изоляция различных фоновых процессов
"Разница между coroutineScope и supervisorScope?"
- coroutineScope: строгая иерархия, "всё или ничего"
- supervisorScope: изолированные дочерние операции, "частичный успех"
"Как обрабатывать исключения в structured concurrency?"
- Regular scope: исключения всплывают автоматически
- Supervisor scope: требует явную обработку в каждой корутине
- Комбинированный подход: критичные операции в regular, опциональные в supervisor
Главное понимание: Structured Concurrency обеспечивает предсказуемое и надёжное управление асинхронными операциями, что критично для production систем с высокими требованиями к стабильности.
Отмена и обработка исключений в корутинах
Механизм отмены корутин
Отмена корутин — это кооперативный процесс, где корутина добровольно проверяет свой статус и прекращает работу. В отличие от Thread.interrupt(), отмена корутин безопасна и предсказуема.
Ключевые принципы отмены
- Кооперативность — корутина сама решает, когда остановиться
- Безопасность — отмена не прерывает операцию посередине
- Структурированность — отмена распространяется по иерархии
- Исключение как сигнал — CancellationException указывает на отмену
cancel() - инициация отмены
Способы отмены
val job = launch {
repeat(1000) { i ->
println("Работаю: $i")
delay(500) // suspension point - место для проверки отмены
}
}
// Различные способы отмены
job.cancel() // простая отмена
job.cancel("Превышен таймаут") // отмена с сообщением
job.cancelAndJoin() // отмена + ожидание завершения
// Отмена с причиной
job.cancel(TimeoutCancellationException("Операция слишком долгая"))
Проверка статуса отмены
suspend fun longRunningTask() {
repeat(1000) { iteration ->
// Методы проверки отмены:
ensureActive() // бросает CancellationException если отменена
if (!isActive) { // мягкая проверка без исключения
cleanup()
return
}
yield() // проверяет отмену + даёт возможность другим корутинам
doWork(iteration)
}
}
Важно: Отмена проверяется только в suspension points или при явной проверке!
isActive и кооперативная отмена
isActive - проверка без исключений
class DataProcessor : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.Default + job
fun processLargeDataset(data: List<Data>) = launch {
data.forEachIndexed { index, item ->
// Кооперативная проверка отмены
if (!isActive) {
saveProgress(index) // сохраняем прогресс
return@launch // корректный выход
}
processItem(item)
// Альтернатива через ensureActive()
ensureActive() // бросит CancellationException если отменена
}
}
}
Правильная обработка отмены
suspend fun downloadFile(url: String, file: File) {
var connection: HttpURLConnection? = null
var outputStream: FileOutputStream? = null
try {
connection = createConnection(url)
outputStream = FileOutputStream(file)
val buffer = ByteArray(1024)
while (isActive) { // проверяем отмену в цикле
val bytesRead = connection.inputStream.read(buffer)
if (bytesRead == -1) break
outputStream.write(buffer, 0, bytesRead)
yield() // suspension point для отмены
}
} catch (e: CancellationException) {
file.delete() // очищаем частично скачанный файл
throw e // ВАЖНО: перебрасываем CancellationException
} finally {
connection?.disconnect()
outputStream?.close()
}
}
CancellationException - специальное исключение
Особенности CancellationException
suspend fun handlingCancellation() {
try {
longRunningOperation()
} catch (e: CancellationException) {
// CancellationException НЕ считается ошибкой
cleanup() // выполняем очистку
throw e // ОБЯЗАТЕЛЬНО перебрасываем!
} catch (e: Exception) {
// Обычные исключения обрабатываем
handleError(e)
throw e
}
}
// CancellationException не ломает структуру
suspend fun parentOperation() = coroutineScope {
try {
val job1 = async { operation1() }
val job2 = async { operation2() }
job1.cancel() // отменяем job1
// job2 продолжит работу, отмена job1 не влияет на него
job2.await()
} catch (e: CancellationException) {
// Родительский scope получит CancellationException
// только если отменён сам родитель
}
}
Timeout как отмена
suspend fun operationWithTimeout() {
try {
withTimeout(5000) { // автоматическая отмена через 5 секунд
slowOperation()
}
} catch (e: TimeoutCancellationException) {
// TimeoutCancellationException наследует CancellationException
handleTimeout()
}
}
// Мягкий timeout
suspend fun operationWithSoftTimeout() {
val result = withTimeoutOrNull(5000) {
slowOperation()
}
if (result == null) {
handleTimeout()
} else {
processResult(result)
}
}
Распространение исключений: launch vs async
launch - исключения всплывают немедленно
fun launchExceptionPropagation() = runBlocking {
val exceptionHandler = CoroutineExceptionHandler { _, exception ->
println("Перехвачено в handler: ${exception.message}")
}
val scope = CoroutineScope(SupervisorJob() + exceptionHandler)
scope.launch {
throw RuntimeException("Ошибка в launch") // всплывает немедленно
}
scope.launch {
delay(1000)
println("Эта корутина продолжит работу") // выполнится
}
delay(2000)
}
async - исключения всплывают при await()
fun asyncExceptionPropagation() = runBlocking {
val scope = CoroutineScope(SupervisorJob())
val deferred1 = scope.async {
throw RuntimeException("Ошибка в async") // НЕ всплывает сразу
}
val deferred2 = scope.async {
delay(1000)
"Успешный результат"
}
try {
// Исключение всплывёт здесь, при вызове await()
val result1 = deferred1.await()
} catch (e: Exception) {
println("Перехвачено при await(): ${e.message}")
}
// deferred2 завершится успешно
val result2 = deferred2.await()
println("Результат: $result2")
}
Практический пример: обработка множественных async
suspend fun fetchMultipleResources(): CombinedResult = supervisorScope {
val resource1 = async {
try { fetchResource1() }
catch (e: Exception) { null } // локальная обработка
}
val resource2 = async {
try { fetchResource2() }
catch (e: Exception) { null }
}
val resource3 = async {
try { fetchResource3() }
catch (e: Exception) { null }
}
CombinedResult(
resource1 = resource1.await(),
resource2 = resource2.await(),
resource3 = resource3.await()
)
}
SupervisorJob vs обычный Job
Обычный Job - все связаны
class RegularJobBehavior {
fun demonstrateRegularJob() = runBlocking {
val parentJob = Job()
val scope = CoroutineScope(Dispatchers.Default + parentJob)
val child1 = scope.launch {
try {
delay(2000)
println("Child 1 завершён")
} catch (e: CancellationException) {
println("Child 1 отменён") // выполнится
}
}
val child2 = scope.launch {
delay(500)
throw RuntimeException("Child 2 упал") // ломает всех
}
val child3 = scope.launch {
try {
delay(1000)
println("Child 3 завершён") // НЕ выполнится
} catch (e: CancellationException) {
println("Child 3 отменён") // выполнится
}
}
delay(3000)
// Результат: child2 ломает child1 и child3
}
}
SupervisorJob - изоляция ошибок
class SupervisorJobBehavior {
fun demonstrateSupervisorJob() = runBlocking {
val supervisorJob = SupervisorJob()
val scope = CoroutineScope(Dispatchers.Default + supervisorJob)
val child1 = scope.launch {
delay(2000)
println("Child 1 завершён") // выполнится
}
val child2 = scope.launch {
delay(500)
throw RuntimeException("Child 2 упал") // изолированная ошибка
}
val child3 = scope.launch {
delay(1000)
println("Child 3 завершён") // выполнится
}
delay(3000)
// Результат: только child2 упадёт, остальные работают
}
}
Сравнительная таблица поведения
Сценарий | Job | SupervisorJob |
---|---|---|
Ошибка в дочерней корутине | Отменяет всех siblings | Изолирует ошибку |
Отмена родителя | Отменяет всех детей | Отменяет всех детей |
Обработка исключений | Всплывает к родителю | Требует локальную обработку |
Use case | Критичные операции | Независимые операции |
Практические паттерны обработки исключений
Паттерн 1: Graceful degradation
class ApiService : CoroutineScope {
private val supervisorJob = SupervisorJob()
override val coroutineContext = Dispatchers.IO + supervisorJob +
CoroutineExceptionHandler { _, ex -> log.error("Unhandled exception", ex) }
suspend fun loadDashboard(userId: String): Dashboard = supervisorScope {
// Каждый компонент обрабатывает свои ошибки
val profile = async {
try { profileService.getProfile(userId) }
catch (e: Exception) {
log.warn("Profile service failed", e)
DefaultProfile(userId)
}
}
val orders = async {
try { orderService.getOrders(userId) }
catch (e: Exception) {
log.warn("Order service failed", e)
emptyList<Order>()
}
}
Dashboard(
profile = profile.await(),
orders = orders.await()
)
}
}
Паттерн 2: Circuit breaker
class ResilientApiClient {
private var failureCount = 0
private var lastFailureTime = 0L
private val circuitBreakerTimeout = 30_000L // 30 секунд
suspend fun makeRequest(request: ApiRequest): ApiResponse {
// Проверяем circuit breaker
if (isCircuitOpen()) {
throw CircuitBreakerException("Service temporarily unavailable")
}
return try {
val response = withTimeout(5000) {
httpClient.execute(request)
}
resetFailureCount() // успех - сбрасываем счётчик
response
} catch (e: Exception) {
incrementFailureCount()
when (e) {
is TimeoutCancellationException -> throw ServiceTimeoutException(e)
is CancellationException -> throw e // не ломаем отмену
else -> throw ServiceException("API call failed", e)
}
}
}
private fun isCircuitOpen(): Boolean {
return failureCount >= 5 &&
(System.currentTimeMillis() - lastFailureTime) < circuitBreakerTimeout
}
}
Паттерн 3: Retry с экспоненциальным backoff
suspend fun <T> withRetry(
times: Int = 3,
initialDelay: Long = 100,
maxDelay: Long = 1000,
factor: Double = 2.0,
block: suspend () -> T
): T {
var currentDelay = initialDelay
repeat(times - 1) { attempt ->
try {
return block()
} catch (e: CancellationException) {
throw e // не ретраим отмену
} catch (e: Exception) {
log.warn("Attempt ${attempt + 1} failed", e)
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
}
return block() // последняя попытка без ретрая
}
// Использование
suspend fun reliableApiCall(): ApiResponse = withRetry(times = 3) {
apiClient.fetchData()
}
Продвинутые техники
Селективная отмена
class BatchProcessor {
suspend fun processBatch(items: List<Item>): BatchResult = supervisorScope {
val results = mutableListOf<ProcessingResult>()
val activeJobs = mutableListOf<Job>()
items.chunked(10).forEach { chunk ->
val chunkJob = launch {
chunk.forEach { item ->
if (isActive) { // проверяем отмену для каждого элемента
val result = processItem(item)
synchronized(results) { results.add(result) }
}
}
}
activeJobs.add(chunkJob)
}
try {
activeJobs.joinAll()
} catch (e: CancellationException) {
// Отменяем все активные задачи
activeJobs.forEach { it.cancel() }
throw e
}
BatchResult(results)
}
}
Composite exception handling
class CompositeExceptionHandler {
suspend fun processMultipleSources(): ProcessingResult = supervisorScope {
val exceptions = mutableListOf<Exception>()
val results = mutableListOf<SourceResult>()
val jobs = dataSources.map { source ->
async {
try {
val result = source.fetchData()
synchronized(results) { results.add(result) }
} catch (e: Exception) {
synchronized(exceptions) { exceptions.add(e) }
}
}
}
jobs.awaitAll()
ProcessingResult(
results = results,
partialFailures = exceptions,
success = exceptions.isEmpty()
)
}
}
Ключевые вопросы для интервью
"Как работает отмена корутин?"
- Кооперативный механизм — корутина проверяет статус в suspension points
- CancellationException — специальное исключение, не считается ошибкой
- Структурированное распространение — от родителя к детям
"Чем отличается поведение исключений в launch и async?"
- launch: исключения всплывают немедленно
- async: исключения всплывают при вызове await()
"Когда использовать SupervisorJob?"
- Независимые операции — ошибка одной не должна влиять на другие
- Graceful degradation — частичный успех лучше полного провала
- Микросервисная архитектура — изоляция ошибок сервисов
"Как правильно обрабатывать CancellationException?"
- Всегда перебрасывать — не поглощать отмену
- Выполнять cleanup — очищать ресурсы перед перебросом
- Не логировать как ошибку — отмена это нормальное поведение
Главное понимание: Корректная обработка отмены и исключений критична для стабильности и предсказуемости асинхронных систем в production environment.
Контекст корутины (CoroutineContext)
Что такое CoroutineContext?
CoroutineContext — это индексированный набор элементов, который определяет окружение выполнения корутины. Это immutable коллекция, где каждый элемент имеет уникальный ключ (Key) и может быть получен по этому ключу.
Основные принципы:
- Композиция — элементы объединяются через оператор
+
- Наследование — дочерние корутины наследуют контекст родителя
- Переопределение — новые элементы заменяют старые с тем же ключом
- Неизменяемость — создаётся новый контекст при изменениях
// Базовая структура CoroutineContext
val context: CoroutineContext =
Dispatchers.IO + // где выполняется
SupervisorJob() + // управление жизненным циклом
CoroutineName("ApiService") + // имя для отладки
CoroutineExceptionHandler { _, ex -> // обработка исключений
log.error("Unhandled exception", ex)
}
Элементы CoroutineContext
1. Dispatcher - где выполняется корутина
Dispatcher определяет какие потоки используются для выполнения корутины. Это основной элемент для управления ресурсами и производительностью.
// Встроенные диспетчеры
val ioContext = Dispatchers.IO // I/O операции (64+ потоков)
val defaultContext = Dispatchers.Default // CPU задачи (CPU cores потоков)
val mainContext = Dispatchers.Main // UI поток (1 поток)
val unconfinedContext = Dispatchers.Unconfined // текущий поток
// Кастомный диспетчер
val customDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
val customContext = customDispatcher + CoroutineName("CustomPool")
Характеристики диспетчеров:
- IO: оптимизирован для блокирующих операций, expandable pool
- Default: оптимизирован для CPU-задач, fixed pool по количеству ядер
- Main: единственный UI поток, для обновления интерфейса
- Unconfined: продолжает в том потоке где приостановился
2. Job - управление жизненным циклом
Job представляет жизненный цикл корутины и позволяет управлять её выполнением: отменять, ожидать завершения, проверять статус.
// Типы Job
val regularJob = Job() // обычный Job
val supervisorJob = SupervisorJob() // изолирует ошибки детей
// Иерархия Job
val parentJob = SupervisorJob()
val childContext = Dispatchers.IO + Job(parent = parentJob)
// Управление Job
val job = launch(childContext) {
longRunningTask()
}
job.cancel() // отменить
job.join() // дождаться завершения
job.isActive // проверить статус
Ключевые особенности:
- Иерархия: дочерние Job отменяются при отмене родительского
- SupervisorJob: изолирует ошибки дочерних Job
- Автоматическое создание: если не указан, создаётся автоматически
3. CoroutineName - отладка и мониторинг
CoroutineName добавляет человекочитаемое имя корутине для упрощения отладки, логирования и профилирования.
// Именование корутин
val namedContext = CoroutineName("UserDataLoader") + Dispatchers.IO
launch(namedContext) {
// В логах и профайлере будет видно имя "UserDataLoader"
loadUserData()
}
// Получение имени из контекста
suspend fun getCurrentCoroutineName(): String? {
return coroutineContext[CoroutineName]?.name
}
// Вложенные имена
val parentContext = CoroutineName("ApiService")
val childContext = parentContext + CoroutineName("DatabaseQuery")
// Результат: "ApiService.DatabaseQuery"
4. CoroutineExceptionHandler - обработка исключений
CoroutineExceptionHandler определяет как обрабатывать необработанные исключения в корутинах. Работает только для root корутин (запущенных с launch).
// Глобальный обработчик ошибок
val exceptionHandler = CoroutineExceptionHandler { context, exception ->
val coroutineName = context[CoroutineName]?.name ?: "Unknown"
log.error("Exception in coroutine '$coroutineName'", exception)
// Можно отправить метрики, уведомления и т.д.
metricsService.recordException(exception)
}
// Использование в контексте
val serviceContext = Dispatchers.IO +
SupervisorJob() +
CoroutineName("BackgroundService") +
exceptionHandler
class BackgroundService : CoroutineScope {
override val coroutineContext = serviceContext
fun startProcessing() = launch {
// Любые необработанные исключения попадут в exceptionHandler
processData()
}
}
Важно: CoroutineExceptionHandler НЕ работает с async — там исключения всплывают при await().
5. ThreadContextElement - передача данных
ThreadContextElement позволяет передавать thread-local данные между корутинами, даже при смене потоков.
// Кастомный элемент контекста
class UserContext(val userId: String) : AbstractCoroutineContextElement(UserContext) {
companion object Key : CoroutineContext.Key<UserContext>
override fun toString(): String = "UserContext($userId)"
}
// Использование
val userContext = UserContext("user123") + Dispatchers.IO
launch(userContext) {
val currentUser = coroutineContext[UserContext]?.userId
println("Processing for user: $currentUser")
withContext(Dispatchers.Default) {
// UserContext автоматически передастся в новый поток
val stillSameUser = coroutineContext[UserContext]?.userId
println("Still processing for user: $stillSameUser")
}
}
Композиция и манипуляция контекста
Оператор + (плюс) - объединение элементов
// Базовый контекст
val baseContext = Dispatchers.IO + SupervisorJob()
// Добавление элементов
val enrichedContext = baseContext +
CoroutineName("ApiService") +
CoroutineExceptionHandler { _, ex -> handleError(ex) }
// Переопределение элементов (новый заменяет старый)
val newContext = enrichedContext + Dispatchers.Default // заменяет Dispatchers.IO
Методы доступа к элементам
suspend fun analyzeCurrentContext() {
val context = coroutineContext
// Получение элементов по ключу
val dispatcher = context[ContinuationInterceptor] as? CoroutineDispatcher
val job = context[Job]
val name = context[CoroutineName]?.name
// Проверка наличия элемента
val hasExceptionHandler = context[CoroutineExceptionHandler] != null
println("Dispatcher: $dispatcher")
println("Job: $job")
println("Name: $name")
println("Has exception handler: $hasExceptionHandler")
}
Удаление элементов
// Удаление элемента из контекста
val contextWithoutName = originalContext.minusKey(CoroutineName)
// Фильтрация контекста
val filteredContext = originalContext.fold(EmptyCoroutineContext) { acc, element ->
when (element) {
is CoroutineName -> acc // пропускаем CoroutineName
else -> acc + element // оставляем остальные элементы
}
}
Наследование контекста
Автоматическое наследование
class ParentService : CoroutineScope {
// Родительский контекст
override val coroutineContext = Dispatchers.IO +
SupervisorJob() +
CoroutineName("ParentService")
suspend fun processData() {
// Дочерняя корутина наследует родительский контекст
launch {
// Автоматически получает: Dispatchers.IO + SupervisorJob + CoroutineName
doWork()
}
// Переопределение элементов для дочерней корутины
launch(Dispatchers.Default + CoroutineName("ChildWorker")) {
// Получает: Dispatchers.Default + SupervisorJob + CoroutineName("ChildWorker")
doCpuIntensiveWork()
}
}
}
Явное управление наследованием
suspend fun controlledInheritance() = coroutineScope {
val parentContext = coroutineContext
println("Parent context: $parentContext")
// Полное наследование
launch {
println("Child 1 context: $coroutineContext") // идентичен родительскому
}
// Частичное переопределение
launch(CoroutineName("CustomChild")) {
println("Child 2 context: $coroutineContext") // изменено только имя
}
// Полное переопределение
launch(Dispatchers.Unconfined + Job() + CoroutineName("IndependentChild")) {
println("Child 3 context: $coroutineContext") // полностью новый контекст
}
}
Практические паттерны
Паттерн 1: Сервисные контексты
// Базовые контексты для разных типов сервисов
object ServiceContexts {
// Для I/O операций
val ioService = Dispatchers.IO +
SupervisorJob() +
CoroutineExceptionHandler { _, ex ->
log.error("IO Service error", ex)
}
// Для CPU-интенсивных задач
val computeService = Dispatchers.Default +
SupervisorJob() +
CoroutineExceptionHandler { _, ex ->
log.error("Compute Service error", ex)
}
// Для UI операций (Android/Desktop)
val uiService = Dispatchers.Main +
SupervisorJob() +
CoroutineExceptionHandler { _, ex ->
showErrorDialog(ex)
}
}
// Использование
class UserRepository : CoroutineScope {
override val coroutineContext = ServiceContexts.ioService +
CoroutineName("UserRepository")
}
Паттерн 2: Request-scoped контекст
// Контекст привязанный к HTTP запросу
class RequestContext(
val requestId: String,
val userId: String?,
val traceId: String
) : AbstractCoroutineContextElement(RequestContext) {
companion object Key : CoroutineContext.Key<RequestContext>
}
@RestController
class UserController {
@GetMapping("/users/{id}")
suspend fun getUser(@PathVariable id: String): UserDto = withContext(
RequestContext(
requestId = UUID.randomUUID().toString(),
userId = id,
traceId = MDC.get("traceId")
)
) {
// Весь код в этом блоке имеет доступ к RequestContext
userService.findById(id)
}
}
class UserService {
suspend fun findById(id: String): UserDto {
val requestContext = coroutineContext[RequestContext]
log.info("Processing request ${requestContext?.requestId} for user $id")
return userRepository.findById(id)
}
}
Паттерн 3: Контекст с метриками
class MetricsContext(
val operationName: String,
private val startTime: Long = System.currentTimeMillis()
) : AbstractCoroutineContextElement(MetricsContext) {
companion object Key : CoroutineContext.Key<MetricsContext>
fun recordDuration() {
val duration = System.currentTimeMillis() - startTime
metricsRegistry.timer(operationName).record(duration, TimeUnit.MILLISECONDS)
}
}
// Автоматическое измерение времени выполнения
suspend fun <T> withMetrics(operationName: String, block: suspend () -> T): T {
val metricsContext = MetricsContext(operationName)
return try {
withContext(metricsContext) {
block()
}
} finally {
metricsContext.recordDuration()
}
}
// Использование
suspend fun processOrder(orderId: String) = withMetrics("order.processing") {
// Время выполнения автоматически измеряется
orderRepository.findById(orderId)
.let { validateOrder(it) }
.let { processPayment(it) }
.let { updateInventory(it) }
}
Продвинутые техники
Динамическое изменение контекста
class AdaptiveService : CoroutineScope {
private var _context = Dispatchers.IO + SupervisorJob()
override val coroutineContext: CoroutineContext get() = _context
fun adaptToLoad(currentLoad: Int) {
val newDispatcher = when {
currentLoad > 80 -> Dispatchers.IO.limitedParallelism(32)
currentLoad > 50 -> Dispatchers.IO.limitedParallelism(16)
else -> Dispatchers.IO
}
_context = newDispatcher +
_context[Job]!! +
CoroutineName("AdaptiveService-Load$currentLoad")
}
}
Контекст-зависимая логика
suspend fun smartOperation() {
val currentDispatcher = coroutineContext[ContinuationInterceptor]
when (currentDispatcher) {
Dispatchers.Main -> {
// Уже в UI потоке - можно обновлять UI
updateProgressBar()
}
Dispatchers.IO -> {
// В IO потоке - можно делать блокирующие операции
performBlockingIO()
}
else -> {
// Переключаемся в подходящий контекст
withContext(Dispatchers.IO) {
performBlockingIO()
}
}
}
}
Ключевые вопросы для интервью
"Что такое CoroutineContext и зачем он нужен?"
- Окружение выполнения корутины - где, как и с какими параметрами
- Композиция элементов - Dispatcher + Job + дополнительные элементы
- Наследование - дочерние корутины получают контекст родителя
"Как работает наследование контекста?"
- Автоматическое копирование всех элементов от родителя к ребёнку
- Переопределение через оператор
+
заменяет элементы с тем же ключом - Сохранение иерархии Job для структурированной конкуренции
"Чем отличается Job от SupervisorJob в контексте?"
- Job: ошибка дочерней корутины отменяет siblings
- SupervisorJob: изолирует ошибки дочерних корутин
"Как добавить кастомные элементы в контекст?"
- Наследование от AbstractCoroutineContextElement
- Определение уникального Key
- Реализация логики передачи между потоками
"Когда использовать CoroutineExceptionHandler?"
- Root корутины запущенные с launch (не async)
- Глобальная обработка ошибок на уровне сервиса
- Логирование и мониторинг необработанных исключений
Главное понимание: CoroutineContext обеспечивает декларативное управление всеми аспектами выполнения корутин, что критично для построения maintainable и observable асинхронных систем.
Kotlin Flow
Что такое Flow?
Flow — это асинхронная последовательность значений в Kotlin, которая излучает (emit) данные over time. Это корутинная альтернатива RxJava/Reactive Streams, интегрированная в экосистему корутин с поддержкой structured concurrency.
Ключевые принципы:
- Асинхронность — работает с suspend функциями
- Cold по умолчанию — выполняется только при подписке (collect)
- Отменяемость — автоматически отменяется при отмене корутины
- Backpressure — поддерживает управление потоком данных
// Базовый пример Flow
val numbersFlow = flow {
for (i in 1..5) {
delay(1000) // асинхронная операция
emit(i) // излучение значения
}
}
// Использование
numbersFlow.collect { value ->
println("Received: $value") // получение каждого значения
}
Cold vs Hot Streams
Cold Streams - выполняются при подписке
Cold Flow начинает работу только при вызове collect() и создаёт независимый поток данных для каждого подписчика.
val coldFlow = flow {
println("Starting cold flow") // выполнится для каждого collect
repeat(3) { i ->
delay(1000)
emit(i)
}
}
// Каждый collect запускает flow заново
launch { coldFlow.collect { println("Collector 1: $it") } }
launch { coldFlow.collect { println("Collector 2: $it") } }
// Результат: "Starting cold flow" напечатается дважды
// Cold flow как factory функция
fun createDataStream(userId: String) = flow {
val data = fetchUserData(userId) // выполняется при каждом collect
emit(data)
}
Характеристики Cold Flow:
- Lazy — не выполняется без подписки
- Unicast — каждый подписчик получает собственный поток
- Reproducible — повторный collect даёт тот же результат
- Resource efficient — создаётся только когда нужен
Hot Streams - выполняются независимо от подписчиков
Hot Flow работает независимо от подписчиков и разделяет данные между всеми активными наблюдателями.
// SharedFlow - hot stream
val hotFlow = MutableSharedFlow<Int>()
// Запуск производителя данных
launch {
repeat(5) { i ->
delay(1000)
hotFlow.emit(i) // излучает независимо от подписчиков
}
}
// Подписчики получают только новые данные
launch {
delay(2500) // подписываемся через 2.5 секунды
hotFlow.collect { println("Late subscriber: $it") } // получит только 3, 4
}
Характеристики Hot Flow:
- Eager — работает независимо от подписчиков
- Multicast — все подписчики получают одни данные
- Stateful — может хранить последние значения
- Always active — продолжает работу даже без подписчиков
Сравнительная таблица
Аспект | Cold Flow | Hot Flow |
---|---|---|
Активация | При collect() | Сразу при создании |
Подписчики | Независимые потоки | Общий поток данных |
Ресурсы | Создаются по требованию | Постоянно активны |
Use case | API вызовы, БД запросы | UI состояния, события |
Основные операторы Flow
map - преобразование значений
val numbers = flowOf(1, 2, 3, 4, 5)
val doubled = numbers.map { it * 2 }
// Результат: 2, 4, 6, 8, 10
val userNames = userIds.map { id ->
userRepository.findById(id).name // suspend функция
}
Назначение: Преобразование каждого элемента потока с возможностью вызова suspend функций.
filter - фильтрация значений
val evenNumbers = numbers.filter { it % 2 == 0 }
// Результат: 2, 4
val activeUsers = users.filter { user ->
userService.isActive(user.id) // suspend функция
}
Назначение: Отбор элементов по условию, может использовать suspend функции.
flatMapConcat - последовательное преобразование
val userOrders = userIds.flatMapConcat { userId ->
orderRepository.getOrdersFlow(userId) // возвращает Flow<Order>
}
// Ждёт завершения каждого потока перед переходом к следующему
// Практический пример
fun getUserDataFlow(userIds: List<String>) = userIds.asFlow()
.flatMapConcat { userId ->
flow {
val profile = userService.getProfile(userId)
val orders = orderService.getOrders(userId)
emit(UserData(profile, orders))
}
}
Назначение: Преобразование каждого элемента в Flow и последовательное объединение результатов.
collect - подписка и получение данных
// Простая подписка
flow.collect { value ->
println("Received: $value")
}
// Обработка с сохранением в список
val results = mutableListOf<String>()
flow.collect { results.add(it) }
// Collect с обработкой ошибок
try {
flow.collect { processData(it) }
} catch (e: Exception) {
handleError(e)
}
Назначение: Терминальный оператор для подписки на Flow и получения всех значений.
onEach - выполнение действий без изменения потока
val processedFlow = dataFlow
.onEach { data ->
log.info("Processing data: $data") // side effect
}
.onEach { data ->
metricsService.recordDataPoint(data) // ещё один side effect
}
.map { processData(it) } // основная трансформация
// onEach не изменяет сам поток данных
processedFlow.collect { result ->
// получаем обработанные данные
}
Назначение: Выполнение side effects (логирование, метрики, валидация) без изменения самих данных.
catch - обработка исключений
val resilientFlow = riskyDataFlow
.map { data ->
riskyTransformation(data) // может бросить исключение
}
.catch { exception ->
when (exception) {
is NetworkException -> emit(CachedData) // fallback значение
is ValidationException -> {
log.warn("Validation failed", exception)
// не emit - пропускаем невалидные данные
}
else -> throw exception // перебрасываем неожиданные ошибки
}
}
Назначение: Перехват и обработка исключений в upstream операторах с возможностью emit fallback значений.
flowOn - смена контекста выполнения
val optimizedFlow = databaseFlow
.flowOn(Dispatchers.IO) // upstream операции в IO
.map { data ->
heavyComputation(data) // CPU-интенсивная обработка
}
.flowOn(Dispatchers.Default) // эта операция в Default
.onEach { result ->
updateUI(result) // UI операции остаются в текущем контексте
}
// Практический пример
class DataProcessor {
fun processDataStream() = flow {
// Эмиссия в IO контексте
val rawData = loadFromDatabase()
emit(rawData)
}
.flowOn(Dispatchers.IO)
.map { data ->
// Обработка в Default контексте
processData(data)
}
.flowOn(Dispatchers.Default)
}
Назначение: Изменение контекста выполнения для upstream операций без влияния на downstream.
StateFlow, SharedFlow, MutableStateFlow
StateFlow - состояние с последним значением
StateFlow — это hot flow, который хранит текущее состояние и сразу отдаёт его новым подписчикам.
class UserViewModel {
private val _userState = MutableStateFlow<UiState>(UiState.Loading)
val userState: StateFlow<UiState> = _userState.asStateFlow()
suspend fun loadUser(id: String) {
_userState.value = UiState.Loading
try {
val user = userRepository.findById(id)
_userState.value = UiState.Success(user)
} catch (e: Exception) {
_userState.value = UiState.Error(e.message)
}
}
}
// Подписка
userViewModel.userState.collect { state ->
when (state) {
is UiState.Loading -> showProgress()
is UiState.Success -> showUser(state.user)
is UiState.Error -> showError(state.message)
}
}
Характеристики StateFlow:
- Conflated — хранит только последнее значение
- Hot — излучает независимо от подписчиков
- Replay 1 — новые подписчики сразу получают текущее состояние
- Distinct — не излучает одинаковые значения подряд
SharedFlow - события и уведомления
SharedFlow — это hot flow для событий и уведомлений, которые не имеют "текущего состояния".
class EventBus {
private val _events = MutableSharedFlow<AppEvent>()
val events: SharedFlow<AppEvent> = _events.asSharedFlow()
suspend fun publishEvent(event: AppEvent) {
_events.emit(event)
}
}
// Разные типы событий
sealed class AppEvent {
data class UserLoggedIn(val userId: String) : AppEvent()
data class OrderCreated(val orderId: String) : AppEvent()
object NetworkError : AppEvent()
}
// Подписка на события
eventBus.events.collect { event ->
when (event) {
is AppEvent.UserLoggedIn -> handleUserLogin(event.userId)
is AppEvent.OrderCreated -> sendNotification(event.orderId)
is AppEvent.NetworkError -> showNetworkError()
}
}
MutableStateFlow vs MutableSharedFlow конфигурация
// MutableStateFlow - простое состояние
val simpleState = MutableStateFlow("initial")
// MutableSharedFlow с настройками
val configuredSharedFlow = MutableSharedFlow<String>(
replay = 2, // последние 2 значения для новых подписчиков
extraBufferCapacity = 10, // дополнительный буфер
onBufferOverflow = BufferOverflow.DROP_OLDEST // стратегия при переполнении
)
// Сложные конфигурации для разных сценариев
class NotificationService {
// Критичные уведомления - не теряем
private val criticalEvents = MutableSharedFlow<CriticalEvent>(
replay = 0,
extraBufferCapacity = Int.MAX_VALUE,
onBufferOverflow = BufferOverflow.SUSPEND
)
// Обычные события - можем потерять старые
private val regularEvents = MutableSharedFlow<RegularEvent>(
replay = 1,
extraBufferCapacity = 64,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
}
Flow vs LiveData (Android контекст)
Основные отличия
Аспект | Flow | LiveData |
---|---|---|
Платформа | Multiplatform | Android only |
Корутины | Нативная поддержка | Требует адаптеры |
Операторы | Богатый набор | Ограниченный набор |
Threading | Любой Dispatcher | Main thread только |
Lifecycle | Ручное управление | Автоматическое |
Миграция с LiveData на Flow
// LiveData подход
class UserViewModelLiveData : ViewModel() {
private val _users = MutableLiveData<List<User>>()
val users: LiveData<List<User>> = _users
fun loadUsers() {
viewModelScope.launch {
_users.value = userRepository.getUsers()
}
}
}
// Flow подход
class UserViewModelFlow : ViewModel() {
val users: StateFlow<List<User>> = userRepository.getUsersFlow()
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
}
// Подписка в UI (Android)
class UserFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
// LiveData подписка
viewModel.users.observe(viewLifecycleOwner) { users ->
updateUI(users)
}
// Flow подписка
viewLifecycleOwner.lifecycleScope.launch {
viewModel.users.collect { users ->
updateUI(users)
}
}
}
}
Преимущества Flow:
- Композиция операторов — сложные трансформации данных
- Корутинная интеграция — естественная работа с suspend функциями
- Multiplatform — код работает на всех платформах
- Backpressure — контроль скорости обработки данных
Подписка и отмена
Подписка с жизненным циклом
class DataService : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
fun startListening() {
// Подписка привязана к scope службы
launch {
dataFlow.collect { data ->
processData(data)
}
}
}
fun stop() {
job.cancel() // автоматически отменяет все подписки
}
}
Условная отмена подписки
suspend fun conditionalCollection() {
dataFlow
.takeWhile { data -> data.isValid } // автоматическая отмена при условии
.collect { data ->
processValidData(data)
}
// collect завершится когда встретится невалидный элемент
}
// Отмена по времени
suspend fun timedCollection() = withTimeout(30_000) {
dataFlow.collect { data ->
processData(data)
}
// автоматическая отмена через 30 секунд
}
Обработка отмены в производителе
fun createCancellableFlow() = flow {
try {
repeat(100) { i ->
ensureActive() // проверка отмены
val data = fetchData(i)
emit(data)
delay(1000)
}
} catch (e: CancellationException) {
cleanup() // очистка ресурсов при отмене
throw e // важно перебросить
}
}
Практические паттерны
Паттерн 1: Reactive Repository
class UserRepository {
private val _users = MutableStateFlow<List<User>>(emptyList())
val users: StateFlow<List<User>> = _users.asStateFlow()
private val _events = MutableSharedFlow<UserEvent>()
val events: SharedFlow<UserEvent> = _events.asSharedFlow()
suspend fun addUser(user: User) {
val updatedUsers = _users.value + user
_users.value = updatedUsers
_events.emit(UserEvent.UserAdded(user))
}
suspend fun removeUser(userId: String) {
val updatedUsers = _users.value.filterNot { it.id == userId }
_users.value = updatedUsers
_events.emit(UserEvent.UserRemoved(userId))
}
}
Паттерн 2: Flow для real-time данных
class StockPriceService {
fun getPriceUpdates(symbol: String): Flow<Price> = flow {
val webSocket = createWebSocket(symbol)
try {
webSocket.messages.collect { message ->
val price = parsePrice(message)
emit(price)
}
} finally {
webSocket.close()
}
}
.flowOn(Dispatchers.IO)
.retry(3) { exception ->
exception is NetworkException
}
}
Паттерн 3: Combining multiple flows
class DashboardService(
private val userService: UserService,
private val orderService: OrderService,
private val notificationService: NotificationService
) {
fun getDashboardData(userId: String): Flow<DashboardData> =
combine(
userService.getUserFlow(userId),
orderService.getOrdersFlow(userId),
notificationService.getNotificationsFlow(userId)
) { user, orders, notifications ->
DashboardData(
user = user,
orders = orders,
notifications = notifications,
lastUpdated = System.currentTimeMillis()
)
}
}
Ключевые вопросы для интервью
"Чем отличается Cold от Hot Flow?"
- Cold: выполняется при collect, каждый подписчик получает новый поток
- Hot: работает независимо, подписчики получают общие данные
"Когда использовать StateFlow vs SharedFlow?"
- StateFlow: для состояния UI, конфигурации, текущих значений
- SharedFlow: для событий, уведомлений, actions
"Как работает backpressure в Flow?"
- Suspension — производитель приостанавливается если потребитель не успевает
- Buffer — промежуточное хранение для сглаживания скачков
- Overflow strategies — поведение при переполнении буфера
"В чём преимущества Flow перед RxJava?"
- Корутинная интеграция — естественная работа с suspend функциями
- Structured concurrency — автоматическая отмена
- Null safety — типобезопасность Kotlin
- Меньше boilerplate — более простой API
"Как правильно обрабатывать ошибки в Flow?"
- catch оператор для upstream ошибок
- try-catch в collect для downstream ошибок
- retry для автоматических повторов
- onCompletion для финализации
Главное понимание: Flow предоставляет реактивную парадигму с корутинной интеграцией, что делает асинхронную обработку потоков данных естественной частью Kotlin экосистемы.
Корутины и Spring
Интеграция корутин в Spring Framework
Spring Framework с версии 5.2 предоставляет нативную поддержку корутин Kotlin, позволяя писать неблокирующие реактивные приложения в императивном стиле. Это дает возможность использовать знакомый синтаксис вместо сложных reactive chains.
Ключевые преимущества:
- Императивный код вместо callback chains
- Structured concurrency — автоматическое управление жизненным циклом
- Высокая производительность — неблокирующие I/O операции
- Простота отладки — линейный flow выполнения
// Традиционный Spring MVC (блокирующий)
@RestController
class UserController(private val userService: UserService) {
@GetMapping("/users/{id}")
fun getUser(@PathVariable id: String): User {
return userService.findById(id) // блокирует поток
}
}
// Spring WebFlux с корутинами (неблокирующий)
@RestController
class UserControllerCoroutines(private val userService: UserService) {
@GetMapping("/users/{id}")
suspend fun getUser(@PathVariable id: String): User {
return userService.findById(id) // не блокирует поток
}
}
Spring WebFlux + корутины
Controller с suspend функциями
Suspend controllers позволяют обрабатывать HTTP запросы неблокирующим способом, автоматически интегрируясь с reactive stack Spring WebFlux.
@RestController
@RequestMapping("/api/v1")
class ApiController(
private val userService: UserService,
private val orderService: OrderService
) {
// Простой suspend endpoint
@GetMapping("/users/{id}")
suspend fun getUser(@PathVariable id: String): UserDto {
return userService.findById(id)
}
// Параллельная загрузка данных
@GetMapping("/users/{id}/dashboard")
suspend fun getUserDashboard(@PathVariable id: String): DashboardDto {
// Параллельное выполнение запросов
val userDeferred = async { userService.findById(id) }
val ordersDeferred = async { orderService.findByUserId(id) }
val preferencesDeferred = async { preferencesService.findByUserId(id) }
return DashboardDto(
user = userDeferred.await(),
orders = ordersDeferred.await(),
preferences = preferencesDeferred.await()
)
}
// Flow как response
@GetMapping("/users/{id}/notifications", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun getNotificationStream(@PathVariable id: String): Flow<NotificationDto> {
return notificationService.getNotificationFlow(id)
.map { it.toDto() }
}
}
Обработка исключений в корутинах
@ControllerAdvice
class CoroutineExceptionHandler {
@ExceptionHandler(IllegalArgumentException::class)
suspend fun handleValidationException(ex: IllegalArgumentException): ResponseEntity<ErrorDto> {
// Может выполнять suspend операции для логирования/метрик
auditService.logError(ex)
return ResponseEntity.badRequest()
.body(ErrorDto("Validation failed: ${ex.message}"))
}
@ExceptionHandler(TimeoutCancellationException::class)
suspend fun handleTimeout(ex: TimeoutCancellationException): ResponseEntity<ErrorDto> {
return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
.body(ErrorDto("Request timeout"))
}
}
Spring Data + корутины
R2DBC с корутинами
R2DBC (Reactive Relational Database Connectivity) обеспечивает неблокирующий доступ к реляционным БД с поддержкой корутин через Spring Data R2DBC.
// Репозиторий с suspend функциями
interface UserRepository : CoroutineCrudRepository<User, String> {
// Автоматически генерируемые suspend методы
// suspend fun findById(id: String): User?
// suspend fun save(user: User): User
// suspend fun deleteById(id: String)
// Кастомные suspend запросы
@Query("SELECT * FROM users WHERE email = :email")
suspend fun findByEmail(email: String): User?
@Query("SELECT * FROM users WHERE status = :status")
fun findByStatus(status: UserStatus): Flow<User>
@Query("SELECT * FROM users WHERE created_at > :date")
suspend fun findRecentUsers(date: LocalDateTime): List<User>
}
// Использование в сервисе
@Service
class UserService(private val userRepository: UserRepository) {
suspend fun createUser(request: CreateUserRequest): User {
// Проверка существования пользователя
val existingUser = userRepository.findByEmail(request.email)
if (existingUser != null) {
throw UserAlreadyExistsException("User with email ${request.email} already exists")
}
// Создание нового пользователя
val user = User(
id = UUID.randomUUID().toString(),
email = request.email,
name = request.name,
status = UserStatus.ACTIVE,
createdAt = LocalDateTime.now()
)
return userRepository.save(user)
}
// Streaming результатов
fun getActiveUsersStream(): Flow<User> {
return userRepository.findByStatus(UserStatus.ACTIVE)
}
}
MongoDB Reactive с корутинами
// Reactive MongoDB репозиторий
interface ProductRepository : ReactiveMongoRepository<Product, String> {
fun findByCategory(category: String): Flow<Product>
@Query("{ 'price': { \$gte: ?0, \$lte: ?1 } }")
fun findByPriceRange(minPrice: BigDecimal, maxPrice: BigDecimal): Flow<Product>
}
// Сервис с корутинами
@Service
class ProductService(private val productRepository: ProductRepository) {
suspend fun findProductById(id: String): Product? {
return productRepository.findById(id).awaitFirstOrNull()
}
suspend fun saveProduct(product: Product): Product {
return productRepository.save(product).awaitFirst()
}
// Batch обработка через Flow
suspend fun updatePrices(updates: List<PriceUpdate>) {
updates.asFlow()
.flatMapConcat { update ->
productRepository.findById(update.productId)
.map { product ->
product.copy(price = update.newPrice)
}
.flatMap { productRepository.save(it) }
}
.collect() // Выполняем все обновления
}
}
Spring Security + корутины
Reactive Security с корутинами
Spring Security WebFlux поддерживает корутины для неблокирующей аутентификации и авторизации.
@Configuration
@EnableWebFluxSecurity
class SecurityConfig {
@Bean
fun securityWebFilterChain(http: ServerHttpSecurity): SecurityWebFilterChain {
return http
.authorizeExchange { exchanges ->
exchanges
.pathMatchers("/api/public/**").permitAll()
.pathMatchers("/api/admin/**").hasRole("ADMIN")
.anyExchange().authenticated()
}
.oauth2ResourceServer { oauth2 ->
oauth2.jwt()
}
.build()
}
}
// Controller с безопасностью
@RestController
@RequestMapping("/api/secure")
class SecureController(private val orderService: OrderService) {
@GetMapping("/orders")
suspend fun getMyOrders(authentication: Authentication): List<OrderDto> {
val userId = authentication.name
return orderService.findByUserId(userId)
}
@PreAuthorize("hasRole('ADMIN')")
@GetMapping("/admin/orders")
suspend fun getAllOrders(): List<OrderDto> {
return orderService.findAll()
}
// Получение текущего пользователя в корутине
@GetMapping("/profile")
suspend fun getProfile(): UserProfileDto {
val authentication = currentCoroutineContext()[ReactiveSecurityContextHolder.CONTEXT_KEY]
?.authentication ?: throw UnauthorizedException()
return userService.getProfile(authentication.name)
}
}
Кастомная аутентификация с корутинами
@Component
class CoroutineAuthenticationManager(
private val userService: UserService,
private val tokenService: TokenService
) : ReactiveAuthenticationManager {
override fun authenticate(authentication: Authentication): Mono<Authentication> {
return mono {
authenticateCoroutine(authentication)
}
}
private suspend fun authenticateCoroutine(authentication: Authentication): Authentication {
val token = authentication.credentials as String
// Неблокирующая проверка токена
val claims = tokenService.validateToken(token)
val user = userService.findById(claims.userId)
?: throw BadCredentialsException("User not found")
return UsernamePasswordAuthenticationToken(
user.email,
null,
user.authorities
)
}
}
Конфигурация и производительность
Настройка корутинного контекста
@Configuration
class CoroutineConfig {
@Bean
@Primary
fun applicationCoroutineScope(): CoroutineScope {
return CoroutineScope(
SupervisorJob() +
Dispatchers.Default +
CoroutineName("SpringApp") +
CoroutineExceptionHandler { _, exception ->
log.error("Unhandled coroutine exception", exception)
}
)
}
@Bean
fun ioCoroutineScope(): CoroutineScope {
return CoroutineScope(
SupervisorJob() +
Dispatchers.IO +
CoroutineName("SpringIO")
)
}
}
// Использование кастомных scope
@Service
class BackgroundTaskService(
@Qualifier("ioCoroutineScope") private val ioScope: CoroutineScope
) {
fun startBackgroundProcessing() {
ioScope.launch {
while (isActive) {
processDataBatch()
delay(TimeUnit.MINUTES.toMillis(5))
}
}
}
}
Мониторинг и метрики корутин
@Component
class CoroutineMetrics(private val meterRegistry: MeterRegistry) {
private val activeCoroutines = Gauge.builder("coroutines.active")
.description("Number of active coroutines")
.register(meterRegistry)
private val coroutineTimer = Timer.builder("coroutines.execution.time")
.description("Coroutine execution time")
.register(meterRegistry)
suspend fun <T> withMetrics(operation: String, block: suspend () -> T): T {
return Timer.Sample.start(meterRegistry).use { sample ->
activeCoroutines.incrementAndGet()
try {
block()
} finally {
activeCoroutines.decrementAndGet()
sample.stop(Timer.builder("coroutines.$operation.time").register(meterRegistry))
}
}
}
}
// Использование в контроллере
@RestController
class MetricsController(private val metrics: CoroutineMetrics) {
@GetMapping("/data")
suspend fun getData(): DataDto = metrics.withMetrics("getData") {
loadDataFromMultipleSources()
}
}
Практические паттерны
Паттерн 1: Reactive Gateway
@RestController
@RequestMapping("/api/gateway")
class ApiGateway(
private val userService: UserServiceClient,
private val orderService: OrderServiceClient,
private val inventoryService: InventoryServiceClient
) {
@GetMapping("/users/{id}/complete-profile")
suspend fun getCompleteProfile(@PathVariable id: String): CompleteProfileDto =
supervisorScope {
// Параллельные вызовы микросервисов
val userProfile = async {
try { userService.getProfile(id) }
catch (e: Exception) { null } // graceful degradation
}
val orderHistory = async {
try { orderService.getOrderHistory(id) }
catch (e: Exception) { emptyList() }
}
val recommendations = async {
try { inventoryService.getRecommendations(id) }
catch (e: Exception) { emptyList() }
}
CompleteProfileDto(
profile = userProfile.await(),
orders = orderHistory.await(),
recommendations = recommendations.await()
)
}
}
Паттерн 2: Event-driven processing
@Service
class OrderEventProcessor(
private val orderRepository: OrderRepository,
private val notificationService: NotificationService,
private val auditService: AuditService
) : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.Default + job
private val orderEvents = MutableSharedFlow<OrderEvent>()
@PostConstruct
fun startProcessing() {
// Обработка событий заказов
launch {
orderEvents.collect { event ->
when (event) {
is OrderEvent.Created -> handleOrderCreated(event)
is OrderEvent.Cancelled -> handleOrderCancelled(event)
is OrderEvent.Completed -> handleOrderCompleted(event)
}
}
}
}
suspend fun publishEvent(event: OrderEvent) {
orderEvents.emit(event)
}
private suspend fun handleOrderCreated(event: OrderEvent.Created) = supervisorScope {
// Параллельная обработка
launch { notificationService.sendOrderConfirmation(event.order) }
launch { auditService.logOrderCreation(event.order) }
launch { updateInventory(event.order) }
}
@PreDestroy
fun stop() {
job.cancel()
}
}
Паттерн 3: Batch processing
@Service
class DataSyncService(
private val externalApiClient: ExternalApiClient,
private val localRepository: DataRepository
) {
@Scheduled(fixedDelay = 300_000) // каждые 5 минут
suspend fun syncData() {
log.info("Starting data synchronization")
try {
externalApiClient.getUpdatedRecords()
.chunked(100) // обрабатываем пачками по 100
.collect { batch ->
processBatch(batch)
}
} catch (e: Exception) {
log.error("Data sync failed", e)
}
}
private suspend fun processBatch(records: List<ExternalRecord>) = supervisorScope {
records.map { record ->
async {
try {
val localRecord = record.toLocalRecord()
localRepository.save(localRecord)
} catch (e: Exception) {
log.warn("Failed to process record ${record.id}", e)
}
}
}.awaitAll()
}
}
Лучшие практики и подводные камни
Конфигурация thread pool
@Configuration
class WebFluxConfig : WebFluxConfigurer {
override fun configureHttpMessageCodecs(configurer: ServerCodecConfigurer) {
// Увеличиваем лимиты для streaming
configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024) // 16MB
}
}
// Настройка R2DBC connection pool
@Configuration
class R2dbcConfig {
@Bean
fun connectionFactory(): ConnectionFactory {
return ConnectionFactories.get(
ConnectionFactoryOptions.builder()
.option(DRIVER, "postgresql")
.option(HOST, "localhost")
.option(PORT, 5432)
.option(USER, "app")
.option(PASSWORD, "password")
.option(DATABASE, "appdb")
.option(INITIAL_SIZE, 10)
.option(MAX_SIZE, 50)
.option(MAX_IDLE_TIME, Duration.ofMinutes(30))
.build()
)
}
}
Обработка ошибок в WebFlux
@Component
@Order(-2) // Высокий приоритет
class GlobalErrorWebExceptionHandler : WebExceptionHandler {
override fun handle(exchange: ServerWebExchange, ex: Throwable): Mono<Void> {
return when (ex) {
is CancellationException -> {
// Корутина была отменена - это нормально
exchange.response.statusCode = HttpStatus.NO_CONTENT
exchange.response.setComplete()
}
is TimeoutCancellationException -> {
exchange.response.statusCode = HttpStatus.REQUEST_TIMEOUT
writeErrorResponse(exchange, "Request timeout")
}
else -> {
exchange.response.statusCode = HttpStatus.INTERNAL_SERVER_ERROR
writeErrorResponse(exchange, "Internal server error")
}
}
}
private fun writeErrorResponse(exchange: ServerWebExchange, message: String): Mono<Void> {
val buffer = exchange.response.bufferFactory().wrap(message.toByteArray())
return exchange.response.writeWith(Mono.just(buffer))
}
}
Ключевые вопросы для интервью
"Как Spring интегрируется с корутинами?"
- Автоматическое bridge между suspend функциями и Mono/Flux
- Reactive stack остаётся неизменным под капотом
- Императивный API поверх реактивной архитектуры
"В чём преимущества корутин над традиционными Mono/Flux?"
- Читаемость — линейный код вместо reactive chains
- Отладка — стандартные инструменты отладки работают
- Exception handling — обычные try-catch блоки
- Composition — легко комбинировать операции
"Как работает backpressure с корутинами в Spring?"
- Автоматическое suspension — корутина приостанавливается при медленном consumer
- Flow интеграция — поддержка backpressure на уровне Kotlin Flow
- Buffer management — настройка буферизации через Spring конфигурацию
"Какие ограничения у корутин в Spring?"
- Reactive stack — необходим WebFlux, не работает с обычным MVC
- Database drivers — требуются R2DBC драйверы вместо JDBC
- Learning curve — команда должна понимать корутины и reactive programming
"Как мониторить производительность корутин в Spring?"
- Micrometer интеграция — метрики через стандартные Spring механизмы
- Structured logging — correlation ID через coroutine context
- Health checks — мониторинг активных корутин и connection pools
Главное понимание: Корутины в Spring обеспечивают высокую производительность реактивных приложений с простотой императивного кода, что критично для modern high-load backend систем.
Производительность и ошибки в корутинах
Потенциальные утечки памяти
GlobalScope - источник утечек
GlobalScope — это глобальная область видимости корутин, которая живёт всё время жизни приложения. Использование GlobalScope может привести к утечкам памяти и неконтролируемому выполнению корутин.
// ❌ ПЛОХО - утечка памяти
class UserService {
fun loadUserData(userId: String) {
GlobalScope.launch {
val userData = fetchUserFromApi(userId) // может выполняться вечно
updateCache(userData)
}
// Нет способа отменить эту корутину!
// Если UserService уничтожается, корутина продолжает работать
}
}
// ❌ ПЛОХО - долгоживущие ссылки
class MainActivity {
fun startBackgroundWork() {
GlobalScope.launch {
while (true) {
processData() // бесконечный цикл
delay(1000)
// Активность может быть уничтожена, но корутина продолжит работу
// и будет держать ссылку на контекст через замыкание
}
}
}
}
Правильное управление жизненным циклом
// ✅ ХОРОШО - контролируемый scope
class UserService : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
fun loadUserData(userId: String) {
launch { // привязано к scope сервиса
val userData = fetchUserFromApi(userId)
updateCache(userData)
}
}
fun cleanup() {
job.cancel() // отменяет ВСЕ корутины сервиса
}
}
// ✅ ХОРОШО - scope привязанный к lifecycle
class BackgroundTaskManager {
private var serviceJob: Job? = null
fun start() {
serviceJob = CoroutineScope(Dispatchers.Default).launch {
while (isActive) { // проверяет отмену
processData()
delay(1000)
}
}
}
fun stop() {
serviceJob?.cancel() // явная отмена
serviceJob = null
}
}
Утечки в ViewModels (Android пример)
// ❌ ПЛОХО - утечка ViewModel
class UserViewModel : ViewModel() {
fun loadData() {
GlobalScope.launch { // переживёт ViewModel
val data = repository.loadData()
// Попытка обновить UI после уничтожения ViewModel
_uiState.value = UiState.Success(data)
}
}
}
// ✅ ХОРОШО - viewModelScope
class UserViewModel : ViewModel() {
fun loadData() {
viewModelScope.launch { // автоматически отменяется в onCleared()
val data = repository.loadData()
_uiState.value = UiState.Success(data)
}
}
}
Заморозка UI при неправильном использовании Dispatcher
Неправильное использование Main Dispatcher
Main Dispatcher предназначен для UI операций и имеет один поток. Выполнение тяжёлых операций в Main Dispatcher заблокирует UI.
// ❌ ПЛОХО - блокировка UI
class DataProcessor {
suspend fun processLargeDataset(data: List<DataItem>) {
// Выполняется в Main dispatcher по умолчанию
data.forEach { item ->
val result = performHeavyComputation(item) // БЛОКИРУЕТ UI!
updateProgressBar(result.progress)
}
}
}
// ❌ ПЛОХО - синхронные операции в Main
suspend fun loadDataFromNetwork(): String {
// Если это выполняется в Main dispatcher
return URL("https://api.example.com/data")
.readText() // Блокирующий I/O вызов! UI зависнет
}
Правильное переключение контекста
// ✅ ХОРОШО - правильное использование dispatchers
class DataProcessor {
suspend fun processLargeDataset(data: List<DataItem>) = withContext(Dispatchers.Main) {
data.forEach { item ->
// Тяжёлые вычисления в background
val result = withContext(Dispatchers.Default) {
performHeavyComputation(item)
}
// Обновление UI в Main
updateProgressBar(result.progress)
}
}
}
// ✅ ХОРОШО - правильная архитектура
class NetworkService {
suspend fun loadData(): String = withContext(Dispatchers.IO) {
// I/O операции в IO dispatcher
httpClient.get("https://api.example.com/data")
}
suspend fun processData(rawData: String): ProcessedData = withContext(Dispatchers.Default) {
// CPU-интенсивные операции в Default dispatcher
parseAndProcess(rawData)
}
}
class UIController {
suspend fun refreshData() {
// UI код остаётся в Main dispatcher
showLoading()
try {
val rawData = networkService.loadData()
val processedData = networkService.processData(rawData)
// Обновление UI в Main dispatcher
displayData(processedData)
} catch (e: Exception) {
showError(e.message)
} finally {
hideLoading()
}
}
}
Избежание блокирующих операций
// ❌ ПЛОХО - блокирующие операции в корутинах
suspend fun badDatabaseAccess() {
// Блокирующий JDBC в корутине
val connection = DriverManager.getConnection(jdbcUrl)
val resultSet = connection.createStatement()
.executeQuery("SELECT * FROM users") // блокирует поток!
}
// ✅ ХОРОШО - неблокирующие операции
suspend fun goodDatabaseAccess() {
// R2DBC или другие неблокирующие драйверы
val users = userRepository.findAll() // suspend функция
return users
}
// ✅ ХОРОШО - изоляция блокирующих операций
suspend fun legacySystemIntegration() = withContext(Dispatchers.IO) {
// Изолируем блокирующий код в IO dispatcher
val legacyClient = LegacyJdbcClient()
legacyClient.blockingOperation() // ограничено IO pool
}
Разница между launch и async
launch - Fire and Forget
launch создаёт корутину для side effects (побочных эффектов) и не возвращает результат. Используется когда результат выполнения не важен.
// launch для side effects
class NotificationService {
fun sendNotifications(users: List<User>) {
users.forEach { user ->
// Не ждём завершения каждой отправки
scope.launch {
emailService.sendEmail(user.email, "Welcome!")
analyticsService.trackEmailSent(user.id)
}
}
// Функция завершается сразу, уведомления отправляются в background
}
}
// ❌ ПЛОХО - неправильное использование launch
class DataService {
suspend fun getUserData(id: String): UserData {
var userData: UserData? = null
launch { // launch НЕ блокирует выполнение
userData = fetchUserData(id)
}
return userData!! // ОШИБКА! userData всегда null
}
}
async - Параллельные вычисления с результатом
async создаёт корутину, которая возвращает результат через Deferred. Используется для параллельного выполнения операций с последующим ожиданием результатов.
// async для параллельных операций
class UserProfileService {
suspend fun loadCompleteProfile(userId: String): CompleteProfile {
// Запускаем операции параллельно
val profileDeferred = async { userService.getProfile(userId) }
val ordersDeferred = async { orderService.getUserOrders(userId) }
val preferencesDeferred = async { preferencesService.getUserPreferences(userId) }
// Ждём завершения всех операций
return CompleteProfile(
profile = profileDeferred.await(),
orders = ordersDeferred.await(),
preferences = preferencesDeferred.await()
)
}
}
Сравнительная таблица launch vs async
Аспект | launch | async |
---|---|---|
Возвращаемое значение | Job | Deferred |
Получение результата | Не предусмотрено | await() |
Назначение | Side effects | Параллельные вычисления |
Исключения | Всплывают к родителю | Заморожены до await() |
Use case | Логирование, уведомления | API вызовы, вычисления |
Практические примеры
class OrderProcessor {
// launch для независимых действий
suspend fun processOrder(order: Order) {
// Основная логика обработки заказа
val processedOrder = validateAndProcessOrder(order)
// Независимые side effects - не ждём их завершения
launch { sendOrderConfirmation(order.customerEmail) }
launch { updateInventory(order.items) }
launch { logOrderProcessing(order.id) }
launch { updateAnalytics(order) }
// Возвращаем результат сразу
return processedOrder
}
// async для агрегации данных
suspend fun createOrderSummary(orderId: String): OrderSummary {
// Параллельная загрузка связанных данных
val orderData = async { orderRepository.findById(orderId) }
val customerData = async { customerRepository.findById(orderData.await().customerId) }
val paymentData = async { paymentService.getPaymentInfo(orderId) }
val shippingData = async { shippingService.getShippingInfo(orderId) }
// Ждём все результаты для создания summary
return OrderSummary(
order = orderData.await(),
customer = customerData.await(),
payment = paymentData.await(),
shipping = shippingData.await()
)
}
}
Корутины vs RxJava/Project Reactor
Сравнение парадигм
Корутины представляют императивный подход к асинхронному программированию, в то время как RxJava/Reactor следуют функциональной реактивной парадигме.
// RxJava - функциональная цепочка
class RxUserService {
fun loadUserProfile(userId: String): Single<UserProfile> {
return userRepository.findById(userId)
.flatMap { user ->
ordersRepository.findByUserId(user.id)
.map { orders -> UserProfile(user, orders) }
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.timeout(30, TimeUnit.SECONDS)
.retry(3)
}
}
// Корутины - императивный подход
class CoroutineUserService {
suspend fun loadUserProfile(userId: String): UserProfile = withContext(Dispatchers.IO) {
withTimeout(30_000) {
retry(3) {
val user = userRepository.findById(userId)
val orders = ordersRepository.findByUserId(user.id)
UserProfile(user, orders)
}
}
}
}
Преимущества корутин
1. Читаемость и отладка
// RxJava - сложно отлаживать
fun complexRxChain(): Observable<Result> {
return dataSource1.loadData()
.flatMap { data1 ->
dataSource2.loadData(data1.id)
.flatMap { data2 ->
dataSource3.processData(data1, data2)
.map { processedData ->
Result(data1, data2, processedData)
}
}
}
.onErrorResumeNext { error ->
handleError(error)
.andThen(Observable.just(defaultResult))
}
}
// Корутины - линейный код
suspend fun complexCoroutineChain(): Result {
return try {
val data1 = dataSource1.loadData()
val data2 = dataSource2.loadData(data1.id)
val processedData = dataSource3.processData(data1, data2)
Result(data1, data2, processedData)
} catch (error: Exception) {
handleError(error)
defaultResult
}
}
2. Exception handling
// RxJava - сложная обработка ошибок
fun rxErrorHandling(): Single<String> {
return networkCall()
.onErrorResumeNext { throwable ->
when (throwable) {
is NetworkException -> cacheCall()
is TimeoutException -> Single.error(UserFriendlyException("Timeout"))
else -> Single.error(throwable)
}
}
.doOnError { logError(it) }
}
// Корутины - естественная обработка
suspend fun coroutineErrorHandling(): String {
return try {
networkCall()
} catch (e: NetworkException) {
cacheCall()
} catch (e: TimeoutException) {
logError(e)
throw UserFriendlyException("Timeout")
} catch (e: Exception) {
logError(e)
throw e
}
}
Преимущества RxJava/Reactor
1. Богатые операторы
// RxJava - мощные операторы для потоков
fun rxStreamProcessing(): Observable<ProcessedData> {
return dataStream
.debounce(300, TimeUnit.MILLISECONDS) // защита от частых событий
.distinctUntilChanged() // только изменившиеся значения
.flatMap { data ->
processData(data).toObservable()
}
.scan { previous, current -> // аккумуляция состояния
combineData(previous, current)
}
.sample(1, TimeUnit.SECONDS) // сэмплирование каждую секунду
}
// Корутины - требуют больше кода для той же логики
fun coroutineStreamProcessing(): Flow<ProcessedData> {
return dataFlow
.debounce(300)
.distinctUntilChanged()
.flatMapLatest { data ->
flow { emit(processData(data)) }
}
.scan(initialValue) { previous, current ->
combineData(previous, current)
}
.sample(1000)
}
2. Backpressure стратегии
// RxJava - встроенные стратегии backpressure
fun rxBackpressure(): Flowable<String> {
return dataProducer
.onBackpressureBuffer(1000) // буферизация
.onBackpressureDrop { dropped -> // пропуск элементов
log.warn("Dropped item: $dropped")
}
.observeOn(Schedulers.computation(), false, 128) // настройка буфера
}
Сравнительная таблица
Аспект | Корутины | RxJava/Reactor |
---|---|---|
Парадигма | Императивная | Функциональная |
Читаемость | Высокая | Средняя |
Отладка | Простая | Сложная |
Exception handling | Естественная | Сложная |
Операторы | Базовые | Богатые |
Backpressure | Автоматическая | Ручная настройка |
Learning curve | Низкая | Высокая |
Performance | Высокая | Высокая |
Когда использовать что
Корутины выбирать для:
- Backend API — простота императивного кода
- Mobile приложения — легкость понимания и отладки
- Новые проекты — современный подход
- Команды без RX опыта — низкий порог входа
RxJava/Reactor выбирать для:
- Сложная stream обработка — богатые операторы
- Legacy системы — уже используется RX
- Специфичные backpressure требования — точный контроль
- Команды с RX экспертизой — используем существующий опыт
Performance оптимизации
Правильное использование Dispatchers
// ❌ ПЛОХО - неэффективное использование
class BadPerformanceService {
suspend fun processData(items: List<Item>): List<Result> {
return items.map { item ->
// Каждая операция создаёт новый context switch
withContext(Dispatchers.Default) {
heavyComputation(item)
}
}
}
}
// ✅ ХОРОШО - батчевая обработка
class GoodPerformanceService {
suspend fun processData(items: List<Item>): List<Result> =
withContext(Dispatchers.Default) {
// Один context switch для всей обработки
items.map { item ->
heavyComputation(item)
}
}
}
Оптимизация создания корутин
// ❌ ПЛОХО - создание множества корутин
class IneffientProcessor {
suspend fun processLargeDataset(data: List<DataItem>) {
data.forEach { item ->
launch { // создаём корутину для каждого элемента
processItem(item)
}
}
}
}
// ✅ ХОРОШО - батчевая обработка
class EfficientProcessor {
suspend fun processLargeDataset(data: List<DataItem>) {
data.chunked(100) // группы по 100 элементов
.map { chunk ->
async {
chunk.forEach { item ->
processItem(item) // обрабатываем в одной корутине
}
}
}
.awaitAll()
}
}
Ключевые вопросы для интервью
"Какие основные источники утечек в корутинах?"
- GlobalScope — корутины переживают создавший их объект
- Неконтролируемые scope — отсутствие явного управления жизненным циклом
- Циклические ссылки — корутины держат ссылки на UI/контекст
"Как избежать заморозки UI в корутинах?"
- Правильные Dispatchers — IO для сети/БД, Default для вычислений
- withContext для переключения контекста тяжёлых операций
- Избегать блокирующих операций в корутинах
"В чём разница между launch и async?"
- launch: fire-and-forget, для side effects, исключения всплывают сразу
- async: для получения результата, исключения при await()
"Когда выбрать корутины вместо RxJava?"
- Простота кода — императивный стиль понятнее
- Exception handling — естественная обработка ошибок
- Отладка — стандартные инструменты работают
- Team expertise — команда знакома с императивным программированием
"Какие performance оптимизации важны для корутин?"
- Батчевая обработка — группировка операций для уменьшения overhead
- Правильное использование Dispatchers — избегать ненужных переключений
- Structured concurrency — эффективное управление ресурсами
Главное понимание: Корутины требуют осознанного подхода к управлению жизненным циклом и правильного выбора dispatchers для достижения максимальной производительности без ущерба для user experience.