Инструменты синхронизации с примерами

Lock Interface

Lock - базовый интерфейс для механизмов блокировки в Java. Предоставляет более гибкую альтернативу synchronized блокам с возможностью прерывания, таймаутов и условных переменных.

Ключевые особенности:

  • Явное управление блокировкой/разблокировкой
  • Возможность прерывания ожидания блокировки
  • Попытка получения блокировки с таймаутом
  • Поддержка нескольких Condition объектов
Lock lock = new ReentrantLock();

// Базовое использование
lock.lock();
try {
    // критическая секция
} finally {
    lock.unlock(); // обязательно в finally
}

// С таймаутом
if (lock.tryLock(5, TimeUnit.SECONDS)) {
    try {
        // работа
    } finally {
        lock.unlock();
    }
}

Когда использовать: Нужна гибкость управления блокировками, прерывание потоков, работа с условными переменными.


ReentrantLock

ReentrantLock - реентерабельная блокировка, позволяющая одному потоку многократно захватывать одну и ту же блокировку. Основная реализация Lock интерфейса.

Реентерабельность означает, что поток может повторно захватить уже принадлежащую ему блокировку без deadlock.

Особенности:

  • Справедливая (fair) и несправедливая блокировка
  • Мониторинг состояния блокировки
  • Поддержка Condition объектов для сложной координации
private final ReentrantLock lock = new ReentrantLock(true); // fair lock
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

public void put(Object item) throws InterruptedException {
    lock.lock();
    try {
        while (count == capacity) {
            notFull.await(); // ждем места
        }
        queue[count++] = item;
        notEmpty.signal(); // сигнализируем о новом элементе
    } finally {
        lock.unlock();
    }
}

Справедливая блокировка - потоки получают блокировку в порядке запроса. Снижает производительность, но предотвращает starvation.

Применение: Замена synchronized для сложных сценариев синхронизации, когда нужны условные переменные или гибкое управление.


ReentrantReadWriteLock

ReadWriteLock - разделяет доступ на чтение и запись. Множественные потоки могут одновременно читать, но запись эксклюзивна.

Принцип работы:

  • Несколько потоков могут одновременно держать read-блокировку
  • Только один поток может держать write-блокировку
  • Write-блокировка исключает все остальные блокировки
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();

public String getData(String key) {
    readLock.lock();
    try {
        return map.get(key); // параллельное чтение
    } finally {
        readLock.unlock();
    }
}

public void updateData(String key, String value) {
    writeLock.lock();
    try {
        map.put(key, value); // эксклюзивная запись
    } finally {
        writeLock.unlock();
    }
}

Когда эффективен: Много операций чтения, мало записи. Неэффективен при частых записях из-за overhead.


Semaphore

Semaphore - семафор, контролирует количество потоков, одновременно выполняющих определенную операцию. Поддерживает счетчик разрешений (permits).

Принцип:

  • acquire() - получить разрешение (уменьшает счетчик)
  • release() - освободить разрешение (увеличивает счетчик)
  • Если разрешений нет, поток блокируется
// Ограничиваем до 3 одновременных подключений к БД
private final Semaphore dbConnections = new Semaphore(3);

public void executeQuery() throws InterruptedException {
    dbConnections.acquire(); // получаем разрешение
    try {
        // работа с БД
        performDatabaseOperation();
    } finally {
        dbConnections.release(); // возвращаем разрешение
    }
}

// Можно получить несколько разрешений
dbConnections.acquire(2); // получить 2 разрешения

Применение: Ограничение ресурсов (connection pools, rate limiting), управление нагрузкой на систему.


CountDownLatch

CountDownLatch - синхронизатор однократного использования. Позволяет потокам ждать завершения определенного количества операций в других потоках.

Принцип:

  • Инициализируется счетчиком
  • countDown() уменьшает счетчик
  • await() блокирует до обнуления счетчика
  • После обнуления latch нельзя переиспользовать
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(3);

// Главный поток
public void coordinator() throws InterruptedException {
    // запускаем 3 рабочих потока
    for (int i = 0; i < 3; i++) {
        new Thread(new Worker(startSignal, doneSignal)).start();
    }
    
    // даем команду старт
    startSignal.countDown();
    
    // ждем завершения всех
    doneSignal.await();
    System.out.println("Все задачи выполнены");
}

class Worker implements Runnable {
    public void run() {
        startSignal.await(); // ждем команды старт
        doWork();
        doneSignal.countDown(); // сигналим о завершении
    }
}

Применение: Ожидание готовности всех компонентов, координация старта параллельных задач, сбор результатов от множества потоков.


CyclicBarrier

CyclicBarrier - циклический барьер для синхронизации группы потоков. Потоки ждут друг друга в определенной точке, затем продолжают одновременно.

