Рецепты PostgreSQL: загрузка Государственного Адресного Реестра

Kate

Administrator
Команда форума
Для приготовления загрузки Государственного Адресного Реестра в PostgreSQL нам понадобится сам PostgreSQL, bash, sh, curl, wget, xml2csv, jq, или можно воспользоваться готовым образом.

ОСТОРОЖНО! Может потребоваться много дискового пространства! Терабайта должно хватить, может, даже пол-терабайта хватит.

Первым делом создадим файлы

# показываем, что выполняем
# выходим при ошибках
# отсутствие переменной окружения - это ошибка
set -eux
SELF="$$" # запоминаем свой pid
touch deltaVersionId.txt # дата последней загруженной дельты
touch fullVersionId.txt # дата загрузки полного ГАРа
touch state.txt # текущее состояние
Далее, в зависимости от состояния будем делать различную работу

state="$(cat state.txt)" # читаем текущее состояние
while # пока
case "$state" in # при состоянии
... # таком-то делаем то-то
esac
state="$(cat state.txt)" # читаем текущее состояние
test "$state" != "done" # выполняем пока состояние не готово
do true; done 2>&1 | tee cron.log # пишем в лог и консоль
Итак, изначально не было никакого состояния (а также в случае состояния готово)

* ) # в остальных случаях
echo "done" >state.txt # запишем состояние готово
deltaVersionId="$(cat deltaVersionId.txt)" # прочитаем дату дельты
fullVersionId="$(cat fullVersionId.txt)" # прочитаем дату полного
if [ -z "$fullVersionId" ]; then # если ещё ни разу не загружали полный
echo sql2pg >state.txt # то переходми к состоянию инициализации базы
elif [ -z "$deltaVersionId" ]; then # иначе если ни разу не загружали дельту, то
# скачиваем информацию о последних доступных выгрузках
wget --output-document=GetLastDownloadFileInfo.json https://fias.nalog.ru/WebServices/Public/GetLastDownloadFileInfo
# извлекаем из неё дату последней версии
lastVersionId="$(jq --raw-output .VersionId <GetLastDownloadFileInfo.json)"
# и адрес последней выгрузки
URL="$(jq --raw-output .GarXMLFullURL <GetLastDownloadFileInfo.json)"
# задаём имя скачиваемого файла
ZIP="$lastVersionId.zip"
# скачиваем выгрузку (полная выгрузка скачивается ОЧЕНЬ долго, часов 5)
wget --continue --output-document="$ZIP" "$URL"
echo "$lastVersionId" >deltaVersionId.txt # сохраняем дату последней выгрузки как дельты
echo "$lastVersionId" >fullVersionId.txt # так и полной
echo unzip >state.txt # переходим к состоянию распаковки
else # иначе (база проинициализирована и уже скачивали полную выгрузку или дельту)
# скачиваем информацию обо всех доступных выгрузках
wget --output-document=GetAllDownloadFileInfo.json https://fias.nalog.ru/WebServices/Public/GetAllDownloadFileInfo
# скачиваем все выгрузки после даты последней дельты
jq --raw-output "sort_by(.VersionId) | .[] | select(.VersionId > $deltaVersionId) \
# параллельно в несколько потоков (по количеству доступных процессоров)
| .GarXMLDeltaURL" <GetAllDownloadFileInfo.json | xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=URL bash /usr/local/bin/wget.sh "URL"
test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке
# обновляем соответственно дату последней дельты
jq --raw-output "sort_by(.VersionId) | .[] | select(.VersionId > $deltaVersionId) \
# последовательно (и при успехе переходим к состоянию распаковки)
| .GarXMLDeltaURL" <GetAllDownloadFileInfo.json | xargs --verbose --no-run-if-empty --replace=URL bash /usr/local/bin/version.sh "URL"
test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке
fi
;;
файлы скачивания дельты и обновления версии ничего интересного из себя не представляют, кроме разве что в файле скачивания

# вычисляем размер скачиваемого файла
SIZE="$(curl -Is "$URL" | grep 'Content-Length' | grep -oP '\d+')"
test "$SIZE" -lt 1073741824 # если размер больше 1ГБ, то выходим
далее

"sql2pg" ) # в случае инициализации базы
# исполняем все sql-скрипты
find /usr/local/sql2pg -type f -name "*.sql" | sort -u \
# последовательно
| xargs --verbose --no-run-if-empty --replace=SQL cat "SQL" | psql --variable=ON_ERROR_STOP=1
test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке
echo region2pg >state.txt # переходим к состоянию инициализации для регионов
;;
sql-скрипты представляют из себя создание таблиц с комментариями и необходимых в дальнейшем индексов

далее

