Внутреннее устройство
1. Архитектура PostgreSQL
Общая структура
┌─────────────────────────────────────────────────────────────┐
│ PostgreSQL Instance │
├─────────────────────────────────────────────────────────────┤
│ Client Applications │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ psql │ │ App 1 │ │ App 2 │ │ JDBC │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Connection Layer (Postmaster + Backend Processes) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Postmaster (Main Process) │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Backend 1│ │Backend 2│ │Backend N│ ... │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Background Processes │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │WAL Writer│ │Checkpointer│ │Stats │ │Autovacuum│ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Shared Memory │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Shared Buffers │ WAL Buffers │ Lock Tables │ etc. │ │
│ └─────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Storage Layer │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Data Files │ WAL Files │ Control Files │ Temp Files│ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Процессы PostgreSQL
Postmaster (Главный процесс)
- Функции: Прием соединений, создание backend процессов, управление background процессами
- Единственный: Один процесс на инстанс PostgreSQL
- PID файл:
postmaster.pid
в data директории
Backend процессы
- Один процесс на соединение: Каждое клиентское соединение получает свой backend процесс
- Изоляция: Каждый backend изолирован от других
- Жизненный цикл: Создается при подключении, уничтожается при отключении
Background процессы
# Основные background процессы
WAL Writer # Записывает WAL буферы на диск
Checkpointer # Выполняет checkpoint'ы
Autovacuum # Автоматическая очистка и анализ
Stats Collector # Сбор статистики
Logger # Логирование
Archiver # Архивирование WAL файлов
2. Система хранения данных
Структура файлов
PGDATA/
├── postgresql.conf # Основной конфигурационный файл
├── pg_hba.conf # Конфигурация аутентификации
├── pg_ident.conf # Маппинг пользователей
├── postmaster.pid # PID главного процесса
├── base/ # Базы данных
│ ├── 1/ # template1
│ ├── 13442/ # template0
│ └── 16384/ # пользовательская БД
│ ├── 16385 # таблица (relation)
│ ├── 16385_fsm # Free Space Map
│ ├── 16385_vm # Visibility Map
│ └── ...
├── global/ # Глобальные объекты
│ ├── pg_control # Контрольный файл
│ ├── pg_database # Каталог баз данных
│ └── ...
├── pg_wal/ # WAL файлы
│ ├── 000000010000000000000001
│ ├── 000000010000000000000002
│ └── ...
├── pg_xact/ # Статусы транзакций
├── pg_multixact/ # Мульти-транзакции
├── pg_notify/ # LISTEN/NOTIFY
├── pg_serial/ # Serializable transactions
├── pg_snapshots/ # Exported snapshots
├── pg_subtrans/ # Подтранзакции
├── pg_tblspc/ # Табличные пространства
├── pg_twophase/ # Двухфазные транзакции
└── pg_stat/ # Статистические файлы
Физическая структура таблиц
Страницы (Pages/Blocks)
Размер страницы: 8KB (по умолчанию)
┌─────────────────────────────────────────────────────────────┐
│ Page Header (24 bytes) │
├─────────────────────────────────────────────────────────────┤
│ Item Pointers Array │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Item 1 │ │ Item 2 │ │ Item N │ ... │
│ └─────────┘ └─────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Free Space │
├─────────────────────────────────────────────────────────────┤
│ Tuples │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Tuple N │ ... │ Tuple 2 │ Tuple 1 │ │
│ └─────────────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Special Space │
└─────────────────────────────────────────────────────────────┘
Page Header структура
typedef struct PageHeaderData {
PageXLogRecPtr pd_lsn; // LSN последней модификации
uint16 pd_checksum; // Контрольная сумма страницы
uint16 pd_flags; // Флаги страницы
LocationIndex pd_lower; // Конец item pointer array
LocationIndex pd_upper; // Начало tuple data
LocationIndex pd_special; // Начало special space
uint16 pd_pagesize_version; // Размер и версия
TransactionId pd_prune_xid; // XID для pruning
} PageHeaderData;
Tuple структура
typedef struct HeapTupleHeaderData {
union {
HeapTupleFields t_heap;
DatumTupleFields t_datum;
} t_choice;
ItemPointerData t_ctid; // Текущий TID этого tuple
uint16 t_infomask2; // Флаги и количество атрибутов
uint16 t_infomask; // Различные флаги
uint8 t_hoff; // Смещение до user data
// Далее идут данные атрибутов
} HeapTupleHeaderData;
Object Identifier (OID) система
-- Получение OID объектов
SELECT oid, relname FROM pg_class WHERE relname = 'users';
SELECT oid, typname FROM pg_type WHERE typname = 'integer';
SELECT oid, proname FROM pg_proc WHERE proname = 'now';
-- Системные каталоги
pg_class -- Таблицы, индексы, последовательности
pg_attribute -- Колонки таблиц
pg_type -- Типы данных
pg_proc -- Функции и процедуры
pg_index -- Индексы
pg_constraint -- Ограничения
3. Система управления транзакциями
MVCC (Multiversion Concurrency Control)
Transaction ID (XID)
-- Текущий XID
SELECT txid_current();
-- Информация о транзакции
SELECT xmin, xmax, ctid FROM users WHERE id = 1;
Tuple Visibility
Каждый tuple содержит:
- xmin: XID транзакции, которая создала tuple
- xmax: XID транзакции, которая удалила tuple (0 если не удален)
- ctid: Физическое расположение tuple (блок, позиция)
Правила видимости:
1. Tuple видим, если xmin committed и < текущего snapshot
2. Tuple невидим, если xmax committed и < текущего snapshot
3. Tuple невидим, если xmin не committed или > текущего snapshot
Snapshot
typedef struct SnapshotData {
TransactionId xmin; // Минимальный XID в snapshot
TransactionId xmax; // Максимальный XID в snapshot
TransactionId *xip; // Массив активных XID
uint32 xcnt; // Количество активных XID
// ... другие поля
} SnapshotData;
Уровни изоляции транзакций
Read Committed (по умолчанию)
BEGIN;
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- Видит committed изменения других транзакций
-- Новый snapshot для каждого statement
COMMIT;
Repeatable Read
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- Использует один snapshot для всей транзакции
-- Не видит изменения, committed после начала транзакции
COMMIT;
Serializable
BEGIN;
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- Предотвращает serialization anomalies
-- Может завершиться с serialization_failure
COMMIT;
Deadlock Detection
-- Настройки deadlock
SET deadlock_timeout = '1s'; -- Время до проверки deadlock
SET log_lock_waits = on; -- Логирование ожиданий блокировок
-- Мониторинг блокировок
SELECT * FROM pg_locks WHERE NOT granted;
SELECT * FROM pg_stat_activity WHERE wait_event_type = 'Lock';
4. Write-Ahead Logging (WAL)
Назначение WAL
- Durability: Гарантия сохранности данных
- Crash Recovery: Восстановление после сбоев
- Replication: Основа для репликации
- Point-in-time Recovery: Восстановление на определенный момент времени
Структура WAL
WAL файлы
# Размер WAL файла: 16MB (по умолчанию)
# Имена файлов: 24 символа hex
000000010000000000000001 # timeline + логический номер
# Структура имени WAL файла:
# TTTTTTTTXXXXXXXXYYYYYYYY
# T - Timeline ID (8 символов)
# X - Logical log file number (8 символов)
# Y - Segment number (8 символов)
WAL Record структура
typedef struct XLogRecord {
uint32 xl_tot_len; // Общая длина записи
TransactionId xl_xid; // XID транзакции
XLogRecPtr xl_prev; // Указатель на предыдущую запись
uint8 xl_info; // Флаги и информация
RmgrId xl_rmid; // Resource manager ID
// Далее идут данные записи
} XLogRecord;
Типы WAL записей
// Resource Managers
#define RM_XLOG_ID 0 // WAL management
#define RM_XACT_ID 1 // Transaction management
#define RM_SMGR_ID 2 // Storage management
#define RM_CLOG_ID 3 // Commit log
#define RM_DBASE_ID 4 // Database management
#define RM_TBLSPC_ID 5 // Tablespace management
#define RM_MULTIXACT_ID 6 // MultiXact management
#define RM_RELMAP_ID 7 // Relation mapping
#define RM_STANDBY_ID 8 // Standby management
#define RM_HEAP2_ID 9 // Heap operations
#define RM_HEAP_ID 10 // Heap operations
#define RM_BTREE_ID 11 // B-tree operations
#define RM_HASH_ID 12 // Hash index operations
#define RM_GIN_ID 13 // GIN index operations
#define RM_GIST_ID 14 // GiST index operations
#define RM_SEQ_ID 15 // Sequence operations
#define RM_SPGIST_ID 16 // SP-GiST index operations
WAL настройки
-- Основные параметры WAL
wal_level = replica -- minimal/replica/logical
max_wal_size = 1GB -- Максимальный размер WAL
min_wal_size = 80MB -- Минимальный размер WAL
wal_buffers = 16MB -- Размер WAL буферов
wal_writer_delay = 200ms -- Задержка WAL writer
wal_writer_flush_after = 1MB -- Принудительный flush
-- Синхронизация
fsync = on -- Принудительная синхронизация
synchronous_commit = on -- Синхронный коммит
wal_sync_method = fsync -- Метод синхронизации
-- Checkpoint
checkpoint_timeout = 5min -- Максимальное время между checkpoint
checkpoint_completion_target = 0.5 -- Целевое время завершения checkpoint
checkpoint_warning = 30s -- Предупреждение о частых checkpoint
Checkpoint процесс
-- Ручной checkpoint
CHECKPOINT;
-- Мониторинг checkpoint
SELECT * FROM pg_stat_bgwriter;
-- Логирование checkpoint
log_checkpoints = on
log_checkpoint_stats = on
5. Система индексов
B-tree индексы (по умолчанию)
Структура B-tree
Root Page
┌─────────────┐
│ 50 │ 100 │
└──┬──────┬───┘
│ │
┌───────┘ └───────┐
┌─────────────┐ ┌─────────────┐
│ 25 │ 40 │ │ 75 │ 90 │
└──┬───┬─────┘ └──┬───┬─────┘
│ │ │ │
┌───────┘ └─────┐ ┌───────┘ └─────┐
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│10│15│20│ │30│35│38│ │60│65│70│ │80│85│88│ Leaf Pages
└────────┘ └────────┘ └────────┘ └────────┘
B-tree метаданные
-- Информация об индексе
SELECT * FROM pg_stat_user_indexes WHERE indexrelname = 'users_email_idx';
-- Размер индекса
SELECT pg_size_pretty(pg_relation_size('users_email_idx'));
-- Статистика использования
SELECT * FROM pg_stat_user_indexes WHERE schemaname = 'public';
GIN (Generalized Inverted Index)
-- Создание GIN индекса
CREATE INDEX users_tags_gin ON users USING GIN (tags);
-- Для полнотекстового поиска
CREATE INDEX articles_content_gin ON articles USING GIN (to_tsvector('english', content));
-- Структура GIN
-- Entry tree: B-tree ключей
-- Posting lists: списки TID для каждого ключа
GiST (Generalized Search Tree)
-- Создание GiST индекса для геометрии
CREATE INDEX locations_point_gist ON locations USING GiST (point);
-- Для полнотекстового поиска
CREATE INDEX articles_content_gist ON articles USING GiST (to_tsvector('english', content));
SP-GiST (Space-Partitioned Generalized Search Tree)
-- Для IP адресов
CREATE INDEX network_ip_spgist ON network USING SPGIST (ip_address inet_ops);
-- Для точек
CREATE INDEX locations_point_spgist ON locations USING SPGIST (point);
BRIN (Block Range Index)
-- Создание BRIN индекса для больших таблиц с сортированными данными
CREATE INDEX logs_timestamp_brin ON logs USING BRIN (timestamp);
-- Настройка размера страницы
CREATE INDEX logs_timestamp_brin ON logs USING BRIN (timestamp) WITH (pages_per_range = 128);
Hash индексы
-- Создание Hash индекса (только для равенства)
CREATE INDEX users_id_hash ON users USING HASH (id);
-- Полезны для больших таблиц с точными поисками
6. Планировщик запросов (Query Planner)
Этапы обработки запроса
1. Parser → Parse Tree
2. Analyzer → Query Tree
3. Rewriter → Query Tree (правила и представления)
4. Planner → Plan Tree
5. Executor → Result
Статистика планировщика
-- Обновление статистики
ANALYZE table_name;
ANALYZE; -- Для всех таблиц
-- Просмотр статистики
SELECT * FROM pg_stats WHERE tablename = 'users';
-- Настройки статистики
ALTER TABLE users ALTER COLUMN email SET STATISTICS 1000; -- Увеличить выборку
SET default_statistics_target = 1000; -- Глобально
Типы планов
Sequential Scan
EXPLAIN (ANALYZE, BUFFERS) SELECT * FROM users WHERE age > 30;
-- Seq Scan on users (cost=0.00..18334.00 rows=66667 width=42)
-- Filter: (age > 30)
-- Buffers: shared hit=8334
Index Scan
EXPLAIN (ANALYZE, BUFFERS) SELECT * FROM users WHERE email = 'john@example.com';
-- Index Scan using users_email_idx on users (cost=0.29..8.31 rows=1 width=42)
-- Index Cond: (email = 'john@example.com'::text)
-- Buffers: shared hit=4
Bitmap Scan
EXPLAIN (ANALYZE, BUFFERS) SELECT * FROM users WHERE age BETWEEN 25 AND 35;
-- Bitmap Heap Scan on users (cost=1234.56..5678.90 rows=5000 width=42)
-- Recheck Cond: ((age >= 25) AND (age <= 35))
-- Buffers: shared hit=1500
-- -> Bitmap Index Scan on users_age_idx (cost=0.00..1233.31 rows=5000)
-- Index Cond: ((age >= 25) AND (age <= 35))
-- Buffers: shared hit=14
Nested Loop Join
EXPLAIN SELECT * FROM users u JOIN orders o ON u.id = o.user_id;
-- Nested Loop (cost=0.29..1234.56 rows=1000 width=84)
-- -> Seq Scan on users u (cost=0.00..334.00 rows=100 width=42)
-- -> Index Scan using orders_user_id_idx on orders o (cost=0.29..8.31 rows=10 width=42)
-- Index Cond: (user_id = u.id)
Hash Join
-- Hash Join (cost=334.00..1567.89 rows=1000 width=84)
-- Hash Cond: (o.user_id = u.id)
-- -> Seq Scan on orders o (cost=0.00..867.00 rows=10000 width=42)
-- -> Hash (cost=334.00..334.00 rows=100 width=42)
-- -> Seq Scan on users u (cost=0.00..334.00 rows=100 width=42)
Merge Join
-- Merge Join (cost=123.45..678.90 rows=1000 width=84)
-- Merge Cond: (u.id = o.user_id)
-- -> Index Scan using users_pkey on users u (cost=0.29..234.56 rows=100 width=42)
-- -> Index Scan using orders_user_id_idx on orders o (cost=0.29..345.67 rows=1000 width=42)
Настройки планировщика
-- Стоимостные параметры
seq_page_cost = 1.0 -- Стоимость последовательного чтения страницы
random_page_cost = 4.0 -- Стоимость случайного чтения страницы (SSD: 1.1)
cpu_tuple_cost = 0.01 -- Стоимость обработки одного tuple
cpu_index_tuple_cost = 0.005 -- Стоимость обработки индексного tuple
cpu_operator_cost = 0.0025 -- Стоимость выполнения оператора
-- Память для операций
work_mem = 4MB -- Память для сортировки и хеширования
maintenance_work_mem = 64MB -- Память для VACUUM, CREATE INDEX
hash_mem_multiplier = 1.0 -- Множитель для hash операций
-- Параллельные операции
max_parallel_workers_per_gather = 2 -- Максимум worker'ов на gather
parallel_tuple_cost = 0.1 -- Стоимость передачи tuple между процессами
parallel_setup_cost = 1000.0 -- Стоимость setup параллелизма
-- Контроль планов
enable_seqscan = on -- Разрешить sequential scan
enable_indexscan = on -- Разрешить index scan
enable_bitmapscan = on -- Разрешить bitmap scan
enable_hashjoin = on -- Разрешить hash join
enable_mergejoin = on -- Разрешить merge join
enable_nestloop = on -- Разрешить nested loop
7. Система памяти
Shared Memory
Shared Buffers
-- Настройка shared buffers (обычно 25% от RAM)
shared_buffers = 2GB
-- Мониторинг использования
SELECT * FROM pg_buffercache_summary();
-- Статистика по таблицам в буферах
SELECT c.relname,
count(*) as buffers,
round(100.0 * count(*) / (SELECT setting FROM pg_settings WHERE name='shared_buffers')::integer, 1) as percent
FROM pg_buffercache b
JOIN pg_class c ON b.relfilenode = pg_relation_filenode(c.oid)
GROUP BY c.relname
ORDER BY 2 DESC
LIMIT 10;
WAL Buffers
-- Размер WAL буферов (обычно 16MB)
wal_buffers = 16MB
-- Мониторинг WAL
SELECT * FROM pg_stat_wal;
Process Memory
work_mem
-- Память для каждой операции сортировки/хеширования
work_mem = 4MB
-- Временное увеличение для сессии
SET work_mem = '256MB';
-- Мониторинг использования temp файлов
SELECT temp_files, temp_bytes FROM pg_stat_database WHERE datname = current_database();
maintenance_work_mem
-- Память для VACUUM, CREATE INDEX, ALTER TABLE
maintenance_work_mem = 256MB
-- Для параллельных операций
max_parallel_maintenance_workers = 2
Эффективный кэш
-- Размер effective_cache_size (обычно 75% от RAM)
effective_cache_size = 6GB
-- Не выделяет память, только информирует планировщик
-- о доступном кэше OS
8. VACUUM и AUTOVACUUM
Назначение VACUUM
- Удаление мертвых tuples: Очистка старых версий строк
- Обновление статистики: Актуализация планировщика
- Предотвращение XID wraparound: Защита от переполнения transaction ID
- Освобождение места: Возврат свободного места в таблицах
Типы VACUUM
Обычный VACUUM
-- VACUUM таблицы
VACUUM users;
-- VACUUM с анализом
VACUUM ANALYZE users;
-- VACUUM всей базы
VACUUM;
-- Verbose режим
VACUUM (VERBOSE) users;
VACUUM FULL
-- Полная перестройка таблицы (блокирует таблицу)
VACUUM FULL users;
-- С новым табличным пространством
VACUUM FULL users TABLESPACE new_tablespace;
Free Space Map (FSM)
-- Просмотр свободного места
SELECT *, round(avail*100.0/8192, 2) as free_percent
FROM pg_freespace('users');
-- Размер FSM файла
SELECT pg_size_pretty(pg_relation_size('users', 'fsm'));
Visibility Map (VM)
-- Просмотр visibility map
SELECT blkno, all_visible, all_frozen
FROM pg_visibility_map('users');
-- Размер VM файла
SELECT pg_size_pretty(pg_relation_size('users', 'vm'));
AUTOVACUUM настройки
-- Глобальные настройки
autovacuum = on -- Включить autovacuum
autovacuum_max_workers = 3 -- Максимум worker'ов
autovacuum_naptime = 1min -- Интервал запуска launcher
-- Пороги для VACUUM
autovacuum_vacuum_threshold = 50 -- Минимум мертвых tuples
autovacuum_vacuum_scale_factor = 0.2 -- 20% от размера таблицы
-- Пороги для ANALYZE
autovacuum_analyze_threshold = 50 -- Минимум измененных tuples
autovacuum_analyze_scale_factor = 0.1 -- 10% от размера таблицы
-- Стоимостная модель
autovacuum_vacuum_cost_delay = 2ms -- Задержка между операциями
autovacuum_vacuum_cost_limit = 200 -- Лимит стоимости
Настройки для отдельных таблиц
-- Изменить настройки для таблицы
ALTER TABLE users SET (
autovacuum_vacuum_threshold = 100,
autovacuum_vacuum_scale_factor = 0.1,
autovacuum_analyze_threshold = 100,
autovacuum_analyze_scale_factor = 0.05
);
-- Отключить autovacuum для таблицы
ALTER TABLE logs SET (autovacuum_enabled = false);
Мониторинг VACUUM
-- Текущие VACUUM операции
SELECT pid, datname, relname, phase,
heap_blks_total, heap_blks_scanned, heap_blks_vacuumed
FROM pg_stat_progress_vacuum;
-- История VACUUM
SELECT schemaname, tablename, last_vacuum, last_autovacuum,
vacuum_count, autovacuum_count, n_dead_tup, n_live_tup
FROM pg_stat_user_tables
ORDER BY last_autovacuum DESC NULLS LAST;
-- Bloat в таблицах
SELECT schemaname, tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size,
n_dead_tup,
round(n_dead_tup::numeric / NULLIF(n_live_tup + n_dead_tup, 0) * 100, 2) as dead_percent
FROM pg_stat_user_tables
WHERE n_dead_tup > 0
ORDER BY n_dead_tup DESC;
9. Репликация
Streaming Replication
Настройка Master
-- postgresql.conf
wal_level = replica -- Уровень WAL
max_wal_senders = 10 -- Максимум WAL sender процессов
wal_keep_size = 1
Оптимизации
1. Анализ производительности
EXPLAIN - основной инструмент анализа
Базовый EXPLAIN
-- Показать план запроса без выполнения
EXPLAIN SELECT * FROM users WHERE age > 30;
-- С подробностями
EXPLAIN (ANALYZE, BUFFERS, VERBOSE) SELECT * FROM users WHERE age > 30;
Параметры EXPLAIN
EXPLAIN (
ANALYZE true, -- Выполнить запрос и показать реальные метрики
VERBOSE true, -- Подробная информация о колонках
COSTS true, -- Показать стоимости (по умолчанию)
BUFFERS true, -- Информация об использовании буферов
TIMING true, -- Время выполнения каждого узла
SUMMARY true, -- Сводка по времени выполнения
FORMAT JSON -- Формат вывода: TEXT, XML, JSON, YAML
) SELECT * FROM users u JOIN orders o ON u.id = o.user_id;
Чтение планов запросов
-- Пример плана с объяснениями
Nested Loop (cost=0.29..856.44 rows=100 width=68) (actual time=0.123..5.234 rows=95 loops=1)
│ ├─ cost=0.29..856.44 -- Стоимость: startup..total
│ ├─ rows=100 -- Ожидаемое количество строк
│ ├─ width=68 -- Средний размер строки в байтах
│ ├─ actual time=0.123..5.234 -- Реальное время: первая строка..все строки
│ ├─ rows=95 -- Фактическое количество строк
│ └─ loops=1 -- Количество выполнений узла
-- Buffers (при включенном BUFFERS)
Buffers: shared hit=234 read=45 dirtied=2
│ ├─ hit=234 -- Найдено в shared_buffers
│ ├─ read=45 -- Прочитано с диска
│ └─ dirtied=2 -- Изменено в буферах
pg_stat_statements - мониторинг запросов
-- Установка расширения
CREATE EXTENSION pg_stat_statements;
-- Топ самых медленных запросов
SELECT query, calls, total_exec_time, mean_exec_time, rows
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;
-- Топ по среднему времени выполнения
SELECT query, calls, mean_exec_time, stddev_exec_time
FROM pg_stat_statements
WHERE calls > 100
ORDER BY mean_exec_time DESC
LIMIT 10;
-- Сброс статистики
SELECT pg_stat_statements_reset();
Мониторинг активности
-- Текущие активные запросы
SELECT pid, usename, datname, state, query_start,
now() - query_start as duration, query
FROM pg_stat_activity
WHERE state = 'active' AND query != '<IDLE>'
ORDER BY query_start;
-- Заблокированные запросы
SELECT blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement,
blocking_activity.query AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;
2. Оптимизация индексов
Стратегии индексирования
B-tree индексы (по умолчанию)
-- Простой индекс
CREATE INDEX idx_users_email ON users (email);
-- Составной индекс (порядок важен!)
CREATE INDEX idx_users_status_created ON users (status, created_at);
-- Частичный индекс
CREATE INDEX idx_users_active ON users (email) WHERE status = 'active';
-- Функциональный индекс
CREATE INDEX idx_users_lower_email ON users (lower(email));
-- Индекс с сортировкой
CREATE INDEX idx_orders_created_desc ON orders (created_at DESC);
Covering индексы (INCLUDE)
-- Индекс с дополнительными колонками
CREATE INDEX idx_users_email_covering ON users (email) INCLUDE (name, created_at);
-- Позволяет избежать обращения к таблице
EXPLAIN SELECT name, created_at FROM users WHERE email = 'john@example.com';
-- Index Only Scan using idx_users_email_covering
Специализированные индексы
-- GIN для массивов и JSON
CREATE INDEX idx_users_tags ON users USING GIN (tags);
CREATE INDEX idx_users_metadata ON users USING GIN (metadata);
-- GiST для геометрии и полнотекстового поиска
CREATE INDEX idx_locations_point ON locations USING GiST (point);
CREATE INDEX idx_articles_search ON articles USING GiST (to_tsvector('english', content));
-- SP-GiST для специальных типов
CREATE INDEX idx_network_ip ON network USING SPGIST (ip_address);
-- BRIN для больших упорядоченных таблиц
CREATE INDEX idx_logs_timestamp ON logs USING BRIN (timestamp);
-- Hash для точного поиска
CREATE INDEX idx_users_id_hash ON users USING HASH (id);
Анализ использования индексов
-- Статистика использования индексов
SELECT schemaname, tablename, indexname, idx_scan, idx_tup_read, idx_tup_fetch
FROM pg_stat_user_indexes
ORDER BY idx_scan DESC;
-- Неиспользуемые индексы
SELECT schemaname, tablename, indexname, pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
WHERE idx_scan = 0 AND schemaname = 'public'
ORDER BY pg_relation_size(indexrelid) DESC;
-- Дублирующиеся индексы
WITH index_columns AS (
SELECT schemaname, tablename, indexname,
array_agg(attname ORDER BY attnum) as columns
FROM pg_stats ps
JOIN pg_index pi ON ps.tablename = (SELECT relname FROM pg_class WHERE oid = pi.indrelid)
JOIN pg_attribute pa ON pa.attrelid = pi.indrelid AND pa.attnum = ANY(pi.indkey)
WHERE schemaname = 'public'
GROUP BY schemaname, tablename, indexname
)
SELECT i1.schemaname, i1.tablename, i1.indexname, i2.indexname, i1.columns
FROM index_columns i1
JOIN index_columns i2 ON i1.schemaname = i2.schemaname
AND i1.tablename = i2.tablename
AND i1.columns = i2.columns
AND i1.indexname > i2.indexname;
Оптимизация составных индексов
-- Правильный порядок колонок (от наиболее селективной к менее селективной)
-- Плохо: CREATE INDEX ON orders (status, customer_id, created_at);
-- Хорошо: CREATE INDEX ON orders (customer_id, created_at, status);
-- Анализ селективности колонок
SELECT attname, n_distinct, correlation
FROM pg_stats
WHERE tablename = 'orders' AND schemaname = 'public'
ORDER BY abs(n_distinct) DESC;
-- Частичные индексы для фильтрации
CREATE INDEX idx_orders_pending ON orders (customer_id, created_at)
WHERE status = 'pending';
3. Оптимизация запросов
WHERE оптимизация
Использование индексов
-- Хорошо: использует индекс
SELECT * FROM users WHERE email = 'john@example.com';
-- Плохо: не использует индекс из-за функции
SELECT * FROM users WHERE upper(email) = 'JOHN@EXAMPLE.COM';
-- Хорошо: функциональный индекс
CREATE INDEX idx_users_upper_email ON users (upper(email));
SELECT * FROM users WHERE upper(email) = 'JOHN@EXAMPLE.COM';
-- Плохо: LIKE с префиксом % не использует индекс
SELECT * FROM users WHERE email LIKE '%@gmail.com';
-- Хорошо: LIKE без префикса %
SELECT * FROM users WHERE email LIKE 'john%';
Оптимизация OR условий
-- Плохо: может не использовать индексы эффективно
SELECT * FROM orders WHERE status = 'pending' OR status = 'processing';
-- Хорошо: использует IN
SELECT * FROM orders WHERE status IN ('pending', 'processing');
-- Еще лучше: UNION для разных индексов
SELECT * FROM orders WHERE status = 'pending'
UNION ALL
SELECT * FROM orders WHERE priority = 'high';
JOIN оптимизация
Типы JOIN и их производительность
-- Nested Loop: хорош для маленьких таблиц
-- Hash Join: хорош когда одна таблица помещается в память
-- Merge Join: хорош для больших отсортированных таблиц
-- Принудительное отключение типов JOIN для тестирования
SET enable_nestloop = off;
SET enable_hashjoin = off;
SET enable_mergejoin = off;
Порядок JOIN
-- PostgreSQL автоматически переупорядочивает JOIN для <= 12 таблиц
-- Для больших запросов порядок может быть важен
-- Правило: начинайте с наиболее селективных условий
SELECT *
FROM users u
JOIN orders o ON u.id = o.user_id -- Много заказов
JOIN products p ON o.product_id = p.id -- Мало продуктов
WHERE u.created_at > '2024-01-01' -- Селективное условие
AND p.category = 'electronics'; -- Селективное условие
Подзапросы vs JOIN
-- Часто JOIN быстрее подзапросов
-- Плохо:
SELECT * FROM users
WHERE id IN (SELECT user_id FROM orders WHERE total > 1000);
-- Лучше:
SELECT DISTINCT u.*
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.total > 1000;
-- EXISTS vs IN
-- EXISTS обычно быстрее для больших таблиц
SELECT * FROM users u
WHERE EXISTS (SELECT 1 FROM orders o WHERE o.user_id = u.id AND o.total > 1000);
LIMIT и пагинация
-- Плохо: OFFSET становится медленным для больших значений
SELECT * FROM orders ORDER BY created_at OFFSET 10000 LIMIT 20;
-- Лучше: cursor-based пагинация
SELECT * FROM orders
WHERE created_at > '2024-01-01 10:30:00'
ORDER BY created_at LIMIT 20;
-- Для обратной пагинации
SELECT * FROM orders
WHERE created_at < '2024-01-01 10:30:00'
ORDER BY created_at DESC LIMIT 20;
4. Настройка параметров PostgreSQL
Память
Shared Buffers
-- Обычно 25% от доступной RAM
shared_buffers = 2GB -- Для сервера с 8GB RAM
-- Проверка эффективности
SELECT * FROM pg_buffercache_summary();
Work Memory
-- Память для сортировки и хеширования на операцию
work_mem = 256MB -- Увеличить для аналитических запросов
-- Глобальная настройка vs сессионная
SET work_mem = '1GB'; -- Для конкретной сессии
-- Мониторинг использования temp файлов
SELECT datname, temp_files, pg_size_pretty(temp_bytes)
FROM pg_stat_database;
Эффективный кэш
-- Размер кэша ОС (не выделяет память, только подсказка планировщику)
effective_cache_size = 6GB -- Обычно 75% от RAM
Checkpoint настройки
-- Интервал checkpoint'ов
checkpoint_timeout = 15min -- Увеличить для OLTP
-- Целевое время завершения checkpoint
checkpoint_completion_target = 0.9 -- 90% от checkpoint_timeout
-- Размер WAL
max_wal_size = 4GB -- Увеличить для уменьшения частоты checkpoint
min_wal_size = 1GB
-- Мониторинг
SELECT * FROM pg_stat_bgwriter;
Планировщик запросов
-- Стоимостные параметры (настроить под ваше оборудование)
seq_page_cost = 1.0 -- Последовательное чтение
random_page_cost = 1.1 -- Случайное чтение (для SSD)
cpu_tuple_cost = 0.01 -- Обработка tuple
cpu_index_tuple_cost = 0.005 -- Обработка индексного tuple
-- Статистика планировщика
default_statistics_target = 1000 -- Увеличить для лучших планов
Параллельные операции
-- Максимум worker'ов
max_parallel_workers = 8
max_parallel_workers_per_gather = 4
max_parallel_maintenance_workers = 4
-- Минимальный размер таблицы для параллелизма
min_parallel_table_scan_size = 8MB
min_parallel_index_scan_size = 512kB
-- Стоимость параллелизма
parallel_tuple_cost = 0.1
parallel_setup_cost = 1000.0
5. Оптимизация VACUUM
Настройка Autovacuum
-- Глобальные настройки
autovacuum = on
autovacuum_max_workers = 6 -- Увеличить для больших баз
autovacuum_naptime = 15s -- Уменьшить для активных баз
-- Пороги срабатывания
autovacuum_vacuum_threshold = 50
autovacuum_vacuum_scale_factor = 0.1 -- Уменьшить для частого VACUUM
-- Стоимостная модель
autovacuum_vacuum_cost_limit = 2000 -- Увеличить для более агрессивного VACUUM
autovacuum_vacuum_cost_delay = 10ms -- Уменьшить для более быстрого VACUUM
Настройка для отдельных таблиц
-- Для часто изменяемых таблиц
ALTER TABLE active_logs SET (
autovacuum_vacuum_scale_factor = 0.05, -- Более частый VACUUM
autovacuum_analyze_scale_factor = 0.02, -- Более частый ANALYZE
autovacuum_vacuum_cost_delay = 5 -- Более агрессивный VACUUM
);
-- Для append-only таблиц
ALTER TABLE audit_logs SET (
autovacuum_vacuum_scale_factor = 0.5, -- Редкий VACUUM
autovacuum_analyze_scale_factor = 0.1 -- Умеренный ANALYZE
);
Мониторинг Bloat
-- Bloat в таблицах
SELECT schemaname, tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size,
n_dead_tup,
round(n_dead_tup::numeric / NULLIF(n_live_tup + n_dead_tup, 0) * 100, 2) as dead_percent
FROM pg_stat_user_tables
WHERE n_dead_tup > 1000
ORDER BY dead_percent DESC;
-- Прогресс VACUUM
SELECT pid, datname, relname, phase,
heap_blks_total, heap_blks_scanned, heap_blks_vacuumed,
round(100.0 * heap_blks_scanned / NULLIF(heap_blks_total, 0), 1) as percent_complete
FROM pg_stat_progress_vacuum;
6. Оптимизация соединений
Connection Pooling
-- Настройки соединений
max_connections = 200 -- Не слишком много
superuser_reserved_connections = 3
-- Таймауты
statement_timeout = 300s -- Таймаут для долгих запросов
idle_in_transaction_session_timeout = 60s -- Таймаут для idle транзакций
PgBouncer конфигурация
# pgbouncer.ini
[databases]
mydb = host=localhost port=5432 dbname=mydb
[pgbouncer]
listen_port = 6432
listen_addr = *
auth_type = md5
auth_file = userlist.txt
# Pool modes
pool_mode = transaction # transaction, session, statement
max_client_conn = 1000 # Максимум клиентских соединений
default_pool_size = 25 # Размер пула по умолчанию
reserve_pool_size = 5 # Резервные соединения
# Таймауты
server_reset_query = DISCARD ALL
server_check_delay = 30
7. Мониторинг и диагностика
Системные метрики
-- I/O статистика
SELECT schemaname, tablename,
heap_blks_read, heap_blks_hit,
round(100.0 * heap_blks_hit / NULLIF(heap_blks_hit + heap_blks_read, 0), 2) as cache_hit_ratio
FROM pg_statio_user_tables
ORDER BY heap_blks_read DESC;
-- Размеры таблиц и индексов
SELECT schemaname, tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as total_size,
pg_size_pretty(pg_relation_size(schemaname||'.'||tablename)) as table_size,
pg_size_pretty(pg_indexes_size(schemaname||'.'||tablename)) as indexes_size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
-- Статистика по базе данных
SELECT datname,
numbackends, -- Активные соединения
xact_commit, xact_rollback, -- Транзакции
blks_read, blks_hit, -- Блоки
tup_returned, tup_fetched, tup_inserted, tup_updated, tup_deleted
FROM pg_stat_database
WHERE datname = current_database();
Логирование для анализа
-- Настройки логирования
log_statement = 'all' -- Логировать все запросы (осторожно!)
log_min_duration_statement = 1000 -- Логировать медленные запросы (>1сек)
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
log_checkpoints = on
log_connections = on
log_disconnections = on
log_lock_waits = on
-- Анализ логов с pgBadger
# pgbadger /var/log/postgresql/postgresql.log -o report.html
Полезные расширения
-- pg_stat_statements (анализ запросов)
CREATE EXTENSION pg_stat_statements;
-- pg_buffercache (анализ буферов)
CREATE EXTENSION pg_buffercache;
-- pgstattuple (анализ bloat)
CREATE EXTENSION pgstattuple;
SELECT * FROM pgstattuple('users');
-- pg_visibility (анализ visibility map)
CREATE EXTENSION pg_visibility;
8. Специфические оптимизации
Аналитические запросы
-- Используйте window functions вместо подзапросов
-- Плохо:
SELECT *, (SELECT COUNT(*) FROM orders o2 WHERE o2.user_id = o1.user_id) as user_order_count
FROM orders o1;
-- Хорошо:
SELECT *, COUNT(*) OVER (PARTITION BY user_id) as user_order_count
FROM orders;
-- Материализованные представления для агрегатов
CREATE MATERIALIZED VIEW monthly_sales AS
SELECT date_trunc('month', created_at) as month,
COUNT(*) as orders_count,
SUM(total) as total_sales
FROM orders
GROUP BY 1;
-- Обновление материализованного представления
REFRESH MATERIALIZED VIEW CONCURRENTLY monthly_sales;
Полнотекстовый поиск
-- Создание tsvector колонки
ALTER TABLE articles ADD COLUMN search_vector tsvector;
-- Обновление search_vector
UPDATE articles SET search_vector = to_tsvector('english', title || ' ' || content);
-- Индекс для полнотекстового поиска
CREATE INDEX idx_articles_search ON articles USING GIN (search_vector);
-- Триггер для автообновления
CREATE OR REPLACE FUNCTION articles_search_trigger() RETURNS trigger AS $$
BEGIN
NEW.search_vector := to_tsvector('english', NEW.title || ' ' || NEW.content);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER articles_search_update
BEFORE INSERT OR UPDATE ON articles
FOR EACH ROW EXECUTE FUNCTION articles_search_trigger();
JSON оптимизация
-- Индексы для JSON
CREATE INDEX idx_users_metadata_gin ON users USING GIN (metadata);
CREATE INDEX idx_users_email_json ON users USING BTREE ((metadata->>'email'));
-- Эффективные JSON запросы
-- Хорошо: использует индекс
SELECT * FROM users WHERE metadata->>'email' = 'john@example.com';
-- Плохо: не использует индекс
SELECT * FROM users WHERE metadata->'preferences'->>'theme' = 'dark';
-- Лучше: отдельный индекс
CREATE INDEX idx_users_theme ON users USING BTREE ((metadata->'preferences'->>'theme'));
9. Партиционирование
Типы партиционирования
-- Range партиционирование
CREATE TABLE orders (
id SERIAL,
created_at TIMESTAMP NOT NULL,
total DECIMAL(10,2)
) PARTITION BY RANGE (created_at);
-- Создание партиций
CREATE TABLE orders_2024_01 PARTITION OF orders
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE orders_2024_02 PARTITION OF orders
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- List партиционирование
CREATE TABLE users (
id SERIAL,
region VARCHAR(50),
email VARCHAR(255)
) PARTITION BY LIST (region);
CREATE TABLE users_us PARTITION OF users FOR VALUES IN ('US');
CREATE TABLE users_eu PARTITION OF users FOR VALUES IN ('EU', 'UK');
-- Hash партиционирование
CREATE TABLE logs (
id BIGINT,
message TEXT
) PARTITION BY HASH (id);
CREATE TABLE logs_0 PARTITION OF logs FOR VALUES WITH (modulus 4, remainder 0);
CREATE TABLE logs_1 PARTITION OF logs FOR VALUES WITH (modulus 4, remainder 1);
Настройки для партиционированных таблиц
-- Исключение партиций (constraint exclusion)
SET constraint_exclusion = partition;
-- Автоматическое исключение партиций
SET enable_partition_pruning = on;
SET enable_partitionwise_join = on;
SET enable_partitionwise_aggregate = on;
10. Чек-лист оптимизации
Анализ проблем производительности
- Установить
pg_stat_statements
- Проверить планы выполнения с
Настройка сервера
- Настроить
shared_buffers
- Установить
effective_cache_size
- Настроить
work_mem
- Настроить
random_page_cost
Оптимизация запросов
Мониторинг
Регулярное обслуживание
- Регулярно обновлять статистику (
ANALYZE
11. Инструменты для оптимизации
Анализ запросов
# pgBadger - анализ логов
pgbadger /var/log/postgresql/postgresql.log -o report.html
# pg_stat_kcache - статистика системных вызовов
CREATE EXTENSION pg_stat_kcache;
# auto_explain - автоматическое логирование планов
shared_preload_libraries = 'auto_explain'
auto_explain.log_min_duration = 1000
auto_explain.log_analyze = on
auto_explain.log_buffers = on
Нагрузочное тестирование
# pgbench - встроенный бенчмарк
pgbench -i -s 10 mydb # Инициализация
pgbench -c 10 -j 2 -T 60 mydb # 10 клиентов, 2 потока, 60 секунд
# Кастомный тест
pgbench -c 5 -T 30 -f custom_test.sql mydb
Мониторинг в реальном времени
# pg_top - аналог htop для PostgreSQL
pg_top -d mydb
# pgCenter - интерактивный мониторинг
pgcenter top -h localhost -U postgres mydb
Эта шпаргалка поможет систематически подойти к оптимизации PostgreSQL и значительно улучшить производительность вашей базы данных.
Запросы
1. Основы SELECT
Базовый синтаксис
SELECT column1, column2, ...
FROM table_name
WHERE condition
GROUP BY column1, column2, ...
HAVING condition
ORDER BY column1 [ASC|DESC], column2 [ASC|DESC], ...
LIMIT count OFFSET start;
Простые запросы
-- Выбрать все колонки
SELECT * FROM users;
-- Выбрать конкретные колонки
SELECT id, name, email FROM users;
-- С псевдонимами
SELECT
id AS user_id,
name AS full_name,
email AS email_address
FROM users;
-- Уникальные значения
SELECT DISTINCT country FROM users;
-- Ограничение количества строк
SELECT * FROM users LIMIT 10;
SELECT * FROM users LIMIT 10 OFFSET 20;
WHERE условия
-- Основные операторы
SELECT * FROM users WHERE age > 18;
SELECT * FROM users WHERE age >= 18 AND age <= 65;
SELECT * FROM users WHERE age BETWEEN 18 AND 65;
SELECT * FROM users WHERE country IN ('USA', 'Canada', 'UK');
SELECT * FROM users WHERE email IS NOT NULL;
SELECT * FROM users WHERE name IS NULL;
-- Текстовые операторы
SELECT * FROM users WHERE name LIKE 'John%'; -- Начинается с "John"
SELECT * FROM users WHERE name LIKE '%Smith'; -- Заканчивается на "Smith"
SELECT * FROM users WHERE name LIKE '%Ann%'; -- Содержит "Ann"
SELECT * FROM users WHERE name ILIKE '%john%'; -- Регистронезависимый поиск
-- Регулярные выражения
SELECT * FROM users WHERE email ~ '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$';
SELECT * FROM users WHERE name ~* '^john'; -- Регистронезависимый
-- Логические операторы
SELECT * FROM users WHERE age > 18 AND country = 'USA';
SELECT * FROM users WHERE age < 18 OR age > 65;
SELECT * FROM users WHERE NOT country = 'USA';
2. Агрегатные функции
Основные агрегатные функции
-- Подсчет
SELECT COUNT(*) FROM users; -- Всего строк
SELECT COUNT(email) FROM users; -- Не NULL значений
SELECT COUNT(DISTINCT country) FROM users; -- Уникальных значений
-- Математические функции
SELECT SUM(salary) FROM employees;
SELECT AVG(age) FROM users;
SELECT MIN(created_at), MAX(created_at) FROM users;
SELECT STDDEV(salary) FROM employees; -- Стандартное отклонение
SELECT VARIANCE(salary) FROM employees; -- Дисперсия
-- Строковые агрегаты
SELECT STRING_AGG(name, ', ') FROM users; -- Объединение строк
SELECT STRING_AGG(name, ', ' ORDER BY name) FROM users;
-- Массивы
SELECT ARRAY_AGG(name) FROM users;
SELECT ARRAY_AGG(name ORDER BY created_at) FROM users;
GROUP BY и HAVING
-- Группировка
SELECT country, COUNT(*) as user_count
FROM users
GROUP BY country;
-- Множественная группировка
SELECT country, age_group, COUNT(*) as count
FROM (
SELECT country,
CASE
WHEN age < 18 THEN 'Minor'
WHEN age BETWEEN 18 AND 65 THEN 'Adult'
ELSE 'Senior'
END as age_group
FROM users
) subquery
GROUP BY country, age_group;
-- HAVING (фильтрация после группировки)
SELECT country, COUNT(*) as user_count
FROM users
GROUP BY country
HAVING COUNT(*) > 100;
-- ROLLUP и CUBE
SELECT country, city, COUNT(*)
FROM users
GROUP BY ROLLUP(country, city); -- Промежуточные итоги
SELECT country, city, COUNT(*)
FROM users
GROUP BY CUBE(country, city); -- Все комбинации итогов
-- GROUPING SETS
SELECT country, city, COUNT(*)
FROM users
GROUP BY GROUPING SETS ((country), (city), ()); -- Пользовательские группировки
3. JOIN операции
Типы JOIN
-- INNER JOIN (только совпадающие записи)
SELECT u.name, o.total
FROM users u
INNER JOIN orders o ON u.id = o.user_id;
-- LEFT JOIN (все записи из левой таблицы)
SELECT u.name, o.total
FROM users u
LEFT JOIN orders o ON u.id = o.user_id;
-- RIGHT JOIN (все записи из правой таблицы)
SELECT u.name, o.total
FROM users u
RIGHT JOIN orders o ON u.id = o.user_id;
-- FULL OUTER JOIN (все записи из обеих таблиц)
SELECT u.name, o.total
FROM users u
FULL OUTER JOIN orders o ON u.id = o.user_id;
-- CROSS JOIN (декартово произведение)
SELECT u.name, p.name
FROM users u
CROSS JOIN products p;
Множественные JOIN
SELECT
u.name as customer_name,
o.order_date,
p.name as product_name,
oi.quantity,
oi.price
FROM users u
JOIN orders o ON u.id = o.user_id
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.order_date >= '2024-01-01';
SELF JOIN
-- Поиск сотрудников и их менеджеров
SELECT
e.name as employee,
m.name as manager
FROM employees e
LEFT JOIN employees m ON e.manager_id = m.id;
-- Поиск пар пользователей из одного города
SELECT
u1.name as user1,
u2.name as user2,
u1.city
FROM users u1
JOIN users u2 ON u1.city = u2.city AND u1.id < u2.id;
LATERAL JOIN
-- Для каждого пользователя найти 3 последних заказа
SELECT u.name, o.order_date, o.total
FROM users u
LEFT JOIN LATERAL (
SELECT order_date, total
FROM orders
WHERE user_id = u.id
ORDER BY order_date DESC
LIMIT 3
) o ON true;
4. Подзапросы
Скалярные подзапросы
-- Подзапрос возвращает одно значение
SELECT name, salary,
(SELECT AVG(salary) FROM employees) as avg_salary
FROM employees;
-- В WHERE
SELECT name FROM users
WHERE id = (SELECT user_id FROM orders WHERE id = 1);
Подзапросы с IN/NOT IN
-- Пользователи, которые делали заказы
SELECT * FROM users
WHERE id IN (SELECT DISTINCT user_id FROM orders);
-- Пользователи без заказов
SELECT * FROM users
WHERE id NOT IN (SELECT user_id FROM orders WHERE user_id IS NOT NULL);
EXISTS/NOT EXISTS
-- Более эффективно чем IN для больших таблиц
SELECT * FROM users u
WHERE EXISTS (SELECT 1 FROM orders o WHERE o.user_id = u.id);
-- Пользователи без заказов
SELECT * FROM users u
WHERE NOT EXISTS (SELECT 1 FROM orders o WHERE o.user_id = u.id);
Коррелированные подзапросы
-- Сотрудники с зарплатой выше средней в их отделе
SELECT name, department, salary
FROM employees e1
WHERE salary > (
SELECT AVG(salary)
FROM employees e2
WHERE e2.department = e1.department
);
Подзапросы в FROM (производные таблицы)
SELECT department, avg_salary
FROM (
SELECT department, AVG(salary) as avg_salary
FROM employees
GROUP BY department
) dept_stats
WHERE avg_salary > 50000;
5. Оконные функции (Window Functions)
Базовые оконные функции
-- ROW_NUMBER - нумерация строк
SELECT name, salary,
ROW_NUMBER() OVER (ORDER BY salary DESC) as rank
FROM employees;
-- RANK и DENSE_RANK
SELECT name, salary,
RANK() OVER (ORDER BY salary DESC) as rank,
DENSE_RANK() OVER (ORDER BY salary DESC) as dense_rank
FROM employees;
-- NTILE - разбивка на группы
SELECT name, salary,
NTILE(4) OVER (ORDER BY salary) as quartile
FROM employees;
Оконные функции с PARTITION BY
-- Ранжирование внутри отделов
SELECT name, department, salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank
FROM employees;
-- Сравнение с средним по отделу
SELECT name, department, salary,
AVG(salary) OVER (PARTITION BY department) as dept_avg_salary
FROM employees;
LAG и LEAD
-- Предыдущее и следующее значение
SELECT
order_date,
total,
LAG(total) OVER (ORDER BY order_date) as prev_total,
LEAD(total) OVER (ORDER BY order_date) as next_total,
total - LAG(total) OVER (ORDER BY order_date) as difference
FROM orders
ORDER BY order_date;
FIRST_VALUE и LAST_VALUE
-- Первое и последнее значение в окне
SELECT
name,
salary,
FIRST_VALUE(salary) OVER (ORDER BY salary DESC) as highest_salary,
LAST_VALUE(salary) OVER (
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as lowest_salary
FROM employees;
Скользящие окна
-- Скользящее среднее за 7 дней
SELECT
order_date,
daily_total,
AVG(daily_total) OVER (
ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_7days
FROM daily_sales
ORDER BY order_date;
-- Накопительная сумма
SELECT
order_date,
daily_total,
SUM(daily_total) OVER (
ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_total
FROM daily_sales;
6. Common Table Expressions (CTE)
Простые CTE
-- Базовый CTE
WITH high_earners AS (
SELECT name, salary, department
FROM employees
WHERE salary > 80000
)
SELECT department, COUNT(*), AVG(salary)
FROM high_earners
GROUP BY department;
Множественные CTE
WITH
department_stats AS (
SELECT department, AVG(salary) as avg_salary, COUNT(*) as emp_count
FROM employees
GROUP BY department
),
high_paying_depts AS (
SELECT department
FROM department_stats
WHERE avg_salary > 70000
)
SELECT e.name, e.salary, e.department
FROM employees e
JOIN high_paying_depts hpd ON e.department = hpd.department;
Рекурсивные CTE
-- Иерархия сотрудников
WITH RECURSIVE employee_hierarchy AS (
-- Начальное условие (топ-менеджеры)
SELECT id, name, manager_id, 1 as level, name as path
FROM employees
WHERE manager_id IS NULL
UNION ALL
-- Рекурсивная часть
SELECT e.id, e.name, e.manager_id, eh.level + 1,
eh.path || ' -> ' || e.name
FROM employees e
JOIN employee_hierarchy eh ON e.manager_id = eh.id
)
SELECT * FROM employee_hierarchy
ORDER BY level, name;
-- Генерация последовательности дат
WITH RECURSIVE date_series AS (
SELECT '2024-01-01'::date as date
UNION ALL
SELECT date + interval '1 day'
FROM date_series
WHERE date < '2024-12-31'
)
SELECT date FROM date_series;
7. CASE выражения
Простой CASE
SELECT name,
CASE department
WHEN 'Engineering' THEN 'Tech'
WHEN 'Marketing' THEN 'Business'
WHEN 'Sales' THEN 'Business'
ELSE 'Other'
END as division
FROM employees;
Поисковый CASE
SELECT name, salary,
CASE
WHEN salary < 30000 THEN 'Low'
WHEN salary BETWEEN 30000 AND 70000 THEN 'Medium'
WHEN salary > 70000 THEN 'High'
ELSE 'Unknown'
END as salary_grade
FROM employees;
Агрегация с CASE
-- Условный подсчет
SELECT
department,
COUNT(*) as total_employees,
COUNT(CASE WHEN salary > 50000 THEN 1 END) as high_earners,
SUM(CASE WHEN gender = 'M' THEN salary ELSE 0 END) as male_salary_total,
AVG(CASE WHEN experience > 5 THEN salary END) as avg_experienced_salary
FROM employees
GROUP BY department;
Pivot с CASE
-- Превращение строк в колонки
SELECT
product_id,
SUM(CASE WHEN EXTRACT(MONTH FROM order_date) = 1 THEN quantity ELSE 0 END) as jan_sales,
SUM(CASE WHEN EXTRACT(MONTH FROM order_date) = 2 THEN quantity ELSE 0 END) as feb_sales,
SUM(CASE WHEN EXTRACT(MONTH FROM order_date) = 3 THEN quantity ELSE 0 END) as mar_sales
FROM order_items oi
JOIN orders o ON oi.order_id = o.id
WHERE EXTRACT(YEAR FROM order_date) = 2024
GROUP BY product_id;
8. Работа с датами и временем
Функции дат
-- Текущие дата и время
SELECT NOW(); -- Текущая дата и время с timezone
SELECT CURRENT_TIMESTAMP; -- То же что NOW()
SELECT CURRENT_DATE; -- Только дата
SELECT CURRENT_TIME; -- Только время
-- Извлечение частей даты
SELECT
order_date,
EXTRACT(YEAR FROM order_date) as year,
EXTRACT(MONTH FROM order_date) as month,
EXTRACT(DAY FROM order_date) as day,
EXTRACT(DOW FROM order_date) as day_of_week, -- 0=Sunday
EXTRACT(DOY FROM order_date) as day_of_year,
EXTRACT(QUARTER FROM order_date) as quarter
FROM orders;
-- DATE_PART - альтернатива EXTRACT
SELECT DATE_PART('year', order_date) FROM orders;
Форматирование дат
-- TO_CHAR для форматирования
SELECT
order_date,
TO_CHAR(order_date, 'YYYY-MM-DD') as formatted_date,
TO_CHAR(order_date, 'Mon DD, YYYY') as readable_date,
TO_CHAR(order_date, 'Day, Month DD, YYYY') as full_date,
TO_CHAR(order_date, 'HH24:MI:SS') as time_only
FROM orders;
-- Парсинг строк в даты
SELECT TO_DATE('2024-03-15', 'YYYY-MM-DD');
SELECT TO_TIMESTAMP('2024-03-15 14:30:00', 'YYYY-MM-DD HH24:MI:SS');
Арифметика с датами
-- Добавление/вычитание интервалов
SELECT
NOW(),
NOW() + INTERVAL '1 day' as tomorrow,
NOW() - INTERVAL '1 week' as week_ago,
NOW() + INTERVAL '3 months' as in_three_months,
NOW() + INTERVAL '2 years 5 months 10 days' as complex_interval;
-- Разность между датами
SELECT
order_date,
shipped_date,
shipped_date - order_date as days_to_ship,
AGE(shipped_date, order_date) as shipping_duration
FROM orders
WHERE shipped_date IS NOT NULL;
Усечение дат
-- DATE_TRUNC для группировки
SELECT
DATE_TRUNC('month', order_date) as month,
COUNT(*) as orders_count,
SUM(total) as monthly_total
FROM orders
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY month;
-- Различные уровни усечения
SELECT
DATE_TRUNC('hour', created_at) as hour,
DATE_TRUNC('day', created_at) as day,
DATE_TRUNC('week', created_at) as week,
DATE_TRUNC('quarter', created_at) as quarter
FROM events;
Работа с временными зонами
-- Конвертация временных зон
SELECT
created_at,
created_at AT TIME ZONE 'UTC' as utc_time,
created_at AT TIME ZONE 'America/New_York' as ny_time,
created_at AT TIME ZONE 'Europe/London' as london_time
FROM events;
-- Текущая временная зона
SELECT current_setting('timezone');
SET timezone = 'America/Los_Angeles';
9. Работа с JSON
Операторы JSON
-- Создание таблицы с JSON
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255),
specs JSONB
);
-- Вставка JSON данных
INSERT INTO products (name, specs) VALUES
('Laptop', '{"cpu": "Intel i7", "ram": "16GB", "storage": "512GB SSD", "ports": ["USB-C", "HDMI", "USB-A"]}'),
('Phone', '{"cpu": "A15", "ram": "6GB", "storage": "128GB", "camera": {"main": "12MP", "front": "7MP"}}');
-- Извлечение данных JSON
SELECT
name,
specs->'cpu' as cpu, -- Возвращает JSON
specs->>'cpu' as cpu_text, -- Возвращает текст
specs->'camera'->>'main' as main_camera, -- Вложенные объекты
specs->'ports'->0 as first_port -- Элементы массива
FROM products;
JSON функции
-- Проверка существования ключей
SELECT * FROM products WHERE specs ? 'camera'; -- Есть ключ camera
SELECT * FROM products WHERE specs ?& ARRAY['cpu', 'ram']; -- Есть все ключи
SELECT * FROM products WHERE specs ?| ARRAY['wifi', 'bluetooth']; -- Есть любой из ключей
-- Поиск в массивах
SELECT * FROM products WHERE specs->'ports' @> '"USB-C"'; -- Содержит элемент
SELECT * FROM products WHERE specs->'ports' @> '["USB-C", "HDMI"]'; -- Содержит все элементы
-- JSON функции
SELECT
name,
JSON_ARRAY_LENGTH(specs->'ports') as ports_count,
JSON_OBJECT_KEYS(specs) as spec_keys
FROM products
WHERE specs->'ports' IS NOT NULL;
JSONB индексы
-- GIN индекс для JSONB
CREATE INDEX idx_products_specs ON products USING GIN (specs);
-- Индекс для конкретного пути
CREATE INDEX idx_products_cpu ON products USING BTREE ((specs->>'cpu'));
-- Эффективные запросы
SELECT * FROM products WHERE specs @> '{"cpu": "Intel i7"}';
SELECT * FROM products WHERE specs->>'ram' = '16GB';
JSON агрегация
-- Создание JSON из запроса
SELECT JSON_AGG(
JSON_BUILD_OBJECT(
'name', name,
'cpu', specs->>'cpu',
'ram', specs->>'ram'
)
) as products_json
FROM products;
-- JSON_OBJECT_AGG
SELECT JSON_OBJECT_AGG(name, specs->>'cpu') as cpu_by_product
FROM products;
10. Массивы
Создание и работа с массивами
-- Создание таблицы с массивом
CREATE TABLE articles (
id SERIAL PRIMARY KEY,
title VARCHAR(255),
tags TEXT[]
);
-- Вставка данных с массивами
INSERT INTO articles (title, tags) VALUES
('PostgreSQL Tutorial', ARRAY['database', 'sql', 'tutorial']),
('JSON in PostgreSQL', '{"json", "postgresql", "database"}');
-- Доступ к элементам массива
SELECT
title,
tags[1] as first_tag, -- Массивы начинаются с 1
tags[2:3] as middle_tags, -- Срез массива
array_length(tags, 1) as tags_count
FROM articles;
Операции с массивами
-- Поиск в массивах
SELECT * FROM articles WHERE 'database' = ANY(tags); -- Содержит элемент
SELECT * FROM articles WHERE tags @> ARRAY['database', 'sql']; -- Содержит все элементы
SELECT * FROM articles WHERE tags && ARRAY['json', 'nosql']; -- Пересекается с массивом
-- Модификация массивов
UPDATE articles
SET tags = array_append(tags, 'beginner')
WHERE id = 1;
UPDATE articles
SET tags = array_prepend('advanced', tags)
WHERE id = 2;
UPDATE articles
SET tags = array_remove(tags, 'tutorial')
WHERE id = 1;
Функции массивов
SELECT
title,
tags,
array_length(tags, 1) as count,
array_to_string(tags, ', ') as tags_string,
unnest(tags) as individual_tags
FROM articles;
-- Агрегация массивов
SELECT array_agg(title) as all_titles FROM articles;
11. Условные операторы и функции
COALESCE и NULLIF
-- COALESCE - первое не NULL значение
SELECT
name,
COALESCE(mobile_phone, home_phone, 'No phone') as contact_phone
FROM users;
-- NULLIF - возвращает NULL если значения равны
SELECT
name,
NULLIF(description, '') as clean_description -- Пустые строки в NULL
FROM products;
GREATEST и LEAST
-- Наибольшее и наименьшее значение
SELECT
name,
GREATEST(salary, bonus, commission) as max_income,
LEAST(start_date, probation_end_date) as earliest_date
FROM employees;
Условные агрегаты
-- FILTER для условной агрегации
SELECT
department,
COUNT(*) as total_employees,
COUNT(*) FILTER (WHERE salary > 50000) as high_earners,
AVG(salary) FILTER (WHERE experience > 2) as avg_experienced_salary
FROM employees
GROUP BY department;
12. Строковые функции
Основные строковые функции
SELECT
name,
LENGTH(name) as name_length,
UPPER(name) as uppercase,
LOWER(name) as lowercase,
INITCAP(name) as title_case,
REVERSE(name) as reversed
FROM users;
-- Обрезка и дополнение
SELECT
' hello world ' as original,
TRIM(' hello world ') as trimmed,
LTRIM(' hello world ') as left_trimmed,
RTRIM(' hello world ') as right_trimmed,
LPAD('123', 6, '0') as left_padded, -- '000123'
RPAD('abc', 6, 'x') as right_padded; -- 'abcxxx'
Поиск и замена
SELECT
description,
POSITION('PostgreSQL' IN description) as postgres_position,
STRPOS(description, 'SQL') as sql_position,
REPLACE(description, 'MySQL', 'PostgreSQL') as updated_desc,
TRANSLATE(description, 'aeiou', 'AEIOU') as vowels_upper
FROM articles;
Разделение строк
-- STRING_TO_ARRAY и array_to_string
SELECT
email,
STRING_TO_ARRAY(email, '@') as email_parts,
SPLIT_PART(email, '@', 1) as username,
SPLIT_PART(email, '@', 2) as domain
FROM users;
-- Регулярные выражения
SELECT
phone,
REGEXP_REPLACE(phone, '[^0-9]', '', 'g') as digits_only,
REGEXP_SPLIT_TO_ARRAY(phone, '[^0-9]+') as phone_parts
FROM contacts;
13. UNION и EXCEPT
UNION операции
-- UNION (убирает дубликаты)
SELECT name, 'customer' as type FROM customers
UNION
SELECT name, 'supplier' as type FROM suppliers;
-- UNION ALL (оставляет дубликаты, быстрее)
SELECT product_id, quantity FROM order_items
UNION ALL
SELECT product_id, quantity FROM return_items;
INTERSECT и EXCEPT
-- INTERSECT (пересечение)
SELECT email FROM customers
INTERSECT
SELECT email FROM newsletter_subscribers;
-- EXCEPT (разность)
SELECT email FROM customers
EXCEPT
SELECT email FROM unsubscribed_users;
14. Полезные системные запросы
Информация о таблицах
-- Размеры таблиц
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as total_size,
pg_size_pretty(pg_relation_size(schemaname||'.'||tablename)) as table_size,
pg_size_pretty(pg_indexes_size(schemaname||'.'||tablename)) as indexes_size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
-- Структура таблицы
SELECT
column_name,
data_type,
is_nullable,
column_default
FROM information_schema.columns
WHERE table_name = 'users' AND table_schema = 'public'
ORDER BY ordinal_position;
Информация об индексах
-- Индексы таблицы
SELECT
indexname,
indexdef
FROM pg_indexes
WHERE tablename = 'users' AND schemaname = 'public';
-- Статистика использования индексов
SELECT
schemaname,
tablename,
indexname,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY idx_scan DESC;
Активные запросы и блокировки
-- Текущие активные запросы
SELECT
pid,
usename,
datname,
state,
query_start,
now() - query_start as duration,
query
FROM pg_stat_activity
WHERE state = 'active'
ORDER BY query_start;
-- Блокировки
SELECT
l.locktype,
l.database,
l.relation,
l.page,
l.tuple,
l.pid,
l.mode,
l.granted
FROM pg_locks l
WHERE NOT l.granted;
15. Примеры комплексных запросов
Анализ продаж
-- Топ продуктов по продажам с ранжированием
WITH product_sales AS (
SELECT
p.name,
p.category,
SUM(oi.quantity * oi.price) as total_sales,
COUNT(DISTINCT o.id) as order_count,
AVG(oi.quantity * oi.price) as avg_order_value
FROM products p
JOIN order_items oi ON p.id = oi.product_id
JOIN orders o ON oi.order_id = o.id
WHERE o.order_date >= '2024-01-01'
GROUP BY p.id, p.name, p.category
),
ranked_products AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY total_sales DESC) as category_rank,
RANK() OVER (ORDER BY total_sales DESC) as overall_rank
FROM product_sales
)
SELECT
name,
category,
total_sales,
order_count,
avg_order_value,
category_rank,
overall_rank
FROM ranked_products
WHERE category_rank <= 5
ORDER BY category, category_rank;
Когортный анализ
-- Анализ удержания пользователей по месяцам
WITH user_cohorts AS (
SELECT
user_id,
DATE_TRUNC('month', MIN(order_date)) as cohort_month
FROM orders
GROUP BY user_id
),
user_activities AS (
SELECT
uc.user_id,
uc.cohort_month,
DATE_TRUNC('month', o.order_date) as activity_month,
EXTRACT(MONTH FROM AGE(o.order_date, uc.cohort_month)) as month_number
Пессимистичные блокировки
1. Основы блокировок в PostgreSQL
Что такое пессимистичные блокировки
Пессимистичные блокировки — это механизм предотвращения конфликтов путем блокировки ресурсов до их фактического использования. В отличие от оптимистичных блокировок, они блокируют данные сразу при чтении.
MVCC vs Явные блокировки
-- MVCC (по умолчанию) - оптимистичные блокировки
BEGIN;
SELECT * FROM accounts WHERE id = 1; -- Не блокирует
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
-- Явные блокировки - пессимистичные
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE; -- Блокирует строку
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
2. Типы блокировок строк (Row-Level Locks)
FOR UPDATE
-- Самая строгая блокировка строки
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- Блокирует строку для любых изменений другими транзакциями
-- Другие транзакции будут ждать завершения этой
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
-- Практический пример: резервирование товара
BEGIN;
SELECT quantity FROM products WHERE id = 123 FOR UPDATE;
-- Проверяем доступность
UPDATE products SET quantity = quantity - 1 WHERE id = 123 AND quantity > 0;
COMMIT;
FOR NO KEY UPDATE
-- Блокирует изменения, но разрешает обновление внешних ключей
BEGIN;
SELECT * FROM users WHERE id = 1 FOR NO KEY UPDATE;
-- Другие транзакции могут создавать FK на эту строку
-- Но не могут изменять саму строку
UPDATE users SET name = 'New Name' WHERE id = 1;
COMMIT;
FOR SHARE
-- Разделяемая блокировка - блокирует UPDATE/DELETE, но разрешает SELECT
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR SHARE;
-- Другие транзакции могут читать, но не могут изменять
-- Несколько FOR SHARE блокировок могут существовать одновременно
-- Используется для предотвращения изменений при расчетах
COMMIT;
-- Пример: расчет процентов
BEGIN;
SELECT * FROM accounts WHERE status = 'active' FOR SHARE;
-- Рассчитываем проценты, зная что балансы не изменятся
INSERT INTO interest_payments (account_id, amount)
SELECT id, balance * 0.05 FROM accounts WHERE status = 'active';
COMMIT;
FOR KEY SHARE
-- Самая слабая блокировка - блокирует DELETE и изменения ключевых полей
BEGIN;
SELECT * FROM users WHERE id = 1 FOR KEY SHARE;
-- Другие транзакции могут обновлять неключевые поля
-- Но не могут удалять строку или менять первичный ключ
COMMIT;
3. Модификаторы блокировок
NOWAIT
-- Не ждать, если блокировка недоступна
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE NOWAIT;
-- Если строка заблокирована - сразу выбросить ошибку
-- ERROR: could not obtain lock on row in relation "accounts"
COMMIT;
-- Практическое применение
CREATE OR REPLACE FUNCTION try_reserve_product(product_id INT, qty INT)
RETURNS BOOLEAN AS $$
DECLARE
current_qty INT;
BEGIN
BEGIN
SELECT quantity INTO current_qty
FROM products
WHERE id = product_id
FOR UPDATE NOWAIT;
IF current_qty >= qty THEN
UPDATE products
SET quantity = quantity - qty
WHERE id = product_id;
RETURN TRUE;
ELSE
RETURN FALSE;
END IF;
EXCEPTION
WHEN lock_not_available THEN
RETURN FALSE;
END;
END;
$$ LANGUAGE plpgsql;
SKIP LOCKED
-- Пропустить заблокированные строки
SELECT * FROM tasks
WHERE status = 'pending'
ORDER BY priority DESC
FOR UPDATE SKIP LOCKED
LIMIT 1;
-- Очередь задач для воркеров
CREATE OR REPLACE FUNCTION get_next_task()
RETURNS TABLE(task_id INT, task_data TEXT) AS $$
BEGIN
RETURN QUERY
UPDATE tasks
SET status = 'processing',
worker_id = pg_backend_pid(),
started_at = NOW()
WHERE id = (
SELECT id FROM tasks
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, data;
END;
$$ LANGUAGE plpgsql;
OF table_name
-- Блокировка только конкретных таблиц в JOIN
SELECT u.name, a.balance
FROM users u
JOIN accounts a ON u.id = a.user_id
WHERE u.id = 1
FOR UPDATE OF u; -- Блокируем только строки из таблицы users
-- Полезно при сложных JOIN'ах
SELECT o.id, oi.quantity, p.name
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.id = 123
FOR UPDATE OF p; -- Блокируем только товары
4. Блокировки таблиц (Table-Level Locks)
LOCK TABLE синтаксис
-- Явная блокировка таблицы
BEGIN;
LOCK TABLE accounts IN ACCESS EXCLUSIVE MODE;
-- Полная блокировка таблицы
-- Никто не может читать или писать
COMMIT;
Режимы блокировки таблиц
ACCESS SHARE
-- Самая слабая блокировка (автоматически для SELECT)
BEGIN;
LOCK TABLE accounts IN ACCESS SHARE MODE;
-- Конфликтует только с ACCESS EXCLUSIVE
-- Множественные читатели могут работать одновременно
COMMIT;
ROW SHARE
-- Блокировка для SELECT FOR UPDATE/SHARE
BEGIN;
LOCK TABLE accounts IN ROW SHARE MODE;
-- Конфликтует с EXCLUSIVE и ACCESS EXCLUSIVE
-- Разрешает SELECT и SELECT FOR UPDATE/SHARE
COMMIT;
ROW EXCLUSIVE
-- Блокировка для INSERT/UPDATE/DELETE
BEGIN;
LOCK TABLE accounts IN ROW EXCLUSIVE MODE;
-- Конфликтует с SHARE, SHARE ROW EXCLUSIVE, EXCLUSIVE, ACCESS EXCLUSIVE
-- Автоматически устанавливается при изменении данных
COMMIT;
SHARE UPDATE EXCLUSIVE
-- Блокировка для VACUUM, ANALYZE, CREATE INDEX CONCURRENTLY
BEGIN;
LOCK TABLE accounts IN SHARE UPDATE EXCLUSIVE MODE;
-- Разрешает SELECT, но блокирует изменения схемы и данных
-- Используется для операций обслуживания
COMMIT;
SHARE
-- Блокировка для создания индексов
BEGIN;
LOCK TABLE accounts IN SHARE MODE;
-- Разрешает SELECT, но блокирует любые изменения
-- Несколько SHARE блокировок могут существовать одновременно
COMMIT;
SHARE ROW EXCLUSIVE
-- Эксклюзивная блокировка с разрешением чтения
BEGIN;
LOCK TABLE accounts IN SHARE ROW EXCLUSIVE MODE;
-- Разрешает только SELECT
-- Блокирует все изменения и другие SHARE блокировки
COMMIT;
EXCLUSIVE
-- Почти полная блокировка
BEGIN;
LOCK TABLE accounts IN EXCLUSIVE MODE;
-- Разрешает только SELECT от той же транзакции
-- Блокирует все остальные операции
COMMIT;
ACCESS EXCLUSIVE
-- Полная блокировка таблицы
BEGIN;
LOCK TABLE accounts IN ACCESS EXCLUSIVE MODE;
-- Блокирует абсолютно все операции
-- Автоматически для DROP TABLE, TRUNCATE, некоторых ALTER TABLE
COMMIT;
Матрица совместимости блокировок
Режим AS RS RE SUE S SRE E AE
ACCESS SHARE ✓ ✓ ✓ ✓ ✓ ✓ ✓ ✗
ROW SHARE ✓ ✓ ✓ ✓ ✓ ✓ ✗ ✗
ROW EXCLUSIVE ✓ ✓ ✓ ✓ ✗ ✗ ✗ ✗
SHARE UPD EXCL ✓ ✓ ✓ ✗ ✗ ✗ ✗ ✗
SHARE ✓ ✓ ✗ ✗ ✓ ✗ ✗ ✗
SHARE ROW EXCL ✓ ✓ ✗ ✗ ✗ ✗ ✗ ✗
EXCLUSIVE ✓ ✗ ✗ ✗ ✗ ✗ ✗ ✗
ACCESS EXCLUSIVE ✗ ✗ ✗ ✗ ✗ ✗ ✗ ✗
5. Advisory Locks (Консультативные блокировки)
Сессионные advisory locks
-- Получение блокировки
SELECT pg_advisory_lock(12345); -- Блокирует до конца сессии
SELECT pg_advisory_lock(1, 2); -- Составной ключ
-- Неблокирующее получение
SELECT pg_try_advisory_lock(12345); -- Возвращает true/false
-- Освобождение блокировки
SELECT pg_advisory_unlock(12345);
SELECT pg_advisory_unlock_all(); -- Освободить все advisory locks
Транзакционные advisory locks
-- Блокировка на время транзакции
BEGIN;
SELECT pg_advisory_xact_lock(12345); -- Автоматически освобождается при COMMIT/ROLLBACK
-- ... работа ...
COMMIT;
-- Неблокирующая версия
BEGIN;
SELECT pg_try_advisory_xact_lock(12345);
COMMIT;
Практические примеры advisory locks
Синглтон процесс
-- Гарантия что только один экземпляр процесса работает
CREATE OR REPLACE FUNCTION ensure_singleton_process(process_name TEXT)
RETURNS BOOLEAN AS $$
DECLARE
lock_id BIGINT;
BEGIN
-- Генерируем hash из имени процесса
lock_id := hashtext(process_name);
-- Пытаемся получить блокировку
IF pg_try_advisory_lock(lock_id) THEN
RAISE NOTICE 'Process % started', process_name;
RETURN TRUE;
ELSE
RAISE NOTICE 'Process % already running', process_name;
RETURN FALSE;
END IF;
END;
$$ LANGUAGE plpgsql;
-- Использование
SELECT ensure_singleton_process('daily_report_generator');
Пакетная обработка
-- Блокировка для обработки пакетов данных
CREATE OR REPLACE FUNCTION process_batch(batch_size INT DEFAULT 1000)
RETURNS INT AS $$
DECLARE
processed_count INT := 0;
batch_lock_id BIGINT := 555666777;
BEGIN
-- Пытаемся получить блокировку на обработку
IF NOT pg_try_advisory_lock(batch_lock_id) THEN
RAISE NOTICE 'Another batch process is already running';
RETURN 0;
END IF;
-- Обрабатываем данные
WITH updated AS (
UPDATE queue_items
SET status = 'processing',
processed_at = NOW()
WHERE id IN (
SELECT id FROM queue_items
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT batch_size
)
RETURNING id
)
SELECT COUNT(*) INTO processed_count FROM updated;
-- Освобождаем блокировку
PERFORM pg_advisory_unlock(batch_lock_id);
RETURN processed_count;
END;
$$ LANGUAGE plpgsql;
6. Мониторинг блокировок
Просмотр текущих блокировок
-- Все текущие блокировки
SELECT
l.locktype,
l.database,
l.relation::regclass,
l.page,
l.tuple,
l.virtualxid,
l.transactionid,
l.classid,
l.objid,
l.objsubid,
l.virtualtransaction,
l.pid,
l.mode,
l.granted,
a.usename,
a.query,
a.state,
a.query_start
FROM pg_locks l
LEFT JOIN pg_stat_activity a ON l.pid = a.pid
ORDER BY l.granted, l.pid;
-- Только незавершенные блокировки
SELECT * FROM pg_locks WHERE NOT granted;
Анализ блокировок
-- Детальная информация о заблокированных запросах
WITH blocking_tree AS (
SELECT
activity.pid,
activity.usename,
activity.state,
activity.query,
activity.query_start,
blocking.pid AS blocking_pid,
blocking.query AS blocking_query
FROM pg_stat_activity activity
JOIN pg_locks blocked_locks ON blocked_locks.pid = activity.pid
JOIN pg_locks blocking_locks ON (
blocked_locks.locktype = blocking_locks.locktype
AND blocked_locks.database IS NOT DISTINCT FROM blocking_locks.database
AND blocked_locks.relation IS NOT DISTINCT FROM blocking_locks.relation
AND blocked_locks.page IS NOT DISTINCT FROM blocking_locks.page
AND blocked_locks.tuple IS NOT DISTINCT FROM blocking_locks.tuple
AND blocked_locks.virtualxid IS NOT DISTINCT FROM blocking_locks.virtualxid
AND blocked_locks.transactionid IS NOT DISTINCT FROM blocking_locks.transactionid
AND blocked_locks.classid IS NOT DISTINCT FROM blocking_locks.classid
AND blocked_locks.objid IS NOT DISTINCT FROM blocking_locks.objid
AND blocked_locks.objsubid IS NOT DISTINCT FROM blocking_locks.objsubid
AND blocked_locks.pid != blocking_locks.pid
)
JOIN pg_stat_activity blocking ON blocking_locks.pid = blocking.pid
WHERE NOT blocked_locks.granted
)
SELECT
pid AS blocked_pid,
usename AS blocked_user,
state AS blocked_state,
query_start,
NOW() - query_start AS wait_duration,
query AS blocked_query,
blocking_pid,
blocking_query
FROM blocking_tree
ORDER BY query_start;
Advisory locks мониторинг
-- Текущие advisory locks
SELECT
locktype,
classid,
objid,
objsubid,
virtualtransaction,
pid,
mode,
granted
FROM pg_locks
WHERE locktype = 'advisory'
ORDER BY pid;
-- Advisory locks с информацией о процессах
SELECT
l.classid,
l.objid,
l.pid,
l.mode,
l.granted,
a.usename,
a.application_name,
a.query_start,
a.state,
LEFT(a.query, 50) AS query_preview
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.locktype = 'advisory'
ORDER BY l.granted DESC, l.pid;
7. Deadlock (Взаимоблокировки)
Что такое deadlock
-- Сессия 1:
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1; -- Блокирует строку 1
UPDATE accounts SET balance = balance + 100 WHERE id = 2; -- Ждет блокировку строки 2
-- Сессия 2 (одновременно):
BEGIN;
UPDATE accounts SET balance = balance - 50 WHERE id = 2; -- Блокирует строку 2
UPDATE accounts SET balance = balance + 50 WHERE id = 1; -- Ждет блокировку строки 1
-- Результат: deadlock! PostgreSQL автоматически прервет одну из транзакций
Предотвращение deadlock
Упорядочивание ресурсов
-- Плохо: случайный порядок блокировок
UPDATE accounts SET balance = balance - amount WHERE id = from_account;
UPDATE accounts SET balance = balance + amount WHERE id = to_account;
-- Хорошо: всегда блокируем в одном порядке (по ID)
CREATE OR REPLACE FUNCTION transfer_money(from_id INT, to_id INT, amount DECIMAL)
RETURNS BOOLEAN AS $$
DECLARE
first_id INT;
second_id INT;
BEGIN
-- Упорядочиваем ID для предотвращения deadlock
IF from_id < to_id THEN
first_id := from_id;
second_id := to_id;
ELSE
first_id := to_id;
second_id := from_id;
END IF;
-- Блокируем счета в строгом порядке
PERFORM balance FROM accounts WHERE id = first_id FOR UPDATE;
PERFORM balance FROM accounts WHERE id = second_id FOR UPDATE;
-- Выполняем перевод
UPDATE accounts SET balance = balance - amount WHERE id = from_id;
UPDATE accounts SET balance = balance + amount WHERE id = to_id;
RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
Использование advisory locks
-- Координация через advisory locks
CREATE OR REPLACE FUNCTION safe_transfer(from_id INT, to_id INT, amount DECIMAL)
RETURNS BOOLEAN AS $$
BEGIN
-- Получаем advisory lock для предотвращения deadlock
PERFORM pg_advisory_xact_lock(LEAST(from_id, to_id), GREATEST(from_id, to_id));
-- Теперь можем безопасно работать с аккаунтами
UPDATE accounts SET balance = balance - amount WHERE id = from_id;
UPDATE accounts SET balance = balance + amount WHERE id = to_id;
RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
Настройки deadlock
-- Время ожидания до проверки на deadlock
SET deadlock_timeout = '1s'; -- По умолчанию 1 секунда
-- Логирование deadlock'ов
SET log_lock_waits = on; -- Логировать ожидания блокировок
SET log_statement = 'all'; -- Логировать все запросы (для анализа)
8. Таймауты и настройки
Основные параметры
-- Таймауты выполнения
SET statement_timeout = '30s'; -- Максимальное время выполнения запроса
SET lock_timeout = '10s'; -- Максимальное время ожидания блокировки
SET idle_in_transaction_session_timeout = '5min'; -- Таймаут для idle транзакций
-- Deadlock настройки
SET deadlock_timeout = '1s'; -- Время до проверки deadlock
SET log_lock_waits = on; -- Логирование ожиданий блокировок
Уровни изоляции и блокировки
-- READ COMMITTED (по умолчанию)
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- Минимальные блокировки, читает последние committed данные
-- REPEATABLE READ
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- Более строгие блокировки, консистентное чтение в рамках транзакции
-- SERIALIZABLE
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- Самые строгие блокировки, полная сериализация
9. Практические паттерны
Паттерн: Очередь задач
-- Таблица задач
CREATE TABLE job_queue (
id SERIAL PRIMARY KEY,
job_type VARCHAR(50),
payload JSONB,
status VARCHAR(20) DEFAULT 'pending',
priority INT DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
started_at TIMESTAMP,
completed_at TIMESTAMP,
worker_pid INT
);
-- Функция получения задачи воркером
CREATE OR REPLACE FUNCTION get_next_job(worker_types TEXT[])
RETURNS TABLE(job_id INT, job_type TEXT, payload JSONB) AS $$
BEGIN
RETURN QUERY
UPDATE job_queue
SET status = 'running',
started_at = NOW(),
worker_pid = pg_backend_pid()
WHERE id = (
SELECT id
FROM job_queue
WHERE status = 'pending'
AND job_type = ANY(worker_types)
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED -- Ключевая часть!
LIMIT 1
)
RETURNING id, job_queue.job_type, job_queue.payload;
END;
$$ LANGUAGE plpgsql;
Паттерн: Счетчики с блокировками
-- Безопасное увеличение счетчика
CREATE OR REPLACE FUNCTION increment_counter(counter_name TEXT, delta INT DEFAULT 1)
RETURNS INT AS $$
DECLARE
new_value INT;
BEGIN
-- Блокируем строку счетчика
SELECT value INTO new_value
FROM counters
WHERE name = counter_name
FOR UPDATE;
-- Обновляем значение
UPDATE counters
SET value = value + delta,
updated_at = NOW()
WHERE name = counter_name;
RETURN new_value + delta;
END;
$$ LANGUAGE plpgsql;
-- Альтернатива с UPSERT
INSERT INTO counters (name, value)
VALUES ('page_views', 1)
ON CONFLICT (name)
DO UPDATE SET value = counters.value + 1;
Паттерн: Резервирование ресурсов
-- Резервирование мест
CREATE OR REPLACE FUNCTION reserve_seats(event_id INT, seat_count INT)
RETURNS BOOLEAN AS $$
DECLARE
available_seats INT;
BEGIN
-- Блокируем строку события
SELECT available_capacity INTO available_seats
FROM events
WHERE id = event_id
FOR UPDATE;
-- Проверяем доступность
IF available_seats >= seat_count THEN
UPDATE events
SET available_capacity = available_capacity - seat_count
WHERE id = event_id;
RETURN TRUE;
ELSE
RETURN FALSE;
END IF;
END;
$$ LANGUAGE plpgsql;
10. Отладка и диагностика
Поиск долгих блокировок
-- Запросы, ожидающие блокировки дольше 5 минут
SELECT
a.pid,
a.usename,
a.state,
a.query_start,
NOW() - a.query_start AS duration,
a.query
FROM pg_stat_activity a
JOIN pg_locks l ON a.pid = l.pid
WHERE l.granted = false
AND a.query_start < NOW() - INTERVAL '5 minutes'
ORDER BY a.query_start;
Анализ advisory locks
-- Поиск "висящих" advisory locks
SELECT
l.classid,
l.objid,
l.pid,
a.state,
a.query_start,
NOW() - a.query_start AS lock_duration,
a.query
FROM pg_locks l
LEFT JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.locktype = 'advisory'
AND (a.state = 'idle' OR a.query_start < NOW() - INTERVAL '1 hour')
ORDER BY a.query_start NULLS FIRST;
Убийство блокирующих процессов
-- Осторожно! Завершение процесса
SELECT pg_terminate_backend(12345); -- PID процесса
-- Более мягкое завершение
SELECT pg_cancel_backend(12345); -- Отмена текущего запроса
11. Best Practices
Рекомендации по использованию блокировок
Минимизация времени блокировки
-- Плохо: долгая блокировка
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- ... длительные вычисления ...
UPDATE accounts SET balance = new_balance WHERE id = 1;
COMMIT;
-- Хорошо: короткая блокировка
-- ... вычисления вне транзакции ...
BEGIN;
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE;
UPDATE accounts SET balance = calculated_balance WHERE id = 1;
COMMIT;
Правильный порядок операций
-- Плохо: SELECT FOR UPDATE в середине
BEGIN;
INSERT INTO orders (user_id, total) VALUES (1, 100);
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE; -- Поздняя блокировка
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
-- Хорошо: блокировки в начале
BEGIN;
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE; -- Ранняя блокировка
INSERT INTO orders (user_id, total) VALUES (1, 100);
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;
Обработка ошибок блокировок
CREATE OR REPLACE FUNCTION safe_operation_with_retry(max_retries INT DEFAULT 3)
RETURNS BOOLEAN AS $$
DECLARE
retry_count INT := 0;
success BOOLEAN := FALSE;
BEGIN
WHILE retry_count < max_retries AND NOT success LOOP
BEGIN
-- Пытаемся выполнить операцию
BEGIN;
PERFORM some_operation_with_locks();
COMMIT;
success := TRUE;
EXCEPTION
WHEN lock_not_available THEN
ROLLBACK;
retry_count := retry_count + 1;
-- Экспоненциальная задержка
PERFORM pg_sleep(0.1 * POWER(2, retry_count));
WHEN serialization_failure THEN
ROLLBACK;
retry_count := retry_count + 1;
PERFORM pg_sleep(0.1 * POWER(2, retry_count));
END;
END LOOP;
RETURN success;
END;
$$ LANGUAGE plpgsql;
Мониторинг производительности
-- Создание представления для мониторинга блокировок
CREATE VIEW lock_monitoring AS
SELECT
l.locktype,
l.relation::regclass AS table_name,
l.mode,
l.granted,
a.pid,
a.usename,
a.state,
a.query_start,
NOW() - a.query_start AS duration,
LEFT(a.query, 100) AS query_preview
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.relation IS NOT NULL
ORDER BY l.granted, duration DESC;
-- Использование
SELECT * FROM lock_monitoring WHERE NOT granted;
Эта шпаргалка поможет эффективно использовать пессимистичные блокировки в PostgreSQL для обеспечения целостности данных в многопользовательской среде.
Оптимистичные блокировки
1. Основы оптимистичных блокировок
Что такое оптимистичные блокировки
Оптимистичные блокировки — это подход, при котором предполагается, что конфликты между транзакциями редки. Вместо блокировки ресурсов при чтении, конфликты обнаруживаются при попытке записи.
Оптимистичные vs Пессимистичные
// Пессимистичная блокировка
// Блокируем сразу при чтении
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
// ... работа с данными ...
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
// Оптимистичная блокировка
// Читаем без блокировки, проверяем при записи
SELECT id, balance, version FROM accounts WHERE id = 1;
// ... работа с данными ...
UPDATE accounts SET balance = balance - 100, version = version + 1
WHERE id = 1 AND version = old_version;
Преимущества и недостатки
Преимущества:
- Высокая производительность при низком уровне конфликтов
- Масштабируемость - нет блокировок при чтении
- Предотвращение deadlock'ов
- Лучшая отзывчивость интерфейса
Недостатки:
- Сложность обработки конфликтов
- Возможные retry операций
- Неэффективность при высоком уровне конфликтов
2. Реализация в PostgreSQL
Версионирование через поле version
Создание таблицы
CREATE TABLE accounts (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL,
balance DECIMAL(10,2) NOT NULL DEFAULT 0,
version INT NOT NULL DEFAULT 1,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Триггер для автоматического обновления updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_accounts_updated_at
BEFORE UPDATE ON accounts
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
Оптимистичное обновление
-- Чтение данных
SELECT id, balance, version FROM accounts WHERE id = 1;
-- Оптимистичное обновление
UPDATE accounts
SET balance = balance - 100.00,
version = version + 1
WHERE id = 1 AND version = 5; -- version из предыдущего SELECT
-- Проверка результата
GET DIAGNOSTICS affected_rows = ROW_COUNT;
IF affected_rows = 0 THEN
-- Конфликт! Данные были изменены другой транзакцией
RAISE EXCEPTION 'Optimistic lock failure for account %', account_id;
END IF;
Функция с обработкой конфликтов
CREATE OR REPLACE FUNCTION transfer_money_optimistic(
from_account_id INT,
to_account_id INT,
amount DECIMAL(10,2),
max_retries INT DEFAULT 3
) RETURNS BOOLEAN AS $$
DECLARE
from_balance DECIMAL(10,2);
from_version INT;
to_balance DECIMAL(10,2);
to_version INT;
retry_count INT := 0;
affected_rows INT;
BEGIN
WHILE retry_count < max_retries LOOP
BEGIN
-- Читаем данные счетов
SELECT balance, version INTO from_balance, from_version
FROM accounts WHERE id = from_account_id;
SELECT balance, version INTO to_balance, to_version
FROM accounts WHERE id = to_account_id;
-- Проверяем достаточность средств
IF from_balance < amount THEN
RAISE EXCEPTION 'Insufficient funds';
END IF;
-- Начинаем транзакцию
BEGIN;
-- Списываем с первого счета
UPDATE accounts
SET balance = balance - amount, version = version + 1
WHERE id = from_account_id AND version = from_version;
GET DIAGNOSTICS affected_rows = ROW_COUNT;
IF affected_rows = 0 THEN
ROLLBACK;
RAISE EXCEPTION 'Optimistic lock failure on source account';
END IF;
-- Зачисляем на второй счет
UPDATE accounts
SET balance = balance + amount, version = version + 1
WHERE id = to_account_id AND version = to_version;
GET DIAGNOSTICS affected_rows = ROW_COUNT;
IF affected_rows = 0 THEN
ROLLBACK;
RAISE EXCEPTION 'Optimistic lock failure on target account';
END IF;
COMMIT;
RETURN TRUE;
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
retry_count := retry_count + 1;
IF retry_count >= max_retries THEN
RAISE;
END IF;
-- Небольшая задержка перед повтором
PERFORM pg_sleep(0.01 * retry_count);
END;
END LOOP;
RETURN FALSE;
END;
$$ LANGUAGE plpgsql;
Версионирование через timestamp
Создание таблицы с timestamp версионированием
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10,2) NOT NULL,
quantity INT NOT NULL DEFAULT 0,
last_modified TIMESTAMP NOT NULL DEFAULT NOW()
);
-- Триггер для автообновления last_modified
CREATE OR REPLACE FUNCTION update_last_modified()
RETURNS TRIGGER AS $$
BEGIN
NEW.last_modified = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_products_last_modified
BEFORE UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION update_last_modified();
Оптимистичное обновление с timestamp
-- Чтение данных
SELECT id, name, price, quantity, last_modified
FROM products WHERE id = 1;
-- Оптимистичное обновление
UPDATE products
SET price = 99.99, quantity = quantity - 1
WHERE id = 1 AND last_modified = '2024-01-15 10:30:45.123456';
-- Проверка результата
GET DIAGNOSTICS affected_rows = ROW_COUNT;
IF affected_rows = 0 THEN
RAISE EXCEPTION 'Product was modified by another transaction';
END IF;
Использование xmin для версионирования
Системная колонка xmin как версия
-- PostgreSQL автоматически поддерживает xmin
-- Каждая строка имеет системную колонку xmin (transaction ID создания)
-- Чтение с xmin
SELECT id, balance, xmin FROM accounts WHERE id = 1;
-- Оптимистичное обновление с xmin
UPDATE accounts
SET balance = balance - 100.00
WHERE id = 1 AND xmin = 12345::xid; -- xmin из предыдущего SELECT
-- Проверка успешности
GET DIAGNOSTICS affected_rows = ROW_COUNT;
IF affected_rows = 0 THEN
-- Конфликт обнаружен
RAISE EXCEPTION 'Row was modified by another transaction';
END IF;
3. Реализация в Java с JDBC
Простая реализация с version полем
Entity класс
public class Account {
private Long id;
private Long userId;
private BigDecimal balance;
private Integer version;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
// Конструкторы, геттеры, сеттеры
public Account() {}
public Account(Long userId, BigDecimal balance) {
this.userId = userId;
this.balance = balance;
this.version = 1;
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();
}
// Геттеры и сеттеры
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public BigDecimal getBalance() { return balance; }
public void setBalance(BigDecimal balance) { this.balance = balance; }
public Integer getVersion() { return version; }
public void setVersion(Integer version) { this.version = version; }
// ... остальные геттеры и сеттеры
}
DAO с оптимистичными блокировками
public class AccountDAO {
private final DataSource dataSource;
public AccountDAO(DataSource dataSource) {
this.dataSource = dataSource;
}
public Account findById(Long id) throws SQLException {
String sql = "SELECT id, user_id, balance, version, created_at, updated_at " +
"FROM accounts WHERE id = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setLong(1, id);
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
Account account = new Account();
account.setId(rs.getLong("id"));
account.setUserId(rs.getLong("user_id"));
account.setBalance(rs.getBigDecimal("balance"));
account.setVersion(rs.getInt("version"));
account.setCreatedAt(rs.getTimestamp("created_at").toLocalDateTime());
account.setUpdatedAt(rs.getTimestamp("updated_at").toLocalDateTime());
return account;
}
return null;
}
}
}
public boolean updateBalance(Long accountId, BigDecimal newBalance, Integer expectedVersion)
throws SQLException {
String sql = "UPDATE accounts SET balance = ?, version = version + 1 " +
"WHERE id = ? AND version = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setBigDecimal(1, newBalance);
stmt.setLong(2, accountId);
stmt.setInt(3, expectedVersion);
int affectedRows = stmt.executeUpdate();
return affectedRows > 0;
}
}
public void transferMoney(Long fromAccountId, Long toAccountId, BigDecimal amount)
throws SQLException, OptimisticLockException {
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
try {
// Читаем данные счетов
Account fromAccount = findByIdForUpdate(conn, fromAccountId);
Account toAccount = findByIdForUpdate(conn, toAccountId);
if (fromAccount == null || toAccount == null) {
throw new IllegalArgumentException("Account not found");
}
if (fromAccount.getBalance().compareTo(amount) < 0) {
throw new IllegalArgumentException("Insufficient funds");
}
// Обновляем балансы
BigDecimal newFromBalance = fromAccount.getBalance().subtract(amount);
BigDecimal newToBalance = toAccount.getBalance().add(amount);
boolean fromUpdated = updateBalanceInTransaction(conn, fromAccountId,
newFromBalance, fromAccount.getVersion());
boolean toUpdated = updateBalanceInTransaction(conn, toAccountId,
newToBalance, toAccount.getVersion());
if (!fromUpdated || !toUpdated) {
conn.rollback();
retryCount++;
if (retryCount >= maxRetries) {
throw new OptimisticLockException("Failed to transfer money after " +
maxRetries + " retries");
}
// Небольшая задержка перед повтором
Thread.sleep(10 * retryCount);
continue;
}
conn.commit();
return; // Успешно завершено
} catch (SQLException | InterruptedException e) {
conn.rollback();
throw e;
}
}
}
}
private Account findByIdForUpdate(Connection conn, Long id) throws SQLException {
String sql = "SELECT id, user_id, balance, version FROM accounts WHERE id = ?";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setLong(1, id);
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
Account account = new Account();
account.setId(rs.getLong("id"));
account.setUserId(rs.getLong("user_id"));
account.setBalance(rs.getBigDecimal("balance"));
account.setVersion(rs.getInt("version"));
return account;
}
return null;
}
}
}
private boolean updateBalanceInTransaction(Connection conn, Long accountId,
BigDecimal newBalance, Integer expectedVersion) throws SQLException {
String sql = "UPDATE accounts SET balance = ?, version = version + 1 " +
"WHERE id = ? AND version = ?";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setBigDecimal(1, newBalance);
stmt.setLong(2, accountId);
stmt.setInt(3, expectedVersion);
return stmt.executeUpdate() > 0;
}
}
}
Исключение для оптимистичных блокировок
public class OptimisticLockException extends Exception {
public OptimisticLockException(String message) {
super(message);
}
public OptimisticLockException(String message, Throwable cause) {
super(message, cause);
}
}
Более сложная реализация с retry механизмом
Retry компонент
@Component
public class OptimisticLockRetryTemplate {
private static final Logger logger = LoggerFactory.getLogger(OptimisticLockRetryTemplate.class);
public <T> T execute(Supplier<T> operation, int maxRetries, long baseDelayMs) {
int attempt = 0;
while (attempt < maxRetries) {
try {
return operation.get();
} catch (OptimisticLockException e) {
attempt++;
if (attempt >= maxRetries) {
logger.error("Optimistic lock failed after {} attempts", maxRetries);
throw new RuntimeException("Operation failed after " + maxRetries + " retries", e);
}
long delay = baseDelayMs * (1L << (attempt - 1)); // Экспоненциальная задержка
logger.warn("Optimistic lock conflict, retrying in {}ms (attempt {}/{})",
delay, attempt, maxRetries);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during retry", ie);
}
}
}
throw new RuntimeException("Should not reach here");
}
public void execute(Runnable operation, int maxRetries, long baseDelayMs) {
execute(() -> {
operation.run();
return null;
}, maxRetries, baseDelayMs);
}
}
Сервис с использованием retry
@Service
public class AccountService {
private final AccountDAO accountDAO;
private final OptimisticLockRetryTemplate retryTemplate;
public AccountService(AccountDAO accountDAO, OptimisticLockRetryTemplate retryTemplate) {
this.accountDAO = accountDAO;
this.retryTemplate = retryTemplate;
}
public void transferMoney(Long fromAccountId, Long toAccountId, BigDecimal amount) {
retryTemplate.execute(
() -> {
try {
accountDAO.transferMoney(fromAccountId, toAccountId, amount);
} catch (SQLException e) {
throw new RuntimeException("Database error", e);
} catch (OptimisticLockException e) {
throw e; // Пробросить для retry
}
},
3, // maxRetries
50 // baseDelayMs
);
}
public void updateBalance(Long accountId, BigDecimal newBalance) {
retryTemplate.execute(
() -> {
try {
Account account = accountDAO.findById(accountId);
if (account == null) {
throw new IllegalArgumentException("Account not found: " + accountId);
}
boolean updated = accountDAO.updateBalance(accountId, newBalance, account.getVersion());
if (!updated) {
throw new OptimisticLockException("Account was modified by another transaction");
}
} catch (SQLException e) {
throw new RuntimeException("Database error", e);
}
},
3,
50
);
}
}
4. Реализация с JPA/Hibernate
Entity с @Version аннотацией
@Entity
@Table(name = "accounts")
public class Account {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "user_id", nullable = false)
private Long userId;
@Column(precision = 10, scale = 2, nullable = false)
private BigDecimal balance = BigDecimal.ZERO;
@Version // Hibernate автоматически управляет версионированием
private Integer version;
@CreationTimestamp
@Column(name = "created_at")
private LocalDateTime createdAt;
@UpdateTimestamp
@Column(name = "updated_at")
private LocalDateTime updatedAt;
// Конструкторы
protected Account() {} // Для JPA
public Account(Long userId, BigDecimal balance) {
this.userId = userId;
this.balance = balance;
}
// Геттеры и сеттеры
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public Long getUserId() { return userId; }
public void setUserId(Long userId) { this.userId = userId; }
public BigDecimal getBalance() { return balance; }
public void setBalance(BigDecimal balance) { this.balance = balance; }
public Integer getVersion() { return version; }
public void setVersion(Integer version) { this.version = version; }
public LocalDateTime getCreatedAt() { return createdAt; }
public LocalDateTime getUpdatedAt() { return updatedAt; }
// Бизнес-методы
public void withdraw(BigDecimal amount) {
if (balance.compareTo(amount) < 0) {
throw new IllegalArgumentException("Insufficient funds");
}
balance = balance.subtract(amount);
}
public void deposit(BigDecimal amount) {
if (amount.compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("Amount must be positive");
}
balance = balance.add(amount);
}
}
Repository
@Repository
public interface AccountRepository extends JpaRepository<Account, Long> {
@Query("SELECT a FROM Account a WHERE a.id = :id")
Optional<Account> findByIdWithLock(@Param("id") Long id);
@Query("SELECT a FROM Account a WHERE a.userId = :userId")
List<Account> findByUserId(@Param("userId") Long userId);
}
Сервис с JPA оптимистичными блокировками
@Service
@Transactional
public class AccountService {
private final AccountRepository accountRepository;
public AccountService(AccountRepository accountRepository) {
this.accountRepository = accountRepository;
}
@Retryable(
value = {OptimisticLockingFailureException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 50, multiplier = 2)
)
public void transferMoney(Long fromAccountId, Long toAccountId, BigDecimal amount) {
Account fromAccount = accountRepository.findById(fromAccountId)
.orElseThrow(() -> new IllegalArgumentException("Source account not found"));
Account toAccount = accountRepository.findById(toAccountId)
.orElseThrow(() -> new IllegalArgumentException("Target account not found"));
// Бизнес-логика с автоматической проверкой версий
fromAccount.withdraw(amount);
toAccount.deposit(amount);
// Hibernate автоматически проверит версии при flush
accountRepository.save(fromAccount);
accountRepository.save(toAccount);
}
@Retryable(
value = {OptimisticLockingFailureException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 50, multiplier = 2)
)
public void updateBalance(Long accountId, BigDecimal newBalance) {
Account account = accountRepository.findById(accountId)
.orElseThrow(() -> new IllegalArgumentException("Account not found"));
account.setBalance(newBalance);
accountRepository.save(account);
}
@Recover
public void recoverFromOptimisticLock(OptimisticLockingFailureException ex,
Long fromAccountId, Long toAccountId, BigDecimal amount) {
logger.error("Transfer failed after retries: {} -> {}, amount: {}",
fromAccountId, toAccountId, amount, ex);
throw new BusinessException("Transfer failed due to concurrent modifications", ex);
}
}
Конфигурация для retry
@Configuration
@EnableRetry
public class RetryConfig {
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// Фиксированная задержка
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(100L); // 100ms
retryTemplate.setBackOffPolicy(backOffPolicy);
// Настройка retry
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
}
5. Продвинутые техники
Conditional Updates (Условные обновления)
Множественные условия
-- Обновление с множественными проверками
UPDATE products
SET quantity = quantity - 1,
version = version + 1,
last_sold = NOW()
WHERE id = 1
AND version = 5
AND quantity > 0 -- Дополнительная бизнес-логика
AND status = 'available'; -- Проверка статуса
-- В Java
public boolean reserveProduct(Long productId, Integer expectedVersion) {
String sql = """
UPDATE products
SET quantity = quantity - 1, version = version + 1, last_sold = NOW()
WHERE id = ? AND version = ? AND quantity > 0 AND status = 'available'
""";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setLong(1, productId);
stmt.setInt(2, expectedVersion);
return stmt.executeUpdate() > 0;
}
}
Batch оптимистичные обновления
Batch операции с версионированием
public class BatchOptimisticUpdate {
public void updateMultipleAccounts(List<AccountUpdate> updates) throws SQLException {
String sql = "UPDATE accounts SET balance = ?, version = version + 1 " +
"WHERE id = ? AND version = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
conn.setAutoCommit(false);
for (AccountUpdate update : updates) {
stmt.setBigDecimal(1, update.getNewBalance());
stmt.setLong(2, update.getAccountId());
stmt.setInt(3, update.getExpectedVersion());
stmt.addBatch();
}
int[] results = stmt.executeBatch();
// Проверяем результаты
for (int i = 0; i < results.length; i++) {
if (results[i] == 0) {
conn.rollback();
throw new OptimisticLockException(
"Optimistic lock failure for account: " + updates.get(i).getAccountId());
}
}
conn.commit();
}
}
public static class AccountUpdate {
private final Long accountId;
private final BigDecimal newBalance;
private final Integer expectedVersion;
public AccountUpdate(Long accountId, BigDecimal newBalance, Integer expectedVersion) {
this.accountId = accountId;
this.newBalance = newBalance;
this.expectedVersion = expectedVersion;
}
// Геттеры
public Long getAccountId() { return accountId; }
public BigDecimal getNewBalance() { return newBalance; }
public Integer getExpectedVersion() { return expectedVersion; }
}
}
Compare-and-Swap операции
Атомарные CAS операции
public class CASOperations {
// Атомарное увеличение счетчика
public boolean incrementCounter(String counterName, Integer expectedValue, Integer increment)
throws SQLException {
String sql = """
UPDATE counters
SET value = value + ?, last_modified = NOW()
WHERE name = ? AND value = ?
""";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setInt(1, increment);
stmt.setString(2, counterName);
stmt.setInt(3, expectedValue);
return stmt.executeUpdate() > 0;
}
}
// Условная замена значения
public boolean compareAndSwap(Long recordId, String oldValue, String newValue)
throws SQLException {
String sql = """
UPDATE records
SET value = ?, version = version + 1, updated_at = NOW()
WHERE id = ? AND value = ?
""";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, newValue);
stmt.setLong(2, recordId);
stmt.setString(3, oldValue);
return stmt.executeUpdate() > 0;
}
}
}
6. Обработка конфликтов
Стратегии разрешения конфликтов
Fail-Fast стратегия
@Service
public class FailFastAccountService {
public void transferMoney(Long fromId, Long toId, BigDecimal amount) {
try {
performTransfer(fromId, toId, amount);
} catch (OptimisticLockException e) {
// Сразу выбрасываем исключение пользователю
throw new BusinessException("Transaction failed due to concurrent modification. Please try again.");
}
}
private void performTransfer(Long fromId, Long toId, BigDecimal amount) {
// Логика трансфера без retry
}
}
Retry стратегия
@Service
public class RetryAccountService {
private static final int MAX_RETRIES = 3;
private static final long BASE_DELAY_MS = 50;
public void transferMoney(Long fromId, Long toId, BigDecimal amount) {
RetryPolicy retryPolicy = RetryPolicy.builder()
.maxRetries(MAX_RETRIES)
.exponentialBackoff(BASE_DELAY_MS, Duration.ofSeconds(1))
.onRetry(event -> logger.warn("Retry attempt {} for transfer {}->{}",
event.getAttemptCount(), fromId, toId))
.build();
Retry.of("transfer", retryPolicy).executeSupplier(() -> {
performTransfer(fromId, toId, amount);
return null;
});
}
}
Merge стратегия
@Service
public class MergeAccountService {
public void updateAccountData(Long accountId, AccountUpdateRequest request) {
int maxAttempts = 3;
int attempt = 0;
while (attempt < maxAttempts) {
try {
Account currentAccount = accountRepository.findById(accountId)
.orElseThrow(() -> new EntityNotFoundException("Account not found"));
// Мержим изменения
Account mergedAccount = mergeChanges(currentAccount, request);
accountRepository.save(mergedAccount);
return; // Успешно
} catch (OptimisticLockingFailureException e) {
attempt++;
if (attempt >= maxAttempts) {
throw new BusinessException("Failed to update account after merge attempts");
}
// Небольшая задержка перед повтором
try {
Thread.sleep(50 * attempt);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during retry", ie);
}
}
}
}
Уровни изоляции транзакций
1. Основы изоляции транзакций
Что такое изоляция транзакций
Изоляция транзакций — это свойство ACID, которое определяет, как изменения в одной транзакции видны другим параллельным транзакциям. PostgreSQL поддерживает 4 уровня изоляции согласно стандарту SQL.
Проблемы параллельных транзакций
Dirty Read (Грязное чтение)
-- Транзакция 1 -- Транзакция 2
BEGIN;
UPDATE accounts SET balance = 1000
WHERE id = 1;
BEGIN;
SELECT balance FROM accounts
WHERE id = 1; -- Видит 1000 (uncommitted)
ROLLBACK; -- Но транзакция 1 откатывается!
COMMIT;
-- Транзакция 2 прочитала данные, которые никогда не были committed
Non-Repeatable Read (Неповторяемое чтение)
-- Транзакция 1 -- Транзакция 2
BEGIN;
SELECT balance FROM accounts
WHERE id = 1; -- Читает 500
BEGIN;
UPDATE accounts SET balance = 1000
WHERE id = 1;
COMMIT;
SELECT balance FROM accounts
WHERE id = 1; -- Читает 1000!
COMMIT;
-- Одинаковый SELECT дал разные результаты в одной транзакции
Phantom Read (Фантомное чтение)
-- Транзакция 1 -- Транзакция 2
BEGIN;
SELECT COUNT(*) FROM accounts
WHERE balance > 500; -- Возвращает 5
BEGIN;
INSERT INTO accounts (balance)
VALUES (1000);
COMMIT;
SELECT COUNT(*) FROM accounts
WHERE balance > 500; -- Возвращает 6!
COMMIT;
-- Появились новые строки, соответствующие условию
Serialization Anomaly (Аномалия сериализации)
-- Транзакция 1 -- Транзакция 2
BEGIN; BEGIN;
SELECT SUM(balance) FROM accounts; SELECT SUM(balance) FROM accounts;
-- Сумма = 1000 -- Сумма = 1000
UPDATE accounts SET balance = 500 UPDATE accounts SET balance = 300
WHERE id = 1; WHERE id = 2;
COMMIT; COMMIT;
-- Результат не эквивалентен никакому последовательному выполнению
-- если читались разные данные до обновлений
2. Уровни изоляции в PostgreSQL
Матрица уровней изоляции
Уровень изоляции | Dirty Read | Non-Repeatable Read | Phantom Read | Serialization Anomaly
---------------------|------------|--------------------|--------------|-----------------------
READ UNCOMMITTED | ❌ | ✅ | ✅ | ✅
READ COMMITTED | ❌ | ✅ | ✅ | ✅
REPEATABLE READ | ❌ | ❌ | ❌ | ✅
SERIALIZABLE | ❌ | ❌ | ❌ | ❌
❌ - Возможно
✅ - Предотвращено
Установка уровня изоляции
-- Для текущей транзакции
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- Альтернативный синтаксис
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- Для всей сессии
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- Глобально в postgresql.conf
default_transaction_isolation = 'repeatable read'
-- Проверка текущего уровня
SHOW transaction_isolation;
SELECT current_setting('transaction_isolation');
3. READ COMMITTED (по умолчанию)
Характеристики
- Самый слабый уровень изоляции в PostgreSQL
- Видит только committed данные других транзакций
- Новый snapshot для каждого statement
- Высокая производительность, минимальные блокировки
Поведение READ COMMITTED
-- Создаем тестовую таблицу
CREATE TABLE accounts (
id SERIAL PRIMARY KEY,
name VARCHAR(50),
balance DECIMAL(10,2)
);
INSERT INTO accounts (name, balance) VALUES
('Alice', 1000.00),
('Bob', 500.00),
('Charlie', 750.00);
Пример 1: Видимость committed изменений
-- Сессия 1 (READ COMMITTED)
BEGIN;
SELECT * FROM accounts WHERE id = 1;
-- alice | 1000.00
-- Сессия 2
BEGIN;
UPDATE accounts SET balance = 1500.00 WHERE id = 1;
COMMIT;
-- Сессия 1 (продолжение)
SELECT * FROM accounts WHERE id = 1;
-- alice | 1500.00 <-- Видит изменения из сессии 2!
COMMIT;
Пример 2: Блокировка при UPDATE
-- Сессия 1
BEGIN;
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
-- Строка заблокирована
-- Сессия 2 (READ COMMITTED)
BEGIN;
UPDATE accounts SET balance = balance + 200 WHERE id = 1;
-- Ждет завершения сессии 1
-- Сессия 1
COMMIT; -- Освобождает блокировку
-- Сессия 2 автоматически продолжает
-- Использует НОВЫЕ данные (1100) + 200 = 1300
COMMIT;
Пример 3: Phantom reads
-- Сессия 1
BEGIN;
SELECT COUNT(*) FROM accounts WHERE balance > 600;
-- Возвращает 2
-- Сессия 2
INSERT INTO accounts (name, balance) VALUES ('David', 800.00);
COMMIT;
-- Сессия 1
SELECT COUNT(*) FROM accounts WHERE balance > 600;
-- Возвращает 3 (phantom read!)
COMMIT;
Когда использовать READ COMMITTED
- OLTP приложения с высокой нагрузкой
- Веб-приложения с короткими транзакциями
- Когда важна производительность больше строгой консистентности
- По умолчанию для большинства случаев
4. REPEATABLE READ
Характеристики
- Предотвращает Non-Repeatable Read и Phantom Read
- Один snapshot на всю транзакцию
- Видит только данные, committed до начала транзакции
- Может завершиться с serialization_failure
Поведение REPEATABLE READ
Пример 1: Консистентное чтение
-- Сессия 1 (REPEATABLE READ)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT * FROM accounts WHERE id = 1;
-- alice | 1000.00
-- Сессия 2
BEGIN;
UPDATE accounts SET balance = 1500.00 WHERE id = 1;
COMMIT;
-- Сессия 1 (продолжение)
SELECT * FROM accounts WHERE id = 1;
-- alice | 1000.00 <-- Все еще видит старые данные!
COMMIT;
Пример 2: Предотвращение Phantom Read
-- Сессия 1 (REPEATABLE READ)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT COUNT(*) FROM accounts WHERE balance > 600;
-- Возвращает 2
-- Сессия 2
INSERT INTO accounts (name, balance) VALUES ('David', 800.00);
COMMIT;
-- Сессия 1
SELECT COUNT(*) FROM accounts WHERE balance > 600;
-- Все еще возвращает 2 (phantom read предотвращен)
COMMIT;
Пример 3: Serialization failure при UPDATE
-- Сессия 1 (REPEATABLE READ)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT balance FROM accounts WHERE id = 1;
-- 1000.00
-- Сессия 2
BEGIN;
UPDATE accounts SET balance = 1200.00 WHERE id = 1;
COMMIT;
-- Сессия 1 (продолжение)
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
-- ERROR: could not serialize access due to concurrent update
ROLLBACK;
Обработка serialization failures
-- Функция с retry логикой
CREATE OR REPLACE FUNCTION transfer_money_repeatable_read(
from_account_id INT,
to_account_id INT,
amount DECIMAL(10,2),
max_retries INT DEFAULT 5
) RETURNS BOOLEAN AS $$
DECLARE
retry_count INT := 0;
from_balance DECIMAL(10,2);
to_balance DECIMAL(10,2);
BEGIN
WHILE retry_count < max_retries LOOP
BEGIN
-- Начинаем REPEATABLE READ транзакцию
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- Читаем балансы
SELECT balance INTO from_balance
FROM accounts WHERE id = from_account_id;
SELECT balance INTO to_balance
FROM accounts WHERE id = to_account_id;
-- Проверяем достаточность средств
IF from_balance < amount THEN
RAISE EXCEPTION 'Insufficient funds';
END IF;
-- Выполняем перевод
UPDATE accounts SET balance = balance - amount
WHERE id = from_account_id;
UPDATE accounts SET balance = balance + amount
WHERE id = to_account_id;
-- Если дошли сюда - успех
RETURN TRUE;
EXCEPTION
WHEN serialization_failure THEN
retry_count := retry_count + 1;
RAISE NOTICE 'Serialization failure, retry % of %', retry_count, max_retries;
IF retry_count >= max_retries THEN
RAISE EXCEPTION 'Transaction failed after % retries', max_retries;
END IF;
-- Небольшая случайная задержка
PERFORM pg_sleep(random() * 0.1);
END;
END LOOP;
RETURN FALSE;
END;
$$ LANGUAGE plpgsql;
Когда использовать REPEATABLE READ
- Отчеты и аналитика, требующие консистентного вида данных
- Batch обработка, где важна целостность данных
- Финансовые операции с требованием строгой консистентности
- Когда нужно избежать phantom reads
5. SERIALIZABLE
Характеристики
- Самый строгий уровень изоляции
- Предотвращает все аномалии, включая serialization anomaly
- Эквивалентен последовательному выполнению транзакций
- Использует Serializable Snapshot Isolation (SSI)
Поведение SERIALIZABLE
Пример 1: Предотвращение serialization anomaly
-- Настройка тестовых данных
CREATE TABLE balances (
account_type VARCHAR(10) PRIMARY KEY,
balance DECIMAL(10,2)
);
INSERT INTO balances VALUES
('checking', 1000.00),
('savings', 1000.00);
-- Сессия 1 (SERIALIZABLE)
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SELECT SUM(balance) FROM balances; -- 2000.00
UPDATE balances SET balance = balance - 100
WHERE account_type = 'checking';
-- Сессия 2 (SERIALIZABLE)
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SELECT SUM(balance) FROM balances; -- 2000.00
UPDATE balances SET balance = balance - 100
WHERE account_type = 'savings';
-- Первая транзакция коммитится
COMMIT; -- Сессия 1 успешна
-- Вторая получает ошибку
COMMIT; -- ERROR: could not serialize access due to read/write dependencies
Пример 2: Read-only транзакции
-- Read-only SERIALIZABLE транзакции никогда не получают serialization failure
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY;
SELECT * FROM accounts;
SELECT SUM(balance) FROM accounts;
-- Множественные SELECT'ы
COMMIT; -- Всегда успешно
Пример 3: Deferrable транзакции
-- DEFERRABLE транзакции могут ждать безопасного snapshot
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;
-- Может подождать до получения snapshot'а, который гарантированно
-- не будет конфликтовать с running транзакциями
SELECT * FROM accounts;
COMMIT;
Настройки для SERIALIZABLE
-- Максимальное количество предикатных блокировок
max_pred_locks_per_transaction = 64 -- на транзакцию
max_pred_locks_per_relation = -2 -- на отношение
max_pred_locks_per_page = 2 -- на страницу
-- Мониторинг
SELECT * FROM pg_stat_database WHERE datname = current_database();
-- Колонки: conflicts, temp_files, temp_bytes, deadlocks
Когда использовать SERIALIZABLE
- Критически важные финансовые операции
- Системы с высокими требованиями к консистентности
- Сложные business rules с множественными таблицами
- Когда необходима математическая корректность
6. Практические примеры по уровням
Банковские переводы
READ COMMITTED версия
CREATE OR REPLACE FUNCTION transfer_read_committed(
from_id INT, to_id INT, amount DECIMAL(10,2)
) RETURNS BOOLEAN AS $$
BEGIN
-- Используется READ COMMITTED по умолчанию
BEGIN;
-- Может быть non-repeatable read между этими SELECT'ами
IF (SELECT balance FROM accounts WHERE id = from_id) < amount THEN
ROLLBACK;
RETURN FALSE;
END IF;
UPDATE accounts SET balance = balance - amount WHERE id = from_id;
UPDATE accounts SET balance = balance + amount WHERE id = to_id;
COMMIT;
RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
REPEATABLE READ версия
CREATE OR REPLACE FUNCTION transfer_repeatable_read(
from_id INT, to_id INT, amount DECIMAL(10,2)
) RETURNS BOOLEAN AS $$
DECLARE
retry_count INT := 0;
max_retries INT := 5;
BEGIN
WHILE retry_count < max_retries LOOP
BEGIN
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- Консистентное чтение в рамках транзакции
IF (SELECT balance FROM accounts WHERE id = from_id) < amount THEN
ROLLBACK;
RETURN FALSE;
END IF;
UPDATE accounts SET balance = balance - amount WHERE id = from_id;
UPDATE accounts SET balance = balance + amount WHERE id = to_id;
COMMIT;
RETURN TRUE;
EXCEPTION
WHEN serialization_failure THEN
ROLLBACK;
retry_count := retry_count + 1;
PERFORM pg_sleep(0.01 * retry_count);
END;
END LOOP;
RAISE EXCEPTION 'Transfer failed after % retries', max_retries;
END;
$$ LANGUAGE plpgsql;
SERIALIZABLE версия
CREATE OR REPLACE FUNCTION transfer_serializable(
from_id INT, to_id INT, amount DECIMAL(10,2)
) RETURNS BOOLEAN AS $$
DECLARE
retry_count INT := 0;
max_retries INT := 10;
BEGIN
WHILE retry_count < max_retries LOOP
BEGIN
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- Проверка бизнес-правил в serializable изоляции
IF (SELECT balance FROM accounts WHERE id = from_id) < amount THEN
ROLLBACK;
RETURN FALSE;
END IF;
-- Дополнительные проверки для демонстрации
IF (SELECT COUNT(*) FROM accounts
WHERE id IN (from_id, to_id) AND status = 'active') < 2 THEN
ROLLBACK;
RETURN FALSE;
END IF;
UPDATE accounts SET balance = balance - amount WHERE id = from_id;
UPDATE accounts SET balance = balance + amount WHERE id = to_id;
COMMIT;
RETURN TRUE;
EXCEPTION
WHEN serialization_failure THEN
ROLLBACK;
retry_count := retry_count + 1;
-- Экспоненциальный backoff
PERFORM pg_sleep(0.01 * power(2, retry_count) + random() * 0.01);
END;
END LOOP;
RAISE EXCEPTION 'Transfer failed after % retries', max_retries;
END;
$$ LANGUAGE plpgsql;
7. Мониторинг и диагностика
Мониторинг конфликтов сериализации
-- Статистика по базе данных
SELECT datname,
xact_commit,
xact_rollback,
conflicts,
temp_files,
temp_bytes,
deadlocks
FROM pg_stat_database
WHERE datname = current_database();
-- Детальная статистика конфликтов
SELECT schemaname, tablename,
seq_scan, seq_tup_read,
idx_scan, idx_tup_fetch,
n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables;
Анализ блокировок по уровням изоляции
-- Текущие транзакции и их уровни изоляции
SELECT pid,
usename,
datname,
state,
query_start,
NOW() - query_start AS duration,
LEFT(query, 50) AS query_preview
FROM pg_stat_activity
WHERE state IN ('active', 'idle in transaction')
ORDER BY query_start;
-- Конфликты блокировок
SELECT blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement,
blocking_activity.query AS current_statement_in_blocking_process
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;
Логирование для анализа
-- Настройки логирования в postgresql.conf
log_statement = 'all' -- Логировать все statements
log_duration = on -- Логировать время выполнения
log_lock_waits = on -- Логировать ожидания блокировок
deadlock_timeout = 1s -- Таймаут для обнаружения deadlock
-- Специфичные для SERIALIZABLE
log_min_messages = warning -- Для serialization failures
8. Best Practices
Выбор правильного уровня изоляции
Рекомендации по применению
-- READ COMMITTED (по умолчанию)
-- ✅ Подходит для:
-- - OLTP приложений
-- - Веб-приложений
-- - Коротких транзакций
-- - Высоконагруженных систем
-- REPEATABLE READ
-- ✅ Подходит для:
-- - Отчетов и аналитики
-- - Batch обработки
-- - Когда нужна консистентность чтения
-- - Финансовых расчетов
-- SERIALIZABLE
-- ✅ Подходит для:
-- - Критически важных операций
-- - Сложных бизнес-правил
-- - Когда необходима строгая корректность
-- - Систем с низкой нагрузкой
Паттерны обработки ошибок
Универсальная функция retry
CREATE OR REPLACE FUNCTION execute_with_retry(
operation_name TEXT,
sql_command TEXT,
isolation_level TEXT DEFAULT 'READ COMMITTED',
max_retries INT DEFAULT 5,
base_delay_ms INT DEFAULT 10
) RETURNS BOOLEAN AS $$
DECLARE
retry_count INT := 0;
delay_ms INT;
error_message TEXT;
BEGIN
WHILE retry_count < max_retries LOOP
BEGIN
EXECUTE format('BEGIN TRANSACTION ISOLATION LEVEL %s', isolation_level);
EXECUTE sql_command;
COMMIT;
RAISE NOTICE 'Operation % completed successfully on attempt %',
operation_name, retry_count + 1;
RETURN TRUE;
EXCEPTION
WHEN serialization_failure OR deadlock_detected THEN
ROLLBACK;
retry_count := retry_count + 1;
-- Экспоненциальная задержка с jitter
delay_ms := base_delay_ms * power(2, retry_count - 1) +
(random() * base_delay_ms)::INT;
RAISE NOTICE 'Operation % failed on attempt %, retrying in %ms. Error: %',
operation_name, retry_count, delay_ms, SQLERRM;
IF retry_count >= max_retries THEN
RAISE EXCEPTION 'Operation % failed after % retries. Last error: %',
operation_name, max_retries, SQLERRM;
END IF;
PERFORM pg_sleep(delay_ms / 1000.0);
END;
END LOOP;
RETURN FALSE;
END;
$$ LANGUAGE plpgsql;
-- Использование
SELECT execute_with_retry(
'transfer_money',
'SELECT transfer_money_repeatable_read(1, 2, 100.00)',
'REPEATABLE READ',
5,
10
);
Оптимизация производительности
Минимизация времени транзакций
-- Плохо: долгие транзакции
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- ... множество операций ...
-- ... вычисления ...
-- ... внешние вызовы ...
COMMIT;
-- Хорошо: короткие транзакции
-- Подготовка данных вне транзакции
-- ... вычисления ...
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- Только необходимые операции с БД
UPDATE accounts SET balance = calculated_balance WHERE id = account_id;
COMMIT;
Read-only оптимизации
-- Для read-only операций используйте соответствующий режим
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY;
-- Читающие операции
SELECT * FROM large_report_view;
COMMIT;
-- Для долгих отчетов
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;
-- Может подождать подходящего snapshot'а
SELECT * FROM complex_analytical_view;
COMMIT;
9. Тестирование уровней изоляции
Тестовый набор для проверки изоляции
-- Создаем тестовую схему
CREATE SCHEMA isolation_test;
CREATE TABLE isolation_test.test_table (
id SERIAL PRIMARY KEY,
value INT NOT NULL,
version INT DEFAULT 1
);
INSERT INTO isolation_test.test_table (value) VALUES (100), (200), (300);
-- Тест Non-Repeatable Read
CREATE OR REPLACE FUNCTION isolation_test.test_non_repeatable_read(
isolation_level TEXT
) RETURNS TABLE(first_read INT, second_read INT, are_equal BOOLEAN) AS $$
DECLARE
first_value INT;
second_value INT;
BEGIN
EXECUTE format('BEGIN TRANSACTION ISOLATION LEVEL %s', isolation_level);
-- Первое чтение
SELECT value INTO first_value FROM isolation_test.test_table WHERE id = 1;
-- Пауза для внешнего вмешательства
PERFORM pg_sleep(2);
-- Второе чтение
SELECT value INTO second_value FROM isolation_test.test_table WHERE id = 1;
COMMIT;
RETURN QUERY SELECT first_value, second_value, (first_value = second_value);
END;
$$ LANGUAGE plpgsql;
-- Использование теста
-- В одной сессии:
SELECT * FROM isolation_test.test_non_repeatable_read('READ COMMITTED');
-- В другой сессии (во время паузы):
UPDATE isolation_test.test_table SET value = 999 WHERE id = 1;
Benchmark различных уровней изоляции
-- Функция для измерения производительности
CREATE OR REPLACE FUNCTION isolation_test.benchmark_isolation_level(
isolation_level TEXT,
iterations INT DEFAULT 1000
) RETURNS TABLE(
level TEXT,
total_time INTERVAL,
avg_time_ms NUMERIC,
success_rate NUMERIC
) AS $$
DECLARE
start_time TIMESTAMP;
end_time TIMESTAMP;
success_count INT := 0;
i INT;
BEGIN
start_time := clock_timestamp();
FOR i IN 1..iterations LOOP
BEGIN
EXECUTE format('BEGIN TRANSACTION ISOLATION LEVEL %s', isolation_level);
-- Простая операция для тестирования
PERFORM * FROM isolation_test.test_table WHERE id = 1;
UPDATE isolation_test.test_table SET version = version + 1 WHERE id = 1;
COMMIT;
success_count := success_count + 1;
EXCEPTION
WHEN serialization_failure THEN
ROLLBACK;
-- Продолжаем без увеличения success_count
END;
END LOOP;
end_time := clock_timestamp();
RETURN QUERY SELECT
isolation_level,
end_time - start_time,
EXTRACT(MILLISECONDS FROM end_time - start_time)::NUMERIC / iterations,
(success_count::NUMERIC / iterations * 100);
END;
$$ LANGUAGE plpgsql;
-- Тестирование
SELECT * FROM isolation_test.benchmark_isolation_level('READ committed');
SELECT * FROM isolation_test.benchmark_isolation_level('repeatable read');
SELECT * FROM isolation_test.benchmark_isolation_level('serializable');
Эта шпаргалка поможет понять и правильно применять уровни изоляции транзакций в PostgreSQL для обеспечения нужного баланса между производительностью и консистентностью данных.
Индексы
1. Основы индексов
Что такое индекс
Индекс — это структура данных, которая ускоряет поиск строк в таблице. Индексы создают отдельные объекты, которые содержат ссылки на строки основной таблицы.
Зачем нужны индексы
- Ускорение SELECT - быстрый поиск данных
- Ускорение JOIN - эффективное соединение таблиц
- Ускорение ORDER BY - сортировка без дополнительных операций
- Уникальность - обеспечение ограничений целостности
- Частичные индексы - индексирование подмножества данных
Стоимость индексов
-- Преимущества: быстрые SELECT, JOIN, ORDER BY
-- Недостатки:
-- - Дополнительное место на диске
-- - Замедление INSERT/UPDATE/DELETE
-- - Необходимость обслуживания (VACUUM, REINDEX)
-- Размер индексов
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as total_size,
pg_size_pretty(pg_relation_size(schemaname||'.'||tablename)) as table_size,
pg_size_pretty(pg_indexes_size(schemaname||'.'||tablename)) as indexes_size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_indexes_size(schemaname||'.'||tablename) DESC;
2. Типы индексов
B-tree (по умолчанию)
Создание B-tree индексов
-- Простой индекс
CREATE INDEX idx_users_email ON users (email);
-- Составной индекс
CREATE INDEX idx_orders_user_date ON orders (user_id, created_at);
-- Индекс с сортировкой
CREATE INDEX idx_products_price_desc ON products (price DESC);
-- Индекс с NULLS обработкой
CREATE INDEX idx_users_last_login ON users (last_login DESC NULLS LAST);
-- Уникальный индекс
CREATE UNIQUE INDEX idx_users_email_unique ON users (email);
Когда использовать B-tree
-- Операторы: =, <, <=, >, >=, BETWEEN, IN, IS NULL, IS NOT NULL
SELECT * FROM users WHERE age = 25;
SELECT * FROM users WHERE age BETWEEN 18 AND 65;
SELECT * FROM users WHERE created_at > '2024-01-01';
SELECT * FROM orders ORDER BY created_at DESC; -- Использует индекс для сортировки
-- Префиксные поиски с LIKE
SELECT * FROM users WHERE email LIKE 'john%'; -- Использует индекс
SELECT * FROM users WHERE email LIKE '%@gmail.com'; -- НЕ использует индекс
Структура B-tree индекса
Root Page
┌─────────────┐
│ 50 │ 100 │
└──┬──────┬───┘
│ │
┌───────┘ └───────┐
┌─────────────┐ ┌─────────────┐
│ 25 │ 40 │ │ 75 │ 90 │ Internal Pages
└──┬───┬─────┘ └──┬───┬─────┘
│ │ │ │
┌───────┘ └─────┐ ┌───────┘ └─────┐
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│10│15│20│ │30│35│38│ │60│65│70│ │80│85│88│ Leaf Pages
└────────┘ └────────┘ └────────┘ └────────┘
↓ ↓ ↓ ↓
Data Data Data Data → Table Heap
Hash индексы
Создание Hash индексов
-- Hash индекс для точного поиска
CREATE INDEX idx_users_id_hash ON users USING HASH (id);
-- Только для оператора равенства
SELECT * FROM users WHERE id = 12345; -- Эффективно
SELECT * FROM users WHERE id > 12345; -- НЕ использует hash индекс
Когда использовать Hash
- Только операция равенства (=)
- Большие таблицы с точными поисками
- Уникальные значения с высокой селективностью
- Экономия места по сравнению с B-tree
GIN (Generalized Inverted Index)
Создание GIN индексов
-- Для массивов
CREATE INDEX idx_articles_tags ON articles USING GIN (tags);
-- Для JSONB
CREATE INDEX idx_products_attributes ON products USING GIN (attributes);
-- Для полнотекстового поиска
CREATE INDEX idx_articles_content ON articles USING GIN (to_tsvector('english', content));
-- Для hstore
CREATE EXTENSION hstore;
CREATE INDEX idx_products_properties ON products USING GIN (properties);
Использование GIN индексов
-- Поиск в массивах
SELECT * FROM articles WHERE tags @> ARRAY['postgresql', 'database'];
SELECT * FROM articles WHERE 'postgresql' = ANY(tags);
SELECT * FROM articles WHERE tags && ARRAY['sql', 'nosql'];
-- Поиск в JSONB
SELECT * FROM products WHERE attributes @> '{"color": "red", "size": "large"}';
SELECT * FROM products WHERE attributes ? 'warranty';
SELECT * FROM products WHERE attributes ?& ARRAY['color', 'size'];
-- Полнотекстовый поиск
SELECT * FROM articles
WHERE to_tsvector('english', content) @@ to_tsquery('postgresql & performance');
Настройка GIN
-- Создание с параметрами
CREATE INDEX idx_products_gin ON products USING GIN (attributes)
WITH (fastupdate = off, gin_pending_list_limit = 1024);
-- gin_pending_list_limit - размер pending list для batch обновлений
-- fastupdate - включение/выключение pending list
GiST (Generalized Search Tree)
Создание GiST индексов
-- Для геометрических данных
CREATE INDEX idx_locations_point ON locations USING GiST (point);
-- Для полнотекстового поиска (альтернатива GIN)
CREATE INDEX idx_articles_content_gist ON articles USING GiST (to_tsvector('english', content));
-- Для диапазонов
CREATE INDEX idx_events_period ON events USING GiST (tsrange(start_time, end_time));
-- Для exclusion constraints
ALTER TABLE bookings ADD CONSTRAINT bookings_no_overlap
EXCLUDE USING GiST (room_id WITH =, booking_period WITH &&);
Использование GiST
-- Геометрические запросы
SELECT * FROM locations WHERE point <-> POINT(0,0) < 1000; -- Расстояние
SELECT * FROM locations WHERE polygon @> POINT(100, 200); -- Содержание
SELECT * FROM locations WHERE circle && BOX(POINT(0,0), POINT(100,100)); -- Пересечение
-- Диапазоны
SELECT * FROM events WHERE tsrange(start_time, end_time) @> '2024-01-15 10:00'::timestamp;
SELECT * FROM events WHERE tsrange(start_time, end_time) && tsrange('2024-01-01', '2024-01-31');
SP-GiST (Space-Partitioned GiST)
Создание SP-GiST индексов
-- Для точек (quad-tree разбиение)
CREATE INDEX idx_locations_spgist ON locations USING SPGIST (point);
-- Для IP адресов
CREATE INDEX idx_logs_ip ON access_logs USING SPGIST (client_ip inet_ops);
-- Для текстовых данных с префиксами
CREATE INDEX idx_domains_spgist ON domains USING SPGIST (domain_name text_ops);
Когда использовать SP-GiST
- Пространственные данные с quad-tree разбиением
- IP адреса и сети
- Телефонные номера
- Префиксные поиски в строках
BRIN (Block Range Index)
Создание BRIN индексов
-- Для больших таблиц с естественной сортировкой
CREATE INDEX idx_logs_timestamp_brin ON logs USING BRIN (timestamp);
-- С настройкой размера диапазона
CREATE INDEX idx_sales_date_brin ON sales USING BRIN (sale_date)
WITH (pages_per_range = 128);
-- Для числовых данных
CREATE INDEX idx_transactions_amount_brin ON transactions USING BRIN (amount);
Преимущества BRIN
-- Очень маленький размер индекса
SELECT pg_size_pretty(pg_relation_size('idx_logs_timestamp_brin'));
-- vs
SELECT pg_size_pretty(pg_relation_size('idx_logs_timestamp_btree'));
-- Эффективен для:
-- - Append-only таблицы (логи, временные ряды)
-- - Данные с естественной сортировкой
-- - Очень большие таблицы (терабайты)
3. Составные индексы
Порядок колонок в составном индексе
-- Правило: наиболее селективные колонки первыми
-- Плохо:
CREATE INDEX idx_orders_bad ON orders (status, user_id, created_at);
-- Хорошо (если user_id более селективен):
CREATE INDEX idx_orders_good ON orders (user_id, created_at, status);
-- Анализ селективности
SELECT
attname,
n_distinct,
most_common_vals[1:5],
correlation
FROM pg_stats
WHERE tablename = 'orders' AND schemaname = 'public'
ORDER BY abs(n_distinct) DESC;
Префиксное использование индексов
-- Индекс: (user_id, created_at, status)
-- Эффективно использует индекс:
SELECT * FROM orders WHERE user_id = 123;
SELECT * FROM orders WHERE user_id = 123 AND created_at > '2024-01-01';
SELECT * FROM orders WHERE user_id = 123 AND created_at > '2024-01-01' AND status = 'pending';
-- НЕ использует индекс эффективно:
SELECT * FROM orders WHERE created_at > '2024-01-01'; -- Пропущен user_id
SELECT * FROM orders WHERE status = 'pending'; -- Пропущены user_id и created_at
Покрывающие индексы (INCLUDE)
-- Индекс с дополнительными колонками
CREATE INDEX idx_users_email_covering ON users (email) INCLUDE (name, phone, created_at);
-- Позволяет избежать обращения к таблице
EXPLAIN (ANALYZE, BUFFERS)
SELECT name, phone, created_at FROM users WHERE email = 'john@example.com';
-- Index Only Scan using idx_users_email_covering
-- Ограничения INCLUDE:
-- - Дополнительные колонки не участвуют в сортировке
-- - Не могут использоваться в WHERE условиях для поиска
-- - Доступны только в leaf страницах
4. Специальные типы индексов
Частичные индексы
-- Индекс только для активных пользователей
CREATE INDEX idx_users_active_email ON users (email) WHERE status = 'active';
-- Индекс только для недавних заказов
CREATE INDEX idx_orders_recent ON orders (created_at, total)
WHERE created_at > '2024-01-01';
-- Индекс исключающий NULL значения
CREATE INDEX idx_users_phone ON users (phone) WHERE phone IS NOT NULL;
-- Сложные условия
CREATE INDEX idx_orders_important ON orders (priority, created_at)
WHERE status IN ('pending', 'processing') AND total > 1000;
Функциональные индексы
-- Индекс по функции
CREATE INDEX idx_users_lower_email ON users (lower(email));
-- Использование
SELECT * FROM users WHERE lower(email) = 'john@example.com';
-- Индекс по выражению
CREATE INDEX idx_orders_year_month ON orders (EXTRACT(YEAR FROM created_at), EXTRACT(MONTH FROM created_at));
-- Сложные выражения
CREATE INDEX idx_products_profit ON products ((price - cost) / price) WHERE price > cost;
-- IMMUTABLE функции для индексов
CREATE OR REPLACE FUNCTION normalize_phone(phone TEXT)
RETURNS TEXT AS $$
BEGIN
RETURN regexp_replace(phone, '[^0-9]', '', 'g');
END;
$$ LANGUAGE plpgsql IMMUTABLE;
CREATE INDEX idx_users_phone_normalized ON users (normalize_phone(phone));
Условные уникальные индексы
-- Уникальность только для активных записей
CREATE UNIQUE INDEX idx_users_email_active_unique ON users (email)
WHERE status = 'active';
-- Позволяет иметь дубликаты email для неактивных пользователей
INSERT INTO users (email, status) VALUES ('test@example.com', 'active'); -- OK
INSERT INTO users (email, status) VALUES ('test@example.com', 'inactive'); -- OK
INSERT INTO users (email, status) VALUES ('test@example.com', 'active'); -- ERROR
5. Операции с индексами
Создание индексов
-- Синхронное создание (блокирует таблицу)
CREATE INDEX idx_users_email ON users (email);
-- Конкурентное создание (не блокирует)
CREATE INDEX CONCURRENTLY idx_users_email ON users (email);
-- С IF NOT EXISTS
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email ON users (email);
-- В конкретном табличном пространстве
CREATE INDEX idx_users_email ON users (email) TABLESPACE fast_ssd;
Удаление индексов
-- Синхронное удаление (блокирует)
DROP INDEX idx_users_email;
-- Конкурентное удаление (не блокирует)
DROP INDEX CONCURRENTLY idx_users_email;
-- С IF EXISTS
DROP INDEX CONCURRENTLY IF EXISTS idx_users_email;
Переиндексация
-- REINDEX индекса
REINDEX INDEX idx_users_email;
-- REINDEX таблицы
REINDEX TABLE users;
-- REINDEX базы данных
REINDEX DATABASE mydb;
-- Конкурентная переиндексация (PostgreSQL 12+)
REINDEX INDEX CONCURRENTLY idx_users_email;
Переименование индексов
-- Переименование
ALTER INDEX idx_users_email RENAME TO idx_users_email_old;
-- Изменение табличного пространства
ALTER INDEX idx_users_email SET TABLESPACE slow_hdd;
6. Мониторинг и анализ индексов
Использование индексов
-- Статистика использования индексов
SELECT
schemaname,
tablename,
indexname,
idx_scan, -- Количество сканирований
idx_tup_read, -- Прочитано index entries
idx_tup_fetch -- Получено table rows
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY idx_scan DESC;
-- Неиспользуемые индексы
SELECT
schemaname,
tablename,
indexname,
pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
WHERE idx_scan = 0
AND schemaname = 'public'
ORDER BY pg_relation_size(indexrelid) DESC;
Эффективность индексов
-- Cache hit ratio для индексов
SELECT
schemaname,
tablename,
indexname,
idx_blks_read,
idx_blks_hit,
ROUND(100.0 * idx_blks_hit / NULLIF(idx_blks_hit + idx_blks_read, 0), 2) as cache_hit_ratio
FROM pg_statio_user_indexes
WHERE schemaname = 'public' AND (idx_blks_read + idx_blks_hit) > 0
ORDER BY cache_hit_ratio;
Размеры индексов
-- Размеры всех индексов
SELECT
schemaname,
tablename,
indexname,
pg_size_pretty(pg_relation_size(indexrelid)) as size,
pg_relation_size(indexrelid) as size_bytes
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY pg_relation_size(indexrelid) DESC;
-- Топ индексов по размеру
SELECT
schemaname||'.'||indexname as index_name,
pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY pg_relation_size(indexrelid) DESC
LIMIT 10;
Дублирующиеся индексы
-- Поиск дублирующихся индексов
WITH index_columns AS (
SELECT
schemaname,
tablename,
indexname,
string_agg(attname, ',' ORDER BY attnum) as columns
FROM pg_stat_user_indexes si
JOIN pg_index pi ON si.indexrelid = pi.indexrelid
JOIN pg_attribute pa ON pa.attrelid = pi.indrelid AND pa.attnum = ANY(pi.indkey)
WHERE schemaname = 'public'
GROUP BY schemaname, tablename, indexname
)
SELECT
i1.schemaname,
i1.tablename,
i1.indexname as index1,
i2.indexname as index2,
i1.columns
FROM index_columns i1
JOIN index_columns i2 ON i1.schemaname = i2.schemaname
AND i1.tablename = i2.tablename
AND i1.columns = i2.columns
AND i1.indexname > i2.indexname;
7. Оптимизация индексов
Анализ планов запросов
-- EXPLAIN для анализа использования индексов
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT * FROM users WHERE email = 'john@example.com';
-- Index Scan vs Seq Scan
-- Index Scan - использует индекс
-- Seq Scan - сканирует всю таблицу
-- Bitmap Index Scan - сканирует индекс, затем heap
-- Index Only Scan - только индекс (covering index)
Настройка планировщика
-- Стоимостные параметры
SET random_page_cost = 1.1; -- Для SSD (по умолчанию 4.0 для HDD)
SET seq_page_cost = 1.0; -- Стоимость последовательного чтения
SET cpu_index_tuple_cost = 0.005; -- Стоимость обработки index tuple
-- Принудительное отключение типов сканирования для тестирования
SET enable_seqscan = off; -- Отключить sequential scan
SET enable_indexscan = off; -- Отключить index scan
SET enable_bitmapscan = off; -- Отключить bitmap scan
Стратегии индексирования
Для OLTP систем
-- Приоритет: быстрые точечные запросы
-- Создаем индексы для:
-- - Primary keys (автоматически)
-- - Foreign keys
-- - Уникальные поля
-- - Часто используемые WHERE условия
CREATE INDEX idx_orders_user_id ON orders (user_id); -- FK
CREATE INDEX idx_orders_status ON orders (status); -- Фильтрация
CREATE INDEX idx_orders_created_at ON orders (created_at DESC); -- Сортировка
Для OLAP/аналитических систем
-- Приоритет: быстрые агрегации и сложные запросы
-- Используем:
-- - Составные индексы для GROUP BY
-- - BRIN для больших append-only таблиц
-- - Partial индексы для фильтрации
CREATE INDEX idx_sales_analysis ON sales (region, product_category, sale_date);
CREATE INDEX idx_logs_brin ON logs USING BRIN (timestamp) WITH (pages_per_range = 128);
CREATE INDEX idx_active_customers ON customers (region, signup_date) WHERE status = 'active';
8. Обслуживание индексов
VACUUM и индексы
-- VACUUM очищает мертвые tuples в индексах
VACUUM users;
VACUUM ANALYZE users; -- С обновлением статистики
-- VACUUM FULL пересоздает индексы
VACUUM FULL users; -- Блокирует таблицу!
Мониторинг bloat в индексах
-- Использование pgstattuple для анализа bloat
CREATE EXTENSION pgstattuple;
-- Статистика индекса
SELECT * FROM pgstatindex('idx_users_email');
-- Ключевые метрики:
-- avg_leaf_density - плотность leaf страниц (должна быть > 90%)
-- leaf_fragmentation - фрагментация (должна быть < 10%)
-- Bloat в индексах
SELECT
schemaname,
tablename,
indexname,
pg_size_pretty(pg_relation_size(indexrelid)) as size,
pgstatindex(indexrelid) as stats
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY pg_relation_size(indexrelid) DESC;
Автоматическое обслуживание
-- Настройки autovacuum для индексов
ALTER TABLE users SET (
autovacuum_vacuum_scale_factor = 0.1, -- 10% мертвых tuples
autovacuum_analyze_scale_factor = 0.05, -- 5% для ANALYZE
autovacuum_vacuum_cost_delay = 10 -- Задержка между операциями
);
-- Мониторинг autovacuum
SELECT
schemaname,
tablename,
last_vacuum,
last_autovacuum,
last_analyze,
last_autoanalyze,
vacuum_count,
autovacuum_count
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY last_autovacuum DESC NULLS LAST;
9. Практические примеры
E-commerce система
-- Таблица продуктов
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
category_id INT NOT NULL,
price DECIMAL(10,2) NOT NULL,
brand VARCHAR(100),
attributes JSONB,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Оптимальные индексы
CREATE INDEX idx_products_category ON products (category_id);
CREATE INDEX idx_products_price ON products (price);
CREATE INDEX idx_products_brand ON products (brand) WHERE brand IS NOT NULL;
CREATE INDEX idx_products_category_price ON products (category_id, price DESC);
CREATE INDEX idx_products_attributes ON products USING GIN (attributes);
CREATE INDEX idx_products_search ON products USING GIN (to_tsvector('english', name));
CREATE INDEX idx_products_created ON products (created_at DESC) WHERE created_at > '2024-01-01';
-- Covering индекс для каталога
CREATE INDEX idx_products_catalog ON products (category_id, price DESC)
INCLUDE (name, brand) WHERE price > 0;
Система логирования
-- Таблица логов
CREATE TABLE access_logs (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMP NOT NULL,
ip_address INET NOT NULL,
user_id INT,
url TEXT NOT NULL,
response_code INT NOT NULL,
response_time_ms INT NOT NULL
);
-- Оптимальные индексы для логов
CREATE INDEX idx_logs_timestamp_brin ON access_logs USING BRIN (timestamp);
CREATE INDEX idx_logs_user_recent ON access_logs (user_id, timestamp DESC)
WHERE timestamp > NOW() - INTERVAL '30 days';
CREATE INDEX idx_logs_errors ON access_logs (timestamp DESC, response_code)
WHERE response_code >= 400;
CREATE INDEX idx_logs_ip ON access_logs USING SPGIST (ip_address);
CREATE INDEX idx_logs_slow_requests ON access_logs (timestamp, response_time_ms)
WHERE response_time_ms > 1000;
Социальная сеть
-- Таблица постов
CREATE TABLE posts (
id BIGSERIAL PRIMARY KEY,
author_id INT NOT NULL,
content TEXT NOT NULL,
tags TEXT[],
visibility VARCHAR(20) DEFAULT 'public',
created_at TIMESTAMP DEFAULT NOW(),
likes_count INT DEFAULT 0,
comments_count INT DEFAULT 0
);
-- Индексы для социальной сети
CREATE INDEX idx_posts_author_created ON posts (author_id, created_at DESC);
CREATE INDEX idx_posts_timeline ON posts (created_at DESC) WHERE visibility = 'public';
CREATE INDEX idx_posts_popular ON posts (likes_count DESC, created_at DESC)
WHERE visibility = 'public' AND likes_count > 10;
CREATE INDEX idx_posts_tags ON posts USING GIN (tags);
CREATE INDEX idx_posts_search ON posts USING GIN (to_tsvector('english', content));
10. Советы и лучшие практики
Общие рекомендации
-- ✅ Хорошие практики:
-- 1. Анализируйте реальные запросы перед созданием индексов
-- 2. Создавайте индексы на foreign key колонки
-- 3. Используйте составные индексы для сложных WHERE условий
-- 4. Применяйте частичные индексы для фильтрации
-- 5. Мониторьте использование индексов
-- ❌ Избегайте:
-- 1. Создания индексов на каждую колонку
-- 2. Дублирующихся индексов
-- 3. Индексов на маленьких таблицах (<1000 строк)
-- 4. Слишком много индексов на часто изменяемых таблицах
-- 5. Функциональных индексов без IMMUTABLE функций
Чек-лист создания индекса
-- Перед созданием индекса спросите себя:
-- □ Будет ли этот индекс использоваться в реальных запросах?
-- □ Нет ли уже существующего индекса, который покрывает эти колонки?
-- □ Оправдывает ли выигрыш в скорости SELECT стоимость INSERT/UPDATE?
-- □ Можно ли использовать частичный индекс вместо полного?
-- □ Правильный ли порядок колонок в составном индексе?
-- Пример анализа:
EXPLAIN (ANALYZE, BUFFERS) SELECT * FROM users WHERE status = 'active' AND created_at > '2024-01-01';
-- Если Seq Scan - подумайте об индексе
-- Если Index Scan - индекс уже есть и работает
-- Если Bitmap Scan - возможно нужен составной индекс
Мониторинг производительности
-- Создайте представления для регулярного мониторинга
CREATE VIEW index_usage_summary AS
SELECT
schemaname,
tablename,
indexname,
idx_scan,
pg_size_pretty(pg_relation_size(indexrelid)) as size,
CASE
WHEN idx_scan = 0 THEN 'Unused'
WHEN idx_scan < 100 THEN 'Low usage'
WHEN idx_scan < 1000 THEN 'Medium usage'
ELSE 'High usage'
END as usage_category
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY pg_relation_size(indexrelid) DESC;
-- Регулярно проверяйте неиспользуемые индексы
SELECT * FROM index_usage_summary WHERE usage_category = 'Unused';
Эта шпаргалка поможет эффективно создавать, использовать и поддерживать индексы в PostgreSQL для максимальной производительности вашей базы данных.
Шардирование, Репликация, Масштабирование
1. Виды масштабирования
Вертикальное масштабирование (Scale Up)
-- Увеличение ресурсов одного сервера
-- CPU: больше ядер или более быстрые процессоры
-- RAM: увеличение оперативной памяти
-- Storage: переход на SSD, NVMe, увеличение IOPS
-- Настройки PostgreSQL для мощного сервера
shared_buffers = 8GB -- 25% от RAM (32GB сервер)
effective_cache_size = 24GB -- 75% от RAM
work_mem = 256MB -- Для аналитических запросов
maintenance_work_mem = 2GB -- Для VACUUM, CREATE INDEX
max_connections = 200 -- Ограничиваем подключения
max_parallel_workers = 16 -- Используем все ядра
Преимущества вертикального масштабирования
- Простота - не требует изменения архитектуры
- Консистентность - нет проблем с согласованностью данных
- Транзакции - полная поддержка ACID
- Отсутствие сетевых задержек
Недостатки вертикального масштабирования
- Ограниченность - есть физические пределы
- Высокая стоимость - экспоненциальный рост цены
- Single point of failure - единая точка отказа
- Простои при апгрейде
Горизонтальное масштабирование (Scale Out)
-- Распределение нагрузки между несколькими серверами
-- Read Replicas: увеличение производительности чтения
-- Sharding: разделение данных по серверам
-- Connection Pooling: эффективное использование соединений
2. Репликация в PostgreSQL
Streaming Replication
Настройка Master сервера
-- postgresql.conf на Master
wal_level = replica -- Уровень WAL для репликации
max_wal_senders = 10 -- Максимум WAL sender процессов
wal_keep_size = 1GB -- Размер сохраняемых WAL файлов
max_replication_slots = 10 -- Слоты репликации
hot_standby = on -- Позволяет читать на standby
archive_mode = on -- Архивирование WAL
archive_command = 'cp %p /archive/%f' -- Команда архивирования
-- pg_hba.conf на Master
# TYPE DATABASE USER ADDRESS METHOD
host replication replica 192.168.1.0/24 md5
Создание пользователя для репликации
-- На Master сервере
CREATE USER replica REPLICATION LOGIN ENCRYPTED PASSWORD 'replica_password';
Настройка Standby сервера
# Создание базовой копии
pg_basebackup -h master_host -D /var/lib/postgresql/data -U replica -W -v -P -R
# Файл standby.signal создается автоматически (-R)
# postgresql.conf на Standby (добавляется автоматически)
primary_conninfo = 'host=master_host port=5432 user=replica password=replica_password'
promote_trigger_file = '/tmp/promote_standby'
Мониторинг репликации
-- На Master: просмотр активных replication connections
SELECT
client_addr,
client_hostname,
state,
sent_lsn,
write_lsn,
flush_lsn,
replay_lsn,
write_lag,
flush_lag,
replay_lag
FROM pg_stat_replication;
-- На Standby: проверка статуса восстановления
SELECT
pg_is_in_recovery(),
pg_last_wal_receive_lsn(),
pg_last_wal_replay_lsn(),
pg_last_xact_replay_timestamp();
-- Расчет lag в байтах
SELECT
client_addr,
pg_wal_lsn_diff(sent_lsn, write_lsn) AS write_lag_bytes,
pg_wal_lsn_diff(write_lsn, flush_lsn) AS flush_lag_bytes,
pg_wal_lsn_diff(flush_lsn, replay_lsn) AS replay_lag_bytes
FROM pg_stat_replication;
Synchronous Replication
Настройка синхронной репликации
-- postgresql.conf на Master
synchronous_commit = on
synchronous_standby_names = 'standby1,standby2' -- Имена standby серверов
-- Или с приоритетами
synchronous_standby_names = 'FIRST 1 (standby1, standby2, standby3)'
-- Ожидание подтверждения от всех
synchronous_standby_names = 'ANY 2 (standby1, standby2, standby3)'
Типы синхронизации
-- remote_write: ждет записи в OS cache на standby
synchronous_commit = remote_write
-- on (remote_flush): ждет flush на диск на standby (по умолчанию)
synchronous_commit = on
-- remote_apply: ждет применения изменений на standby
synchronous_commit = remote_apply
Logical Replication
Настройка Logical Replication
-- postgresql.conf на Publisher
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
-- Создание publication на Publisher
CREATE PUBLICATION my_publication FOR ALL TABLES;
-- Или для конкретных таблиц
CREATE PUBLICATION users_publication FOR TABLE users, orders;
-- Или с фильтрацией
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');
Настройка Subscriber
-- Создание subscription на Subscriber
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=publisher_host dbname=mydb user=replica password=password'
PUBLICATION my_publication;
-- Мониторинг logical replication
SELECT
subname,
received_lsn,
latest_end_lsn,
latest_end_time,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) as lag
FROM pg_stat_subscription;
Cascading Replication
Настройка каскадной репликации
Master ────► Standby1 ────► Standby2
│ └────► Standby3
└────► Standby4
-- postgresql.conf на Standby1 (промежуточный standby)
hot_standby = on
max_wal_senders = 5 -- Для отправки WAL дальше
wal_level = replica -- Или hot_standby для старых версий
-- Standby2 подключается к Standby1
primary_conninfo = 'host=standby1_host port=5432 user=replica'
3. Шардирование
Встроенное шардирование (Declarative Partitioning)
Range Partitioning
-- Создание партиционированной таблицы по диапазону дат
CREATE TABLE orders (
id BIGSERIAL,
user_id INT NOT NULL,
total DECIMAL(10,2) NOT NULL,
created_at TIMESTAMP NOT NULL,
status VARCHAR(20) NOT NULL
) PARTITION BY RANGE (created_at);
-- Создание партиций
CREATE TABLE orders_2024_01 PARTITION OF orders
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE orders_2024_02 PARTITION OF orders
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
CREATE TABLE orders_2024_03 PARTITION OF orders
FOR VALUES FROM ('2024-03-01') TO ('2024-04-01');
-- Автоматическое создание партиций с pg_partman
CREATE EXTENSION pg_partman;
SELECT partman.create_parent(
p_parent_table => 'public.orders',
p_control => 'created_at',
p_type => 'range',
p_interval => 'monthly'
);
Hash Partitioning
-- Партиционирование по хешу для равномерного распределения
CREATE TABLE users (
id BIGSERIAL,
email VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
) PARTITION BY HASH (id);
-- Создание hash партиций
CREATE TABLE users_part_0 PARTITION OF users FOR VALUES WITH (modulus 4, remainder 0);
CREATE TABLE users_part_1 PARTITION OF users FOR VALUES WITH (modulus 4, remainder 1);
CREATE TABLE users_part_2 PARTITION OF users FOR VALUES WITH (modulus 4, remainder 2);
CREATE TABLE users_part_3 PARTITION OF users FOR VALUES WITH (modulus 4, remainder 3);
List Partitioning
-- Партиционирование по списку значений
CREATE TABLE sales (
id BIGSERIAL,
region VARCHAR(50) NOT NULL,
amount DECIMAL(10,2) NOT NULL,
sale_date DATE NOT NULL
) PARTITION BY LIST (region);
-- Создание партиций по регионам
CREATE TABLE sales_us PARTITION OF sales FOR VALUES IN ('US', 'USA', 'United States');
CREATE TABLE sales_eu PARTITION OF sales FOR VALUES IN ('UK', 'DE', 'FR', 'IT', 'ES');
CREATE TABLE sales_asia PARTITION OF sales FOR VALUES IN ('JP', 'CN', 'IN', 'KR');
CREATE TABLE sales_default PARTITION OF sales DEFAULT;
Внешние решения для шардирования
Citus (Horizontal Scaling)
-- Установка Citus
CREATE EXTENSION citus;
-- Настройка coordinator node
SELECT citus_set_coordinator_host('coordinator_host', 5432);
-- Добавление worker nodes
SELECT citus_add_node('worker1_host', 5432);
SELECT citus_add_node('worker2_host', 5432);
SELECT citus_add_node('worker3_host', 5432);
-- Создание distributed таблицы
CREATE TABLE events (
id BIGSERIAL,
user_id INT NOT NULL,
event_type VARCHAR(50),
created_at TIMESTAMP DEFAULT NOW()
);
-- Шардирование таблицы по user_id
SELECT create_distributed_table('events', 'user_id');
-- Создание reference таблицы (реплицируется на все nodes)
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE,
name VARCHAR(255)
);
SELECT create_reference_table('users');
Postgres-XL (Multi-Master)
-- Архитектура Postgres-XL
-- GTM (Global Transaction Manager) - управление транзакциями
-- Coordinator Nodes - точки входа для клиентов
-- Data Nodes - хранение данных
-- Создание distributed таблицы в Postgres-XL
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL,
total DECIMAL(10,2),
created_at TIMESTAMP DEFAULT NOW()
) DISTRIBUTE BY HASH(user_id);
-- Reference таблица
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255),
price DECIMAL(10,2)
) DISTRIBUTE BY REPLICATION;
FDW (Foreign Data Wrappers) для шардирования
Настройка postgres_fdw
-- Установка расширения
CREATE EXTENSION postgres_fdw;
-- Создание сервера
CREATE SERVER shard1_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'shard1_host', port '5432', dbname 'shard1_db');
-- Создание user mapping
CREATE USER MAPPING FOR postgres
SERVER shard1_server
OPTIONS (user 'postgres', password 'password');
-- Создание foreign таблицы
CREATE FOREIGN TABLE users_shard1 (
id BIGINT,
email VARCHAR(255),
name VARCHAR(255),
created_at TIMESTAMP
) SERVER shard1_server OPTIONS (schema_name 'public', table_name 'users');
-- Объединение шардов через partitioning
CREATE TABLE users (
id BIGINT NOT NULL,
email VARCHAR(255),
name VARCHAR(255),
created_at TIMESTAMP
) PARTITION BY RANGE (id);
ALTER TABLE users_shard1 INHERIT users;
ALTER TABLE users ADD CONSTRAINT users_shard1_check CHECK (id >= 1 AND id < 1000000);
4. Connection Pooling
PgBouncer
Установка и настройка PgBouncer
# /etc/pgbouncer/pgbouncer.ini
[databases]
mydb = host=localhost port=5432 dbname=mydb max_db_connections=50
[pgbouncer]
listen_port = 6432
listen_addr = *
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
# Режимы pooling
pool_mode = transaction # transaction, session, statement
max_client_conn = 1000 # Максимум клиентских соединений
default_pool_size = 25 # Размер пула по умолчанию
reserve_pool_size = 5 # Резервные соединения
reserve_pool_timeout = 3 # Таймаут резерва
# Таймауты
server_reset_query = DISCARD ALL
server_check_delay = 10
server_check_query = SELECT 1
server_lifetime = 3600
server_idle_timeout = 600
Мониторинг PgBouncer
-- Подключение к консоли PgBouncer
psql -h localhost -p 6432 -U pgbouncer pgbouncer
-- Статистика пулов
SHOW POOLS;
-- Активные соединения
SHOW CLIENTS;
SHOW SERVERS;
-- Статистика
SHOW STATS;
-- Конфигурация
SHOW CONFIG;
Odyssey (Advanced Connection Pooler)
# odyssey.conf
storage "postgres_server" {
type "remote"
host "localhost"
port 5432
}
database "mydb" {
user "postgres" {
authentication "md5"
password "password"
storage "postgres_server"
storage_db "mydb"
storage_user "postgres"
storage_password "password"
pool "transaction"
pool_size 50
pool_timeout 4000
pool_ttl 60
pool_discard no
pool_cancel yes
client_max 1000
application_name_add_host yes
log_query no
log_debug no
}
}
listen {
host "*"
port 6432
backlog 128
}
5. Load Balancing
HAProxy для PostgreSQL
# /etc/haproxy/haproxy.cfg
global
maxconn 4096
log 127.0.0.1:514 local0
defaults
mode tcp
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms
option tcplog
# Master для записи
frontend postgres_master
bind *:5000
default_backend postgres_master_backend
backend postgres_master_backend
balance roundrobin
option httpchk GET /master
server master1 master1_host:5432 check port 8008
# Replicas для чтения
frontend postgres_replica
bind *:5001
default_backend postgres_replica_backend
backend postgres_replica_backend
balance roundrobin
option httpchk GET /replica
server replica1 replica1_host:5432 check port 8008
server replica2 replica2_host:5432 check port 8008
server replica3 replica3_host:5432 check port 8008
Health Check скрипт
#!/usr/bin/env python3
import psycopg2
from http.server import HTTPServer, BaseHTTPRequestHandler
import sys
class HealthCheckHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/master':
if self.check_master():
self.send_response(200)
else:
self.send_response(503)
elif self.path == '/replica':
if self.check_replica():
self.send_response(200)
else:
self.send_response(503)
else:
self.send_response(404)
self.end_headers()
def check_master(self):
try:
conn = psycopg2.connect(
host='localhost',
port=5432,
database='postgres',
user='health_check'
)
cur = conn.cursor()
cur.execute("SELECT NOT pg_is_in_recovery()")
is_master = cur.fetchone()[0]
conn.close()
return is_master
except:
return False
def check_replica(self):
try:
conn = psycopg2.connect(
host='localhost',
port=5432,
database='postgres',
user='health_check'
)
cur = conn.cursor()
cur.execute("SELECT pg_is_in_recovery()")
is_replica = cur.fetchone()[0]
conn.close()
return is_replica
except:
return False
if __name__ == '__main__':
server = HTTPServer(('', 8008), HealthCheckHandler)
server.serve_forever()
6. Monitoring и Alerting
Мониторинг репликации
-- Создание мониторинговых представлений
CREATE VIEW replication_status AS
SELECT
client_addr,
client_hostname,
state,
sent_lsn,
write_lsn,
flush_lsn,
replay_lsn,
EXTRACT(EPOCH FROM write_lag) as write_lag_seconds,
EXTRACT(EPOCH FROM flush_lag) as flush_lag_seconds,
EXTRACT(EPOCH FROM replay_lag) as replay_lag_seconds,
pg_wal_lsn_diff(sent_lsn, replay_lsn) as total_lag_bytes
FROM pg_stat_replication;
-- Алерты на основе lag
CREATE OR REPLACE FUNCTION check_replication_lag()
RETURNS TABLE(alert_level TEXT, message TEXT) AS $$
BEGIN
RETURN QUERY
SELECT
CASE
WHEN total_lag_bytes > 1073741824 THEN 'CRITICAL' -- > 1GB
WHEN total_lag_bytes > 104857600 THEN 'WARNING' -- > 100MB
ELSE 'OK'
END as alert_level,
format('Replication lag: %s bytes for %s',
total_lag_bytes, client_addr) as message
FROM replication_status
WHERE total_lag_bytes > 104857600;
END;
$$ LANGUAGE plpgsql;
Prometheus и Grafana мониторинг
# postgres_exporter конфигурация
# custom_queries.yml
pg_replication_lag:
query: |
SELECT
client_addr,
client_hostname,
EXTRACT(EPOCH FROM write_lag) as write_lag,
EXTRACT(EPOCH FROM flush_lag) as flush_lag,
EXTRACT(EPOCH FROM replay_lag) as replay_lag,
pg_wal_lsn_diff(sent_lsn, replay_lsn) as lag_bytes
FROM pg_stat_replication
metrics:
- client_addr:
usage: "LABEL"
description: "Client address"
- write_lag:
usage: "GAUGE"
description: "Write lag in seconds"
- flush_lag:
usage: "GAUGE"
description: "Flush lag in seconds"
- replay_lag:
usage: "GAUGE"
description: "Replay lag in seconds"
- lag_bytes:
usage: "GAUGE"
description: "Total lag in bytes"
pg_database_size:
query: |
SELECT
datname,
pg_database_size(datname) as size_bytes
FROM pg_database
WHERE datistemplate = false
metrics:
- datname:
usage: "LABEL"
description: "Database name"
- size_bytes:
usage: "GAUGE"
description: "Database size in bytes"
7. Failover и High Availability
Automatic Failover с Patroni
Patroni конфигурация
# patroni.yml
scope: postgres-cluster
namespace: /postgresql/
name: node1
restapi:
listen: 0.0.0.0:8008
connect_address: node1:8008
etcd:
hosts: etcd1:2379,etcd2:2379,etcd3:2379
bootstrap:
dcs:
ttl: 30
loop_wait: 10
retry_timeout: 30
maximum_lag_on_failover: 1048576
master_start_timeout: 300
synchronous_mode: false
postgresql:
use_pg_rewind: true
use_slots: true
parameters:
wal_level: replica
hot_standby: "on"
wal_keep_segments: 8
max_wal_senders: 10
max_replication_slots: 10
wal_log_hints: "on"
initdb:
- encoding: UTF8
- data-checksums
postgresql:
listen: 0.0.0.0:5432
connect_address: node1:5432
data_dir: /var/lib/postgresql/data
bin_dir: /usr/lib/postgresql/13/bin
pgpass: /tmp/pgpass
authentication:
replication:
username: replicator
password: password
superuser:
username: postgres
password: password
tags:
nofailover: false
noloadbalance: false
clonefrom: false
nosync: false
Manual Failover процедуры
# Планируемое переключение (switchover)
# 1. Останавливаем приложения или переводим в read-only режим
# 2. Ждем синхронизации standby серверов
# 3. Продвигаем standby до master
# На standby сервере
pg_ctl promote -D /var/lib/postgresql/data
# Или создание trigger файла
touch /tmp/promote_standby
# 4. Перенастраиваем приложения на новый master
# 5. Старый master становится standby (требует pg_rewind)
# Внеплановое переключение (failover)
# 1. Обнаружение недоступности master
# 2. Выбор наиболее актуального standby
# 3. Продвижение standby до master
# 4. Обновление DNS/load balancer
# 5. Перезапуск приложений
Проверка целостности после failover
-- Проверки после failover
-- 1. Статус нового master
SELECT pg_is_in_recovery(); -- Должно вернуть false
-- 2. Последний применненый WAL
SELECT pg_last_wal_replay_lsn();
-- 3. Проверка активных соединений
SELECT count(*) FROM pg_stat_activity WHERE state = 'active';
-- 4. Проверка репликации (если standby серверы переподключились)
SELECT * FROM pg_stat_replication;
-- 5. Тестовые операции записи
CREATE TEMP TABLE failover_test (id int, ts timestamp default now());
INSERT INTO failover_test (id) VALUES (1);
SELECT * FROM failover_test;
DROP TABLE failover_test;
8. Практические архитектуры
Микросервисная архитектура с Database per Service
-- Сервис Users
-- Database: users_db
-- Tables: users, user_profiles, user_settings
-- Сервис Orders
-- Database: orders_db
-- Tables: orders, order_items, payments
-- Сервис Inventory
-- Database: inventory_db
-- Tables: products, stock, suppliers
-- Challenges:
-- 1. Транзакции между сервисами (Saga pattern)
-- 2. Консистентность данных (Eventually consistent)
-- 3. Join'ы между сервисами (API calls)
Master-Slave архитектура для Read/Write splitting
# Пример Django database роутера
class DatabaseRouter:
def db_for_read(self, model, **hints):
"""Направляем чтение на slave"""
return 'slave'
def db_for_write(self, model, **hints):
"""Направляем запись на master"""
return 'master'
def allow_migrate(self, db, app_label, model_name=None, **hints):
"""Миграции только на master"""
return db == 'master'
# settings.py
DATABASES = {
'default': {}, # Используется как алиас для master
'master': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'mydb',
'HOST': 'master.db.example.com',
'PORT': '5432',
},
'slave': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'mydb',
'HOST': 'slave.db.example.com',
'PORT': '5432',
}
}
DATABASE_ROUTERS = ['myapp.routers.DatabaseRouter']
Multi-Region Setup
# Глобальная архитектура
Region US-East:
- Master PostgreSQL
- 2 Read Replicas
- PgBouncer Pool
- HAProxy Load Balancer
Region EU-West:
- Read Replica (cross-region)
- Local PgBouncer
- HAProxy
Region Asia-Pacific:
- Read Replica (cross-region)
- Local PgBouncer
- HAProxy
# Стратегии:
# 1. Async replication между регионами
# 2. DNS-based routing по географии
# 3. Локальные read replicas для низкой латентности
# 4. Cross-region failover для disaster recovery
9. Best Practices
Проектирование для масштабирования
-- 1. Правильное партиционирование данных
-- Выбирайте partition key, который:
-- - Обеспечивает равномерное распределение
-- - Минимизирует cross-partition запросы
-- - Соответствует паттернам доступа
-- 2. Денормализация для производительности
-- Создавайте summary таблицы для агрегатов
CREATE TABLE daily_user_stats (
user_id INT,
date DATE,
login_count INT,
page_views INT,
time_spent_minutes INT,
PRIMARY KEY (user_id, date)
);
-- 3. Использование материализованных представлений
CREATE MATERIALIZED VIEW monthly_sales AS
SELECT
DATE_TRUNC('month', created_at) as month,
SUM(total) as total_sales,
COUNT(*) as orders_count,
AVG(total) as avg_order_value
FROM orders
GROUP BY DATE_TRUNC('month', created_at);
-- Автоматическое обновление
CREATE OR REPLACE FUNCTION refresh_monthly_sales()
RETURNS void AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY monthly_sales;
END;
$$ LANGUAGE plpgsql;
-- Запуск по расписанию через pg_cron
SELECT cron.schedule('refresh-monthly-sales', '0 1 * * *', 'SELECT refresh_monthly_sales();');
Мониторинг производительности масштабируемой системы
-- Создание dashboard для мониторинга
CREATE VIEW cluster_health AS
SELECT
'master' as server_type,
version() as version,
pg_database_size(current_database()) as db_size,
(SELECT count(*) FROM pg_stat_activity WHERE state = 'active') as active_connections,
(SELECT count(*) FROM pg_stat_replication) as replica_count,
pg_is_in_recovery() as is_replica,
CASE WHEN pg_is_in_recovery() THEN
EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
ELSE NULL END as replica_lag_seconds;
-- Алерты для критических метрик
CREATE OR REPLACE FUNCTION check_cluster_health()
RETURNS TABLE(
metric_name TEXT,
current_value NUMERIC,
threshold NUMERIC,
status TEXT
) AS $$
BEGIN
-- Проверка активных соединений
RETURN QUERY
SELECT
'active_connections'::TEXT,
(SELECT count(*)::NUMERIC FROM pg_stat_activity WHERE state = 'active'),
200::NUMERIC,
CASE WHEN (SELECT count(*) FROM pg_stat_activity WHERE state = 'active') > 200
THEN 'CRITICAL' ELSE 'OK' END;
-- Проверка размера базы данных
RETURN QUERY
SELECT
'database_size_gb'::TEXT,
pg_database_size(current_database())::NUMERIC / 1073741824,
1000::NUMERIC,
CASE WHEN pg_database_size(current_database()) > 1073741824000 -- 1TB
THEN 'WARNING' ELSE 'OK' END;
-- Проверка lag репликации
IF pg_is_in_recovery() THEN
RETURN QUERY
SELECT
'replication_lag_seconds'::TEXT,
EXTRACT(EPOCH FROM (now()
Процедуры и функции
Основы Stored Procedures и Functions
Stored Procedures — именованные блоки PL/pgSQL кода, хранящиеся в базе данных и выполняющиеся на сервере БД. Обеспечивают инкапсуляцию бизнес-логики, повышение производительности за счет компиляции и кэширования планов выполнения.
Functions vs Procedures в PostgreSQL:
- Functions — возвращают значение, могут использоваться в SQL выражениях, поддерживают только READ операции по умолчанию
- Procedures — не возвращают значение, поддерживают транзакционное управление (COMMIT/ROLLBACK), появились в PostgreSQL 11
Создание функций
Базовый синтаксис функций
-- Простая функция расчета
CREATE OR REPLACE FUNCTION calculate_discount(
order_amount DECIMAL,
customer_type VARCHAR(20)
) RETURNS DECIMAL AS $$
BEGIN
-- CASE выражение для бизнес-логики скидок
RETURN CASE
WHEN customer_type = 'VIP' AND order_amount > 1000 THEN order_amount * 0.15
WHEN customer_type = 'VIP' THEN order_amount * 0.10
WHEN order_amount > 500 THEN order_amount * 0.05
ELSE 0
END;
END;
$$ LANGUAGE plpgsql;
-- Использование функции в запросах
SELECT
order_id,
total_amount,
calculate_discount(total_amount, customer_type) AS discount,
total_amount - calculate_discount(total_amount, customer_type) AS final_amount
FROM orders;
$$ ... $$ — PostgreSQL dollar quoting для избежания проблем с кавычками в коде. LANGUAGE plpgsql — указывает процедурный язык PostgreSQL.
Функции с различными типами возврата
-- Возврат таблицы (TABLE function)
CREATE OR REPLACE FUNCTION get_user_orders(user_id_param BIGINT)
RETURNS TABLE(
order_id BIGINT,
order_date TIMESTAMP,
total_amount DECIMAL,
status VARCHAR(20)
) AS $$
BEGIN
RETURN QUERY
SELECT o.id, o.created_at, o.total_amount, o.status
FROM orders o
WHERE o.user_id = user_id_param
ORDER BY o.created_at DESC;
END;
$$ LANGUAGE plpgsql;
-- Возврат множества записей (SETOF)
CREATE OR REPLACE FUNCTION get_top_customers(limit_count INT DEFAULT 10)
RETURNS SETOF customers AS $$
BEGIN
RETURN QUERY
SELECT c.*
FROM customers c
INNER JOIN (
SELECT user_id, SUM(total_amount) as total_spent
FROM orders
WHERE created_at >= CURRENT_DATE - INTERVAL '1 year'
GROUP BY user_id
ORDER BY total_spent DESC
LIMIT limit_count
) top ON c.id = top.user_id;
END;
$$ LANGUAGE plpgsql;
-- Возврат пользовательского типа
CREATE TYPE order_summary AS (
total_orders INT,
total_amount DECIMAL,
avg_order_value DECIMAL,
last_order_date TIMESTAMP
);
CREATE OR REPLACE FUNCTION get_user_summary(user_id_param BIGINT)
RETURNS order_summary AS $$
DECLARE
result order_summary;
BEGIN
SELECT
COUNT(*),
COALESCE(SUM(total_amount), 0),
COALESCE(AVG(total_amount), 0),
MAX(created_at)
INTO
result.total_orders,
result.total_amount,
result.avg_order_value,
result.last_order_date
FROM orders
WHERE user_id = user_id_param;
RETURN result;
END;
$$ LANGUAGE plpgsql;
Процедуры (PostgreSQL 11+)
Транзакционные процедуры
-- Процедура с управлением транзакциями
CREATE OR REPLACE PROCEDURE process_order_batch(
batch_id BIGINT,
auto_commit BOOLEAN DEFAULT TRUE
) AS $$
DECLARE
order_record RECORD;
processed_count INT := 0;
error_count INT := 0;
BEGIN
-- Обработка пакета заказов
FOR order_record IN
SELECT * FROM orders
WHERE batch_id = process_order_batch.batch_id
AND status = 'PENDING'
LOOP
BEGIN
-- Проверка наличия товаров
PERFORM check_inventory(order_record.id);
-- Резервирование товаров
CALL reserve_inventory(order_record.id);
-- Обновление статуса заказа
UPDATE orders
SET status = 'PROCESSING', updated_at = NOW()
WHERE id = order_record.id;
processed_count := processed_count + 1;
-- Промежуточный COMMIT для больших пакетов
IF auto_commit AND processed_count % 100 = 0 THEN
COMMIT;
END IF;
EXCEPTION
WHEN OTHERS THEN
error_count := error_count + 1;
-- Логирование ошибки
INSERT INTO error_log (batch_id, order_id, error_message, created_at)
VALUES (process_order_batch.batch_id, order_record.id, SQLERRM, NOW());
-- Откат для конкретного заказа
ROLLBACK;
END;
END LOOP;
-- Обновление статистики пакета
UPDATE order_batches
SET
processed_orders = processed_count,
failed_orders = error_count,
status = 'COMPLETED',
completed_at = NOW()
WHERE id = process_order_batch.batch_id;
COMMIT;
RAISE NOTICE 'Batch % processed: % successful, % failed',
batch_id, processed_count, error_count;
END;
$$ LANGUAGE plpgsql;
CALL — вызов процедуры (в отличие от SELECT для функций). COMMIT/ROLLBACK — управление транзакциями внутри процедуры, недоступно в функциях.
Сложная логика и контроль потока
Циклы и условная логика
CREATE OR REPLACE FUNCTION calculate_loyalty_points(
user_id_param BIGINT,
calculation_period INTERVAL DEFAULT '1 year'
) RETURNS JSON AS $$
DECLARE
order_cursor CURSOR FOR
SELECT id, total_amount, created_at, items
FROM orders
WHERE user_id = user_id_param
AND created_at >= NOW() - calculation_period
ORDER BY created_at;
current_order RECORD;
total_points INT := 0;
bonus_multiplier DECIMAL := 1.0;
tier_points INT;
result JSON;
BEGIN
-- Определение множителя на основе статуса клиента
SELECT
CASE
WHEN customer_tier = 'GOLD' THEN 1.5
WHEN customer_tier = 'SILVER' THEN 1.2
ELSE 1.0
END
INTO bonus_multiplier
FROM customers
WHERE id = user_id_param;
-- Итерация по заказам
OPEN order_cursor;
LOOP
FETCH order_cursor INTO current_order;
EXIT WHEN NOT FOUND;
-- Базовые баллы: 1 балл за каждые 10 рублей
tier_points := FLOOR(current_order.total_amount / 10);
-- Бонусы за большие заказы
IF current_order.total_amount > 5000 THEN
tier_points := tier_points + 500;
ELSIF current_order.total_amount > 2000 THEN
tier_points := tier_points + 200;
END IF;
-- Применение множителя
tier_points := FLOOR(tier_points * bonus_multiplier);
total_points := total_points + tier_points;
-- Логирование начисления баллов
INSERT INTO loyalty_points_log (
user_id, order_id, points_earned, calculation_date
) VALUES (
user_id_param, current_order.id, tier_points, NOW()
);
END LOOP;
CLOSE order_cursor;
-- Формирование JSON результата
result := json_build_object(
'user_id', user_id_param,
'period', calculation_period,
'total_points', total_points,
'bonus_multiplier', bonus_multiplier,
'calculation_date', NOW()
);
RETURN result;
END;
$$ LANGUAGE plpgsql;
CURSOR — механизм для пошаговой обработки результатов запроса, эффективен для больших объемов данных. json_build_object() — PostgreSQL функция для создания JSON объектов.
Обработка исключений и динамический SQL
CREATE OR REPLACE FUNCTION dynamic_report_generator(
table_name VARCHAR(50),
date_column VARCHAR(50),
start_date DATE,
end_date DATE,
group_by_columns VARCHAR[] DEFAULT ARRAY['id']
) RETURNS JSON AS $$
DECLARE
sql_query TEXT;
result_json JSON;
row_count INT;
BEGIN
-- Валидация входных параметров
IF table_name IS NULL OR date_column IS NULL THEN
RAISE EXCEPTION 'Table name and date column cannot be null';
END IF;
-- Проверка существования таблицы
IF NOT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_name = dynamic_report_generator.table_name
AND table_schema = 'public'
) THEN
RAISE EXCEPTION 'Table % does not exist', table_name;
END IF;
-- Построение динамического SQL
sql_query := format('
SELECT json_agg(row_to_json(t))
FROM (
SELECT %s, COUNT(*) as record_count
FROM %I
WHERE %I BETWEEN %L AND %L
GROUP BY %s
ORDER BY record_count DESC
LIMIT 1000
) t',
array_to_string(group_by_columns, ', '),
table_name,
date_column,
start_date,
end_date,
array_to_string(group_by_columns, ', ')
);
-- Выполнение динамического запроса
BEGIN
EXECUTE sql_query INTO result_json;
-- Получение количества обработанных записей
EXECUTE format('
SELECT COUNT(*) FROM %I WHERE %I BETWEEN %L AND %L',
table_name, date_column, start_date, end_date
) INTO row_count;
EXCEPTION
WHEN OTHERS THEN
RAISE EXCEPTION 'Error executing dynamic query: %. SQL: %', SQLERRM, sql_query;
END;
-- Добавление метаданных к результату
result_json := json_build_object(
'data', COALESCE(result_json, '[]'::json),
'metadata', json_build_object(
'table_name', table_name,
'date_range', json_build_object('start', start_date, 'end', end_date),
'total_rows', row_count,
'generated_at', NOW()
)
);
RETURN result_json;
END;
$$ LANGUAGE plpgsql;
EXECUTE — выполнение динамически построенного SQL. %I — безопасная подстановка идентификаторов, %L — литералов в format(). SQLERRM — текст последней ошибки.
Триггерные функции
Audit и логирование
-- Создание audit таблицы
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
table_name VARCHAR(50) NOT NULL,
operation VARCHAR(10) NOT NULL,
old_values JSONB,
new_values JSONB,
user_id BIGINT,
session_id VARCHAR(100),
ip_address INET,
created_at TIMESTAMP DEFAULT NOW()
);
-- Универсальная audit функция
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS TRIGGER AS $$
DECLARE
old_data JSONB;
new_data JSONB;
current_user_id BIGINT;
current_session_id VARCHAR(100);
client_ip INET;
BEGIN
-- Получение пользовательских данных из контекста
current_user_id := current_setting('app.current_user_id', true)::BIGINT;
current_session_id := current_setting('app.session_id', true);
client_ip := current_setting('app.client_ip', true)::INET;
-- Обработка разных типов операций
IF TG_OP = 'DELETE' THEN
old_data := to_jsonb(OLD);
new_data := NULL;
ELSIF TG_OP = 'UPDATE' THEN
old_data := to_jsonb(OLD);
new_data := to_jsonb(NEW);
ELSIF TG_OP = 'INSERT' THEN
old_data := NULL;
new_data := to_jsonb(NEW);
END IF;
-- Запись в audit лог
INSERT INTO audit_log (
table_name, operation, old_values, new_values,
user_id, session_id, ip_address
) VALUES (
TG_TABLE_NAME, TG_OP, old_data, new_data,
current_user_id, current_session_id, client_ip
);
-- Возврат соответствующей записи
IF TG_OP = 'DELETE' THEN
RETURN OLD;
ELSE
RETURN NEW;
END IF;
END;
$$ LANGUAGE plpgsql;
-- Создание триггеров для таблиц
CREATE TRIGGER users_audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();
CREATE TRIGGER orders_audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();
TG_OP — тип операции триггера (INSERT/UPDATE/DELETE). TG_TABLE_NAME — имя таблицы, которая вызвала триггер. OLD/NEW — записи до и после изменения.
Бизнес-логика в триггерах
-- Автоматическое обновление статистик заказов
CREATE OR REPLACE FUNCTION update_order_statistics()
RETURNS TRIGGER AS $$
DECLARE
user_stats RECORD;
BEGIN
-- Обновление статистики пользователя при изменении заказа
IF TG_OP = 'INSERT' OR (TG_OP = 'UPDATE' AND OLD.status != NEW.status) THEN
-- Пересчет статистики пользователя
SELECT
COUNT(*) as total_orders,
SUM(CASE WHEN status = 'COMPLETED' THEN total_amount ELSE 0 END) as total_spent,
AVG(CASE WHEN status = 'COMPLETED' THEN total_amount ELSE NULL END) as avg_order_value,
MAX(CASE WHEN status = 'COMPLETED' THEN created_at ELSE NULL END) as last_order_date
INTO user_stats
FROM orders
WHERE user_id = COALESCE(NEW.user_id, OLD.user_id);
-- Обновление денормализованных данных в таблице пользователей
UPDATE customers
SET
total_orders = user_stats.total_orders,
total_spent = COALESCE(user_stats.total_spent, 0),
avg_order_value = COALESCE(user_stats.avg_order_value, 0),
last_order_date = user_stats.last_order_date,
updated_at = NOW()
WHERE id = COALESCE(NEW.user_id, OLD.user_id);
-- Обновление tier клиента на основе потраченной суммы
UPDATE customers
SET customer_tier = CASE
WHEN total_spent >= 50000 THEN 'GOLD'
WHEN total_spent >= 20000 THEN 'SILVER'
ELSE 'BRONZE'
END
WHERE id = COALESCE(NEW.user_id, OLD.user_id);
END IF;
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER order_statistics_trigger
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION update_order_statistics();
Вызов из Java приложений
Вызов функций через JDBC
@Repository
public class PostgreSQLProcedureRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
// Вызов скалярной функции
public BigDecimal calculateDiscount(BigDecimal orderAmount, String customerType) {
String sql = "SELECT calculate_discount(?, ?)";
return jdbcTemplate.queryForObject(sql, BigDecimal.class, orderAmount, customerType);
}
// Вызов функции, возвращающей таблицу
public List<OrderDto> getUserOrders(Long userId) {
String sql = "SELECT * FROM get_user_orders(?)";
return jdbcTemplate.query(sql, new Object[]{userId}, (rs, rowNum) -> {
OrderDto order = new OrderDto();
order.setOrderId(rs.getLong("order_id"));
order.setOrderDate(rs.getTimestamp("order_date").toLocalDateTime());
order.setTotalAmount(rs.getBigDecimal("total_amount"));
order.setStatus(rs.getString("status"));
return order;
});
}
// Вызов функции с JSON результатом
public LoyaltyPointsResult calculateLoyaltyPoints(Long userId, String period) {
String sql = "SELECT calculate_loyalty_points(?, ?::INTERVAL)";
String jsonResult = jdbcTemplate.queryForObject(sql, String.class, userId, period);
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonResult, LoyaltyPointsResult.class);
} catch (Exception e) {
throw new RuntimeException("Failed to parse loyalty points result", e);
}
}
// Вызов процедуры
public void processOrderBatch(Long batchId, boolean autoCommit) {
String sql = "CALL process_order_batch(?, ?)";
jdbcTemplate.update(sql, batchId, autoCommit);
}
// Пакетный вызов с именованными параметрами
public void batchUpdateUserTiers(List<UserTierUpdate> updates) {
String sql = """
SELECT update_user_tier(
user_id := :userId,
new_tier := :newTier,
reason := :reason
)
""";
List<Map<String, Object>> batchParams = updates.stream()
.map(update -> {
Map<String, Object> params = new HashMap<>();
params.put("userId", update.getUserId());
params.put("newTier", update.getNewTier());
params.put("reason", update.getReason());
return params;
})
.collect(Collectors.toList());
namedParameterJdbcTemplate.batchUpdate(sql, batchParams.toArray(new Map[0]));
}
}
Использование с JPA/Hibernate
@Entity
@NamedStoredProcedureQuery(
name = "processOrderBatch",
procedureName = "process_order_batch",
parameters = {
@StoredProcedureParameter(mode = ParameterMode.IN, name = "batch_id", type = Long.class),
@StoredProcedureParameter(mode = ParameterMode.IN, name = "auto_commit", type = Boolean.class)
}
)
public class OrderBatch {
// entity fields
}
@Repository
public class OrderBatchRepository {
@PersistenceContext
private EntityManager entityManager;
public void processOrderBatch(Long batchId, boolean autoCommit) {
StoredProcedureQuery query = entityManager
.createNamedStoredProcedureQuery("processOrderBatch");
query.setParameter("batch_id", batchId);
query.setParameter("auto_commit", autoCommit);
query.execute();
}
// Вызов функции через native query
@Query(value = "SELECT * FROM get_user_orders(?1)", nativeQuery = true)
List<Object[]> getUserOrdersNative(Long userId);
// Функция с возвратом JSON
@Query(value = "SELECT calculate_loyalty_points(?1, ?2::INTERVAL)", nativeQuery = true)
String calculateLoyaltyPointsJson(Long userId, String period);
}
Производительность и оптимизация
Кэширование и план выполнения
-- Функция с кэшированием плана выполнения
CREATE OR REPLACE FUNCTION get_product_analytics(
category_id_param INT,
date_from DATE,
date_to DATE
) RETURNS TABLE(
product_id BIGINT,
product_name VARCHAR(255),
total_sales BIGINT,
revenue DECIMAL
) AS $$
DECLARE
cache_key TEXT;
cached_result RECORD;
BEGIN
-- Формирование ключа кэша
cache_key := format('product_analytics_%s_%s_%s',
category_id_param, date_from, date_to);
-- Проверка кэша (через temporary таблицу или Redis)
SELECT INTO cached_result * FROM temp_cache
WHERE key = cache_key AND expires_at > NOW();
IF FOUND THEN
-- Возврат кэшированного результата
RETURN QUERY EXECUTE cached_result.query_result;
RETURN;
END IF;
-- Выполнение основного запроса
RETURN QUERY
SELECT
p.id,
p.name,
COUNT(oi.id)::BIGINT,
SUM(oi.price * oi.quantity)
FROM products p
INNER JOIN order_items oi ON p.id = oi.product_id
INNER JOIN orders o ON oi.order_id = o.id
WHERE p.category_id = category_id_param
AND o.created_at BETWEEN date_from AND date_to
GROUP BY p.id, p.name
ORDER BY SUM(oi.price * oi.quantity) DESC;
-- Сохранение в кэш (упрощенная схема)
INSERT INTO temp_cache (key, query_result, expires_at)
VALUES (cache_key, 'cached_data', NOW() + INTERVAL '1 hour')
ON CONFLICT (key) DO UPDATE SET
query_result = EXCLUDED.query_result,
expires_at = EXCLUDED.expires_at;
END;
$$ LANGUAGE plpgsql;
-- Функция с оптимизацией для больших объемов
CREATE OR REPLACE FUNCTION bulk_process_orders(
batch_size INT DEFAULT 1000
) RETURNS INT AS $$
DECLARE
processed_count INT := 0;
current_batch INT;
BEGIN
LOOP
-- Обработка пакета заказов
WITH batch_orders AS (
SELECT id FROM orders
WHERE status = 'PENDING'
LIMIT batch_size
FOR UPDATE SKIP LOCKED -- Избегание блокировок в concurrent среде
)
UPDATE orders
SET
status = 'PROCESSING',
updated_at = NOW(),
processed_by = current_setting('app.worker_id', true)
FROM batch_orders
WHERE orders.id = batch_orders.id;
GET DIAGNOSTICS current_batch = ROW_COUNT;
processed_count := processed_count + current_batch;
-- Выход если пакет неполный
EXIT WHEN current_batch < batch_size;
-- Периодический COMMIT для длительных операций
PERFORM pg_sleep(0.01); -- Небольшая пауза
END LOOP;
RETURN processed_count;
END;
$$ LANGUAGE plpgsql;
FOR UPDATE SKIP LOCKED — позволяет пропускать заблокированные строки, полезно для concurrent обработки очередей. GET DIAGNOSTICS — получение информации о выполненной операции.
Частые вопросы на собеседовании
Q: В чем разница между функциями и процедурами в PostgreSQL? A: Functions возвращают значение и могут использоваться в SELECT, по умолчанию READ-ONLY. Procedures не возвращают значение, поддерживают COMMIT/ROLLBACK, вызываются через CALL.
Q: Когда использовать stored procedures вместо бизнес-логики в Java? A: Для complex data transformations, bulk operations, audit/triggers, atomic operations требующих консистентности, когда логика тесно связана с данными.
Q: Как обеспечить безопасность динамического SQL в функциях? A: Использовать format() с %I для идентификаторов и %L для литералов, валидировать входные параметры, применять whitelist для table/column names.
Q: Что такое VOLATILE, STABLE, IMMUTABLE в контексте функций? A: VOLATILE - функция может изменять БД и возвращать разные результаты. STABLE - не изменяет БД, одинаковый результат в рамках одного statement. IMMUTABLE - всегда одинаковый результат для одинаковых параметров.
Q: Как оптимизировать производительность stored procedures? A: Использовать PREPARE statements, избегать cursors где возможно, применять bulk operations, использовать SKIP LOCKED для concurrent processing, кэшировать планы выполнения.
Q: Как обрабатывать ошибки в PL/pgSQL? A: Через EXCEPTION блоки с specific error types (UNIQUE_VIOLATION, NOT_NULL_VIOLATION) или WHEN OTHERS для общих ошибок. SQLERRM для получения текста ошибки.