Отличия от CountDownLatch:

  • Переиспользуемый (cyclic)
  • Потоки ждут друг друга, а не события
  • Может выполнить действие при достижении барьера
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("Все потоки готовы, начинаем следующую фазу");
});

class PhaseWorker implements Runnable {
    public void run() {
        for (int phase = 0; phase < 3; phase++) {
            doPhaseWork(phase);
            
            barrier.await(); // ждем остальных потоков
            // все потоки продолжают одновременно
        }
    }
}

Применение: Алгоритмы с фазами (MapReduce), параллельные вычисления с синхронизацией, игровые циклы.


Phaser

Phaser - более гибкая версия CyclicBarrier. Поддерживает динамическое изменение количества участников и множественные фазы.

Возможности:

  • Регистрация/отмена регистрации участников во время выполнения
  • Иерархические фазеры (дерево синхронизации)
  • Завершение работы фазера
Phaser phaser = new Phaser(1); // регистрируем главный поток

// Динамическая регистрация
for (int i = 0; i < 3; i++) {
    phaser.register(); // +1 участник
    new Thread(() -> {
        for (int phase = 0; phase < 3; phase++) {
            doWork(phase);
            phaser.arriveAndAwaitAdvance(); // завершили фазу, ждем других
        }
        phaser.arriveAndDeregister(); // завершаем участие
    }).start();
}

phaser.arriveAndDeregister(); // главный поток завершает участие

Когда использовать: Сложные многофазные алгоритмы, динамическое количество участников, иерархическая синхронизация.


Exchanger

Exchanger - синхронизатор для обмена данными между двумя потоками. Потоки встречаются в точке обмена и передают друг другу объекты.

Принцип:

  • Только для двух потоков
  • exchange() блокирует до встречи с партнером
  • Потоки обмениваются данными и продолжают работу
Exchanger<String> exchanger = new Exchanger<>();

// Поток-производитель
new Thread(() -> {
    try {
        String data = "данные от производителя";
        String response = exchanger.exchange(data);
        System.out.println("Получил: " + response);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

// Поток-потребитель
new Thread(() -> {
    try {
        String data = "данные от потребителя";
        String received = exchanger.exchange(data);
        System.out.println("Получил: " + received);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

Применение: Pipeline обработки данных, producer-consumer с обратной связью, алгоритмы сортировки с обменом.


Сравнение инструментов

Инструмент Участники Переиспользование Основное назначение
Lock Любое количество Да Базовая синхронизация
Semaphore Ограниченное Да Ограничение ресурсов
CountDownLatch Любое количество Нет Ожидание событий
CyclicBarrier Фиксированное Да Синхронизация фаз
Phaser Динамическое Да Сложная многофазная синхронизация
Exchanger Ровно 2 Да Обмен данными

Выбор инструмента зависит от задачи: нужно ли переиспользование, сколько потоков участвует, какой тип синхронизации требуется.

CompletableFuture: примеры

Что такое CompletableFuture

CompletableFuture - мощный инструмент для асинхронного программирования в Java. Представляет результат асинхронной операции, который может быть получен в будущем. Комбинирует возможности Future и CompletionStage.

Ключевые преимущества:

  • Неблокирующие операции с callback'ами
  • Композиция асинхронных операций (цепочки)
  • Обработка ошибок в асинхронном коде
  • Комбинирование результатов нескольких операций

Создание CompletableFuture

// Уже завершенный Future
CompletableFuture<String> completed = CompletableFuture.completedFuture("готово");

// Асинхронное выполнение без возврата значения
CompletableFuture<Void> async = CompletableFuture.runAsync(() -> {
    System.out.println("Выполняется в другом потоке");
});

// Асинхронное выполнение с возвратом значения
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
    return "результат из другого потока";
});

// С кастомным Executor'ом
CompletableFuture<String> customPool = CompletableFuture.supplyAsync(() -> 
    "работа в кастомном пуле", customExecutor);

runAsync - для операций без возврата значения (Runnable) supplyAsync - для операций с возвратом значения (Supplier)


Трансформация результата

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");

// thenApply - синхронная трансформация
CompletableFuture<String> upper = future.thenApply(s -> s.toUpperCase());

// thenApplyAsync - асинхронная трансформация
CompletableFuture<Integer> length = future.thenApplyAsync(s -> s.length());

// thenCompose - для операций, возвращающих CompletableFuture (flatMap)
CompletableFuture<String> composed = future.thenCompose(s -> 
    CompletableFuture.supplyAsync(() -> s + " world"));

thenApply - выполняется в том же потоке thenApplyAsync - выполняется в другом потоке thenCompose - для избежания вложенных CompletableFuture<CompletableFuture>


Выполнение действий

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "данные");

// thenAccept - принимает результат, ничего не возвращает
future.thenAccept(result -> System.out.println("Получили: " + result));

// thenRun - выполняется после завершения, не получает результат
future.thenRun(() -> System.out.println("Операция завершена"));

// Цепочка операций
future
    .thenApply(s -> s.toUpperCase())
    .thenAccept(s -> System.out.println(s))
    .thenRun(() -> System.out.println("Все готово"));

thenAccept - Consumer, получает результат thenRun - Runnable, просто выполняется после завершения


Комбинирование Future

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "B");

// thenCombine - ждет оба результата, комбинирует их
CompletableFuture<String> combined = future1.thenCombine(future2, 
    (a, b) -> a + b); // результат: "AB"

// allOf - ждет завершения всех (возвращает Void)
CompletableFuture<Void> allDone = CompletableFuture.allOf(future1, future2);

// anyOf - ждет первый завершившийся
CompletableFuture<Object> firstDone = CompletableFuture.anyOf(future1, future2);

// Получение всех результатов после allOf
CompletableFuture<List<String>> allResults = CompletableFuture.allOf(future1, future2)
    .thenApply(v -> List.of(future1.join(), future2.join()));

thenCombine - для двух Future allOf - для множества Future (все должны завершиться) anyOf - первый завершившийся


Обработка ошибок

CompletableFuture<String> risky = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) throw new RuntimeException("Ошибка!");
    return "Успех";
});

