Adaptive Query Rewriting в PostgreSQL

Kate

Administrator
Команда форума
В этой статье я покажу, как с помощью Adaptive Query Rewriting PostgreSQL может не только выбирать новый план, но и адаптироваться, учась на своих ошибках, чтобы быть быстрее и эффективнее. Почему один и тот же запрос в PostgreSQL иногда выполняется быстро, а иногда тянется? Всё дело в том, что классический оптимизатор ориентируется на устаревшие статистики и не всегда точно предсказывает реальную ситуацию в данных. В таких случаях адаптивная система, анализируя ошибки и реальные показатели выполнения, корректирует план, улучшая производительность.

Как работает оптимизатор PostgreSQL​

Когда вы вызываете EXPLAIN ANALYZE, оптимизатор оценивает стоимость каждого узла плана, используя параметры вроде seq_page_cost, random_page_cost, cpu_tuple_cost и так далее. Но, как часто бывает, модель оказывается несовершенной. Взглянем на классический пример запроса с тремя таблицами:

EXPLAIN ANALYZE
SELECT *
FROM users AS u1
JOIN messages AS m ON u1.id = m.sender_id
JOIN users AS u2 ON m.receiver_id = u2.id;
При выполнении этот запрос может породить план вроде:

Hash Join (cost=540.00..439429.44 rows=10003825 width=27)
Hash Cond: (m.receiver_id = u2.id)
-> Hash Join (cost=270.00..301606.84 rows=10003825 width=23)
Hash Cond: (m.sender_id = u1.id)
-> Seq Scan on messages m (cost=0.00..163784.25 rows=10003825 width=19)
-> Hash (cost=145.00..145.00 rows=10000 width=4)
-> Seq Scan on users u1 (cost=0.00..145.00 rows=10000 width=4)
-> Hash (cost=145.00..145.00 rows=10000 width=4)
-> Seq Scan on users u2 (cost=0.00..145.00 rows=10000 width=4)
На первый взгляд всё логично, но если статистика устарела или распределение данных изменилось, оптимизатор начинает спотыкаться, выбирая не тот план, который принесёт максимум производительности.

Adaptive Query Rewriting​

Суть AQR проста: собираем обратную связь о выполнении запроса и корректируем план на будущее.

Основные этапы работы AQR

  1. Планирование. Система предсказывает стоимость выполнения плана на основе имеющихся статистик.
  2. Исполнение. Во время выполнения собирается подробная статистика: фактическое число строк, время выполнения, использование ресурсов.
  3. Обратная связь. Сравниваем предсказанные значения с реальными.
  4. Корректировка плана. При последующих запусках запросов, имеющих сходную структуру, используется обновлённая модель, позволяющая выбирать более оптимальный план.
Также использовать машинное обучение. Классический подход, когда для каждой группы запросов накапливается огромная обучающая выборка, не всегда удобен. Поэтому можно использовать Gradient Approach to kNN: вместо хранения всего архива опыта оптимизируем фиксированное число виртуальных объектов с помощью стохастического градиентного спуска.

Практическая реализация​

Начнём с перехвата стандартного процесса планирования с помощью C‑расширения.

Хук для переписывания плана в C

/*
* Пример расширения для PostgreSQL, реализующего Adaptive Query Rewriting.
* Здесь используем planner_hook для перехвата и модификации плана запроса.
*/

#include "postgres.h"
#include "optimizer/planner.h"
#include "commands/explain.h"
#include "utils/guc.h"

/* Сохраняем оригинальный указатель на планировщик */
static planner_hook_type prev_planner_hook = NULL;

/* Прототип функции-планировщика */
static PlannedStmt* my_planner(Query *parse, int cursorOptions, ParamListInfo boundParams);

static PlannedStmt*
my_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
{
PlannedStmt *stmt;

/* Вызываем оригинальный планировщик */
if (prev_planner_hook)
stmt = prev_planner_hook(parse, cursorOptions, boundParams);
else
stmt = standard_planner(parse, cursorOptions, boundParams);

/* Здесь начинается адаптивная логика */
elog(INFO, "AQR: сформирован оригинальный план, начинаем адаптацию");

/* Пример проверки: если в плане обнаружен SeqScan, пробуем заменить его на IndexScan */
// if (contains_seqscan(stmt)) {
// stmt = rewrite_seqscan_to_indexscan(stmt);
// elog(INFO, "AQR: SeqScan заменён на IndexScan");
// }

/* Дополнительные проверки, сбор статистики и корректировка плана могут быть добавлены здесь */

return stmt;
}

/* Функция инициализации расширения */
void
_PG_init(void)
{
/* Сохраняем старый хук */
prev_planner_hook = planner_hook;
/* Устанавливаем наш хук */
planner_hook = my_planner;

DefineCustomBoolVariable("aqr.debug_mode",
"Enable debug mode for Adaptive Query Rewriting.",
NULL,
&DebugFlag,
false,
PGC_SUSET,
0,
NULL,
NULL,
NULL);

elog(INFO, "AQR: расширение загружено успешно");
}

/* Функция деинициализации расширения */
void
_PG_fini(void)
{
planner_hook = prev_planner_hook;
elog(INFO, "AQR: расширение выгружено");
}
Расширенный пример: сбор статистики и динамическое переписывание запросов

Чтобы система могла учиться, нужно собирать статистику выполнения. Ниже — пример того, как можно регистрировать время выполнения запроса и число обработанных строк.

