Инструменты синхронизации с примерами
Lock Interface
Lock - базовый интерфейс для механизмов блокировки в Java. Предоставляет более гибкую альтернативу synchronized блокам с возможностью прерывания, таймаутов и условных переменных.
Ключевые особенности:
- Явное управление блокировкой/разблокировкой
- Возможность прерывания ожидания блокировки
- Попытка получения блокировки с таймаутом
- Поддержка нескольких Condition объектов
Lock lock = new ReentrantLock();
// Базовое использование
lock.lock();
try {
// критическая секция
} finally {
lock.unlock(); // обязательно в finally
}
// С таймаутом
if (lock.tryLock(5, TimeUnit.SECONDS)) {
try {
// работа
} finally {
lock.unlock();
}
}
Когда использовать: Нужна гибкость управления блокировками, прерывание потоков, работа с условными переменными.
ReentrantLock
ReentrantLock - реентерабельная блокировка, позволяющая одному потоку многократно захватывать одну и ту же блокировку. Основная реализация Lock интерфейса.
Реентерабельность означает, что поток может повторно захватить уже принадлежащую ему блокировку без deadlock.
Особенности:
- Справедливая (fair) и несправедливая блокировка
- Мониторинг состояния блокировки
- Поддержка Condition объектов для сложной координации
private final ReentrantLock lock = new ReentrantLock(true); // fair lock
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public void put(Object item) throws InterruptedException {
lock.lock();
try {
while (count == capacity) {
notFull.await(); // ждем места
}
queue[count++] = item;
notEmpty.signal(); // сигнализируем о новом элементе
} finally {
lock.unlock();
}
}
Справедливая блокировка - потоки получают блокировку в порядке запроса. Снижает производительность, но предотвращает starvation.
Применение: Замена synchronized для сложных сценариев синхронизации, когда нужны условные переменные или гибкое управление.
ReentrantReadWriteLock
ReadWriteLock - разделяет доступ на чтение и запись. Множественные потоки могут одновременно читать, но запись эксклюзивна.
Принцип работы:
- Несколько потоков могут одновременно держать read-блокировку
- Только один поток может держать write-блокировку
- Write-блокировка исключает все остальные блокировки
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
public String getData(String key) {
readLock.lock();
try {
return map.get(key); // параллельное чтение
} finally {
readLock.unlock();
}
}
public void updateData(String key, String value) {
writeLock.lock();
try {
map.put(key, value); // эксклюзивная запись
} finally {
writeLock.unlock();
}
}
Когда эффективен: Много операций чтения, мало записи. Неэффективен при частых записях из-за overhead.
Semaphore
Semaphore - семафор, контролирует количество потоков, одновременно выполняющих определенную операцию. Поддерживает счетчик разрешений (permits).
Принцип:
- acquire() - получить разрешение (уменьшает счетчик)
- release() - освободить разрешение (увеличивает счетчик)
- Если разрешений нет, поток блокируется
// Ограничиваем до 3 одновременных подключений к БД
private final Semaphore dbConnections = new Semaphore(3);
public void executeQuery() throws InterruptedException {
dbConnections.acquire(); // получаем разрешение
try {
// работа с БД
performDatabaseOperation();
} finally {
dbConnections.release(); // возвращаем разрешение
}
}
// Можно получить несколько разрешений
dbConnections.acquire(2); // получить 2 разрешения
Применение: Ограничение ресурсов (connection pools, rate limiting), управление нагрузкой на систему.
CountDownLatch
CountDownLatch - синхронизатор однократного использования. Позволяет потокам ждать завершения определенного количества операций в других потоках.
Принцип:
- Инициализируется счетчиком
- countDown() уменьшает счетчик
- await() блокирует до обнуления счетчика
- После обнуления latch нельзя переиспользовать
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(3);
// Главный поток
public void coordinator() throws InterruptedException {
// запускаем 3 рабочих потока
for (int i = 0; i < 3; i++) {
new Thread(new Worker(startSignal, doneSignal)).start();
}
// даем команду старт
startSignal.countDown();
// ждем завершения всех
doneSignal.await();
System.out.println("Все задачи выполнены");
}
class Worker implements Runnable {
public void run() {
startSignal.await(); // ждем команды старт
doWork();
doneSignal.countDown(); // сигналим о завершении
}
}
Применение: Ожидание готовности всех компонентов, координация старта параллельных задач, сбор результатов от множества потоков.
CyclicBarrier
CyclicBarrier - циклический барьер для синхронизации группы потоков. Потоки ждут друг друга в определенной точке, затем продолжают одновременно.
Отличия от CountDownLatch:
- Переиспользуемый (cyclic)
- Потоки ждут друг друга, а не события
- Может выполнить действие при достижении барьера
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("Все потоки готовы, начинаем следующую фазу");
});
class PhaseWorker implements Runnable {
public void run() {
for (int phase = 0; phase < 3; phase++) {
doPhaseWork(phase);
barrier.await(); // ждем остальных потоков
// все потоки продолжают одновременно
}
}
}
Применение: Алгоритмы с фазами (MapReduce), параллельные вычисления с синхронизацией, игровые циклы.
Phaser
Phaser - более гибкая версия CyclicBarrier. Поддерживает динамическое изменение количества участников и множественные фазы.
Возможности:
- Регистрация/отмена регистрации участников во время выполнения
- Иерархические фазеры (дерево синхронизации)
- Завершение работы фазера
Phaser phaser = new Phaser(1); // регистрируем главный поток
// Динамическая регистрация
for (int i = 0; i < 3; i++) {
phaser.register(); // +1 участник
new Thread(() -> {
for (int phase = 0; phase < 3; phase++) {
doWork(phase);
phaser.arriveAndAwaitAdvance(); // завершили фазу, ждем других
}
phaser.arriveAndDeregister(); // завершаем участие
}).start();
}
phaser.arriveAndDeregister(); // главный поток завершает участие
Когда использовать: Сложные многофазные алгоритмы, динамическое количество участников, иерархическая синхронизация.
Exchanger
Exchanger - синхронизатор для обмена данными между двумя потоками. Потоки встречаются в точке обмена и передают друг другу объекты.
Принцип:
- Только для двух потоков
- exchange() блокирует до встречи с партнером
- Потоки обмениваются данными и продолжают работу
Exchanger<String> exchanger = new Exchanger<>();
// Поток-производитель
new Thread(() -> {
try {
String data = "данные от производителя";
String response = exchanger.exchange(data);
System.out.println("Получил: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Поток-потребитель
new Thread(() -> {
try {
String data = "данные от потребителя";
String received = exchanger.exchange(data);
System.out.println("Получил: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
Применение: Pipeline обработки данных, producer-consumer с обратной связью, алгоритмы сортировки с обменом.
Сравнение инструментов
Инструмент | Участники | Переиспользование | Основное назначение |
---|---|---|---|
Lock | Любое количество | Да | Базовая синхронизация |
Semaphore | Ограниченное | Да | Ограничение ресурсов |
CountDownLatch | Любое количество | Нет | Ожидание событий |
CyclicBarrier | Фиксированное | Да | Синхронизация фаз |
Phaser | Динамическое | Да | Сложная многофазная синхронизация |
Exchanger | Ровно 2 | Да | Обмен данными |
Выбор инструмента зависит от задачи: нужно ли переиспользование, сколько потоков участвует, какой тип синхронизации требуется.
CompletableFuture: примеры
Что такое CompletableFuture
CompletableFuture - мощный инструмент для асинхронного программирования в Java. Представляет результат асинхронной операции, который может быть получен в будущем. Комбинирует возможности Future и CompletionStage.
Ключевые преимущества:
- Неблокирующие операции с callback'ами
- Композиция асинхронных операций (цепочки)
- Обработка ошибок в асинхронном коде
- Комбинирование результатов нескольких операций
Создание CompletableFuture
// Уже завершенный Future
CompletableFuture<String> completed = CompletableFuture.completedFuture("готово");
// Асинхронное выполнение без возврата значения
CompletableFuture<Void> async = CompletableFuture.runAsync(() -> {
System.out.println("Выполняется в другом потоке");
});
// Асинхронное выполнение с возвратом значения
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
return "результат из другого потока";
});
// С кастомным Executor'ом
CompletableFuture<String> customPool = CompletableFuture.supplyAsync(() ->
"работа в кастомном пуле", customExecutor);
runAsync - для операций без возврата значения (Runnable) supplyAsync - для операций с возвратом значения (Supplier)
Трансформация результата
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
// thenApply - синхронная трансформация
CompletableFuture<String> upper = future.thenApply(s -> s.toUpperCase());
// thenApplyAsync - асинхронная трансформация
CompletableFuture<Integer> length = future.thenApplyAsync(s -> s.length());
// thenCompose - для операций, возвращающих CompletableFuture (flatMap)
CompletableFuture<String> composed = future.thenCompose(s ->
CompletableFuture.supplyAsync(() -> s + " world"));
thenApply - выполняется в том же потоке
thenApplyAsync - выполняется в другом потоке
thenCompose - для избежания вложенных CompletableFuture<CompletableFuture
Выполнение действий
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "данные");
// thenAccept - принимает результат, ничего не возвращает
future.thenAccept(result -> System.out.println("Получили: " + result));
// thenRun - выполняется после завершения, не получает результат
future.thenRun(() -> System.out.println("Операция завершена"));
// Цепочка операций
future
.thenApply(s -> s.toUpperCase())
.thenAccept(s -> System.out.println(s))
.thenRun(() -> System.out.println("Все готово"));
thenAccept - Consumer, получает результат thenRun - Runnable, просто выполняется после завершения
Комбинирование Future
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");
// thenCombine - ждет оба результата, комбинирует их
CompletableFuture<String> combined = future1.thenCombine(future2,
(a, b) -> a + b); // результат: "AB"
// allOf - ждет завершения всех (возвращает Void)
CompletableFuture<Void> allDone = CompletableFuture.allOf(future1, future2);
// anyOf - ждет первый завершившийся
CompletableFuture<Object> firstDone = CompletableFuture.anyOf(future1, future2);
// Получение всех результатов после allOf
CompletableFuture<List<String>> allResults = CompletableFuture.allOf(future1, future2)
.thenApply(v -> List.of(future1.join(), future2.join()));
thenCombine - для двух Future allOf - для множества Future (все должны завершиться) anyOf - первый завершившийся
Обработка ошибок
CompletableFuture<String> risky = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("Ошибка!");
return "Успех";
});
// handle - обрабатывает и результат, и ошибку
CompletableFuture<String> handled = risky.handle((result, throwable) -> {
if (throwable != null) {
return "Обработали ошибку: " + throwable.getMessage();
}
return result;
});
// exceptionally - только для ошибок
CompletableFuture<String> recovered = risky.exceptionally(throwable ->
"Запасной результат");
// whenComplete - выполняется всегда (finally аналог)
risky.whenComplete((result, throwable) -> {
if (throwable != null) {
System.err.println("Ошибка: " + throwable);
} else {
System.out.println("Результат: " + result);
}
});
handle - BiFunction<T, Throwable, U>, может изменить тип результата exceptionally - Function<Throwable, T>, только для ошибок whenComplete - BiConsumer<T, Throwable>, не изменяет результат
Получение результатов
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "готово";
});
// Блокирующие методы
try {
String result1 = future.get(); // может кинуть ExecutionException
String result2 = future.get(5, TimeUnit.SECONDS); // с таймаутом
String result3 = future.join(); // RuntimeException вместо checked
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// обработка ошибок
}
// Неблокирующие проверки
if (future.isDone()) {
System.out.println("Завершено");
}
if (future.isCompletedExceptionally()) {
System.out.println("Завершено с ошибкой");
}
// Получение с дефолтным значением
String resultOrDefault = future.getNow("дефолт"); // если не готово
get() - блокирует, кидает checked exceptions join() - блокирует, кидает unchecked exceptions getNow() - неблокирующий, возвращает дефолт если не готово
Практические паттерны
Параллельные HTTP запросы
public CompletableFuture<UserProfile> getUserProfile(String userId) {
CompletableFuture<User> userFuture = fetchUser(userId);
CompletableFuture<List<Order>> ordersFuture = fetchUserOrders(userId);
CompletableFuture<Settings> settingsFuture = fetchUserSettings(userId);
return userFuture.thenCombine(ordersFuture, UserProfile::new)
.thenCombine(settingsFuture, (profile, settings) ->
profile.withSettings(settings));
}
Retry с задержкой
public CompletableFuture<String> retryableOperation() {
return CompletableFuture.supplyAsync(() -> riskyCall())
.exceptionally(throwable -> {
return CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)
.execute(() -> retryableOperation());
});
}
Timeout wrapper
public static <T> CompletableFuture<T> withTimeout(
CompletableFuture<T> future, long timeout, TimeUnit unit) {
CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(() ->
timeoutFuture.completeExceptionally(new TimeoutException()),
timeout, unit);
return future.applyToEither(timeoutFuture, Function.identity());
}
Важные моменты для собеседования
Thread Pool: По умолчанию использует ForkJoinPool.commonPool(). Для I/O операций лучше использовать отдельный пул.
Exception Handling: Необработанные исключения "проглатываются". Всегда используйте handle/exceptionally.
Блокировка: get()/join() блокируют поток. В реактивном коде избегайте блокировок.
Memory Leaks: Незавершенные CompletableFuture могут держать ссылки на объекты. Используйте timeout.
Performance: Для простых операций может быть overhead. Подходит для I/O bound операций, не для CPU bound.