// handle - обрабатывает и результат, и ошибку
CompletableFuture<String> handled = risky.handle((result, throwable) -> {
    if (throwable != null) {
        return "Обработали ошибку: " + throwable.getMessage();
    }
    return result;
});

// exceptionally - только для ошибок
CompletableFuture<String> recovered = risky.exceptionally(throwable -> 
    "Запасной результат");

// whenComplete - выполняется всегда (finally аналог)
risky.whenComplete((result, throwable) -> {
    if (throwable != null) {
        System.err.println("Ошибка: " + throwable);
    } else {
        System.out.println("Результат: " + result);
    }
});

handle - BiFunction<T, Throwable, U>, может изменить тип результата exceptionally - Function<Throwable, T>, только для ошибок whenComplete - BiConsumer<T, Throwable>, не изменяет результат


Получение результатов

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(1000);
    return "готово";
});

// Блокирующие методы
try {
    String result1 = future.get(); // может кинуть ExecutionException
    String result2 = future.get(5, TimeUnit.SECONDS); // с таймаутом
    String result3 = future.join(); // RuntimeException вместо checked
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    // обработка ошибок
}

// Неблокирующие проверки
if (future.isDone()) {
    System.out.println("Завершено");
}
if (future.isCompletedExceptionally()) {
    System.out.println("Завершено с ошибкой");
}

// Получение с дефолтным значением
String resultOrDefault = future.getNow("дефолт"); // если не готово

get() - блокирует, кидает checked exceptions join() - блокирует, кидает unchecked exceptions getNow() - неблокирующий, возвращает дефолт если не готово


Практические паттерны

Параллельные HTTP запросы

public CompletableFuture<UserProfile> getUserProfile(String userId) {
    CompletableFuture<User> userFuture = fetchUser(userId);
    CompletableFuture<List<Order>> ordersFuture = fetchUserOrders(userId);
    CompletableFuture<Settings> settingsFuture = fetchUserSettings(userId);
    
    return userFuture.thenCombine(ordersFuture, UserProfile::new)
                    .thenCombine(settingsFuture, (profile, settings) -> 
                        profile.withSettings(settings));
}

Retry с задержкой

public CompletableFuture<String> retryableOperation() {
    return CompletableFuture.supplyAsync(() -> riskyCall())
        .exceptionally(throwable -> {
            return CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)
                .execute(() -> retryableOperation());
        });
}

Timeout wrapper

public static <T> CompletableFuture<T> withTimeout(
    CompletableFuture<T> future, long timeout, TimeUnit unit) {
    
    CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    scheduler.schedule(() -> 
        timeoutFuture.completeExceptionally(new TimeoutException()), 
        timeout, unit);
    
    return future.applyToEither(timeoutFuture, Function.identity());
}

Важные моменты для собеседования

Thread Pool: По умолчанию использует ForkJoinPool.commonPool(). Для I/O операций лучше использовать отдельный пул.

Exception Handling: Необработанные исключения "проглатываются". Всегда используйте handle/exceptionally.

Блокировка: get()/join() блокируют поток. В реактивном коде избегайте блокировок.

Memory Leaks: Незавершенные CompletableFuture могут держать ссылки на объекты. Используйте timeout.

Performance: Для простых операций может быть overhead. Подходит для I/O bound операций, не для CPU bound.