-- Создаем таблицу для хранения статистики запросов
CREATE TABLE aqr_query_stats (
query_hash TEXT PRIMARY KEY,
exec_count INTEGER DEFAULT 0,
total_time NUMERIC DEFAULT 0,
avg_rows NUMERIC DEFAULT 0
);

-- Функция для обновления статистики выполнения запроса
CREATE OR REPLACE FUNCTION update_aqr_stats(
p_query_hash TEXT,
p_exec_time NUMERIC,
p_rows INTEGER
) RETURNS VOID AS $$
BEGIN
INSERT INTO aqr_query_stats (query_hash, exec_count, total_time, avg_rows)
VALUES (p_query_hash, 1, p_exec_time, p_rows)
ON CONFLICT (query_hash) DO UPDATE SET
exec_count = aqr_query_stats.exec_count + 1,
total_time = aqr_query_stats.total_time + p_exec_time,
avg_rows = ((aqr_query_stats.avg_rows * aqr_query_stats.exec_count) + p_rows) / (aqr_query_stats.exec_count + 1);
END;
$$ LANGUAGE plpgsql;

-- Пример процедуры, оборачивающей выполнение запроса для сбора статистики
DO $$
DECLARE
start_time TIMESTAMP;
end_time TIMESTAMP;
exec_time NUMERIC;
row_count INTEGER;
BEGIN
start_time := clock_timestamp();

-- Выполнение запроса (замените на ваш реальный запрос)
PERFORM * FROM users WHERE age < 25 AND city = 'Ottawa';

end_time := clock_timestamp();
exec_time := EXTRACT(MILLISECOND FROM end_time - start_time);
GET DIAGNOSTICS row_count = ROW_COUNT;

-- Обновляем статистику для данного типа запроса
PERFORM update_aqr_stats(md5('SELECT * FROM users WHERE age < 25 AND city = ''Ottawa'''), exec_time, row_count);
END $$;
Здесь ручным способом собираем данные о выполнении запроса. В системе AQR эти данные будут передаваться в обучающую модель, которая сможет корректировать выбор плана при повторном выполнении запроса.

Запрос на основе накопленной статистики

Напишем PL/pgSQL‑функцию, которая динамически переписывает запрос на основе накопленной статистики. Функция проверяет, как часто и с каким средним числом строк выполнялся данный запрос, и в зависимости от этого, может, например, временно отключать SeqScan (если данные указывают на плохую селективность) перед выполнением запроса. Все это можно интегрировать в систему AQR для динамической корректировки планов.

-- Функция динамического переписывания запроса на основе статистики
CREATE OR REPLACE FUNCTION adaptive_execute(query_text TEXT)
RETURNS SETOF RECORD AS $$
DECLARE
new_query TEXT;
q_hash TEXT := md5(query_text);
stat RECORD;
BEGIN
-- Попытка получить статистику по данному запросу
SELECT * INTO stat
FROM aqr_query_stats
WHERE query_hash = q_hash;

-- Если статистика найдена, анализируем среднее число строк
IF stat IS NOT NULL THEN
-- Если запрос возвращает большое количество строк, попробуем отключить SeqScan
IF stat.avg_rows > 100000 THEN
RAISE NOTICE 'Высокая кардинальность обнаружена (avg_rows=%), переписываем запрос...', stat.avg_rows;
new_query := 'SET enable_seqscan = off; ' || query_text;
ELSE
new_query := query_text;
END IF;
ELSE
new_query := query_text;
END IF;

-- Выполнение переписанного запроса
RETURN QUERY EXECUTE new_query;
END;
$$ LANGUAGE plpgsql;
Эту функцию можно использовать для выполнения запросов, например:

SELECT * FROM adaptive_execute('SELECT * FROM users WHERE age < 25');

Интеграция с внешними метрикам​

Оптимальный план запроса зависит не только от данных, но и от текущей загрузки системы. Иногда, если сервер перегружен, даже самый быстрый план может оказаться тормозным. Поэтому можно интегрировать AQR с внешними метриками, получаемыми, например, от Prometheus или через REST API. Например, на Питоне можно получать метрики и подстраивать логику оптимизации таким образом:

import requests
import json

def get_system_metrics():
try:
response = requests.get("http://metrics.mycompany.com/api/system")
data = response.json()
cpu_load = data.get("cpu_load", 0)
io_wait = data.get("io_wait", 0)
return cpu_load, io_wait
except Exception as e:
print("Ошибка получения метрик:", e)
return 0, 0

def decide_query_plan():
cpu_load, io_wait = get_system_metrics()
if cpu_load > 0.8 or io_wait > 0.5:
print("Система перегружена: переключаемся на консервативный план")
return "conservative_plan"
else:
print("Нормальная загрузка: выбираем оптимальный план")
return "optimal_plan"

if __name__ == "__main__":
plan = decide_query_plan()
# Здесь можно передать параметр в БД, который повлияет на выбор плана
print("Выбранный план:", plan)
Так можно динамически менять стратегию оптимизации в зависимости от внешних условий.

Даже небольшие корректировки в планах выполнения запросов могут привести к колоссальному приросту производительности. Например, в моей практике на тестах TPC‑H и TPC‑DS AQR позволял сократить время выполнения сложных аналитических запросов на 20–40%.

 
Сверху