"region2pg" ) # в случае инициализации базы для регионов
# для каждого региона от 01 до 99 (для простоты взят весь диапазон чисел)
seq --format "%02.0f" 1 99 \
# последовательно добавляем соответствующую схему в базу
| xargs --verbose --no-run-if-empty --replace=REGION echo "CREATE SCHEMA IF NOT EXISTS \"REGION\";" | psql --variable=ON_ERROR_STOP=1
test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке
# для всех sh-скриптов
find /usr/local/region2pg -type f -name "*.sh" | sort -u | while read -r SH; do
seq --format "%02.0f" 1 99 \ # для каждого региона
# последовательно выполняем скрипт, который генерирует sql-код
# для создания таблиц и индексов
| xargs --verbose --no-run-if-empty --replace=REGION sh "$SH" "REGION" | psql --variable=ON_ERROR_STOP=1
test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке
done
echo wget >fullVersionId.txt # зануляем полную версию
echo wget >state.txt # переходим к состоянию скачивания
# но т.к. отдельной обработки такого состояния нет, то оно
# обрабатывается дефолтным, описанным выше самым первым
;;
sh-скрипты представляют из себя генерацию sql-кода создания таблиц и необходимых в дальнейшем индексов для каждого заданного региона

далее

"unzip" ) # в случае распаковки
find . -type f -name "*.zip" | sort -u \ # все архивы
# распаковываем параллельно
# (делая названия файлов маленькими буквами)
# и при успехе удаляя архив
| xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=ZIP bash /usr/local/bin/unzip.sh "ZIP"
test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке
echo xml2csv >state.txt # переходим к состоянию конвертации
;;
далее

"xml2csv" ) # в случае конвертации
# для всех sh-скриптов
find /usr/local/xml2csv -type f -name "*.sh" | sort -u | while read -r SH; do
TABLE="$(basename -- "${SH%.*}")" # извлекаем название таблицы
# для всех соответствующих xml-файлов
find . -type f -name "as_${TABLE}_2*.xml" | sort -u \
# параллельно конвертируем их в csv-файлы
# и при успехе удаляем xml-файл
| xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=XML bash "$SH" "XML"
test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке
done
deltaVersionId="$(cat deltaVersionId.txt)" # читаем сохранённую дату дельты
fullVersionId="$(cat fullVersionId.txt)" # и полной выгрузки
# если они разные
if [ "$deltaVersionId" != "$fullVersionId" ]; then
echo delta2pg >state.txt # то переходим к состоянию загрузки дельты
else # иначе (если они одинаковые)
echo full2pg >state.txt # то переходим к состоянию загрузки полной
fi
;;
sh-скрипты представляют из себя команды для конвертации

сначала происходит

"full2pg" ) # в случае загрузки полной выгрузки
# для всех sh-скриптов
find /usr/local/full2pg -type f -name "*.sh" | sort -u | while read -r SH; do
TABLE="$(basename -- "${SH%.*}")" # извлекаем название таблицы
# для всех csv-файлов
find . -type f -name "as_${TABLE}_2*.csv" | sort -u \
# параллельно загружаем их в базу
# и при успехе удаляем csv-файл
| xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=CSV bash "$SH" "CSV"
test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке
done
echo update >state.txt # переходим к состоянию обновления
;;
sh-скрипты представляют из себя генерацию sql-кода для загрузки соответствующей таблицы в базу и выполнение загрузки с помощью оператора COPY только актуальных данных

или же (во все следующие разы) происходит

"delta2pg" ) # в случае загрузки дельты
# для всех sh-скриптов
find /usr/local/delta2pg -type f -name "*.sh" | sort -u | while read -r SH; do
TABLE="$(basename -- "${SH%.*}")" # извлекаем название таблицы
# для всех скачанных дельт (если их было несколько с предыдущего раза)
find -mindepth 1 -maxdepth 1 -type d | sort -u | while read -r DIR; do
# для всех csv-файлов
find "$DIR" -type f -name "as_${TABLE}_2*.csv" | sort -u \
# параллельно загружаем их в базу
# и успехе удаляем csv-файл
| xargs --verbose --no-run-if-empty --max-procs="$(nproc)" --replace=CSV bash "$SH" "CSV"
test $? -eq 0 || kill -SIGINT "$SELF" # выходим при ошибке
rmdir "$DIR"/* "$DIR" # удаляем директорию дельты
done
done
echo update >state.txt # переходим к состоянию обновления
;;
sh-скрипты представляют из себя генерацию sql-кода для загрузки соответствующей таблицы в базу и выполнение загрузки с помощью оператора COPY во временную таблицу всех данных, а затем вставку из временной таблицы в основную с обновлением при кофликте, а в конце неактуальные данные удаляются из основной таблицы

таким образом, запуская главный скрипт в докере по крону каждый день получаем актуальное состояние Государственного Адресного Реестра в PostgreSQL

 
Сверху